Skip to content

Commit

Permalink
Support Curator Service Discovery and Spring Cloud ZooKeeper (#2749)
Browse files Browse the repository at this point in the history
Related: #2673

Motivation:

It will be nice if we support Curator Service Discovery and Spring Cloud Zookeeper.

Modifications:
- Add `ZookeeperRegistrationSpec` and `ZookeeperDiscoverySpec` to specify whether use legacy format or Curator compatible format.
  - (Breaking) You should specify `ZookeeperRegistrationSpec` When creating `ZookeeperUpdatingListener`.
  - (Breaking) You should specify `ZookeeperDiscoverySpec` When creating `ZookeeperEndpointGroup`.
- (Breaking)
  - `NodeValueCodec` is gone.
    - You now have to use `ZookeeperRegistrationSpec` and `ZookeeperDiscoverySpec` to encode and decode.
    - `ZooKeeperEndpointGroupBuilder.codec(...)` and `ZooKeeperUpatingListenerBuilder.codec(...)` are gone as well.

Result:
- You can now use Armeria client and server with Curator Service Discovery.
  • Loading branch information
minwoox committed Jun 3, 2020
1 parent 31f9f6b commit 7fe6daf
Show file tree
Hide file tree
Showing 28 changed files with 1,353 additions and 485 deletions.
32 changes: 12 additions & 20 deletions core/src/main/java/com/linecorp/armeria/server/Server.java
Expand Up @@ -197,19 +197,19 @@ public ServerPort activePort(SessionProtocol protocol) {

@Nullable
private ServerPort activePort0(@Nullable SessionProtocol protocol) {
ServerPort candidate = null;
synchronized (activePorts) {
for (ServerPort serverPort : activePorts.values()) {
if (!isLocalPort(serverPort, protocol)) {
return serverPort;
}
}
for (ServerPort serverPort : activePorts.values()) {
if (protocol == null || serverPort.hasProtocol(protocol)) {
return serverPort;
if (!isLocalPort(serverPort)) {
return serverPort;
} else if (candidate == null) {
candidate = serverPort;
}
}
}
}
return null;
return candidate;
}

/**
Expand All @@ -235,7 +235,8 @@ public int activeLocalPort(SessionProtocol protocol) {
private int activeLocalPort0(@Nullable SessionProtocol protocol) {
synchronized (activePorts) {
return activePorts.values().stream()
.filter(activePort -> isLocalPort(activePort, protocol))
.filter(activePort -> (protocol == null || activePort.hasProtocol(protocol)) &&
isLocalPort(activePort))
.findFirst()
.orElseThrow(() -> new IllegalStateException(
(protocol == null ? "no active local ports: "
Expand Down Expand Up @@ -720,7 +721,7 @@ public void operationComplete(ChannelFuture f) {
}

if (logger.isInfoEnabled()) {
if (isLocalPort(actualPort, null)) {
if (isLocalPort(actualPort)) {
port.protocols().forEach(p -> logger.info(
"Serving {} at {} - {}://127.0.0.1:{}/",
p.name(), localAddress, p.uriText(), localAddress.getPort()));
Expand All @@ -746,17 +747,8 @@ private static String bossThreadName(ServerPort port) {
return "armeria-boss-" + protocolNames + '-' + localHostName + ':' + localAddr.getPort();
}

private static boolean isLocalPort(ServerPort serverPort, @Nullable SessionProtocol protocol) {
private static boolean isLocalPort(ServerPort serverPort) {
final InetAddress address = serverPort.localAddress().getAddress();

if (!address.isAnyLocalAddress() && !address.isLoopbackAddress()) {
return false;
}

if (protocol == null) {
return true;
}

return serverPort.hasProtocol(protocol);
return address.isAnyLocalAddress() || address.isLoopbackAddress();
}
}
6 changes: 5 additions & 1 deletion dependencies.yml
Expand Up @@ -321,11 +321,15 @@ net.shibboleth.utilities:
# See: https://github.com/apache/curator/blob/master/pom.xml
org.apache.curator:
curator-recipes:
version: '4.3.0'
version: &CURATOR_VERSION '4.3.0'
javadocs:
- https://static.javadoc.io/org.apache.curator/curator-recipes/4.3.0/
exclusions:
- org.apache.zookeeper:zookeeper
curator-x-discovery:
version: *CURATOR_VERSION
javadocs:
- https://www.javadoc.io/doc/org.apache.curator/curator-x-discovery/4.3.0/

org.apache.hbase:
hbase-shaded-client:
Expand Down
Expand Up @@ -238,7 +238,7 @@ private static PortWrapper portWrapper(Server server, PortWrapper oldPortWrapper
return oldPortWrapper;
}
}
logger.warn("The port number: {} (expected one of activePorts: {})",
logger.warn("The specified port number {} does not exist. (expected one of activePorts: {})",
oldPortWrapper.getPort(), server.activePorts());
}

Expand Down
17 changes: 1 addition & 16 deletions zookeeper/build.gradle
@@ -1,26 +1,11 @@
final def DROPWIZARD_VERSION = '3.2.6'
final def SNAPPY_VERSION = '1.1.7.5'

dependencies {
// Curator
api 'org.apache.curator:curator-recipes'
api 'org.apache.curator:curator-x-discovery'

// ZooKeeper
api 'org.apache.zookeeper:zookeeper'

implementation("io.dropwizard.metrics:metrics-core") {
version {
// Will fail the build if the override doesn't work
strictly DROPWIZARD_VERSION
}
}
implementation("org.xerial.snappy:snappy-java") {
version {
// Will fail the build if the override doesn't work
strictly SNAPPY_VERSION
}
}

// ZooKeeper JUnit
testImplementation 'org.dmonix.junit:zookeeper-junit'
}
@@ -0,0 +1,55 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client.zookeeper;

import java.util.function.Function;

import org.apache.curator.x.discovery.ServiceInstance;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.internal.common.zookeeper.CuratorXNodeValueCodec;

final class CuratorDiscoverySpec implements ZookeeperDiscoverySpec {

private final String path;
private final Function<? super ServiceInstance<?>, Endpoint> converter;

CuratorDiscoverySpec(
String serviceName, Function<? super ServiceInstance<?>, Endpoint> converter) {
path = '/' + serviceName;
this.converter = converter;
}

@Override
public String path() {
return path;
}

@Override
public Endpoint decode(byte[] data) {
return converter.apply(CuratorXNodeValueCodec.INSTANCE.decode(data));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("path", path)
.add("converter", converter)
.toString();
}
}
@@ -0,0 +1,110 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client.zookeeper;

import static com.google.common.base.Preconditions.checkState;
import static com.linecorp.armeria.internal.common.zookeeper.ZookeeperPathUtil.validatePath;
import static java.util.Objects.requireNonNull;

import java.util.function.Function;

import javax.annotation.Nullable;

import org.apache.curator.x.discovery.ServiceInstance;

import com.linecorp.armeria.client.Endpoint;

/**
* Builds a {@link ZookeeperDiscoverySpec} for
* <a href="https://curator.apache.org/curator-x-discovery/index.html">Curator Service Discovery</a>.
*/
public final class CuratorDiscoverySpecBuilder {

private final String serviceName;
@Nullable
private String instanceId;
@Nullable
private Boolean useSsl;
@Nullable
private Function<? super ServiceInstance<?>, Endpoint> converter;

/**
* Creates a new instance.
*/
CuratorDiscoverySpecBuilder(String serviceName) {
this.serviceName = validatePath(serviceName, "serviceName");
}

/**
* Sets the specified instance ID. If this is set, the {@link ZooKeeperEndpointGroup} will only connect to
* the instance.
*/
public CuratorDiscoverySpecBuilder instanceId(String instanceId) {
checkState(converter == null, "converter() and instanceId() are mutually exclusive.");
this.instanceId = requireNonNull(instanceId, "instanceId");
return this;
}

/**
* Sets whether to connect an {@link Endpoint} using {@code sslPort} of {@link ServiceInstance}.
*/
public CuratorDiscoverySpecBuilder useSsl(boolean useSsl) {
checkState(converter == null, "converter() and useSsl() are mutually exclusive.");
this.useSsl = useSsl;
return this;
}

/**
* Sets the specified converter to convert a {@link ServiceInstance} into an {@link Endpoint}.
* If you don't want to connect to the service, you can simply return {@code null} in the converter.
*/
public CuratorDiscoverySpecBuilder converter(
Function<? super ServiceInstance<?>, Endpoint> converter) {
checkState(instanceId == null, "converter() and instanceId() are mutually exclusive.");
checkState(useSsl == null, "converter() and useSsl() are mutually exclusive.");
this.converter = requireNonNull(converter, "converter");
return this;
}

private Function<? super ServiceInstance<?>, Endpoint> converter() {
if (converter != null) {
return converter;
}
return instance -> {
if (!instance.isEnabled()) {
return null;
}
if (instanceId != null && !instanceId.equals(instance.getId())) {
return null;
}
if (useSsl != null && useSsl && instance.getSslPort() != null) {
return Endpoint.of(instance.getAddress(), instance.getSslPort());
}

if (instance.getPort() != null) {
return Endpoint.of(instance.getAddress(), instance.getPort());
}
return Endpoint.of(instance.getAddress());
};
}

/**
* Returns a newly-created {@link ZookeeperDiscoverySpec} based on the properties set so far.
*/
public ZookeeperDiscoverySpec build() {
return new CuratorDiscoverySpec(serviceName, converter());
}
}
@@ -0,0 +1,36 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client.zookeeper;

import javax.annotation.Nullable;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.internal.common.zookeeper.LegacyNodeValueCodec;

enum LegacyZookeeperDiscoverySpec implements ZookeeperDiscoverySpec {
INSTANCE;

@Nullable
@Override
public String path() {
return null;
}

@Override
public Endpoint decode(byte[] zNodeValue) {
return LegacyNodeValueCodec.INSTANCE.decode(zNodeValue);
}
}

0 comments on commit 7fe6daf

Please sign in to comment.