Skip to content

Commit

Permalink
Rework RowDecoder
Browse files Browse the repository at this point in the history
This commit makes important changes to the RowDecoder used in the Kafka and Redis
connectors.

Changes:
 * RowDecoder is now stateful (columnHandles, types, etc. are passed
 during construction)
 * Introduced RowDecoderFactory
 * We no longer enforce decoding in two phases (row decoding and field
 decoding)
 * Make construction of RowDecoder instances more straightforward,
 without the cumbersome DecoderRegistry
  • Loading branch information
losipiuk authored and kokosing committed Jul 23, 2018
1 parent adb50bf commit dfb0423
Show file tree
Hide file tree
Showing 36 changed files with 684 additions and 1,121 deletions.
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.kafka;

import com.facebook.presto.decoder.DecoderColumnHandle;
import com.facebook.presto.decoder.FieldDecoder;
import com.facebook.presto.decoder.FieldValueProvider;
import com.facebook.presto.decoder.RowDecoder;
import com.facebook.presto.spi.ColumnHandle;
Expand Down Expand Up @@ -62,28 +61,22 @@ public class KafkaRecordSet

private final RowDecoder keyDecoder;
private final RowDecoder messageDecoder;
private final Map<DecoderColumnHandle, FieldDecoder<?>> keyFieldDecoders;
private final Map<DecoderColumnHandle, FieldDecoder<?>> messageFieldDecoders;

private final List<DecoderColumnHandle> columnHandles;
private final List<KafkaColumnHandle> columnHandles;
private final List<Type> columnTypes;

KafkaRecordSet(KafkaSplit split,
KafkaSimpleConsumerManager consumerManager,
List<DecoderColumnHandle> columnHandles,
List<KafkaColumnHandle> columnHandles,
RowDecoder keyDecoder,
RowDecoder messageDecoder,
Map<DecoderColumnHandle, FieldDecoder<?>> keyFieldDecoders,
Map<DecoderColumnHandle, FieldDecoder<?>> messageFieldDecoders)
RowDecoder messageDecoder)
{
this.split = requireNonNull(split, "split is null");

this.consumerManager = requireNonNull(consumerManager, "consumerManager is null");

this.keyDecoder = requireNonNull(keyDecoder, "rowDecoder is null");
this.messageDecoder = requireNonNull(messageDecoder, "rowDecoder is null");
this.keyFieldDecoders = requireNonNull(keyFieldDecoders, "keyFieldDecoders is null");
this.messageFieldDecoders = requireNonNull(messageFieldDecoders, "messageFieldDecoders is null");

this.columnHandles = requireNonNull(columnHandles, "columnHandles is null");

Expand Down Expand Up @@ -200,8 +193,8 @@ private boolean nextRow(MessageAndOffset messageAndOffset)

Map<ColumnHandle, FieldValueProvider> currentRowValuesMap = new HashMap<>();

Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedKey = keyDecoder.decodeRow(keyData, null, columnHandles, keyFieldDecoders);
Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue = messageDecoder.decodeRow(messageData, null, columnHandles, messageFieldDecoders);
Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedKey = keyDecoder.decodeRow(keyData, null);
Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedValue = messageDecoder.decodeRow(messageData, null);

for (DecoderColumnHandle columnHandle : columnHandles) {
if (columnHandle.isInternal()) {
Expand Down
Expand Up @@ -13,9 +13,7 @@
*/
package com.facebook.presto.kafka;

import com.facebook.presto.decoder.DecoderColumnHandle;
import com.facebook.presto.decoder.DecoderRegistry;
import com.facebook.presto.decoder.FieldDecoder;
import com.facebook.presto.decoder.DispatchingRowDecoderFactory;
import com.facebook.presto.decoder.RowDecoder;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
Expand All @@ -24,14 +22,13 @@
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import javax.inject.Inject;

import java.util.List;

import static com.facebook.presto.kafka.KafkaHandleResolver.convertColumnHandle;
import static com.facebook.presto.kafka.KafkaHandleResolver.convertSplit;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;

/**
Expand All @@ -40,13 +37,13 @@
public class KafkaRecordSetProvider
implements ConnectorRecordSetProvider
{
private DispatchingRowDecoderFactory decoderFactory;
private final KafkaSimpleConsumerManager consumerManager;
private final DecoderRegistry registry;

@Inject
public KafkaRecordSetProvider(DecoderRegistry registry, KafkaSimpleConsumerManager consumerManager)
public KafkaRecordSetProvider(DispatchingRowDecoderFactory decoderFactory, KafkaSimpleConsumerManager consumerManager)
{
this.registry = requireNonNull(registry, "registry is null");
this.decoderFactory = requireNonNull(decoderFactory, "decoderFactory is null");
this.consumerManager = requireNonNull(consumerManager, "consumerManager is null");
}

Expand All @@ -55,41 +52,24 @@ public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorS
{
KafkaSplit kafkaSplit = convertSplit(split);

ImmutableList.Builder<DecoderColumnHandle> handleBuilder = ImmutableList.builder();
ImmutableMap.Builder<DecoderColumnHandle, FieldDecoder<?>> keyFieldDecoderBuilder = ImmutableMap.builder();
ImmutableMap.Builder<DecoderColumnHandle, FieldDecoder<?>> messageFieldDecoderBuilder = ImmutableMap.builder();
List<KafkaColumnHandle> kafkaColumns = columns.stream()
.map(KafkaHandleResolver::convertColumnHandle)
.collect(ImmutableList.toImmutableList());

RowDecoder keyDecoder = registry.getRowDecoder(kafkaSplit.getKeyDataFormat());
RowDecoder messageDecoder = registry.getRowDecoder(kafkaSplit.getMessageDataFormat());
RowDecoder keyDecoder = decoderFactory.create(
kafkaSplit.getKeyDataFormat(),
kafkaColumns.stream()
.filter(col -> !col.isInternal())
.filter(KafkaColumnHandle::isKeyDecoder)
.collect(toImmutableSet()));

for (ColumnHandle handle : columns) {
KafkaColumnHandle columnHandle = convertColumnHandle(handle);
handleBuilder.add(columnHandle);
RowDecoder messageDecoder = decoderFactory.create(
kafkaSplit.getMessageDataFormat(),
kafkaColumns.stream()
.filter(col -> !col.isInternal())
.filter(col -> !col.isKeyDecoder())
.collect(toImmutableSet()));

if (!columnHandle.isInternal()) {
if (columnHandle.isKeyDecoder()) {
FieldDecoder<?> fieldDecoder = registry.getFieldDecoder(
kafkaSplit.getKeyDataFormat(),
columnHandle.getType().getJavaType(),
columnHandle.getDataFormat());

keyFieldDecoderBuilder.put(columnHandle, fieldDecoder);
}
else {
FieldDecoder<?> fieldDecoder = registry.getFieldDecoder(
kafkaSplit.getMessageDataFormat(),
columnHandle.getType().getJavaType(),
columnHandle.getDataFormat());

messageFieldDecoderBuilder.put(columnHandle, fieldDecoder);
}
}
}

ImmutableList<DecoderColumnHandle> handles = handleBuilder.build();
ImmutableMap<DecoderColumnHandle, FieldDecoder<?>> keyFieldDecoders = keyFieldDecoderBuilder.build();
ImmutableMap<DecoderColumnHandle, FieldDecoder<?>> messageFieldDecoders = messageFieldDecoderBuilder.build();

return new KafkaRecordSet(kafkaSplit, consumerManager, handles, keyDecoder, messageDecoder, keyFieldDecoders, messageFieldDecoders);
return new KafkaRecordSet(kafkaSplit, consumerManager, kafkaColumns, keyDecoder, messageDecoder);
}
}
11 changes: 0 additions & 11 deletions presto-record-decoder/pom.xml
Expand Up @@ -36,22 +36,11 @@
<artifactId>javax.inject</artifactId>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>json</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>net.sf.opencsv</groupId>
<artifactId>opencsv</artifactId>
Expand Down
Expand Up @@ -13,15 +13,19 @@
*/
package com.facebook.presto.decoder;

import com.facebook.presto.decoder.csv.CsvDecoderModule;
import com.facebook.presto.decoder.dummy.DummyDecoderModule;
import com.facebook.presto.decoder.json.JsonDecoderModule;
import com.facebook.presto.decoder.raw.RawDecoderModule;
import com.facebook.presto.decoder.csv.CsvRowDecoder;
import com.facebook.presto.decoder.csv.CsvRowDecoderFactory;
import com.facebook.presto.decoder.dummy.DummyRowDecoder;
import com.facebook.presto.decoder.dummy.DummyRowDecoderFactory;
import com.facebook.presto.decoder.json.JsonRowDecoder;
import com.facebook.presto.decoder.json.JsonRowDecoderFactory;
import com.facebook.presto.decoder.raw.RawRowDecoder;
import com.facebook.presto.decoder.raw.RawRowDecoderFactory;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.Multibinder;
import com.google.inject.multibindings.MapBinder;

import static com.google.inject.Scopes.SINGLETON;

/**
* Default decoder module. Installs the registry and all known decoder submodules.
Expand All @@ -32,23 +36,12 @@ public class DecoderModule
@Override
public void configure(Binder binder)
{
binder.bind(DecoderRegistry.class).in(Scopes.SINGLETON);

binder.install(new DummyDecoderModule());
binder.install(new CsvDecoderModule());
binder.install(new JsonDecoderModule());
binder.install(new RawDecoderModule());
}
MapBinder<String, RowDecoderFactory> decoderFactoriesByName = MapBinder.newMapBinder(binder, String.class, RowDecoderFactory.class);
decoderFactoriesByName.addBinding(DummyRowDecoder.NAME).to(DummyRowDecoderFactory.class).in(SINGLETON);
decoderFactoriesByName.addBinding(CsvRowDecoder.NAME).to(CsvRowDecoderFactory.class).in(SINGLETON);
decoderFactoriesByName.addBinding(JsonRowDecoder.NAME).to(JsonRowDecoderFactory.class).in(SINGLETON);
decoderFactoriesByName.addBinding(RawRowDecoder.NAME).to(RawRowDecoderFactory.class).in(SINGLETON);

public static void bindRowDecoder(Binder binder, Class<? extends RowDecoder> decoderClass)
{
Multibinder<RowDecoder> rowDecoderBinder = Multibinder.newSetBinder(binder, RowDecoder.class);
rowDecoderBinder.addBinding().to(decoderClass).in(Scopes.SINGLETON);
}

public static void bindFieldDecoder(Binder binder, Class<? extends FieldDecoder<?>> decoderClass)
{
Multibinder<FieldDecoder<?>> fieldDecoderBinder = Multibinder.newSetBinder(binder, new TypeLiteral<FieldDecoder<?>>() {});
fieldDecoderBinder.addBinding().to(decoderClass).in(Scopes.SINGLETON);
binder.bind(DispatchingRowDecoderFactory.class).in(SINGLETON);
}
}

This file was deleted.

@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.decoder;

import com.google.common.collect.ImmutableMap;

import javax.inject.Inject;

import java.util.Map;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * Creates {@link RowDecoder} instances by delegating to the
 * {@link RowDecoderFactory} registered under the requested data format name.
 */
public class DispatchingRowDecoderFactory
{
    // Immutable snapshot of the injected factories, keyed by data format name.
    private final Map<String, RowDecoderFactory> factories;

    /**
     * @param factories row decoder factories keyed by data format name;
     *                  an immutable copy is kept, so later changes to the
     *                  supplied map are not observed
     */
    @Inject
    public DispatchingRowDecoderFactory(Map<String, RowDecoderFactory> factories)
    {
        this.factories = ImmutableMap.copyOf(factories);
    }

    /**
     * Builds a row decoder for the given data format.
     *
     * @param dataFormat name of the data format whose factory should be used
     * @param columns column handles handed to the selected factory
     * @return the decoder produced by the factory registered for {@code dataFormat}
     * @throws IllegalArgumentException if no factory is registered for {@code dataFormat}
     */
    public RowDecoder create(String dataFormat, Set<DecoderColumnHandle> columns)
    {
        // An ImmutableMap never holds null values, so a null lookup result
        // means exactly "no factory registered under this name".
        RowDecoderFactory factory = factories.get(dataFormat);
        checkArgument(factory != null, "unknown data format '%s'", dataFormat);
        return factory.create(columns);
    }
}

0 comments on commit dfb0423

Please sign in to comment.