Skip to content

Commit

Permalink
Refactor Kafka Admin factory
Browse files Browse the repository at this point in the history
  • Loading branch information
hashhar authored and losipiuk committed Nov 21, 2020
1 parent a8f5738 commit f60a220
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 31 deletions.
Expand Up @@ -14,41 +14,17 @@

package io.prestosql.plugin.kafka;

import io.prestosql.spi.HostAddress;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.KafkaAdminClient;

import javax.inject.Inject;

import java.util.Properties;
import java.util.Set;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;

public class KafkaAdminFactory
public interface KafkaAdminFactory
{
private final Set<HostAddress> nodes;

@Inject
public KafkaAdminFactory(KafkaConfig kafkaConfig)
{
requireNonNull(kafkaConfig, "kafkaConfig is null");
nodes = kafkaConfig.getNodes();
}

public AdminClient create()
default Admin create()
{
return KafkaAdminClient.create(configure());
}

public Properties configure()
{
Properties properties = new Properties();
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
.map(HostAddress::toString)
.collect(joining(",")));
return properties;
}
Properties configure();
}
@@ -0,0 +1,28 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.kafka;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;

public class KafkaAdminModule
implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(KafkaAdminFactory.class).to(PlainTextKafkaAdminFactory.class).in(Scopes.SINGLETON);
}
}
Expand Up @@ -56,7 +56,6 @@ public void configure(Binder binder)
binder.bind(KafkaConnector.class).in(Scopes.SINGLETON);
binder.bind(KafkaInternalFieldManager.class).in(Scopes.SINGLETON);
binder.bind(KafkaSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(KafkaAdminFactory.class).in(Scopes.SINGLETON);
binder.bind(KafkaFilterManager.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(KafkaConfig.class);
Expand Down
Expand Up @@ -24,7 +24,7 @@
import io.prestosql.spi.predicate.SortedRangeSet;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.predicate.ValueSet;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -148,7 +148,7 @@ public KafkaFilteringResult getKafkaFilterResult(

private boolean isTimestampUpperBoundPushdownEnabled(ConnectorSession session, String topic)
{
try (AdminClient adminClient = adminFactory.create()) {
try (Admin adminClient = adminFactory.create()) {
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);

DescribeConfigsResult describeResult = adminClient.describeConfigs(Collections.singleton(topicResource));
Expand Down
Expand Up @@ -26,6 +26,7 @@ public class KafkaPlugin
public static final Module DEFAULT_EXTENSION = binder -> {
binder.install(new KafkaConsumerModule());
binder.install(new KafkaProducerModule());
binder.install(new KafkaAdminModule());
};

private final Module extension;
Expand Down
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.prestosql.plugin.kafka;

import io.prestosql.spi.HostAddress;

import javax.inject.Inject;

import java.util.Properties;
import java.util.Set;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;

public class PlainTextKafkaAdminFactory
implements KafkaAdminFactory
{
private final Set<HostAddress> nodes;

@Inject
public PlainTextKafkaAdminFactory(KafkaConfig kafkaConfig)
{
requireNonNull(kafkaConfig, "kafkaConfig is null");

nodes = kafkaConfig.getNodes();
}

@Override
public Properties configure()
{
Properties properties = new Properties();
properties.setProperty(BOOTSTRAP_SERVERS_CONFIG, nodes.stream()
.map(HostAddress::toString)
.collect(joining(",")));
return properties;
}
}

0 comments on commit f60a220

Please sign in to comment.