Skip to content

Commit

Permalink
finos#1167 refactor and improve the flow of IgniteOrderDataProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
junaidzm13 committed Feb 6, 2024
1 parent d60be88 commit f0ca7ba
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde
parentOrderCache.get(id)
}

def findChildOrder(sqlFilterQueries: String, sqlSortQueries: String, rowCount: Int, startIndex: Long): Iterable[ChildOrder] = {
def findChildOrder(sqlFilterQueries: String, sqlSortQueries: String, rowCount: Int, startIndex: Long): LazyList[ChildOrder] = {
val whereClause = if(sqlFilterQueries == null || sqlFilterQueries.isEmpty) "" else s" where $sqlFilterQueries"
val orderByClause = if(sqlSortQueries == null || sqlSortQueries.isEmpty) " order by id" else s" order by $sqlSortQueries"
val query = new SqlFieldsQuery(s"select * from ChildOrder$whereClause$orderByClause limit ? offset ?")
query.setArgs(rowCount, startIndex)

val results = childOrderCache.query(query).getAll.asScala.map(i => toChildOrder(i.asScala.toList))
val results = LazyList.from(childOrderCache.query(query).asScala).map(i => toChildOrder(i.asScala.toList))

logger.info(s"Loaded Ignite ChildOrder for ${results.size} rows, from index : $startIndex where $whereClause order by $sqlSortQueries")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ package org.finos.vuu.example.ignite.provider

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.filter.FilterSpecParser
import org.finos.vuu.core.module.simul.model.ChildOrder
import org.finos.vuu.core.table.RowWithData
import org.finos.vuu.example.ignite.IgniteOrderStore
import org.finos.vuu.feature.ignite.filter.{IgniteSqlFilterClause, IgniteSqlFilterTreeVisitor}
import org.finos.vuu.feature.ignite.schema.SchemaMapper
import org.finos.vuu.feature.ignite.sort.IgniteSqlSortBuilder
import org.finos.vuu.plugin.virtualized.table.{VirtualizedRange, VirtualizedSessionTable, VirtualizedViewPortKeys}
import org.finos.vuu.provider.VirtualizedProvider
import org.finos.vuu.viewport.ViewPort
Expand All @@ -19,6 +15,7 @@ class IgniteOrderDataProvider(final val igniteStore: IgniteOrderStore, final val
(implicit clock: Clock) extends VirtualizedProvider with StrictLogging {

private val extraRowsCount = 5000 //fetch extra rows to reduce need to re-fetch when view port change by small amount
private val dataQuery = IgniteOrderDataQuery(igniteStore, schemaMapper)

override def runOnce(viewPort: ViewPort): Unit = {

Expand All @@ -29,44 +26,25 @@ class IgniteOrderDataProvider(final val igniteStore: IgniteOrderStore, final val

internalTable.setSize(totalSize)

val sqlFilterClause =
if (viewPort.filterSpec.filter == null || viewPort.filterSpec.filter.isEmpty) {
""
} else {
val filterTreeVisitor = new IgniteSqlFilterTreeVisitor
val clause = FilterSpecParser.parse[IgniteSqlFilterClause](viewPort.filterSpec.filter, filterTreeVisitor)
clause.toSql(internalTable.getTableDef)
}

val startIndex = Math.max(range.from - extraRowsCount, 0)
val endIndex = range.to + extraRowsCount
val rowCount = if (endIndex > startIndex) endIndex - startIndex else 1

internalTable.setRange(VirtualizedRange(startIndex, endIndex))

val sqlSortQueries = new IgniteSqlSortBuilder().toSql(viewPort.sortSpecInternal, schemaMapper)
val index = new AtomicInteger(startIndex) // todo: get rid of working assumption here that the dataset is fairly immutable.

logger.info(s"Loading data between $startIndex and $endIndex")
dataQuery
.fetch(viewPort.filterSpec, viewPort.sortSpecInternal, startIndex = startIndex, rowCount = rowCount)
.map(dataQuery.toInternalRow(internalTable.tableDef.keyField))
.foreach(processUpdateForIndex(internalTable, index.getAndIncrement()))

val iterator = igniteStore.findChildOrder(sqlFilterQueries = sqlFilterClause, sqlSortQueries = sqlSortQueries, rowCount = rowCount, startIndex = startIndex)

logger.info(s"Loaded data between $startIndex and $endIndex")

val index = new AtomicInteger(startIndex) // todo: get rid of working assumption here that the dataset is fairly immutable.

val keyField = internalTable.tableDef.keyField
iterator.foreach(childOrder => {
val (key, rowData) = rowKeyAndData(childOrder, keyField)
internalTable.processUpdateForIndex(index.getAndIncrement(), key, RowWithData(key, rowData), clock.now())
})
viewPort.setKeys(new VirtualizedViewPortKeys(internalTable.primaryKeys))
}

private def rowKeyAndData(childOrder: ChildOrder, keyField: String): (String, Map[String, Any]) = {
val values = childOrder.productIterator.toList
val rowData = schemaMapper.toTableRowData(values)
val key = values(schemaMapper.externalSchemaField(keyField).get.index)
(key.toString, rowData)
private def processUpdateForIndex(internalTable: VirtualizedSessionTable, index: Int): IgniteOrderDataQuery.RowKeyAndData => Unit = {
case (key, rowData) => internalTable.processUpdateForIndex(index, key, RowWithData(key, rowData), clock.now())
}

override def subscribe(key: String): Unit = {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.finos.vuu.example.ignite.provider

import org.finos.vuu.core.module.simul.model.ChildOrder
import org.finos.vuu.core.sort.ModelType.SortSpecInternal
import org.finos.vuu.example.ignite.IgniteOrderStore
import org.finos.vuu.example.ignite.provider.IgniteOrderDataQuery.RowKeyAndData
import org.finos.vuu.feature.ignite.FilterAndSortSpecToSql
import org.finos.vuu.feature.ignite.schema.SchemaMapper
import org.finos.vuu.net.FilterSpec

class IgniteOrderDataQuery private (private val igniteOrderStore: IgniteOrderStore,
private val schemaMapper: SchemaMapper) {

private val filterAndSortSpecToSql = FilterAndSortSpecToSql(schemaMapper)

def fetch(filterSpec: FilterSpec, sortSpec: SortSpecInternal, startIndex: Long, rowCount: Int): LazyList[ChildOrder] = {
igniteOrderStore.findChildOrder(
filterAndSortSpecToSql.filterToSql(filterSpec),
filterAndSortSpecToSql.sortToSql(sortSpec),
startIndex = startIndex,
rowCount = rowCount
)
}

def toInternalRow(keyField: String): Product => RowKeyAndData =
dto => {
val values = dto.productIterator.toList
val rowData = schemaMapper.toTableRowData(values)
val key = values(schemaMapper.externalSchemaField(keyField).get.index)
(key.toString, rowData)
}
}

object IgniteOrderDataQuery {
def apply(igniteOrderStore: IgniteOrderStore, schemaMapper: SchemaMapper): IgniteOrderDataQuery =
new IgniteOrderDataQuery(igniteOrderStore, schemaMapper)

private type RowKey = String
type RowKeyAndData = (RowKey, Map[String, Any])
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ object IgniteDataType extends Enumeration {
val Int : IgniteDataType = classOf[Int]
val String : IgniteDataType = classOf[String]
val Double : IgniteDataType = classOf[Double]
val Long : IgniteDataType = classOf[Long]

def fromString(s: String): IgniteDataType = {
s.trim.toLowerCase match {
case "string" => IgniteDataType.String
case "double" => IgniteDataType.Double
case "int" => IgniteDataType.Int
case _ => throw new Exception(s"Unsupported type passed: $s")
case "int" => IgniteDataType.Int
case "long" => IgniteDataType.Long
case _ => throw new Exception(s"Unsupported type passed: $s")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class IgniteOrderStoreTest extends AnyFunSuiteLike with BeforeAndAfter with Matc
val updatedParentOrder = parentOrder.copy(activeChildren = parentOrder.activeChildren + 1)
orderStore.storeChildOrder(
updatedParentOrder,
TestUtils.createChildOrder(parentOrder.id, childOrderId))
TestUtils.createChildOrder(childOrderId, parentId = parentOrder.id))
updatedParentOrder
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,26 @@ import java.util
import scala.jdk.CollectionConverters.IterableHasAsJava

object TestUtils {
def createChildOrder(parentId: Int, id: Int): ChildOrder = {
ChildOrder(parentId = parentId,
def createChildOrder(
id: Int,
parentId: Int = 1,
ric: String = "ric",
price: Double = 1.22,
quantity: Int = 100,
side: String = "Buy",
account: String = "account",
ccy: String = "EUR"): ChildOrder = {
ChildOrder(
parentId = parentId,
id = id,
ric = "ric",
price = 1.22,
quantity = 100,
side = "Buy",
account = "account",
ric = ric,
price = price,
quantity = quantity,
side = side,
account = account,
ccy = ccy,
strategy = "",
exchange = "",
ccy = "EUR",
volLimit = 100,
filledQty = 0,
openQty = 100,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.finos.vuu.example.ignite.provider

import org.apache.ignite.IgniteCache
import org.finos.vuu.core.module.simul.model.{ChildOrder, ParentOrder}
import org.finos.vuu.core.sort.SortDirection
import org.finos.vuu.example.ignite.module.IgniteOrderDataModule
import org.finos.vuu.example.ignite.{IgniteOrderStore, TestUtils}
import org.finos.vuu.net.FilterSpec
import org.scalatest.funsuite.AnyFunSuiteLike
import org.scalatest.matchers.should.Matchers

class IgniteOrderDataQueryFunctionalTest extends AnyFunSuiteLike with Matchers {

private val ignite = TestUtils.setupIgnite()
private val parentOrderCache: IgniteCache[Int, ParentOrder] = ignite.getOrCreateCache("parentOrderCache")
private val childOrderCache: IgniteCache[Int, ChildOrder] = ignite.getOrCreateCache("childOrderCache")
private val orderStore = new IgniteOrderStore(parentOrderCache, childOrderCache)
private val dataQuery = IgniteOrderDataQuery(orderStore, IgniteOrderDataModule.schemaMapper)

test("Can parse and apply filtering and sorting when fetching") {
val testOrder1 = TestUtils.createChildOrder(1, ric = "ABC.HK", price = 5.55)
val testOrder2 = TestUtils.createChildOrder(2, ric = "ABC.LDN", price = 6.0)
val testOrder3 = TestUtils.createChildOrder(3, ric = "ABC.NY", price = 4.5)
givenChildOrdersExist(testOrder1, testOrder2, testOrder3)

val filterSpec = FilterSpec("orderId > 1 and ric starts \"ABC\"")
val sortSpec = Map("price" -> SortDirection.Ascending)

val res = dataQuery.fetch(filterSpec, sortSpec, startIndex = 0, rowCount = 3)

res.toList shouldEqual List(testOrder3, testOrder2)
}

private def givenChildOrdersExist(childOrders: ChildOrder*): Unit = {
val parentOrder = TestUtils.createParentOrder(1)
childOrders.foreach(o => orderStore.storeChildOrder(parentOrder, o))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.finos.vuu.example.ignite.provider

import org.finos.vuu.core.sort.SortDirection
import org.finos.vuu.core.table.{Column, SimpleColumn}
import org.finos.vuu.example.ignite.IgniteOrderStore
import org.finos.vuu.example.ignite.schema.IgniteEntitySchemaBuilder
import org.finos.vuu.feature.ignite.schema.SchemaMapper
import org.finos.vuu.net.FilterSpec
import org.scalamock.scalatest.MockFactory
import org.scalatest.featurespec.AnyFeatureSpec
import org.scalatest.matchers.should.Matchers

class IgniteOrderDataQueryTest extends AnyFeatureSpec with Matchers with MockFactory {

private val igniteStore: IgniteOrderStore = mock[IgniteOrderStore]
private val schemaMapper = SchemaMapper(entitySchema, externalFieldByColumns)
private val igniteDataQuery = IgniteOrderDataQuery(igniteStore, schemaMapper)

Feature("toInternalRow") {
val testEntity = TestDto(1, "TestDto", 100)

Scenario("can convert a dto to internal row representation") {
val internalKey = "id"

val res = igniteDataQuery.toInternalRow(internalKey)(testEntity)

res._1 shouldEqual "1"
res._2 shouldEqual Map("id" -> testEntity.key, "name" -> testEntity.name, "value" -> testEntity.value)
}
}

Feature("fetch") {

Scenario("can parse and apply filter and sort spec") {
val filterSpec = FilterSpec("id = 2 and name != \"ABC\"")
val sortSpec = Map("value" -> SortDirection.Ascending)

(igniteStore.findChildOrder _).expects("(key = 2 AND name != 'ABC')", "value ASC", *, *).once()

igniteDataQuery.fetch(filterSpec, sortSpec, 0, 0)
}
}

private def externalFieldByColumns: Map[String, Column] = Map(
"key" -> SimpleColumn("id", 0, classOf[Int]),
"name" -> SimpleColumn("name", 1, classOf[String]),
"value" -> SimpleColumn("value", 2, classOf[String]),
)

private def entitySchema = IgniteEntitySchemaBuilder().withCaseClass[TestDto].build()
}

private case class TestDto(key: Int, name: String, value: Int)
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.finos.vuu.feature.ignite

import org.finos.vuu.core.filter.FilterSpecParser
import org.finos.vuu.core.sort.ModelType.SortSpecInternal
import org.finos.vuu.feature.ignite.filter.{IgniteSqlFilterClause, IgniteSqlFilterTreeVisitor}
import org.finos.vuu.feature.ignite.schema.SchemaMapper
import org.finos.vuu.feature.ignite.sort.IgniteSqlSortBuilder
import org.finos.vuu.net.FilterSpec

trait FilterAndSortSpecToSql {
def filterToSql(filterSpec: FilterSpec): String
def sortToSql(sortSpec: SortSpecInternal): String
}

object FilterAndSortSpecToSql {
def apply(schemaMapper: SchemaMapper): FilterAndSortSpecToSql = {
new FilterAndSortSpecToSqlImpl(schemaMapper)
}
}

private class FilterAndSortSpecToSqlImpl(private val schemaMapper: SchemaMapper) extends FilterAndSortSpecToSql {
// @Todo convert IgniteSqlFilterTreeVisitor & IgniteSqlSortBuilder to objects?
private val filterTreeVisitor = new IgniteSqlFilterTreeVisitor
private val igniteSqlSortBuilder = new IgniteSqlSortBuilder

override def filterToSql(filterSpec: FilterSpec): String = {
if (filterSpec.filter == null || filterSpec.filter.isEmpty) {
""
} else {
val clause = FilterSpecParser.parse[IgniteSqlFilterClause](filterSpec.filter, filterTreeVisitor)
clause.toSql(schemaMapper)
}
}

override def sortToSql(sortSpec: SortSpecInternal): String = igniteSqlSortBuilder.toSql(sortSpec, schemaMapper)
}

0 comments on commit f0ca7ba

Please sign in to comment.