Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pinot query options to query #21902

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/src/main/sphinx/connector/pinot.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ This can be the ip or the FDQN, the url scheme (`http://`) is optional.
### General configuration properties

| Property name | Required | Description |
|--------------------------------------------------------|----------| ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|--------------------------------------------------------|----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `pinot.controller-urls` | Yes | A comma separated list of controller hosts. If Pinot is deployed via [Kubernetes](https://kubernetes.io/) this needs to point to the controller service endpoint. The Pinot broker and server must be accessible via DNS as Pinot returns hostnames and not IP addresses. |
| `pinot.broker-url` | No | A host and port of broker. If broker URL exposed by Pinot controller API is not accessible, this property can be used to specify the broker endpoint. Enabling this property will disable broker discovery. |
| `pinot.broker-url` | No | A host and port of broker. If broker URL exposed by Pinot controller API is not accessible, this property can be used to specify the broker endpoint. Enabling this property will disable broker discovery. |
| `pinot.connection-timeout` | No | Pinot connection timeout, default is `15s`. |
| `pinot.metadata-expiry` | No | Pinot metadata expiration time, default is `2m`. |
| `pinot.controller.authentication.type` | No | Pinot authentication method for controller requests. Allowed values are `NONE` and `PASSWORD` - defaults to `NONE` which is no authentication. |
Expand All @@ -55,6 +55,7 @@ This can be the ip or the FDQN, the url scheme (`http://`) is optional.
| `pinot.count-distinct-pushdown.enabled` | No | Push down count distinct queries to Pinot, default is `true`. |
| `pinot.target-segment-page-size` | No | Max allowed page size for segment query, default is `1MB`. |
| `pinot.proxy.enabled` | No | Use Pinot Proxy for controller and broker requests, default is `false`. |
| `pinot.query-options` | No | Query options to be included with Pinot queries. Default is null. For a varchar/string option, add single quotes (`'`) around the value. Example: `enableNullHandling:true,minReplicas:2,somethingElse:'YES'`. |

If `pinot.controller.authentication.type` is set to `PASSWORD` then both `pinot.controller.authentication.user` and
`pinot.controller.authentication.password` are required.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import io.trino.plugin.pinot.query.PinotQueryBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class PinotConfig
private boolean countDistinctPushdownEnabled = true;
private boolean proxyEnabled;
private DataSize targetSegmentPageSize = DataSize.of(1, MEGABYTE);
private Optional<String> queryOptions = Optional.empty();

@NotEmpty(message = "pinot.controller-urls cannot be empty")
public List<URI> getControllerUrls()
Expand Down Expand Up @@ -261,6 +263,25 @@ public PinotConfig setTargetSegmentPageSize(DataSize targetSegmentPageSize)
return this;
}

/**
 * Returns the query options held by this config, or {@code null} when none are set.
 */
public String getQueryOptions()
{
    return queryOptions.isPresent() ? queryOptions.get() : null;
}

@Config("pinot.query-options")
naman-patel marked this conversation as resolved.
Show resolved Hide resolved
@ConfigDescription("Comma separated list of query options. Each option should be in the format key:value. " +
"For example, enableNullHandling:true,skipUpsert:true,varcharOption:'value'")
public PinotConfig setQueryOptions(String options)
naman-patel marked this conversation as resolved.
Show resolved Hide resolved
{
if (options == null) {
queryOptions = Optional.empty();
}
else {
queryOptions = PinotQueryBuilder.getQueryOptionsString(options); // validate the options (throws exception if invalid)
}
return this;
}

@PostConstruct
public void validate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import static io.trino.plugin.pinot.query.DynamicTablePqlExtractor.extractPql;
import static io.trino.plugin.pinot.query.PinotQueryBuilder.generatePql;
Expand Down Expand Up @@ -74,7 +75,8 @@ public ConnectorPageSource createPageSource(
handles.add((PinotColumnHandle) handle);
}
PinotTableHandle pinotTableHandle = (PinotTableHandle) tableHandle;
String query = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries);
Optional<String> queryOptions = PinotSessionProperties.getQueryOptions(session);
String query = generatePql(pinotTableHandle, handles, pinotSplit.getSuffix(), pinotSplit.getTimePredicate(), limitForSegmentQueries, queryOptions);

switch (pinotSplit.getSplitType()) {
case SEGMENT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
import io.trino.spi.session.PropertyMetadata;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;

public class PinotSessionProperties
{
Expand All @@ -37,6 +39,7 @@ public class PinotSessionProperties
private static final String SEGMENTS_PER_SPLIT = "segments_per_split";
private static final String AGGREGATION_PUSHDOWN_ENABLED = "aggregation_pushdown_enabled";
private static final String COUNT_DISTINCT_PUSHDOWN_ENABLED = "count_distinct_pushdown_enabled";
private static final String QUERY_OPTIONS = "query_options";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -84,6 +87,11 @@ public PinotSessionProperties(PinotConfig pinotConfig)
COUNT_DISTINCT_PUSHDOWN_ENABLED,
"Enable count distinct pushdown",
pinotConfig.isCountDistinctPushdownEnabled(),
false),
stringProperty(
QUERY_OPTIONS,
"Pinot query option in the format of key:value,key:value",
pinotConfig.getQueryOptions(),
false));
}

Expand Down Expand Up @@ -130,6 +138,11 @@ public static boolean isCountDistinctPushdownEnabled(ConnectorSession session)
return session.getProperty(COUNT_DISTINCT_PUSHDOWN_ENABLED, Boolean.class);
}

/**
 * Returns the Pinot query options for the given session as rendered
 * {@code SET key = value;} statements, ready to be prepended to a query.
 *
 * <p>The session default is taken from {@code PinotConfig}, which stores the options
 * already rendered as {@code SET} statements. A value supplied through the session is
 * in the raw {@code key:value,key:value} format and is rendered here, so that both
 * sources yield the same shape of output.
 *
 * @param session the connector session to read the property from
 * @return the rendered statements, or empty when no options are set
 * @throws IllegalArgumentException if a raw session value is not in {@code key:value} format
 */
public static Optional<String> getQueryOptions(ConnectorSession session)
{
    String value = session.getProperty(QUERY_OPTIONS, String.class);
    if (value == null) {
        return Optional.empty();
    }
    // Already rendered (the catalog default coming from PinotConfig): pass through untouched.
    if (value.startsWith("SET ")) {
        return Optional.of(value);
    }
    // Raw key:value list set on the session: render it, otherwise it would be prepended
    // verbatim to the generated query.
    return io.trino.plugin.pinot.query.PinotQueryBuilder.getQueryOptionsString(value);
}

public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,7 @@ public static String extractPql(DynamicTable table, TupleDomain<ColumnHandle> tu
{
StringBuilder builder = new StringBuilder();
Map<String, String> queryOptions = table.queryOptions();
queryOptions.keySet().stream().sorted().forEach(
key -> builder
.append("SET ")
.append(key)
.append(" = ")
.append(format("'%s'", queryOptions.get(key)))
.append(";\n"));
PinotQueryBuilder.getQueryOptions(queryOptions).ifPresent(builder::append);
builder.append("SELECT ");
if (!table.projections().isEmpty()) {
builder.append(table.projections().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.slice.Slice;
import io.trino.plugin.pinot.PinotColumnHandle;
import io.trino.plugin.pinot.PinotTableHandle;
Expand All @@ -36,8 +37,10 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterables.getOnlyElement;
Expand All @@ -53,10 +56,17 @@ private PinotQueryBuilder()
{
}

public static String generatePql(PinotTableHandle tableHandle, List<PinotColumnHandle> columnHandles, Optional<String> tableNameSuffix, Optional<String> timePredicate, int limitForSegmentQueries)
public static String generatePql(
PinotTableHandle tableHandle,
List<PinotColumnHandle> columnHandles,
Optional<String> tableNameSuffix,
Optional<String> timePredicate,
int limitForSegmentQueries,
Optional<String> queryOptions)
{
requireNonNull(tableHandle, "tableHandle is null");
StringBuilder pqlBuilder = new StringBuilder();
queryOptions.ifPresent(pqlBuilder::append);
List<String> quotedColumnNames;
if (columnHandles.isEmpty()) {
// This occurs when the query is SELECT COUNT(*) FROM pinotTable ...
Expand Down Expand Up @@ -227,4 +237,52 @@ private static String quoteIdentifier(String identifier)
{
return format("\"%s\"", identifier.replaceAll("\"", "\"\""));
}

/**
 * Turns a {@code key:value,key:value} option string into rendered {@code SET} statements.
 *
 * @param options the raw option string; may be null or empty
 * @return the rendered statements, or empty when {@code options} is null or empty
 * @throws IllegalArgumentException if {@code options} cannot be parsed
 */
public static Optional<String> getQueryOptionsString(String options)
{
    return isNullOrEmpty(options)
            ? Optional.empty()
            : getQueryOptions(parseQueryOptions(options));
}

/**
 * Renders each map entry as a {@code SET key = value} statement, in the map's iteration
 * order, separated by {@code ";\n"}.
 *
 * @param queryOptionsMap option names mapped to their (already quoted, if needed) values
 * @return the rendered statements, or empty when the map is empty
 */
public static Optional<String> getQueryOptions(Map<String, String> queryOptionsMap)
{
    if (queryOptionsMap.isEmpty()) {
        return Optional.empty();
    }

    StringBuilder rendered = new StringBuilder();
    for (Map.Entry<String, String> option : queryOptionsMap.entrySet()) {
        if (rendered.length() > 0) {
            rendered.append(";\n");
        }
        rendered.append("SET ").append(option.getKey()).append(" = ").append(option.getValue());
    }

    // Terminate the final statement unless the text already ends with a semicolon.
    if (rendered.charAt(rendered.length() - 1) != ';') {
        rendered.append(";\n");
    }
    return Optional.of(rendered.toString());
}

public static Map<String, String> parseQueryOptions(String options)
{
if (isNullOrEmpty(options)) {
naman-patel marked this conversation as resolved.
Show resolved Hide resolved
return ImmutableMap.of();
}
try {
// we allow escaping the delimiters like , and : using back-slash.
// To support that we create a negative lookbehind of , and : which
// are not preceded by a back-slash.
String headersDelim = "(?<!\\\\),";
String kvDelim = "(?<!\\\\):";
ImmutableMap.Builder<String, String> queryOptions = ImmutableMap.builder();
for (String kv : options.split(headersDelim)) {
String key = kv.split(kvDelim, 2)[0].trim();
String val = kv.split(kvDelim, 2)[1].trim();
queryOptions.put(key, val);
}
return queryOptions.buildOrThrow();
}
catch (IndexOutOfBoundsException e) {
throw new IllegalArgumentException("Invalid format for 'pinot.query-options'. Value provided is :" + options, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2800,5 +2800,16 @@ public void testQueryOptions()
" HAVING SUM(long_number) > 10000\""))
.matches("VALUES (VARCHAR 'Los Angeles', DOUBLE '50000.0'), (VARCHAR 'New York', DOUBLE '20000.0')")
.isFullyPushedDown();
Session queryOptions = Session.builder(getQueryRunner().getDefaultSession())
.setCatalogSessionProperty("pinot", "query_options", "skipUpsert:true")
.build();
assertThat(query(queryOptions,"SELECT city, \"sum(long_number)\" FROM" +
" \"SET skipUpsert = true;\n" +
" SELECT city, SUM(long_number)" +
" FROM my_table" +
" GROUP BY city" +
" HAVING SUM(long_number) > 10000\""))
.matches("VALUES (VARCHAR 'Los Angeles', DOUBLE '50000.0'), (VARCHAR 'New York', DOUBLE '20000.0')")
.isFullyPushedDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,16 @@ public void testQueryOptions()
String tableName = realtimeOnlyTable.getTableName();
String tableNameWithSuffix = tableName + REALTIME_SUFFIX;
String query = """
SET skipUpsert='true';
SET useMultistageEngine='true';
SET skipUpsert=true;
SET useMultistageEngine=true;
SELECT FlightNum
FROM %s
LIMIT 50;
""".formatted(tableNameWithSuffix);
DynamicTable dynamicTable = buildFromPql(pinotMetadata, new SchemaTableName("default", query), mockClusterInfoFetcher, TESTING_TYPE_CONVERTER);
String expectedPql = """
SET skipUpsert = 'true';
SET useMultistageEngine = 'true';
SET useMultistageEngine = true;
SET skipUpsert = true;
SELECT "FlightNum" \
FROM %s \
LIMIT 50""".formatted(tableNameWithSuffix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void testDefaults()
.setAggregationPushdownEnabled(true)
.setCountDistinctPushdownEnabled(true)
.setProxyEnabled(false)
.setQueryOptions(null)
.setTargetSegmentPageSize(DataSize.of(1, MEGABYTE)));
}

Expand All @@ -68,6 +69,7 @@ public void testExplicitPropertyMappings()
.put("pinot.count-distinct-pushdown.enabled", "false")
.put("pinot.proxy.enabled", "true")
.put("pinot.target-segment-page-size", "2MB")
.put("pinot.query-options", "enableNullHandling:true,skipUpsert:false")
.buildOrThrow();

PinotConfig expected = new PinotConfig()
Expand All @@ -84,6 +86,7 @@ public void testExplicitPropertyMappings()
.setAggregationPushdownEnabled(false)
.setCountDistinctPushdownEnabled(false)
.setProxyEnabled(true)
.setQueryOptions("enableNullHandling:true,skipUpsert:false")
.setTargetSegmentPageSize(DataSize.of(2, MEGABYTE));

ConfigAssertions.assertFullMapping(properties, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,18 @@ public void testConnectionTimeoutParsedProperly()
.build();
assertThat(PinotSessionProperties.getConnectionTimeout(session)).isEqualTo(new Duration(0.25, TimeUnit.MINUTES));
}

ebyhr marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testQueryOptionsParsing()
naman-patel marked this conversation as resolved.
Show resolved Hide resolved
{
PinotConfig config = new PinotConfig().setQueryOptions("enableNullHandling:true,skipUpsert:true,varcharOption:'value'");
PinotSessionProperties pinotSessionProperties = new PinotSessionProperties(config);
ConnectorSession session = TestingConnectorSession.builder()
.setPropertyMetadata(pinotSessionProperties.getSessionProperties())
.build();
String queryOptions = PinotSessionProperties.getQueryOptions(session).orElseThrow();
assertThat(queryOptions).isEqualTo("SET enableNullHandling = true;\n" +
"SET skipUpsert = true;\n" +
"SET varcharOption = 'value';\n");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.query;

import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;

/**
 * Unit tests for the option-string parsing in {@link PinotQueryBuilder}.
 */
final class TestPinotQueryBuilder
{
    @Test
    public void testParseQueryOption()
    {
        Map<String, String> parsed = PinotQueryBuilder.parseQueryOptions(
                "limitForSegmentQueries:1000,limitForBrokerQueries:1000,targetSegmentPageSizeBytes:1000");

        // Entries must come back in the order they were written.
        assertThat(parsed).containsExactly(
                entry("limitForSegmentQueries", "1000"),
                entry("limitForBrokerQueries", "1000"),
                entry("targetSegmentPageSizeBytes", "1000"));
    }

    @Test
    public void testParseQueryOptionWithQuotes()
    {
        Map<String, String> parsed = PinotQueryBuilder.parseQueryOptions(
                "enableNullHandling:true,skipUpsert:true,varcharOption:'value'");

        // Single quotes around a value are kept as part of the value.
        assertThat(parsed).containsExactly(
                entry("enableNullHandling", "true"),
                entry("skipUpsert", "true"),
                entry("varcharOption", "'value'"));
    }

    @Test
    public void testParseQueryOptionWithEmptyString()
    {
        assertThat(PinotQueryBuilder.parseQueryOptions("")).isEmpty();
    }
}