Remove stored config objects in Kafka
electrum committed Aug 6, 2015
1 parent 4e5a7a1; commit 4b2b644
Showing 4 changed files with 36 additions and 19 deletions.
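
The change applies one pattern across all four files: instead of holding on to the injected KafkaConnectorConfig, each constructor now validates it once, copies the few values the class actually uses into final fields, and drops the config reference. A minimal sketch of that pattern, using only the isHideInternalColumns() accessor that appears in the diff below (the class name and shape here are illustrative, not the real KafkaMetadata):

import static com.google.common.base.Preconditions.checkNotNull;

public class HideColumnsExample
{
    // Before this commit the class kept the whole config:
    //   private final KafkaConnectorConfig kafkaConnectorConfig;
    // After, it keeps only the derived value it needs:
    private final boolean hideInternalColumns;

    public HideColumnsExample(KafkaConnectorConfig kafkaConnectorConfig)
    {
        // Null-check the config once, read the value, drop the reference.
        checkNotNull(kafkaConnectorConfig, "kafkaConnectorConfig is null");
        this.hideInternalColumns = kafkaConnectorConfig.isHideInternalColumns();
    }

    public boolean isHideInternalColumns()
    {
        return hideInternalColumns;
    }
}

Keeping only derived values makes each class's real dependencies explicit and decouples it from the config object's lifecycle.
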
KafkaMetadata.java

@@ -45,9 +45,8 @@ public class KafkaMetadata
extends ReadOnlyConnectorMetadata
{
private final String connectorId;
private final KafkaConnectorConfig kafkaConnectorConfig;
private final KafkaHandleResolver handleResolver;

private final boolean hideInternalColumns;
private final Map<SchemaTableName, KafkaTopicDescription> tableDescriptions;
private final Set<KafkaInternalFieldDescription> internalFieldDescriptions;

@@ -60,9 +59,11 @@ public KafkaMetadata(
Set<KafkaInternalFieldDescription> internalFieldDescriptions)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null").toString();
this.kafkaConnectorConfig = checkNotNull(kafkaConnectorConfig, "kafkaConfig is null");
this.handleResolver = checkNotNull(handleResolver, "handleResolver is null");

checkNotNull(kafkaConnectorConfig, "kafkaConfig is null");
this.hideInternalColumns = kafkaConnectorConfig.isHideInternalColumns();

checkNotNull(kafkaTableDescriptionSupplier, "kafkaTableDescriptionSupplier is null");
this.tableDescriptions = kafkaTableDescriptionSupplier.get();
this.internalFieldDescriptions = checkNotNull(internalFieldDescriptions, "internalFieldDescriptions is null");
@@ -160,7 +161,7 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
}

for (KafkaInternalFieldDescription kafkaInternalFieldDescription : internalFieldDescriptions) {
columnHandles.put(kafkaInternalFieldDescription.getName(), kafkaInternalFieldDescription.getColumnHandle(connectorId, index++, kafkaConnectorConfig.isHideInternalColumns()));
columnHandles.put(kafkaInternalFieldDescription.getName(), kafkaInternalFieldDescription.getColumnHandle(connectorId, index++, hideInternalColumns));
}

return columnHandles.build();
@@ -225,7 +226,7 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName)
}

for (KafkaInternalFieldDescription fieldDescription : internalFieldDescriptions) {
builder.add(fieldDescription.getColumnMetadata(kafkaConnectorConfig.isHideInternalColumns()));
builder.add(fieldDescription.getColumnMetadata(hideInternalColumns));
}

return new ConnectorTableMetadata(schemaTableName, builder.build());

KafkaSimpleConsumerManager.java

@@ -43,8 +43,9 @@ public class KafkaSimpleConsumerManager
private final LoadingCache<HostAddress, SimpleConsumer> consumerCache;

private final String connectorId;
private final KafkaConnectorConfig kafkaConnectorConfig;
private final NodeManager nodeManager;
private final int connectTimeoutMillis;
private final int bufferSizeBytes;

@Inject
public KafkaSimpleConsumerManager(
@@ -53,9 +54,12 @@ public KafkaSimpleConsumerManager(
NodeManager nodeManager)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null").toString();
this.kafkaConnectorConfig = checkNotNull(kafkaConnectorConfig, "kafkaConfig is null");
this.nodeManager = checkNotNull(nodeManager, "nodeManager is null");

checkNotNull(kafkaConnectorConfig, "kafkaConfig is null");
this.connectTimeoutMillis = Ints.checkedCast(kafkaConnectorConfig.getKafkaConnectTimeout().toMillis());
this.bufferSizeBytes = Ints.checkedCast(kafkaConnectorConfig.getKafkaBufferSize().toBytes());

this.consumerCache = CacheBuilder.newBuilder().build(new SimpleConsumerCacheLoader());
}

@@ -93,8 +97,8 @@ public SimpleConsumer load(HostAddress host)
log.info("Creating new Consumer for %s", host);
return new SimpleConsumer(host.getHostText(),
host.getPort(),
Ints.checkedCast(kafkaConnectorConfig.getKafkaConnectTimeout().toMillis()),
Ints.checkedCast(kafkaConnectorConfig.getKafkaBufferSize().toBytes()),
connectTimeoutMillis,
bufferSizeBytes,
format("presto-kafka-%s-%s", connectorId, nodeManager.getCurrentNode().getNodeIdentifier()));
}
}

KafkaSplitManager.java

@@ -28,6 +28,7 @@
import com.facebook.presto.spi.TupleDomain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import io.airlift.log.Logger;
import kafka.api.PartitionOffsetRequestInfo;
@@ -46,6 +47,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import static com.facebook.presto.kafka.KafkaErrorCode.KAFKA_SPLIT_ERROR;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -60,9 +62,9 @@ public class KafkaSplitManager
private static final Logger log = Logger.get(KafkaSplitManager.class);

private final String connectorId;
private final KafkaConnectorConfig kafkaConnectorConfig;
private final KafkaHandleResolver handleResolver;
private final KafkaSimpleConsumerManager consumerManager;
private final Set<HostAddress> nodes;

@Inject
public KafkaSplitManager(
@@ -72,17 +74,19 @@ public KafkaSplitManager(
KafkaSimpleConsumerManager consumerManager)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null").toString();
this.kafkaConnectorConfig = checkNotNull(kafkaConnectorConfig, "kafkaConfig is null");
this.handleResolver = checkNotNull(handleResolver, "handleResolver is null");
this.consumerManager = checkNotNull(consumerManager, "consumerManager is null");

checkNotNull(kafkaConnectorConfig, "kafkaConfig is null");
this.nodes = ImmutableSet.copyOf(kafkaConnectorConfig.getNodes());
}

@Override
public ConnectorPartitionResult getPartitions(ConnectorSession session, ConnectorTableHandle tableHandle, TupleDomain<ColumnHandle> tupleDomain)
{
KafkaTableHandle kafkaTableHandle = handleResolver.convertTableHandle(tableHandle);

List<HostAddress> nodes = new ArrayList<>(kafkaConnectorConfig.getNodes());
List<HostAddress> nodes = new ArrayList<>(this.nodes);
Collections.shuffle(nodes);

SimpleConsumer simpleConsumer = consumerManager.getConsumer(nodes.get(0));

KafkaTableDescriptionSupplier.java

@@ -18,6 +18,7 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;

@@ -27,6 +28,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

import static com.google.common.base.MoreObjects.firstNonNull;
@@ -39,29 +41,35 @@ public class KafkaTableDescriptionSupplier
{
private static final Logger log = Logger.get(KafkaTableDescriptionSupplier.class);

private final KafkaConnectorConfig kafkaConnectorConfig;
private final JsonCodec<KafkaTopicDescription> topicDescriptionCodec;
private final File tableDescriptionDir;
private final String defaultSchema;
private final Set<String> tableNames;

@Inject
KafkaTableDescriptionSupplier(KafkaConnectorConfig kafkaConnectorConfig,
JsonCodec<KafkaTopicDescription> topicDescriptionCodec)
{
this.kafkaConnectorConfig = checkNotNull(kafkaConnectorConfig, "kafkaConnectorConfig is null");
this.topicDescriptionCodec = checkNotNull(topicDescriptionCodec, "topicDescriptionCodec is null");

checkNotNull(kafkaConnectorConfig, "kafkaConfig is null");
this.tableDescriptionDir = kafkaConnectorConfig.getTableDescriptionDir();
this.defaultSchema = kafkaConnectorConfig.getDefaultSchema();
this.tableNames = ImmutableSet.copyOf(kafkaConnectorConfig.getTableNames());
}

@Override
public Map<SchemaTableName, KafkaTopicDescription> get()
{
ImmutableMap.Builder<SchemaTableName, KafkaTopicDescription> builder = ImmutableMap.builder();

log.debug("Loading kafka table definitions from %s", kafkaConnectorConfig.getTableDescriptionDir().getAbsolutePath());
log.debug("Loading kafka table definitions from %s", tableDescriptionDir.getAbsolutePath());

try {
for (File file : listFiles(kafkaConnectorConfig.getTableDescriptionDir())) {
for (File file : listFiles(tableDescriptionDir)) {
if (file.isFile() && file.getName().endsWith(".json")) {
KafkaTopicDescription table = topicDescriptionCodec.fromJson(readAllBytes(file.toPath()));
String schemaName = firstNonNull(table.getSchemaName(), kafkaConnectorConfig.getDefaultSchema());
String schemaName = firstNonNull(table.getSchemaName(), defaultSchema);
log.debug("Kafka table %s.%s: %s", schemaName, table.getTableName(), table);
builder.put(new SchemaTableName(schemaName, table.getTableName()), table);
}
@@ -72,13 +80,13 @@ public Map<SchemaTableName, KafkaTopicDescription> get()
log.debug("Loaded Table definitions: %s", tableDefinitions.keySet());

builder = ImmutableMap.builder();
for (String definedTable : kafkaConnectorConfig.getTableNames()) {
for (String definedTable : tableNames) {
SchemaTableName tableName;
try {
tableName = SchemaTableName.valueOf(definedTable);
}
catch (IllegalArgumentException iae) {
tableName = new SchemaTableName(kafkaConnectorConfig.getDefaultSchema(), definedTable);
tableName = new SchemaTableName(defaultSchema, definedTable);
}

if (tableDefinitions.containsKey(tableName)) {