Skip to content

Commit

Permalink
Move Kafka decoders to presto-record-decoder module
Browse files Browse the repository at this point in the history
  • Loading branch information
avasilevskiy authored and dain committed Sep 19, 2015
1 parent f7281ec commit fda5d0f
Show file tree
Hide file tree
Showing 54 changed files with 1,866 additions and 1,418 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Expand Up @@ -64,6 +64,7 @@
<modules>
<module>presto-spi</module>
<module>presto-jmx</module>
<module>presto-record-decoder</module>
<module>presto-kafka</module>
<module>presto-cassandra</module>
<module>presto-blackhole</module>
Expand Down Expand Up @@ -104,6 +105,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-record-decoder</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-orc</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions presto-jdbc/pom.xml
Expand Up @@ -242,6 +242,12 @@
<exclude>about.html</exclude>
</excludes>
</filter>
<filter>
<artifact>net.sf.opencsv:opencsv</artifact>
<excludes>
<exclude>**</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
Expand Down
15 changes: 5 additions & 10 deletions presto-kafka/pom.xml
Expand Up @@ -42,6 +42,11 @@
<artifactId>units</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-record-decoder</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand All @@ -62,21 +67,11 @@
<artifactId>validation-api</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>

<dependency>
<groupId>net.sf.opencsv</groupId>
<artifactId>opencsv</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down
Expand Up @@ -13,8 +13,8 @@
*/
package com.facebook.presto.kafka;

import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.decoder.DecoderColumnHandle;
import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -28,7 +28,7 @@
* Kafka specific connector column handle.
*/
public final class KafkaColumnHandle
implements ColumnHandle, Comparable<KafkaColumnHandle>
implements DecoderColumnHandle, Comparable<KafkaColumnHandle>
{
private final String connectorId;
private final int ordinalPosition;
Expand Down Expand Up @@ -111,30 +111,35 @@ public int getOrdinalPosition()
return ordinalPosition;
}

@Override
@JsonProperty
public String getName()
{
return name;
}

@Override
@JsonProperty
public Type getType()
{
return type;
}

@Override
@JsonProperty
public String getMapping()
{
return mapping;
}

@Override
@JsonProperty
public String getDataFormat()
{
return dataFormat;
}

@Override
@JsonProperty
public String getFormatHint()
{
Expand All @@ -153,6 +158,7 @@ public boolean isHidden()
return hidden;
}

@Override
@JsonProperty
public boolean isInternal()
{
Expand Down
Expand Up @@ -13,9 +13,9 @@
*/
package com.facebook.presto.kafka;

import com.facebook.presto.kafka.decoder.KafkaDecoderModule;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.decoder.DecoderModule;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
import com.google.inject.Binder;
Expand Down Expand Up @@ -55,7 +55,7 @@ public void configure(Binder binder)
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
jsonCodecBinder(binder).bindJsonCodec(KafkaTopicDescription.class);

binder.install(new KafkaDecoderModule());
binder.install(new DecoderModule());

for (KafkaInternalFieldDescription internalFieldDescription : KafkaInternalFieldDescription.getInternalFields()) {
bindInternalColumn(binder, internalFieldDescription);
Expand Down
Expand Up @@ -23,11 +23,7 @@ public enum KafkaErrorCode
implements ErrorCodeSupplier
{
// Connectors can use error codes starting at EXTERNAL

/**
* A requested data conversion is not supported.
*/
KAFKA_CONVERSION_NOT_SUPPORTED(0x0200_0000),
// NOTE: 0x0200_0000 is reserved for DecoderErrorCode
KAFKA_SPLIT_ERROR(0x0200_0001);

private final ErrorCode errorCode;
Expand Down
Expand Up @@ -14,6 +14,8 @@
package com.facebook.presto.kafka;

import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.decoder.DecoderColumnHandle;
import com.facebook.presto.decoder.FieldValueProvider;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
import com.facebook.presto.spi.type.Type;
Expand Down Expand Up @@ -145,17 +147,17 @@ ColumnMetadata getColumnMetadata(boolean hidden)
return new ColumnMetadata(name, type, false, comment, hidden);
}

public KafkaFieldValueProvider forBooleanValue(boolean value)
public FieldValueProvider forBooleanValue(boolean value)
{
return new BooleanKafkaFieldValueProvider(value);
}

public KafkaFieldValueProvider forLongValue(long value)
public FieldValueProvider forLongValue(long value)
{
return new LongKafkaFieldValueProvider(value);
}

public KafkaFieldValueProvider forByteValue(byte[] value)
public FieldValueProvider forByteValue(byte[] value)
{
return new BytesKafkaFieldValueProvider(value);
}
Expand Down Expand Up @@ -191,7 +193,7 @@ public String toString()
}

public class BooleanKafkaFieldValueProvider
extends KafkaFieldValueProvider
extends FieldValueProvider
{
private final boolean value;

Expand All @@ -201,7 +203,7 @@ private BooleanKafkaFieldValueProvider(boolean value)
}

@Override
public boolean accept(KafkaColumnHandle columnHandle)
public boolean accept(DecoderColumnHandle columnHandle)
{
return columnHandle.getName().equals(name);
}
Expand All @@ -220,7 +222,7 @@ public boolean isNull()
}

public class LongKafkaFieldValueProvider
extends KafkaFieldValueProvider
extends FieldValueProvider
{
private final long value;

Expand All @@ -230,7 +232,7 @@ private LongKafkaFieldValueProvider(long value)
}

@Override
public boolean accept(KafkaColumnHandle columnHandle)
public boolean accept(DecoderColumnHandle columnHandle)
{
return columnHandle.getName().equals(name);
}
Expand All @@ -249,7 +251,7 @@ public boolean isNull()
}

public class BytesKafkaFieldValueProvider
extends KafkaFieldValueProvider
extends FieldValueProvider
{
private final byte[] value;

Expand All @@ -259,7 +261,7 @@ private BytesKafkaFieldValueProvider(byte[] value)
}

@Override
public boolean accept(KafkaColumnHandle columnHandle)
public boolean accept(DecoderColumnHandle columnHandle)
{
return columnHandle.getName().equals(name);
}
Expand Down
Expand Up @@ -13,7 +13,6 @@
*/
package com.facebook.presto.kafka;

import com.facebook.presto.kafka.decoder.dummy.DummyKafkaRowDecoder;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorMetadata;
Expand All @@ -23,6 +22,7 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.decoder.dummy.DummyRowDecoder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -97,7 +97,7 @@ public KafkaTableHandle getTableHandle(ConnectorSession session, SchemaTableName

private static String getDataFormat(KafkaTopicFieldGroup fieldGroup)
{
return (fieldGroup == null) ? DummyKafkaRowDecoder.NAME : fieldGroup.getDataFormat();
return (fieldGroup == null) ? DummyRowDecoder.NAME : fieldGroup.getDataFormat();
}

@Override
Expand Down
Expand Up @@ -13,11 +13,13 @@
*/
package com.facebook.presto.kafka;

import com.facebook.presto.kafka.decoder.KafkaFieldDecoder;
import com.facebook.presto.kafka.decoder.KafkaRowDecoder;
import com.facebook.presto.decoder.DecoderColumnHandle;
import com.facebook.presto.decoder.FieldDecoder;
import com.facebook.presto.decoder.FieldValueProvider;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.decoder.RowDecoder;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -56,23 +58,23 @@ public class KafkaRecordSet
private final KafkaSplit split;
private final KafkaSimpleConsumerManager consumerManager;

private final KafkaRowDecoder keyDecoder;
private final KafkaRowDecoder messageDecoder;
private final Map<KafkaColumnHandle, KafkaFieldDecoder<?>> keyFieldDecoders;
private final Map<KafkaColumnHandle, KafkaFieldDecoder<?>> messageFieldDecoders;
private final RowDecoder keyDecoder;
private final RowDecoder messageDecoder;
private final Map<DecoderColumnHandle, FieldDecoder<?>> keyFieldDecoders;
private final Map<DecoderColumnHandle, FieldDecoder<?>> messageFieldDecoders;

private final List<KafkaColumnHandle> columnHandles;
private final List<DecoderColumnHandle> columnHandles;
private final List<Type> columnTypes;

private final Set<KafkaFieldValueProvider> globalInternalFieldValueProviders;
private final Set<FieldValueProvider> globalInternalFieldValueProviders;

KafkaRecordSet(KafkaSplit split,
KafkaSimpleConsumerManager consumerManager,
List<KafkaColumnHandle> columnHandles,
KafkaRowDecoder keyDecoder,
KafkaRowDecoder messageDecoder,
Map<KafkaColumnHandle, KafkaFieldDecoder<?>> keyFieldDecoders,
Map<KafkaColumnHandle, KafkaFieldDecoder<?>> messageFieldDecoders)
List<DecoderColumnHandle> columnHandles,
RowDecoder keyDecoder,
RowDecoder messageDecoder,
Map<DecoderColumnHandle, FieldDecoder<?>> keyFieldDecoders,
Map<DecoderColumnHandle, FieldDecoder<?>> messageFieldDecoders)
{
this.split = requireNonNull(split, "split is null");

Expand All @@ -92,7 +94,7 @@ public class KafkaRecordSet

ImmutableList.Builder<Type> typeBuilder = ImmutableList.builder();

for (KafkaColumnHandle handle : columnHandles) {
for (DecoderColumnHandle handle : columnHandles) {
typeBuilder.add(handle.getType());
}

Expand Down Expand Up @@ -120,7 +122,7 @@ public class KafkaRecordCursor
private Iterator<MessageAndOffset> messageAndOffsetIterator;
private final AtomicBoolean reported = new AtomicBoolean();

private KafkaFieldValueProvider[] fieldValueProviders;
private FieldValueProvider[] fieldValueProviders;

KafkaRecordCursor()
{
Expand Down Expand Up @@ -207,7 +209,7 @@ private boolean nextRow(MessageAndOffset messageAndOffset)
message.get(messageData);
}

Set<KafkaFieldValueProvider> fieldValueProviders = new HashSet<>();
Set<FieldValueProvider> fieldValueProviders = new HashSet<>();

fieldValueProviders.addAll(globalInternalFieldValueProviders);
fieldValueProviders.add(KafkaInternalFieldDescription.SEGMENT_COUNT_FIELD.forLongValue(totalMessages));
Expand All @@ -216,17 +218,17 @@ private boolean nextRow(MessageAndOffset messageAndOffset)
fieldValueProviders.add(KafkaInternalFieldDescription.MESSAGE_LENGTH_FIELD.forLongValue(messageData.length));
fieldValueProviders.add(KafkaInternalFieldDescription.KEY_FIELD.forByteValue(keyData));
fieldValueProviders.add(KafkaInternalFieldDescription.KEY_LENGTH_FIELD.forLongValue(keyData.length));
fieldValueProviders.add(KafkaInternalFieldDescription.KEY_CORRUPT_FIELD.forBooleanValue(keyDecoder.decodeRow(keyData, fieldValueProviders, columnHandles, keyFieldDecoders)));
fieldValueProviders.add(KafkaInternalFieldDescription.MESSAGE_CORRUPT_FIELD.forBooleanValue(messageDecoder.decodeRow(messageData, fieldValueProviders, columnHandles, messageFieldDecoders)));
fieldValueProviders.add(KafkaInternalFieldDescription.KEY_CORRUPT_FIELD.forBooleanValue(keyDecoder.decodeRow(keyData, null, fieldValueProviders, columnHandles, keyFieldDecoders)));
fieldValueProviders.add(KafkaInternalFieldDescription.MESSAGE_CORRUPT_FIELD.forBooleanValue(messageDecoder.decodeRow(messageData, null, fieldValueProviders, columnHandles, messageFieldDecoders)));

this.fieldValueProviders = new KafkaFieldValueProvider[columnHandles.size()];
this.fieldValueProviders = new FieldValueProvider[columnHandles.size()];

// If a value provider for a requested internal column is present, assign the
// value to the internal cache. It is possible that an internal column is present
// where no value provider exists (e.g. the '_corrupt' column with the DummyRowDecoder).
// In that case, the cache is null (and the column is reported as null).
for (int i = 0; i < columnHandles.size(); i++) {
for (KafkaFieldValueProvider fieldValueProvider : fieldValueProviders) {
for (FieldValueProvider fieldValueProvider : fieldValueProviders) {
if (fieldValueProvider.accept(columnHandles.get(i))) {
this.fieldValueProviders[i] = fieldValueProvider;
break; // for(InternalColumnProvider...
Expand Down

0 comments on commit fda5d0f

Please sign in to comment.