Skip to content

Commit

Permalink
增加集群任务功能
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Apr 9, 2019
1 parent 059b836 commit 3ccaf46
Show file tree
Hide file tree
Showing 50 changed files with 1,237 additions and 175 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<modules>
<module>rule-engine-api</module>
<module>rule-engine-defaults</module>
<module>rule-engine-support</module>
<module>rule-engine-standalone</module>
<module>rule-engine-cluster</module>
</modules>
Expand Down
Expand Up @@ -2,6 +2,8 @@

import lombok.Getter;
import lombok.Setter;
import org.jetlinks.rule.engine.api.cluster.RunMode;
import org.jetlinks.rule.engine.api.cluster.SchedulingRule;
import org.jetlinks.rule.engine.api.model.RuleModel;

/**
Expand Down
@@ -1,6 +1,8 @@
package org.jetlinks.rule.engine.api;


import org.hswebframework.utils.StringUtils;

public abstract class RuleDataHelper {


Expand All @@ -10,6 +12,7 @@ public abstract class RuleDataHelper {

private static String ERROR_TYPE = "error_type";
private static String ERROR_MESSAGE = "error_message";
private static String ERROR_STACK = "error_stack";


private RuleDataHelper() {
Expand Down Expand Up @@ -41,7 +44,10 @@ public static RuleData putError(RuleData data, Throwable error) {
while (error.getCause() != null) {
error = error.getCause();
}
return putError(data, error.getClass().getName(), error.getMessage());
putError(data, error.getClass().getName(), error.getMessage());
String stack = StringUtils.throwable2String(error);
data.setAttribute(ERROR_STACK, stack);
return data;
}

public static RuleData putError(RuleData data, String type, String message) {
Expand Down
@@ -1,5 +1,6 @@
package org.jetlinks.rule.engine.api;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
Expand Down Expand Up @@ -55,6 +56,11 @@ public interface RuleInstanceContext {
*/
void execute(Consumer<Function<RuleData, CompletionStage<RuleData>>> dataSource);

/**
* 启动规则
*/
void start();

/**
* 停止规则
*/
Expand Down
@@ -0,0 +1,12 @@
package org.jetlinks.rule.engine.api.cluster;

/**
* @author zhouhao
* @since 1.0.0
*/
public enum RunMode {
//集群模式
CLUSTER,
//分布式模式
DISTRIBUTED
}
@@ -0,0 +1,20 @@
package org.jetlinks.rule.engine.api.cluster;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.Map;

/**
* 调度规则
* @author zhouhao
* @since 1.0.0
*/
@Getter
@Setter
public class SchedulingRule implements Serializable {
private String type;

private Map<String, Object> configuration;
}
@@ -0,0 +1,10 @@
package org.jetlinks.rule.engine.api.events;

/**
* @author zhouhao
* @since 1.0.0
*/
public interface EventSupportRuleInstanceContext {

void addEventListener(GlobalNodeEventListener listener);
}
@@ -0,0 +1,11 @@
package org.jetlinks.rule.engine.api.events;

import org.jetlinks.rule.engine.api.RuleData;

/**
* @author zhouhao
* @since 1.0.0
*/
public interface GlobalNodeEventListener {
void onEvent(NodeExecuteEvent executeEvent);
}
@@ -0,0 +1,26 @@
package org.jetlinks.rule.engine.api.events;

import lombok.*;
import org.jetlinks.rule.engine.api.RuleData;

/**
* @author zhouhao
* @since 1.0.0
*/
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class NodeExecuteEvent implements RuleEvent {

private String event;

private String nodeId;

private String instanceId;

private RuleData ruleData;

}
@@ -1,16 +1,19 @@
package org.jetlinks.rule.engine.api.events;

import java.io.Serializable;

/**
* @author zhouhao
* @since 1.0.0
*/
public interface RuleEvent {
public interface RuleEvent extends Serializable {

String NODE_EXECUTE_BEFORE = "NODE_EXECUTE_BEFORE";

String NODE_EXECUTE_FAIL = "NODE_EXECUTE_FAIL";

String NODE_EXECUTE_DONE = "NODE_EXECUTE_DONE";

String getEvent();

}
Expand Up @@ -17,4 +17,6 @@ public interface StreamExecutionContext extends ExecutionContext {
void fireEvent(String event, RuleData data);

void onError(RuleData data, Throwable e);

void stop();
}
Expand Up @@ -2,6 +2,8 @@

import lombok.Getter;
import lombok.Setter;
import org.jetlinks.rule.engine.api.cluster.RunMode;
import org.jetlinks.rule.engine.api.cluster.SchedulingRule;

import java.util.*;
import java.util.stream.Collectors;
Expand All @@ -22,6 +24,10 @@ public class RuleModel {

private String description;

private RunMode runMode;

private SchedulingRule schedulingRule;

private Map<String, Object> configuration = new HashMap<>();

private List<RuleLink> events = new ArrayList<>();
Expand All @@ -34,9 +40,9 @@ public List<RuleLink> getEvents(String type) {
.collect(Collectors.toList());
}

public Optional<RuleNodeModel> getNode(String nodeId){
public Optional<RuleNodeModel> getNode(String nodeId) {
return nodes.stream()
.filter(model->model.getId().equals(nodeId))
.filter(model -> model.getId().equals(nodeId))
.findFirst();
}

Expand Down
Expand Up @@ -2,6 +2,7 @@

import lombok.Getter;
import lombok.Setter;
import org.jetlinks.rule.engine.api.cluster.SchedulingRule;
import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;

import java.util.ArrayList;
Expand Down Expand Up @@ -32,6 +33,8 @@ public class RuleNodeModel {

private NodeType nodeType;

private SchedulingRule schedulingRule;

private boolean end;

private boolean start;
Expand Down
Expand Up @@ -17,6 +17,8 @@ public class RuleInstancePersistent implements Serializable {

private String ruleId;

private String schedulerNodeId;

private Date createTime;

private Boolean running;
Expand Down
Expand Up @@ -3,6 +3,7 @@
import lombok.Getter;
import lombok.Setter;
import org.jetlinks.rule.engine.api.Rule;
import org.jetlinks.rule.engine.api.model.RuleEngineModelParser;

import java.io.Serializable;

Expand All @@ -27,4 +28,11 @@ public class RulePersistent implements Serializable {
private String model;


public Rule toRule(RuleEngineModelParser parser) {
Rule rule = new Rule();
rule.setId(ruleId);
rule.setVersion(version == null ? 1 : version);
rule.setModel(parser.parse(modelFormat, model));
return rule;
}
}
Expand Up @@ -4,6 +4,4 @@

public interface RuleHistoryRepository {
void save(RuleHistoryPersistent persistent);


}
Expand Up @@ -10,4 +10,8 @@ public interface RuleInstanceRepository {

void saveInstance(RuleInstancePersistent instancePersistent);

void stopInstance(String instanceId);

void startInstance(String instanceId);

}
Expand Up @@ -8,7 +8,4 @@ public interface RuleRepository {

Optional<RulePersistent> findRuleById(String ruleId);




}
8 changes: 7 additions & 1 deletion rule-engine-cluster/pom.xml
Expand Up @@ -15,7 +15,13 @@
<dependencies>
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>rule-engine-defaults</artifactId>
<artifactId>rule-engine-support</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>rule-engine-standalone</artifactId>
<version>${project.version}</version>
</dependency>

Expand Down
@@ -1,4 +1,4 @@
package org.jetlinks.rule.engine.cluster.redisson;
package org.jetlinks.rule.engine.cluster.ha;

import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -11,9 +11,10 @@
@Setter
@AllArgsConstructor
@NoArgsConstructor

public class ClusterNotify implements Serializable {

private String replyId;

private String address;

private Object message;
Expand Down
@@ -0,0 +1,28 @@
package org.jetlinks.rule.engine.cluster.ha;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.Serializable;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class ClusterNotifyReply implements Serializable {

private String replyId;

private String address;

private Object reply;

private boolean success;

private String errorType;

private String errorMessage;

}
Expand Up @@ -3,7 +3,9 @@
import org.jetlinks.rule.engine.cluster.NodeInfo;

import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* @author zhouhao
Expand Down Expand Up @@ -44,7 +46,7 @@ public interface HaManager {
* @param consumer 消息消费者
* @param <T> 消息类型
*/
<T> void onNotify(String address, Consumer<T> consumer);
<T,R> void onNotify(String address, Function<T,R> consumer);

/**
* 向其他服务节点发送通知
Expand All @@ -53,6 +55,6 @@ public interface HaManager {
* @param address 通知地址
* @param message 消息
*/
void sendNotify(String nodeId, String address, Object message);
<V> CompletionStage<V> sendNotify(String nodeId, String address, Object message);

}
@@ -0,0 +1,24 @@
package org.jetlinks.rule.engine.cluster.message;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

/**
* @author zhouhao
* @since 1.0.0
*/
@Getter
@Setter
public class InitRuleNodeError implements Serializable {
private static final long serialVersionUID = -6849794470754667710L;

private String instanceId;

private String nodeId;

private String errorType;

private String errorMessage;
}

0 comments on commit 3ccaf46

Please sign in to comment.