diff --git a/docs/src/main/sphinx/connector/pinot.md b/docs/src/main/sphinx/connector/pinot.md index cc6a5a27274ea..f761c9994a5bc 100644 --- a/docs/src/main/sphinx/connector/pinot.md +++ b/docs/src/main/sphinx/connector/pinot.md @@ -33,9 +33,9 @@ This can be the ip or the FDQN, the url scheme (`http://`) is optional. ### General configuration properties | Property name | Required | Description | -|--------------------------------------------------------|----------| ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +|--------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `pinot.controller-urls` | Yes | A comma separated list of controller hosts. If Pinot is deployed via [Kubernetes](https://kubernetes.io/) this needs to point to the controller service endpoint. The Pinot broker and server must be accessible via DNS as Pinot returns hostnames and not IP addresses. | -| `pinot.broker-url` | No | A host and port of broker. If broker URL exposed by Pinot controller API is not accessible, this property can be used to specify the broker endpoint. Enabling this property will disable broker discovery. | +| `pinot.broker-url` | No | A host and port of broker. If broker URL exposed by Pinot controller API is not accessible, this property can be used to specify the broker endpoint. Enabling this property will disable broker discovery. | | `pinot.connection-timeout` | No | Pinot connection timeout, default is `15s`. | | `pinot.metadata-expiry` | No | Pinot metadata expiration time, default is `2m`. | | `pinot.controller.authentication.type` | No | Pinot authentication method for controller requests. Allowed values are `NONE` and `PASSWORD` - defaults to `NONE` which is no authentication. | @@ -55,6 +55,7 @@ This can be the ip or the FDQN, the url scheme (`http://`) is optional. | `pinot.count-distinct-pushdown.enabled` | No | Push down count distinct queries to Pinot, default is `true`. | | `pinot.target-segment-page-size` | No | Max allowed page size for segment query, default is `1MB`. | | `pinot.proxy.enabled` | No | Use Pinot Proxy for controller and broker requests, default is `false`. | +| `pinot.query-options` | No | Query options to be included with pinot queries. Default is null. For a varchar / string option please add single quates(`'`) aroud the value. example:`enableNullHanding:true,minReplicas:2,somethingElse:'YES'` | If `pinot.controller.authentication.type` is set to `PASSWORD` then both `pinot.controller.authentication.user` and `pinot.controller.authentication.password` are required. diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java index 723300b1bc903..a1dec050606cd 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotConfig.java @@ -22,6 +22,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.airlift.units.MinDuration; +import io.trino.plugin.pinot.query.PinotQueryBuilder; import jakarta.annotation.PostConstruct; import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotEmpty; @@ -69,6 +70,7 @@ public class PinotConfig private boolean countDistinctPushdownEnabled = true; private boolean proxyEnabled; private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE); + private Optional queryOptions = Optional.empty(); @NotEmpty(message = "pinot.controller-urls cannot be empty") public List getControllerUrls() @@ -261,6 +263,25 @@ public PinotConfig setTargetSegmentPageSize(DataSize targetSegmentPageSize) return this; } + public String getQueryOptions() + { + return queryOptions.orElse(null); + } + + @Config("pinot.query-options") + @ConfigDescription("Comma separated list of query options. Each option should be in the format key:value. " + + "For example, enableNullHandling:true,skipUpsert:true,varcharOption:'value'") + public PinotConfig setQueryOptions(String options) + { + if (options == null) { + queryOptions = Optional.empty(); + } + else { + queryOptions = PinotQueryBuilder.getQueryOptionsString(options); // validate the options (throws exception if invalid) + } + return this; + } + @PostConstruct public void validate() { diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java index a39d968a998b1..3345ed2684a13 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotPageSourceProvider.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.extractPql; import static io.trino.plugin.pinot.query.PinotQueryBuilder.generatePql; @@ -74,7 +75,8 @@ public ConnectorPageSource createPageSource( handles.add((PinotColumnHandle) handle); } PinotTableHandle pinotTableHandle = (PinotTableHandle) tableHandle; - String query = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries); + Optional queryOptions = PinotSessionProperties.getQueryOptions(session); + String query = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries, queryOptions); switch (pinotSplit.getSplitType()) { case SEGMENT: diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java index feb82c1cd4cff..ffae4534b315c 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/PinotSessionProperties.java @@ -20,12 +20,14 @@ import io.trino.spi.session.PropertyMetadata; import java.util.List; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; import static io.trino.spi.session.PropertyMetadata.booleanProperty; import static io.trino.spi.session.PropertyMetadata.integerProperty; +import static io.trino.spi.session.PropertyMetadata.stringProperty; public class PinotSessionProperties { @@ -37,6 +39,7 @@ public class PinotSessionProperties private static final String SEGMENTS_PER_SPLIT = "segments_per_split"; private static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled"; private static final String COUNT_DISTINCT_PUSHDOWN_ENABLED = "count_distinct_pushdown_enabled"; + private static final String QUERY_OPTIONS = "query_options"; private final List> sessionProperties; @@ -84,6 +87,11 @@ public PinotSessionProperties(PinotConfig pinotConfig) COUNT_DISTINCT_PUSHDOWN_ENABLED, "Enable count distinct pushdown", pinotConfig.isCountDistinctPushdownEnabled(), + false), + stringProperty( + QUERY_OPTIONS, + "Pinot query option in the format of key:value,key:value", + pinotConfig.getQueryOptions(), false)); } @@ -130,6 +138,11 @@ public static boolean isCountDistinctPushdownEnabled(ConnectorSession session) return session.getProperty(COUNT_DISTINCT_PUSHDOWN_ENABLED, Boolean.class); } + public static Optional getQueryOptions(ConnectorSession session) + { + return Optional.ofNullable(session.getProperty(QUERY_OPTIONS, String.class)); + } + public List> getSessionProperties() { return sessionProperties; diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java index 3eb3a2a020465..1d17bb02f7667 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/DynamicTablePqlExtractor.java @@ -35,13 +35,7 @@ public static String extractPql(DynamicTable table, TupleDomain tu { StringBuilder builder = new StringBuilder(); Map queryOptions = table.queryOptions(); - queryOptions.keySet().stream().sorted().forEach( - key -> builder - .append("SET ") - .append(key) - .append(" = ") - .append(format("'%s'", queryOptions.get(key))) - .append(";\n")); + PinotQueryBuilder.getQueryOptions(queryOptions).ifPresent(builder::append); builder.append("SELECT "); if (!table.projections().isEmpty()) { builder.append(table.projections().stream() diff --git a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java index f87436b9d1432..6bba71c165f62 100755 --- a/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java +++ b/plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/query/PinotQueryBuilder.java @@ -15,6 +15,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.airlift.slice.Slice; import io.trino.plugin.pinot.PinotColumnHandle; import io.trino.plugin.pinot.PinotTableHandle; @@ -36,8 +37,10 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.getOnlyElement; @@ -53,10 +56,17 @@ private PinotQueryBuilder() { } - public static String generatePql(PinotTableHandle tableHandle, List columnHandles, Optional tableNameSuffix, Optional timePredicate, int limitForSegmentQueries) + public static String generatePql( + PinotTableHandle tableHandle, + List columnHandles, + Optional tableNameSuffix, + Optional timePredicate, + int limitForSegmentQueries, + Optional queryOptions) { requireNonNull(tableHandle, "tableHandle is null"); StringBuilder pqlBuilder = new StringBuilder(); + queryOptions.ifPresent(pqlBuilder::append); List quotedColumnNames; if (columnHandles.isEmpty()) { // This occurs when the query is SELECT COUNT(*) FROM pinotTable ... @@ -227,4 +237,52 @@ private static String quoteIdentifier(String identifier) { return format("\"%s\"", identifier.replaceAll("\"", "\"\"")); } + + public static Optional getQueryOptionsString(String options) + { + if (isNullOrEmpty(options)) { + return Optional.empty(); + } + + Map queryOptionsMap = parseQueryOptions(options); + return getQueryOptions(queryOptionsMap); + } + + public static Optional getQueryOptions(Map queryOptionsMap) + { + if (queryOptionsMap.isEmpty()) { + return Optional.empty(); + } + String options = queryOptionsMap.entrySet().stream() + .map(e -> "SET " + e.getKey() + " = " + e.getValue()) + .collect(Collectors.joining(";\n")); + if (!options.endsWith(";")) { + options += ";\n"; + } + return Optional.of(options); + } + + public static Map parseQueryOptions(String options) + { + if (isNullOrEmpty(options)) { + return ImmutableMap.of(); + } + try { + // we allow escaping the delimiters like , and : using back-slash. + // To support that we create a negative lookbehind of , and : which + // are not preceded by a back-slash. + String headersDelim = "(? queryOptions = ImmutableMap.builder(); + for (String kv : options.split(headersDelim)) { + String key = kv.split(kvDelim, 2)[0].trim(); + String val = kv.split(kvDelim, 2)[1].trim(); + queryOptions.put(key, val); + } + return queryOptions.buildOrThrow(); + } + catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException("Invalid format for 'pinot.query-options'. Value provided is :" + options, e); + } + } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotConnectorSmokeTest.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotConnectorSmokeTest.java index 5fc04434194af..ae5ff8ae32f03 100644 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotConnectorSmokeTest.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/BasePinotConnectorSmokeTest.java @@ -2800,5 +2800,16 @@ public void testQueryOptions() " HAVING SUM(long_number) > 10000\"")) .matches("VALUES (VARCHAR 'Los Angeles', DOUBLE '50000.0'), (VARCHAR 'New York', DOUBLE '20000.0')") .isFullyPushedDown(); + Session queryOptions = Session.builder(getQueryRunner().getDefaultSession()) + .setCatalogSessionProperty("pinot", "query_options", "skipUpsert:true") + .build(); + assertThat(query(queryOptions,"SELECT city, \"sum(long_number)\" FROM" + + " \"SET skipUpsert = true;\n" + + " SELECT city, SUM(long_number)" + + " FROM my_table" + + " GROUP BY city" + + " HAVING SUM(long_number) > 10000\"")) + .matches("VALUES (VARCHAR 'Los Angeles', DOUBLE '50000.0'), (VARCHAR 'New York', DOUBLE '20000.0')") + .isFullyPushedDown(); } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java index 259d41edf6c55..e73cbf17e1b33 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestDynamicTable.java @@ -453,16 +453,16 @@ public void testQueryOptions() String tableName = realtimeOnlyTable.getTableName(); String tableNameWithSuffix = tableName + REALTIME_SUFFIX; String query = """ - SET skipUpsert='true'; - SET useMultistageEngine='true'; + SET skipUpsert=true; + SET useMultistageEngine=true; SELECT FlightNum FROM %s LIMIT 50; """.formatted(tableNameWithSuffix); DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER); String expectedPql = """ - SET skipUpsert = 'true'; - SET useMultistageEngine = 'true'; + SET useMultistageEngine = true; + SET skipUpsert = true; SELECT "FlightNum" \ FROM %s \ LIMIT 50""".formatted(tableNameWithSuffix); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java index c5a52a63ac031..b32651a20e800 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotConfig.java @@ -47,6 +47,7 @@ public void testDefaults() .setAggregationPushdownEnabled(true) .setCountDistinctPushdownEnabled(true) .setProxyEnabled(false) + .setQueryOptions(null) .setTargetSegmentPageSize(DataSize.of(1, MEGABYTE))); } @@ -68,6 +69,7 @@ public void testExplicitPropertyMappings() .put("pinot.count-distinct-pushdown.enabled", "false") .put("pinot.proxy.enabled", "true") .put("pinot.target-segment-page-size", "2MB") + .put("pinot.query-options", "enableNullHandling:true,skipUpsert:false") .buildOrThrow(); PinotConfig expected = new PinotConfig() @@ -84,6 +86,7 @@ public void testExplicitPropertyMappings() .setAggregationPushdownEnabled(false) .setCountDistinctPushdownEnabled(false) .setProxyEnabled(true) + .setQueryOptions("enableNullHandling:true,skipUpsert:false") .setTargetSegmentPageSize(DataSize.of(2, MEGABYTE)); ConfigAssertions.assertFullMapping(properties, expected); diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSessionProperties.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSessionProperties.java index 3a0f89793fb25..47aadc50720ba 100755 --- a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSessionProperties.java +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/TestPinotSessionProperties.java @@ -44,4 +44,18 @@ public void testConnectionTimeoutParsedProperly() .build(); assertThat(PinotSessionProperties.getConnectionTimeout(session)).isEqualTo(new Duration(0.25, TimeUnit.MINUTES)); } + + @Test + public void testQueryOptionsParsing() + { + PinotConfig config = new PinotConfig().setQueryOptions("enableNullHandling:true,skipUpsert:true,varcharOption:'value'"); + PinotSessionProperties pinotSessionProperties = new PinotSessionProperties(config); + ConnectorSession session = TestingConnectorSession.builder() + .setPropertyMetadata(pinotSessionProperties.getSessionProperties()) + .build(); + String queryOptions = PinotSessionProperties.getQueryOptions(session).orElseThrow(); + assertThat(queryOptions).isEqualTo("SET enableNullHandling = true;\n" + + "SET skipUpsert = true;\n" + + "SET varcharOption = 'value';\n"); + } } diff --git a/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/query/TestPinotQueryBuilder.java b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/query/TestPinotQueryBuilder.java new file mode 100644 index 0000000000000..e8ba24e9a8d30 --- /dev/null +++ b/plugin/trino-pinot/src/test/java/io/trino/plugin/pinot/query/TestPinotQueryBuilder.java @@ -0,0 +1,48 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.pinot.query; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.entry; + +final class TestPinotQueryBuilder +{ + @Test + public void testParseQueryOption() + { + String options = "limitForSegmentQueries:1000,limitForBrokerQueries:1000,targetSegmentPageSizeBytes:1000"; + Map parssedOptions = PinotQueryBuilder.parseQueryOptions(options); + assertThat(parssedOptions).containsExactly(entry("limitForSegmentQueries", "1000"), entry("limitForBrokerQueries", "1000"), entry("targetSegmentPageSizeBytes", "1000")); + } + + @Test + public void testParseQueryOptionWithQuotes() + { + String options = "enableNullHandling:true,skipUpsert:true,varcharOption:'value'"; + Map parssedOptions = PinotQueryBuilder.parseQueryOptions(options); + assertThat(parssedOptions).containsExactly(entry("enableNullHandling", "true"), entry("skipUpsert", "true"), entry("varcharOption", "'value'")); + } + + @Test + public void testParseQueryOptionWithEmptyString() + { + String options = ""; + Map parssedOptions = PinotQueryBuilder.parseQueryOptions(options); + assertThat(parssedOptions).isEmpty(); + } +}