From 4b2b644646506124ae14cdea21e6a5b01bfe4726 Mon Sep 17 00:00:00 2001 From: David Phillips Date: Wed, 29 Jul 2015 07:21:45 -0700 Subject: [PATCH] Remove stored config objects in Kafka --- .../facebook/presto/kafka/KafkaMetadata.java | 11 +++++----- .../kafka/KafkaSimpleConsumerManager.java | 12 ++++++---- .../presto/kafka/KafkaSplitManager.java | 10 ++++++--- .../kafka/KafkaTableDescriptionSupplier.java | 22 +++++++++++++------ 4 files changed, 36 insertions(+), 19 deletions(-) diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java index bfe5a30c420e..c2c7da874312 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaMetadata.java @@ -45,9 +45,8 @@ public class KafkaMetadata extends ReadOnlyConnectorMetadata { private final String connectorId; - private final KafkaConnectorConfig kafkaConnectorConfig; private final KafkaHandleResolver handleResolver; - + private final boolean hideInternalColumns; private final Map tableDescriptions; private final Set internalFieldDescriptions; @@ -60,9 +59,11 @@ public KafkaMetadata( Set internalFieldDescriptions) { this.connectorId = checkNotNull(connectorId, "connectorId is null").toString(); - this.kafkaConnectorConfig = checkNotNull(kafkaConnectorConfig, "kafkaConfig is null"); this.handleResolver = checkNotNull(handleResolver, "handleResolver is null"); + checkNotNull(kafkaConnectorConfig, "kafkaConfig is null"); + this.hideInternalColumns = kafkaConnectorConfig.isHideInternalColumns(); + checkNotNull(kafkaTableDescriptionSupplier, "kafkaTableDescriptionSupplier is null"); this.tableDescriptions = kafkaTableDescriptionSupplier.get(); this.internalFieldDescriptions = checkNotNull(internalFieldDescriptions, "internalFieldDescriptions is null"); @@ -160,7 +161,7 @@ public Map getColumnHandles(ConnectorSession session, Conn } for (KafkaInternalFieldDescription kafkaInternalFieldDescription : internalFieldDescriptions) { - columnHandles.put(kafkaInternalFieldDescription.getName(), kafkaInternalFieldDescription.getColumnHandle(connectorId, index++, kafkaConnectorConfig.isHideInternalColumns())); + columnHandles.put(kafkaInternalFieldDescription.getName(), kafkaInternalFieldDescription.getColumnHandle(connectorId, index++, hideInternalColumns)); } return columnHandles.build(); @@ -225,7 +226,7 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) } for (KafkaInternalFieldDescription fieldDescription : internalFieldDescriptions) { - builder.add(fieldDescription.getColumnMetadata(kafkaConnectorConfig.isHideInternalColumns())); + builder.add(fieldDescription.getColumnMetadata(hideInternalColumns)); } return new ConnectorTableMetadata(schemaTableName, builder.build()); diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSimpleConsumerManager.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSimpleConsumerManager.java index 574bb74742fc..75f612ead6d3 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSimpleConsumerManager.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSimpleConsumerManager.java @@ -43,8 +43,9 @@ public class KafkaSimpleConsumerManager private final LoadingCache consumerCache; private final String connectorId; - private final KafkaConnectorConfig kafkaConnectorConfig; private final NodeManager nodeManager; + private final int connectTimeoutMillis; + private final int bufferSizeBytes; @Inject public KafkaSimpleConsumerManager( @@ -53,9 +54,12 @@ public KafkaSimpleConsumerManager( NodeManager nodeManager) { this.connectorId = checkNotNull(connectorId, "connectorId is null").toString(); - this.kafkaConnectorConfig = checkNotNull(kafkaConnectorConfig, "kafkaConfig is null"); this.nodeManager = checkNotNull(nodeManager, "nodeManager is null"); + checkNotNull(kafkaConnectorConfig, "kafkaConfig is null"); + this.connectTimeoutMillis = Ints.checkedCast(kafkaConnectorConfig.getKafkaConnectTimeout().toMillis()); + this.bufferSizeBytes = Ints.checkedCast(kafkaConnectorConfig.getKafkaBufferSize().toBytes()); + this.consumerCache = CacheBuilder.newBuilder().build(new SimpleConsumerCacheLoader()); } @@ -93,8 +97,8 @@ public SimpleConsumer load(HostAddress host) log.info("Creating new Consumer for %s", host); return new SimpleConsumer(host.getHostText(), host.getPort(), - Ints.checkedCast(kafkaConnectorConfig.getKafkaConnectTimeout().toMillis()), - Ints.checkedCast(kafkaConnectorConfig.getKafkaBufferSize().toBytes()), + connectTimeoutMillis, + bufferSizeBytes, format("presto-kafka-%s-%s", connectorId, nodeManager.getCurrentNode().getNodeIdentifier())); } } diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java index d0dd02691544..ab7fc6ca156b 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaSplitManager.java @@ -28,6 +28,7 @@ import com.facebook.presto.spi.TupleDomain; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import io.airlift.log.Logger; import kafka.api.PartitionOffsetRequestInfo; @@ -46,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import static com.facebook.presto.kafka.KafkaErrorCode.KAFKA_SPLIT_ERROR; import static com.google.common.base.Preconditions.checkNotNull; @@ -60,9 +62,9 @@ public class KafkaSplitManager private static final Logger log = Logger.get(KafkaSplitManager.class); private final String connectorId; - private final KafkaConnectorConfig kafkaConnectorConfig; private final KafkaHandleResolver handleResolver; private final KafkaSimpleConsumerManager consumerManager; + private final Set nodes; @Inject public KafkaSplitManager( @@ -72,9 +74,11 @@ public KafkaSplitManager( KafkaSimpleConsumerManager consumerManager) { this.connectorId = checkNotNull(connectorId, "connectorId is null").toString(); - this.kafkaConnectorConfig = checkNotNull(kafkaConnectorConfig, "kafkaConfig is null"); this.handleResolver = checkNotNull(handleResolver, "handleResolver is null"); this.consumerManager = checkNotNull(consumerManager, "consumerManager is null"); + + checkNotNull(kafkaConnectorConfig, "kafkaConfig is null"); + this.nodes = ImmutableSet.copyOf(kafkaConnectorConfig.getNodes()); } @Override @@ -82,7 +86,7 @@ public ConnectorPartitionResult getPartitions(ConnectorSession session, Connecto { KafkaTableHandle kafkaTableHandle = handleResolver.convertTableHandle(tableHandle); - List nodes = new ArrayList<>(kafkaConnectorConfig.getNodes()); + List nodes = new ArrayList<>(this.nodes); Collections.shuffle(nodes); SimpleConsumer simpleConsumer = consumerManager.getConsumer(nodes.get(0)); diff --git a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableDescriptionSupplier.java b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableDescriptionSupplier.java index 71298169b89c..f290edfb49ac 100644 --- a/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableDescriptionSupplier.java +++ b/presto-kafka/src/main/java/com/facebook/presto/kafka/KafkaTableDescriptionSupplier.java @@ -18,6 +18,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.airlift.json.JsonCodec; import io.airlift.log.Logger; @@ -27,6 +28,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; import static com.google.common.base.MoreObjects.firstNonNull; @@ -39,15 +41,21 @@ public class KafkaTableDescriptionSupplier { private static final Logger log = Logger.get(KafkaTableDescriptionSupplier.class); - private final KafkaConnectorConfig kafkaConnectorConfig; private final JsonCodec topicDescriptionCodec; + private final File tableDescriptionDir; + private final String defaultSchema; + private final Set tableNames; @Inject KafkaTableDescriptionSupplier(KafkaConnectorConfig kafkaConnectorConfig, JsonCodec topicDescriptionCodec) { - this.kafkaConnectorConfig = checkNotNull(kafkaConnectorConfig, "kafkaConnectorConfig is null"); this.topicDescriptionCodec = checkNotNull(topicDescriptionCodec, "topicDescriptionCodec is null"); + + checkNotNull(kafkaConnectorConfig, "kafkaConfig is null"); + this.tableDescriptionDir = kafkaConnectorConfig.getTableDescriptionDir(); + this.defaultSchema = kafkaConnectorConfig.getDefaultSchema(); + this.tableNames = ImmutableSet.copyOf(kafkaConnectorConfig.getTableNames()); } @Override @@ -55,13 +63,13 @@ public Map get() { ImmutableMap.Builder builder = ImmutableMap.builder(); - log.debug("Loading kafka table definitions from %s", kafkaConnectorConfig.getTableDescriptionDir().getAbsolutePath()); + log.debug("Loading kafka table definitions from %s", tableDescriptionDir.getAbsolutePath()); try { - for (File file : listFiles(kafkaConnectorConfig.getTableDescriptionDir())) { + for (File file : listFiles(tableDescriptionDir)) { if (file.isFile() && file.getName().endsWith(".json")) { KafkaTopicDescription table = topicDescriptionCodec.fromJson(readAllBytes(file.toPath())); - String schemaName = firstNonNull(table.getSchemaName(), kafkaConnectorConfig.getDefaultSchema()); + String schemaName = firstNonNull(table.getSchemaName(), defaultSchema); log.debug("Kafka table %s.%s: %s", schemaName, table.getTableName(), table); builder.put(new SchemaTableName(schemaName, table.getTableName()), table); } @@ -72,13 +80,13 @@ public Map get() log.debug("Loaded Table definitions: %s", tableDefinitions.keySet()); builder = ImmutableMap.builder(); - for (String definedTable : kafkaConnectorConfig.getTableNames()) { + for (String definedTable : tableNames) { SchemaTableName tableName; try { tableName = SchemaTableName.valueOf(definedTable); } catch (IllegalArgumentException iae) { - tableName = new SchemaTableName(kafkaConnectorConfig.getDefaultSchema(), definedTable); + tableName = new SchemaTableName(defaultSchema, definedTable); } if (tableDefinitions.containsKey(tableName)) {