Skip to content

Commit

Permalink
Changed Event constructor order
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaron committed Mar 7, 2024
1 parent 9001da9 commit d763d6d
Show file tree
Hide file tree
Showing 44 changed files with 84 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class AlpacaLiveFeed(
if (action != null) {
val now = Instant.now()
logger.trace { "received action=$action time=$now" }
val event = Event(listOf(action), now)
val event = Event(now, listOf(action))
send(event)
}
} else if (msg is ErrorMessage) {
Expand Down
4 changes: 2 additions & 2 deletions roboquant-avro/src/main/kotlin/org/roboquant/avro/AvroFeed.kt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class AvroFeed(private val path: Path, useCache: Boolean = false) : AssetFeed {
if (now < timeframe) continue

if (now != last) {
channel.sendNotEmpty(Event(actions, last))
channel.sendNotEmpty(Event(last, actions))
last = now
actions = ArrayList<PriceItem>(actions.size)
}
Expand All @@ -152,7 +152,7 @@ class AvroFeed(private val path: Path, useCache: Boolean = false) : AssetFeed {
val action = recToPriceAction(rec, serializer)
actions.add(action)
}
channel.sendNotEmpty(Event(actions, last))
channel.sendNotEmpty(Event(last, actions))
}
}

Expand Down
9 changes: 2 additions & 7 deletions roboquant-avro/src/test/kotlin/org/roboquant/TestData.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.roboquant.common.days
import org.roboquant.common.plus
import org.roboquant.feeds.Event
import org.roboquant.feeds.HistoricFeed
import org.roboquant.feeds.PriceBar
import org.roboquant.feeds.TradePrice
import org.roboquant.feeds.random.RandomWalkFeed
import org.roboquant.feeds.util.HistoricTestFeed
Expand All @@ -40,20 +39,16 @@ internal object TestData {

private fun priceAction(asset: Asset = usStock()) = TradePrice(asset, 10.0)

private fun priceBar(asset: Asset = usStock()) = PriceBar(asset, 10.0, 11.0, 9.0, 10.0, 1000.0)

private fun time(): Instant = Instant.parse("2020-01-03T12:00:00Z")

fun event(time: Instant = time()) = Event(listOf(priceAction()), time)

fun event2(time: Instant = time()) = Event(listOf(priceBar()), time)
fun event(time: Instant = time()) = Event(time, listOf(priceAction()))

fun events(n: Int = 100, asset: Asset = usStock()): List<Event> {
val start = time()
val result = mutableListOf<Event>()
repeat(n) {
val action = TradePrice(asset, it + 100.0)
val event = Event(listOf(action), start + it.days)
val event = Event(start + it.days, listOf(action))
result.add(event)
}
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ internal class AvroFeedTest {
)
val feed = MyFeed(sortedSetOf(asset))
val now = Instant.now()
feed.events.add(Event(listOf(p1), now + 1.millis))
feed.events.add(Event(listOf(p2), now + 2.millis))
feed.events.add(Event(listOf(p3), now + 3.millis))
feed.events.add(Event(listOf(p4), now + 4.millis))
feed.events.add(Event(now + 1.millis, listOf(p1)))
feed.events.add(Event(now + 2.millis, listOf(p2)))
feed.events.add(Event(now + 3.millis, listOf(p3)))
feed.events.add(Event(now + 4.millis, listOf(p4)))

assertDoesNotThrow {
AvroFeed.record(feed, fileName)
Expand All @@ -169,7 +169,7 @@ internal class AvroFeedTest {
val asset = Asset("DUMMY")
val p1 = MyPrice(asset, 100.0)
val feed = MyFeed(sortedSetOf(asset))
feed.events.add(Event(listOf(p1), Instant.now()))
feed.events.add(Event(Instant.now(), listOf(p1)))

assertThrows<UnsupportedException> {
AvroFeed.record(feed, fileName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,6 @@ internal class AvroSamples {
println(account.openOrders.summary())
}

@Test
@Ignore
internal fun multiRun() {
val feed = AvroFeed.sp500()
val logger = LastEntryLogger()

for (fast in 10..20..2) {
for (slow in fast * 2..fast * 4..4) {
val strategy = EMAStrategy(fast, slow)
val roboquant = Roboquant(strategy, AccountMetric(), logger = logger)
roboquant.run(feed, name = "run $fast-$slow")
}
}
val maxEntry = logger.getMetric("account.equity").flatten().max()
println(maxEntry)
}

@Test
@Ignore
internal fun walkForwardParallel() = runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class BinanceLiveFeed(
)

val now = if (useMachineTime) Instant.now() else Instant.ofEpochMilli(resp.eventTime)
val event = Event(listOf(action), now)
val event = Event(now, listOf(action))
send(event)
} else {
logger.warn { "Received TickerEvent for unsubscribed symbol ${resp.symbol}" }
Expand All @@ -157,7 +157,7 @@ class BinanceLiveFeed(
timeSpan
)
val now = if (useMachineTime) Instant.now() else Instant.ofEpochMilli(resp.closeTime)
val event = Event(listOf(action), now)
val event = Event(now, listOf(action))
send(event)
} else {
logger.warn { "Received CandlestickEvent for unsubscribed symbol ${resp.symbol}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class IBKRLiveFeed(configure: IBKRConfig.() -> Unit = {}) : LiveFeed() {
val v = volume?.value()?.toDouble() ?: Double.NaN
val action = PriceBar(subscription.asset, open, high, low, close, v, subscription.interval.seconds)
val now = Instant.ofEpochSecond(time) // IBKR uses second-resolution
val event = Event(listOf(action), now)
val event = Event(now, listOf(action))
logger.trace { "send event=$event" }
send(event)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private class FastFeed(nAssets: Int, val events: Int) : Feed {

override suspend fun play(channel: EventChannel) {
repeat(events) {
channel.send(Event(actions, start + it.millis))
channel.send(Event(start + it.millis, actions))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class PolygonFundamentalsFeed(
*/
override suspend fun play(channel: EventChannel) {
events.forEach {
val event = Event(it.value, it.key)
val event = Event(it.key, it.value)
channel.send(event)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ class PolygonLiveFeed(
message.lowPrice!!, message.closePrice!!, message.volume ?: Double.NaN,
timeSpan
)
send(Event(listOf(action), getTime(message.endTimestampMillis)))
send(Event(getTime(message.endTimestampMillis), listOf(action)))
}

is PolygonWebSocketMessage.StocksMessage.Trade -> {
val asset = getSubscribedAsset(message.ticker!!)
val price = message.price
if (price != null) {
val action = TradePrice(asset, price, message.size ?: Double.NaN)
send(Event(listOf(action), getTime(message.timestampMillis)))
send(Event(getTime(message.timestampMillis), listOf(action)))
}
}

Expand All @@ -158,7 +158,7 @@ class PolygonLiveFeed(
bidPrice,
message.bidSize ?: Double.NaN,
)
send(Event(listOf(action), getTime(message.timestampMillis)))
send(Event(getTime(message.timestampMillis), listOf(action)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class QuestDBFeed(private val tableName: String, dbPath: Path = Config.home / "q

val time = record.getTimestamp(1)
if (time != last) {
channel.sendNotEmpty(Event(actions, ofEpochMicro(last)))
channel.sendNotEmpty(Event(ofEpochMicro(last), actions))
last = time
actions = mutableListOf()
}
Expand All @@ -122,7 +122,7 @@ class QuestDBFeed(private val tableName: String, dbPath: Path = Config.home / "q
val price = handler.getPriceAction(asset, record)
actions.add(price)
}
channel.sendNotEmpty(Event(actions, ofEpochMicro(last)))
channel.sendNotEmpty(Event(ofEpochMicro(last), actions))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ internal class QuestDBFeedTestIT {
val now = Instant.now()
repeat(100) {
val action = PriceQuote(asset, 100.0, 10000.0, 100.0, 10000.0)
val event = Event(listOf(action), now + it.millis)
val event = Event(now + it.millis, listOf(action))
channel.send(event)
}
}
Expand Down Expand Up @@ -118,7 +118,7 @@ internal class QuestDBFeedTestIT {
val now = Instant.now()
repeat(100) {
val action = TradePrice(asset, 100.0, 10000.0)
val event = Event(listOf(action), now + it.millis)
val event = Event(now + it.millis, listOf(action))
channel.send(event)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ internal class AtrPolicyTest {
repeat(12) {
val p = 5.0
val priceBar = PriceBar(asset, p + it, p + it, p + it, p + it)
val event = Event(listOf(priceBar), now + it.millis)
val event = Event(now + it.millis, listOf(priceBar))
val o = policy.act(signals, account, event)
orders.addAll(o)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ internal class PriceBarSeriesTest {
var priceBarSerie = pbs.getValue(asset)
assertFalse(priceBarSerie.isFull())

val event = Event(listOf(pb), Instant.now())
val event = Event(Instant.now(), listOf(pb))
repeat(5) {
pbs.addAll(event)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ internal class TaLibMetricTest {
Amount(Currency.USD, 0.0)
)

val results = metric.calculate(account, Event(emptyList(), Instant.now()))
val results = metric.calculate(account, Event(Instant.now(), emptyList()))
assertTrue(results.isEmpty())

val feed = HistoricTestFeed(100 until 111, priceBar = true)
val events = feed.filter<PriceBar>()
var mResult = emptyMap<String, Double>()
for (event in events) {
mResult = metric.calculate(account, Event(listOf(event.second), event.first))
mResult = metric.calculate(account, Event(event.first, listOf(event.second)))
}
assertTrue(mResult.isNotEmpty())
assertEquals(feed.assets.size, mResult.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ internal class TaLibStrategyTest {
val result = mutableMapOf<Instant, List<Signal>>()
var now = Instant.now()
for (event in events) {
val signals = s.generate(Event(listOf(event.second), event.first))
val signals = s.generate(Event(event.first, listOf(event.second)))
result[now] = signals
now += 1.seconds
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class TiingoLiveFeed private constructor(

@Synchronized
internal fun deliver(action: PriceItem, time: Instant) {
val event = Event(listOf(action), time)
val event = Event(time, listOf(action))
send(event)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class XChangeLiveFeed(
logger.trace { "$trade event for $asset" }
val item = TradePrice(asset, trade.price.toDouble(), trade.originalAmount.toDouble())
val now = if (useMachineTime) Instant.now() else trade.timestamp.toInstant()
val event = Event(listOf(item), now)
val event = Event(now, listOf(item))
send(event)
}

Expand All @@ -204,7 +204,7 @@ class XChangeLiveFeed(
}
val item = OrderBook(asset, asks, bids)
val now = if (useMachineTime) Instant.now() else orderBook.timeStamp.toInstant()
val event = Event(listOf(item), now)
val event = Event(now, listOf(item))
send(event)
}

Expand All @@ -224,7 +224,7 @@ class XChangeLiveFeed(
ticker.bidSize.toDouble()
)
val now = if (useMachineTime) Instant.now() else ticker.timestamp.toInstant()
val event = Event(listOf(item), now)
val event = Event(now, listOf(item))
send(event)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class XChangePollingLiveFeed(
val asset = assetMap.getValue(symbol)
val item = TradePrice(asset, trade.price.toDouble(), trade.originalAmount.toDouble())
val now = trade.timestamp.toInstant()
val event = Event(listOf(item), now)
val event = Event(now, listOf(item))
send(event)
}

Expand Down
2 changes: 1 addition & 1 deletion roboquant/src/main/kotlin/org/roboquant/Roboquant.kt
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ data class Roboquant(
val changeOrders = change.map { MarketOrder(it.key, it.value) }
val orders = cancelOrders + changeOrders
val actions = account.positions.map { TradePrice(it.asset, it.mktPrice) }
val event = Event(actions, eventTime)
val event = Event(eventTime, actions)
broker.place(orders)
val newAccount = broker.sync(event)
val metricResult = getMetrics(newAccount, event)
Expand Down
2 changes: 1 addition & 1 deletion roboquant/src/main/kotlin/org/roboquant/common/TimeSpan.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ typealias TradingPeriod = TimeSpan
* TimeSpan is an immutable class that unifies the JVM classes Duration and Period and allows to use them
* more easily in your code. It can store time-spans as small as nanoseconds.
*
* It is loosely modelled after the time duration, as described in ISO 8601.
* It is modelled after the time duration, as described in ISO 8601.
*/
class TimeSpan internal constructor(internal val period: Period, internal val duration: Duration) {

Expand Down
4 changes: 0 additions & 4 deletions roboquant/src/main/kotlin/org/roboquant/common/extensions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ fun Collection<String>.summary(header: String = "values"): Summary {
*/
operator fun <T> List<T>.get(range: IntRange): List<T> = subList(max(0, range.first), min(this.size, range.last + 1))

/**
* Extension function to allow *numpy* like indexing for lists, for example someList[0..10..2]
*/
operator fun IntRange.rangeTo(i: Int): IntProgression = IntProgression.fromClosedRange(first, last, i)

/**
* Add an element to a mutable collection, but only if it is not null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class AggregatorFeed(
if (expiration == null) {
expiration = time.expirationTime()
} else if (time >= expiration) {
val newEvent = Event(history.values.toList(), expiration)
val newEvent = Event(expiration, history.values.toList())
channel.sendNotEmpty(newEvent)
history.clear()
do {
Expand All @@ -113,7 +113,7 @@ class AggregatorFeed(

// Send remaining
if (remaining && expiration != null) {
val newEvent = Event(history.values.toList(), expiration)
val newEvent = Event(expiration, history.values.toList())
channel.sendNotEmpty(newEvent)
}
if (job.isActive) job.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class AggregatorLiveFeed(

private suspend fun send(channel: EventChannel, time: Instant, history: MutableMap<Asset, PriceBar>) {
val newEvent = synchronized(history) {
val newEvent = Event(history.values.toList(), time)
val newEvent = Event(time, history.values.toList())
history.clear()
newEvent
}
Expand Down
4 changes: 2 additions & 2 deletions roboquant/src/main/kotlin/org/roboquant/feeds/Event.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.time.Instant
* @property items the list of actions that are part of this event
* @property time the time that the actions in this event became available
*/
class Event(val items: List<Item>, val time: Instant) : Comparable<Event> {
class Event(val time: Instant, val items: List<Item>) : Comparable<Event> {

/**
* Convenience property for accessing the price actions in this event. The result is cached so that accessing
Expand All @@ -49,7 +49,7 @@ class Event(val items: List<Item>, val time: Instant) : Comparable<Event> {
/**
* Return an event without any [items] with as default [time] the current system time.
*/
fun empty(time: Instant = Instant.now()): Event = Event(emptyList(), time)
fun empty(time: Instant = Instant.now()): Event = Event(time, emptyList())

}

Expand Down
Loading

0 comments on commit d763d6d

Please sign in to comment.