diff --git a/roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt b/roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt index 57676068..a873c715 100644 --- a/roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt +++ b/roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt @@ -19,8 +19,10 @@ package org.roboquant.samples import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.runBlocking import org.roboquant.Roboquant +import org.roboquant.run import org.roboquant.common.* import org.roboquant.feeds.* +import org.roboquant.journals.BasicJournal import org.roboquant.loggers.ConsoleLogger import org.roboquant.metrics.ProgressMetric import org.roboquant.strategies.EMAStrategy @@ -48,9 +50,9 @@ internal class TiingoSamples { internal fun aggregatorLiveFeed() { val iex = TiingoLiveFeed.iex() iex.subscribe() - val feed = AggregatorLiveFeed(iex, 5.seconds) - val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger()) - val account = rq.run(feed, Timeframe.next(5.minutes)) + val feed = AggregatorLiveFeed(iex, 5.seconds, restrictType = TradePrice::class) + val tf = Timeframe.next(5.minutes) + val account = run(feed, EMAStrategy(), BasicJournal(true), tf) println(account.fullSummary()) } diff --git a/roboquant/src/main/kotlin/org/roboquant/feeds/AggregatorLiveFeed.kt b/roboquant/src/main/kotlin/org/roboquant/feeds/AggregatorLiveFeed.kt index 9b9773e2..037cf24f 100644 --- a/roboquant/src/main/kotlin/org/roboquant/feeds/AggregatorLiveFeed.kt +++ b/roboquant/src/main/kotlin/org/roboquant/feeds/AggregatorLiveFeed.kt @@ -27,6 +27,7 @@ import java.time.temporal.ChronoUnit import kotlin.collections.set import kotlin.math.max import kotlin.math.min +import kotlin.reflect.KClass /** * Aggregate prices in a live [feed] to a [PriceBar]. The [aggregationPeriod] is configurable. Right now there is @@ -47,7 +48,9 @@ import kotlin.math.min class AggregatorLiveFeed( private val feed: LiveFeed, private val aggregationPeriod: TimeSpan, - private val remaining: Boolean = true + private val remaining: Boolean = true, + private val continuation: Boolean = true, + private val restrictType: KClass<*>? = null ) : Feed { private fun Instant.expirationTime(): Instant { @@ -75,8 +78,15 @@ class AggregatorLiveFeed( private suspend fun send(channel: EventChannel, time: Instant, history: MutableMap) { val newEvent = synchronized(history) { - val newEvent = Event(time, history.values.toList()) + val items = history.values.toList() + val newEvent = Event(time, items) history.clear() + if (continuation) + items.forEach { + val c = it.close + val v = if (it.volume.isFinite()) 0.0 else Double.NaN + history[it.asset] = PriceBar(it.asset, c,c,c,c, v, it.timeSpan) + } newEvent } @@ -110,17 +120,18 @@ class AggregatorLiveFeed( try { while (true) { val event = inputChannel.receive() - val actions = event.items + val items = event.items // Send heart beats from the original feed - if (actions.isEmpty()) { + if (items.isEmpty()) { channel.send(event) continue } synchronized(history) { - for (action in actions) { - val pb = getPriceBar(action, aggregationPeriod) ?: continue + for (item in items) { + if (restrictType != null && ! restrictType.isInstance(item)) continue + val pb = getPriceBar(item, aggregationPeriod) ?: continue val asset = pb.asset val entry = history[asset] if (entry == null) history[asset] = pb else history[asset] = entry + pb