Skip to content

Commit

Permalink
finos#1296 add type conversions capability to SchemaMapper
Browse files Browse the repository at this point in the history
  • Loading branch information
junaidzm13 committed Apr 30, 2024
1 parent 97cfa81 commit 382b1cf
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package org.finos.vuu.feature.ignite.filter
import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.core.table.DataType.{CharDataType, StringDataType}
import org.finos.vuu.feature.ignite.filter.IgniteSqlFilterClause.EMPTY_SQL
import org.finos.vuu.feature.ignite.filter.SqlFilterColumnValueParser.ParsedResult
import org.finos.vuu.util.schema.SchemaMapper

private object IgniteSqlFilterClause {
Expand All @@ -29,12 +30,12 @@ case class AndIgniteSqlFilterClause(clauses:List[IgniteSqlFilterClause]) extends

case class EqIgniteSqlFilterClause(columnName: String, value: String) extends IgniteSqlFilterClause {
override def toSql(schemaMapper: SchemaMapper): String =
schemaMapper.externalSchemaField(columnName) match {
case Some(f) => f.dataType match {
case CharDataType | StringDataType => eqSql(f.name, quotedString(value))
case _ => eqSql(f.name, value)
SqlFilterColumnValueParser(schemaMapper).parseColumnValue(columnName, value) match {
case Right(ParsedResult(f, parsedValue)) => f.dataType match {
case CharDataType | StringDataType => eqSql(f.name, quotedString(parsedValue))
case _ => eqSql(f.name, parsedValue)
}
case None => logMappingErrorAndReturnEmptySql(columnName)
case Left(errMsg) => logErrorAndReturnEmptySql(errMsg)
}

private def eqSql(field: String, processedVal: String): String = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.finos.vuu.feature.ignite.filter

import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.core.table.{Column, DataType}
import org.finos.vuu.feature.ignite.filter.SqlFilterColumnValueParser.{ErrorMessage, ParsedResult}
import org.finos.vuu.util.schema.typeConversion.TypeConverter.buildConverterName
import org.finos.vuu.util.schema.{SchemaField, SchemaMapper}

protected trait SqlFilterColumnValueParser {
def parseColumnValue(columnName: String, columnValue: String): Either[ErrorMessage, ParsedResult[String]]
def parseColumnValues(columnName: String, columnValues: List[String]): Either[ErrorMessage, ParsedResult[List[String]]]
}

protected object SqlFilterColumnValueParser {
def apply(schemaMapper: SchemaMapper): SqlFilterColumnValueParser = new ColumnValueParser(schemaMapper)

case class ParsedResult[T](externalField: SchemaField, data: T)

type ErrorMessage = String
}

private class ColumnValueParser(private val mapper: SchemaMapper) extends SqlFilterColumnValueParser with StrictLogging {

override def parseColumnValue(columnName: String, columnValue: String): Either[ErrorMessage, ParsedResult[String]] = {
mapper.externalSchemaField(columnName) match {
case Some(f) => RawColumnValueParser(f).parse(columnValue).map(ParsedResult(f, _))
case None => Left(externalFieldNotFoundError(columnName))
}
}

override def parseColumnValues(columnName: String, columnValues: List[String]): Either[ErrorMessage, ParsedResult[List[String]]] = {
mapper.externalSchemaField(columnName) match {
case Some(f) => parseColumnValues(RawColumnValueParser(f), columnValues)
case None => Left(externalFieldNotFoundError(columnName))
}
}

private def parseColumnValues(parser: RawColumnValueParser,
columnValues: List[String]): Either[ErrorMessage, ParsedResult[List[String]]] = {
val (errors, parsedValues) = columnValues.partitionMap(parser.parse)
val combinedError = errors.mkString("\n")

if (parsedValues.isEmpty) return Left(combinedError)

if (errors.nonEmpty) logger.error(s"Failed to parse some of the column values corresponding to " +
s"the column ${parser.column.name}: \n $combinedError"
)

Right(ParsedResult(parser.field, parsedValues))
}

private def externalFieldNotFoundError(columnName: String): String =
s"Failed to find mapped external field for column `$columnName`"

private case class RawColumnValueParser(field: SchemaField) {
val column: Column = mapper.tableColumn(field.name).get

def parse(columnValue: String): Either[ErrorMessage, String] = {
parseStringToColumnDataType(columnValue)
.flatMap(convertColumnValueToExternalFieldType)
.map(convertExternalValueToString)
}

private def parseStringToColumnDataType(value: String): Either[ErrorMessage, Any] =
DataType.parseToDataType(value, column.dataType)

private def convertColumnValueToExternalFieldType(columnValue: Any): Either[ErrorMessage, Any] =
mapper.toMappedExternalFieldType(column.name, columnValue)
.toRight(s"Failed to convert column value `$columnValue` from `${column.dataType}` to external type `${field.dataType}`")

private def convertExternalValueToString(value: Any): String =
mapper.convertExternalValueToString(field.name, value).getOrElse(logWarningAndUseDefaultToString(value))

private def logWarningAndUseDefaultToString(value: Any): String = {
logger.warn(
s"Could not find a converter [${buildConverterName(field.dataType, classOf[String])}] " +
s"required for SQL filters to be applied to ${field.name}. Falling back to using the " +
s"default `toString`."
)
Option(value).map(_.toString).orNull
}
}
}
17 changes: 17 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/core/table/Column.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.finos.vuu.core.table

import org.finos.vuu.api.TableDef
import org.finos.vuu.core.table.column.CalculatedColumnClause
import org.finos.vuu.util.schema.typeConversion.{DefaultTypeConverters, TypeConverterContainerBuilder}

object DataType {

Expand Down Expand Up @@ -34,6 +35,22 @@ object DataType {
}
}

def parseToDataType[T](value: String, t: Class[T]): Either[String, T] = {
typeConverterContainer.convert[String, T](value, classOf[String], t) match {
case Some(parsedValue) => Right(parsedValue)
case None => Left(s"Failed to parse String $value to data type $t")
}
}

private val typeConverterContainer = TypeConverterContainerBuilder()
.withoutDefaults()
.withConverter(DefaultTypeConverters.stringToCharConverter)
.withConverter(DefaultTypeConverters.stringToBooleanConverter)
.withConverter(DefaultTypeConverters.stringToIntConverter)
.withConverter(DefaultTypeConverters.stringToLongConverter)
.withConverter(DefaultTypeConverters.stringToDoubleConverter)
.build()

}

object Columns {
Expand Down
96 changes: 83 additions & 13 deletions vuu/src/main/scala/org/finos/vuu/util/schema/SchemaMapper.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.finos.vuu.util.schema

import org.finos.vuu.core.table.Column
import org.finos.vuu.util.schema.typeConversion.{TypeConverter, TypeConverterContainer, TypeConverterContainerBuilder, TypeUtils}


/**
Expand All @@ -14,6 +15,9 @@ import org.finos.vuu.core.table.Column
trait SchemaMapper {
def tableColumn(extFieldName: String): Option[Column]
def externalSchemaField(columnName: String): Option[SchemaField]
def toMappedExternalFieldType(columnName: String, columnValue: Any): Option[Any]
def toMappedInternalColumnType(extFieldName: String, extFieldValue: Any): Option[Any]
def convertExternalValueToString(extFieldName: String, extFieldValue: Any): Option[String]
def toInternalRowMap(externalValues: List[_]): Map[String, Any]
def toInternalRowMap(externalDto: Product): Map[String, Any]
}
Expand All @@ -25,14 +29,33 @@ object SchemaMapper {
* @param externalSchema schema representing external fields.
* @param internalColumns an array of internal Vuu columns.
* @param columnNameByExternalField a map from external field names to internal column names.
* @param typeConverterContainer pass this if your types aren't matched exactly. Also @see [[TypeConverterContainer]]
* */
def apply(externalSchema: ExternalEntitySchema,
internalColumns: Array[Column],
columnNameByExternalField: Map[String, String]): SchemaMapper = {
val validationError = validateSchema(externalSchema, internalColumns, columnNameByExternalField)
columnNameByExternalField: Map[String, String],
typeConverterContainer: TypeConverterContainer): SchemaMapper = {
val validationError = validateSchema(externalSchema, internalColumns, columnNameByExternalField, typeConverterContainer)
if (validationError.nonEmpty) throw InvalidSchemaMapException(validationError.get)

new SchemaMapperImpl(externalSchema, internalColumns, columnNameByExternalField)
new SchemaMapperImpl(externalSchema, internalColumns, columnNameByExternalField, typeConverterContainer)
}

/**
* Builds a schema mapper from the following:
*
* @param externalSchema schema representing external fields.
* @param internalColumns an array of internal Vuu columns.
* @param columnNameByExternalField a map from external field names to internal column names.
*
* @note Similar to `apply(ExternalEntitySchema, Array[Column], Map[String, String], TypeConverterContainer)`
* except that this uses a default `TypeConverterContainer`. For the list of default type
* converters used @see [[org.finos.vuu.util.schema.typeConversion.DefaultTypeConverters]]
* */
def apply(externalSchema: ExternalEntitySchema,
internalColumns: Array[Column],
columnNameByExternalField: Map[String, String]): SchemaMapper = {
SchemaMapper(externalSchema, internalColumns, columnNameByExternalField, TypeConverterContainerBuilder().build())
}

/**
Expand All @@ -44,8 +67,6 @@ object SchemaMapper {
* @note Similar to `apply(ExternalEntitySchema, Array[Column], Map[String, String])`
* except that this method builds the `field->column` map from the passed fields
* and columns matching them by their indexes (`Column.index` and `SchemaField.index`).
*
* @see [[SchemaMapper.apply]]
* */
def apply(externalSchema: ExternalEntitySchema, internalColumns: Array[Column]): SchemaMapper = {
val columnNameByExternalField = mapFieldsToColumns(externalSchema.fields, internalColumns)
Expand All @@ -59,11 +80,13 @@ object SchemaMapper {
private type ValidationError = Option[String]
private def validateSchema(externalSchema: ExternalEntitySchema,
internalColumns: Array[Column],
fieldsMap: Map[String, String]): ValidationError = {
fieldsMap: Map[String, String],
typeConverterContainer: TypeConverterContainer): ValidationError = {
Iterator(
() => hasUniqueColumnNames(fieldsMap.values.toList),
() => externalFieldsInMapConformsToExternalSchema(externalSchema, fieldsMap.keys),
() => internalFieldsInMapConformsToTableColumns(internalColumns, fieldsMap.values)
() => internalFieldsInMapConformsToTableColumns(internalColumns, fieldsMap.values),
() => canSupportRequiredTypeConversions(extFieldToColMap(externalSchema, internalColumns, fieldsMap), typeConverterContainer)
).map(_()).find(_.nonEmpty).flatten
}

Expand All @@ -85,26 +108,73 @@ object SchemaMapper {
.map(columnName => s"Column `$columnName` not found in internal columns")
}

private def canSupportRequiredTypeConversions(extFieldToColMap: Map[SchemaField, Column],
container: TypeConverterContainer): ValidationError = {
val errorStr = extFieldToColMap.iterator
.flatMap({ case (extF, col) => Seq((extF.dataType, col.dataType), (col.dataType, extF.dataType)) })
.filter({ case (t1, t2) => !TypeUtils.areTypesEqual(t1, t2) && container.typeConverter(t1, t2).isEmpty })
.map({ case (t1, t2) => s"[ ${TypeConverter.buildConverterName(t1, t2)} ]" })
.mkString("\n")

Option(errorStr).filter(_.nonEmpty).map("Following `TypeConverter`(s) are required but not found:\n" + _)
}

private def extFieldToColMap(extSchema: ExternalEntitySchema, columns: Array[Column], fieldsMap: Map[String, String]): Map[SchemaField, Column] = {
fieldsMap.map({ case (extFieldName, columnName) =>
val extField = extSchema.fields.find(_.name == extFieldName).get
val column = columns.find(_.name == columnName).get
(extField, column)
})
}

final case class InvalidSchemaMapException(message: String) extends RuntimeException(message)
}

private class SchemaMapperImpl(private val externalSchema: ExternalEntitySchema,
private val internalColumns: Array[Column],
private val columnNameByExternalField: Map[String, String]) extends SchemaMapper {
private val columnNameByExternalField: Map[String, String],
private val typeConverterContainer: TypeConverterContainer) extends SchemaMapper {
private val externalFieldByColumnName: Map[String, SchemaField] = getExternalSchemaFieldsByColumnName
private val internalColumnByExtFieldName: Map[String, Column] = getTableColumnByExternalField
private val extFieldsMap: Map[String, SchemaField] = externalSchema.fields.map(f => (f.name, f)).toMap

override def tableColumn(extFieldName: String): Option[Column] = internalColumnByExtFieldName.get(extFieldName)
override def externalSchemaField(columnName: String): Option[SchemaField] = externalFieldByColumnName.get(columnName)

override def toInternalRowMap(externalValues: List[_]): Map[String, Any] = toInternalRowMap(externalValues.toArray)
override def toInternalRowMap(externalDto: Product): Map[String, Any] = toInternalRowMap(externalDto.productIterator.toArray)

private def toInternalRowMap(externalValues: Array[_]): Map[String, Any] = {
externalFieldByColumnName.keys.map(columnName => {
val field = externalSchemaField(columnName).get
val columnValue = externalValues(field.index) // @todo add type conversion conforming to the passed schema
externalFieldByColumnName.map({ case (columnName, extField) =>
val extFieldValue = externalValues(extField.index)
// remove this get and make this return Optional (need to guard against conversion error if a user sends in a value not matching the schema type)
val columnValue = toMappedInternalColumnType(extField.name, extFieldValue).get
(columnName, columnValue)
}).toMap
})
}

override def toMappedExternalFieldType(columnName: String, columnValue: Any): Option[Any] = {
externalSchemaField(columnName).flatMap(field => {
val col = tableColumn(field.name).get
castToAny(col.dataType).flatMap(typeConverterContainer.convert(columnValue, _, field.dataType))
})
}

override def toMappedInternalColumnType(extFieldName: String, extFieldValue: Any): Option[Any] = {
tableColumn(extFieldName).flatMap(col => {
val field = externalSchemaField(col.name).get
castToAny(field.dataType).flatMap(typeConverterContainer.convert(extFieldValue, _, col.dataType))
})
}

override def convertExternalValueToString(extFieldName: String, extFieldValue: Any): Option[String] = {
extFieldsMap.get(extFieldName).flatMap(
f => castToAny(f.dataType).flatMap(typeConverterContainer.convert(extFieldValue, _, classOf[String]))
)
}

private def castToAny(cls: Class[_]): Option[Class[Any]] = cls match {
case c: Class[Any] => Some(c)
case _ => None
}

private def getExternalSchemaFieldsByColumnName =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ object DefaultTypeConverters {
val stringToDoubleConverter: TypeConverter[String, Double] = TypeConverter(classOf[String], classOf[Double], withNullSafety[String, Double](_, _.toDouble))
val stringToLongConverter: TypeConverter[String, Long] = TypeConverter[String, Long](classOf[String], classOf[Long], withNullSafety[String, Long](_, _.toLong))
val stringToIntConverter: TypeConverter[String, Integer] = TypeConverter(classOf[String], classOf[Integer], withNullSafety[String, Integer](_, _.toInt))
val stringToCharConverter: TypeConverter[String, Character] = TypeConverter(classOf[String], classOf[Character], withNullSafety[String, Character](_, _.toCharArray.apply(0)))
val stringToBooleanConverter: TypeConverter[String, Boolean] = TypeConverter(classOf[String], classOf[Boolean], withNullSafety[String, Boolean](_, _.toBoolean))

val intToStringConverter: TypeConverter[Integer, String] = TypeConverter(classOf[Integer], classOf[String], withNullSafety[Integer, String](_, _.toString))
val longToStringConverter: TypeConverter[Long, String] = TypeConverter(classOf[Long], classOf[String], withNullSafety[Long, String](_, _.toString))
val doubleToStringConverter: TypeConverter[Double, String] = TypeConverter(classOf[Double], classOf[String], withNullSafety[Double, String](_, _.toString))
Expand All @@ -20,5 +23,7 @@ object DefaultTypeConverters {
longToStringConverter,
stringToIntConverter,
intToStringConverter,
stringToCharConverter,
stringToBooleanConverter,
)
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.finos.vuu.util.schema.typeConversion

import scala.util.Try

trait TypeConverterContainer {
def convert[From, To](value: From, fromClass: Class[From], toClass: Class[To]): Option[To]
def typeConverter[From, To](name: String): Option[TypeConverter[From, To]]
def typeConverter[From, To](fromClass: Class[From], toClass: Class[To]): Option[TypeConverter[From, To]]
}

Expand All @@ -13,19 +14,19 @@ private case class TypeConverterContainerImpl(

override def convert[From, To](value: From, fromClass: Class[From], toClass: Class[To]): Option[To] = {
if (TypeUtils.areTypesEqual(fromClass, toClass)) {
return Option(value.asInstanceOf[To])
return Some(value.asInstanceOf[To])
}
typeConverter[From, To](fromClass, toClass).map(_.convert(value))
}

override def typeConverter[From, To](name: String): Option[TypeConverter[From, To]] = {
typeConverterByName.get(name).map(_.asInstanceOf[TypeConverter[From, To]])
typeConverter[From, To](fromClass, toClass).flatMap(tc => Try(tc.convert(value)).toOption)
}

override def typeConverter[From, To](fromClass: Class[From], toClass: Class[To]): Option[TypeConverter[From, To]] = {
val name = TypeConverter.buildConverterName(fromClass, toClass)
typeConverter[From, To](name)
}

private def typeConverter[From, To](name: String): Option[TypeConverter[From, To]] = {
typeConverterByName.get(name).flatMap(tc => Try(tc.asInstanceOf[TypeConverter[From, To]]).toOption)
}
}

object TypeConverterContainerBuilder {
Expand Down
Loading

0 comments on commit 382b1cf

Please sign in to comment.