Skip to content

Commit

Permalink
Polish apache#3984 : Add Service registration and discovery implement…
Browse files Browse the repository at this point in the history
…ation for Zookeeper
  • Loading branch information
mercyblitz committed May 31, 2019
1 parent 9975f82 commit 1df1d1a
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* <p>
* It's compatible with Spring Cloud
*
* @since 2.7.2
* @since 2.7.3
*/
public class ZookeeperInstance {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.event.EventDispatcher;
import org.apache.dubbo.registry.client.EventPublishingServiceRegistry;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceDiscoveryChangeListener;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
Expand All @@ -47,7 +46,7 @@
* Zookeeper {@link ServiceDiscovery} implementation based on
* <a href="https://curator.apache.org/curator-x-discovery/index.html">Apache Curator X Discovery</a>
*/
public class ZookeeperServiceDiscovery extends EventPublishingServiceRegistry implements ServiceDiscovery {
public class ZookeeperServiceDiscovery implements ServiceDiscovery {

private final Logger logger = LoggerFactory.getLogger(getClass());

Expand All @@ -64,43 +63,38 @@ public class ZookeeperServiceDiscovery extends EventPublishingServiceRegistry im
*/
private final Map<String, CuratorWatcher> watcherCaches = new ConcurrentHashMap<>();

public ZookeeperServiceDiscovery(URL registerURL) throws Exception {
this.curatorFramework = buildCuratorFramework(registerURL);
this.rootPath = ROOT_PATH.getParameterValue(registerURL);
public ZookeeperServiceDiscovery(URL connectionURL) throws Exception {
this.curatorFramework = buildCuratorFramework(connectionURL);
this.rootPath = ROOT_PATH.getParameterValue(connectionURL);
this.serviceDiscovery = buildServiceDiscovery(curatorFramework, rootPath);
this.dispatcher = EventDispatcher.getDefaultExtension();
}

@Override
protected void doRegister(ServiceInstance serviceInstance) throws RuntimeException {
public void register(ServiceInstance serviceInstance) throws RuntimeException {
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.registerService(build(serviceInstance));
});
}

@Override
protected void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
public void update(ServiceInstance serviceInstance) throws RuntimeException {
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.updateService(build(serviceInstance));
});
}

@Override
protected void doUnregister(ServiceInstance serviceInstance) throws RuntimeException {
public void unregister(ServiceInstance serviceInstance) throws RuntimeException {
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.unregisterService(build(serviceInstance));
});
}

@Override
protected void doStart() {
public void start() {
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.start();
});
}

@Override
protected void doStop() {
public void stop() {
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.close();
});
Expand All @@ -117,7 +111,7 @@ public List<ServiceInstance> getInstances(String serviceName) throws NullPointer
}

@Override
public void addServiceDiscoveryChangeListener(String serviceName, ServiceDiscoveryChangeListener listener)
public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
addServiceWatcherIfAbsent(serviceName);
dispatcher.addEventListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.dubbo.event.EventDispatcher;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceDiscoveryChangeEvent;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;

import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;
Expand All @@ -33,9 +33,9 @@
/**
* Zookeeper {@link ServiceDiscovery} Change {@link CuratorWatcher watcher} only interests in
* {@link Watcher.Event.EventType#NodeChildrenChanged} and {@link Watcher.Event.EventType#NodeDataChanged} event types,
* which will multicast a {@link ServiceDiscoveryChangeEvent} when the service node has been changed.
* which will multicast a {@link ServiceInstancesChangedEvent} when the service node has been changed.
*
* @since 2.7.2
* @since 2.7.3
*/
public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {

Expand All @@ -58,7 +58,7 @@ public void process(WatchedEvent event) throws Exception {
Watcher.Event.EventType eventType = event.getType();

if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) {
dispatcher.dispatch(new ServiceDiscoveryChangeEvent(serviceName, getServiceInstances(serviceName)));
dispatcher.dispatch(new ServiceInstancesChangedEvent(serviceName, getServiceInstances(serviceName)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.zookeeper;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceDiscoveryFactory;

/**
* The zookeeper {@link ServiceDiscoveryFactory} implementation
*
* @see ServiceDiscoveryFactory
* @since 2.7.3
*/
public class ZookeeperServiceDiscoveryFactory implements ServiceDiscoveryFactory {

@Override
public boolean supports(URL connectionURL) {
return "zookeeper".equalsIgnoreCase(connectionURL.getProtocol());
}

@Override
public ServiceDiscovery create(URL connectionURL) {
try {
return new ZookeeperServiceDiscovery(connectionURL);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The enumeration for the parameters of {@link CuratorFramework}
*
* @see CuratorFramework
* @since 2.7.2
* @since 2.7.3
*/
public enum CuratorFrameworkParams {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
/**
* Curator Framework Utilities Class
*
* @since 2.7.2
* @since 2.7.3
*/
public abstract class CuratorFrameworkUtils {

public static ZookeeperServiceDiscovery buildZookeeperServiceDiscovery(URL registerURL) throws Exception {
return new ZookeeperServiceDiscovery(registerURL);
public static ZookeeperServiceDiscovery buildZookeeperServiceDiscovery(URL connectionURL) throws Exception {
return new ZookeeperServiceDiscovery(connectionURL);
}

public static ServiceDiscovery<ZookeeperInstance> buildServiceDiscovery(CuratorFramework curatorFramework,
Expand All @@ -61,21 +61,21 @@ public static ServiceDiscovery<ZookeeperInstance> buildServiceDiscovery(CuratorF
.build();
}

public static CuratorFramework buildCuratorFramework(URL registerURL) throws Exception {
public static CuratorFramework buildCuratorFramework(URL connectionURL) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
.connectString(registerURL.getIp() + ":" + registerURL.getPort())
.retryPolicy(buildRetryPolicy(registerURL))
.connectString(connectionURL.getIp() + ":" + connectionURL.getPort())
.retryPolicy(buildRetryPolicy(connectionURL))
.build();
curatorFramework.start();
curatorFramework.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT.getParameterValue(registerURL),
BLOCK_UNTIL_CONNECTED_UNIT.getParameterValue(registerURL));
curatorFramework.blockUntilConnected(BLOCK_UNTIL_CONNECTED_WAIT.getParameterValue(connectionURL),
BLOCK_UNTIL_CONNECTED_UNIT.getParameterValue(connectionURL));
return curatorFramework;
}

public static RetryPolicy buildRetryPolicy(URL registerURL) {
int baseSleepTimeMs = BASE_SLEEP_TIME.getParameterValue(registerURL);
int maxRetries = MAX_RETRIES.getParameterValue(registerURL);
int getMaxSleepMs = MAX_SLEEP.getParameterValue(registerURL);
public static RetryPolicy buildRetryPolicy(URL connectionURL) {
int baseSleepTimeMs = BASE_SLEEP_TIME.getParameterValue(connectionURL);
int maxRetries = MAX_RETRIES.getParameterValue(connectionURL);
int getMaxSleepMs = MAX_SLEEP.getParameterValue(connectionURL);
return new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries, getMaxSleepMs);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.dubbo.registry.zookeeper.ZookeeperServiceDiscoveryFactory
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
/**
* {@link ZookeeperServiceDiscovery} Test
*
* @since 2.7.2
* @since 2.7.3
*/
public class ZookeeperServiceDiscoveryTest {

Expand All @@ -66,7 +66,7 @@ public void init() throws Exception {

this.registryUrl = URL.valueOf("zookeeper://127.0.0.1:" + zkServerPort);
this.discovery = buildZookeeperServiceDiscovery(registryUrl);
this.discovery.start();
this.discovery.start(registryUrl);
}

@AfterEach
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testGetInstances() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);

// Add Listener
discovery.addServiceDiscoveryChangeListener(SERVICE_NAME, event -> {
discovery.addServiceInstancesChangedListener(SERVICE_NAME, event -> {
serviceInstances.addAll(event.getServiceInstances());
latch.countDown();
});
Expand Down

0 comments on commit 1df1d1a

Please sign in to comment.