Skip to content

Commit

Permalink
Merge pull request apache#901 from huyuanxin/consul-regisrty
Browse files Browse the repository at this point in the history
[ISSUE apache#900] Support Consul Service Registry
  • Loading branch information
xwm1992 committed Jul 28, 2022
2 parents 98669ea + 62c5b49 commit a1a574c
Show file tree
Hide file tree
Showing 8 changed files with 639 additions and 1 deletion.
24 changes: 24 additions & 0 deletions eventmesh-registry-plugin/eventmesh-registry-consul/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
implementation 'com.ecwid.consul:consul-api:1.4.5'
implementation 'org.apache.httpcomponents:httpclient:4.5.13'
implementation project(":eventmesh-registry-plugin:eventmesh-registry-api")
implementation project(":eventmesh-common")
testImplementation "org.mockito:mockito-core"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.registry.consul.service;

import org.apache.eventmesh.api.exception.RegistryException;
import org.apache.eventmesh.api.registry.RegistryService;
import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshRegisterInfo;
import org.apache.eventmesh.api.registry.dto.EventMeshUnRegisterInfo;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.utils.ConfigurationContextUtil;

import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.ConsulRawClient;
import com.ecwid.consul.v1.agent.model.NewService;
import com.ecwid.consul.v1.agent.model.Service;
import com.ecwid.consul.v1.health.HealthServicesRequest;
import com.ecwid.consul.v1.health.model.HealthService;

public class ConsulRegistryService implements RegistryService {

public static final String IP_PORT_SEPARATOR = ":";

private static final Logger logger = LoggerFactory.getLogger(ConsulRegistryService.class);

private static final AtomicBoolean INIT_STATUS = new AtomicBoolean(false);

private static final AtomicBoolean START_STATUS = new AtomicBoolean(false);

private String consulHost;

private String consulPort;

private ConsulClient consulClient;

private String token;

@Override
public void init() throws RegistryException {
if (INIT_STATUS.compareAndSet(false, true)) {
for (String key : ConfigurationContextUtil.KEYS) {
CommonConfiguration commonConfiguration = ConfigurationContextUtil.get(key);
if (null != commonConfiguration) {
String namesrvAddr = commonConfiguration.namesrvAddr;
if (StringUtils.isBlank(namesrvAddr)) {
throw new RegistryException("namesrvAddr cannot be null");
}
String[] addr = namesrvAddr.split(":");
if (addr.length != 2) {
throw new RegistryException("Illegal namesrvAddr");
}
this.consulHost = addr[0];
this.consulPort = addr[1];
break;
}
}
}
}

@Override
public void start() throws RegistryException {
consulClient = new ConsulClient(new ConsulRawClient(consulHost, Integer.parseInt(consulPort)));
}

@Override
public void shutdown() throws RegistryException {
INIT_STATUS.compareAndSet(true, false);
START_STATUS.compareAndSet(true, false);
consulClient = null;
}

@Override
public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws RegistryException {
try {
String[] ipPort = eventMeshRegisterInfo.getEndPoint().split(IP_PORT_SEPARATOR);
NewService service = new NewService();
service.setPort(Integer.parseInt(ipPort[1]));
service.setAddress(ipPort[0]);
service.setName(eventMeshRegisterInfo.getEventMeshName());
service.setId(eventMeshRegisterInfo.getEventMeshClusterName() + "-" + eventMeshRegisterInfo.getEventMeshName());
consulClient.agentServiceRegister(service, token);
} catch (Exception e) {
throw new RegistryException(e.getMessage());
}
logger.info("EventMesh successfully registered to consul");
return true;
}

@Override
public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws RegistryException {
try {
consulClient.agentServiceDeregister(eventMeshUnRegisterInfo.getEventMeshClusterName() + "-" + eventMeshUnRegisterInfo.getEventMeshName(),
token);
} catch (Exception e) {
throw new RegistryException(e.getMessage());
}
logger.info("EventMesh successfully unregistered to consul");
return true;
}

@Override
public List<EventMeshDataInfo> findEventMeshInfoByCluster(String clusterName) throws RegistryException {
HealthServicesRequest request = HealthServicesRequest.newBuilder().setPassing(true).setToken(token).build();
List<HealthService> healthServices = consulClient.getHealthServices(clusterName, request).getValue();
List<EventMeshDataInfo> eventMeshDataInfos = new ArrayList<>();
healthServices.forEach(healthService -> {
HealthService.Service service = healthService.getService();
String[] split = service.getId().split("-");
eventMeshDataInfos.add(new EventMeshDataInfo(split[0], split[1], service.getAddress() + ":" + service.getPort(), 0, service.getMeta()));
});
return eventMeshDataInfos;
}

@Override
public List<EventMeshDataInfo> findAllEventMeshInfo() throws RegistryException {
Map<String, Service> agentServices = consulClient.getAgentServices().getValue();
List<EventMeshDataInfo> eventMeshDataInfos = new ArrayList<>();
agentServices.forEach((k, v) -> {
String[] split = v.getId().split("-");
eventMeshDataInfos.add(new EventMeshDataInfo(split[0], split[1], v.getAddress() + ":" + v.getPort(), 0, v.getMeta()));
});
return eventMeshDataInfos;
}

@Override
public Map<String, Map<String, Integer>> findEventMeshClientDistributionData(String clusterName, String group, String purpose)
throws RegistryException {
return Collections.emptyMap();
}

@Override
public void registerMetadata(Map<String, String> metadataMap) {

}

public ConsulClient getConsulClient() {
return consulClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.registry.consul.service;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.agent.model.NewService;

public class HeatBeatScheduler {

private final ConsulClient consulClient;

private final ConcurrentHashMap<String, NewService> heartBeatMap = new ConcurrentHashMap<>();

private final ScheduledExecutorService heartbeatServiceExecutor = new ScheduledThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
r -> {
Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("ConsulHeartbeatService");
return thread;
}
);

public HeatBeatScheduler(ConsulClient consulClient) {
this.consulClient = consulClient;
}

/**
* start service heartbeat
*
* @param newService service
* @param aclToken token
*/
protected void startHeartBeat(NewService newService, String aclToken) {
heartbeatServiceExecutor.execute(new HeartBeat(newService, aclToken));
heartBeatMap.put(newService.getName(), newService);
}

/**
* stop service heartbeat
*
* @param newService service
*/
private void stopHeartBeat(NewService newService) {
heartBeatMap.remove(newService.getName());
}

class HeartBeat implements Runnable {

private static final String CHECK_ID_PREFIX = "service:";

private String checkId;

private final String aclToken;

private final NewService instance;

public HeartBeat(NewService instance, String aclToken) {
this.instance = instance;
this.checkId = instance.getId();
this.aclToken = aclToken;
if (!checkId.startsWith(CHECK_ID_PREFIX)) {
checkId = CHECK_ID_PREFIX + checkId;
}
}

@Override
public void run() {
try {
if (aclToken != null) {
consulClient.agentCheckPass(checkId, aclToken);
return;
}
if (heartBeatMap.contains(instance)) {
consulClient.agentCheckPass(checkId);
}
} finally {
heartbeatServiceExecutor.schedule(this, 3000, TimeUnit.SECONDS);
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

consul=org.apache.eventmesh.registry.consul.service.ConsulRegistryService

0 comments on commit a1a574c

Please sign in to comment.