Skip to content

Commit

Permalink
!fixup Implement timestamp predicate pushdown in Druid connector
Browse files Browse the repository at this point in the history
Co-authored-by: Liu Yang <whilgeek@gmail.com>
  • Loading branch information
Praveen2112 and jerryleooo committed Jul 29, 2022
1 parent 81a84e6 commit f1122d3
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 23 deletions.
Expand Up @@ -134,6 +134,8 @@ public class DruidJdbcClient
private static final String DRUID_CATALOG = "druid";
// All the datasources in Druid are created under schema "druid"
private static final String DRUID_SCHEMA = "druid";

// TODO We could also re-evaluate this logic by using a new Calendar for each row if necessary
private static final Calendar UTC_CALENDAR = Calendar.getInstance(TimeZone.getTimeZone(UTC));

private static final DateTimeFormatter LOCAL_DATE_TIME = new DateTimeFormatterBuilder()
Expand Down Expand Up @@ -309,7 +311,7 @@ public static ColumnMapping timestampColumnMappingUsingSqlTimestampWithFullPushd
return ColumnMapping.longMapping(
timestampType,
(resultSet, columnIndex) -> {
// Druis's ResultSet depends on JDBC Connection TimeZone, so we pass the Calendar to get the result at UTC.
// Druid's ResultSet depends on JDBC Connection TimeZone, so we pass the Calendar to get the result at UTC.
Instant instant = Instant.ofEpochMilli(resultSet.getTimestamp(columnIndex, UTC_CALENDAR).getTime());
LocalDateTime timestamp = LocalDateTime.ofInstant(instant, UTC);
return toTrinoTimestamp(timestampType, timestamp);
Expand Down
Expand Up @@ -31,14 +31,18 @@
import org.intellij.lang.annotations.Language;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static io.trino.plugin.druid.DruidQueryRunner.copyAndIngestTpchData;
import static io.trino.plugin.druid.DruidTpchTables.SELECT_FROM_ORDERS;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.sql.planner.assertions.PlanMatchPattern.anyTree;
import static io.trino.sql.planner.assertions.PlanMatchPattern.node;
import static io.trino.sql.planner.assertions.PlanMatchPattern.output;
import static io.trino.sql.planner.assertions.PlanMatchPattern.values;
import static io.trino.testing.MaterializedResult.resultBuilder;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -396,61 +400,87 @@ public void testPredicatePushdown()
}

@Test
public void testPredicatePushdownForTimestamp()
public void testPredicatePushdownForTimestampWithSecondsPrecision()
{
// timestamp equality
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time = TIMESTAMP '1992-01-04 00:00:00'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isFullyPushedDown();

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time = TIMESTAMP '1992-01-04 00:00:00.001'"))
.returnsEmptyResult()
.isNotFullyPushedDown(FilterNode.class);

// timestamp comparison
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time < TIMESTAMP '1992-01-05'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isFullyPushedDown();

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time < TIMESTAMP '1992-01-05 00:00:00.001'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time <= TIMESTAMP '1992-01-04'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isFullyPushedDown();

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time <= TIMESTAMP '1992-01-04 00:00:00.001'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time > TIMESTAMP '1998-11-28'"))
.matches("VALUES " +
"(BIGINT '2', BIGINT '370', CAST('RAIL' AS varchar)), " +
"(BIGINT '2', BIGINT '468', CAST('AIR' AS varchar))")
.isFullyPushedDown();

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time > TIMESTAMP '1998-11-28 00:00:00.001'"))
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time >= TIMESTAMP '1998-11-29 00:00:00'"))
.matches("VALUES " +
"(BIGINT '2', BIGINT '370', CAST('RAIL' AS varchar)), " +
"(BIGINT '2', BIGINT '468', CAST('AIR' AS varchar))")
.isFullyPushedDown();

// timestamp range
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time BETWEEN TIMESTAMP '1992-01-01' AND TIMESTAMP '1992-01-05'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isFullyPushedDown();

// timestamp IN without domain compaction
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time IN (TIMESTAMP '1992-01-04', TIMESTAMP '1998-11-27 00:00:00.000', TIMESTAMP '1998-11-28')"))
.matches("VALUES " +
"(BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar)), " +
"(BIGINT '1', BIGINT '574', CAST('AIR' AS varchar))")
.isFullyPushedDown();

// timestamp IN with small compaction threshold
assertThat(query(
Session.builder(getSession())
.setCatalogSessionProperty("druid", "domain_compaction_threshold", "1")
.build(),
"SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time IN (TIMESTAMP '1992-01-04', TIMESTAMP '1998-11-27 00:00:00', TIMESTAMP '1998-11-28')"))
.matches("VALUES " +
"(BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar)), " +
"(BIGINT '1', BIGINT '574', CAST('AIR' AS varchar))")
// Filter node is retained as no constraint is pushed into connector.
.isNotFullyPushedDown(FilterNode.class);
}

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time >= TIMESTAMP '1998-11-29 00:00:00'"))
@Test
public void testPredicatePushdownForTimestampWithMillisPrecision()
{
// timestamp equality
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time = TIMESTAMP '1992-01-04 00:00:00.001'"))
.returnsEmptyResult()
.isNotFullyPushedDown(FilterNode.class);

// timestamp comparison
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time < TIMESTAMP '1992-01-05 00:00:00.001'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time <= TIMESTAMP '1992-01-04 00:00:00.001'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time > TIMESTAMP '1998-11-28 00:00:00.001'"))
.matches("VALUES " +
"(BIGINT '2', BIGINT '370', CAST('RAIL' AS varchar)), " +
"(BIGINT '2', BIGINT '468', CAST('AIR' AS varchar))")
.isFullyPushedDown();
.isNotFullyPushedDown(FilterNode.class);

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time >= TIMESTAMP '1998-11-29 00:00:00.001'"))
.returnsEmptyResult()
.isNotFullyPushedDown(FilterNode.class);

// timestamp range
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time BETWEEN TIMESTAMP '1992-01-01' AND TIMESTAMP '1992-01-05'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isFullyPushedDown();

assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time BETWEEN TIMESTAMP '1992-01-01 00:00:00.001' AND TIMESTAMP '1992-01-05'"))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);
Expand All @@ -459,7 +489,7 @@ public void testPredicatePushdownForTimestamp()
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

// varchar IN without domain compaction
// timestamp IN without domain compaction
assertThat(query("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time IN (TIMESTAMP '1992-01-04', TIMESTAMP '1998-11-27 00:00:00.000', TIMESTAMP '1998-11-28')"))
.matches("VALUES " +
"(BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar)), " +
Expand All @@ -472,7 +502,7 @@ public void testPredicatePushdownForTimestamp()
"(BIGINT '1', BIGINT '574', CAST('AIR' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

// varchar IN with small compaction threshold
// timestamp IN with small compaction threshold
assertThat(query(
Session.builder(getSession())
.setCatalogSessionProperty("druid", "domain_compaction_threshold", "1")
Expand All @@ -484,4 +514,61 @@ public void testPredicatePushdownForTimestamp()
// Filter node is retained as no constraint is pushed into connector.
.isNotFullyPushedDown(FilterNode.class);
}

/**
 * Checks predicates on {@code __time} whose timestamp literal has more than millisecond
 * precision (4 to 12 fractional digits, one literal per invocation from
 * {@code timestampValuesProvider}).
 *
 * <p>Every comparison, range and IN-list case asserts both the returned rows and that a
 * {@link FilterNode} remains in the plan. The equality case instead asserts an empty result
 * and a plan consisting of an output over a values node.
 *
 * @param timestamp timestamp literal text (without the {@code TIMESTAMP} keyword) substituted into each query
 */
@Test(dataProvider = "timestampValuesProvider")
public void testPredicatePushdownForTimestampWithHigherPrecision(String timestamp)
{
// timestamp equality
// NOTE(review): the expected plan is just output-over-values, presumably because the engine
// folds an equality against a higher-precision literal to an empty result - confirm.
assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time = TIMESTAMP '%s'", timestamp)))
.returnsEmptyResult()
.matches(output(
values("linenumber", "partkey", "shipmode")));

// timestamp comparison
assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time < TIMESTAMP '%s'", timestamp)))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time <= TIMESTAMP '%s'", timestamp)))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

// The provided literals are all on 1992-01-04; adding 2520 days moves the bound to 1998-11-28
// so the lower-bound comparison selects the two latest rows.
assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time > (TIMESTAMP '%s' + INTERVAL '2520' DAY)", timestamp)))
.matches("VALUES " +
"(BIGINT '2', BIGINT '370', CAST('RAIL' AS varchar)), " +
"(BIGINT '2', BIGINT '468', CAST('AIR' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

// One more day (2521) moves the bound to 1998-11-29 plus the fractional part; no rows are expected.
assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time >= (TIMESTAMP '%s' + INTERVAL '2521' DAY)", timestamp)))
.returnsEmptyResult()
.isNotFullyPushedDown(FilterNode.class);

// timestamp range
assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time BETWEEN TIMESTAMP '1992-01-04' AND TIMESTAMP '%s'", timestamp)))
.matches("VALUES (BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);

// timestamp IN without domain compaction
assertThat(query(format("SELECT linenumber, partkey, shipmode FROM lineitem WHERE __time IN (TIMESTAMP '1992-01-04', TIMESTAMP '1998-11-27', TIMESTAMP '%s')", timestamp)))
.matches("VALUES " +
"(BIGINT '3', BIGINT '1673', CAST('RAIL' AS varchar)), " +
"(BIGINT '1', BIGINT '574', CAST('AIR' AS varchar))")
.isNotFullyPushedDown(FilterNode.class);
}

/**
 * Supplies timestamp literals on 1992-01-04 00:00:00 with 4 through 12 fractional-second
 * digits, one literal per row, in increasing precision order.
 *
 * @return single-column rows, each holding one timestamp literal string
 */
@DataProvider
public Object[][] timestampValuesProvider()
{
    // Each literal is the shared prefix followed by the first N digits of this fraction.
    String secondsPrefix = "1992-01-04 00:00:00.";
    String fractionDigits = "123456789123";
    int lowestPrecision = 4;
    int highestPrecision = fractionDigits.length();

    Object[][] rows = new Object[highestPrecision - lowestPrecision + 1][];
    for (int precision = lowestPrecision; precision <= highestPrecision; precision++) {
        rows[precision - lowestPrecision] = new Object[] {secondsPrefix + fractionDigits.substring(0, precision)};
    }
    return rows;
}
}

0 comments on commit f1122d3

Please sign in to comment.