Skip to content

Commit

Permalink
Better sync for logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaron committed Jul 31, 2023
1 parent f26911d commit 00329ae
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.roboquant.common.Observation
import org.roboquant.common.TimeSeries
import org.roboquant.common.Timeframe
import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap

/**
* Stores the last value of a metric for a particular run in memory.
Expand All @@ -34,10 +34,9 @@ import java.util.*
class LastEntryLogger(var showProgress: Boolean = false) : MetricsLogger {

// The key is runName
private val history = Hashtable<String, MutableMap<String, Observation>>()
private val history = ConcurrentHashMap<String, MutableMap<String, Observation>>()
private val progressBar = ProgressBar()

@Synchronized
override fun log(results: Map<String, Double>, time: Instant, run: String) {
if (showProgress) progressBar.update(time)

Expand Down Expand Up @@ -75,7 +74,6 @@ class LastEntryLogger(var showProgress: Boolean = false) : MetricsLogger {
/**
* Get results for the metric specified by its [name].
*/
@Synchronized
override fun getMetric(name: String): Map<String, TimeSeries> {
val result = mutableMapOf<String, TimeSeries>()
for (run in history.keys) {
Expand Down
10 changes: 6 additions & 4 deletions roboquant/src/main/kotlin/org/roboquant/loggers/MemoryLogger.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.roboquant.loggers
import org.roboquant.common.TimeSeries
import org.roboquant.common.Timeframe
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap

/**
* Store metric results in memory. Very convenient in a Jupyter notebook when you want to inspect or visualize
Expand All @@ -37,13 +38,14 @@ class MemoryLogger(var showProgress: Boolean = true) : MetricsLogger {

internal class Entry(val time: Instant, val metrics: Map<String, Double>)

internal val history = mutableMapOf<String, MutableList<Entry>>()
// Use a ConcurrentHashMap if this logger is used in parallel back-testing
internal val history = ConcurrentHashMap<String, MutableList<Entry>>()
private val progressBar = ProgressBar()

@Synchronized
override fun log(results: Map<String, Double>, time: Instant, run: String) {
if (showProgress) progressBar.update(time)
if (results.isEmpty()) return

val entries = history.getOrPut(run) { mutableListOf() }
entries.add(Entry(time, results))
}
Expand Down Expand Up @@ -71,7 +73,7 @@ class MemoryLogger(var showProgress: Boolean = true) : MetricsLogger {
* Get all the recorded runs in this logger
*/
val runs: Set<String>
get() = history.keys
get() = history.keys.toSortedSet()

/**
* Get the unique list of metric names that have been captured
Expand All @@ -88,7 +90,7 @@ class MemoryLogger(var showProgress: Boolean = true) : MetricsLogger {
val ts = getMetric(name, run)
if (ts.isNotEmpty()) result[run] = ts
}
return result
return result.toSortedMap()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ internal class TimeframeTest {
assertTrue(subFrames.all { it.start >= tf.start })
assertTrue(subFrames.all { it.end <= tf.end })
assertTrue(subFrames.all { it.end == it.start + 2.months })

tf.split(1.years).forEach { period ->
period.sample(1.months, 100).forEach {
assertTrue(period.contains(it.start))
assertTrue(period.contains(it.end))
}
}

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package org.roboquant.loggers

import kotlinx.coroutines.launch
import org.junit.jupiter.api.assertThrows
import org.roboquant.TestData
import org.roboquant.common.Timeframe
import org.roboquant.common.days
import org.roboquant.common.plus
import org.roboquant.common.*
import org.roboquant.metrics.metricResultsOf
import java.time.Instant
import java.time.temporal.ChronoUnit
Expand Down Expand Up @@ -61,6 +60,35 @@ internal class MemoryLoggerTest {
}
}

@Test
fun concurrency() {
val logger = MemoryLogger(false)
val jobs = ParallelJobs()
Timeframe.fromYears(2000, 2002).split(1.months).forEach {
jobs.add {
launch {
it.sample(1.days, 100).forEach { tf ->
val run = "run-$tf"
logger.start(run, tf)
repeat(1000) {
logger.log(mapOf("a" to 100.0 + it), tf.start + it.seconds, run)
}
logger.end(run)
}
}
}

}
jobs.joinAllBlocking()
val data = logger.getMetric("a")
for (ts in data.values) {
assertEquals(1000, ts.size)
assertEquals(100.0, ts.values[0])
}

}


@Test
fun testMetricsEntry() {
val logger = MemoryLogger(showProgress = false)
Expand Down

0 comments on commit 00329ae

Please sign in to comment.