Skip to content

Commit

Permalink
Use Optionals in KafkaTopicDescription
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk authored and findepi committed Aug 16, 2018
1 parent 731eb9a commit 0964662
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 38 deletions.
Expand Up @@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import static com.facebook.presto.kafka.KafkaHandleResolver.convertColumnHandle;
Expand Down Expand Up @@ -97,9 +98,9 @@ public KafkaTableHandle getTableHandle(ConnectorSession session, SchemaTableName
getDataFormat(table.getMessage()));
}

private static String getDataFormat(KafkaTopicFieldGroup fieldGroup)
private static String getDataFormat(Optional<KafkaTopicFieldGroup> fieldGroup)
{
return (fieldGroup == null) ? DummyRowDecoder.NAME : fieldGroup.getDataFormat();
return fieldGroup.map(KafkaTopicFieldGroup::getDataFormat).orElse(DummyRowDecoder.NAME);
}

@Override
Expand Down Expand Up @@ -134,29 +135,30 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();

int index = 0;
KafkaTopicFieldGroup key = kafkaTopicDescription.getKey();
if (key != null) {
AtomicInteger index = new AtomicInteger(0);

kafkaTopicDescription.getKey().ifPresent(key ->
{
List<KafkaTopicFieldDescription> fields = key.getFields();
if (fields != null) {
for (KafkaTopicFieldDescription kafkaTopicFieldDescription : fields) {
columnHandles.put(kafkaTopicFieldDescription.getName(), kafkaTopicFieldDescription.getColumnHandle(connectorId, true, index++));
columnHandles.put(kafkaTopicFieldDescription.getName(), kafkaTopicFieldDescription.getColumnHandle(connectorId, true, index.getAndIncrement()));
}
}
}
});

KafkaTopicFieldGroup message = kafkaTopicDescription.getMessage();
if (message != null) {
kafkaTopicDescription.getMessage().ifPresent(message ->
{
List<KafkaTopicFieldDescription> fields = message.getFields();
if (fields != null) {
for (KafkaTopicFieldDescription kafkaTopicFieldDescription : fields) {
columnHandles.put(kafkaTopicFieldDescription.getName(), kafkaTopicFieldDescription.getColumnHandle(connectorId, false, index++));
columnHandles.put(kafkaTopicFieldDescription.getName(), kafkaTopicFieldDescription.getColumnHandle(connectorId, false, index.getAndIncrement()));
}
}
}
});

for (KafkaInternalFieldDescription kafkaInternalFieldDescription : KafkaInternalFieldDescription.values()) {
columnHandles.put(kafkaInternalFieldDescription.getColumnName(), kafkaInternalFieldDescription.getColumnHandle(connectorId, index++, hideInternalColumns));
columnHandles.put(kafkaInternalFieldDescription.getColumnName(), kafkaInternalFieldDescription.getColumnHandle(connectorId, index.getAndIncrement(), hideInternalColumns));
}

return columnHandles.build();
Expand Down Expand Up @@ -220,25 +222,23 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)

ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();

KafkaTopicFieldGroup key = table.getKey();
if (key != null) {
table.getKey().ifPresent(key -> {
List<KafkaTopicFieldDescription> fields = key.getFields();
if (fields != null) {
for (KafkaTopicFieldDescription fieldDescription : fields) {
builder.add(fieldDescription.getColumnMetadata());
}
}
}
});

KafkaTopicFieldGroup message = table.getMessage();
if (message != null) {
table.getMessage().ifPresent(message -> {
List<KafkaTopicFieldDescription> fields = message.getFields();
if (fields != null) {
for (KafkaTopicFieldDescription fieldDescription : fields) {
builder.add(fieldDescription.getColumnMetadata());
}
}
}
});

for (KafkaInternalFieldDescription fieldDescription : KafkaInternalFieldDescription.values()) {
builder.add(fieldDescription.getColumnMetadata(hideInternalColumns));
Expand Down
Expand Up @@ -29,10 +29,10 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.nio.file.Files.readAllBytes;
Expand Down Expand Up @@ -72,7 +72,7 @@ public Map<SchemaTableName, KafkaTopicDescription> get()
for (File file : listFiles(tableDescriptionDir)) {
if (file.isFile() && file.getName().endsWith(".json")) {
KafkaTopicDescription table = topicDescriptionCodec.fromJson(readAllBytes(file.toPath()));
String schemaName = firstNonNull(table.getSchemaName(), defaultSchema);
String schemaName = table.getSchemaName().orElse(defaultSchema);
log.debug("Kafka table %s.%s: %s", schemaName, table.getTableName(), table);
builder.put(new SchemaTableName(schemaName, table.getTableName()), table);
}
Expand Down Expand Up @@ -100,11 +100,12 @@ public Map<SchemaTableName, KafkaTopicDescription> get()
else {
// A dummy table definition only supports the internal columns.
log.debug("Created dummy Table definition for %s", tableName);
builder.put(tableName, new KafkaTopicDescription(tableName.getTableName(),
tableName.getSchemaName(),
builder.put(tableName, new KafkaTopicDescription(
tableName.getTableName(),
Optional.ofNullable(tableName.getSchemaName()),
definedTable,
new KafkaTopicFieldGroup(DummyRowDecoder.NAME, ImmutableList.of()),
new KafkaTopicFieldGroup(DummyRowDecoder.NAME, ImmutableList.of())));
Optional.of(new KafkaTopicFieldGroup(DummyRowDecoder.NAME, ImmutableList.of())),
Optional.of(new KafkaTopicFieldGroup(DummyRowDecoder.NAME, ImmutableList.of()))));
}
}

Expand Down
Expand Up @@ -16,6 +16,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
Expand All @@ -28,24 +30,24 @@ public class KafkaTopicDescription
{
private final String tableName;
private final String topicName;
private final String schemaName;
private final KafkaTopicFieldGroup key;
private final KafkaTopicFieldGroup message;
private final Optional<String> schemaName;
private final Optional<KafkaTopicFieldGroup> key;
private final Optional<KafkaTopicFieldGroup> message;

@JsonCreator
public KafkaTopicDescription(
@JsonProperty("tableName") String tableName,
@JsonProperty("schemaName") String schemaName,
@JsonProperty("schemaName") Optional<String> schemaName,
@JsonProperty("topicName") String topicName,
@JsonProperty("key") KafkaTopicFieldGroup key,
@JsonProperty("message") KafkaTopicFieldGroup message)
@JsonProperty("key") Optional<KafkaTopicFieldGroup> key,
@JsonProperty("message") Optional<KafkaTopicFieldGroup> message)
{
checkArgument(!isNullOrEmpty(tableName), "tableName is null or is empty");
this.tableName = tableName;
this.topicName = requireNonNull(topicName, "topicName is null");
this.schemaName = schemaName;
this.key = key;
this.message = message;
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.key = requireNonNull(key, "key is null");
this.message = requireNonNull(message, "message is null");
}

@JsonProperty
Expand All @@ -61,19 +63,19 @@ public String getTopicName()
}

@JsonProperty
public String getSchemaName()
public Optional<String> getSchemaName()
{
return schemaName;
}

@JsonProperty
public KafkaTopicFieldGroup getKey()
public Optional<KafkaTopicFieldGroup> getKey()
{
return key;
}

@JsonProperty
public KafkaTopicFieldGroup getMessage()
public Optional<KafkaTopicFieldGroup> getMessage()
{
return message;
}
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.net.ServerSocket;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static com.facebook.presto.kafka.util.EmbeddedKafka.CloseableProducer;
Expand Down Expand Up @@ -83,13 +84,13 @@ public static Map.Entry<SchemaTableName, KafkaTopicDescription> loadTpchTopicDes

return new AbstractMap.SimpleImmutableEntry<>(
schemaTableName,
new KafkaTopicDescription(schemaTableName.getTableName(), schemaTableName.getSchemaName(), topicName, tpchTemplate.getKey(), tpchTemplate.getMessage()));
new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), topicName, tpchTemplate.getKey(), tpchTemplate.getMessage()));
}

public static Map.Entry<SchemaTableName, KafkaTopicDescription> createEmptyTopicDescription(String topicName, SchemaTableName schemaTableName)
{
return new AbstractMap.SimpleImmutableEntry<>(
schemaTableName,
new KafkaTopicDescription(schemaTableName.getTableName(), schemaTableName.getSchemaName(), topicName, null, null));
new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), topicName, Optional.empty(), Optional.empty()));
}
}

0 comments on commit 0964662

Please sign in to comment.