Skip to content

Commit

Permalink
Fixed bug "PriceBarSeries.get(timeline) should use filtered timeline #72
Browse files Browse the repository at this point in the history
"
  • Loading branch information
jbaron committed Jan 18, 2024
1 parent 811d777 commit 6a00fd7
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 16 deletions.
10 changes: 5 additions & 5 deletions roboquant-ta/src/main/kotlin/org/roboquant/ta/PriceBarSeries.kt
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ open class PriceBarSeries(capacity: Int) {
* Returns a PriceBarSeries that includes the data that occured within the provided [timeframe]
*/
operator fun get(timeframe: Timeframe): PriceBarSeries {
val tl = timeline.toTypedArray()
val size = tl.filter { it in timeframe }.size
val result = PriceBarSeries(size)
for (i in 0..<size) {
val time = tl[i]
if (time in timeframe) result.add(get(i), time)
val start = timeframe.start
for (idx in timeline.indices) {
val time = timeline[idx]
if (time < start) continue
if (time in timeframe) result.add(get(idx), time) else break
}
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ internal class PriceBarSeriesTest {
assertEquals(10, pbs2.size) // should have processed the last set
}

@Test
fun filter() {
val pbs = getPBS(100)
val now = Instant.now()
repeat(100) {
pbs.add(pb, now + it.millis)
}
assertTrue(pbs.isFull())
val tf = Timeframe(pbs.timeline[10], pbs.timeline[50], true)
val newPbs = pbs[tf]
assertEquals(41, newPbs.size)
assertEquals(pbs.timeline[10], newPbs.timeline.first())
assertEquals(pbs.timeline[50], newPbs.timeline.last())
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ class TiingoLiveFeed private constructor(
*/
override fun close() {
when (type) {
"iex" -> wsIEX.close(0, "closed called")
"fx" -> wsFX.close(0, "closed called")
"crypto" -> wsCrypto.close(0, "closed called")
"iex" -> wsIEX.close(1000, "closed called")
"fx" -> wsFX.close(1000, "closed called")
"crypto" -> wsCrypto.close(1000, "closed called")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import org.roboquant.Roboquant
import org.roboquant.common.*
import org.roboquant.feeds.AggregatorLiveFeed
import org.roboquant.feeds.PriceAction
import org.roboquant.feeds.filter
import org.roboquant.feeds.apply
import org.roboquant.loggers.ConsoleLogger
import org.roboquant.metrics.ProgressMetric
import org.roboquant.strategies.EMAStrategy
import org.roboquant.tiingo.TiingoHistoricFeed
import org.roboquant.tiingo.TiingoLiveFeed
import java.time.Instant
import kotlin.test.Ignore
import kotlin.test.Test

Expand Down Expand Up @@ -79,14 +80,18 @@ internal class TiingoSamples {

@Test
@Ignore
internal fun testLiveFeedCryptoAll() {
internal fun testLiveFeedMeasureDelay() {
val feed = TiingoLiveFeed.crypto()
feed.subscribe()
feed.filter<PriceAction>(Timeframe.next(1.minutes)) {
println(it)
false
feed.subscribe() // subscribe to all crypto currencies
var n = 0
var sum = 0L
feed.apply<PriceAction>(Timeframe.next(1.minutes)) { _, time ->
val now = Instant.now()
sum += now.toEpochMilli() - time.toEpochMilli()
n++
}
feed.close()
println("average delay is ${sum/n}ms")
}

@Test
Expand Down
5 changes: 3 additions & 2 deletions roboquant/src/main/kotlin/org/roboquant/feeds/LiveFeed.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,16 @@ abstract class LiveFeed(var heartbeatInterval: Long = 10_000) : Feed {
get() = channels.isNotEmpty()

/**
* Subclasses should use this method or `sendAsync` to send an event. If no channel is active, any event
* Subclasses should call this method or [sendAsync] to send an event. If no channel is active, any event
* sent will be dropped.
*/
protected fun send(event: Event) = runBlocking {
sendAsync(event)
}

/**
* Subclasses should use this method to send an event. If no channel is active, any event sent will be dropped.
* Subclasses should call this method or [send] to send an event. If no channel is active, any
* event sent will be dropped.
*/
protected suspend fun sendAsync(event: Event) {
for (channel in channels) {
Expand Down

0 comments on commit 6a00fd7

Please sign in to comment.