Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue#3]And adjust and optimize some codes for runtime. #4

Open
wants to merge 4 commits into
base: runtime-rocketmq
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<directory>src/main/resources</directory>
<excludes>
<exclude>*.xml</exclude>
<exclude>connect.conf</exclude>
<exclude>runtime.conf</exclude>
</excludes>
<filtering>true</filtering>
</resource>
Expand Down
2 changes: 1 addition & 1 deletion run_worker.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
export OMS_RMQ_DIRECT_NAME_SRV=true
echo "run rumtime worker"
cd target/distribution/ && java -cp .:./conf/:./lib/* io.openmessaging.connect.runtime.ConnectStartup -c conf/connect.conf
cd target/distribution/ && java -cp .:./conf/:./lib/* io.openmessaging.connect.runtime.RuntimeStartup -c conf/runtime.conf

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
/**
* Connect controller to access and control all resource in runtime.
*/
public class ConnectController {
public class RuntimeController {

private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME);

Expand Down Expand Up @@ -87,11 +87,11 @@ public class ConnectController {
*/
private ScheduledExecutorService scheduledExecutorService;

public ConnectController(ConnectConfig connectConfig) {
public RuntimeController(ConnectConfig connectConfig) {

this.connectConfig = connectConfig;
this.messagingAccessWrapper = new MessagingAccessWrapper();
MessagingAccessPoint messageAccessPoint = messagingAccessWrapper.getMessageAccessPoint(connectConfig.getOmsDriverUrl());
MessagingAccessPoint messageAccessPoint = messagingAccessWrapper.getMessageAccessPoint(connectConfig.getRuntimeOmsDriverUrl());
this.clusterManagementService = new ClusterManagementServiceImpl(connectConfig, messageAccessPoint);
this.configManagementService = new ConfigManagementServiceImpl(connectConfig, messageAccessPoint);
this.positionManagementService = new PositionManagementServiceImpl(connectConfig, messageAccessPoint);
Expand All @@ -118,7 +118,7 @@ public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(() -> {

try {
ConnectController.this.configManagementService.persist();
RuntimeController.this.configManagementService.persist();
} catch (Exception e) {
log.error("schedule persist config error.", e);
}
Expand All @@ -128,7 +128,7 @@ public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(() -> {

try {
ConnectController.this.positionManagementService.persist();
RuntimeController.this.positionManagementService.persist();
} catch (Exception e) {
log.error("schedule persist position error.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/**
* Startup class of the runtime worker.
*/
public class ConnectStartup {
public class RuntimeStartup {

private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME);

Expand All @@ -51,7 +51,7 @@ public static void main(String[] args) {
start(createConnectController(args));
}

private static void start(ConnectController controller) {
private static void start(RuntimeController controller) {

try {
controller.start();
Expand All @@ -70,7 +70,7 @@ private static void start(ConnectController controller) {
* @param args
* @return
*/
private static ConnectController createConnectController(String[] args) {
private static RuntimeController createConnectController(String[] args) {

try {

Expand Down Expand Up @@ -99,7 +99,7 @@ private static ConnectController createConnectController(String[] args) {
}

// Create controller and initialize.
ConnectController controller = new ConnectController(connectConfig);
RuntimeController controller = new RuntimeController(connectConfig);
controller.initialize();

// Invoked when shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ConnectConfig {
* OMS driver url, which determine the specific MQ to send and consume message.
* The MQ is used for internal management of the connect runtime.
*/
private String omsDriverUrl = "oms:rocketmq://localhost:9876/default:default";
private String runtimeOmsDriverUrl = "oms:rocketmq://localhost:9876/default:default";

/**
* Http port for REST API.
Expand All @@ -55,14 +55,6 @@ public class ConnectConfig {
*/
private int configPersistInterval = 20 * 1000;

public String getOmsDriverUrl() {
return omsDriverUrl;
}

public void setOmsDriverUrl(String omsDriverUrl) {
this.omsDriverUrl = omsDriverUrl;
}

public String getWorkerId() {
return workerId;
}
Expand Down Expand Up @@ -102,4 +94,12 @@ public int getConfigPersistInterval() {
public void setConfigPersistInterval(int configPersistInterval) {
this.configPersistInterval = configPersistInterval;
}

public String getRuntimeOmsDriverUrl() {
return runtimeOmsDriverUrl;
}

public void setRuntimeOmsDriverUrl(String runtimeOmsDriverUrl) {
this.runtimeOmsDriverUrl = runtimeOmsDriverUrl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ public class RuntimeConfigDefine {
public static final String CONNECTOR_CLASS = "connector-class";

public static final String TASK_CLASS = "task-class";

/**
* OMS driver url for the connector.
* OMS driver url for the runtime connector
*/
public static final String OMS_DRIVER_URL = "oms-driver-url";
public static final String RUNTIME_OMS_DRIVER_URL = "runtime-oms-driver-url";

/**
* Last updated time of the configuration.
Expand All @@ -59,8 +59,8 @@ public class RuntimeConfigDefine {
public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
{
add(CONNECTOR_CLASS);
add(OMS_DRIVER_URL);
add(SOURCE_RECORD_CONVERTER);
add(RUNTIME_OMS_DRIVER_URL);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public synchronized void startTasks(Map<String, List<ConnectKeyValue>> taskConfi

if(task instanceof SourceTask){
Producer producer = messagingAccessWrapper
.getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer();
.getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL)).createProducer();
producer.startup();
WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
(SourceTask) task, keyValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import io.openmessaging.connect.runtime.common.ConnAndTaskConfigs;
import io.openmessaging.connect.runtime.common.LoggerName;
import io.openmessaging.connect.runtime.utils.TransferUtils;
import io.openmessaging.connect.runtime.utils.TransferUtil;
import io.openmessaging.connector.api.data.Converter;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
Expand All @@ -41,12 +41,12 @@ public byte[] objectToByte(ConnAndTaskConfigs object) {
Map<String, String> connectorMap = new HashMap<>();
Map<String, String> taskMap = new HashMap<>();
for(String key : configs.getConnectorConfigs().keySet()){
connectorMap.put(key, TransferUtils.keyValueToString(configs.getConnectorConfigs().get(key)));
connectorMap.put(key, TransferUtil.keyValueToString(configs.getConnectorConfigs().get(key)));
}
for(String key : configs.getTaskConfigs().keySet()){
taskMap.put(key, TransferUtils.keyValueListToString(configs.getTaskConfigs().get(key)));
taskMap.put(key, TransferUtil.keyValueListToString(configs.getTaskConfigs().get(key)));
}
return TransferUtils.toJsonString(connectorMap, taskMap).getBytes("UTF-8");
return TransferUtil.toJsonString(connectorMap, taskMap).getBytes("UTF-8");
} catch (Exception e) {
log.error("ConnAndTaskConfigConverter#objectToByte failed", e);
}
Expand All @@ -58,7 +58,7 @@ public ConnAndTaskConfigs byteToObject(byte[] bytes) {

try {
String jsonString = new String(bytes, "UTF-8");
ConnAndTaskConfigs configs = TransferUtils.toConnAndTaskConfigs(jsonString);
ConnAndTaskConfigs configs = TransferUtil.toConnAndTaskConfigs(jsonString);
return configs;
} catch (UnsupportedEncodingException e) {
log.error("ConnAndTaskConfigConverter#byteToObject failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.alibaba.fastjson.JSON;
import io.javalin.Context;
import io.javalin.Javalin;
import io.openmessaging.connect.runtime.ConnectController;
import io.openmessaging.connect.runtime.RuntimeController;
import io.openmessaging.connect.runtime.common.ConnectKeyValue;
import io.openmessaging.connect.runtime.common.LoggerName;
import io.openmessaging.connect.runtime.connectorwrapper.WorkerConnector;
Expand All @@ -38,24 +38,24 @@ public class RestHandler {

private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME);

private final ConnectController connectController;
private final RuntimeController runtimeController;

public RestHandler(ConnectController connectController){
this.connectController = connectController;
Javalin app = Javalin.start(connectController.getConnectConfig().getHttpPort());
app.get("/connectors/:connectorName", this::handleCreateConnector);
public RestHandler(RuntimeController runtimeController){
this.runtimeController = runtimeController;
Javalin app = Javalin.start(runtimeController.getConnectConfig().getHttpPort());
app.post("/connectors/:connectorName", this::handleCreateConnector);
app.get("/connectors/:connectorName/config", this::handleQueryConnectorConfig);
app.get("/connectors/:connectorName/status", this::handleQueryConnectorStatus);
app.get("/connectors/:connectorName/stop", this::handleStopConnector);
app.delete("/connectors/:connectorName/stop", this::handleStopConnector);
app.get("/getClusterInfo", this::getClusterInfo);
app.get("/getConfigInfo", this::getConfigInfo);
app.get("/getAllocatedInfo", this::getAllocatedInfo);
}

private void getAllocatedInfo(Context context){

Set<WorkerConnector> workerConnectors = connectController.getWorker().getWorkingConnectors();
Set<WorkerSourceTask> workerSourceTasks = connectController.getWorker().getWorkingTasks();
Set<WorkerConnector> workerConnectors = runtimeController.getWorker().getWorkingConnectors();
Set<WorkerSourceTask> workerSourceTasks = runtimeController.getWorker().getWorkingTasks();
StringBuilder sb = new StringBuilder();
sb.append("working connectors:\n");
for(WorkerConnector workerConnector : workerConnectors){
Expand All @@ -70,13 +70,13 @@ private void getAllocatedInfo(Context context){

private void getConfigInfo(Context context) {

Map<String, ConnectKeyValue> connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs();
Map<String, List<ConnectKeyValue>> taskConfigs = connectController.getConfigManagementService().getTaskConfigs();
Map<String, ConnectKeyValue> connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs();
Map<String, List<ConnectKeyValue>> taskConfigs = runtimeController.getConfigManagementService().getTaskConfigs();
context.result("ConnectorConfigs:"+JSON.toJSONString(connectorConfigs)+"\nTaskConfigs:"+JSON.toJSONString(taskConfigs));
}

private void getClusterInfo(Context context) {
context.result(JSON.toJSONString(connectController.getClusterManagementService().getAllAliveWorkers()));
context.result(JSON.toJSONString(runtimeController.getClusterManagementService().getAllAliveWorkers()));
}

private void handleCreateConnector(Context context) {
Expand All @@ -89,13 +89,14 @@ private void handleCreateConnector(Context context) {
}
try {

String result = connectController.getConfigManagementService().putConnectorConfig(connectorName, configs);
String result = runtimeController.getConfigManagementService().putConnectorConfig(connectorName, configs);
if(result != null && result.length() > 0){
context.result(result);
}else{
context.result("success");
}
} catch (Exception e) {
log.error("oms connect runtime create the connector exception, ", e);
context.result("failed");
}
}
Expand All @@ -104,8 +105,8 @@ private void handleQueryConnectorConfig(Context context){

String connectorName = context.param("connectorName");

Map<String, ConnectKeyValue> connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs();
Map<String, List<ConnectKeyValue>> taskConfigs = connectController.getConfigManagementService().getTaskConfigs();
Map<String, ConnectKeyValue> connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs();
Map<String, List<ConnectKeyValue>> taskConfigs = runtimeController.getConfigManagementService().getTaskConfigs();
StringBuilder sb = new StringBuilder();
sb.append("ConnectorConfigs:")
.append(JSON.toJSONString(connectorConfigs.get(connectorName)))
Expand All @@ -118,7 +119,7 @@ private void handleQueryConnectorConfig(Context context){
private void handleQueryConnectorStatus(Context context){

String connectorName = context.param("connectorName");
Map<String, ConnectKeyValue> connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs();
Map<String, ConnectKeyValue> connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs();

if(connectorConfigs.containsKey(connectorName)){
context.result("running");
Expand All @@ -131,9 +132,10 @@ private void handleStopConnector(Context context){
String connectorName = context.param("connectorName");
try {

connectController.getConfigManagementService().removeConnectorConfig(connectorName);
runtimeController.getConfigManagementService().removeConnectorConfig(connectorName);
context.result("success");
} catch (Exception e) {
log.error("oms connect runtime stop the connector exception, ", e);
context.result("failed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public String putConnectorConfig(String connectorName, ConnectKeyValue configs)
newKeyValue.put(key, keyValue.getString(key));
}
newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
newKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, configs.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
newKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, configs.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL));
newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESATMP, currentTimestamp);
converterdConfigs.add(newKeyValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public RebalanceService(RebalanceImpl rebalanceImpl, ConfigManagementService con
this.rebalanceImpl = rebalanceImpl;
this.configManagementService = configManagementService;
this.clusterManagementService = clusterManagementService;
this.configManagementService.registerListener(new ConnectorConnectorConfigChangeListenerImpl());
this.configManagementService.registerListener(new ConnectorConfigChangeListenerImpl());
this.clusterManagementService.registerListener(new WorkerStatusListenerImpl());
}

Expand Down Expand Up @@ -79,7 +79,7 @@ public void onWorkerChange() {
}
}

class ConnectorConnectorConfigChangeListenerImpl implements ConfigManagementService.ConnectorConfigUpdateListener {
class ConnectorConfigChangeListenerImpl implements ConfigManagementService.ConnectorConfigUpdateListener {

/**
* When config change.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;

public class TransferUtils {
public class TransferUtil {

public static String keyValueToString(ConnectKeyValue keyValue) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
## Worker id, should be unique
workerId=DEFAULT_WORKER_1

## Choose a MQ to support runtime data synchronize
omsDriverUrl=oms:rocketmq://localhost:9876/default:default

## Http prot for user to access REST API
httpPort=8081

## local file dir for config store
storePathRootDir=./storeRoot/
storePathRootDir=./storeRoot/

## Choose a MQ to support runtime data synchronize
runtimeOmsDriverUrl=oms:rocketmq://localhost:9876/default:default
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ public class ConnectConfigTest {
public void testConnectConfigAttribute() {
ConnectConfig connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
connectConfig.setWorkerId("DEFAULT_WORKER_1");
connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default3:default3");
assertThat(connectConfig.getHttpPort()).isEqualTo(8081);
assertThat(connectConfig.getOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default:default");
assertThat(connectConfig.getWorkerId()).isEqualTo("DEFAULT_WORKER_1");
assertThat(connectConfig.getRuntimeOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default3:default3");

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class WorkerTest {
public void init() {
connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
connectConfig.setWorkerId("DEFAULT_WORKER_1");
connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
messagingAccessWrapper = new MessagingAccessWrapper();
Expand Down Expand Up @@ -139,7 +139,7 @@ public void testStartTasks() {
connectKeyValue.getProperties().put("key2", "TEST-CONN-" + i + "2");
connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_CLASS, TestSourceTask.class.getName());
connectKeyValue.getProperties().put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, TestConverter.class.getName());
connectKeyValue.getProperties().put(RuntimeConfigDefine.OMS_DRIVER_URL, this.connectConfig.getOmsDriverUrl());
connectKeyValue.getProperties().put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, this.connectConfig.getRuntimeOmsDriverUrl());
connectKeyValues.add(connectKeyValue);
taskConfigs.put("TEST-CONN-" + i, connectKeyValues);
}
Expand Down
Loading