From afcc5f1a0ad92db08294199e61be5df72c1514f8 Mon Sep 17 00:00:00 2001 From: Xingcan Cui Date: Thu, 16 Nov 2017 21:22:11 +0800 Subject: [PATCH] A simple demo for string rowtime --- .../nodes/dataset/BatchTableSourceScan.scala | 1 + .../datastream/StreamTableSourceScan.scala | 1 + .../flink/table/sources/TableSourceUtil.scala | 11 ++-- .../sources/tsextractors/ExistingField.scala | 8 ++- .../stream/table/TableSourceITCase.scala | 50 +++++++++++++++++++ .../flink/table/utils/testTableSources.scala | 2 +- 6 files changed, 67 insertions(+), 6 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala index d09d69c7f7c81..5f8b3a133caea 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala @@ -99,6 +99,7 @@ class BatchTableSourceScan( // get expression to extract rowtime attribute val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression( + tableEnv, tableSource, selectedFields, cluster, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala index 5d305b44134e0..18366c71a69e0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala @@ -105,6 +105,7 @@ class StreamTableSourceScan( // get expression to extract rowtime attribute val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression( + tableEnv, tableSource, selectedFields, cluster, diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala index 48ab3ded717d8..f51f207be02c1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceUtil.scala @@ -29,9 +29,9 @@ import org.apache.calcite.rex.{RexLiteral, RexNode} import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.table.api.{TableException, Types, ValidationException} +import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException} import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference} +import org.apache.flink.table.expressions.{Call, Cast, ResolvedFieldReference} import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo import scala.collection.JavaConverters._ @@ -337,6 +337,7 @@ object TableSourceUtil { * @return The [[RexNode]] expression to extract the timestamp of the table source. */ def getRowtimeExtractionExpression( + tableEnv: TableEnvironment, tableSource: TableSource[_], selectedFields: Option[Array[Int]], cluster: RelOptCluster, @@ -378,7 +379,11 @@ object TableSourceUtil { new Array[ResolvedFieldReference](0) } - val expression = tsExtractor.getExpression(fieldAccesses) + var expression = tsExtractor.getExpression(fieldAccesses) + expression = expression match { + case x : Call => tableEnv.functionCatalog.lookupFunction(x.functionName, x.args) + case _ => expression + } // add cast to requested type and convert expression to RexNode val rexExpression = Cast(expression, resultType).toRexNode(relBuilder) relBuilder.clear() diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala index 12cd564395cea..d8f70464ce1c6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala @@ -20,14 +20,14 @@ package org.apache.flink.table.sources.tsextractors import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.{Types, ValidationException} -import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference} +import org.apache.flink.table.expressions.{Call, Cast, Expression, Literal, ResolvedFieldReference} /** * Converts an existing [[Long]] or [[java.sql.Timestamp]] field into a rowtime attribute. * * @param field The field to convert into a rowtime attribute. */ -class ExistingField(field: String) extends TimestampExtractor { +class ExistingField(field: String, arguments: String*) extends TimestampExtractor { override def getArgumentFields: Array[String] = Array(field) @@ -41,6 +41,7 @@ class ExistingField(field: String) extends TimestampExtractor { fieldType match { case Types.LONG => // OK case Types.SQL_TIMESTAMP => // OK + case Types.STRING => // OK case _: TypeInformation[_] => throw ValidationException( s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.") @@ -62,6 +63,9 @@ class ExistingField(field: String) extends TimestampExtractor { case Types.SQL_TIMESTAMP => // cast timestamp to long Cast(fieldAccess, Types.LONG) + case Types.STRING => + // cast string to long with the internal UDF + Call("toEventTime", Seq(fieldAccess) ++ arguments.map(f => Literal(f))) } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala index 77c1e081686d5..240e45dc9055a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala @@ -19,6 +19,8 @@ package org.apache.flink.table.runtime.stream.table import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong} +import java.text.SimpleDateFormat +import java.util.TimeZone import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp} import org.apache.flink.api.common.typeinfo.TypeInformation @@ -31,6 +33,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types} import org.apache.flink.table.api.scala._ +import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase} import org.apache.flink.table.sources.StreamTableSource import org.apache.flink.table.utils._ @@ -690,4 +693,51 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { "3,Mike,30000,true,3000") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testRowtimeStringTableSource(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val tableName = "MyTable" + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + "1970-01-01 00:00:00", + "1970-01-01 00:00:01", + "1970-01-01 00:00:01", + "1970-01-01 00:00:02", + "1970-01-01 00:00:04") + + val schema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP)) + val returnType = Types.STRING + + val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", null) + tEnv.registerTableSource(tableName, tableSource) + + tEnv.registerFunction("toEventTime", str2EventTime) + + tEnv.scan(tableName) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('w) + .select('w.start, 1.count) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.0,1", + "1970-01-01 00:00:01.0,2", + "1970-01-01 00:00:02.0,1", + "1970-01-01 00:00:04.0,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } + +case object str2EventTime extends ScalarFunction { + def eval(value: String, pattern: String): Long = { + val format: SimpleDateFormat = new SimpleDateFormat(pattern) + format.setTimeZone(TimeZone.getTimeZone("GMT+0")) + format.parse(value).getTime + } +} + diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala index a546919189e2e..f75b799a13a5b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala @@ -60,7 +60,7 @@ class TestTableSourceWithTime[T]( if (rowtime != null) { Collections.singletonList(new RowtimeAttributeDescriptor( rowtime, - new ExistingField(rowtime), + new ExistingField(rowtime, "yyyy-MM-dd HH:mm:ss"), new AscendingTimestamps)) } else { Collections.EMPTY_LIST.asInstanceOf[util.List[RowtimeAttributeDescriptor]]