Add JSON type support to the Pinot Connector
Pinot does not yet support filtering on the json type, so predicates on json columns are kept in Trino as a remaining filter rather than pushed down.
elonazoulay authored and ebyhr committed Aug 17, 2022
1 parent d09a786 commit 05a6295
Showing 10 changed files with 214 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/pinot.rst
@@ -167,6 +167,7 @@ Pinot Trino
``DOUBLE`` ``DOUBLE``
``STRING`` ``VARCHAR``
``BYTES`` ``VARBINARY``
``JSON`` ``JSON``
``INT_ARRAY`` ``VARCHAR``
``LONG_ARRAY`` ``VARCHAR``
``FLOAT_ARRAY`` ``VARCHAR``
@@ -54,6 +54,7 @@
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.Type;
import org.apache.pinot.spi.data.Schema;

import javax.inject.Inject;
@@ -290,8 +291,13 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
         Map<ColumnHandle, Domain> supported = new HashMap<>();
         Map<ColumnHandle, Domain> unsupported = new HashMap<>();
         for (Map.Entry<ColumnHandle, Domain> entry : domains.entrySet()) {
-            // Pinot does not support array literals
-            if (((PinotColumnHandle) entry.getKey()).getDataType() instanceof ArrayType) {
+            Type columnType = ((PinotColumnHandle) entry.getKey()).getDataType();
+            if (columnType instanceof ArrayType) {
+                // Pinot does not support array literals
                 unsupported.put(entry.getKey(), entry.getValue());
             }
+            else if (typeConverter.isJsonType(columnType)) {
+                // Pinot does not support filtering on json values
+                unsupported.put(entry.getKey(), entry.getValue());
+            }
             else {
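The split above is the half of Trino's filter-pushdown contract visible in this hunk: whatever lands in unsupported must go back to the engine as a remaining filter so Trino re-applies those predicates itself. A minimal sketch of that hand-off, using stand-in names (FilterSplit, remainingFilter) that do not appear in this commit:

import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;

import java.util.Map;

final class FilterSplit
{
    private FilterSplit() {}

    // Domains Pinot cannot evaluate are wrapped into a TupleDomain and handed
    // back to the engine, which re-applies them above the table scan. This is
    // why WHERE json_col = JSON '...' still returns correct results even
    // though Pinot never sees the predicate.
    static TupleDomain<ColumnHandle> remainingFilter(Map<ColumnHandle, Domain> unsupported)
    {
        return TupleDomain.withColumnDomains(unsupported);
    }
}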
@@ -23,6 +23,7 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
@@ -35,6 +36,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.plugin.pinot.decoders.VarbinaryDecoder.toBytes;
@@ -328,6 +330,10 @@ private Slice getSlice(int rowIndex, int columnIndex)
else if (trinoType instanceof VarbinaryType) {
return Slices.wrappedBuffer(toBytes(currentDataTable.getDataTable().getString(rowIndex, columnIndex)));
}
else if (trinoType.getTypeSignature().getBase().equals(StandardTypes.JSON)) {
String field = currentDataTable.getDataTable().getString(rowIndex, columnIndex);
return jsonParse(getUtf8Slice(field));
}
return Slices.EMPTY_SLICE;
}

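Note that getSlice does not return the raw string Pinot stored: jsonParse from io.trino.plugin.base.util.JsonTypeUtil re-serializes the value into Trino's canonical JSON form. A small sketch of the effect, assuming JsonTypeUtil behaves as it did at the time of this commit (keys sorted, insignificant whitespace dropped); the class name here is hypothetical:

import io.airlift.slice.Slice;

import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse;

public final class JsonCanonicalizationExample
{
    private JsonCanonicalizationExample() {}

    public static void main(String[] args)
    {
        // Input mirrors what the integration test's Kafka producer writes.
        Slice canonical = jsonParse(utf8Slice("{ \"name\": \"user_0\", \"id\": 0}"));
        // Prints {"id":0,"name":"user_0"}, the exact form the tests below expect.
        System.out.println(canonical.toStringUtf8());
    }
}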
@@ -131,4 +131,10 @@ public Type toTrinoType(DataSchema.ColumnDataType columnDataType)
}
throw new PinotException(PINOT_UNSUPPORTED_COLUMN_TYPE, Optional.empty(), "Unsupported column data type: " + columnDataType);
}

public boolean isJsonType(Type type)
{
requireNonNull(type, "type is null");
return type.equals(jsonTypeSupplier.get());
}
}
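isJsonType compares against a jsonTypeSupplier that this hunk does not show. JSON is an engine-registered type rather than an SPI singleton like VarcharType.VARCHAR, so it has to be resolved through the TypeManager. A hedged sketch of how such a supplier is plausibly wired up; the class and field names are illustrative, not from the commit:

import com.google.common.base.Suppliers;
import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.TypeSignature;

import java.util.function.Supplier;

final class JsonTypeLookup
{
    private final Supplier<Type> jsonType;

    JsonTypeLookup(TypeManager typeManager)
    {
        // Resolve lazily and memoize: the json type is looked up by its
        // signature rather than referenced as a class.
        this.jsonType = Suppliers.memoize(() -> typeManager.getType(new TypeSignature(StandardTypes.JSON)));
    }

    boolean isJsonType(Type type)
    {
        return type.equals(jsonType.get());
    }
}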
@@ -27,6 +27,7 @@
import java.util.Optional;

import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.spi.type.StandardTypes.JSON;
import static java.util.Objects.requireNonNull;

public class DecoderFactory
@@ -64,6 +65,9 @@ else if (type instanceof ArrayType) {
else if (type instanceof VarbinaryType) {
return new VarbinaryDecoder();
}
else if (type.getTypeSignature().getBase().equals(JSON)) {
return new JsonDecoder();
}
else {
return new VarcharDecoder();
}
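Ordering matters in this dispatch chain: there is no JsonType class in the SPI to test with instanceof, so JSON is recognized by its type-signature base name, and that check has to run before the catch-all VarcharDecoder branch. The same test appears in PinotSegmentPageSource above; shown here as a hypothetical helper:

import io.trino.spi.type.StandardTypes;
import io.trino.spi.type.Type;

final class JsonTypeCheck
{
    private JsonTypeCheck() {}

    // JSON is matched by base name because the JSON Type class is internal
    // to the engine, not part of the SPI.
    static boolean isJson(Type type)
    {
        return type.getTypeSignature().getBase().equals(StandardTypes.JSON);
    }
}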
45 changes: 45 additions & 0 deletions plugin/trino-pinot/src/main/java/io/trino/plugin/pinot/decoders/JsonDecoder.java
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.decoders;

import io.airlift.slice.Slice;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockBuilder;

import java.util.function.Supplier;

import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.base.util.JsonTypeUtil.jsonParse;
import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH;
import static java.lang.String.format;

public class JsonDecoder
implements Decoder
{
@Override
public void decode(Supplier<Object> getter, BlockBuilder output)
{
Object value = getter.get();
if (value == null) {
output.appendNull();
}
else if (value instanceof String) {
Slice slice = jsonParse(utf8Slice((String) value));
output.writeBytes(slice, 0, slice.length()).closeEntry();
}
else {
throw new TrinoException(TYPE_MISMATCH, format("Expected a json value of type STRING: %s [%s]", value, value.getClass().getSimpleName()));
}
}
}
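A hedged sketch of how the decoder is driven, using the SPI's VariableWidthBlockBuilder as a stand-in for the builder the engine would supply for a variable-width type such as JSON; the driver class itself is hypothetical:

import io.trino.plugin.pinot.decoders.JsonDecoder;
import io.trino.spi.block.Block;
import io.trino.spi.block.VariableWidthBlockBuilder;

public final class JsonDecoderExample
{
    private JsonDecoderExample() {}

    public static void main(String[] args)
    {
        JsonDecoder decoder = new JsonDecoder();
        VariableWidthBlockBuilder output = new VariableWidthBlockBuilder(null, 2, 32);
        // A string value is parsed and written in canonical form.
        decoder.decode(() -> "{ \"id\": 1, \"name\": \"user_1\" }", output);
        // A null value becomes a null position in the block.
        decoder.decode(() -> null, output);
        // block now holds two positions: {"id":1,"name":"user_1"} and null.
        Block block = output.build();
    }
}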
@@ -112,6 +112,7 @@ public abstract class AbstractPinotIntegrationSmokeTest
private static final String DUPLICATE_TABLE_LOWERCASE = "dup_table";
private static final String DUPLICATE_TABLE_MIXED_CASE = "dup_Table";
private static final String JSON_TABLE = "my_table";
private static final String JSON_TYPE_TABLE = "json_table";
private static final String RESERVED_KEYWORD_TABLE = "reserved_keyword";
private static final String QUOTES_IN_COLUMN_NAME_TABLE = "quotes_in_column_name";
private static final String DUPLICATE_VALUES_IN_COLUMNS_TABLE = "duplicate_values_in_columns";
@@ -310,6 +311,28 @@ protected QueryRunner createQueryRunner()
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("date_time_fields_schema.json"), DATE_TIME_FIELDS_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("date_time_fields_realtimeSpec.json"), DATE_TIME_FIELDS_TABLE);

// Create json type table
kafka.createTopic(JSON_TYPE_TABLE);

Schema jsonTableAvroSchema = SchemaBuilder.record(JSON_TYPE_TABLE).fields()
.name("string_col").type().optional().stringType()
.name("json_col").type().optional().stringType()
.name("updatedAt").type().optional().longType()
.endRecord();

ImmutableList.Builder<ProducerRecord<String, GenericRecord>> jsonTableRecordsBuilder = ImmutableList.builder();
for (int i = 0; i < 3; i++) {
jsonTableRecordsBuilder.add(new ProducerRecord<>(JSON_TYPE_TABLE, "key" + i, new GenericRecordBuilder(jsonTableAvroSchema)
.set("string_col", "string_" + i)
.set("json_col", "{ \"name\": \"user_" + i + "\", \"id\": " + i + "}")
.set("updatedAt", initialUpdatedAt.plusMillis(i * 1000).toEpochMilli())
.build()));
}
kafka.sendMessages(jsonTableRecordsBuilder.build().stream(), schemaRegistryAwareProducer(kafka));
pinot.createSchema(getClass().getClassLoader().getResourceAsStream("json_schema.json"), JSON_TYPE_TABLE);
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("json_realtimeSpec.json"), JSON_TYPE_TABLE);
pinot.addOfflineTable(getClass().getClassLoader().getResourceAsStream("json_offlineSpec.json"), JSON_TYPE_TABLE);

// Create json table
kafka.createTopic(JSON_TABLE);
long key = 0L;
@@ -2203,4 +2226,26 @@ public void testVarbinary()
assertThat(query("SELECT bytes_col FROM \"SELECT bytes_col, string_col FROM alltypes\" WHERE string_col != 'array_null'"))
.matches(expectedValues);
}

@Test
public void testJson()
{
assertThat(query("SELECT json_col FROM " + JSON_TYPE_TABLE))
.matches("VALUES (JSON '{\"id\":0,\"name\":\"user_0\"}')," +
" (JSON '{\"id\":1,\"name\":\"user_1\"}')," +
" (JSON '{\"id\":2,\"name\":\"user_2\"}')");
assertThat(query("SELECT json_col" +
" FROM \"SELECT json_col FROM " + JSON_TYPE_TABLE + "\""))
.matches("VALUES (JSON '{\"id\":0,\"name\":\"user_0\"}')," +
" (JSON '{\"id\":1,\"name\":\"user_1\"}')," +
" (JSON '{\"id\":2,\"name\":\"user_2\"}')");
assertThat(query("SELECT name FROM \"SELECT json_extract_scalar(json_col, '$.name', 'STRING', '0') AS name" +
" FROM json_table WHERE json_extract_scalar(json_col, '$.id', 'INT', '0') = '1'\""))
.matches("VALUES (VARCHAR 'user_1')");
assertThat(query("SELECT JSON_EXTRACT_SCALAR(json_col, '$.name') FROM " + JSON_TYPE_TABLE +
" WHERE JSON_EXTRACT_SCALAR(json_col, '$.id') = '1'"))
.matches("VALUES (VARCHAR 'user_1')");
assertThat(query("SELECT string_col FROM " + JSON_TYPE_TABLE + " WHERE json_col = JSON '{\"id\":0,\"name\":\"user_0\"}'"))
.matches("VALUES VARCHAR 'string_0'");
}
}
31 changes: 31 additions & 0 deletions plugin/trino-pinot/src/test/resources/json_offlineSpec.json
@@ -0,0 +1,31 @@
{
"tableName": "json_table",
"tableType": "OFFLINE",
"segmentsConfig": {
"timeColumnName": "updated_at_seconds",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "365",
"segmentPushType": "APPEND",
"segmentPushFrequency": "daily",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"replication": "1"
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"invertedIndexColumns": ["string_col"],
"sortedColumn": ["updated_at_seconds"],
"noDictionaryColumns": ["json_col"],
"starTreeIndexConfigs": [],
"aggregateMetrics": "true",
"nullHandlingEnabled": "true"
},
"metadata": {
"customConfigs": {
"owner": "analytics@example.com"
}
}
}
44 changes: 44 additions & 0 deletions plugin/trino-pinot/src/test/resources/json_realtimeSpec.json
@@ -0,0 +1,44 @@
{
"tableName": "json_table",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "updated_at_seconds",
"timeType": "SECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "365",
"segmentPushType": "APPEND",
"segmentPushFrequency": "daily",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "json_table",
"replicasPerPartition": "1"
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"invertedIndexColumns": ["string_col"],
"sortedColumn": ["updated_at_seconds"],
"noDictionaryColumns": ["json_col"],
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "json_table",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
"stream.kafka.zk.broker.url": "zookeeper:2181/",
"stream.kafka.broker.list": "kafka:9092",
"realtime.segment.flush.threshold.time": "1m",
"realtime.segment.flush.threshold.size": "0",
"realtime.segment.flush.desired.size": "1M",
"isolation.level": "read_committed",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.prop.group.id": "json_table"
}
},
"metadata": {
"customConfigs": {}
}
}
24 changes: 24 additions & 0 deletions plugin/trino-pinot/src/test/resources/json_schema.json
@@ -0,0 +1,24 @@
{
"schemaName": "json_table",
"dimensionFieldSpecs": [
{
"name": "string_col",
"dataType": "STRING"
},
{
"name": "json_col",
"dataType": "JSON",
"maxLength": 2147483647
}
],
"dateTimeFieldSpecs": [
{
"name": "updated_at_seconds",
"dataType": "LONG",
"defaultNullValue" : 0,
"format": "1:SECONDS:EPOCH",
"transformFunction": "toEpochSeconds(updatedAt)",
"granularity" : "1:SECONDS"
}
]
}
