Fix error when getting value from ByteBuffer more than once
charlesjmorgan authored and findepi committed Jun 26, 2020
1 parent 20458c7 commit 6d936a4
Showing 3 changed files with 165 additions and 14 deletions.
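
Root cause: RawValueProvider decoded fields with ByteBuffer's relative getters (get(), getShort(), getInt(), getLong(), getFloat(), getDouble()), which advance the buffer's position as a side effect. When a query references the same column more than once, for example in both the projection and the WHERE clause, the provider is asked for the value a second time, and that second relative read starts past the field, yielding a wrong value or a BufferUnderflowException. The fix records the field's start offset once and switches to the absolute getters, which take an explicit index and never move the position. A minimal standalone sketch of the difference (illustrative only, not code from this commit):

import java.nio.ByteBuffer;

public class ByteBufferReadTwice
{
    public static void main(String[] args)
    {
        ByteBuffer value = ByteBuffer.allocate(8);
        value.putLong(0, 42); // absolute put: position stays at 0

        // Absolute reads take an explicit index and are idempotent.
        int start = value.position();
        System.out.println(value.getLong(start)); // 42
        System.out.println(value.getLong(start)); // 42 again

        // Relative reads mutate the buffer: the first call advances
        // the position from 0 to 8, so the second one underflows.
        System.out.println(value.getLong()); // 42, position is now 8
        System.out.println(value.getLong()); // throws BufferUnderflowException
    }
}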
@@ -13,25 +13,107 @@
*/
package io.prestosql.plugin.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.prestosql.plugin.kafka.util.TestingKafka;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.Type;
import io.prestosql.testing.AbstractTestIntegrationSmokeTest;
import io.prestosql.testing.QueryRunner;
import io.prestosql.tpch.TpchTable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;

import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

public class TestKafkaIntegrationSmokeTest
extends AbstractTestIntegrationSmokeTest
{
private TestingKafka testingKafka;
private String topicName;

@Override
protected QueryRunner createQueryRunner()
throws Exception
{
testingKafka = new TestingKafka();
-return KafkaQueryRunner.builder(testingKafka)
topicName = "test_raw_" + UUID.randomUUID().toString().replaceAll("-", "_");

Map<SchemaTableName, KafkaTopicDescription> extraTopicDescriptions = ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
.put(new SchemaTableName("default", topicName),
createDescription(topicName, "default", topicName,
createFieldGroup("raw", ImmutableList.of(
createOneFieldDescription("id", BigintType.BIGINT, "0", "LONG")))))
.build();

QueryRunner queryRunner = KafkaQueryRunner.builder(testingKafka)
.setTables(TpchTable.getTables())
.setExtraTopicDescription(ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
.putAll(extraTopicDescriptions)
.build())
.build();

testingKafka.createTopics(topicName);
return queryRunner;
}

@Test
public void testColumnReferencedTwice()
{
ByteBuffer buf = ByteBuffer.allocate(8);
buf.putLong(0, 1);

insertData(buf.array());

assertQuery("SELECT id FROM default." + topicName + " WHERE id = 1", "VALUES (1)");
assertQuery("SELECT id FROM default." + topicName + " WHERE id < 2", "VALUES (1)");
}

private void insertData(byte[] data)
{
try (KafkaProducer<byte[], byte[]> producer = createProducer()) {
producer.send(new ProducerRecord<>(topicName, data));
}
}

private KafkaProducer<byte[], byte[]> createProducer()
{
Properties properties = new Properties();
properties.put(BOOTSTRAP_SERVERS_CONFIG, testingKafka.getConnectString());
properties.put(ACKS_CONFIG, "all");
properties.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
properties.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

return new KafkaProducer<>(properties);
}

private KafkaTopicDescription createDescription(String name, String schema, String topic, Optional<KafkaTopicFieldGroup> message)
{
return new KafkaTopicDescription(name, Optional.of(schema), topic, Optional.empty(), message);
}

private Optional<KafkaTopicFieldGroup> createFieldGroup(String dataFormat, List<KafkaTopicFieldDescription> fields)
{
return Optional.of(new KafkaTopicFieldGroup(dataFormat, Optional.empty(), fields));
}

private KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String mapping, String dataFormat)
{
return new KafkaTopicFieldDescription(name, type, mapping, null, dataFormat, null, false);
}

@AfterClass(alwaysRun = true)
@@ -213,6 +213,7 @@ private static class RawValueProvider
private final String columnName;
private final Type columnType;
private final int size;
+private final int start;

public RawValueProvider(ByteBuffer value, FieldType fieldType, String columnName, Type columnType)
{
@@ -221,6 +222,7 @@ public RawValueProvider(ByteBuffer value, FieldType fieldType, String columnName
this.columnName = columnName;
this.columnType = columnType;
this.size = value.limit() - value.position();
+this.start = value.position();
}

@Override
@@ -235,13 +237,13 @@ public boolean getBoolean()
checkEnoughBytes();
switch (fieldType) {
case BYTE:
-return value.get() != 0;
+return value.get(start) != 0;
case SHORT:
-return value.getShort() != 0;
+return value.getShort(start) != 0;
case INT:
-return value.getInt() != 0;
+return value.getInt(start) != 0;
case LONG:
-return value.getLong() != 0;
+return value.getLong(start) != 0;
default:
throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED, format("conversion '%s' to boolean not supported", fieldType));
}
@@ -253,13 +255,13 @@ public long getLong()
checkEnoughBytes();
switch (fieldType) {
case BYTE:
-return value.get();
+return value.get(start);
case SHORT:
-return value.getShort();
+return value.getShort(start);
case INT:
-return value.getInt();
+return value.getInt(start);
case LONG:
-return value.getLong();
+return value.getLong(start);
default:
throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED, format("conversion '%s' to long not supported", fieldType));
}
@@ -271,9 +273,9 @@ public double getDouble()
checkEnoughBytes();
switch (fieldType) {
case FLOAT:
-return value.getFloat();
+return value.getFloat(start);
case DOUBLE:
-return value.getDouble();
+return value.getDouble(start);
default:
throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED, format("conversion '%s' to double not supported", fieldType));
}
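
Why the constructor now captures start: by the time a RawValueProvider is built, the decoder has already positioned the ByteBuffer at the field's first byte (size is computed the same way, as limit minus position), so value.position() at construction time is the field's offset into the row. Saving it once lets getBoolean(), getLong(), and getDouble() use absolute reads from that fixed index, making repeated calls on the same provider idempotent. A condensed sketch of the pattern (hypothetical names, not the actual decoder code):

import java.nio.ByteBuffer;

final class FieldReader
{
    private final ByteBuffer value;
    private final int start; // field offset, captured exactly once

    FieldReader(ByteBuffer value)
    {
        this.value = value;
        this.start = value.position(); // never depend on position() again
    }

    long readLong()
    {
        // Absolute read: safe to call any number of times.
        return value.getLong(start);
    }
}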
@@ -29,6 +29,7 @@
import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
+import io.prestosql.spi.type.VarcharType;
import org.assertj.core.api.ThrowableAssert;
import org.testng.annotations.Test;

@@ -45,6 +46,7 @@
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;

public class TestRawDecoder
{
@@ -67,7 +69,7 @@ public void testEmptyRecord()
@Test
public void testSimple()
{
-ByteBuffer buf = ByteBuffer.allocate(100);
+ByteBuffer buf = ByteBuffer.allocate(36);
buf.putLong(4815162342L); // 0 - 7
buf.putInt(12345678); // 8 - 11
buf.putShort((short) 4567); // 12 - 13
@@ -128,7 +130,7 @@ public void testFixedWithString()
@Test
public void testFloatStuff()
{
-ByteBuffer buf = ByteBuffer.allocate(100);
+ByteBuffer buf = ByteBuffer.allocate(20);
buf.putDouble(Math.PI);
buf.putFloat((float) Math.E);
buf.putDouble(Math.E);
@@ -154,7 +156,7 @@ public void testFloatStuff()
@Test
public void testBooleanStuff()
{
-ByteBuffer buf = ByteBuffer.allocate(100);
+ByteBuffer buf = ByteBuffer.allocate(38);
buf.put((byte) 127); // offset 0
buf.putLong(0); // offset 1
buf.put((byte) 126); // offset 9
@@ -389,6 +391,71 @@ public void testSupportedDataTypeValidation()
assertUnsupportedColumnTypeException(() -> singleColumnDecoder(VarbinaryType.VARBINARY, "0", "BYTE"));
}

@Test
public void testGetValueTwice()
{
ByteBuffer buf = ByteBuffer.allocate(50);
buf.putLong(0, 4815162342L);
buf.putInt(8, 2147483647);
buf.putShort(12, (short) 32767);
buf.put(14, (byte) 128);
buf.putLong(15, 1);
buf.putInt(23, 1);
buf.putShort(27, (short) 1);
buf.put(29, (byte) 1);
buf.putDouble(30, 12345.6789d);
buf.putFloat(38, 123.345f);
buf.put("test val".getBytes(StandardCharsets.UTF_8)); // offset 42

byte[] row = new byte[buf.capacity()];
System.arraycopy(buf.array(), 0, row, 0, buf.limit());

DecoderColumnHandle col1 = new DecoderTestColumnHandle(0, "col1", BigintType.BIGINT, "0", "LONG", null, false, false, false);
DecoderColumnHandle col2 = new DecoderTestColumnHandle(1, "col2", BigintType.BIGINT, "8", "INT", null, false, false, false);
DecoderColumnHandle col3 = new DecoderTestColumnHandle(2, "col3", BigintType.BIGINT, "12", "SHORT", null, false, false, false);
DecoderColumnHandle col4 = new DecoderTestColumnHandle(3, "col4", BigintType.BIGINT, "14", "BYTE", null, false, false, false);
DecoderColumnHandle col5 = new DecoderTestColumnHandle(4, "col5", BooleanType.BOOLEAN, "15", "LONG", null, false, false, false);
DecoderColumnHandle col6 = new DecoderTestColumnHandle(5, "col6", BooleanType.BOOLEAN, "23", "INT", null, false, false, false);
DecoderColumnHandle col7 = new DecoderTestColumnHandle(6, "col7", BooleanType.BOOLEAN, "27", "SHORT", null, false, false, false);
DecoderColumnHandle col8 = new DecoderTestColumnHandle(7, "col8", BooleanType.BOOLEAN, "29", "BYTE", null, false, false, false);
DecoderColumnHandle col9 = new DecoderTestColumnHandle(8, "col9", DoubleType.DOUBLE, "30", "DOUBLE", null, false, false, false);
DecoderColumnHandle col10 = new DecoderTestColumnHandle(9, "col10", DoubleType.DOUBLE, "38", "FLOAT", null, false, false, false);
DecoderColumnHandle col11 = new DecoderTestColumnHandle(10, "col11", VarcharType.VARCHAR, "42", "BYTE", null, false, false, false);

Set<DecoderColumnHandle> columns = ImmutableSet.of(
col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11);

RowDecoder rowDecoder = DECODER_FACTORY.create(emptyMap(), columns);

Map<DecoderColumnHandle, FieldValueProvider> decodedRow = rowDecoder.decodeRow(row, null)
.orElseThrow(AssertionError::new);

assertEquals(decodedRow.size(), columns.size());

for (DecoderColumnHandle handle : columns) {
checkTwice(decodedRow, handle);
}
}

private void checkTwice(Map<DecoderColumnHandle, FieldValueProvider> decodedRow, DecoderColumnHandle handle)
{
FieldValueProvider provider = decodedRow.get(handle);
assertNotNull(provider);
Type type = handle.getType();
if (type == BigintType.BIGINT) {
assertEquals(provider.getLong(), provider.getLong());
}
else if (type == BooleanType.BOOLEAN) {
assertEquals(provider.getBoolean(), provider.getBoolean());
}
else if (type == DoubleType.DOUBLE) {
assertEquals(provider.getDouble(), provider.getDouble());
}
else if (type == VarcharType.VARCHAR) {
assertEquals(provider.getSlice(), provider.getSlice());
}
}

private void assertUnsupportedColumnTypeException(ThrowableAssert.ThrowingCallable callable)
{
assertThatThrownBy(callable)
