Skip to content

Commit

Permalink
TEIID-5146 fixing issues with lateral joins
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Nov 13, 2017
1 parent a0e37ab commit 56220c1
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 5 deletions.
Expand Up @@ -125,7 +125,7 @@ public SharedTupleSource(SharedState state) {

@Override
public void closeSource() {
if (!closed && --state.expectedReaders == 0 && sharedStates != null && sharedStates.containsKey(state.id)) {
if (!closed && state.expectedReaders != -1 && --state.expectedReaders == 0 && sharedStates != null && sharedStates.containsKey(state.id)) {
state.remove();
sharedStates.remove(state.id);
}
Expand Down
Expand Up @@ -482,7 +482,7 @@ protected void visitNode(org.teiid.query.sql.LanguageObject obj) {
}
//check if valid to share this with other nodes
if (ev != null && ev.getDeterminismLevel().compareTo(Determinism.COMMAND_DETERMINISTIC) >= 0 && command.areResultsCachable()) {
checkForSharedSourceCommand(aNode);
checkForSharedSourceCommand(aNode, node);
}
}
if (subPlans != null) {
Expand Down Expand Up @@ -759,7 +759,7 @@ private Command aliasCommand(AccessNode aNode, Command command,
return command;
}

private void checkForSharedSourceCommand(AccessNode aNode) {
private void checkForSharedSourceCommand(AccessNode aNode, PlanNode node) {
//create a top level key to avoid the full command toString
String modelName = aNode.getModelName();
Command cmd = aNode.getCommand();
Expand All @@ -778,14 +778,38 @@ private void checkForSharedSourceCommand(AccessNode aNode) {
}

AccessNode other = sharedCommands.get(cmd);

//lateral may be reused any number of times,
//so this requires special handling
//we will clean it up at the end
boolean lateral = false;
while (node.getParent() != null) {
if (node.getParent().getType() == NodeConstants.Types.JOIN
&& node.getParent().getProperty(Info.JOIN_STRATEGY) == JoinStrategyType.NESTED_TABLE
&& node.getParent().getLastChild() == node) {
lateral = true;
break;
}
node = node.getParent();
}

if (other == null) {
sharedCommands.put(cmd, aNode);
if (lateral) {
aNode.info = new RegisterRequestParameter.SharedAccessInfo();
aNode.info.id = sharedId.getAndIncrement();
aNode.info.sharingCount = -1;
}
} else {
if (other.info == null) {
other.info = new RegisterRequestParameter.SharedAccessInfo();
other.info.id = sharedId.getAndIncrement();
}
other.info.sharingCount++;
if (other.info.sharingCount != -1) {
other.info.sharingCount++;
} else if (lateral) {
other.info.sharingCount = -1;
}
aNode.info = other.info;
}
}
Expand Down
Expand Up @@ -1666,11 +1666,14 @@ public static Object getTrackableGroup(GroupSymbol group, QueryMetadataInterface
private SymbolMap getCorrelatedReferences(PlanNode parent, PlanNode node,
LanguageObject lo) {
PlanNode rootJoin = parent;
Set<GroupSymbol> groups = new HashSet<GroupSymbol>(rootJoin.getGroups());
while (rootJoin.getParent() != null && rootJoin.getParent().getType() == NodeConstants.Types.JOIN && !rootJoin.getParent().getGroups().isEmpty()) {
rootJoin = rootJoin.getParent();
//accumulate groups as we go, as intermediate joins may not contribute groups to the final join
groups.addAll(rootJoin.getGroups());
}
List<Reference> correlatedReferences = new ArrayList<Reference>();
CorrelatedReferenceCollectorVisitor.collectReferences(lo, rootJoin.getGroups(), correlatedReferences, metadata);
CorrelatedReferenceCollectorVisitor.collectReferences(lo, groups, correlatedReferences, metadata);

if (correlatedReferences.isEmpty()) {
return null;
Expand Down
25 changes: 25 additions & 0 deletions engine/src/test/java/org/teiid/query/processor/TestProcessor.java
Expand Up @@ -7842,5 +7842,30 @@ private void sampleDataBQT2a(FakeDataManager dataMgr) throws Exception {
TestProcessor.helpProcess(plan, dataManager, new List<?>[] {Arrays.asList("Test1@mail.com", "test1@mail.com", "test1@mail.com", "test1@mail.com")});
}

@Test public void testMixedAnsiLateralJoins() throws Exception {
int rows = 20;
TransformationMetadata metadata = RealMetadataFactory.fromDDL("CREATE VIRTUAL PROCEDURE pr0(arg1 string) returns (res1 string) AS\n" +
" BEGIN\n" +
" SELECT '2017-01-01';\n" +
" END;"
+ "create foreign table test_t1(col_t1 varchar) options (cardinality 20); create foreign table test_t2(col_t2 integer) options (cardinality 20);", "x", "y");

//prior to the fix, the following worked as non-ansi, but not with ansi

String sql = "SELECT d.col_t2 FROM \"test_t1\", table(CALL pr0(\"arg1\" => col_t1)) x\n" +
" cross join table(select * from \"test_t2\" where col_t2 >= res1) d";
BasicSourceCapabilities caps = TestOptimizer.getTypicalCapabilities();
caps.setCapabilitySupport(Capability.QUERY_FROM_JOIN_LATERAL, true);
ProcessorPlan plan = TestProcessor.helpGetPlan(sql, metadata, new DefaultCapabilitiesFinder(caps));
HardcodedDataManager dataManager = new HardcodedDataManager();
List<?>[] vals = new List<?>[rows];
Arrays.fill(vals, Arrays.asList("1"));
List<?>[] vals1 = new List<?>[rows];
Arrays.fill(vals1, Arrays.asList(1));
dataManager.addData("SELECT g_0.col_t1 FROM y.test_t1 AS g_0", vals);
dataManager.addData("SELECT g_0.col_t2 FROM y.test_t2 AS g_0", vals1);
TestProcessor.helpProcess(plan, dataManager, new List<?>[] {});
}

private static final boolean DEBUG = false;
}
47 changes: 47 additions & 0 deletions runtime/src/test/java/org/teiid/runtime/TestEmbeddedServer.java
Expand Up @@ -2272,5 +2272,52 @@ public void loadMetadata(MetadataFactory factory,

s.execute("select * from g1");
}

@Test public void testLateralTupleSourceReuse() throws Exception {
es.start(new EmbeddedConfiguration());
int rows = 20;
ModelMetaData mmd = new ModelMetaData();
mmd.setName("y");
mmd.addSourceMetadata("ddl", "CREATE VIRTUAL PROCEDURE pr0(arg1 string) returns (res1 string) AS\n" +
" BEGIN\n" +
" SELECT '2017-01-01';\n" +
" END;"
+ "create foreign table test_t1(col_t1 varchar) options (cardinality 20); create foreign table test_t2(col_t2 integer) options (cardinality 20);");
mmd.addSourceMapping("y", "y", null);

HardCodedExecutionFactory hcef = new HardCodedExecutionFactory();
es.addTranslator("y", hcef);

es.deployVDB("x", mmd);

String sql = "SELECT d.col_t2 FROM \"test_t1\", table(CALL pr0(\"arg1\" => col_t1)) x\n" +
" join table(select * from \"test_t2\") d \n" +
" on true " +
"UNION all\n" +
"SELECT d.col_t2 FROM \"test_t1\", table(CALL pr0(\"arg1\" => col_t1)) x\n" +
" join table(select * from \"test_t2\") d \n" +
" on true " +
" limit 100";

List<?>[] vals = new List<?>[rows];
Arrays.fill(vals, Arrays.asList("1"));
List<?>[] vals1 = new List<?>[rows];
Arrays.fill(vals1, Arrays.asList(1));
hcef.addData("SELECT test_t1.col_t1 FROM test_t1", Arrays.asList(vals));
hcef.addData("SELECT test_t2.col_t2 FROM test_t2", Arrays.asList(vals1));

Connection c = es.getDriver().connect("jdbc:teiid:x;", null);
Statement s = c.createStatement();

s.executeQuery(sql);

ResultSet rs = s.getResultSet();
int count = 0;
while (rs.next()) {
count++;
}
rs.close();
assertEquals(100, count);
}

}

0 comments on commit 56220c1

Please sign in to comment.