Skip to content

Commit

Permalink
优化规则执行器初始化逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Apr 17, 2019
1 parent 39ee7d8 commit ec4395e
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

import org.jetlinks.rule.engine.api.persistent.RuleInstancePersistent;

import java.util.List;
import java.util.Optional;

public interface RuleInstanceRepository {

Optional<RuleInstancePersistent> findInstanceById(String instanceId);

List<RuleInstancePersistent> findInstanceByRuleId(String ruleId);

void saveInstance(RuleInstancePersistent instancePersistent);

void stopInstance(String instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ public interface RuleRepository {

Optional<RulePersistent> findRuleById(String ruleId);

void save(RulePersistent persistent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import org.jetlinks.rule.engine.api.persistent.repository.RuleInstanceRepository;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
* @author zhouhao
Expand All @@ -20,6 +22,14 @@ public Optional<RuleInstancePersistent> findInstanceById(String instanceId) {
return Optional.ofNullable(repository.get(instanceId));
}

@Override
public List<RuleInstancePersistent> findInstanceByRuleId(String ruleId) {
return repository.values()
.stream()
.filter(persistent->persistent.getRuleId().equals(ruleId))
.collect(Collectors.toList());
}

@Override
public void saveInstance(RuleInstancePersistent instancePersistent) {
repository.put(instancePersistent.getId(), instancePersistent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ public class MockRuleRepository implements RuleRepository {
public Optional<RulePersistent> findRuleById(String ruleId) {
return Optional.empty();
}

@Override
public void save(RulePersistent persistent) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ public DefaultWorkerNodeSelector() {

@Override
public List<NodeInfo> select(SchedulingRule rule, List<NodeInfo> allNode) {
return Optional.ofNullable(allStrategy.get(rule.getType()))
return Optional
.ofNullable(rule)
.map(SchedulingRule::getType)
.map(type -> allStrategy.get(type))
.orElse(defaultStrategy)
.select(rule, allNode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import org.jetlinks.rule.engine.api.Logger;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.events.RuleEvent;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.executor.RuleNodeConfiguration;
import org.jetlinks.rule.engine.api.executor.ExecutableRuleNode;
import org.jetlinks.rule.engine.executor.supports.RuleNodeConfig;

import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
* @author zhouhao
Expand All @@ -22,29 +24,31 @@ public abstract class AbstractExecutableRuleNodeFactoryStrategy<C extends RuleNo

public abstract String getSupportType();

public abstract BiFunction<Logger, Object, CompletionStage<Object>> createExecutor(C config);
public abstract Function<RuleData, CompletionStage<Object>> createExecutor(ExecutionContext context, C config);

protected ExecutableRuleNode doCreate(C config) {
BiFunction<Logger, Object, CompletionStage<Object>> executor = createExecutor(config);
return context -> context.getInput()
.acceptOnce(data -> {
context.fireEvent(RuleEvent.NODE_EXECUTE_BEFORE, data.newData(data));
executor.apply(context.logger(), data.getData())
.whenComplete((result, error) -> {
if (error != null) {
context.onError(data, error);
} else {
RuleData newData;
if (config.getNodeType().isReturnNewValue()) {
newData = data.newData(result);
return context -> {
Function<RuleData, CompletionStage<Object>> executor = createExecutor(context, config);
context.getInput()
.acceptOnce(data -> {
context.fireEvent(RuleEvent.NODE_EXECUTE_BEFORE, data.newData(data));
executor.apply(data)
.whenComplete((result, error) -> {
if (error != null) {
context.onError(data, error);
} else {
newData = data.newData(data);
RuleData newData;
if (config.getNodeType().isReturnNewValue()) {
newData = data.newData(result);
} else {
newData = data.newData(data);
}
context.fireEvent(RuleEvent.NODE_EXECUTE_DONE, newData);
context.getOutput().write(newData);
}
context.fireEvent(RuleEvent.NODE_EXECUTE_DONE, newData);
context.getOutput().write(newData);
}
});
});
});
});
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.rule.engine.api.Logger;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.TypeConverter;
import org.jetlinks.rule.engine.api.executor.ExecutionContext;
import org.jetlinks.rule.engine.api.model.NodeType;
import org.jetlinks.rule.engine.executor.AbstractExecutableRuleNodeFactoryStrategy;

Expand Down Expand Up @@ -51,7 +53,7 @@ public Object getInstance(Class type) {
}

@SneakyThrows
public BiFunction<Logger, Object, CompletionStage<Object>> createExecutor(JavaMethodInvokeStrategyConfiguration config) {
public Function<RuleData, CompletionStage<Object>> createExecutor(ExecutionContext context, JavaMethodInvokeStrategyConfiguration config) {
String className = config.getClassName();
String methodName = config.getMethodName();
Class clazz = getType(className);
Expand All @@ -76,14 +78,14 @@ public BiFunction<Logger, Object, CompletionStage<Object>> createExecutor(JavaMe
int parameterCount = method.getParameterCount();
Class[] methodTypes = method.getParameterTypes();
log.debug("create java method invoke executor:{}.{}", className, methodName);
return (logger, data) -> {
return (data) -> {
CompletableFuture future = new CompletableFuture();
try {
Object[] invokeParameter = parameterCount > 0 ? new Object[parameterCount] : emptyArgs;
for (int i = 0; i < parameterCount; i++) {
invokeParameter[i] = convertParameter(methodTypes[i], data, config, i);
}
logger.info("invoke {}.{}", className, methodName);
context.logger().info("invoke {}.{}", className, methodName);
Object result = finaleMethod.invoke(instance, (Object[]) invokeParameter);
if (result instanceof CompletionStage) {
return ((CompletionStage) result);
Expand All @@ -96,13 +98,13 @@ public BiFunction<Logger, Object, CompletionStage<Object>> createExecutor(JavaMe
};
}

protected Object convertParameter(Class type, Object data,
protected Object convertParameter(Class type, RuleData data,
JavaMethodInvokeStrategyConfiguration config,
int index) {

// FIXME: 19-3-29 类型转换未实现
return Optional.ofNullable(config.getParameter(index))
.orElseGet(() -> data);
.orElseGet(() -> data.getData());
}

@SneakyThrows
Expand Down
17 changes: 16 additions & 1 deletion rule-engine-support/src/main/resources/rule-model.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
</xs:choice>
</xs:group>

<xs:complexType name="A">
<xs:attribute name="CLUSTER" type="xs:string"/>
<xs:attribute name="y" type="xs:string"/>
</xs:complexType>

<xs:element name="rule">
<xs:complexType>
<xs:sequence>
<xs:element name="testCase">
<xs:element name="testCase" minOccurs="0">
<xs:complexType>
<xs:sequence>
<xs:element maxOccurs="999" name="test" type="xs:element">
Expand Down Expand Up @@ -85,8 +90,18 @@
</xs:sequence>
<xs:attribute name="id" type="xs:string"/>
<xs:attribute name="name" type="xs:string"/>

<xs:attribute name="runMode" type="xs:complexType">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="DISTRIBUTED"/>
<xs:enumeration value="CLUSTER"/>
</xs:restriction>
</xs:simpleType>
</xs:attribute>
</xs:complexType>

</xs:element>


</xs:schema>

0 comments on commit ec4395e

Please sign in to comment.