Skip to content

Commit

Permalink
Merge pull request #148 from xm-online/feature/iterable_task
Browse files Browse the repository at this point in the history
Add support of iterable task
  • Loading branch information
sergeysenja1992 committed Jun 5, 2024
2 parents 75f1ee3 + e0a54fa commit 45a0936
Show file tree
Hide file tree
Showing 22 changed files with 893 additions and 132 deletions.
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
rootProject.name=activation
profile=dev

version=2.0.59
version=2.0.60

# Build properties
node_version=12.13.0
Expand Down Expand Up @@ -34,7 +34,7 @@ spring_no_http_plugin_version=0.0.3.RELEASE
checkstyle_version=8.24
openapi_plugin_version=4.2.2

xm_commons_version=2.3.18.1
xm_commons_version=2.3.19
lombok_version=1.18.2
zalando_version=0.24.0-RC.0
database_rider_version=1.9.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
import com.icthh.xm.commons.lep.processor.GroovyMap;
import com.icthh.xm.commons.logging.trace.TraceService.TraceServiceField;
import com.icthh.xm.commons.topic.service.KafkaTemplateService;
import com.icthh.xm.tmf.ms.activation.config.TaskIterationLepAdditionalContext.TaskIterationField;
import com.icthh.xm.tmf.ms.activation.config.TaskLepAdditionalContext.TaskContextField;
import com.icthh.xm.tmf.ms.activation.config.TaskParametersLepAdditionalContext.TaskParametersField;
import com.icthh.xm.tmf.ms.activation.config.TransactionLepAdditionalContext.TransactionContextField;
import com.icthh.xm.tmf.ms.activation.service.MailService;
import com.icthh.xm.tmf.ms.activation.service.SagaService;
import org.springframework.context.ApplicationContext;
import org.springframework.web.client.RestTemplate;

@GroovyMap
public class LepContext extends BaseLepContext implements OutboxTransportServiceField, TraceServiceField {
public class LepContext extends BaseLepContext implements OutboxTransportServiceField, TraceServiceField,
TransactionContextField, TaskContextField, TaskIterationField, TaskParametersField {

public LepServices services;
public LepTemplates templates;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.icthh.xm.tmf.ms.activation.config;

import com.icthh.xm.commons.lep.TargetProceedingLep;
import com.icthh.xm.commons.lep.api.BaseLepContext;
import com.icthh.xm.commons.lep.api.LepAdditionalContext;
import com.icthh.xm.commons.lep.api.LepAdditionalContextField;
import com.icthh.xm.commons.lep.api.LepBaseKey;
import com.icthh.xm.commons.lep.api.LepEngine;
import com.icthh.xm.tmf.ms.activation.domain.SagaEvent;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Optional;

import static com.icthh.xm.tmf.ms.activation.config.TaskIterationLepAdditionalContext.TaskIterationField.FIELD_NAME;


@Component
@RequiredArgsConstructor
public class TaskIterationLepAdditionalContext implements LepAdditionalContext<Integer> {

@Override
public String additionalContextKey() {
return FIELD_NAME;
}

@Override
public Integer additionalContextValue() {
return null;
}

@Override
public Optional<Integer> additionalContextValue(BaseLepContext lepContext, LepEngine lepEngine, TargetProceedingLep lepMethod) {
LepBaseKey lepBaseKey = lepMethod.getLepBaseKey();
if ("tasks".equals(lepBaseKey.getGroup()) && "Task".equals(lepBaseKey.getBaseKey())) {
SagaEvent sagaEvent = lepMethod.getParameter("sagaEvent", SagaEvent.class);
return Optional.ofNullable(sagaEvent.getIteration());
} else {
return Optional.empty();
}
}

@Override
public Class<? extends LepAdditionalContextField> fieldAccessorInterface() {
return TaskIterationField.class;
}

public interface TaskIterationField extends LepAdditionalContextField {
String FIELD_NAME = "iteration";
default Map<String, Object> getIteration() {
return (Map<String, Object>)get(FIELD_NAME);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package com.icthh.xm.tmf.ms.activation.config;

import com.icthh.xm.commons.lep.TargetProceedingLep;
import com.icthh.xm.commons.lep.api.BaseLepContext;
import com.icthh.xm.commons.lep.api.LepAdditionalContext;
import com.icthh.xm.commons.lep.api.LepAdditionalContextField;
import com.icthh.xm.commons.lep.api.LepBaseKey;
import com.icthh.xm.commons.lep.api.LepEngine;
import com.icthh.xm.commons.lep.commons.CommonsExecutor;
import com.icthh.xm.tmf.ms.activation.config.TaskLepAdditionalContext.TaskContext;
import com.icthh.xm.tmf.ms.activation.domain.SagaEvent;
import com.icthh.xm.tmf.ms.activation.domain.SagaLog;
import com.icthh.xm.tmf.ms.activation.domain.SagaLogType;
import com.icthh.xm.tmf.ms.activation.domain.SagaTransaction;
import com.icthh.xm.tmf.ms.activation.repository.SagaLogRepository;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.icthh.xm.tmf.ms.activation.config.TaskLepAdditionalContext.TaskContextField.FIELD_NAME;
import static com.icthh.xm.tmf.ms.activation.domain.SagaLogType.EVENT_END;
import static com.icthh.xm.tmf.ms.activation.domain.SagaLogType.EVENT_START;
import static java.util.Collections.emptyMap;


@Component
@RequiredArgsConstructor
public class TaskLepAdditionalContext implements LepAdditionalContext<TaskContext> {

private final SagaLogRepository sagaLogRepository;

@Override
public String additionalContextKey() {
return FIELD_NAME;
}

@Override
public TaskContext additionalContextValue() {
return null;
}

@Override
public Optional<TaskContext> additionalContextValue(BaseLepContext lepContext, LepEngine lepEngine, TargetProceedingLep lepMethod) {
LepBaseKey lepBaseKey = lepMethod.getLepBaseKey();
if ("tasks".equals(lepBaseKey.getGroup()) && "Task".equals(lepBaseKey.getBaseKey())) {
SagaTransaction sagaTransaction = lepMethod.getParameter("sagaTransaction", SagaTransaction.class);
SagaEvent sagaEvent = lepMethod.getParameter("sagaEvent", SagaEvent.class);
TaskContext taskContext = new TaskContext(sagaLogRepository, sagaTransaction, sagaEvent);
return Optional.of(taskContext);
} else {
return Optional.empty();
}
}

@Override
public Class<? extends LepAdditionalContextField> fieldAccessorInterface() {
return TaskContextField.class;
}

public interface TaskContextField extends LepAdditionalContextField {
String FIELD_NAME = "tasks";
default TaskContext getTasks() {
return (TaskContext)get(FIELD_NAME);
}
}

// need to implement all methods from Map interface for js lep-s interop
@RequiredArgsConstructor
public static class TaskContext implements Map<String, Object> {

private final SagaLogRepository sagaLogRepository;
private final SagaTransaction sagaTransaction;
private final SagaEvent sagaEvent;
private final Map<String, Object> context = new HashMap<>();
@Delegate(excludes = MapExcludes.class)
private final Map<String, Object> delegate = emptyMap();

@Override
public Object get(Object inputTaskTypeKey) {
String taskTypeKey = String.valueOf(inputTaskTypeKey);
if ("input".equals(taskTypeKey) || "context".equals(taskTypeKey)) {
return sagaEvent.getTaskContext();
}
if (sagaEvent.getTypeKey().equals(taskTypeKey)) {
return Map.of("input", sagaEvent.getTaskContext(), "output", Map.of());
}

return context.computeIfAbsent(taskTypeKey, key -> {
List<SagaLog> logs = sagaLogRepository.getLogsBySagaTransactionIdAndTypeKey(sagaTransaction.getId(), taskTypeKey);
return Map.of(
"input", filterContext(logs, EVENT_START),
"output", filterContext(logs, EVENT_END)
);
}
);
}

@Override
public boolean containsKey(Object taskTypeKey) {
return true;
}

private static Map<String, Object> filterContext(List<SagaLog> logs, SagaLogType sagaLogType) {
return logs.stream()
.filter(it -> sagaLogType == it.getLogType())
.findAny()
.map(SagaLog::getTaskContext)
.orElse(emptyMap());
}

public interface MapExcludes {
Object get(Object key);
boolean containsKey(Object key);
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.icthh.xm.tmf.ms.activation.config;

import com.icthh.xm.commons.lep.TargetProceedingLep;
import com.icthh.xm.commons.lep.api.BaseLepContext;
import com.icthh.xm.commons.lep.api.LepAdditionalContext;
import com.icthh.xm.commons.lep.api.LepAdditionalContextField;
import com.icthh.xm.commons.lep.api.LepBaseKey;
import com.icthh.xm.commons.lep.api.LepEngine;
import com.icthh.xm.tmf.ms.activation.domain.spec.SagaTaskSpec;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Optional;

import static com.icthh.xm.tmf.ms.activation.config.TaskParametersLepAdditionalContext.TaskParametersField.FIELD_NAME;


@Component
@RequiredArgsConstructor
public class TaskParametersLepAdditionalContext implements LepAdditionalContext<Map<String, Object>> {

@Override
public String additionalContextKey() {
return FIELD_NAME;
}

@Override
public Map<String, Object> additionalContextValue() {
return null;
}

@Override
public Optional<Map<String, Object>> additionalContextValue(BaseLepContext lepContext, LepEngine lepEngine, TargetProceedingLep lepMethod) {
LepBaseKey lepBaseKey = lepMethod.getLepBaseKey();
if ("tasks".equals(lepBaseKey.getGroup()) && "Task".equals(lepBaseKey.getBaseKey())) {
SagaTaskSpec sagaTaskSpec = lepMethod.getParameter("task", SagaTaskSpec.class);
return Optional.ofNullable(sagaTaskSpec.getTaskParameters());
} else {
return Optional.empty();
}
}

@Override
public Class<? extends LepAdditionalContextField> fieldAccessorInterface() {
return TaskParametersField.class;
}

public interface TaskParametersField extends LepAdditionalContextField {
String FIELD_NAME = "taskParameters";
default Map<String, Object> getTaskParameters() {
return (Map<String, Object>)get(FIELD_NAME);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.icthh.xm.tmf.ms.activation.config;

import com.icthh.xm.commons.lep.TargetProceedingLep;
import com.icthh.xm.commons.lep.api.BaseLepContext;
import com.icthh.xm.commons.lep.api.LepAdditionalContext;
import com.icthh.xm.commons.lep.api.LepAdditionalContextField;
import com.icthh.xm.commons.lep.api.LepBaseKey;
import com.icthh.xm.commons.lep.api.LepEngine;
import com.icthh.xm.tmf.ms.activation.config.TransactionLepAdditionalContext.TransactionContext;
import com.icthh.xm.tmf.ms.activation.domain.SagaTransaction;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Optional;

import static com.icthh.xm.tmf.ms.activation.config.TransactionLepAdditionalContext.TransactionContextField.FIELD_NAME;


@Component
@RequiredArgsConstructor
public class TransactionLepAdditionalContext implements LepAdditionalContext<TransactionContext> {


@Override
public String additionalContextKey() {
return FIELD_NAME;
}

@Override
public TransactionContext additionalContextValue() {
return null;
}

@Override
public Optional<TransactionContext> additionalContextValue(BaseLepContext lepContext, LepEngine lepEngine, TargetProceedingLep lepMethod) {
LepBaseKey lepBaseKey = lepMethod.getLepBaseKey();
if ("tasks".equals(lepBaseKey.getGroup()) && "Task".equals(lepBaseKey.getBaseKey())) {
SagaTransaction sagaTransaction = lepMethod.getParameter("sagaTransaction", SagaTransaction.class);
TransactionContext transactionContext = new TransactionContext(sagaTransaction.getContext());
return Optional.of(transactionContext);
} else {
return Optional.empty();
}
}

@Override
public Class<? extends LepAdditionalContextField> fieldAccessorInterface() {
return TransactionContextField.class;
}

public interface TransactionContextField extends LepAdditionalContextField {
String FIELD_NAME = "transaction";
default TransactionContext getTransaction() {
return (TransactionContext)get(FIELD_NAME);
}
}

// need to implement all methods from Map interface for js lep-s interop
@RequiredArgsConstructor
public static class TransactionContext implements Map<String, Object> {

@Delegate
private final Map<String, Object> delegate;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ public class SagaEvent implements Serializable {
@Column(name = "create_date")
private Instant createDate;

@Column(name = "iteration")
private Integer iteration;

@Column(name = "iterations_count")
private Integer iterationsCount;

public boolean isInQueue() {
return IN_QUEUE == status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,10 @@ public class SagaLog implements Serializable {
@Convert(converter = MapToStringConverter.class)
@Column(name = "task_context")
private Map<String, Object> taskContext = new HashMap<>();

@Column(name = "iteration")
private Integer iteration;

@Column(name = "iterations_count")
private Integer iterationsCount;
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private BusinessException notFound(String type) {

public static SagaSpec mergeSpec(SagaSpec spec1, SagaSpec spec2) {
SagaSpec sagaSpec = new SagaSpec();
List<SagaTransactionSpec> transactions = new ArrayList<>(spec1.transactions);
List<SagaTransactionSpec> transactions = new ArrayList<>(spec1.getTransactions());
transactions.addAll(spec2.getTransactions());
sagaSpec.setTransactions(transactions);
return sagaSpec;
Expand Down
Loading

0 comments on commit 45a0936

Please sign in to comment.