Skip to content

Commit

Permalink
feat: Properly implement and fix ProjectionPushDown optimization. Now…
Browse files Browse the repository at this point in the history
… catalogs can implement

proper projection of needed columns.
  • Loading branch information
kuseman committed Apr 9, 2024
1 parent 9f05c30 commit aae131b
Show file tree
Hide file tree
Showing 51 changed files with 1,604 additions and 602 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ public class DatasourceData
private final Optional<Schema> schema;
private final List<IPredicate> predicates;
private final List<? extends ISortItem> sortItems;
private final List<String> projection;
private final Optional<List<String>> projection;
private final List<Option> options;

public DatasourceData(int nodeId, Optional<Schema> schema, List<IPredicate> predicates, List<? extends ISortItem> sortItems, List<String> projection, List<Option> options)
public DatasourceData(int nodeId, Optional<Schema> schema, List<IPredicate> predicates, List<? extends ISortItem> sortItems, Optional<List<String>> projection, List<Option> options)
{
this.nodeId = nodeId;
this.schema = requireNonNull(schema, "schema");
Expand Down Expand Up @@ -63,9 +63,9 @@ public List<? extends ISortItem> getSortItems()

/**
* Return the projected columns needed for this data source. This is an optimization hint that can be used by catalogs that support fetching specific columns only, such as RDBMSs. If empty then
* all columns should be returned
* all columns should be returned. Special case: when present but containing an empty list, all rows should be returned but no column data is wanted for this data source.
*/
public List<String> getProjection()
public Optional<List<String>> getProjection()
{
return projection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.lang3.ArrayUtils;

import se.kuseman.payloadbuilder.api.QualifiedName;
import se.kuseman.payloadbuilder.api.catalog.Column;
Expand Down Expand Up @@ -48,12 +51,13 @@ class JdbcDatasource implements IDatasource
private final JdbcCatalog catalog;
private final String catalogAlias;
private final QualifiedName table;
private final List<String> projection;
private final Optional<List<String>> projection;
private final List<IPredicate> predicates;
private final List<ISortItem> sortItems;
private final ISeekPredicate indexPredicate;

JdbcDatasource(JdbcCatalog catalog, String catalogAlias, QualifiedName table, ISeekPredicate indexPredicate, List<String> projection, List<IPredicate> predicates, List<ISortItem> sortItems)
JdbcDatasource(JdbcCatalog catalog, String catalogAlias, QualifiedName table, ISeekPredicate indexPredicate, Optional<List<String>> projection, List<IPredicate> predicates,
List<ISortItem> sortItems)
{
this.catalog = catalog;
this.catalogAlias = catalogAlias;
Expand Down Expand Up @@ -94,7 +98,9 @@ public TupleIterator execute(IExecutionContext context, IDatasourceOptions optio
{
SqlDialect dialect = DialectProvider.getDialect(context.getSession(), catalogAlias);
String sql = buildSql(dialect, context, false);
return getIterator(dialect, catalog, context, catalogAlias, sql, null, options.getBatchSize(context));
return getIterator(dialect, catalog, context, catalogAlias, sql, null, options.getBatchSize(context), projection.isPresent()
&& projection.get()
.isEmpty());
}

// CSOFF
Expand All @@ -103,9 +109,12 @@ private String buildSql(SqlDialect dialect, IExecutionContext context, boolean d
{
StringBuilder sb = new StringBuilder("SELECT ");
sb.append(projection.isEmpty() ? "y.*"
: projection.stream()
.map(c -> "y." + c)
.collect(joining(",")));
: projection.get()
.isEmpty() ? "1"
: projection.get()
.stream()
.map(c -> "y." + c)
.collect(joining(",")));
sb.append(" FROM ");
sb.append(table.toString())
.append(" y");
Expand Down Expand Up @@ -290,7 +299,8 @@ private void appendComparisonValue(StringBuilder sb, IPredicate predicate)
}

/** Returns a row iterator with provided query and parameters */
static TupleIterator getIterator(SqlDialect dialect, JdbcCatalog catalog, IExecutionContext context, String catalogAlias, String query, List<Object> parameters, int batchSize)
static TupleIterator getIterator(SqlDialect dialect, JdbcCatalog catalog, IExecutionContext context, String catalogAlias, String query, List<Object> parameters, int batchSize,
boolean emptyProjection)
{
final String database = context.getSession()
.getCatalogProperty(catalogAlias, JdbcCatalog.DATABASE)
Expand All @@ -304,7 +314,8 @@ static TupleIterator getIterator(SqlDialect dialect, JdbcCatalog catalog, IExecu
private Connection connection;
private volatile Statement statement;
private ResultSet rs;
private String[] columns;
private String[] columns = emptyProjection ? ArrayUtils.EMPTY_STRING_ARRAY
: null;
private int[] jdbcTypes;
private boolean resultSetEnded = false;
private volatile boolean abort = false;
Expand Down Expand Up @@ -334,11 +345,14 @@ public TupleVector next()
populateMeta();
}

List<Object[]> batch = new ArrayList<>(batchSize);
int currentBatchSize = 0;
List<Object[]> batch = emptyProjection ? null
: new ArrayList<>(batchSize);
int length = columns.length;
do
{
Object[] values = new Object[length];
Object[] values = emptyProjection ? null
: new Object[length];
for (int i = 0; i < length; i++)
{
if (abort)
Expand All @@ -353,11 +367,15 @@ public TupleVector next()
break;
}

batch.add(values);
currentBatchSize++;
if (batch != null)
{
batch.add(values);
}

resultSetEnded = !rs.next();

if (batch.size() >= batchSize)
if (currentBatchSize >= batchSize)
{
break;
}
Expand All @@ -373,7 +391,7 @@ public TupleVector next()
Schema schema = new Schema(Arrays.stream(columns)
.map(c -> Column.of(c, Type.Any))
.collect(toList()));
return new ObjectTupleVector(schema, batch.size(), (row, col) -> batch.get(row)[col]);
return new ObjectTupleVector(schema, currentBatchSize, (row, col) -> batch.get(row)[col]);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ public TupleIterator execute(IExecutionContext context, String catalogAlias, Opt

SqlDialect dialect = DialectProvider.getDialect(context.getSession(), catalogAlias);

return JdbcDatasource.getIterator(dialect, catalog, context, catalogAlias, query, parameters, options.getBatchSize(context));
return JdbcDatasource.getIterator(dialect, catalog, context, catalogAlias, query, parameters, options.getBatchSize(context), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void test_multi_type_scan() throws IOException

List<ISortItem> sortItems = new ArrayList<>(asList(mockSortItem(QualifiedName.of("key"))));
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of("_doc"),
new DatasourceData(0, Optional.empty(), emptyList(), sortItems, emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), emptyList(), sortItems, Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

// Verify that sort items are consumed
Expand Down Expand Up @@ -197,7 +197,7 @@ public void test_system_tables() throws IOException
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", INDEX)), 0, new ESDatasource.Data());

IDatasource ds = catalog.getSystemTableDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of("tables"),
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

TupleIterator it = ds.execute(context, options);
Expand Down Expand Up @@ -243,7 +243,7 @@ public void test_system_indices() throws IOException
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", INDEX)), 0, new ESDatasource.Data());

IDatasource ds = catalog.getSystemTableDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of("indices"),
new DatasourceData(0, Optional.of(ESCatalog.INDICES_SCHEMA.getSchema()), emptyList(), emptyList(), emptyList(), emptyList()));
new DatasourceData(0, Optional.of(ESCatalog.INDICES_SCHEMA.getSchema()), emptyList(), emptyList(), Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

TupleIterator it = ds.execute(context, options);
Expand Down Expand Up @@ -304,7 +304,7 @@ public void test_system_columns() throws IOException
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", INDEX)), 0, new ESDatasource.Data());

IDatasource ds = catalog.getSystemTableDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of("columns"),
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

TupleIterator it = ds.execute(context, options);
Expand Down Expand Up @@ -630,7 +630,7 @@ public void test_nested() throws IOException
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of(version.getStrategy()
.supportsTypes() ? type
: ESCatalog.SINGLE_TYPE_TABLE_NAME),
new DatasourceData(0, Optional.empty(), predicates, sortItems, emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), predicates, sortItems, Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

assertTrue(sortItems.isEmpty());
Expand Down Expand Up @@ -663,7 +663,8 @@ public void test_datasource_index_non_id_field() throws IOException
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", INDEX)), 0, data);
// Test non matching case of index field
ISeekPredicate seekPredicate = mockSeekPrecidate(context, "KEY", 123, null); // Null values should be excluded
IDatasource ds = catalog.getSeekDataSource(context.getSession(), CATALOG_ALIAS, seekPredicate, new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), emptyList(), emptyList()));
IDatasource ds = catalog.getSeekDataSource(context.getSession(), CATALOG_ALIAS, seekPredicate,
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

TupleIterator it = ds.execute(context, options);
Expand Down Expand Up @@ -708,7 +709,8 @@ public void test_datasource_index_id_field() throws IOException
ESDatasource.Data data = new ESDatasource.Data();
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", INDEX)), 0, data);
ISeekPredicate seekPredicate = mockSeekPrecidate(context, "__id", "001", null); // Null values should be excluded
IDatasource ds = catalog.getSeekDataSource(context.getSession(), CATALOG_ALIAS, seekPredicate, new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), emptyList(), emptyList()));
IDatasource ds = catalog.getSeekDataSource(context.getSession(), CATALOG_ALIAS, seekPredicate,
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

TupleIterator it = ds.execute(context, options);
Expand Down Expand Up @@ -753,7 +755,8 @@ public void test_datasource_index_non_id_field_batching() throws IOException
ESDatasource.Data data = new ESDatasource.Data();
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", INDEX)), 0, data);
ISeekPredicate seekPredicate = mockSeekPrecidate(context, "key", 123, 456);
IDatasource ds = catalog.getSeekDataSource(context.getSession(), CATALOG_ALIAS, seekPredicate, new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), emptyList(), emptyList()));
IDatasource ds = catalog.getSeekDataSource(context.getSession(), CATALOG_ALIAS, seekPredicate,
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), Optional.empty(), emptyList()));
// Size 1 => 2 batches
IDatasourceOptions options = mockOptions(1);

Expand Down Expand Up @@ -816,7 +819,7 @@ public void test_datasource_sort_on_index() throws IOException
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", "test*")), 0, data);
List<ISortItem> sortItems = new ArrayList<>(asList(mockSortItem(QualifiedName.of("__index"), Order.DESC)));
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of(ESCatalog.SINGLE_TYPE_TABLE_NAME),
new DatasourceData(0, Optional.empty(), emptyList(), sortItems, emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), emptyList(), sortItems, Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

// Verify that sort items are consumed
Expand Down Expand Up @@ -859,7 +862,7 @@ public void test_datasource_filter_on_index() throws IOException

IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", "*")), 0, data);
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of(ESCatalog.SINGLE_TYPE_TABLE_NAME),
new DatasourceData(0, Optional.empty(), predicates, emptyList(), emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), predicates, emptyList(), Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

// Verify that predicates are consumed
Expand Down Expand Up @@ -899,7 +902,7 @@ public void test_datasource_filter_on_type() throws IOException

List<IPredicate> predicates = new ArrayList<>(asList(IPredicateMock.eq("__type", "type2")));
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of("_doc"),
new DatasourceData(0, Optional.empty(), predicates, emptyList(), emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), predicates, emptyList(), Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

// Verify that predicates are consumed
Expand Down Expand Up @@ -947,7 +950,7 @@ public void test_datasource_filter_on_id() throws IOException

List<IPredicate> predicates = new ArrayList<>(asList(IPredicateMock.eq("__id", "002")));
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of("_doc"),
new DatasourceData(0, Optional.empty(), predicates, emptyList(), emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), predicates, emptyList(), Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

// Verify that predicates are consumed
Expand Down Expand Up @@ -998,7 +1001,7 @@ public void test_datasource_filter_on_id_with_in() throws IOException
List<ISortItem> sortItems = new ArrayList<>(asList(mockSortItem(QualifiedName.of("key"))));
List<IPredicate> predicates = new ArrayList<>(asList(IPredicateMock.in("__id", asList("002", "004"))));
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of("_doc"),
new DatasourceData(0, Optional.empty(), predicates, sortItems, emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), predicates, sortItems, Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

// Verify that predicates/sort items are consumed
Expand Down Expand Up @@ -1050,7 +1053,7 @@ public void test_datasource_filter_on_id_with_not_in() throws IOException
List<ISortItem> sortItems = new ArrayList<>(asList(mockSortItem(QualifiedName.of("key"))));
List<IPredicate> predicates = new ArrayList<>(asList(IPredicateMock.notIn("__id", asList("001", "003"))));
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of("_doc"),
new DatasourceData(0, Optional.empty(), predicates, sortItems, emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), predicates, sortItems, Optional.empty(), emptyList()));
IDatasourceOptions options = mockOptions(500);

// Verify that predicates/sort items are consumed
Expand Down Expand Up @@ -1099,7 +1102,8 @@ public void test_datasource_index_id_field_batching() throws IOException
ESDatasource.Data data = new ESDatasource.Data();
IExecutionContext context = mockExecutionContext(CATALOG_ALIAS, ofEntries(entry("endpoint", endpoint), entry("index", INDEX)), 0, data);
ISeekPredicate seekPredicate = mockSeekPrecidate(context, "__id", "001", "002", "003");
IDatasource ds = catalog.getSeekDataSource(context.getSession(), CATALOG_ALIAS, seekPredicate, new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), emptyList(), emptyList()));
IDatasource ds = catalog.getSeekDataSource(context.getSession(), CATALOG_ALIAS, seekPredicate,
new DatasourceData(0, Optional.empty(), emptyList(), emptyList(), Optional.empty(), emptyList()));
// Size 2 => 2 batches
IDatasourceOptions options = mockOptions(2);

Expand Down Expand Up @@ -1176,7 +1180,7 @@ public void test_datasource_table_scan() throws IOException
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of(version.getStrategy()
.supportsTypes() ? type
: ESCatalog.SINGLE_TYPE_TABLE_NAME),
new DatasourceData(0, Optional.empty(), emptyList(), sortItems, emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), emptyList(), sortItems, Optional.empty(), emptyList()));
IDatasourceOptions options = TestUtils.mockOptions(500);

// Verify that sort items are consumed
Expand Down Expand Up @@ -1215,7 +1219,7 @@ public void test_datasource_table_scan_batching() throws IOException
IDatasource ds = catalog.getScanDataSource(context.getSession(), CATALOG_ALIAS, QualifiedName.of(version.getStrategy()
.supportsTypes() ? type
: ESCatalog.SINGLE_TYPE_TABLE_NAME),
new DatasourceData(0, Optional.empty(), emptyList(), sortItems, emptyList(), emptyList()));
new DatasourceData(0, Optional.empty(), emptyList(), sortItems, Optional.empty(), emptyList()));
// Size 2 => 2 batches
IDatasourceOptions options = mockOptions(2);

Expand Down
Loading

0 comments on commit aae131b

Please sign in to comment.