Skip to content

Commit

Permalink
Merge pull request Netflix#1036 from Netflix/external_storage_paths_a…
Browse files Browse the repository at this point in the history
…dded_to_summary

add external storage paths to summary
  • Loading branch information
apanicker-nflx committed Mar 7, 2019
2 parents 2571f37 + fd1e602 commit 16b6376
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.github.vmg.protogen.annotations.*;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import org.apache.commons.lang3.StringUtils;

/**
* @author Viren
Expand Down Expand Up @@ -86,6 +87,12 @@ public class TaskSummary {
@ProtoField(id = 16)
private String taskId;

@ProtoField(id = 17)
private String externalInputPayloadStoragePath;

@ProtoField(id = 18)
private String externalOutputPayloadStoragePath;

public TaskSummary() {
}

Expand Down Expand Up @@ -119,6 +126,13 @@ public TaskSummary(Task task) {
if(task.getEndTime() > 0){
this.executionTime = task.getEndTime() - task.getStartTime();
}

if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) {
this.externalInputPayloadStoragePath = task.getExternalInputPayloadStoragePath();
}
if (StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath())) {
this.externalOutputPayloadStoragePath = task.getExternalOutputPayloadStoragePath();
}
}

/**
Expand Down Expand Up @@ -348,6 +362,32 @@ public String getTaskId() {
public void setTaskId(String taskId) {
this.taskId = taskId;
}



/**
* @return the external storage path for the task input payload
*/
public String getExternalInputPayloadStoragePath() {
return externalInputPayloadStoragePath;
}

/**
* @param externalInputPayloadStoragePath the external storage path where the task input payload is stored
*/
public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
this.externalInputPayloadStoragePath = externalInputPayloadStoragePath;
}

/**
* @return the external storage path for the task output payload
*/
public String getExternalOutputPayloadStoragePath() {
return externalOutputPayloadStoragePath;
}

/**
* @param externalOutputPayloadStoragePath the external storage path where the task output payload is stored
*/
public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) {
this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.github.vmg.protogen.annotations.*;
import com.netflix.conductor.common.run.Workflow.WorkflowStatus;
import org.apache.commons.lang3.StringUtils;

/**
* Captures workflow summary info to be indexed in Elastic Search.
Expand Down Expand Up @@ -80,6 +81,12 @@ public class WorkflowSummary {

@ProtoField(id = 14)
private String failedReferenceTaskNames = "";

@ProtoField(id = 15)
private String externalInputPayloadStoragePath;

@ProtoField(id = 16)
private String externalOutputPayloadStoragePath;

public WorkflowSummary() {

Expand Down Expand Up @@ -115,6 +122,12 @@ public WorkflowSummary(Workflow workflow) {
}
this.event = workflow.getEvent();
this.failedReferenceTaskNames = workflow.getFailedReferenceTaskNames().stream().collect(Collectors.joining(","));
if (StringUtils.isNotBlank(workflow.getExternalInputPayloadStoragePath())) {
this.externalInputPayloadStoragePath = workflow.getExternalInputPayloadStoragePath();
}
if (StringUtils.isNotBlank(workflow.getExternalOutputPayloadStoragePath())) {
this.externalOutputPayloadStoragePath = workflow.getExternalOutputPayloadStoragePath();
}
}

/**
Expand Down Expand Up @@ -282,4 +295,32 @@ public void setReasonForIncompletion(String reasonForIncompletion) {
public void setExecutionTime(long executionTime) {
this.executionTime = executionTime;
}

/**
* @return the external storage path of the workflow input payload
*/
public String getExternalInputPayloadStoragePath() {
return externalInputPayloadStoragePath;
}

/**
* @param externalInputPayloadStoragePath the external storage path where the workflow input payload is stored
*/
public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
this.externalInputPayloadStoragePath = externalInputPayloadStoragePath;
}

/**
* @return the external storage path of the workflow output payload
*/
public String getExternalOutputPayloadStoragePath() {
return externalOutputPayloadStoragePath;
}

/**
* @param externalOutputPayloadStoragePath the external storage path where the workflow output payload is stored
*/
public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) {
this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,12 @@ public TaskSummaryPb.TaskSummary toProto(TaskSummary from) {
if (from.getTaskId() != null) {
to.setTaskId( from.getTaskId() );
}
if (from.getExternalInputPayloadStoragePath() != null) {
to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() );
}
if (from.getExternalOutputPayloadStoragePath() != null) {
to.setExternalOutputPayloadStoragePath( from.getExternalOutputPayloadStoragePath() );
}
return to.build();
}

Expand Down Expand Up @@ -1078,6 +1084,12 @@ public WorkflowSummaryPb.WorkflowSummary toProto(WorkflowSummary from) {
if (from.getFailedReferenceTaskNames() != null) {
to.setFailedReferenceTaskNames( from.getFailedReferenceTaskNames() );
}
if (from.getExternalInputPayloadStoragePath() != null) {
to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() );
}
if (from.getExternalOutputPayloadStoragePath() != null) {
to.setExternalOutputPayloadStoragePath( from.getExternalOutputPayloadStoragePath() );
}
return to.build();
}

Expand All @@ -1097,6 +1109,8 @@ public WorkflowSummary fromProto(WorkflowSummaryPb.WorkflowSummary from) {
to.setExecutionTime( from.getExecutionTime() );
to.setEvent( from.getEvent() );
to.setFailedReferenceTaskNames( from.getFailedReferenceTaskNames() );
to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() );
to.setExternalOutputPayloadStoragePath( from.getExternalOutputPayloadStoragePath() );
return to;
}

Expand Down Expand Up @@ -1126,6 +1140,9 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
if (from.getCaseExpression() != null) {
to.setCaseExpression( from.getCaseExpression() );
}
if (from.getScriptExpression() != null) {
to.setScriptExpression( from.getScriptExpression() );
}
for (Map.Entry<String, List<WorkflowTask>> pair : from.getDecisionCases().entrySet()) {
to.putDecisionCases( pair.getKey(), toProto( pair.getValue() ) );
}
Expand Down Expand Up @@ -1156,6 +1173,7 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
if (from.isRateLimited() != null) {
to.setRateLimited( from.isRateLimited() );
}
to.addAllDefaultExclusiveJoinTask( from.getDefaultExclusiveJoinTask() );
return to.build();
}

Expand All @@ -1173,6 +1191,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
to.setDynamicTaskNameParam( from.getDynamicTaskNameParam() );
to.setCaseValueParam( from.getCaseValueParam() );
to.setCaseExpression( from.getCaseExpression() );
to.setScriptExpression( from.getScriptExpression() );
Map<String, List<WorkflowTask>> decisionCasesMap = new HashMap<String, List<WorkflowTask>>();
for (Map.Entry<String, WorkflowTaskPb.WorkflowTask.WorkflowTaskList> pair : from.getDecisionCasesMap().entrySet()) {
decisionCasesMap.put( pair.getKey(), fromProto( pair.getValue() ) );
Expand All @@ -1193,6 +1212,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
to.setTaskDefinition( fromProto( from.getTaskDefinition() ) );
}
to.setRateLimited( from.getRateLimited() );
to.setDefaultExclusiveJoinTask( from.getDefaultExclusiveJoinTaskList().stream().collect(Collectors.toCollection(ArrayList::new)) );
return to;
}

Expand Down
2 changes: 2 additions & 0 deletions grpc/src/main/proto/model/tasksummary.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ message TaskSummary {
string input = 14;
string output = 15;
string task_id = 16;
string external_input_payload_storage_path = 17;
string external_output_payload_storage_path = 18;
}
2 changes: 2 additions & 0 deletions grpc/src/main/proto/model/workflowsummary.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ message WorkflowSummary {
int64 execution_time = 12;
string event = 13;
string failed_reference_task_names = 14;
string external_input_payload_storage_path = 15;
string external_output_payload_storage_path = 16;
}
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/workflowtask.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ message WorkflowTask {
string dynamic_task_name_param = 6;
string case_value_param = 7;
string case_expression = 8;
string script_expression = 22;
map<string, WorkflowTask.WorkflowTaskList> decision_cases = 9;
string dynamic_fork_tasks_param = 10;
string dynamic_fork_tasks_input_param_name = 11;
Expand Down

0 comments on commit 16b6376

Please sign in to comment.