Skip to content

Commit

Permalink
A simple demo for string rowtime
Browse files Browse the repository at this point in the history
  • Loading branch information
xccui committed Nov 16, 2017
1 parent 101fef7 commit afcc5f1
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 6 deletions.
Expand Up @@ -99,6 +99,7 @@ class BatchTableSourceScan(

// get expression to extract rowtime attribute
val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
tableEnv,
tableSource,
selectedFields,
cluster,
Expand Down
Expand Up @@ -105,6 +105,7 @@ class StreamTableSourceScan(

// get expression to extract rowtime attribute
val rowtimeExpression: Option[RexNode] = TableSourceUtil.getRowtimeExtractionExpression(
tableEnv,
tableSource,
selectedFields,
cluster,
Expand Down
Expand Up @@ -29,9 +29,9 @@ import org.apache.calcite.rex.{RexLiteral, RexNode}
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.{AtomicType, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.{TableException, Types, ValidationException}
import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.expressions.{Cast, ResolvedFieldReference}
import org.apache.flink.table.expressions.{Call, Cast, ResolvedFieldReference}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -337,6 +337,7 @@ object TableSourceUtil {
* @return The [[RexNode]] expression to extract the timestamp of the table source.
*/
def getRowtimeExtractionExpression(
tableEnv: TableEnvironment,
tableSource: TableSource[_],
selectedFields: Option[Array[Int]],
cluster: RelOptCluster,
Expand Down Expand Up @@ -378,7 +379,11 @@ object TableSourceUtil {
new Array[ResolvedFieldReference](0)
}

val expression = tsExtractor.getExpression(fieldAccesses)
var expression = tsExtractor.getExpression(fieldAccesses)
expression = expression match {
case x : Call => tableEnv.functionCatalog.lookupFunction(x.functionName, x.args)
case _ => expression
}
// add cast to requested type and convert expression to RexNode
val rexExpression = Cast(expression, resultType).toRexNode(relBuilder)
relBuilder.clear()
Expand Down
Expand Up @@ -20,14 +20,14 @@ package org.apache.flink.table.sources.tsextractors

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{Types, ValidationException}
import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldReference}
import org.apache.flink.table.expressions.{Call, Cast, Expression, Literal, ResolvedFieldReference}

/**
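* Converts an existing [[Long]], [[java.sql.Timestamp]], or [[String]] field into a rowtime
* attribute. A [[String]] field is converted by calling the function named "toEventTime" with
* the field followed by `arguments` as literals.
*
* @param field The field to convert into a rowtime attribute.
* @param arguments Literal arguments passed after the field to "toEventTime" when the field
*                  is a [[String]] (e.g. a date pattern such as "yyyy-MM-dd HH:mm:ss").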
*/
class ExistingField(field: String) extends TimestampExtractor {
class ExistingField(field: String, arguments: String*) extends TimestampExtractor {

override def getArgumentFields: Array[String] = Array(field)

Expand All @@ -41,6 +41,7 @@ class ExistingField(field: String) extends TimestampExtractor {
fieldType match {
case Types.LONG => // OK
case Types.SQL_TIMESTAMP => // OK
case Types.STRING => // OK
case _: TypeInformation[_] =>
throw ValidationException(
s"Field '$field' must be of type Long or Timestamp but is of type $fieldType.")
Expand All @@ -62,6 +63,9 @@ class ExistingField(field: String) extends TimestampExtractor {
case Types.SQL_TIMESTAMP =>
// cast timestamp to long
Cast(fieldAccess, Types.LONG)
case Types.STRING =>
// cast string to long with the internal UDF
Call("toEventTime", Seq(fieldAccess) ++ arguments.map(f => Literal(f)))
}
}

Expand Down
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.table.runtime.stream.table

import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong}
import java.text.SimpleDateFormat
import java.util.TimeZone

import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp}
import org.apache.flink.api.common.typeinfo.TypeInformation
Expand All @@ -31,6 +33,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase}
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.table.utils._
Expand Down Expand Up @@ -690,4 +693,51 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase {
"3,Mike,30000,true,3000")
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

/**
  * Runs a tumbling-window count over a table source whose rowtime field "rtime" is produced
  * as a String, and checks the number of rows counted per one-second window.
  */
@Test
def testRowtimeStringTableSource(): Unit = {
  StreamITCase.testResults = mutable.MutableList()

  val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
  execEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

  // The source emits plain strings, while the table schema exposes "rtime" as a timestamp.
  val timestamps = Seq(
    "1970-01-01 00:00:00",
    "1970-01-01 00:00:01",
    "1970-01-01 00:00:01",
    "1970-01-01 00:00:02",
    "1970-01-01 00:00:04")
  val tableSchema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP))
  val producedType = Types.STRING

  // "rtime" is the rowtime attribute; no proctime attribute is configured (null).
  val source = new TestTableSourceWithTime(tableSchema, producedType, timestamps, "rtime", null)
  val sourceName = "MyTable"
  tableEnv.registerTableSource(sourceName, source)

  // Make the string-to-epoch conversion available under the name "toEventTime".
  tableEnv.registerFunction("toEventTime", str2EventTime)

  // Count the rows that fall into each one-second tumbling window.
  tableEnv.scan(sourceName)
    .window(Tumble over 1.second on 'rtime as 'w)
    .groupBy('w)
    .select('w.start, 1.count)
    .addSink(new StreamITCase.StringSink[Row])
  execEnv.execute()

  // One result per non-empty window: "<window start>,<row count>".
  val expectedRows = Seq(
    "1970-01-01 00:00:00.0,1",
    "1970-01-01 00:00:01.0,2",
    "1970-01-01 00:00:02.0,1",
    "1970-01-01 00:00:04.0,1")
  assertEquals(expectedRows.sorted, StreamITCase.testResults.sorted)
}
}

/**
  * Scalar function that parses a date/time string into epoch milliseconds.
  *
  * The string is interpreted in the "GMT+0" time zone according to a
  * [[java.text.SimpleDateFormat]] pattern.
  */
case object str2EventTime extends ScalarFunction {

  /**
    * Parses `value` with the given `pattern`.
    *
    * @param value   The date/time string to parse.
    * @param pattern The [[java.text.SimpleDateFormat]] pattern describing `value`.
    * @return The parsed instant as milliseconds since 1970-01-01 00:00:00 GMT.
    * @throws java.text.ParseException if `value` does not match `pattern` or contains
    *                                  out-of-range fields (e.g. month 13).
    */
  def eval(value: String, pattern: String): Long = {
    // SimpleDateFormat is not thread-safe, so a fresh instance is created for every call.
    val format: SimpleDateFormat = new SimpleDateFormat(pattern)
    format.setTimeZone(TimeZone.getTimeZone("GMT+0"))
    // Strict parsing: fail on out-of-range fields instead of silently rolling them over
    // into a different (wrong) event time.
    format.setLenient(false)
    format.parse(value).getTime
  }
}

Expand Up @@ -60,7 +60,7 @@ class TestTableSourceWithTime[T](
if (rowtime != null) {
Collections.singletonList(new RowtimeAttributeDescriptor(
rowtime,
new ExistingField(rowtime),
new ExistingField(rowtime, "yyyy-MM-dd HH:mm:ss"),
new AscendingTimestamps))
} else {
Collections.EMPTY_LIST.asInstanceOf[util.List[RowtimeAttributeDescriptor]]
Expand Down

0 comments on commit afcc5f1

Please sign in to comment.