Skip to content

Commit

Permalink
[FLINK-7698] [table] Tests joins with null literals
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Nov 16, 2017
1 parent d0b2aa2 commit 101fef7
Showing 1 changed file with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package org.apache.flink.table.api.stream.sql

import org.apache.calcite.rel.logical.LogicalJoin
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.Null
import org.apache.flink.table.plan.logical.TumblingGroupWindow
import org.apache.flink.table.runtime.join.WindowJoinUtil
import org.apache.flink.table.utils.TableTestUtil.{term, _}
Expand Down Expand Up @@ -244,6 +246,49 @@ class JoinTest extends TableTestBase {
streamUtil.verifySql(sqlQuery, expected)
}

@Test
def testJoinWithNullLiteral(): Unit = {
val streamUtil: StreamTableTestUtil = streamTestUtil()

val t1 = streamUtil.addTable[(Int, Long, String)]("Table1", 'a, 'b, 'c, 'proctime.proctime)
.select('a, 'b, 'c, 'proctime, Null(Types.LONG) as 'nullField)

val t2 = streamUtil.addTable[(Int, Long, String)]("Table2", 'a, 'b, 'c, 'proctime.proctime)
.select('a, 'b, 'c, 'proctime, 12L as 'nullField)

streamUtil.tableEnv.registerTable("T1", t1)
streamUtil.tableEnv.registerTable("T2", t2)

val sqlQuery =
"""
|SELECT t2.a, t2.c, t1.c
|FROM T1 AS t1
|JOIN T2 AS t2 ON t1.a = t2.a AND t1.nullField = t2.nullField AND
| t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND
| t2.proctime + INTERVAL '5' SECOND
|""".stripMargin

val expected =
unaryNode("DataStreamCalc",
binaryNode("DataStreamWindowJoin",
unaryNode("DataStreamCalc",
streamTableNode(0),
term("select", "a", "c", "proctime", "null AS nullField")
),
unaryNode("DataStreamCalc",
streamTableNode(1),
term("select", "a", "c", "proctime", "12 AS nullField")
),
term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, " +
"-(proctime0, 5000)), <=(proctime, DATETIME_PLUS(proctime0, 5000)))"),
term("join", "a", "c", "proctime", "nullField", "a0", "c0", "proctime0", "nullField0"),
term("joinType", "InnerJoin")
),
term("select", "a0 AS a", "c0 AS c", "c AS c0")
)
streamUtil.verifySql(sqlQuery, expected)
}

@Test
def testRowTimeInnerJoinAndWindowAggregationOnFirst(): Unit = {

Expand Down

0 comments on commit 101fef7

Please sign in to comment.