Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Add ssl listeners in channelInitializer #45

Merged
merged 7 commits into from
Nov 13, 2019
Merged

Add ssl listeners in channelInitializer #45

merged 7 commits into from
Nov 13, 2019

Conversation

jiazhai
Copy link
Contributor

@jiazhai jiazhai commented Nov 11, 2019

  • Add ssl listeners and sslCtxRefresher in channelInitializer

@jiazhai jiazhai requested a review from sijie November 11, 2019 02:52
@jiazhai jiazhai self-assigned this Nov 11, 2019
// TODO: support data register, when do https://github.com/streamnative/kop/issues/2
return "mock-data-for-kafka";
log.debug("Get configured listeners", kafkaConfig.getListeners());
return kafkaConfig.getListeners();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

if (log.isDebugEnabled()) {
    log.debug("Get configured listeners", kafkaConfig.getListeners());
}

for (String listener: parts) {
if (listener.startsWith(PLAINTEXT_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we use the address in the listener as the bind address?

true));
} else {
log.error("KafkaProtocolHandler listeners {} not supported. supports {} and {}",
listeners, PLAINTEXT_PREFIX, SSL_PREFIX);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
listeners, PLAINTEXT_PREFIX, SSL_PREFIX);
listener, PLAINTEXT_PREFIX, SSL_PREFIX);

return initializerMap;
true));
} else {
log.error("KafkaProtocolHandler listeners {} not supported. supports {} and {}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.error("KafkaProtocolHandler listeners {} not supported. supports {} and {}",
log.error("Kafka listener {} is not supported. Only {} and {} are supported now.",

@@ -119,11 +127,15 @@
private final NamespaceName kafkaNamespace;
private final ExecutorService executor;
private final PulsarAdmin admin;
private final Boolean tlsEnabled;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need tlsEnabled if we already support listeners?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some method, it will need construct a Node(contains broker host and port), the port should be identified separately(9092 or 9093), so for this handler we need to know it is created with SSL enabled or not.

if (!lookupResult.isPresent()) {
log.error("Can't find broker for topic {}", topic);
CompletableFuture<Optional<String>> future = new CompletableFuture<>();
future.completeExceptionally(new KeeperException.NoNodeException());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should return a zookeeper exception here. We should return a Pulsar exception.

} catch (Exception e) {
log.error("Failed to get URI from {} for topic {}", candidateBroker, topic);
CompletableFuture<Optional<String>> future = new CompletableFuture<>();
future.completeExceptionally(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convert the exception into a broker exception?

} catch (Exception e) {
log.error("Caught error while find Broker for topic:{} ", topic, e);
resultFuture.completeExceptionally(e);
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we return null here, did we handle null in the caller?

@sijie sijie added area/protocol type/feature Indicates new functionality labels Nov 12, 2019
@sijie sijie added this to the 0.0.1 milestone Nov 12, 2019
@sijie sijie merged commit 3424abc into streamnative:master Nov 13, 2019
@sijie sijie deleted the enc_tls branch November 13, 2019 09:53
@jiazhai jiazhai changed the title Add ssl listeners and sslCtxRefresher in channelInitializer Add ssl listeners in channelInitializer Nov 13, 2019
@jiazhai jiazhai modified the milestones: 0.0.1, 0.1.0 Feb 17, 2020
michaeljmarshall pushed a commit to michaeljmarshall/kop that referenced this pull request Jan 24, 2023
This pull request was closed.
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
area/protocol type/feature Indicates new functionality
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants