forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 1
/
SetOperatorsTest.scala
75 lines (68 loc) · 2.46 KB
/
SetOperatorsTest.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.batch.sql
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.utils.TableTestBase
import org.apache.flink.api.table.utils.TableTestUtil._
import org.junit.Test
class SetOperatorsTest extends TableTestBase {
@Test
def testExists(): Unit = {
val util = batchTestUtil()
util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)
util.addTable[(Long, Int, String)]("B", 'b_long, 'b_int, 'b_string)
val expected = unaryNode(
"DataSetCalc",
binaryNode(
"DataSetJoin",
batchTableNode(0),
unaryNode(
"DataSetAggregate",
unaryNode(
"DataSetCalc",
binaryNode(
"DataSetJoin",
batchTableNode(1),
unaryNode(
"DataSetAggregate",
batchTableNode(0),
term("groupBy", "a_long"),
term("select", "a_long")
),
term("where", "=(a_long, b_long)"),
term("join", "b_long", "b_int", "b_string", "a_long"),
term("joinType", "InnerJoin")
),
term("select", "a_long", "true AS $f0")
),
term("groupBy", "a_long"),
term("select", "a_long", "MIN($f0) AS $f1")
),
term("where", "=(a_long, a_long0)"),
term("join", "a_long", "a_int", "a_string", "a_long0", "$f1"),
term("joinType", "InnerJoin")
),
term("select", "a_int", "a_string")
)
util.verifySql(
"SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)",
expected
)
}
}