Skip to content

Commit

Permalink
Added features to AggregatorLiveFeed
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaron committed Mar 8, 2024
1 parent d763d6d commit bc7ef2f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package org.roboquant.samples
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking
import org.roboquant.Roboquant
import org.roboquant.run
import org.roboquant.common.*
import org.roboquant.feeds.*
import org.roboquant.journals.BasicJournal
import org.roboquant.loggers.ConsoleLogger
import org.roboquant.metrics.ProgressMetric
import org.roboquant.strategies.EMAStrategy
Expand Down Expand Up @@ -48,9 +50,9 @@ internal class TiingoSamples {
internal fun aggregatorLiveFeed() {
val iex = TiingoLiveFeed.iex()
iex.subscribe()
val feed = AggregatorLiveFeed(iex, 5.seconds)
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger())
val account = rq.run(feed, Timeframe.next(5.minutes))
val feed = AggregatorLiveFeed(iex, 5.seconds, restrictType = TradePrice::class)
val tf = Timeframe.next(5.minutes)
val account = run(feed, EMAStrategy(), BasicJournal(true), tf)
println(account.fullSummary())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.time.temporal.ChronoUnit
import kotlin.collections.set
import kotlin.math.max
import kotlin.math.min
import kotlin.reflect.KClass

/**
* Aggregate prices in a live [feed] to a [PriceBar]. The [aggregationPeriod] is configurable. Right now there is
Expand All @@ -47,7 +48,9 @@ import kotlin.math.min
class AggregatorLiveFeed(
private val feed: LiveFeed,
private val aggregationPeriod: TimeSpan,
private val remaining: Boolean = true
private val remaining: Boolean = true,
private val continuation: Boolean = true,
private val restrictType: KClass<*>? = null
) : Feed {

private fun Instant.expirationTime(): Instant {
Expand Down Expand Up @@ -75,8 +78,15 @@ class AggregatorLiveFeed(

private suspend fun send(channel: EventChannel, time: Instant, history: MutableMap<Asset, PriceBar>) {
val newEvent = synchronized(history) {
val newEvent = Event(time, history.values.toList())
val items = history.values.toList()
val newEvent = Event(time, items)
history.clear()
if (continuation)
items.forEach {
val c = it.close
val v = if (it.volume.isFinite()) 0.0 else Double.NaN
history[it.asset] = PriceBar(it.asset, c,c,c,c, v, it.timeSpan)
}
newEvent
}

Expand Down Expand Up @@ -110,17 +120,18 @@ class AggregatorLiveFeed(
try {
while (true) {
val event = inputChannel.receive()
val actions = event.items
val items = event.items

// Send heart beats from the original feed
if (actions.isEmpty()) {
if (items.isEmpty()) {
channel.send(event)
continue
}

synchronized(history) {
for (action in actions) {
val pb = getPriceBar(action, aggregationPeriod) ?: continue
for (item in items) {
if (restrictType != null && ! restrictType.isInstance(item)) continue
val pb = getPriceBar(item, aggregationPeriod) ?: continue
val asset = pb.asset
val entry = history[asset]
if (entry == null) history[asset] = pb else history[asset] = entry + pb
Expand Down

0 comments on commit bc7ef2f

Please sign in to comment.