Skip to content

Commit

Permalink
Support decoding varbinary in Pinot broker queries
Browse files Browse the repository at this point in the history
  • Loading branch information
elonazoulay authored and ebyhr committed Aug 3, 2022
1 parent aed2d5b commit 6a3e8ff
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/src/main/sphinx/connector/pinot.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ Pinot Trino
``FLOAT`` ``REAL``
``DOUBLE`` ``DOUBLE``
``STRING`` ``VARCHAR``
``BYTES`` ``VARBINARY``
``INT_ARRAY`` ``VARCHAR``
``LONG_ARRAY`` ``VARCHAR``
``FLOAT_ARRAY`` ``VARCHAR``
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;

Expand All @@ -39,6 +37,7 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_DECODE_ERROR;
import static io.trino.plugin.pinot.PinotErrorCode.PINOT_UNSUPPORTED_COLUMN_TYPE;
import static io.trino.plugin.pinot.decoders.VarbinaryDecoder.toBytes;
import static java.lang.Float.floatToIntBits;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -332,16 +331,6 @@ else if (trinoType instanceof VarbinaryType) {
return Slices.EMPTY_SLICE;
}

private static byte[] toBytes(String stringValue)
{
try {
return Hex.decodeHex(stringValue.toCharArray());
}
catch (DecoderException e) {
throw new IllegalArgumentException("Value: " + stringValue + " is not Hex encoded", e);
}
}

private Slice getUtf8Slice(String value)
{
if (isNullOrEmpty(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.RealType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;

import java.util.Optional;

Expand Down Expand Up @@ -60,6 +61,9 @@ else if (type instanceof BooleanType) {
else if (type instanceof ArrayType) {
return new ArrayDecoder(type);
}
else if (type instanceof VarbinaryType) {
return new VarbinaryDecoder();
}
else {
return new VarcharDecoder();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.pinot.decoders;

import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.TrinoException;
import io.trino.spi.block.BlockBuilder;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

import java.util.function.Supplier;

import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH;
import static java.lang.String.format;

public class VarbinaryDecoder
implements Decoder
{
@Override
public void decode(Supplier<Object> getter, BlockBuilder output)
{
Object value = getter.get();
if (value == null) {
output.appendNull();
}
else if (value instanceof String) {
Slice slice = Slices.wrappedBuffer(toBytes((String) value));
output.writeBytes(slice, 0, slice.length()).closeEntry();
}
else {
throw new TrinoException(TYPE_MISMATCH, format("Expected a string value of type VARBINARY: %s [%s]", value, value.getClass().getSimpleName()));
}
}

public static byte[] toBytes(String stringValue)
{
try {
return Hex.decodeHex(stringValue.toCharArray());
}
catch (DecoderException e) {
throw new IllegalArgumentException("Value: " + stringValue + " is not Hex encoded", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2183,4 +2183,24 @@ public void testAggregationPushdownWithArrays()
"(56, VARCHAR 'string_8400', BIGINT '1')," +
"(1000, VARCHAR 'string1_8401', BIGINT '1')");
}

@Test
public void testVarbinary()
{
String expectedValues = "VALUES (X'')," +
" (X'73 74 72 69 6e 67 5f 30')," +
" (X'73 74 72 69 6e 67 5f 31 32 30 30')," +
" (X'73 74 72 69 6e 67 5f 32 34 30 30')," +
" (X'73 74 72 69 6e 67 5f 33 36 30 30')," +
" (X'73 74 72 69 6e 67 5f 34 38 30 30')," +
" (X'73 74 72 69 6e 67 5f 36 30 30 30')," +
" (X'73 74 72 69 6e 67 5f 37 32 30 30')," +
" (X'73 74 72 69 6e 67 5f 38 34 30 30')," +
" (X'73 74 72 69 6e 67 5f 39 36 30 30')";
// The filter on string_col is to have a deterministic result set: the default limit for broker queries is 10 rows.
assertThat(query("SELECT bytes_col FROM alltypes WHERE string_col != 'array_null'"))
.matches(expectedValues);
assertThat(query("SELECT bytes_col FROM \"SELECT bytes_col, string_col FROM alltypes\" WHERE string_col != 'array_null'"))
.matches(expectedValues);
}
}

0 comments on commit 6a3e8ff

Please sign in to comment.