Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Curator Service Discovery and Spring Cloud ZooKeeper #2749

Merged
merged 16 commits into from Jun 3, 2020
32 changes: 12 additions & 20 deletions core/src/main/java/com/linecorp/armeria/server/Server.java
Expand Up @@ -197,19 +197,19 @@ public ServerPort activePort(SessionProtocol protocol) {

@Nullable
private ServerPort activePort0(@Nullable SessionProtocol protocol) {
ServerPort candidate = null;
synchronized (activePorts) {
for (ServerPort serverPort : activePorts.values()) {
if (!isLocalPort(serverPort, protocol)) {
return serverPort;
}
}
for (ServerPort serverPort : activePorts.values()) {
if (protocol == null || serverPort.hasProtocol(protocol)) {
return serverPort;
if (!isLocalPort(serverPort)) {
return serverPort;
} else if (candidate == null) {
candidate = serverPort;
}
}
}
}
return null;
return candidate;
}

/**
Expand All @@ -235,7 +235,8 @@ public int activeLocalPort(SessionProtocol protocol) {
private int activeLocalPort0(@Nullable SessionProtocol protocol) {
synchronized (activePorts) {
return activePorts.values().stream()
.filter(activePort -> isLocalPort(activePort, protocol))
.filter(activePort -> (protocol == null || activePort.hasProtocol(protocol)) &&
isLocalPort(activePort))
.findFirst()
.orElseThrow(() -> new IllegalStateException(
(protocol == null ? "no active local ports: "
Expand Down Expand Up @@ -720,7 +721,7 @@ public void operationComplete(ChannelFuture f) {
}

if (logger.isInfoEnabled()) {
if (isLocalPort(actualPort, null)) {
if (isLocalPort(actualPort)) {
port.protocols().forEach(p -> logger.info(
"Serving {} at {} - {}://127.0.0.1:{}/",
p.name(), localAddress, p.uriText(), localAddress.getPort()));
Expand All @@ -746,17 +747,8 @@ private static String bossThreadName(ServerPort port) {
return "armeria-boss-" + protocolNames + '-' + localHostName + ':' + localAddr.getPort();
}

private static boolean isLocalPort(ServerPort serverPort, @Nullable SessionProtocol protocol) {
private static boolean isLocalPort(ServerPort serverPort) {
final InetAddress address = serverPort.localAddress().getAddress();

if (!address.isAnyLocalAddress() && !address.isLoopbackAddress()) {
return false;
}

if (protocol == null) {
return true;
}

return serverPort.hasProtocol(protocol);
return address.isAnyLocalAddress() || address.isLoopbackAddress();
}
}
6 changes: 5 additions & 1 deletion dependencies.yml
Expand Up @@ -321,11 +321,15 @@ net.shibboleth.utilities:
# See: https://github.com/apache/curator/blob/master/pom.xml
org.apache.curator:
curator-recipes:
version: '4.3.0'
version: &CURATOR_VERSION '4.3.0'
javadocs:
- https://static.javadoc.io/org.apache.curator/curator-recipes/4.3.0/
exclusions:
- org.apache.zookeeper:zookeeper
curator-x-discovery:
version: *CURATOR_VERSION
javadocs:
- https://www.javadoc.io/doc/org.apache.curator/curator-x-discovery/4.3.0/

org.apache.hbase:
hbase-shaded-client:
Expand Down
Expand Up @@ -238,7 +238,7 @@ private static PortWrapper portWrapper(Server server, PortWrapper oldPortWrapper
return oldPortWrapper;
}
}
logger.warn("The port number: {} (expected one of activePorts: {})",
logger.warn("The specified port number {} does not exist. (expected one of activePorts: {})",
oldPortWrapper.getPort(), server.activePorts());
}

Expand Down
17 changes: 1 addition & 16 deletions zookeeper/build.gradle
@@ -1,26 +1,11 @@
final def DROPWIZARD_VERSION = '3.2.6'
final def SNAPPY_VERSION = '1.1.7.5'
trustin marked this conversation as resolved.
Show resolved Hide resolved

dependencies {
// Curator
api 'org.apache.curator:curator-recipes'
api 'org.apache.curator:curator-x-discovery'

// ZooKeeper
api 'org.apache.zookeeper:zookeeper'

implementation("io.dropwizard.metrics:metrics-core") {
version {
// Will fail the build if the override doesn't work
strictly DROPWIZARD_VERSION
}
}
implementation("org.xerial.snappy:snappy-java") {
version {
// Will fail the build if the override doesn't work
strictly SNAPPY_VERSION
}
}

// ZooKeeper JUnit
testImplementation 'org.dmonix.junit:zookeeper-junit'
}
@@ -0,0 +1,55 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client.zookeeper;

import java.util.function.Function;

import org.apache.curator.x.discovery.ServiceInstance;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.internal.common.zookeeper.CuratorXNodeValueCodec;

final class CuratorDiscoverySpec implements ZookeeperDiscoverySpec {

private final String path;
private final Function<? super ServiceInstance<?>, Endpoint> converter;

CuratorDiscoverySpec(
String serviceName, Function<? super ServiceInstance<?>, Endpoint> converter) {
path = '/' + serviceName;
trustin marked this conversation as resolved.
Show resolved Hide resolved
this.converter = converter;
}

@Override
public String path() {
return path;
}

@Override
public Endpoint decode(byte[] data) {
return converter.apply(CuratorXNodeValueCodec.INSTANCE.decode(data));
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("path", path)
.add("converter", converter)
.toString();
}
}
@@ -0,0 +1,110 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client.zookeeper;

import static com.google.common.base.Preconditions.checkState;
import static com.linecorp.armeria.internal.common.zookeeper.ZookeeperPathUtil.validatePath;
import static java.util.Objects.requireNonNull;

import java.util.function.Function;

import javax.annotation.Nullable;

import org.apache.curator.x.discovery.ServiceInstance;

import com.linecorp.armeria.client.Endpoint;

/**
* Builds a {@link ZookeeperDiscoverySpec} for
* <a href="https://curator.apache.org/curator-x-discovery/index.html">Curator Service Discovery</a>.
*/
public final class CuratorDiscoverySpecBuilder {

private final String serviceName;
@Nullable
private String instanceId;
@Nullable
private Boolean useSsl;
@Nullable
private Function<? super ServiceInstance<?>, Endpoint> converter;

/**
* Creates a new instance.
*/
CuratorDiscoverySpecBuilder(String serviceName) {
this.serviceName = validatePath(serviceName, "serviceName");
}

/**
* Sets the specified instance ID. If this is set, the {@link ZooKeeperEndpointGroup} will only connect to
* the instance.
*/
public CuratorDiscoverySpecBuilder instanceId(String instanceId) {
checkState(converter == null, "converter() and instanceId() are mutually exclusive.");
this.instanceId = requireNonNull(instanceId, "instanceId");
return this;
}

/**
* Sets whether to connect an {@link Endpoint} using {@code sslPort} of {@link ServiceInstance}.
*/
public CuratorDiscoverySpecBuilder useSsl(boolean useSsl) {
checkState(converter == null, "converter() and useSsl() are mutually exclusive.");
this.useSsl = useSsl;
return this;
}

/**
* Sets the specified converter to convert a {@link ServiceInstance} into an {@link Endpoint}.
* If you don't want to connect to the service, you can simply return {@code null} in the converter.
*/
public CuratorDiscoverySpecBuilder converter(
Function<? super ServiceInstance<?>, Endpoint> converter) {
checkState(instanceId == null, "converter() and instanceId() are mutually exclusive.");
checkState(useSsl == null, "converter() and useSsl() are mutually exclusive.");
this.converter = requireNonNull(converter, "converter");
return this;
}

private Function<? super ServiceInstance<?>, Endpoint> converter() {
if (converter != null) {
return converter;
}
return instance -> {
if (!instance.isEnabled()) {
return null;
}
if (instanceId != null && !instanceId.equals(instance.getId())) {
return null;
}
if (useSsl != null && useSsl && instance.getSslPort() != null) {
return Endpoint.of(instance.getAddress(), instance.getSslPort());
}

if (instance.getPort() != null) {
return Endpoint.of(instance.getAddress(), instance.getPort());
}
return Endpoint.of(instance.getAddress());
};
}

/**
* Returns a newly-created {@link ZookeeperDiscoverySpec} based on the properties set so far.
*/
public ZookeeperDiscoverySpec build() {
return new CuratorDiscoverySpec(serviceName, converter());
}
}
@@ -0,0 +1,36 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.client.zookeeper;

import javax.annotation.Nullable;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.internal.common.zookeeper.LegacyNodeValueCodec;

enum LegacyZookeeperDiscoverySpec implements ZookeeperDiscoverySpec {
INSTANCE;

@Nullable
@Override
public String path() {
return null;
}

@Override
public Endpoint decode(byte[] zNodeValue) {
return LegacyNodeValueCodec.INSTANCE.decode(zNodeValue);
}
}