Skip to content

Commit

Permalink
finos#1167 handle when external data schema is different from internal
Browse files Browse the repository at this point in the history
- uses SchemaMapper to build the schema mappings b/w
  external and internal.
- Sort + Filters to use the mapper to build SQL queries instead
  of internal table def.
- refactors and improves the flow of IgniteOrderDataProvider.
  • Loading branch information
junaidzm13 committed Feb 7, 2024
1 parent 75fe017 commit 3121c95
Show file tree
Hide file tree
Showing 25 changed files with 865 additions and 275 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package org.finos.vuu.example.ignite

import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType}
import org.apache.ignite.configuration.{CacheConfiguration, DataStorageConfiguration, IgniteConfiguration}
import org.finos.vuu.core.module.simul.model.ChildOrder
import org.finos.vuu.example.ignite.schema.IgniteChildOrderEntity

import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters.IterableHasAsJava


object IgniteLocalConfig {
val parentOrderCacheName = "ParentOrders"
val childOrderCacheName = "ChildOrders"
Expand Down Expand Up @@ -40,30 +38,7 @@ object IgniteLocalConfig {
private def createChildOrderCacheConfig(): CacheConfiguration[?, ?] = {
val cacheConfiguration = new CacheConfiguration()

val fields = new java.util.LinkedHashMap[String, String]()
fields.put("parentId", classOf[Int].getName)
fields.put("id", classOf[Int].getName)
fields.put("ric", classOf[String].getName)
fields.put("price", classOf[Double].getName)
fields.put("quantity", classOf[Int].getName)
fields.put("side", classOf[String].getName)
fields.put("account", classOf[String].getName)
fields.put("strategy", classOf[String].getName)
fields.put("exchange", classOf[String].getName)
fields.put("ccy", classOf[String].getName)
fields.put("volLimit", classOf[Double].getName)
fields.put("filledQty", classOf[Int].getName)
fields.put("openQty", classOf[Int].getName)
fields.put("averagePrice", classOf[Double].getName)
fields.put("status", classOf[String].getName)

val indexes = new java.util.ArrayList[QueryIndex]()
indexes.add(new QueryIndex(List("parentId").asJavaCollection, QueryIndexType.SORTED).setName("PARENTID_IDX"))
indexes.add(new QueryIndex(List("id").asJavaCollection, QueryIndexType.SORTED).setName("CHILDID_IDX"))

val queryEntity: QueryEntity = new QueryEntity(classOf[Int], classOf[ChildOrder])
.setFields(fields)
.setIndexes(indexes)
val queryEntity = IgniteChildOrderEntity.buildQueryEntity
cacheConfiguration.setQueryEntities(List(queryEntity).asJavaCollection)
cacheConfiguration.setName(childOrderCacheName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import org.apache.ignite.cache.query.{IndexQuery, IndexQueryCriteriaBuilder, Ind
import org.apache.ignite.cluster.ClusterState
import org.apache.ignite.{IgniteCache, Ignition}
import org.finos.vuu.core.module.simul.model.{ChildOrder, OrderStore, ParentOrder}
import org.finos.vuu.example.ignite.schema.IgniteChildOrderEntity

import scala.collection.mutable
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -58,25 +59,17 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde
parentOrderCache.get(id)
}

def findChildOrder(sqlFilterQueries: String, sqlSortQueries: String, rowCount: Int, startIndex: Long): Iterable[ChildOrder] = {
def findChildOrder(sqlFilterQueries: String, sqlSortQueries: String, rowCount: Int, startIndex: Long): LazyList[ChildOrder] = {
val whereClause = if(sqlFilterQueries == null || sqlFilterQueries.isEmpty) "" else s" where $sqlFilterQueries"
val orderByClause = if(sqlSortQueries == null || sqlSortQueries.isEmpty) " order by id" else s" order by $sqlSortQueries"
val query = new SqlFieldsQuery(s"select * from ChildOrder$whereClause$orderByClause limit ? offset ?")
query.setArgs(rowCount, startIndex)

val results = childOrderCache.query(query)

var counter = 0
val buffer = mutable.ListBuffer[ChildOrder]()
results.forEach(item => {
buffer.addOne(toChildOrder(item))
counter += 1
})

logger.info(s"Loaded Ignite ChildOrder for $counter rows, from index : $startIndex where $whereClause order by $sqlSortQueries")
val results = LazyList.from(childOrderCache.query(query).asScala).map(i => toChildOrder(i.asScala.toList))

buffer
logger.info(s"Loaded Ignite ChildOrder for ${results.size} rows, from index : $startIndex where $whereClause order by $sqlSortQueries")

results
}
def findChildOrderFilteredBy(filterQueryCriteria: List[IndexQueryCriterion]): Iterable[ChildOrder] = {
// val filter: IgniteBiPredicate[Int, ChildOrder] = (key, p) => p.openQty > 0
Expand All @@ -102,30 +95,7 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde
.map(x => x.getValue)
}

// todo - make it metadata aware and extract to another class.
private def toChildOrder(cols: java.util.List[_]): ChildOrder = {
ChildOrder(
parentId = cols.get(0).asInstanceOf[Int],
id = cols.get(1).asInstanceOf[Int],
ric = cols.get(2).asInstanceOf[String],
price = cols.get(3).asInstanceOf[Double],
quantity = cols.get(4).asInstanceOf[Int],
side = cols.get(5).asInstanceOf[String],
account = cols.get(6).asInstanceOf[String],
strategy = cols.get(7).asInstanceOf[String],
exchange = cols.get(8).asInstanceOf[String],
ccy = cols.get(9).asInstanceOf[String],
volLimit = cols.get(10).asInstanceOf[Double],
filledQty = cols.get(11).asInstanceOf[Int],
openQty = cols.get(12).asInstanceOf[Int],
averagePrice = cols.get(13).asInstanceOf[Double],
status = cols.get(14).asInstanceOf[String]
)
}

def parentOrderCount(): Long = {
parentOrderCache.sizeLong(CachePeekMode.ALL)
}
private def toChildOrder = IgniteChildOrderEntity.getListToChildOrder

def childOrderCount(): Long = {
childOrderCache.sizeLong(CachePeekMode.ALL)
Expand All @@ -137,14 +107,10 @@ class IgniteOrderStore(private val parentOrderCache: IgniteCache[Int, ParentOrde
// val query = new SqlFieldsQuery(s"select * from ChildOrder")
val results = childOrderCache.query(query)

var counter = 0
val buffer = mutable.ListBuffer[ChildOrder]()
results.forEach(item => {
buffer.addOne(toChildOrder(item))
counter += 1
})
results.forEach(item => buffer.addOne(toChildOrder(item.asScala.toList)))

logger.debug(s"Loaded $counter rows, from index : $startIndex, rowCou")
logger.debug(s"Loaded ${buffer.length} rows, from index : $startIndex, rowCou")

buffer
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ object IgniteOrderLoaderMain extends App {
implicit val clock: Clock = new DefaultClock()
implicit val lifecycleContainer = new LifecycleContainer()
implicit val randomNumbers: SeededRandomNumbers = new SeededRandomNumbers(clock.now())
implicit val orderStore: OrderStore = IgniteOrderStore.apply()
implicit val orderStore: OrderStore = IgniteOrderStore()

private val ordersModel = new ParentChildOrdersModel()
private val childOrderCounter = new LongAdder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,46 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.ViewPortDef
import org.finos.vuu.core.module.{DefaultModule, ModuleFactory, TableDefContainer, ViewServerModule}
import org.finos.vuu.core.table.Columns
import org.finos.vuu.core.table.{Column, SimpleColumn}
import org.finos.vuu.example.ignite.IgniteOrderStore
import org.finos.vuu.example.ignite.provider.IgniteOrderDataProvider
import org.finos.vuu.example.ignite.schema.IgniteChildOrderEntity
import org.finos.vuu.feature.ignite.schema.SchemaMapper
import org.finos.vuu.net.rpc.RpcHandler
import org.finos.vuu.plugin.virtualized.api.VirtualizedSessionTableDef

class NoOpIgniteService extends RpcHandler

object IgniteOrderDataModule extends DefaultModule {
final val NAME = "IGNITE"
private final val NAME = "IGNITE"

def apply(igniteOrderStore: IgniteOrderStore)(implicit clock: Clock, lifecycle: LifecycleContainer, tableDefContainer: TableDefContainer): ViewServerModule = {
ModuleFactory.withNamespace(NAME)
.addSessionTable(
VirtualizedSessionTableDef(
name = "bigOrders2",
keyField = "orderId",
Columns.fromNames("orderId".int(), "ric".string(), "quantity".int(), "price".double(), "side".string(), "strategy".string(), "parentOrderId".int())
columns = schemaMapper.tableColumns
),
(table, _) => new IgniteOrderDataProvider(igniteOrderStore),
(_, _) => new IgniteOrderDataProvider(igniteOrderStore, schemaMapper),
(table, _, _, _) => ViewPortDef(
columns = table.getTableDef.columns,
service = new NoOpIgniteService()
)
).asModule()
}

private val tableColumnByExternalField: Map[String, Column] = Map(
"id" -> ("orderId", classOf[Int]),
"ric" -> ("ric", classOf[String]),
"price" -> ("price", classOf[Double]),
"quantity" -> ("quantity", classOf[Int]),
"side" -> ("side", classOf[String]),
"strategy" -> ("strategy", classOf[String]),
"parentId" -> ("parentId", classOf[Int]),
).zipWithIndex
.map({ case ((extField, (name, t)), i) => (extField, SimpleColumn(name, i, t)) })
.toMap

val schemaMapper: SchemaMapper = SchemaMapper(IgniteChildOrderEntity.getSchema, tableColumnByExternalField)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ package org.finos.vuu.example.ignite.provider

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.filter.FilterSpecParser
import org.finos.vuu.core.table.RowWithData
import org.finos.vuu.example.ignite.IgniteOrderStore
import org.finos.vuu.feature.ignite.filter.{IgniteSqlFilterClause, IgniteSqlFilterTreeVisitor}
import org.finos.vuu.feature.ignite.sort.IgniteSqlSortBuilder
import org.finos.vuu.feature.ignite.schema.SchemaMapper
import org.finos.vuu.plugin.virtualized.table.{VirtualizedRange, VirtualizedSessionTable, VirtualizedViewPortKeys}
import org.finos.vuu.provider.VirtualizedProvider
import org.finos.vuu.viewport.ViewPort

import java.util.concurrent.atomic.AtomicInteger

class IgniteOrderDataProvider(final val igniteStore: IgniteOrderStore)(implicit clock: Clock) extends VirtualizedProvider with StrictLogging {
class IgniteOrderDataProvider(final val igniteStore: IgniteOrderStore, final val schemaMapper: SchemaMapper)
(implicit clock: Clock) extends VirtualizedProvider with StrictLogging {

private val extraRowsCount = 5000 //fetch extra rows to reduce need to re-fetch when view port change by small amount
private val dataQuery = IgniteOrderDataQuery(igniteStore, schemaMapper)

override def runOnce(viewPort: ViewPort): Unit = {

Expand All @@ -26,49 +26,27 @@ class IgniteOrderDataProvider(final val igniteStore: IgniteOrderStore)(implicit

internalTable.setSize(totalSize)

val sqlFilterClause =
if (viewPort.filterSpec.filter == null || viewPort.filterSpec.filter.isEmpty) {
""
}
else {
val filterTreeVisitor = new IgniteSqlFilterTreeVisitor
val clause = FilterSpecParser.parse[IgniteSqlFilterClause](viewPort.filterSpec.filter, filterTreeVisitor)
clause.toSql(internalTable.getTableDef)
}

val startIndex = Math.max(range.from - extraRowsCount, 0)
val endIndex = range.to + extraRowsCount
val rowCount = if (endIndex > startIndex) endIndex - startIndex else 1

internalTable.setRange(VirtualizedRange(startIndex, endIndex))

val sortBuilder = new IgniteSqlSortBuilder
val sqlSortQueries = sortBuilder.toSql(viewPort.sortSpecInternal, tableColumn => ColumnMap.toIgniteColumn(tableColumn))
val index = new AtomicInteger(startIndex) // todo: get rid of working assumption here that the dataset is fairly immutable.

logger.info(s"Loading data between $startIndex and $endIndex")
dataQuery
.fetch(viewPort.filterSpec, viewPort.sortSpecInternal, startIndex = startIndex, rowCount = rowCount)
.map(dataQuery.toInternalRow(internalTable.tableDef.keyField))
.foreach(processUpdateForIndex(internalTable, index.getAndIncrement()))

val iterator = igniteStore.findChildOrder(sqlFilterQueries = sqlFilterClause, sqlSortQueries = sqlSortQueries, rowCount = rowCount, startIndex = startIndex)

logger.info(s"Loaded data between $startIndex and $endIndex")

val index = new AtomicInteger(startIndex) // todo: get rid of working assumption here that the dataset is fairly immutable.

iterator.foreach(childOrder => {
val row = RowWithData(childOrder.id.toString,
Map(
"orderId" -> childOrder.id,
"ric" -> childOrder.ric,
"price" -> childOrder.price,
"quantity" -> childOrder.quantity,
"side" -> childOrder.side,
"strategy" -> childOrder.strategy,
"parentOrderId" -> childOrder.parentId
))
internalTable.processUpdateForIndex(index.getAndIncrement(), childOrder.id.toString, row, clock.now())
})
viewPort.setKeys(new VirtualizedViewPortKeys(internalTable.primaryKeys))
}

private def processUpdateForIndex(internalTable: VirtualizedSessionTable, index: Int): IgniteOrderDataQuery.RowKeyAndData => Unit = {
case (key, rowData) => internalTable.processUpdateForIndex(index, key, RowWithData(key, rowData), clock.now())
}

override def subscribe(key: String): Unit = {}

override def doStart(): Unit = {}
Expand All @@ -81,21 +59,3 @@ class IgniteOrderDataProvider(final val igniteStore: IgniteOrderStore)(implicit

override val lifecycleId: String = "org.finos.vuu.example.ignite.provider.IgniteOrderDataProvider"
}

object ColumnMap {

private type TableToIgniteColumns = Map[String, String]

private val orderMap : TableToIgniteColumns = Map(
"orderId" -> "id",
"ric" -> "ric",
"price" -> "price",
"quantity" -> "quantity",
"side" -> "side",
"strategy" -> "strategy",
"parentOrderId" -> "parentId",
)
def toIgniteColumn(tableColumn: String): Option[String] =
orderMap.get(tableColumn)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.finos.vuu.example.ignite.provider

import org.finos.vuu.core.module.simul.model.ChildOrder
import org.finos.vuu.core.sort.ModelType.SortSpecInternal
import org.finos.vuu.example.ignite.IgniteOrderStore
import org.finos.vuu.example.ignite.provider.IgniteOrderDataQuery.RowKeyAndData
import org.finos.vuu.feature.ignite.FilterAndSortSpecToSql
import org.finos.vuu.feature.ignite.schema.SchemaMapper
import org.finos.vuu.net.FilterSpec

class IgniteOrderDataQuery private (private val igniteOrderStore: IgniteOrderStore,
private val schemaMapper: SchemaMapper) {

private val filterAndSortSpecToSql = FilterAndSortSpecToSql(schemaMapper)

def fetch(filterSpec: FilterSpec, sortSpec: SortSpecInternal, startIndex: Long, rowCount: Int): LazyList[ChildOrder] = {
igniteOrderStore.findChildOrder(
filterAndSortSpecToSql.filterToSql(filterSpec),
filterAndSortSpecToSql.sortToSql(sortSpec),
startIndex = startIndex,
rowCount = rowCount
)
}

def toInternalRow(keyField: String): Product => RowKeyAndData =
dto => {
val values = dto.productIterator.toList
val rowData = schemaMapper.toTableRowData(values)
val key = values(schemaMapper.externalSchemaField(keyField).get.index)
(key.toString, rowData)
}
}

object IgniteOrderDataQuery {
def apply(igniteOrderStore: IgniteOrderStore, schemaMapper: SchemaMapper): IgniteOrderDataQuery =
new IgniteOrderDataQuery(igniteOrderStore, schemaMapper)

private type RowKey = String
type RowKeyAndData = (RowKey, Map[String, Any])
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.finos.vuu.example.ignite.schema

import scala.jdk.CollectionConverters.IterableHasAsJava
import org.apache.ignite.cache.{QueryEntity, QueryIndex, QueryIndexType}
import org.finos.vuu.core.module.simul.model.ChildOrder

object IgniteChildOrderEntity {

private val _schema: IgniteEntitySchema = createSchema()

def getSchema: IgniteEntitySchema = _schema

def buildQueryEntity: QueryEntity = _schema.queryEntity(classOf[Int], classOf[ChildOrder])

def getListToChildOrder: List[_] => ChildOrder = {
val converter = ChildOrder.getClass.getMethods
.find(x => x.getName == "apply" && x.isBridge)
.get

values =>
converter.invoke(ChildOrder, values map (_.asInstanceOf[AnyRef]): _*).asInstanceOf[ChildOrder]
}

private def createSchema(): IgniteEntitySchema = {
IgniteEntitySchemaBuilder()
.withCaseClass[ChildOrder]
.withIndex(new QueryIndex(List("parentId").asJavaCollection, QueryIndexType.SORTED).setName("PARENTID_IDX"))
.withIndex(new QueryIndex(List("id").asJavaCollection, QueryIndexType.SORTED).setName("CHILDID_IDX"))
.build()
}
}
Loading

0 comments on commit 3121c95

Please sign in to comment.