Skip to content

Commit

Permalink
Added additional flexibility to CSVFeed
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Dekkers committed May 23, 2023
1 parent 3cc9205 commit 2e69fcc
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 60 deletions.
47 changes: 14 additions & 33 deletions roboquant/src/main/kotlin/org/roboquant/feeds/csv/CSVConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package org.roboquant.feeds.csv
import org.roboquant.common.Asset
import org.roboquant.common.AssetType
import org.roboquant.common.Logging
import org.roboquant.feeds.PriceBar
import org.roboquant.feeds.util.AutoDetectTimeParser
import org.roboquant.feeds.util.TimeParser
import java.io.File
import java.nio.file.Path
import java.util.*
Expand Down Expand Up @@ -53,7 +50,9 @@ data class CSVConfig(
var priceAdjust: Boolean = false,
var template: Asset = Asset("TEMPLATE"),
var hasHeader: Boolean = true,
var separator: Char = ','
var separator: Char = ',',
var timeParser: TimeParser = AutoDetectTimeParser(),
var priceParser: PriceParser = PriceBarParser()
) {

/**
Expand All @@ -71,10 +70,10 @@ data class CSVConfig(
defaultBuilder(file)
}

private val timeParser: TimeParser = AutoDetectTimeParser()
private val info = ColumnInfo()

private val pattern by lazy { Pattern.compile(filePattern) }
private var hasColumnsDefined = false
private var isInitialized = false


/**
* default builder takes the file name, removes the file extension and uses that as the symbol name
Expand Down Expand Up @@ -173,19 +172,9 @@ data class CSVConfig(
* @param line
* @return
*/
internal fun processLine(asset: Asset, line: List<String>): PriceEntry {

val now = timeParser.parse(line[info.time], asset.exchange)
val volume = if (info.hasVolume) line[info.volume].toDouble() else Double.NaN
val action = PriceBar(
asset,
line[info.open].toDouble(),
line[info.high].toDouble(),
line[info.low].toDouble(),
line[info.close].toDouble(),
volume
)
if (priceAdjust) action.adjustClose(line[info.adjustedClose].toDouble())
internal fun processLine(line: List<String>, asset: Asset): PriceEntry {
val now = timeParser.parse(line, asset)
val action = priceParser.parse(line, asset)
return PriceEntry(now, action)
}

Expand All @@ -196,19 +185,11 @@ data class CSVConfig(
* @param header the header fields
*/
@Synchronized
internal fun detectColumns(header: List<String>) {
if (hasColumnsDefined) return
if (parsePattern.isNotEmpty()) info.define(parsePattern)
else info.detectColumns(header)
hasColumnsDefined = true
require(info.time != -1) { "No time column found in header=$header" }
require(info.open != -1) { "No open-prices column found in header=$header" }
require(info.low != -1) { "No low-prices column found in header=$header" }
require(info.high != -1) { "No high-prices column found in header=$header" }
require(info.close != -1) { "No close-prices column found in header=$header" }
if (priceAdjust) require(info.adjustedClose != -1) {
"No adjusted close prices column found in header=$header"
}
internal fun configure(header: List<String>) {
if (isInitialized) return
timeParser.init(header, this)
priceParser.init(header, this)
isInitialized = true
}
}

Expand Down
5 changes: 2 additions & 3 deletions roboquant/src/main/kotlin/org/roboquant/feeds/csv/CSVFeed.kt
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,13 @@ class CSVFeed(
val result = mutableListOf<PriceEntry>()
var errors = 0
var isHeader = config.hasHeader
if (config.parsePattern.isNotEmpty()) config.detectColumns(emptyList())
for (row in it) {
if (isHeader) {
config.detectColumns(row.fields)
config.configure(row.fields)
isHeader = false
} else {
try {
val step = config.processLine(asset, row.fields)
val step = config.processLine(row.fields, asset)
result += step
} catch (t: Throwable) {
logger.debug(t) { "${asset.symbol} $row" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,9 @@ private class IncrementalReader(val asset: Asset, file: File, val config: CSVCon
var errors = 0L

init {
if (config.parsePattern.isNotEmpty()) config.detectColumns(emptyList())
if (config.hasHeader && reader.hasNext()) {
val line = reader.next().fields
config.detectColumns(line)
config.configure(line)
}
}

Expand All @@ -156,7 +155,7 @@ private class IncrementalReader(val asset: Asset, file: File, val config: CSVCon
while (reader.hasNext()) {
val line = reader.next().fields
try {
return config.processLine(asset, line)
return config.processLine(line, asset)
} catch (_: Throwable) {
errors++
}
Expand Down
147 changes: 147 additions & 0 deletions roboquant/src/main/kotlin/org/roboquant/feeds/csv/PriceParser.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2020-2023 Neural Layer
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.roboquant.feeds.csv

import org.roboquant.common.Asset
import org.roboquant.feeds.PriceAction
import org.roboquant.feeds.PriceBar
import org.roboquant.feeds.PriceQuote
import java.time.Instant

/**
* Interface for price parsers that can use a config to support parsing logic.
*
* An implementation converts a single line of CSV fields into a [PriceAction].
*/
fun interface PriceParser {

/**
* Optional hook for setting up the parser from the [header] fields and the [config].
* The default implementation does nothing.
*/
fun init(header: List<String>, config: CSVConfig) {}

/**
* Return a [PriceAction] given the provided [line] of strings and [asset]
*/
fun parse(line: List<String>, asset: Asset): PriceAction

}

/**
 * [PriceParser] that parses lines and creates a [PriceBar] for each of them.
 *
 * The column positions are detected from the header in [init]. Header names are compared after
 * upper-casing them and dropping every character that is not a capital letter, so `Adj Close`
 * matches `ADJCLOSE`. The open, high, low and close columns are mandatory, the volume column is
 * optional, and the adjusted-close column is mandatory only when price adjustment is enabled.
 *
 * @property priceAdjust whether the parsed bar is adjusted with the adjusted-close column; this value
 * is overwritten by [CSVConfig.priceAdjust] every time [init] is invoked
 */
class PriceBarParser(private var priceAdjust: Boolean = false) : PriceParser {

    private var open: Int = NOT_FOUND
    private var high: Int = NOT_FOUND
    private var low: Int = NOT_FOUND
    private var close: Int = NOT_FOUND
    private var volume: Int = NOT_FOUND
    private var adjustedClose: Int = NOT_FOUND

    /**
     * Forget every detected column, so a second [init] call cannot keep an index that belonged to a
     * previous header.
     */
    private fun resetColumns() {
        open = NOT_FOUND
        high = NOT_FOUND
        low = NOT_FOUND
        close = NOT_FOUND
        volume = NOT_FOUND
        adjustedClose = NOT_FOUND
    }

    /**
     * Verify that all mandatory columns were found.
     *
     * @throws IllegalArgumentException if a mandatory column is missing
     */
    private fun validate() {
        require(open != NOT_FOUND) { "No open-prices column" }
        require(low != NOT_FOUND) { "No low-prices column found" }
        require(high != NOT_FOUND) { "No high-prices column found" }
        require(close != NOT_FOUND) { "No close-prices column found" }
        if (priceAdjust) require(adjustedClose != NOT_FOUND) {
            "No adjusted close prices column found"
        }
    }

    /**
     * Detect the column positions from the [header] and take over the price-adjust setting of the
     * [config].
     *
     * @throws IllegalArgumentException if a mandatory column is not present in the [header]
     */
    override fun init(header: List<String>, config: CSVConfig) {
        priceAdjust = config.priceAdjust
        resetColumns()
        header.forEachIndexed { index, column ->
            when (column.uppercase().replace(NOT_CAPITAL, "")) {
                "OPEN" -> open = index
                "HIGH" -> high = index
                "LOW" -> low = index
                "CLOSE" -> close = index
                "ADJCLOSE", "ADJUSTEDCLOSE" -> adjustedClose = index
                "VOLUME" -> volume = index
            }
        }
        validate()
    }

    /**
     * Return a [PriceBar] given the provided [line] of strings and [asset].
     *
     * The volume is [Double.NaN] when there is no volume column or when its field is blank.
     */
    override fun parse(line: List<String>, asset: Asset): PriceBar {
        // A missing volume column is treated the same way as a blank volume field.
        val rawVolume = if (volume != NOT_FOUND) line[volume] else ""
        val barVolume = if (rawVolume.isBlank()) Double.NaN else rawVolume.toDouble()
        val action = PriceBar(
            asset,
            line[open].toDouble(),
            line[high].toDouble(),
            line[low].toDouble(),
            line[close].toDouble(),
            barVolume
        )
        if (priceAdjust) action.adjustClose(line[adjustedClose].toDouble())
        return action
    }

    private companion object {

        /** Index value that marks a column as not (yet) detected. */
        const val NOT_FOUND = -1

        /** Matches every character that is not a capital letter; used to normalise header names. */
        val NOT_CAPITAL = Regex("[^A-Z]")
    }

}


/**
 * [PriceParser] that parses lines and creates a [PriceQuote] for each of them.
 *
 * The column positions are detected from the header in [init]. Header names are compared after
 * upper-casing them and dropping every character that is not a capital letter. The ask and bid price
 * columns are mandatory; the ask and bid volume (or size) columns are optional.
 */
class PriceQuoteParser : PriceParser {

    private var ask: Int = NOT_FOUND
    private var bid: Int = NOT_FOUND
    private var bidVolume: Int = NOT_FOUND
    private var askVolume: Int = NOT_FOUND

    /**
     * Verify that all mandatory columns were found.
     *
     * @throws IllegalArgumentException if the ask or bid price column is missing
     */
    private fun validate() {
        require(ask != NOT_FOUND) { "No ask-prices column" }
        require(bid != NOT_FOUND) { "No bid-prices column found" }
    }

    /**
     * Detect the column positions from the [header]. The [config] is not used by this parser.
     *
     * @throws IllegalArgumentException if the ask or bid price column is not present in the [header]
     */
    override fun init(header: List<String>, config: CSVConfig) {
        // Start from a clean slate so a second call cannot keep an index of a previous header.
        ask = NOT_FOUND
        bid = NOT_FOUND
        askVolume = NOT_FOUND
        bidVolume = NOT_FOUND
        header.forEachIndexed { index, column ->
            when (column.uppercase().replace(NOT_CAPITAL, "")) {
                "ASK" -> ask = index
                "BID" -> bid = index
                "ASKVOLUME", "ASKSIZE" -> askVolume = index
                "BIDVOLUME", "BIDSIZE" -> bidVolume = index
            }
        }
        validate()
    }

    /**
     * Read the optional size field at [index] from [line]. The result is [Double.NaN] when the column
     * was not detected or when the field is blank, matching how [PriceBarParser] treats its volume.
     */
    private fun sizeAt(line: List<String>, index: Int): Double {
        if (index == NOT_FOUND) return Double.NaN
        val field = line[index]
        return if (field.isBlank()) Double.NaN else field.toDouble()
    }

    /**
     * Return a [PriceQuote] given the provided [line] of strings and [asset].
     *
     * The ask and bid sizes are [Double.NaN] when their column is absent or their field is blank.
     */
    override fun parse(line: List<String>, asset: Asset): PriceQuote {
        val askPrice = line[ask].toDouble()
        val askSize = sizeAt(line, askVolume)
        val bidPrice = line[bid].toDouble()
        val bidSize = sizeAt(line, bidVolume)
        // NOTE(review): arguments follow the (asset, askPrice, askSize, bidPrice, bidSize) order of the
        // PriceQuote constructor; the previous (ask, bid, askVolume, bidVolume) order put the bid price
        // in the ask-size slot. Confirm against the PriceQuote declaration.
        return PriceQuote(asset, askPrice, askSize, bidPrice, bidSize)
    }

    private companion object {

        /** Index value that marks a column as not (yet) detected. */
        const val NOT_FOUND = -1

        /** Matches every character that is not a capital letter; used to normalise header names. */
        val NOT_CAPITAL = Regex("[^A-Z]")
    }

}

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -14,8 +14,9 @@
* limitations under the License.
*/

package org.roboquant.feeds.util
package org.roboquant.feeds.csv

import org.roboquant.common.Asset
import org.roboquant.common.ConfigurationException
import org.roboquant.common.Exchange
import java.time.Instant
Expand All @@ -29,17 +30,24 @@ import java.time.format.DateTimeFormatter
*/
fun interface TimeParser {

fun init(header: List<String>, config: CSVConfig) {}

/**
* Return an [Instant] given the provided [text] string and [exchange]
* Return an [Instant] given the provided [line] of strings and [asset]
*/
fun parse(text: String, exchange: Exchange): Instant
fun parse(line: List<String>, asset: Asset): Instant
}


/**
* File-private parsing strategy that converts a single text value into an [Instant].
* It is implemented by the local date and local time parsers in this file and is the type of the
* parser that [AutoDetectTimeParser] delegates to.
*
* NOTE(review): the name looks like a typo of "AutoDetectParser" - confirm before renaming, since
* other declarations in this file refer to it.
*/
private fun interface AuteDetectParser {

/**
* Return the [Instant] for the provided [text]; the [exchange] is available to implementations
* that need it to interpret the value.
*/
fun parse(text: String, exchange: Exchange): Instant
}

/**
* Datetime parser that parses local date-time
*/
private class LocalTimeParser(pattern: String) : TimeParser {
private class LocalTimeParser(pattern: String) : AuteDetectParser {

private val dtf: DateTimeFormatter = DateTimeFormatter.ofPattern(pattern)

Expand All @@ -54,7 +62,7 @@ private class LocalTimeParser(pattern: String) : TimeParser {
* Parser that parses local dates and uses the exchange closing time to determine the time.
* @param pattern
*/
private class LocalDateParser(pattern: String) : TimeParser {
private class LocalDateParser(pattern: String) : AuteDetectParser {

private val dtf: DateTimeFormatter = DateTimeFormatter.ofPattern(pattern)

Expand All @@ -76,15 +84,30 @@ private class LocalDateParser(pattern: String) : TimeParser {
*/
class AutoDetectTimeParser : TimeParser {

private lateinit var parser: TimeParser
private lateinit var parser: AuteDetectParser
private var time = 0

override fun init(header: List<String>, config: CSVConfig) {
val notCapital = Regex("[^A-Z]")
header.forEachIndexed { index, column ->
when (column.uppercase().replace(notCapital, "")) {
"TIME" -> time = index
"DATE" -> time = index
"DAY" -> time = index
"DATETIME" -> time = index
"TIMESTAMP" -> time = index
}
}
}

/**
* @see TimeParser.parse
*/
override fun parse(text: String, exchange: Exchange): Instant {
override fun parse(line: List<String>, asset: Asset): Instant {
// If this is the first time calling, detect the format and parser to use
val text = line[time]
if (!this::parser.isInitialized) detect(text)
return parser.parse(text, exchange)
return parser.parse(text, asset.exchange)
}

/**
Expand All @@ -97,13 +120,13 @@ class AutoDetectTimeParser : TimeParser {
"""19\d{6}""".toRegex() to LocalDateParser("yyyyMMdd"),
"""20\d{6}""".toRegex() to LocalDateParser("yyyyMMdd"),
"""\d{8} \d{6}""".toRegex() to LocalTimeParser("yyyyMMdd HHmmss"),
"""\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z""".toRegex() to TimeParser { str, _ -> Instant.parse(str) },
"""\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z""".toRegex() to AuteDetectParser { text, _ -> Instant.parse(text) },
"""\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}""".toRegex() to LocalTimeParser("yyyy-MM-dd HH:mm:ss"),
"""\d{4}-\d{2}-\d{2} \d{2}:\d{2}""".toRegex() to LocalTimeParser("yyyy-MM-dd HH:mm"),
"""\d{4}-\d{2}-\d{2}""".toRegex() to LocalDateParser("yyyy-MM-dd"),
"""\d{8} \d{2}:\d{2}:\d{2}""".toRegex() to LocalTimeParser("yyyyMMdd HH:mm:ss"),
"""\d{8} \d{2}:\d{2}:\d{2}""".toRegex() to LocalTimeParser("yyyyMMdd HH:mm:ss"),
"""-?\d{1,19}""".toRegex() to TimeParser { str, _ -> Instant.ofEpochMilli(str.toLong()) }
"""-?\d{1,19}""".toRegex() to AuteDetectParser { text, _ -> Instant.ofEpochMilli(text.toLong()) }
)
}

Expand Down
Loading

0 comments on commit 2e69fcc

Please sign in to comment.