Skip to content

Commit

Permalink
fixes #74 update ConsulRegistry to use one layer of cache structure
Browse files Browse the repository at this point in the history
  • Loading branch information
stevehu committed Jul 6, 2017
1 parent 42488b4 commit c4ef187
Showing 1 changed file with 33 additions and 23 deletions.
56 changes: 33 additions & 23 deletions consul/src/main/java/com/networknt/consul/ConsulRegistry.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.networknt.consul;

import com.networknt.config.Config;
import com.networknt.registry.URLParamType;
import com.networknt.consul.client.ConsulClient;
import com.networknt.registry.support.command.CommandFailbackRegistry;
Expand All @@ -24,7 +25,7 @@ public class ConsulRegistry extends CommandFailbackRegistry {
private int lookupInterval;

// service local cache. key: serviceName, value: <service url list>
private ConcurrentHashMap<String, ConcurrentHashMap<String, List<URL>>> serviceCache = new ConcurrentHashMap<String, ConcurrentHashMap<String, List<URL>>>();
private ConcurrentHashMap<String, List<URL>> serviceCache = new ConcurrentHashMap<String, List<URL>>();
// command local cache. key: serviceName, value: command content
private ConcurrentHashMap<String, String> commandCache = new ConcurrentHashMap<String, String>();

Expand Down Expand Up @@ -183,22 +184,19 @@ protected void unsubscribeCommand(URL url, CommandListener listener) {
@Override
protected List<URL> discoverService(URL url) {
String serviceName = url.getPath();
List<URL> serviceUrls = new ArrayList<>();
ConcurrentHashMap<String, List<URL>> serviceMap = serviceCache.get(serviceName);
if (serviceMap == null) {
if(logger.isDebugEnabled()) logger.debug("serviceName = " + serviceName);
List<URL> urls = serviceCache.get(serviceName);
if (urls == null) {
synchronized (serviceName.intern()) {
serviceMap = serviceCache.get(serviceName);
if (serviceMap == null) {
ConcurrentHashMap<String, List<URL>> urls = lookupServiceUpdate(serviceName);
updateServiceCache(serviceName, urls, false);
serviceMap = serviceCache.get(serviceName);
urls = serviceCache.get(serviceName);
if (urls == null) {
ConcurrentHashMap<String, List<URL>> serviceUrls = lookupServiceUpdate(serviceName);
updateServiceCache(serviceName, serviceUrls, false);
urls = serviceCache.get(serviceName);
}
}
}
if (serviceMap != null) {
serviceUrls = serviceMap.get(serviceName);
}
return serviceUrls;
return urls;
}

@Override
Expand All @@ -211,21 +209,28 @@ protected String discoverCommand(URL url) {

private ConcurrentHashMap<String, List<URL>> lookupServiceUpdate(String serviceName) {
Long lastConsulIndexId = lookupServices.get(serviceName) == null ? 0L : lookupServices.get(serviceName);
if(logger.isDebugEnabled()) logger.debug("serviceName = " + serviceName + " lastConsulIndexId = " + lastConsulIndexId);
ConsulResponse<List<ConsulService>> response = lookupConsulService(serviceName, lastConsulIndexId);
if(logger.isDebugEnabled()) {
try {
logger.debug("response = " + Config.getInstance().getMapper().writeValueAsString(response));
} catch (Exception e) {}
}
if (response != null) {
List<ConsulService> services = response.getValue();
if(logger.isDebugEnabled()) try {logger.debug("services = " + Config.getInstance().getMapper().writeValueAsString(services));} catch (Exception e) {}
if (services != null && !services.isEmpty()
&& response.getConsulIndex() > lastConsulIndexId) {
ConcurrentHashMap<String, List<URL>> serviceUrls = new ConcurrentHashMap<String, List<URL>>();
for (ConsulService service : services) {
try {
URL url = ConsulUtils.buildUrl(service);
String cluster = ConsulUtils.getUrlClusterInfo(url);
List<URL> urlList = serviceUrls.get(cluster);
List<URL> urlList = serviceUrls.get(serviceName);
if (urlList == null) {
urlList = new ArrayList<>();
serviceUrls.put(cluster, urlList);
serviceUrls.put(serviceName, urlList);
}
if(logger.isDebugEnabled()) logger.debug("lookupServiceUpdate url = " + url);
urlList.add(url);
} catch (Exception e) {
logger.error("convert consul service to url fail! service:" + service, e);
Expand Down Expand Up @@ -268,19 +273,24 @@ private ConsulResponse<List<ConsulService>> lookupConsulService(String serviceNa
*/
private void updateServiceCache(String serviceName, ConcurrentHashMap<String, List<URL>> serviceUrls, boolean needNotify) {
if (serviceUrls != null && !serviceUrls.isEmpty()) {
ConcurrentHashMap<String, List<URL>> serviceMap = serviceCache.get(serviceName);
if (serviceMap == null) {
serviceCache.put(serviceName, serviceUrls);
List<URL> urls = serviceCache.get(serviceName);
if (urls == null) {
if(logger.isDebugEnabled()) {
try {
logger.debug("serviceUrls = " + Config.getInstance().getMapper().writeValueAsString(serviceUrls));
} catch(Exception e) {
}
}
serviceCache.put(serviceName, serviceUrls.get(serviceName));
}
for (Map.Entry<String, List<URL>> entry : serviceUrls.entrySet()) {
boolean change = true;
if (serviceMap != null) {
List<URL> oldUrls = serviceMap.get(entry.getKey());
if (urls != null) {
List<URL> newUrls = entry.getValue();
if (newUrls == null || newUrls.isEmpty() || ConsulUtils.isSame(entry.getValue(), oldUrls)) {
if (newUrls == null || newUrls.isEmpty() || ConsulUtils.isSame(newUrls, urls)) {
change = false;
} else {
serviceMap.put(entry.getKey(), newUrls);
serviceCache.put(serviceName, newUrls);
}
}
if (change && needNotify) {
Expand Down

0 comments on commit c4ef187

Please sign in to comment.