Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor:优化registry中的代码结构逻辑 #34

Merged
merged 5 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@
import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.ServiceChangeEvent;
import com.tencent.polaris.api.utils.StringUtils;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.common.registry.BaseBootConfigHandler;
import com.tencent.polaris.common.registry.BootConfigHandler;
import com.tencent.polaris.common.registry.Consts;
import com.tencent.polaris.common.registry.ConvertUtils;
import com.tencent.polaris.common.registry.PolarisOperator;
import com.tencent.polaris.common.registry.PolarisOperators;
import com.tencent.polaris.common.utils.ExtensionConsts;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.rpc.cluster.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -37,28 +45,16 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.constants.RegistryConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.support.FailbackRegistry;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.cluster.Constants;
import org.apache.dubbo.rpc.cluster.RouterFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.DEFAULT_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;

public class PolarisRegistry extends FailbackRegistry {

private static final Logger LOGGER = LoggerFactory.getLogger(PolarisRegistry.class);

private static final TaskScheduler taskScheduler = new TaskScheduler();

private final Set<URL> registeredInstances = new ConcurrentHashSet<>();

private final AtomicBoolean destroyed = new AtomicBoolean(false);
Expand All @@ -67,54 +63,22 @@ public class PolarisRegistry extends FailbackRegistry {

private final PolarisOperator polarisOperator;

private final boolean hasCircuitBreaker;

private final boolean hasRouter;

public PolarisRegistry(URL url) {
this(url, new BaseBootConfigHandler());
}

// for test
public PolarisRegistry(URL url, BootConfigHandler... handlers) {
super(url);
polarisOperator = new PolarisOperator(url.getHost(), url.getPort(), url.getParameters(), handlers);
PolarisOperators.INSTANCE.addPolarisOperator(polarisOperator);
ExtensionLoader<RouterFactory> routerExtensionLoader = ExtensionLoader.getExtensionLoader(RouterFactory.class);
hasRouter = routerExtensionLoader.hasExtension(ExtensionConsts.PLUGIN_ROUTER_NAME);
ExtensionLoader<Filter> filterExtensionLoader = ExtensionLoader.getExtensionLoader(Filter.class);
hasCircuitBreaker = filterExtensionLoader.hasExtension(ExtensionConsts.PLUGIN_CIRCUITBREAKER_NAME);
}

private URL buildRouterURL(URL consumerUrl) {
URL routerURL = null;
if (hasRouter) {
URL registryURL = getUrl();
routerURL = new URL(RegistryConstants.ROUTE_PROTOCOL, registryURL.getHost(), registryURL.getPort());
routerURL = routerURL.setServiceInterface(CommonConstants.ANY_VALUE);
routerURL = routerURL.addParameter(Constants.ROUTER_KEY, ExtensionConsts.PLUGIN_ROUTER_NAME);
String consumerGroup = consumerUrl.getParameter(CommonConstants.GROUP_KEY);
String consumerVersion = consumerUrl.getParameter(CommonConstants.VERSION_KEY);
String consumerClassifier = consumerUrl.getParameter(CommonConstants.CLASSIFIER_KEY);
if (null != consumerGroup) {
routerURL = routerURL.addParameter(CommonConstants.GROUP_KEY, consumerGroup);
}
if (null != consumerVersion) {
routerURL = routerURL.addParameter(CommonConstants.VERSION_KEY, consumerVersion);
}
if (null != consumerClassifier) {
routerURL = routerURL.addParameter(CommonConstants.CLASSIFIER_KEY, consumerClassifier);
}
}
return routerURL;
polarisOperator = PolarisOperators.INSTANCE.loadOrStore(url.getHost(), url.getPort(), url.getParameters(), handlers);
}

@Override
public void doRegister(URL url) {
if (!shouldRegister(url)) {
return;
}
LOGGER.info("[POLARIS] register service to polaris: {}", url.toString());
LOGGER.info("[POLARIS] register service to polaris: {}", url);
Map<String, String> metadata = new HashMap<>(url.getParameters());
metadata.put(CommonConstants.PATH_KEY, url.getPath());
int port = url.getPort();
Expand All @@ -138,7 +102,7 @@ public void doUnregister(URL url) {
if (!shouldRegister(url)) {
return;
}
LOGGER.info("[POLARIS] unregister service from polaris: {}", url.toString());
LOGGER.info("[POLARIS] unregister service from polaris: {}", url);
int port = url.getPort();
if (port > 0) {
polarisOperator.deregister(url.getServiceInterface(), url.getHost(), url.getPort());
Expand All @@ -155,79 +119,27 @@ public void destroy() {
doUnregister(url);
}
polarisOperator.destroy();
taskScheduler.destroy();
}
}

@Override
public void doSubscribe(URL url, NotifyListener listener) {
String service = url.getServiceInterface();
Instance[] instances = polarisOperator.getAvailableInstances(service, !hasCircuitBreaker);
Instance[] instances = polarisOperator.getAvailableInstances(service, true);
onInstances(url, listener, instances);
LOGGER.info("[POLARIS] submit watch task for service {}", service);
taskScheduler.submitWatchTask(new WatchTask(url, listener, service));
}

private class WatchTask implements Runnable {

private final String service;

private final ServiceListener serviceListener;

private final NotifyListener listener;

private final FetchTask fetchTask;

public WatchTask(URL url, NotifyListener listener,
String service) {
this.service = service;
this.listener = listener;
fetchTask = new FetchTask(url, listener);
serviceListener = new ServiceListener() {
@Override
public void onEvent(ServiceChangeEvent event) {
PolarisRegistry.taskScheduler.submitFetchTask(fetchTask);
serviceListeners.computeIfAbsent(listener, notifyListener -> {
ServiceListener serviceListener = event -> {
try {
Instance[] curInstances = polarisOperator.getAvailableInstances(service, true);
onInstances(url, listener, curInstances);
} catch (PolarisException e) {
LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString());
}
};
}

@Override
public void run() {
boolean result = polarisOperator.watchService(service, serviceListener);
if (result) {
serviceListeners.put(listener, serviceListener);
PolarisRegistry.taskScheduler.submitFetchTask(fetchTask);
return;
}
PolarisRegistry.taskScheduler.submitWatchTask(this);
}
}

private class FetchTask implements Runnable {

private final String service;

private final URL url;

private final NotifyListener listener;

public FetchTask(URL url, NotifyListener listener) {
this.service = url.getServiceInterface();
this.url = url;
this.listener = listener;
}

@Override
public void run() {
Instance[] instances;
try {
instances = polarisOperator.getAvailableInstances(service, !hasCircuitBreaker);
} catch (PolarisException e) {
LOGGER.error("[POLARIS] fail to fetch instances for service {}: {}", service, e.toString());
return;
}
onInstances(url, listener, instances);
}
polarisOperator.watchService(service, serviceListener);
return serviceListener;
});
}

private void onInstances(URL url, NotifyListener listener, Instance[] instances) {
Expand All @@ -240,11 +152,7 @@ private void onInstances(URL url, NotifyListener listener, Instance[] instances)
urls.add(instanceToURL(requireInterface, instance));
}
}
URL routerURL = buildRouterURL(url);
if (null != routerURL) {
urls.add(routerURL);
}
PolarisRegistry.this.notify(url, listener, urls);
notify(url, listener, toUrlWithEmpty(url, urls));
}

private static URL instanceToURL(String requireInterface, Instance instance) {
Expand Down Expand Up @@ -275,68 +183,25 @@ private static URL instanceToURL(String requireInterface, Instance instance) {
newMetadata);
}

private static class TaskScheduler {

private final ExecutorService fetchExecutor = Executors
.newSingleThreadExecutor(new NamedThreadFactory("agent-fetch"));

private final ExecutorService watchExecutor = Executors
.newSingleThreadExecutor(new NamedThreadFactory("agent-retry-watch"));

private final AtomicBoolean executorDestroyed = new AtomicBoolean(false);

private final Object lock = new Object();

void submitFetchTask(Runnable fetchTask) {
if (executorDestroyed.get()) {
return;
}
synchronized (lock) {
if (executorDestroyed.get()) {
return;
}
fetchExecutor.submit(fetchTask);
}
}

void submitWatchTask(Runnable watchTask) {
if (executorDestroyed.get()) {
return;
}
synchronized (lock) {
if (executorDestroyed.get()) {
return;
}
watchExecutor.submit(watchTask);
}
}

boolean isDestroyed() {
return executorDestroyed.get();
}

void destroy() {
synchronized (lock) {
if (executorDestroyed.compareAndSet(false, true)) {
fetchExecutor.shutdown();
watchExecutor.shutdown();
}
}
private List<URL> toUrlWithEmpty(URL providerUrl, List<URL> urls) {
if (CollectionUtils.isEmpty(urls)) {
LOGGER.warn("[POLARIS] received empty url address list, will clear current available addresses");
URL empty = URLBuilder.from(providerUrl)
.setProtocol(EMPTY_PROTOCOL)
.addParameter(CATEGORY_KEY, DEFAULT_CATEGORY)
.build();
urls.add(empty);
}
return urls;
}

@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
LOGGER.info("[polaris] unsubscribe service: {}", url.toString());
taskScheduler.submitWatchTask(new Runnable() {
@Override
public void run() {
ServiceListener serviceListener = serviceListeners.get(listener);
if (null != serviceListener) {
polarisOperator.unwatchService(url.getServiceInterface(), serviceListener);
}
}
});
ServiceListener serviceListener = serviceListeners.get(listener);
if (null != serviceListener) {
polarisOperator.unwatchService(url.getServiceInterface(), serviceListener);
}
}

@Override
Expand Down
Loading