Add basic Avro decoder for Kafka
anusudarsan authored and findepi committed Aug 16, 2018
1 parent 9045ce4 commit 54df0a4
Showing 14 changed files with 944 additions and 3 deletions.
KafkaSplitManager.java (com.facebook.presto.kafka, presto-kafka)

@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.CharStreams;
import io.airlift.log.Logger;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
@@ -39,6 +40,13 @@

import javax.inject.Inject;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
@@ -47,6 +55,8 @@
import static com.facebook.presto.kafka.KafkaHandleResolver.convertLayout;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

/**
@@ -108,8 +118,8 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
                        metadata.topic(),
                        kafkaTableHandle.getKeyDataFormat(),
                        kafkaTableHandle.getMessageDataFormat(),
-                       kafkaTableHandle.getKeyDataSchemaLocation(),
-                       kafkaTableHandle.getMessageDataSchemaLocation(),
+                       kafkaTableHandle.getKeyDataSchemaLocation().map(KafkaSplitManager::readSchema),
+                       kafkaTableHandle.getMessageDataSchemaLocation().map(KafkaSplitManager::readSchema),
                        part.partitionId(),
                        offsets[i],
                        offsets[i - 1],
@@ -129,6 +139,54 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, Co
        }
    }

    private static String readSchema(String dataSchemaLocation)
    {
        InputStream inputStream = null;
        try {
            if (isURI(dataSchemaLocation.trim().toLowerCase(ENGLISH))) {
                try {
                    inputStream = new URL(dataSchemaLocation).openStream();
                }
                catch (MalformedURLException e) {
                    // not a valid URL; fall back to reading a local file
                    inputStream = new FileInputStream(dataSchemaLocation);
                }
            }
            else {
                inputStream = new FileInputStream(dataSchemaLocation);
            }
            return CharStreams.toString(new InputStreamReader(inputStream, UTF_8));
        }
        catch (IOException e) {
            throw new PrestoException(GENERIC_INTERNAL_ERROR, "Could not parse the AVRO schema at: " + dataSchemaLocation, e);
        }
        finally {
            closeQuietly(inputStream);
        }
    }

    private static void closeQuietly(InputStream stream)
    {
        try {
            if (stream != null) {
                stream.close();
            }
        }
        catch (IOException ignored) {
        }
    }

    private static boolean isURI(String location)
    {
        try {
            URI.create(location);
        }
        catch (Exception e) {
            return false;
        }
        return true;
    }

private static long[] findAllOffsets(SimpleConsumer consumer, String topicName, int partitionId)
{
TopicAndPartition topicAndPartition = new TopicAndPartition(topicName, partitionId);
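The schema contents, rather than the schema location, now travel with each KafkaSplit, so workers never have to reach the file system or the network to fetch the schema. As a rough sketch of the consuming side (the decoder classes added by this commit are not among the hunks shown in this excerpt, and the names below are illustrative only), the carried string can be parsed with the standard Avro API and used to read back a message in the Avro data-file format that the product test below produces:

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.ByteArrayInputStream;
import java.io.IOException;

// Illustrative helper, not part of this commit.
public final class AvroMessageReader
{
    private AvroMessageReader() {}

    public static GenericRecord decodeFirst(String dataSchema, byte[] message)
            throws IOException
    {
        // dataSchema is the raw .avsc JSON that readSchema() carried in the split
        Schema schema = new Schema.Parser().parse(dataSchema);
        try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
                new ByteArrayInputStream(message), new GenericDatumReader<>(schema))) {
            return stream.next();
        }
    }
}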
kafka.properties (Kafka catalog configuration for product tests)

@@ -2,6 +2,8 @@ connector.name=kafka
kafka.table-names=product_tests.simple_key_and_value,\
product_tests.all_datatypes_raw,\
product_tests.all_datatypes_csv,\
-product_tests.all_datatypes_json
+product_tests.all_datatypes_json, \
+product_tests.all_datatypes_avro, \
+product_tests.all_null_avro
kafka.nodes=kafka:9092
kafka.table-description-dir=/docker/volumes/conf/presto/etc/catalog/kafka
New file: Kafka table description for product_tests.all_datatypes_avro

@@ -0,0 +1,31 @@
{
    "tableName": "all_datatypes_avro",
    "schemaName": "product_tests",
    "topicName": "all_datatypes_avro",
    "message": {
        "dataFormat": "avro",
        "dataSchema": "/docker/volumes/conf/presto/etc/catalog/kafka/all_datatypes_avro_schema.avsc",
        "fields": [
            {
                "name": "c_varchar",
                "type": "VARCHAR",
                "mapping": "a_varchar"
            },
            {
                "name": "c_bigint",
                "type": "BIGINT",
                "mapping": "a_bigint"
            },
            {
                "name": "c_double",
                "type": "DOUBLE",
                "mapping": "a_double"
            },
            {
                "name": "c_boolean",
                "type": "BOOLEAN",
                "mapping": "a_boolean"
            }
        ]
    }
}
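The "mapping" entries above bind each Presto column to an Avro field by name. A minimal sketch of what applying these mappings to a decoded record amounts to (illustrative names; the commit's actual decoder is not shown in this excerpt):

import org.apache.avro.generic.GenericRecord;

import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: maps Avro field values to Presto column names
// following the table description above.
public final class FieldMappingExample
{
    private FieldMappingExample() {}

    public static Map<String, Object> toRow(GenericRecord record)
    {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("c_varchar", record.get("a_varchar"));
        row.put("c_bigint", record.get("a_bigint"));
        row.put("c_double", record.get("a_double"));
        row.put("c_boolean", record.get("a_boolean"));
        return row;
    }
}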
New file: all_datatypes_avro_schema.avsc (Avro schema shared by both test tables)

@@ -0,0 +1,29 @@
{
    "type": "record",
    "name": "all_datatypes_avro",
    "namespace": "com.facebook.presto.tests.kafka",
    "fields": [
        {
            "name": "a_varchar",
            "type": ["null", "string"],
            "default": null
        },
        {
            "name": "a_bigint",
            "type": ["null", "long"],
            "default": null
        },
        {
            "name": "a_double",
            "type": ["null", "double"],
            "default": null
        },
        {
            "name": "a_boolean",
            "type": ["null", "boolean"],
            "default": null
        }
    ],
    "doc": "A basic avro schema for product tests"
}
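Since every field in this schema is a nullable union with a null default, a record written without any values decodes to all nulls, which is exactly what the all_null_avro table below exercises. A quick sketch with Avro's GenericRecordBuilder, which fills unset fields from their schema defaults (illustration, not part of the commit):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

import java.io.File;
import java.io.IOException;

public final class DefaultsDemo
{
    private DefaultsDemo() {}

    public static void main(String[] args)
            throws IOException
    {
        Schema schema = new Schema.Parser().parse(new File("all_datatypes_avro_schema.avsc"));
        // build() fills every unset field from its schema default (null here)
        GenericData.Record record = new GenericRecordBuilder(schema).build();
        System.out.println(record.get("a_varchar")); // prints "null"
    }
}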
New file: Kafka table description for product_tests.all_null_avro (reuses the schema above)

@@ -0,0 +1,31 @@
{
    "tableName": "all_null_avro",
    "schemaName": "product_tests",
    "topicName": "all_null_avro",
    "message": {
        "dataFormat": "avro",
        "dataSchema": "/docker/volumes/conf/presto/etc/catalog/kafka/all_datatypes_avro_schema.avsc",
        "fields": [
            {
                "name": "c_varchar",
                "type": "VARCHAR",
                "mapping": "a_varchar"
            },
            {
                "name": "c_bigint",
                "type": "BIGINT",
                "mapping": "a_bigint"
            },
            {
                "name": "c_double",
                "type": "DOUBLE",
                "mapping": "a_double"
            },
            {
                "name": "c_boolean",
                "type": "BOOLEAN",
                "mapping": "a_boolean"
            }
        ]
    }
}
13 changes: 13 additions & 0 deletions presto-product-tests/pom.xml
@@ -14,9 +14,22 @@
    <properties>
        <air.main.basedir>${project.parent.basedir}</air.main.basedir>
        <main-class>com.facebook.presto.tests.TemptoProductTestRunner</main-class>
        <air.check.skip-duplicate-finder>true</air.check.skip-duplicate-finder>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.2.6</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-jdbc</artifactId>
New file: KafkaAvroSmokeTest.java (com.facebook.presto.tests.kafka)

@@ -0,0 +1,151 @@
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.tests.kafka;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.prestodb.tempto.ProductTest;
import io.prestodb.tempto.Requirement;
import io.prestodb.tempto.RequirementsProvider;
import io.prestodb.tempto.Requires;
import io.prestodb.tempto.configuration.Configuration;
import io.prestodb.tempto.fulfillment.table.kafka.KafkaMessage;
import io.prestodb.tempto.fulfillment.table.kafka.KafkaTableDefinition;
import io.prestodb.tempto.fulfillment.table.kafka.ListKafkaDataSource;
import io.prestodb.tempto.query.QueryResult;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.testng.annotations.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.util.Map;

import static com.facebook.presto.tests.TestGroups.KAFKA;
import static io.prestodb.tempto.assertions.QueryAssert.Row.row;
import static io.prestodb.tempto.assertions.QueryAssert.assertThat;
import static io.prestodb.tempto.fulfillment.table.TableRequirements.immutableTable;
import static io.prestodb.tempto.fulfillment.table.kafka.KafkaMessageContentsBuilder.contentsBuilder;
import static io.prestodb.tempto.query.QueryExecutor.query;
import static java.lang.String.format;

public class KafkaAvroSmokeTest
        extends ProductTest
{
    private static final String KAFKA_CATALOG = "kafka";

    private static final String ALL_DATATYPES_AVRO_TABLE_NAME = "product_tests.all_datatypes_avro";
    private static final String ALL_DATATYPES_AVRO_TOPIC_NAME = "all_datatypes_avro";
    private static final String ALL_DATATYPE_SCHEMA_PATH = "/docker/volumes/conf/presto/etc/catalog/kafka/all_datatypes_avro_schema.avsc";

    private static final String ALL_NULL_AVRO_TABLE_NAME = "product_tests.all_null_avro";
    private static final String ALL_NULL_AVRO_TOPIC_NAME = "all_null_avro";

    // The Kafka connector requires tables to be predefined in the Presto configuration.
    // The requirements below are used to verify that the tables actually exist, and to
    // create the topics and populate them with data.
    private static class AllDataTypesAvroTable
            implements RequirementsProvider
    {
        @Override
        public Requirement getRequirements(Configuration configuration)
        {
            ImmutableMap<String, Object> record = ImmutableMap.of(
                    "a_varchar", "foobar",
                    "a_bigint", 127L,
                    "a_double", 234.567,
                    "a_boolean", true);
            return createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_DATATYPES_AVRO_TABLE_NAME, ALL_DATATYPES_AVRO_TOPIC_NAME, record);
        }
    }

    private static Requirement createAvroTable(String schemaPath, String tableName, String topicName, ImmutableMap<String, Object> record)
    {
        try {
            Schema schema = new Schema.Parser().parse(new File(schemaPath));
            byte[] avroData = convertRecordToAvro(schema, record);

            return immutableTable(new KafkaTableDefinition(
                    tableName,
                    topicName,
                    new ListKafkaDataSource(ImmutableList.of(
                            new KafkaMessage(
                                    contentsBuilder()
                                            .appendBytes(avroData)
                                            .build()))),
                    1,
                    1));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static byte[] convertRecordToAvro(Schema schema, Map<String, Object> values)
    {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        GenericData.Record record = new GenericData.Record(schema);
        values.forEach(record::put);
        try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            dataFileWriter.create(schema, outputStream);
            dataFileWriter.append(record);
            // try-with-resources flushes and closes the writer before toByteArray() is called
        }
        catch (IOException e) {
            throw new UncheckedIOException("Failed to convert to AVRO.", e);
        }
        return outputStream.toByteArray();
    }

    @Test(groups = {KAFKA})
    @Requires(AllDataTypesAvroTable.class)
    public void testSelectPrimitiveDataType()
            throws SQLException
    {
        QueryResult queryResult = query(format("select * from %s.%s", KAFKA_CATALOG, ALL_DATATYPES_AVRO_TABLE_NAME));
        assertThat(queryResult).containsOnly(row(
                "foobar",
                127,
                234.567,
                true));
    }

    private static class NullDataAvroTable
            implements RequirementsProvider
    {
        @Override
        public Requirement getRequirements(Configuration configuration)
        {
            return createAvroTable(ALL_DATATYPE_SCHEMA_PATH, ALL_NULL_AVRO_TABLE_NAME, ALL_NULL_AVRO_TOPIC_NAME, ImmutableMap.of());
        }
    }

    @Test(groups = {KAFKA})
    @Requires(NullDataAvroTable.class)
    public void testNullType()
            throws SQLException
    {
        QueryResult queryResult = query(format("select * from %s.%s", KAFKA_CATALOG, ALL_NULL_AVRO_TABLE_NAME));
        assertThat(queryResult).containsOnly(row(
                null,
                null,
                null,
                null));
    }
}
13 changes: 13 additions & 0 deletions presto-record-decoder/pom.xml
@@ -51,6 +51,19 @@
            <artifactId>joda-time</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.1.7</version>
            <scope>runtime</scope>
        </dependency>

        <!-- Presto SPI -->
        <dependency>
            <groupId>com.facebook.presto</groupId>
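The snappy-java dependencies above are runtime-scoped: nothing compiles against Snappy directly, but Avro needs it on the classpath whenever a data file uses the snappy codec. A short sketch of selecting that codec at write time (illustration only; the product test above writes with the default null codec):

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public final class SnappyCodecDemo
{
    private SnappyCodecDemo() {}

    public static byte[] emptySnappyFile(Schema schema)
            throws IOException
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            // must be set before create(); requires snappy-java on the classpath
            writer.setCodec(CodecFactory.snappyCodec());
            writer.create(schema, out);
        }
        return out.toByteArray();
    }
}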
