diff --git a/FunctionWorker/python/FunctionWorker.py b/FunctionWorker/python/FunctionWorker.py index fb681357..febbcde4 100644 --- a/FunctionWorker/python/FunctionWorker.py +++ b/FunctionWorker/python/FunctionWorker.py @@ -330,10 +330,22 @@ def _fork_and_handle_message(self, key, encapsulated_value): # 3. Apply InputPath, if available timestamp_map["t_start_inputpath"] = time.time() * 1000.0 + #self._logger.debug("[FunctionWorker] Before Path/Parameters processing, input: " + str(type(raw_state_input)) + " : " + str(raw_state_input) + ", metadata: " + str(metadata) + " has_error: " + str(has_error)) if not has_error: try: - function_input = self._state_utils.applyInputPath(raw_state_input) - #self._logger.debug("[FunctionWorker] User code input(After InputPath processing):" + str(type(function_input)) + ":" + str(function_input)) + if "__state_action" not in metadata or (metadata["__state_action"] != "post_map_processing" and metadata["__state_action"] != "post_parallel_processing"): + #self._logger.debug("[FunctionWorker] User code input(Before InputPath processing):" + str(type(raw_state_input)) + ":" + str(raw_state_input)) + function_input = self._state_utils.applyInputPath(raw_state_input) + #self._logger.debug("[FunctionWorker] User code input(Before applyParameter processing):" + str(type(function_input)) + ":" + str(function_input)) + function_input = self._state_utils.applyParameters(function_input) + #self._logger.debug("[FunctionWorker] User code input(Before ItemsPath processing):" + str(type(function_input)) + ":" + str(function_input)) + function_input = self._state_utils.applyItemsPath(function_input) # process map items path + + #elif "Action" not in metadata or metadata["Action"] != "post_parallel_processing": + # function_input = self._state_utils.applyInputPath(raw_state_input) + + else: + function_input = raw_state_input except Exception as exc: self._logger.exception("InputPath processing exception: %s\n%s", str(instance_pid), str(exc)) error_type = "InputPath processing exception" @@ -592,6 +604,7 @@ def _get_and_handle_message(self): def run(self): self._is_running = True + self._logger.info("[FunctionWorker] Started:" \ + self._function_state_name \ + ", user: " + self._userid \ diff --git a/FunctionWorker/python/PublicationUtils.py b/FunctionWorker/python/PublicationUtils.py index d7c29813..dfedfcf3 100644 --- a/FunctionWorker/python/PublicationUtils.py +++ b/FunctionWorker/python/PublicationUtils.py @@ -356,6 +356,13 @@ def _generate_trigger_metadata(self, topic_next): output_instance_id = self._output_counter_map[topic_next] next_function_execution_id = self._metadata["__function_execution_id"] + "_" + str(output_instance_id) + + # get current state type. 
if Map state, add a marker to the execution id + state_type = self._state_utils.functionstatetype + self._logger.debug("self._state_utils.functionstatetype: " + str(state_type)) + + if state_type == 'Map': + next_function_execution_id = self._metadata["__function_execution_id"] + "_" + str(output_instance_id)+"-M" self._output_counter_map[topic_next] += 1 trigger_metadata = copy.deepcopy(self._metadata) diff --git a/FunctionWorker/python/StateUtils.py b/FunctionWorker/python/StateUtils.py index 9d657c2d..fff348d8 100644 --- a/FunctionWorker/python/StateUtils.py +++ b/FunctionWorker/python/StateUtils.py @@ -26,7 +26,7 @@ from thriftpy2.protocol import TCompactProtocolFactory from thriftpy2.server import TSimpleServer from thriftpy2.thrift import TProcessor -from ujsonpath import parse +from ujsonpath import parse, tokenize import py3utils @@ -42,6 +42,9 @@ class StateUtils: failStateType = 'Fail' waitStateType = 'Wait' parallelStateType = 'Parallel' + mapStateType = 'Map' + _instcnt = 0 # instance counter + mapFunctionOutput = {} def __init__(self, functionstatetype=defaultStateType, functionstatename='', functionstateinfo='{}', functionruntime="", logger=None, workflowid=None, sandboxid=None, functiontopic=None, datalayer=None, storage_userid=None, internal_endpoint=None): self.operators = ['And', 'BooleanEquals', 'Not', 'NumericEquals', 'NumericGreaterThan', 'NumericGreaterThanEquals',\ @@ -60,8 +63,10 @@ def __init__(self, functionstatetype=defaultStateType, functionstatename='', fun self.default_next_choice = [] self.input_path_dict = {} + self.items_path_dict = {} self.result_path_dict = {} self.output_path_dict = {} + self.parameters_dict = {} self.functionstatetype = functionstatetype self.functionstatename = functionstatename self.functionstateinfo = functionstateinfo @@ -79,11 +84,29 @@ def __init__(self, functionstatetype=defaultStateType, functionstatename='', fun self.sandboxid = sandboxid self.choiceNext = '' + self.mapStateCounter = 0 + #self._mapStateInfo = {} + #self.batchAlreadyLaunched = [] + #self.currentMapInputMetadata = {} # initialise with empty dicts + self.evaluateCounter = 0 + self.catcher_list = [] self.retry_list = [] self._logger = logger self.parse_function_state_info() + self.function_output_batch_list = [] + self.tobeProcessedlater = [] + self.outputMapStatebatch = [] + self.mapPartialResult = {} + + def call_counter(func): + def helper(*args, **kwargs): + helper.calls += 1 + return func(*args, **kwargs) + helper.calls = 0 + helper.__name__ = func.__name__ + return helper # find target next for error in catcher list def find_cat_data(self, err, cat_list): @@ -110,9 +133,40 @@ def find_ret_data(self, err, ret_list): ret_backoff_rate = ret['BackoffRate'] return ret_max_attempts, ret_interval_seconds, ret_backoff_rate + def isMapState(self): + return self.functionstatetype == StateUtils.mapStateType def isTaskState(self): return self.functionstatetype == StateUtils.taskStateType or self.functionstatetype == StateUtils.defaultStateType + def applyParameters(self, raw_state_input): + #2c. 
Apply Parameters, if available and applicable (The Parameters field is used in Map to select values in the input) + # in = raw_state_input + # if Parameters: + # in = Parameters applied to raw_state_input + # + try: + function_input = raw_state_input + self._logger.debug("inside applyParameters: " + str(self.parameters_dict) + ", raw_state_input: " + str(raw_state_input)) + if self.parameters_dict: + function_input = self.process_parameters(self.parameters_dict, function_input) + return function_input + except Exception: + raise Exception("Parameters processing exception") + + def applyItemsPath(self, raw_state_input): + #2a. Apply ItemsPath, if available and applicable (The ItemsPath field is used in Map to select an array in the input) + # in = raw_state_input + # if ItemsPath: + # in = raw_state_input[ItemsPath] + # + try: + function_input = raw_state_input + if self.items_path_dict and 'ItemsPath' in self.items_path_dict: + function_input = self.process_items_path(self.items_path_dict, function_input) + return function_input + except Exception: + raise Exception("Items path processing exception") + def applyInputPath(self, raw_state_input): #2. Apply InputPath, if available (Extract function_input from raw_state_input) @@ -343,6 +397,331 @@ def evaluateChoiceConditions(self, function_input): self.choiceNext = self.evaluateNextState(function_input) self._logger.debug("[StateUtils] Evaluated Choice condition: " + str(self.choiceNext)) + + def evaluateMapState(self, function_input, key, metadata, sapi): + name_prefix = self.functiontopic + "_" + key + + if "MaxConcurrency" in self.parsedfunctionstateinfo: + maxConcurrency = self.parsedfunctionstateinfo["MaxConcurrency"] + else: + maxConcurrency = 0 + self.parsedfunctionstateinfo["MaxConcurrency"] = maxConcurrency + + if "Parameters" in self.parsedfunctionstateinfo: + mapParameters = self.parsedfunctionstateinfo["Parameters"] + else: + mapParameters = {} + + self._logger.debug("[StateUtils] evaluateMapState, maxConcurrency: " + str(maxConcurrency)) + + self._logger.debug("[StateUtils] evaluateMapState metadata: " + str(metadata)) + + counter_name_topic = self.sandboxid + "-" + self.workflowid + "-" + self.functionstatename + + total_branch_count = len(function_input) # all branches executed concurrently + + klist = [total_branch_count] + + self.parsedfunctionstateinfo["BranchCount"] = int(total_branch_count) # overwrite parsed BranchCount with new value + self._logger.debug("[StateUtils] evaluateMapState, total_branch_count: " + str(total_branch_count)) + + # translated from Parallel + counter_metadata = {} + counter_metadata["__state_action"] = "post_map_processing" + counter_metadata["__async_execution"] = metadata["__async_execution"] + workflow_instance_metadata_storage_key = name_prefix + "_workflow_metadata" + counter_metadata["WorkflowInstanceMetadataStorageKey"] = workflow_instance_metadata_storage_key + counter_metadata["CounterValue"] = 0 # this should be updated by riak hook + counter_metadata["Klist"] = klist + counter_metadata["TotalBranches"] = total_branch_count + counter_metadata["ExecutionId"] = key + counter_metadata["FunctionTopic"] = self.functiontopic + counter_metadata["Endpoint"] = self._internal_endpoint + + iterator = self.parsedfunctionstateinfo["Iterator"] + + #assert total_branch_count == len(self.parsedfunctionstateinfo["Branches"]) + + k_list = [] + + if "WaitForNumBranches" in self.parsedfunctionstateinfo: + k_list = self.parsedfunctionstateinfo["WaitForNumBranches"] + if not isinstance(k_list, list): + 
self._logger.debug("[StateUtils] WaitForNumBranches must be a sorted list with 1 or more integers") + raise Exception("[StateUtils] WaitForNumBranches must be a sorted list with 1 or more integers") + k_list.sort() + for k in k_list: + if not isinstance(k, int): + self._logger.debug("[StateUtils] Values inside WaitForNumBranches must be integers") + raise Exception("[StateUtils] Values inside WaitForNumBranches must be integers") + if k > total_branch_count: + self._logger.debug("[StateUtils] Values inside WaitForNumBranches list cannot be greater than the number of branches in the map state") + raise Exception("[StateUtils] Values inside WaitForNumBranches list cannot be greater than the number of branches in the map state") + else: + k_list.append(total_branch_count) + + counter_name_trigger_metadata = {"k-list": k_list, "total-branches": total_branch_count} + + # dynamic values used for generation of branches + counter_name_key = key + branch_out_keys = [] + for i in range(total_branch_count): + branch_out_key = key + "-branch-" + str(i+1) + branch_out_keys.append(branch_out_key) + + counter_name_value_metadata = copy.deepcopy(metadata) + counter_name_value_metadata["WorkflowInstanceMetadataStorageKey"] = workflow_instance_metadata_storage_key + counter_name_value_metadata["CounterValue"] = 0 # this should be updated by riak hook + counter_name_value_metadata["__state_action"] = "post_map_processing" + counter_name_value_metadata["state_counter"] = metadata["state_counter"] + self._logger.debug("[StateUtils] evaluateMapState, metadata[state_counter]: " + str(metadata["state_counter"])) + self.mapStateCounter = int(metadata["state_counter"]) + + counter_name_value = {"__mfnmetadata": counter_name_value_metadata, "__mfnuserdata": '{}'} + + CounterName = json.dumps([str(counter_name_topic), str(counter_name_key), counter_name_trigger_metadata, counter_name_value]) + + workflow_instance_outputkeys_set_key = key +"_"+ self.functionstatename + "_outputkeys_set" + mapInfo = {} + mapInfo["CounterTopicName"] = counter_name_topic + mapInfo["CounterNameKey"] = counter_name_key + mapInfo["TriggerMetadata"] = counter_name_trigger_metadata + mapInfo["CounterNameValueMetadata"] = counter_name_value_metadata + mapInfo["BranchOutputKeys"] = branch_out_keys + mapInfo["CounterName"] = CounterName + mapInfo["MaxConcurrency"] = maxConcurrency + mapInfo["BranchOutputKeysSetKey"] = workflow_instance_outputkeys_set_key + mapInfo["k_list"] = k_list + + mapInfo_key = self.functionstatename + "_" + key + "_map_info" + + metadata[mapInfo_key] = mapInfo + + self._logger.debug("[StateUtils] evaluateMapState: ") + self._logger.debug("\t CounterName:" + CounterName) + self._logger.debug("\t counter_name_topic:" + counter_name_topic) + self._logger.debug("\t counter_name_key: " + counter_name_key) + self._logger.debug("\t counter_name_trigger_metadata:" + json.dumps(counter_name_trigger_metadata)) + self._logger.debug("\t counter_name_value_metadata:" + json.dumps(counter_name_value_metadata)) + self._logger.debug("\t counter_name_value_encoded: " + json.dumps(counter_name_value)) + self._logger.debug("\t mapInfo_key:" + mapInfo_key) + #self._logger.debug("\t mapInfo:" + json.dumps(mapInfo)) + self._logger.debug("\t workflow_instance_metadata_storage_key: " + workflow_instance_metadata_storage_key) + #self._logger.debug("\t metadata " + json.dumps(metadata)) + self._logger.debug("\t total_branch_count:" + str(total_branch_count)) + self._logger.debug("\t branch_out_keys:" + ",".join(branch_out_keys)) + + # create 
counter for the Map state (equivalent to the Parallel state counter) + assert py3utils.is_string(CounterName) + counterName = str(mapInfo["CounterName"]) + counter_metadata_key_name = counterName + "_metadata" + + try: + dlc = DataLayerClient(locality=1, suid=self._storage_userid, is_wf_private=False, connect=self._datalayer) + + # create a triggerable counter to start the post-map processing when the map state finishes + dlc.createCounter(CounterName, 0, tableName=dlc.countertriggerstable) + + dlc.put(counter_metadata_key_name, json.dumps(counter_metadata), tableName=dlc.countertriggersinfotable) + + except Exception as exc: + self._logger.error("Exception in creating counter: " + str(exc)) + self._logger.error(exc) + raise + finally: + dlc.shutdown() + + + assert py3utils.is_string(workflow_instance_metadata_storage_key) + self._logger.debug("[StateUtils] full_metadata_encoded put key: " + str(workflow_instance_metadata_storage_key)) + + sapi.put(workflow_instance_metadata_storage_key, json.dumps(metadata)) + + #assert py3utils.is_string(workflow_instance_outputkeys_set_key) + # sapi.createSet(workflow_instance_outputkeys_set_key) # obsolete statement + + + # Now provide each branch with its own input + + #branches = self.parsedfunctionstateinfo["Branches"] + branch = self.parsedfunctionstateinfo["Iterator"] # this is just one set + #for branch in branches: + # launch a branch for each input element + startat = str(branch["StartAt"]) + + + for i in range(len(function_input)): + sapi.add_dynamic_next(startat, function_input[i]) # Alias for add_workflow_next(self, next, value) + + sapi.put(name_prefix + "_" + "mapStateInputValue", str(function_input[i])) + sapi.put(name_prefix + "_" + "mapStateInputIndex", str(i)) + + #self._mapStateInfo["mapStateInputValue"] = str(function_input[i]) + #self._mapStateInfo["mapStateInputIndex"] = str(i) + + self._logger.debug("\t Map State StartAt:" + startat) + self._logger.debug("\t Map State input:" + str(function_input[i])) + + return function_input, metadata + + def evaluatePostMap(self, function_input, key, metadata, sapi): + + name_prefix = self.functiontopic + "_" + key + + # function is triggered by post-commit hook with metadata containing information about state results in buckets. 
+ # It collects these results and returns metadata and post_map_output_results + + #self._logger.debug("[StateUtils] evaluatePostMap: ") + #self._logger.debug("\t key:" + key) + #self._logger.debug("\t metadata:" + json.dumps(metadata)) + #self._logger.debug("\t function_input: " + str(function_input)) + + action = metadata["__state_action"] + assert action == "post_map_processing" + #counterValue = metadata["CounterValue"] + counterValue = function_input["CounterValue"] + + state_counter = 0 + if "state_counter" in metadata: + state_counter = metadata["state_counter"] + + #self._logger.debug("[StateUtils] evaluatePostMap, metadata[state_counter]: " + str(metadata["state_counter"])) + + self._logger.debug("\t metadata:" + json.dumps(metadata)) + + workflow_instance_metadata_storage_key = str(function_input["WorkflowInstanceMetadataStorageKey"]) + assert py3utils.is_string(workflow_instance_metadata_storage_key) + full_metadata_encoded = sapi.get(workflow_instance_metadata_storage_key) + self._logger.debug("[StateUtils] full_metadata_encoded get: " + str(full_metadata_encoded)) + + full_metadata = json.loads(full_metadata_encoded) + full_metadata["state_counter"] = state_counter + + #mapInfoKey = key + "_" + self.functionstatename + "_map_info" + mapInfoKey = self.functionstatename + "_" + key + "_map_info" + mapInfo = full_metadata[mapInfoKey] + + branchOutputKeysSetKey = str(mapInfo["BranchOutputKeysSetKey"]) + branchOutputKeysSet = sapi.retrieveSet(branchOutputKeysSetKey) + self._logger.debug("\t branchOutputKeysSet: " + str(branchOutputKeysSet)) + + if not branchOutputKeysSet: + self._logger.error("[StateUtils] branchOutputKeysSet is empty") + raise Exception("[StateUtils] branchOutputKeysSet is empty") + + k_list = mapInfo["k_list"] + + self._logger.debug("\t action: " + action) + self._logger.debug("\t counterValue:" + str(counterValue)) + #self._logger.debug("\t WorkflowInstanceMetadataStorageKey:" + metadata["WorkflowInstanceMetadataStorageKey"]) + #self._logger.debug("\t full_metadata:" + full_metadata_encoded) + self._logger.debug("\t mapInfoKey: " + mapInfoKey) + #self._logger.debug("\t mapInfo:" + json.dumps(mapInfo)) + self._logger.debug("\t branchOutputKeysSetKey:" + branchOutputKeysSetKey) + self._logger.debug("\t branchOutputKeysSet:" + str(branchOutputKeysSet)) + self._logger.debug("\t k_list:" + str(k_list)) + + NumBranchesFinished = abs(counterValue) + self._logger.debug("\t NumBranchesFinished:" + str(NumBranchesFinished)) + + do_cleanup = False + + if k_list[-1] == NumBranchesFinished: + do_cleanup = True + + self._logger.debug("\t do_cleanup:" + str(do_cleanup)) + + counterName = str(mapInfo["CounterName"]) + counter_metadata_key_name = counterName + "_metadata" + assert py3utils.is_string(counterName) + + if do_cleanup: + assert py3utils.is_string(counterName) + try: + dlc = DataLayerClient(locality=1, suid=self._storage_userid, is_wf_private=False, connect=self._datalayer) + + # done with the triggerable counter + dlc.deleteCounter(counterName, tableName=dlc.countertriggerstable) + + dlc.delete(counter_metadata_key_name, tableName=dlc.countertriggersinfotable) + + except Exception as exc: + self._logger.error("Exception deleting counter: " + str(exc)) + self._logger.error(exc) + raise + finally: + dlc.shutdown() + + post_map_output_values = [] + + self._logger.debug("\t mapInfo_BranchOutputKeys:" + str(mapInfo["BranchOutputKeys"])) + + self._logger.debug("\t mapInfo_BranchOutputKeys length: " + str(len(mapInfo["BranchOutputKeys"]))) + + for outputkey in 
mapInfo["BranchOutputKeys"]: + outputkey = str(outputkey) + if outputkey in branchOutputKeysSet: # mapInfo["BranchOutputKeys"]: + self._logger.debug("\t BranchOutputKey:" + outputkey) + while sapi.get(outputkey) == "": + time.sleep(0.1) # wait until value is available + + branchOutput = sapi.get(outputkey) + branchOutput_decoded = json.loads(branchOutput) + self._logger.debug("\t branchOutput(type):" + str(type(branchOutput))) + self._logger.debug("\t branchOutput:" + branchOutput) + self._logger.debug("\t branchOutput_decoded(type):" + str(type(branchOutput_decoded))) + self._logger.debug("\t branchOutput_decoded:" + str(branchOutput_decoded)) + post_map_output_values = post_map_output_values + [branchOutput_decoded] + if do_cleanup: + sapi.delete(outputkey) # cleanup the key from data layer + self._logger.debug("\t cleaned output key:" + outputkey) + else: + post_map_output_values = post_map_output_values + [None] + self._logger.debug("\t this_BranchOutputKeys is not contained: " + str(outputkey)) + + self._logger.debug("\t post_map_output_values:" + str(post_map_output_values)) + while (sapi.get(name_prefix + "_" + "mapStatePartialResult")) == "": + time.sleep(0.1) # wait until value is available + + mapStatePartialResult = ast.literal_eval(sapi.get(name_prefix + "_" + "mapStatePartialResult")) + #mapStatePartialResult = ast.literal_eval(self._mapStateInfo["mapStatePartialResult"]) + + mapStatePartialResult += post_map_output_values + sapi.put(name_prefix + "_" + "mapStatePartialResult", str(mapStatePartialResult)) + #self._mapStateInfo["mapStatePartialResult"] = str(mapStatePartialResult) + + # now apply ResultPath and OutputPath + if do_cleanup: + + sapi.deleteSet(branchOutputKeysSetKey) + + if ast.literal_eval(sapi.get(name_prefix + "_" + "mapInputCount")) == len(mapStatePartialResult): + # if ast.literal_eval(self._mapStateInfo["mapInputCount"]) == len(mapStatePartialResult): + + # we are ready to publish but need to honour ResultPath and OutputPath + res_raw = ast.literal_eval(sapi.get(name_prefix + "_" +"mapStatePartialResult")) + #res_raw = ast.literal_eval(self._mapStateInfo["mapStatePartialResult"]) + + # remove unwanted keys from input before publishing + function_input = {} + + function_input_post_result = self.applyResultPath(function_input, res_raw) + function_input_post_output = self.applyResultPath(function_input_post_result, function_input_post_result) + if "Next" in self.parsedfunctionstateinfo: + if self.parsedfunctionstateinfo["Next"]: + sapi.add_dynamic_next(self.parsedfunctionstateinfo["Next"], function_input_post_output ) + + if "End" in self.parsedfunctionstateinfo: + if self.parsedfunctionstateinfo["End"]: + sapi.add_dynamic_next("end", function_input_post_output) + sapi.delete(name_prefix + "_" + "mapInputCount") + sapi.delete(name_prefix + "_" + "mapStateInputIndex") + sapi.delete(name_prefix + "_" + "mapStateInputValue") + sapi.delete(name_prefix + "_" + "mapStatePartialResult") + sapi.delete(name_prefix + "_" + "tobeProcessedlater") + post_map_output_values = function_input_post_output + return post_map_output_values, full_metadata + def evaluateParallelState(self, function_input, key, metadata, sapi): name_prefix = self.functiontopic + "_" + key total_branch_count = self.parsedfunctionstateinfo["BranchCount"] @@ -479,6 +858,72 @@ def processBranchTerminalState(self, key, value_output, metadata, sapi): self._logger.error("[StateUtils] processBranchTerminalState Unable to find ParallelInfo") raise Exception("processBranchTerminalState Unable to find 
ParallelInfo") + if self.parsedfunctionstateinfo["End"] and "ParentMapInfo" in self.parsedfunctionstateinfo: + + parentMapInfo = self.parsedfunctionstateinfo["ParentMapInfo"] + + #self._logger.debug("[StateUtils] processBranchTerminalState:parentMapInfo: " + str(parentMapInfo)) + mapName = parentMapInfo["Name"] + #self._logger.debug("[StateUtils] processBranchTerminalState:mapName: " + str(mapName)) + mapInfoKey = mapName + "_" + key + "_map_info" + #self._logger.debug("[StateUtils] processBranchTerminalState:mapInfoKey: " + str(mapInfoKey)) + + branchCounter = parentMapInfo["BranchCounter"] + + #self._logger.debug("[StateUtils] processBranchTerminalState: ") + #self._logger.debug("\t ParentMapInfo:" + json.dumps(parentMapInfo)) + #self._logger.debug("\t mapName:" + mapName) + #self._logger.debug("\t branchCounter: " + str(branchCounter)) + #self._logger.debug("\t key:" + key) + #self._logger.debug("\t metadata:" + json.dumps(metadata)) + #self._logger.debug("\t value_output(type):" + str(type(value_output))) + #self._logger.debug("\t value_output:" + value_output) + + if mapInfoKey in metadata: + mapInfo = metadata[mapInfoKey] + + rest = metadata["__function_execution_id"].split("_")[1:] + for codes in rest: # find marker for map state and use it to calculate curent index + if "-M" in codes: + index = rest.index(codes) + current_index = int(rest[index].split("-M")[0]) + + self._logger.debug("[StateUtils] current_index: " + str(current_index)) + if mapInfo["MaxConcurrency"] != 0: + current_index = current_index % int(mapInfo["MaxConcurrency"]) + + counterName = str(mapInfo["CounterName"]) + branchOutputKeys = mapInfo["BranchOutputKeys"] + #branchOutputKey = str(branchOutputKeys[branchCounter-1]) + branchOutputKey = str(branchOutputKeys[current_index]) + + branchOutputKeysSetKey = str(mapInfo["BranchOutputKeysSetKey"]) + + self._logger.debug("\t branchOutputKey:" + branchOutputKey) + self._logger.debug("\t branchOutputKeysSetKey:" + branchOutputKeysSetKey) + + assert py3utils.is_string(branchOutputKey) + sapi.put(branchOutputKey, value_output) + + assert py3utils.is_string(branchOutputKeysSetKey) + sapi.addSetEntry(branchOutputKeysSetKey, branchOutputKey) + + assert py3utils.is_string(counterName) + try: + dlc = DataLayerClient(locality=1, suid=self._storage_userid, is_wf_private=False, connect=self._datalayer) + + # increment the triggerable counter + dlc.incrementCounter(counterName, 1, tableName=dlc.countertriggerstable) + except Exception as exc: + self._logger.error("Exception incrementing counter: " + str(exc)) + self._logger.error(exc) + raise + finally: + dlc.shutdown() + + else: + self._logger.error("[StateUtils] processBranchTerminalState Unable to find MapInfo") + raise Exception("processBranchTerminalState Unable to find MapInfo") def evaluatePostParallel(self, function_input, key, metadata, sapi): #self._logger.debug("[StateUtils] evaluatePostParallel: ") @@ -642,8 +1087,8 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi): # need to communicate with datalayer for definition of trigger for hibernating/resuming task wait_state_timestamppath_data = [match.value for match in parse(wait_state_timestamppath).find(function_input)] if wait_state_timestamppath_data == []: - #self._logger.exception("[StateUtils] Wait state timestamppath does not match: " + str(wait_state_timestamppath)) - raise Exception("Wait state timestamppath does not match") + #self._logger.exception("[StateUtils] Wait state timestamp_path does not match: " + str(wait_state_timestamppath)) + 
raise Exception("Wait state timestamp_path does not match") self._logger.debug("[StateUtils] Wait state timestamppath data parsed:" + str(wait_state_timestamppath_data[0])) @@ -704,6 +1149,60 @@ def evaluateNonTaskState(self, function_input, key, metadata, sapi): if metadata["__state_action"] == "post_parallel_processing": function_output, metadata = self.evaluatePostParallel(function_input, key, metadata, sapi) + elif self.functionstatetype == StateUtils.mapStateType: + name_prefix = self.functiontopic + "_" + key + + self._logger.debug("[StateUtils] Map state handling function_input: " + str(function_input)) + self._logger.debug("[StateUtils] Map state handling metadata: " + str(metadata)) + + if "MaxConcurrency" in self.parsedfunctionstateinfo.keys(): + maxConcurrency = int(self.parsedfunctionstateinfo["MaxConcurrency"]) + else: + maxConcurrency = 0 + + self._logger.debug("[StateUtils] Map state maxConcurrency: " + str(maxConcurrency)) + self._logger.debug("[StateUtils] Map state handling") + + if "__state_action" not in metadata or metadata["__state_action"] != "post_map_processing": + # here we start the iteration process on a first batch + if maxConcurrency != 0: + tobeProcessednow = function_input[:maxConcurrency] # take the first maxConcurrency elements + tobeProcessedlater = function_input[maxConcurrency:] # keep the remaining elements for later + else: + tobeProcessednow = function_input + tobeProcessedlater = [] + self._logger.debug("[StateUtils] Map state function_input split:" + str(tobeProcessednow) + " " + str(tobeProcessedlater)) + sapi.put(name_prefix + "_" + "tobeProcessedlater", str(tobeProcessedlater)) # store elements to be processed on DL + sapi.put(name_prefix + "_" + "mapStatePartialResult", "[]") # initialise the collector variable + sapi.put(name_prefix + "_" + "mapInputCount", str(len(function_input))) + + """ + metadata["tobeProcessedlater"] = str(tobeProcessedlater) # store elements to be processed on DL + metadata["mapStatePartialResult"] = "[]" # initialise the collector variable + metadata["mapInputCount"] = str(len(function_input)) + + """ + + function_output, metadata = self.evaluateMapState(tobeProcessednow, key, metadata, sapi) + + elif metadata["__state_action"] == "post_map_processing": + tobeProcessedlater = ast.literal_eval(sapi.get(name_prefix + "_" + "tobeProcessedlater")) # get all elements that have not yet been processed + #tobeProcessedlater = ast.literal_eval(self._mapStateInfo["tobeProcessedlater"]) # get all elements that have not yet been processed + self._logger.debug("[StateUtils] Map state post_map processing input:" + str(tobeProcessedlater)) + # we need to decide at this point if there is a need for more batches. if so: + + if len(tobeProcessedlater) > 0: # we need to start another batch + function_output, metadata2 = self.evaluatePostMap(function_input, key, metadata, sapi) # take care not to overwrite metadata + function_output, metadata = self.evaluateMapState(tobeProcessedlater[:maxConcurrency], key, metadata, sapi) # start a new batch + sapi.put(name_prefix + "_" + "tobeProcessedlater", str(tobeProcessedlater[maxConcurrency:])) # store remaining elements to be processed on DL + #self._mapStateInfo["tobeProcessedlater"] = str(tobeProcessedlater[maxConcurrency:]) # store remaining elements to be processed on DL + else: # no more batches required. 
we are at the iteration end, publish the final result + self._logger.debug("[StateUtils] Map state input final stage: " + str(function_input)) + function_output, metadata = self.evaluatePostMap(function_input, key, metadata, sapi) + + else: + raise Exception("Unknown action type in map state") + else: raise Exception("Unknown state type") @@ -724,7 +1223,7 @@ def applyResultPath(self, raw_state_input, function_output): # raw_state_input_midway = function_output # raw_state_input_midway = raw_state_input - #self._logger.debug("Reached applyResultPath!") + #self._logger.debug("Reached applyResultPath: " + str(self.result_path_dict)) try: if self.result_path_dict and 'ResultPath' in self.result_path_dict: raw_state_input_midway = self.process_result_path(self.result_path_dict, raw_state_input, function_output) @@ -798,12 +1297,9 @@ def parse_function_state_info(self): if "ResultPath" in statedef: self.result_path_dict['ResultPath'] = statedef['ResultPath'] - #self._logger.debug("found ResultPath: " + json.dumps(self.result_path_dict['ResultPath'])) - - #if "Next" in statedef: - # self._logger.debug("found Next: " + json.dumps(statedef['Next'])) - #if "Resource" in statedef: - # self._logger.debug("found Resource: " + json.dumps(statedef['Resource'])) + if "Parameters" in statedef: + self.parameters_dict['Parameters'] = statedef['Parameters'] + self._logger.debug("found Parameters: " + json.dumps(self.parameters_dict['Parameters'])) if "Catch" in statedef: self.catcher_list = statedef['Catch'] @@ -913,6 +1409,10 @@ def parse_function_state_info(self): self.result_path_dict['ResultPath'] = statedef['ResultPath'] self._logger.debug("found ResultPath: " + json.dumps(self.result_path_dict['ResultPath'])) + if "Parameters" in statedef: + self.parameters_dict['Parameters'] = statedef['Parameters'] + self._logger.debug("found Parameters: " + json.dumps(self.parameters_dict['Parameters'])) + #self._logger.debug("found Next: " + json.dumps(statedef['Next'])) #self._logger.debug("found Result: " + json.dumps(statedef['Result'])) @@ -931,6 +1431,33 @@ def parse_function_state_info(self): self.result_path_dict['ResultPath'] = statedef['ResultPath'] self._logger.debug("found ResultPath: " + json.dumps(self.result_path_dict['ResultPath'])) + if "Parameters" in statedef: + self.parameters_dict['Parameters'] = statedef['Parameters'] + self._logger.debug("found Parameters: " + json.dumps(self.parameters_dict['Parameters'])) + + if statetype == StateUtils.mapStateType: + #self._logger.debug("[StateUtils] Map state parsing") + + if "InputPath" in statedef: + self.input_path_dict['InputPath'] = statedef['InputPath'] + self._logger.debug("found InputPath: " + json.dumps(self.input_path_dict['InputPath'])) + + if "ItemsPath" in statedef: + self.items_path_dict['ItemsPath'] = statedef['ItemsPath'] + self._logger.debug("found ItemsPath: " + json.dumps(self.items_path_dict['ItemsPath'])) + + if "ResultPath" in statedef: + self.result_path_dict['ResultPath'] = statedef['ResultPath'] + self._logger.debug("found ResultPath: " + json.dumps(self.result_path_dict['ResultPath'])) + + if "OutputPath" in statedef: + self.output_path_dict['OutputPath'] = statedef['OutputPath'] + self._logger.debug("found OutputPath: " + json.dumps(self.output_path_dict['OutputPath'])) + + if "Parameters" in statedef: + self.parameters_dict['Parameters'] = statedef['Parameters'] + self._logger.debug("found Parameters: " + json.dumps(self.parameters_dict['Parameters'])) + def EvaluateNode(self, node): """ @@ -992,6 +1519,124 @@ def 
evaluate(self, expression): result = "%s %s '%s'" % (expr[0], expr[1], expr[2]) # we want to compare strings with strings return result + def process_parameters(self, parameters, state_data): + """ + evaluate JSONPath Parameters in conjunction with state_data + """ + parameters = parameters['Parameters'] + ret_value = None + ret_item_value = None + + if parameters == "$": # return unfiltered input data + ret_value = state_data + elif parameters is None: #return empty json + ret_value = {} + else: # contains a parameter filter, get it and return selected kv pairs + ret_value = {} + ret_index = {} + + for key in parameters.keys(): # process parameters keys + if key.casefold() == "comment".casefold(): # ignore + ret_value[key] = parameters[key] + elif parameters[key] == "$$.Map.Item.Value": # get Items key + value_key = key.split(".$")[0] + ret_value = value_key + ret_item_value = value_key + elif parameters[key] == "$$.Map.Item.Index": # get Index key + index_key = key.split(".$")[0] + ret_index = index_key + else: # processing more complex Parameters values + if isinstance(parameters[key], dict): # parameters key refers to dict value + ret_value[key] = {} + for k in parameters[key]: # get nested keys + if not k.split(".")[-1] == "$": # parse static value + self._logger.debug("static Parameters value: " + str(parameters[key][k])) + ret_value[key][k] = parameters[key][k] + else: + new_key = k.split(".$")[0] # use the JSON paths in parameters to match + ret_value[key][new_key] = [match.value for match in parse(parameters[key][k]).find(state_data)][0] + return ret_value + + if isinstance(parameters[key], str): # parameters key refers to string value + ret_value = {} + new_key = key.split(".$")[0] # get the parameters key + query_key = parameters[key].split("$.")[1] # get the key referenced by the JSONPath value + new_value = state_data[query_key] # save the actual value before replacing the key + for kk in state_data.keys(): + if isinstance(state_data[kk], dict): # value encapsulates dict + ret_value[new_key] = new_value + if ret_item_value is not None: + ret_value[ret_item_value] = state_data[kk] + else: + raise Exception("Error: item value is not set!") + ret_value_dict = {} + ret_value_dict[kk] = ret_value + return ret_value_dict + + if isinstance(state_data[kk], list): # value encapsulates list + ret_value_list = [] + for data in state_data[kk]: + ret_value_list.append({new_key: new_value, ret_item_value: data}) + ret_value_dict = {} + ret_value_dict[kk] = ret_value_list + return ret_value_dict + else: + raise Exception("Error: invalid Parameters format: " + str(parameters[key])) + + # calculate transformed state output provided to Iterator + ret_total = [] + ret_total_dict = {} + + if isinstance(state_data, dict): + for kk in state_data.keys(): + for key in state_data[kk]: + if ret_value != {} and ret_index == {}: + ret_total.append({ret_value: key}) + elif ret_value == {} and ret_index != {}: + ret_total.append({ret_index: state_data[kk].index(key) }) + elif ret_value != {} and ret_index != {}: + ret_total.append({ret_value: key, ret_index: state_data[kk].index(key) }) + else: + raise Exception("Map State Parameters parse error on dict input: " + str(state_data)) + ret_total_dict[kk] = ret_total + ret_value = ret_total_dict + + elif isinstance(state_data, list): + for key in state_data: + if ret_value != {} and ret_index == {}: + ret_total.append({ret_value: key}) + elif ret_value == {} and ret_index != {}: + ret_total.append({ret_index: state_data.index(key) }) + elif ret_value != {} and ret_index != {}: + ret_total.append({ret_value: key, ret_index: 
state_data.index(key) }) + else: + raise Exception("Map State Parameters parse error on list input: " + str(state_data)) + ret_value = ret_total + else: + raise Exception("Map state parse error: invalid state input") + + return ret_value + + def process_items_path(self, path_fields, state_data): + ret_value = None + if 'ItemsPath' not in list(path_fields.keys()): + path_fields['ItemsPath'] = "$" + + input_path = path_fields['ItemsPath'] + + if input_path == "$": # return unfiltered input data + ret_value = state_data + elif input_path is None: #return empty list + ret_value = [] + else: # it contains a filter, get it and return selected list in input + self._logger.debug("seeing items_path filter: " + str(input_path) + " " + str(state_data)) + filtered_state_data = [match.value for match in parse(input_path).find(state_data)] + if not filtered_state_data: + raise Exception("Items Path processing exception: no match with map state item, invalid path!") + else: + filtered_state_data = filtered_state_data[0] # reuse the parsed match instead of parsing twice + ret_value = filtered_state_data + return ret_value def process_input_path(self, path_fields, state_data): ret_value = None @@ -1003,21 +1648,24 @@ def process_input_path(self, path_fields, state_data): if input_path == "$": # return unfiltered input data ret_value = state_data - #return state_data elif input_path is None: #return empty dict ret_value = {} - #return {} - else: # it contains a filter, get it + else: # input_path contains a filter, get and apply it + self._logger.debug("seeing input_path filter: " + str(input_path) + " " + str(state_data)) filtered_state_data = [match.value for match in parse(input_path).find(state_data)] + self._logger.debug("after seeing input_path filter: " + str(filtered_state_data)) if not filtered_state_data: raise Exception("Input Path processing exception: no match with state input item, invalid path!") else: filtered_state_data = [match.value for match in parse(input_path).find(state_data)][0] ret_value = filtered_state_data - #return filtered_state_data return ret_value + def nested_dict(self, keys, value): + if len(keys) == 1: + return {keys[0]: value} + return {keys[0]: self.nested_dict(keys[1:], value)} def process_result_path(self, path_fields, state_data, task_output): ret_value = None @@ -1031,21 +1679,16 @@ def process_result_path(self, path_fields, state_data, task_output): if result_path == "$": ret_value = state_data - #return state_data elif result_path is None: ret_value = {} - #return {} else: # result_path is not empty so is there a match? 
- key = str(parse(result_path).nodes[-1].value[0]) # get the new key for results - - filtered_state_data = [match.value for match in parse(result_path).find(task_output)] - if not filtered_state_data: # there is no match between result_path and task_output, create new key for task_output - filtered_state_data = {key: task_output} - else: # there is a match, overwrite with task output of that key - filtered_state_data = {key: task_output[key]} - - ret_value = dict(list(filtered_state_data.items()) + list(state_data.items())) - #return dict(list(filtered_state_data.items()) + list(state_data.items())) + self._logger.debug("inside ResultPath processing: " + str(result_path) + " " + str(task_output) ) + keys = list(tokenize(result_path)) # get all keys + filtered_state_data = self.nested_dict(keys[1:], task_output) + if isinstance(state_data, dict): + ret_value = dict(list(filtered_state_data.items()) + list(state_data.items())) # adding key and values to new dict + else: + ret_value = filtered_state_data return ret_value @@ -1053,16 +1696,13 @@ def process_output_path(self, path_fields, raw_state_input_midway): ret_value = None if 'OutputPath' not in list(path_fields.keys()): path_fields['OutputPath'] = "$" - #return raw_state_input_midway output_path = path_fields['OutputPath'] if output_path == "$": ret_value = raw_state_input_midway - #return raw_state_input_midway elif output_path is None: ret_value = {} - #return {} else: # output_path is not empty so is there a match? filtered_state_data = [match.value for match in parse(output_path).find(raw_state_input_midway)] if not filtered_state_data: @@ -1071,10 +1711,10 @@ def process_output_path(self, path_fields, raw_state_input_midway): key = str(parse(output_path).nodes[-1].value[0]) filtered_state_data = raw_state_input_midway[key] ret_value = filtered_state_data - #return filtered_state_data return ret_value + def traverse(self, path, obj): """ Traverse the object recursively and print every path / value pairs. 
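For reviewers, the intended ItemsPath/Parameters behaviour implemented in StateUtils above can be summarized by this minimal standalone sketch (helper names are illustrative, not the repo's API; the output matches workflow_map_state_context_test.data further down):

def select_items(state_input, items_path="$"):
    # ItemsPath: "$" passes the input through; a path like "$.detail.items" selects a nested array
    if items_path == "$":
        return state_input
    node = state_input
    for part in items_path.lstrip("$.").split("."):
        node = node[part]
    return node

def shape_items(items, parameters=None):
    # Parameters: build each Iterator input; keys ending in ".$" take dynamic values,
    # where $$.Map.Item.Value / $$.Map.Item.Index are the current element and its index
    if not parameters:
        return items
    shaped = []
    for idx, item in enumerate(items):
        doc = {}
        for key, value in parameters.items():
            name = key[:-2] if key.endswith(".$") else key
            if value == "$$.Map.Item.Value":
                doc[name] = item
            elif value == "$$.Map.Item.Index":
                doc[name] = idx
            else:
                doc[name] = value  # static entry, copied as-is
        shaped.append(doc)
    return shaped

items = select_items([{"who": "bob"}, {"who": "meg"}, {"who": "joe"}])
print(shape_items(items, {"ContextValue.$": "$$.Map.Item.Value", "ContextIndex.$": "$$.Map.Item.Index"}))
# [{'ContextValue': {'who': 'bob'}, 'ContextIndex': 0}, {'ContextValue': {'who': 'meg'}, 'ContextIndex': 1}, {'ContextValue': {'who': 'joe'}, 'ContextIndex': 2}]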
diff --git a/GUI/.gitignore b/GUI/.gitignore new file mode 100644 index 00000000..ff00d23c --- /dev/null +++ b/GUI/.gitignore @@ -0,0 +1,2 @@ +app/pages/docs/sdk/sdk.md +app/pages/docs/sdk/cli.md diff --git a/GUI/Makefile b/GUI/Makefile index f100d8fd..68b4c1fa 100644 --- a/GUI/Makefile +++ b/GUI/Makefile @@ -23,7 +23,9 @@ app/pages/docs/sdk/cli.md: ../mfn_cli/README.md -include ../proxy.mk include ../docker.mk -image: $(shell find ./ -type f|grep -v Makefile) +image: $(shell find ./ -type f|grep -v -e Makefile -e app/pages/docs/sdk/sdk.md -e app/pages/docs/sdk/cli.md) \ + app/pages/docs/sdk/cli.md \ + app/pages/docs/sdk/sdk.md $(call build_image,Dockerfile,microfn/nginx) push: image diff --git a/GUI/app/pages/workflows/WorkflowEditorCtrl.js b/GUI/app/pages/workflows/WorkflowEditorCtrl.js index a9b01a66..b2c549db 100644 --- a/GUI/app/pages/workflows/WorkflowEditorCtrl.js +++ b/GUI/app/pages/workflows/WorkflowEditorCtrl.js @@ -31,6 +31,9 @@ var fail = $.getJSON("lib/asl-validator/schemas/fail.json", function(json) { }); + var mapState = $.getJSON("lib/asl-validator/schemas/map.json", function(json) { + }); + var parallel = $.getJSON("lib/asl-validator/schemas/parallel.json", function(json) { }); @@ -1209,6 +1212,7 @@ choice.responseJSON, fail.responseJSON, parallel.responseJSON, + mapState.responseJSON, pass.responseJSON, stateMachine.responseJSON, state.responseJSON, diff --git a/GUI/lib/asl-validator/schemas/map.json b/GUI/lib/asl-validator/schemas/map.json new file mode 100644 index 00000000..e4e7c55d --- /dev/null +++ b/GUI/lib/asl-validator/schemas/map.json @@ -0,0 +1,95 @@ +{ + "$id": "http://asl-validator.cloud/map#", + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "Type": { + "type": "string", + "pattern": "^Map$" + }, + "Next": { + "type": "string" + }, + "End": { + "enum": [true] + }, + "Comment": { + "type": "string" + }, + "OutputPath": { + "type": ["string", "null"] + }, + "InputPath": { + "type": ["string", "null"] + }, + "ResultPath": { + "type": ["string", "null"] + }, + "ItemsPath": { + "type": ["string", "null"] + }, + "MaxConcurrency": { + "type": "number", + "minimum": 0 + }, + "Iterator": { + "$ref": "http://asl-validator.cloud/state-machine#" + }, + "Parameters": { + "type": "object" + }, + "Retry": { + "type": "array", + "items": { + "type": "object", + "properties": { + "ErrorEquals": { + "type": "array", + "items": { + "type": "string" + } + }, + "IntervalSeconds": { + "type": "number", + "minimum": 0 + }, + "MaxAttempts": { + "type": "number", + "minimum": 0 + }, + "BackoffRate": { + "type": "number", + "minimum": 0 + } + }, + "required": ["ErrorEquals"] + } + }, + "Catch": { + "type": "array", + "items": { + "type": "object", + "properties": { + "ErrorEquals": { + "type": "array", + "items": { + "type": "string" + } + }, + "Next": { + "type": "string" + } + }, + "required": ["ErrorEquals", "Next"] + } + } + }, + "oneOf": [{ + "required": ["Next"] + }, { + "required": ["End"] + }], + "required": ["Type", "Iterator"], + "additionalProperties": false } diff --git a/GUI/lib/asl-validator/schemas/pass.json b/GUI/lib/asl-validator/schemas/pass.json index 31ca6e75..0c37468b 100644 --- a/GUI/lib/asl-validator/schemas/pass.json +++ b/GUI/lib/asl-validator/schemas/pass.json @@ -25,6 +25,9 @@ "ResultPath": { "type": "string" }, + "Parameters": { + "type": "object" + }, "Result": {} }, "oneOf": [{ diff --git a/GUI/lib/asl-validator/schemas/state.json b/GUI/lib/asl-validator/schemas/state.json index 8acb940e..350ae42f 
100644 --- a/GUI/lib/asl-validator/schemas/state.json +++ b/GUI/lib/asl-validator/schemas/state.json @@ -11,6 +11,9 @@ { "$ref": "http://asl-validator.cloud/parallel#" }, + { + "$ref": "http://asl-validator.cloud/map#" + }, { "$ref": "http://asl-validator.cloud/pass#" }, diff --git a/ManagementService/Makefile b/ManagementService/Makefile index f490a1b2..bf0128af 100644 --- a/ManagementService/Makefile +++ b/ManagementService/Makefile @@ -28,7 +28,7 @@ thrift: $(THRIFT) $(THRIFT): ../DataLayerService/thrift/DataLayerMessage.thrift ../DataLayerService/thrift/DataLayerService.thrift mkdir -p data_layer - docker run --rm -v $(CURDIR)/..:/root -w /root thrift:0.12.0 bash -c '\ + docker run --user $$(id -u):$$(id -g) --rm -v $(CURDIR)/..:/root -w /root thrift:0.12.0 bash -c '\ thrift --gen py -out ManagementService/ DataLayerService/thrift/DataLayerMessage.thrift; \ thrift --gen py -out ManagementService/ DataLayerService/thrift/DataLayerService.thrift' diff --git a/ManagementService/management_init.py b/ManagementService/management_init.py index aebaaff0..6ed4442f 100755 --- a/ManagementService/management_init.py +++ b/ManagementService/management_init.py @@ -74,6 +74,7 @@ def parse_states(state_map): if stype == "Task": functions.append(wfs["Resource"]) + elif stype == 'Parallel': # add the parallel state function worker states.append({'name':sresource,'type':stype}) @@ -83,7 +84,18 @@ def parse_states(state_map): # add found functions to list of function workers functions.extend(sub_functions) states.extend(sub_states) - elif stype in {'Choice', 'Pass', 'Wait', 'Fail', 'Succeed'}: #, 'Parallel'}: + + elif stype == 'Map': + # add the Map state iterator function worker + states.append({'name':sresource,'type':stype}) + # find recursively everything that is in the branch + branch = state_map[sresource]['Iterator'] + sub_functions, sub_states = parse_states(branch['States']) + # add found functions to list of function workers + functions.extend(sub_functions) + states.extend(sub_states) + + elif stype in {'Choice', 'Pass', 'Wait', 'Fail', 'Succeed'}: states.append({'name':sresource,'type':stype}) else: raise Exception("Unknown state type: " + stype) diff --git a/ManagementService/python/deleteWorkflow.py b/ManagementService/python/deleteWorkflow.py index cc20ad5d..bf6b7acc 100644 --- a/ManagementService/python/deleteWorkflow.py +++ b/ManagementService/python/deleteWorkflow.py @@ -39,7 +39,7 @@ def handle(value, sapi): wf = sapi.get(email + "_workflow_" + workflow["id"], True) if wf is not None and wf != "": wf = json.loads(wf) - if wf["status"] == "undeployed": + if wf["status"] == "undeployed" or wf["status"] == "failed": for wn in workflows: if workflows[wn] == workflow["id"]: del workflows[wn] diff --git a/ManagementService/python/deployWorkflow.py b/ManagementService/python/deployWorkflow.py index b9a2e2e6..340da89e 100644 --- a/ManagementService/python/deployWorkflow.py +++ b/ManagementService/python/deployWorkflow.py @@ -48,6 +48,22 @@ def check_workflow_functions(wf_type, wfobj, email, sapi): for state in list(branches['States'].keys()): # state is the key wf_state_map[state] = branches['States'][state] # add the state to the state map root + if wf_state_map[state_names]["Type"] == "Map": + mapStateName = state_names + iterator = wf_state_map[mapStateName]['Iterator'] # this is a dict + + states_dict = iterator['States'] # this is a also dict + print (json.dumps(states_dict)) + for state in states_dict.keys(): + print ("FOUND MAP STATE: "+str(state)) + wf_state_map[state] = 
states_dict[state] + + """ + for iterators in wf_state_map[mapStateName]['Iterator']: + for state in list(iterators['States'].keys()): # state is the key + wf_state_map[state] = iterators['States'][state] # add the state to the state map root + """ + for wfsname in wf_state_map: wfs = wf_state_map[wfsname] if wfs["Type"] == "Task": diff --git a/ManagementService/python/getWorkflows.py b/ManagementService/python/getWorkflows.py index a3d60b6b..728aa04f 100644 --- a/ManagementService/python/getWorkflows.py +++ b/ManagementService/python/getWorkflows.py @@ -138,9 +138,9 @@ def handle(value, sapi): try: workflows = sapi.get(email + "_list_workflows", True) if workflows is None or workflows == "": - raise Exception("Couldn't retrieve workflow status; no such workflow.") - - workflows = json.loads(workflows) + workflows = {} + else: + workflows = json.loads(workflows) # get single workflow status if "workflow" in data and "id" in data["workflow"]: diff --git a/SandboxAgent/workflow.py b/SandboxAgent/workflow.py index 9a17beff..cbf72bd4 100644 --- a/SandboxAgent/workflow.py +++ b/SandboxAgent/workflow.py @@ -28,6 +28,7 @@ class WorkflowStateType: FAIL_STATE_TYPE = "Fail" WAIT_STATE_TYPE = "Wait" PARALLEL_STATE_TYPE = "Parallel" + MAP_STATE_TYPE = "Map" class WorkflowNode: def __init__(self, topic, nextNodes, potNext, gwftype, gwfstatename, gwfstateinfo, is_session_function, sgparams, logger): @@ -100,6 +101,9 @@ def __init__(self, uid, sid, wid, wf_type, wfstr, logger): # throws Exception self.parallelStateNamesStack = collections.deque([]) self.parallelBranchCounterStack = collections.deque([]) + self.mapStateNamesStack = collections.deque([]) + self.mapBranchCounterStack = collections.deque([]) + self.workflowExitPoint = None self.workflowExitTopic = None @@ -254,6 +258,10 @@ def createAndAddASLWorkflowNode(self, gname, nextNodes, potNext, gwfstatetype, g wfnode = WorkflowNode(topic, nextNodes, potNext, gwfstatetype, gwfstatename, gwfstateinfo, is_session_function, sgparams, self._logger) self.workflowNodeMap[topic] = wfnode # add new node to workflow node map + def insideMapBranchAlready(self): + if self.mapStateNamesStack: + return True # not empty + def insideParallelBranchAlready(self): if self.parallelStateNamesStack: return True # not empty @@ -266,14 +274,21 @@ def constructParentParallelInfo(self): parentInfo["BranchCounter"] = self.parallelBranchCounterStack[len(self.parallelBranchCounterStack)-1] # peek() operation return parentInfo + def constructParentMapInfo(self): + parentInfo = {} + parentInfo["Name"] = self.mapStateNamesStack[len(self.mapStateNamesStack)-1] # peek() operation + #parentInfo["BranchCounter"] = self.mapBranchCounterStack[len(self.parallelBranchCounterStack)-1] # peek() operation + parentInfo["BranchCounter"] = 1 # hardcoded branch count, required? 
+ return parentInfo + + def parseStates(self, workflowstates): for statename in workflowstates.keys(): # loop over ASL states stateinfo = workflowstates[statename] # loop over the states in the set assert "Type" in stateinfo.keys() statetype = stateinfo["Type"] - self._logger.info("parseStates: State name: " + statename) - self._logger.info("parseStates: State type: " + statetype) + self._logger.info("parseStates: State name: %s, state type: %s", statename, statetype) if statetype == WorkflowStateType.TASK_STATE_TYPE: self.parseTaskState(statename, stateinfo) @@ -289,6 +304,8 @@ def parseStates(self, workflowstates): self.parseWaitState(statename, stateinfo) elif statetype == WorkflowStateType.PARALLEL_STATE_TYPE: self.parseParallelState(statename, stateinfo) + elif statetype == WorkflowStateType.MAP_STATE_TYPE: + self.parseMapState(statename, stateinfo) else: raise Exception("Error: unknown state type") @@ -303,7 +320,7 @@ def parseTaskState(self, taskstatename, taskstateinfo): if "End" in taskstateinfo.keys(): value = taskstateinfo["End"] - if bool(value) and not self.insideParallelBranchAlready(): + if bool(value) and not (self.insideParallelBranchAlready() or self.insideMapBranchAlready()): self.workflowExitPoint = "end" nextNodes.append(self.workflowExitPoint) self.workflowExitTopic = self.topicPrefix + self.workflowExitPoint @@ -337,6 +354,9 @@ def parseTaskState(self, taskstatename, taskstateinfo): if self.insideParallelBranchAlready(): taskstateinfo["ParentParallelInfo"] = self.constructParentParallelInfo() + if self.insideMapBranchAlready(): + taskstateinfo["ParentMapInfo"] = self.constructParentMapInfo() + self._logger.info("parseTask: State info: " + str(taskstateinfo)) self.createAndAddASLWorkflowNode(taskstatename, nextNodes, potNext, WorkflowStateType.TASK_STATE_TYPE, taskstatename, taskstateinfo) @@ -365,6 +385,9 @@ def parseChoiceState(self, choicestatename, choicestateinfo): if self.insideParallelBranchAlready(): choicestateinfo["ParentParallelInfo"] = self.constructParentParallelInfo() + if self.insideMapBranchAlready(): + choicestateinfo["ParentMapInfo"] = self.constructParentMapInfo() + self._logger.info("parseChoice: State info: " + str(choicestateinfo)) self.createAndAddASLWorkflowNode(gname, nextNodes, potNext, WorkflowStateType.CHOICE_STATE_TYPE, choicestatename, choicestateinfo) @@ -374,7 +397,7 @@ def parsePassState(self, passstatename, passstateinfo): gname = passstatename if "End" in passstateinfo.keys(): value = passstateinfo["End"] - if bool(value) and not self.insideParallelBranchAlready(): + if bool(value) and not (self.insideParallelBranchAlready() or self.insideMapBranchAlready()): self.workflowExitPoint = "end" nextNodes.append(self.workflowExitPoint) self.workflowExitTopic = self.topicPrefix + self.workflowExitPoint @@ -392,6 +415,10 @@ def parsePassState(self, passstatename, passstateinfo): """ if self.insideParallelBranchAlready(): passstateinfo["ParentParallelInfo"] = self.constructParentParallelInfo() + + if self.insideMapBranchAlready(): + passstateinfo["ParentMapInfo"] = self.constructParentMapInfo() + self._logger.info("parsePass: State info: " + str(passstateinfo)) self.createAndAddASLWorkflowNode(gname, nextNodes, potNext, WorkflowStateType.PASS_STATE_TYPE, passstatename, passstateinfo) @@ -411,6 +438,10 @@ def parseSucceedState(self, succeedstatename, succeedstateinfo): if self.insideParallelBranchAlready(): succeedstateinfo["ParentParallelInfo"] = self.constructParentParallelInfo() + + if self.insideMapBranchAlready(): + 
taskstateinfo["ParentMapInfo"] = self.constructParentMapInfo() + self._logger.info("parseSucceed: State info: " + str(succeedstateinfo)) self.createAndAddASLWorkflowNode(gname, nextNodes, potNext, WorkflowStateType.SUCCEED_STATE_TYPE, succeedstatename, succeedstateinfo) @@ -433,6 +464,9 @@ def parseFailState(self, failstatename, failstateinfo): if self.insideParallelBranchAlready(): failstateinfo["ParentParallelInfo"] = self.constructParentParallelInfo() + if self.insideMapBranchAlready(): + failstateinfo["ParentMapInfo"] = self.constructParentMapInfo() + self._logger.info("parseFail: State info: " + str(failstateinfo)) self.createAndAddASLWorkflowNode(gname, nextNodes, potNext, WorkflowStateType.FAIL_STATE_TYPE, failstatename, failstateinfo) @@ -462,9 +496,13 @@ def parseWaitState(self, waitstatename, waitstateinfo): if (waitstateinfo.has("InputPath")) {} if (waitstateinfo.has("OutputPath")) {} """ + if self.insideParallelBranchAlready(): waitstateinfo["ParentParallelInfo"] = self.constructParentParallelInfo() + if self.insideMapBranchAlready(): + waitstateinfo["ParentParallelInfo"] = self.constructParentMapInfo() + self._logger.info("parseWait: State info: " + str(waitstateinfo)) self.createAndAddASLWorkflowNode(gname, nextNodes, potNext, WorkflowStateType.WAIT_STATE_TYPE, waitstatename, waitstateinfo) @@ -528,6 +566,67 @@ def parseParallelState(self, parallelstatename, parallelstateinfo): self._logger.info("parseParallel: State info: " + str(parallelstateinfo)) self.createAndAddASLWorkflowNode(parallelstatename, nextNodes, potNext, WorkflowStateType.PARALLEL_STATE_TYPE, parallelstatename, parallelstateinfo) + def parseMapState(self, mapstatename, mapstateinfo): + potNext = [] + nextNodes = [] + mapstateinfo["WorkflowID"] = self.workflowId + mapstateinfo["SandboxID"] = self.sandboxId + self._logger.info("Inside parseMapState, mapstateinfo: " + str(mapstateinfo)) + if "End" in mapstateinfo.keys() and mapstateinfo["End"] == True: + value = mapstateinfo["End"] + if bool(value) and not self.insideMapBranchAlready(): + self.workflowExitPoint = "end" + potNext.append(self.workflowExitPoint) + self.workflowExitTopic = self.topicPrefix + self.workflowExitPoint + #else: + # This is a branch end and not a workflow exit point + else: + mapstateinfo["End"] = False + if "Next" in mapstateinfo.keys(): + potNext.append(mapstateinfo["Next"]) + #nextNodes.append(mapstateinfo["Next"]) + + """ + if(mapstateinfo.has("InputPath")) + if(mapstateinfo.has("OutputPath")) + if(mapstateinfo.has("ResultPath")) + """ + + mapstateinfo["Name"] = mapstatename + + if self.insideMapBranchAlready(): + mapstateinfo["ParentMapInfo"] = self.constructParentMapInfo() # BranchCounter needs to be added by FunctionWorker + #mapstateinfo["ParentMapInfo"] = {"Name": mapstatename, "BranchCounter": 1} + + self.mapStateNamesStack.append(mapstatename) + if "MaxConcurrency" in mapstateinfo.keys(): + mapstateinfo["MaxConcurrency"] = mapstateinfo["MaxConcurrency"] + + if "Iterator" in mapstateinfo.keys(): + count = 1 # hardcoded number of branches + iterator = mapstateinfo["Iterator"] + + self.mapBranchCounterStack.append(1) + + if "StartAt" in iterator.keys(): + potNext.append(iterator["StartAt"]) + else: + raise Exception("Iterator missing StartAt field") + + if "States" in iterator.keys(): + self.parseStates(iterator["States"]) + else: + raise Exception("Iterator missing States field") + + mapstateinfo["BranchCount"] = count # needs to be corrected in FunctionWorker + + else: + self._logger.info("parseMapState: 'Iterator' parameter 
 def getWorkflowNode(self, topic):
 return self.workflowNodeMap[topic]
@@ -611,4 +710,4 @@ def removeLocalFunction(self, function_topic):
 del self.workflowLocalFunctions[function_topic]
 def are_checkpoints_enabled(self):
- return self._enable_checkpoints \ No newline at end of file
+ return self._enable_checkpoints
diff --git a/mfn_sdk/mfn_sdk/mfnclient.py b/mfn_sdk/mfn_sdk/mfnclient.py
index de826041..03492bf5 100644
--- a/mfn_sdk/mfn_sdk/mfnclient.py
+++ b/mfn_sdk/mfn_sdk/mfnclient.py
@@ -391,7 +391,6 @@ def addWorkflow(self,name):
 def delWorkflow(self,wf):
 return self.delete_workflow(wf)
-
 def find_workflow(self,name):
 res = []
 for wf in self.workflows:
@@ -439,6 +438,12 @@ def _get_state_names_and_resource(self, desired_state_type, wf_dict):
 for branch in branches:
 parallel_state_list = self._get_state_names_and_resource(desired_state_type, branch)
 state_list = state_list + parallel_state_list
+
+ if state_type == 'Map':
+ branch = state['Iterator']
+ map_state_list = self._get_state_names_and_resource(desired_state_type, branch)
+ state_list = state_list + map_state_list
+
 return state_list
diff --git a/tests/asl_Map/CallLambda.py b/tests/asl_Map/CallLambda.py
new file mode 100644
index 00000000..efaf655f
--- /dev/null
+++ b/tests/asl_Map/CallLambda.py
@@ -0,0 +1,6 @@
+import json
+import time
+def handle(event, context):
+ name = event["who"]
+ time.sleep(1.0)
+ return "Hello, %s!" % name
diff --git a/tests/asl_Map/test.py b/tests/asl_Map/test.py
new file mode 100644
index 00000000..6aa70434
--- /dev/null
+++ b/tests/asl_Map/test.py
@@ -0,0 +1,81 @@
+# Copyright 2020 The microfunctions Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# +import unittest +import os, sys +import json +import time +#import ast + +sys.path.append("../") +from mfn_test_utils import MFNTest + +class MapStateTest(unittest.TestCase): + """ + event = '[{"who": "bob"},{"who": "meg"},{"who": "joe"}]' + expectedResponse = '[{"ContextValue": {"who": "bob"}, "ContextIndex": 0},{"ContextValue": {"who": "meg"}, "ContextIndex": 1}, {"ContextValue": {"who": "joe"}, "ContextIndex": 2 }]' + test_map = [("asl_Map_State_Context_Data", "workflow_map_state_context_test/workflow_map_state_context_test.json", [(event, expectedResponse)])] + """ + + def test_map_state(self): + file_list = ["workflow_map_state_delivery_test.data", + "workflow_map_state_context_test.data", + "workflow_map_state_example_test.data", + "workflow_map_state_parameters_test.data", + "workflow_map_state_thingspiratessay_test.data", + "workflow_map_state_iro_paths_processing_test.data", + "workflow_map_state_hardcoded_test.data"] + + for file in file_list: + with open(file) as json_input: + testtuplelist = [] + data = json.load(json_input) + testtuplelist.append((json.dumps(data["event"]), json.dumps(data["expectedResponse"]))) + test = MFNTest(test_name=data["test_name"], workflow_filename=data["workflow_name"]) + st = time.time() + test.exec_tests(testtuplelist) + et = time.time() + print ("test duration (s): %s" % str(et-st)) + + for mc in range(4): # set maxConcurrency parameter + """ creates and executes the Map state test workflow from the ASL description """ + + testtuplelist = [] + + event = [{"who": "bob"}, {"who": "meg"}, {"who": "joe"}] + expectedResponse = ["Hello, bob!", "Hello, meg!", "Hello, joe!"] + testtuplelist.append((json.dumps(event), json.dumps(expectedResponse))) + + event = [{"who": "meg"}, {"who": "joe"}, {"who": "bob"}] + expectedResponse = ["Hello, meg!", "Hello, joe!", "Hello, bob!"] + testtuplelist.append((json.dumps(event), json.dumps(expectedResponse))) + + event = [{"who": "joe"}, {"who": "bob"}, {"who": "meg"}] + expectedResponse = ["Hello, joe!", "Hello, bob!", "Hello, meg!"] + testtuplelist.append((json.dumps(event), json.dumps(expectedResponse))) + + event = [{"who": "joe"}, {"who": "bob"}, {"who": "meg"}, {"who":"dave"}, {"who":"tom"}, {"who":"ray"}] + expectedResponse = ["Hello, joe!", "Hello, bob!", "Hello, meg!", "Hello, dave!", "Hello, tom!", "Hello, ray!"] + testtuplelist.append((json.dumps(event), json.dumps(expectedResponse))) + + test = MFNTest(test_name="Map State Test", workflow_filename=("workflow_map_state_test_mc%s.json" % mc)) + + print("MaxConcurrency level: %i " % mc) + + st = time.time() + test.exec_tests(testtuplelist) + et = time.time() + + print ("test duration (s): %s" % str(et-st)) + diff --git a/tests/asl_Map/workflow_map_state_context_test.data b/tests/asl_Map/workflow_map_state_context_test.data new file mode 100644 index 00000000..7efbb4e7 --- /dev/null +++ b/tests/asl_Map/workflow_map_state_context_test.data @@ -0,0 +1,11 @@ + +{ +"test_name": "Map State Context Test", + +"workflow_name": "workflow_map_state_context_test/workflow_map_state_context_test.json", + +"event": [{"who": "bob"},{"who": "meg"},{"who": "joe"}], + +"expectedResponse": [{"ContextValue": {"who": "bob"}, "ContextIndex": 0},{"ContextValue": {"who": "meg"}, "ContextIndex": 1}, {"ContextValue": {"who": "joe"}, "ContextIndex": 2 }] + +} diff --git a/tests/asl_Map/workflow_map_state_context_test/workflow_map_state_context_test.json b/tests/asl_Map/workflow_map_state_context_test/workflow_map_state_context_test.json new file mode 100644 index 00000000..2586a44e 
--- /dev/null +++ b/tests/asl_Map/workflow_map_state_context_test/workflow_map_state_context_test.json @@ -0,0 +1,23 @@ +{ + "StartAt": "ExampleMapState", + "States": { + "ExampleMapState": { + "Type": "Map", + "Parameters": { + "ContextIndex.$": "$$.Map.Item.Index", + "ContextValue.$": "$$.Map.Item.Value" + }, + "Iterator": { + "StartAt": "TestPass", + "States": { + "TestPass": { + "Type": "Pass", + "End": true + } + } + }, + "End": true + } + } +} + diff --git a/tests/asl_Map/workflow_map_state_delivery_test.data b/tests/asl_Map/workflow_map_state_delivery_test.data new file mode 100644 index 00000000..7f8e7c9a --- /dev/null +++ b/tests/asl_Map/workflow_map_state_delivery_test.data @@ -0,0 +1,52 @@ + +{"test_name": "Map State Delivery Test", + +"workflow_name": "workflow_map_state_delivery_test/workflow_map_state_delivery_test.json", + +"event": + {"orderId": "12345678", + "orderDate": "20190820101213", + "detail": { + "customerId": "1234", + "deliveryAddress": "123, Seattle, WA", + "deliverySpeed": "1-day", + "paymentMethod": "aCreditCard", + "items": [ + { + "productName": "Agile Software Development", + "category": "book", + "price": 60.0, + "quantity": 1 + }, + { + "productName": "Domain-Driven Design", + "category": "book", + "price": 32.0, + "quantity": 1 + }, + { + "productName": "The Mythical Man Month", + "category": "book", + "price": 18.0, + "quantity": 1 + }, + { + "productName": "The Art of Computer Programming", + "category": "book", + "price": 180.0, + "quantity": 1 + }, + { + "productName": "Ground Coffee, Dark Roast", + "category": "grocery", + "price": 8.0, + "quantity": 6 + } + ] + } +}, + +"expectedResponse": +[{"productName": "Agile Software Development", "category": "book", "price": 60.0, "quantity": 1}, {"productName": "Domain-Driven Design", "category": "book", "price": 32.0, "quantity": 1}, {"productName": "The Mythical Man Month", "category": "book", "price": 18.0, "quantity": 1}, {"productName": "The Art of Computer Programming", "category": "book", "price": 180.0, "quantity": 1}, {"productName": "Ground Coffee, Dark Roast", "category": "grocery", "price": 8.0, "quantity": 6}] +} + diff --git a/tests/asl_Map/workflow_map_state_delivery_test/checkAvailability.py b/tests/asl_Map/workflow_map_state_delivery_test/checkAvailability.py new file mode 100644 index 00000000..9440b45a --- /dev/null +++ b/tests/asl_Map/workflow_map_state_delivery_test/checkAvailability.py @@ -0,0 +1,5 @@ +#!/usr/bin/python + +def handle(event, context): + + return event diff --git a/tests/asl_Map/workflow_map_state_delivery_test/workflow_map_state_delivery_test.json b/tests/asl_Map/workflow_map_state_delivery_test/workflow_map_state_delivery_test.json new file mode 100644 index 00000000..c92b20b2 --- /dev/null +++ b/tests/asl_Map/workflow_map_state_delivery_test/workflow_map_state_delivery_test.json @@ -0,0 +1,40 @@ +{ + "StartAt": "ProcessAllItems", + "States": { + "ProcessAllItems": { + "Type": "Map", + "InputPath": "$.detail", + "ItemsPath": "$.items", + "MaxConcurrency": 3, + "Iterator": { + "StartAt": "CheckAvailability", + "States": { + "CheckAvailability": { + "Type": "Task", + "Resource": "checkAvailability", + "Retry": [ + { + "ErrorEquals": [ + "TimeOut" + ], + "IntervalSeconds": 1, + "BackoffRate": 2, + "MaxAttempts": 3 + } + ], + "Next": "PrepareForDelivery" + }, + "PrepareForDelivery": { + "Type": "Pass", + "Next": "StartDelivery" + }, + "StartDelivery": { + "Type": "Pass", + "End": true + } + } + }, + "End": true + } + } +} diff --git 
a/tests/asl_Map/workflow_map_state_example_test.data b/tests/asl_Map/workflow_map_state_example_test.data
new file mode 100644
index 00000000..7253d596
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_example_test.data
@@ -0,0 +1,21 @@
+
+{
+"test_name": "Map State Example Test",
+
+"workflow_name": "workflow_map_state_example_test/workflow_map_state_example_test.json",
+
+"event": {"ship-date": "2016-03-14T01:59:00Z",
+ "detail":
+ {"delivery-partner": "UQS",
+ "shipped": [
+ { "prod": "R31", "dest-code": 9511, "quantity": 1344 },
+ { "prod": "S39", "dest-code": 9511, "quantit_y": 40 },
+ { "prod": "R31", "dest-code": 9833, "quantity": 12 },
+ { "prod": "R40", "dest-code": 9860, "quantity": 887 },
+ { "prod": "R40", "dest-code": 9511, "quantity": 1220 }
+ ]
+ }
+ },
+"expectedResponse": {"detail": {"shipped": ["All keys are OK!", "item OK!", "All keys are OK!", "All keys are OK!", "All keys are OK!"]}}
+}
+
diff --git a/tests/asl_Map/workflow_map_state_example_test/ship-val.py b/tests/asl_Map/workflow_map_state_example_test/ship-val.py
new file mode 100644
index 00000000..493a0e3a
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_example_test/ship-val.py
@@ -0,0 +1,23 @@
+#!/usr/bin/python
+"""
+{
+ "parcel": {
+ "prod": "R31",
+ "dest-code": 9511,
+ "quantity": 1344
+ },
+ "courier": "UQS"
+}
+"""
+
+def handle(event, context):
+
+ ret = "item NOK!"
+ if ("courier" in event and "parcel" in event): # just check for the keys
+ ret = "item OK!"
+ par = event["parcel"]
+ if "prod" in par.keys() and "dest-code" in par.keys() and "quantity" in par.keys():
+ ret = "All keys are OK!"
+
+ return ret
+
diff --git a/tests/asl_Map/workflow_map_state_example_test/workflow_map_state_example_test.json b/tests/asl_Map/workflow_map_state_example_test/workflow_map_state_example_test.json
new file mode 100644
index 00000000..6602a549
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_example_test/workflow_map_state_example_test.json
@@ -0,0 +1,22 @@
+{
+ "StartAt": "Validate-All",
+ "States": {
+ "Validate-All": {
+ "End": true,
+ "Type": "Map",
+ "InputPath": "$.detail",
+ "ItemsPath": "$.shipped",
+ "ResultPath": "$.detail.shipped",
+ "Parameters": {
+ "parcel.$": "$$.Map.Item.Value",
+ "courier.$": "$.delivery-partner"
+ },
+ "MaxConcurrency": 0,
+ "Iterator": {
+ "StartAt": "ship-val",
+ "States": {
+ "ship-val": {"Type": "Task", "Resource": "ship-val", "End": true}}
+}
+}
+}
+}
diff --git a/tests/asl_Map/workflow_map_state_hardcoded_test.data b/tests/asl_Map/workflow_map_state_hardcoded_test.data
new file mode 100644
index 00000000..d3e24b18
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_hardcoded_test.data
@@ -0,0 +1,7 @@
+
+{
+"test_name": "Map State Hardcoded Input Test",
+"workflow_name": "workflow_map_state_hardcoded_test/workflow_map_state_hardcoded_test.json",
+"event" : "trigger map state input",
+"expectedResponse" : [{"Value": "Iterate", "Index": 0}, {"Value": "Over", "Index": 1}, {"Value": "This", "Index": 2}, {"Value": "Array", "Index": 3}]
+}
diff --git a/tests/asl_Map/workflow_map_state_hardcoded_test/workflow_map_state_hardcoded_test.json b/tests/asl_Map/workflow_map_state_hardcoded_test/workflow_map_state_hardcoded_test.json
new file mode 100644
index 00000000..efa81e28
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_hardcoded_test/workflow_map_state_hardcoded_test.json
@@ -0,0 +1,51 @@
+{
+ "Comment": "An example of the Amazon States Language using a map state to process elements of an array with a max concurrency of 0 (unbounded).",
+ "StartAt": "HardcodedInputsState",
+
"States": { + "HardcodedInputsState": { + "Type": "Pass", + "Result": { + "array": [ + "Iterate", + "Over", + "This", + "Array" + ] + }, + "Next": "Map" + }, + "Map": { + "Type": "Map", + "ItemsPath": "$.array", + "MaxConcurrency": 0, + "Parameters": { + "Value.$": "$$.Map.Item.Value", + "Index.$": "$$.Map.Item.Index" + }, + "Next": "FinalState", + "Iterator": { + "StartAt": "You", + "States": { + "You": { + "Type": "Pass", + "Next": "Can" + }, + "Can": { + "Type": "Pass", + "Next": "Do" + }, + "Do": { + "Type": "Pass", + "Next": "Anything" + }, + "Anything": { + "Type": "Pass", + "End": true + } + } + } + }, + "FinalState": {"Type": "Pass", "End": true} + } +} + diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test.data b/tests/asl_Map/workflow_map_state_iro_paths_processing_test.data new file mode 100644 index 00000000..456045bf --- /dev/null +++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test.data @@ -0,0 +1,50 @@ +{ +"test_name": "Map State IRO Paths Processing Test", + +"workflow_name": "workflow_map_state_iro_paths_processing_test/workflow_map_state_iro_paths_processing_test.json", + +"event": { + "orderId": "12345678", + "orderDate": "20190820101213", + "detail": { + "customerId": "1234", + "deliveryAddress": "123, Seattle, WA", + "deliverySpeed": "1-day", + "paymentMethod": "aCreditCard", + "items": [ + { + "productName": "Agile Software Development", + "category": "book", + "price": 60.0, + "quantity": 1 + }, + { + "productName": "Domain-Driven Design", + "category": "book", + "price": 32.0, + "quantity": 1 + }, + { + "productName": "The Mythical Man Month", + "category": "book", + "price": 18.0, + "quantity": 1 + }, + { + "productName": "The Art of Computer Programming", + "category": "book", + "price": 180.0, + "quantity": 1 + }, + { + "productName": "Ground Coffee, Dark Roast", + "category": "grocery", + "price": 8.0, + "quantity": 6 + } + ] + } +}, + +"expectedResponse" : {"summary": "This is a test summary!", "detail": {"processedItems": [{"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Agile Software Development", "category": "book", "price": 60.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Domain-Driven Design", "category": "book", "price": 32.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "The Mythical Man Month", "category": "book", "price": 18.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "The Art of Computer Programming", "category": "book", "price": 180.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Ground Coffee, Dark Roast", "category": "grocery", "price": 8.0, "quantity": 6}}]}} +} diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/checkAvailability.py b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/checkAvailability.py new file mode 100644 index 00000000..9816febc --- /dev/null +++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/checkAvailability.py @@ -0,0 +1,6 @@ +#!/usr/bin/python + +def handle(event, context): + + return event + diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/paymentFailed.py b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/paymentFailed.py new file mode 100644 index 00000000..4498557c --- /dev/null +++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/paymentFailed.py @@ -0,0 +1,6 @@ +#!/usr/bin/python + +def handle(event, context): + + return "payment 
failed!" + diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/prepareForDelivery.py b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/prepareForDelivery.py new file mode 100644 index 00000000..9816febc --- /dev/null +++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/prepareForDelivery.py @@ -0,0 +1,6 @@ +#!/usr/bin/python + +def handle(event, context): + + return event + diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/sendOrderSummary.py b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/sendOrderSummary.py new file mode 100644 index 00000000..357a4d63 --- /dev/null +++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/sendOrderSummary.py @@ -0,0 +1,10 @@ +#!/usr/bin/python + +def handle(event, context): + context.log(str(event)) + + #for it in event: + # it["summary"] = "test" + + return "This is a test summary!" + diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/startDelivery.py b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/startDelivery.py new file mode 100644 index 00000000..9816febc --- /dev/null +++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/startDelivery.py @@ -0,0 +1,6 @@ +#!/usr/bin/python + +def handle(event, context): + + return event + diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/test.py b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/test.py new file mode 100644 index 00000000..61367b76 --- /dev/null +++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/test.py @@ -0,0 +1,91 @@ +# Copyright 2020 The microfunctions Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+
+# Map state test motivated by stackovercloud article "New Step Functions Support for Dynamic Parallelism"
+# https://www.stackovercloud.com/2019/09/18/new-step-functions-support-for-dynamic-parallelism/
+
+import unittest
+import os, sys
+import json
+import time
+
+sys.path.append("../")
+from mfn_test_utils import MFNTest
+
+class MapStateIROPathsTest(unittest.TestCase):
+
+ def test_map_state(self):
+ """ creates and executes the Map state test workflow from the ASL description """
+
+ testtuplelist = []
+
+ event = {
+ "orderId": "12345678",
+ "orderDate": "20190820101213",
+ "detail": {
+ "customerId": "1234",
+ "deliveryAddress": "123, Seattle, WA",
+ "deliverySpeed": "1-day",
+ "paymentMethod": "aCreditCard",
+ "items": [
+ {
+ "productName": "Agile Software Development",
+ "category": "book",
+ "price": 60.0,
+ "quantity": 1
+ },
+ {
+ "productName": "Domain-Driven Design",
+ "category": "book",
+ "price": 32.0,
+ "quantity": 1
+ },
+ {
+ "productName": "The Mythical Man Month",
+ "category": "book",
+ "price": 18.0,
+ "quantity": 1
+ },
+ {
+ "productName": "The Art of Computer Programming",
+ "category": "book",
+ "price": 180.0,
+ "quantity": 1
+ },
+ {
+ "productName": "Ground Coffee, Dark Roast",
+ "category": "grocery",
+ "price": 8.0,
+ "quantity": 6
+ }
+ ]
+ }
+}
+ """
+ expectedResponse = {"summary": "This is a test summary!", "processedItems": [{"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Agile Software Development", "category": "book", "price": 60.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Domain-Driven Design", "category": "book", "price": 32.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "The Mythical Man Month", "category": "book", "price": 18.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "The Art of Computer Programming", "category": "book", "price": 180.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Ground Coffee, Dark Roast", "category": "grocery", "price": 8.0, "quantity": 6}}]}
+ """
+
+ expectedResponse = {"summary": "This is a test summary!", "detail": {"processedItems": [{"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Agile Software Development", "category": "book", "price": 60.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Domain-Driven Design", "category": "book", "price": 32.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "The Mythical Man Month", "category": "book", "price": 18.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "The Art of Computer Programming", "category": "book", "price": 180.0, "quantity": 1}}, {"deliveryAddress": "123, Seattle, WA", "item": {"productName": "Ground Coffee, Dark Roast", "category": "grocery", "price": 8.0, "quantity": 6}}]}}
+
+ testtuplelist.append((json.dumps(event), json.dumps(expectedResponse)))
+
+ test = MFNTest(test_name="Map State Input Items Result Output Path Test", workflow_filename="workflow_map_state_iro_paths_processing_test.json" )
+
+ st = time.time()
+ test.exec_tests(testtuplelist)
+ et = time.time()
+
+ print ("test duration (s): %s" % str(et-st))
+
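Tracing the ProcessAllItems paths makes the fixture's nesting easier to verify: InputPath "$.detail" narrows the state input, ItemsPath "$.items" picks the array to map over, the Parameters block assembles each branch input from the current item plus "$.deliveryAddress" of the narrowed input, and ResultPath "$.detail.processedItems" grafts the collected branch results back onto the original input before SendOrderSummary runs. A hedged pseudo-Python sketch of that composition, modulo the exact per-item payload (run_iterator stands in for executing the Iterator's states; plain dict access stands in for the JSONPath resolution StateUtils performs):

    import copy

    def process_all_items(order_event, run_iterator):
        detail = order_event["detail"]                        # InputPath "$.detail"
        items = detail["items"]                               # ItemsPath "$.items"
        results = [run_iterator({"index": i,                  # Parameters block
                                 "item": item,
                                 "deliveryAddress": detail["deliveryAddress"]})
                   for i, item in enumerate(items)]
        out = copy.deepcopy(order_event)                      # ResultPath writes into
        out["detail"]["processedItems"] = results             # the original input
        return out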
diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/validatePayment.py b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/validatePayment.py
new file mode 100644
index 00000000..79363aff
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/validatePayment.py
@@ -0,0 +1,6 @@
+#!/usr/bin/python
+
+def handle(event, context):
+ event["payment"] = "Ok"
+ return event
+
diff --git a/tests/asl_Map/workflow_map_state_iro_paths_processing_test/workflow_map_state_iro_paths_processing_test.json b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/workflow_map_state_iro_paths_processing_test.json
new file mode 100644
index 00000000..00979d69
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_iro_paths_processing_test/workflow_map_state_iro_paths_processing_test.json
@@ -0,0 +1,79 @@
+{
+ "StartAt": "ValidatePayment",
+ "States": {
+ "ValidatePayment": {
+ "Type": "Task",
+ "Resource": "validatePayment",
+ "Next": "CheckPayment"
+ },
+ "CheckPayment": {
+ "Type": "Choice",
+ "Choices": [
+ {
+ "Not": {
+ "Variable": "$.payment",
+ "StringEquals": "Ok"
+ },
+ "Next": "PaymentFailed"
+ }
+ ],
+ "Default": "ProcessAllItems"
+ },
+ "PaymentFailed": {
+ "Type": "Task",
+ "Resource": "paymentFailed",
+ "End": true
+ },
+ "ProcessAllItems": {
+ "Type": "Map",
+ "ItemsPath": "$.items",
+ "InputPath": "$.detail",
+ "MaxConcurrency": 3,
+ "Parameters": {
+ "index.$": "$$.Map.Item.Index",
+ "item.$": "$$.Map.Item.Value",
+ "deliveryAddress.$": "$.deliveryAddress"
+ },
+ "Iterator": {
+ "StartAt": "CheckAvailability",
+ "States": {
+ "CheckAvailability": {
+ "Type": "Task",
+ "Resource": "checkAvailability",
+ "Retry": [
+ {
+ "ErrorEquals": [
+ "TimeOut"
+ ],
+ "IntervalSeconds": 1,
+ "BackoffRate": 2,
+ "MaxAttempts": 3
+ }
+ ],
+ "Next": "PrepareForDelivery"
+ },
+ "PrepareForDelivery": {
+ "Type": "Task",
+ "Resource": "prepareForDelivery",
+ "Next": "StartDelivery"
+ },
+ "StartDelivery": {
+ "Type": "Task",
+ "Resource": "startDelivery",
+ "End": true
+ }
+ }
+ },
+ "ResultPath": "$.detail.processedItems",
+ "Next": "SendOrderSummary"
+ },
+ "SendOrderSummary": {
+ "Type": "Task",
+ "Resource": "sendOrderSummary",
+ "ResultPath": "$.summary",
+ "InputPath": "$.processedItems",
+ "End": true
+ }
+ }
+}
+
diff --git a/tests/asl_Map/workflow_map_state_parameters_test.data b/tests/asl_Map/workflow_map_state_parameters_test.data
new file mode 100644
index 00000000..e26d5f41
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_parameters_test.data
@@ -0,0 +1,21 @@
+
+{
+"test_name": "Map State Parameters Test",
+
+"workflow_name": "workflow_map_state_parameters_test/workflow_map_state_parameters_test.json",
+
+"event": [{"who": "bob"},{"who": "meg"},{"who": "joe"}],
+
+"expectedResponse" : {"result" :[{
+ "ContextValue": {
+ "who": "bob"
+ },"ContextIndex": 0},
+ {
+ "ContextValue": {
+ "who": "meg"
+ },"ContextIndex": 1},
+ {
+ "ContextValue": {
+ "who": "joe"
+ }, "ContextIndex": 2}]}
+}
diff --git a/tests/asl_Map/workflow_map_state_parameters_test/workflow_map_state_parameters_test.json b/tests/asl_Map/workflow_map_state_parameters_test/workflow_map_state_parameters_test.json
new file mode 100644
index 00000000..fae88533
--- /dev/null
+++ b/tests/asl_Map/workflow_map_state_parameters_test/workflow_map_state_parameters_test.json
@@ -0,0 +1,24 @@
+{
+ "StartAt": "ExampleMapState",
+ "States": {
+ "ExampleMapState": {
+ "Type": "Map",
+ "ResultPath": "$.result",
+ "Parameters": {
+ "ContextIndex.$": "$$.Map.Item.Index",
+ "ContextValue.$": "$$.Map.Item.Value"
+ },
+ "Iterator": {
+ "StartAt": "TestPass",
+ "States": {
+ "TestPass": {
+ "Type": "Pass",
+ "End": true
+ }
+ }
+ },
+ "End": true
+ }
+ }
+}
+
diff --git a/tests/asl_Map/workflow_map_state_test_mc0.json
b/tests/asl_Map/workflow_map_state_test_mc0.json new file mode 100644 index 00000000..924b8bd2 --- /dev/null +++ b/tests/asl_Map/workflow_map_state_test_mc0.json @@ -0,0 +1,21 @@ +{ + "StartAt": "PassTest1", + "States": { + "PassTest2": {"Type":"Pass", "End": true}, + "PassTest1": {"Type":"Pass", "Next":"ExampleMapState"}, + "ExampleMapState": { + "Next": "PassTest2", + "Type": "Map", + "MaxConcurrency": 0, + "Iterator": { + "StartAt": "PassTest5", + "States": { + "PassTest5": {"Type": "Pass", "Next":"PassTest3"}, + "PassTest3": {"Type": "Pass", "Next":"CallLambda"}, + "CallLambda": {"Type": "Task", "Resource": "CallLambda", "Next": "PassTest4"}, + "PassTest4": {"Type": "Pass", "End": true} +} +} +} +} +} diff --git a/tests/asl_Map/workflow_map_state_test_mc1.json b/tests/asl_Map/workflow_map_state_test_mc1.json new file mode 100644 index 00000000..a62ee7ab --- /dev/null +++ b/tests/asl_Map/workflow_map_state_test_mc1.json @@ -0,0 +1,21 @@ +{ + "StartAt": "PassTest1", + "States": { + "PassTest2": {"Type":"Pass", "End": true}, + "PassTest1": {"Type":"Pass", "Next":"ExampleMapState"}, + "ExampleMapState": { + "Next": "PassTest2", + "Type": "Map", + "MaxConcurrency": 1, + "Iterator": { + "StartAt": "PassTest5", + "States": { + "PassTest5": {"Type": "Pass", "Next":"PassTest3"}, + "PassTest3": {"Type": "Pass", "Next":"CallLambda"}, + "CallLambda": {"Type": "Task", "Resource": "CallLambda", "Next": "PassTest4"}, + "PassTest4": {"Type": "Pass", "End": true} +} +} +} +} +} diff --git a/tests/asl_Map/workflow_map_state_test_mc2.json b/tests/asl_Map/workflow_map_state_test_mc2.json new file mode 100644 index 00000000..e69edffe --- /dev/null +++ b/tests/asl_Map/workflow_map_state_test_mc2.json @@ -0,0 +1,21 @@ +{ + "StartAt": "PassTest1", + "States": { + "PassTest2": {"Type":"Pass", "End": true}, + "PassTest1": {"Type":"Pass", "Next":"ExampleMapState"}, + "ExampleMapState": { + "Next": "PassTest2", + "Type": "Map", + "MaxConcurrency": 2, + "Iterator": { + "StartAt": "PassTest5", + "States": { + "PassTest5": {"Type": "Pass", "Next":"PassTest3"}, + "PassTest3": {"Type": "Pass", "Next":"CallLambda"}, + "CallLambda": {"Type": "Task", "Resource": "CallLambda", "Next": "PassTest4"}, + "PassTest4": {"Type": "Pass", "End": true} +} +} +} +} +} diff --git a/tests/asl_Map/workflow_map_state_test_mc3.json b/tests/asl_Map/workflow_map_state_test_mc3.json new file mode 100644 index 00000000..d19b09b1 --- /dev/null +++ b/tests/asl_Map/workflow_map_state_test_mc3.json @@ -0,0 +1,21 @@ +{ + "StartAt": "PassTest1", + "States": { + "PassTest2": {"Type":"Pass", "End": true}, + "PassTest1": {"Type":"Pass", "Next":"ExampleMapState"}, + "ExampleMapState": { + "Next": "PassTest2", + "Type": "Map", + "MaxConcurrency": 3, + "Iterator": { + "StartAt": "PassTest5", + "States": { + "PassTest5": {"Type": "Pass", "Next":"PassTest3"}, + "PassTest3": {"Type": "Pass", "Next":"CallLambda"}, + "CallLambda": {"Type": "Task", "Resource": "CallLambda", "Next": "PassTest4"}, + "PassTest4": {"Type": "Pass", "End": true} +} +} +} +} +} diff --git a/tests/asl_Map/workflow_map_state_thingspiratessay_test.data b/tests/asl_Map/workflow_map_state_thingspiratessay_test.data new file mode 100644 index 00000000..e40ae570 --- /dev/null +++ b/tests/asl_Map/workflow_map_state_thingspiratessay_test.data @@ -0,0 +1,18 @@ +{ +"test_name": "Map State Pirates Say Test", + +"workflow_name": "workflow_map_state_thingspiratessay_test/workflow_map_state_thingspiratessay_test.json", + +"event" : {"ThingsPiratesSay": [{ + "say": "Avast!"},{ + "say": "Yar!"},{ + 
"say": "Walk the Plank!"}], + "ThingsGiantsSay": [{ + "say": "Fee!"},{ + "say": "Fi!"},{ + "say": "Fo!"},{ + "say": "Fum!"}] + }, +"expectedResponse" : [ {"say": "Avast!" },{ "say": "Yar!" }, { "say": "Walk the Plank!" } ] +} + diff --git a/tests/asl_Map/workflow_map_state_thingspiratessay_test/workflow_map_state_thingspiratessay_test.json b/tests/asl_Map/workflow_map_state_thingspiratessay_test/workflow_map_state_thingspiratessay_test.json new file mode 100644 index 00000000..98b3cbec --- /dev/null +++ b/tests/asl_Map/workflow_map_state_thingspiratessay_test/workflow_map_state_thingspiratessay_test.json @@ -0,0 +1,19 @@ +{ + "StartAt": "PiratesSay", + "States": { + "PiratesSay": { + "Type": "Map", + "ItemsPath": "$.ThingsPiratesSay", + "Iterator": { + "StartAt": "SayWord", + "States": { + "SayWord": { + "Type": "Pass", + "End": true + } + } + }, + "End": true + } + } +} diff --git a/tests/asl_Parameters/settings.json b/tests/asl_Parameters/settings.json new file mode 100644 index 00000000..064329fb --- /dev/null +++ b/tests/asl_Parameters/settings.json @@ -0,0 +1,4 @@ +{ + "workflow_name": "__map_test", + "workflow_description_file": "workflow_map_state_test.json" +} diff --git a/tests/asl_Parameters/test.py b/tests/asl_Parameters/test.py new file mode 100644 index 00000000..a34a9811 --- /dev/null +++ b/tests/asl_Parameters/test.py @@ -0,0 +1,58 @@ +# Copyright 2020 The microfunctions Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import unittest +import os, sys +import json +import time + +sys.path.append("../") +from mfn_test_utils import MFNTest + +class ParameterTest(unittest.TestCase): + + def test_parameter(self): + """ creates and executes test workflow with parameters """ + + testtuplelist = [] + event = {"comment": "Example for Parameters.", + "product": { + "details": { + "color": "blue", + "size": "small", + "material": "cotton" + }, + "availability": "in stock", + "sku": "2317", + "cost": "$23" + } + } + expectedResponse = { + "comment": "Selecting what I care about.", + "MyDetails": { + "size": "small", + "exists": "in stock", + "StaticValue": "foo" + } + } + testtuplelist.append((json.dumps(event), json.dumps(expectedResponse))) + + test = MFNTest(test_name="Parameter Field Test", workflow_filename="workflow_parameters_test.json" ) + + st = time.time() + test.exec_tests(testtuplelist) + et = time.time() + + print ("test duration (s): %s" % str(et-st)) + diff --git a/tests/asl_Parameters/workflow_parameters_test.json b/tests/asl_Parameters/workflow_parameters_test.json new file mode 100644 index 00000000..ee9da850 --- /dev/null +++ b/tests/asl_Parameters/workflow_parameters_test.json @@ -0,0 +1,19 @@ +{ + "StartAt": "PassTest", + "States": { + "PassTest": + { + "Type":"Pass", + "End":true, + "Parameters": + { + "comment": "Selecting what I care about.", + "MyDetails": { + "size.$": "$.product.details.size", + "exists.$": "$.product.availability", + "StaticValue": "foo"} + } + } + } +} + diff --git a/tests/asl_Parameters_Task/TaskTest.py b/tests/asl_Parameters_Task/TaskTest.py new file mode 100644 index 00000000..84e90c7a --- /dev/null +++ b/tests/asl_Parameters_Task/TaskTest.py @@ -0,0 +1,5 @@ +#!/usr/bin/python + +def handle(event, context): + print ("hello world!") + return event diff --git a/tests/asl_Parameters_Task/settings.json b/tests/asl_Parameters_Task/settings.json new file mode 100644 index 00000000..064329fb --- /dev/null +++ b/tests/asl_Parameters_Task/settings.json @@ -0,0 +1,4 @@ +{ + "workflow_name": "__map_test", + "workflow_description_file": "workflow_map_state_test.json" +} diff --git a/tests/asl_Parameters_Task/test.py b/tests/asl_Parameters_Task/test.py new file mode 100644 index 00000000..a34a9811 --- /dev/null +++ b/tests/asl_Parameters_Task/test.py @@ -0,0 +1,58 @@ +# Copyright 2020 The microfunctions Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import unittest +import os, sys +import json +import time + +sys.path.append("../") +from mfn_test_utils import MFNTest + +class ParameterTest(unittest.TestCase): + + def test_parameter(self): + """ creates and executes test workflow with parameters """ + + testtuplelist = [] + event = {"comment": "Example for Parameters.", + "product": { + "details": { + "color": "blue", + "size": "small", + "material": "cotton" + }, + "availability": "in stock", + "sku": "2317", + "cost": "$23" + } + } + expectedResponse = { + "comment": "Selecting what I care about.", + "MyDetails": { + "size": "small", + "exists": "in stock", + "StaticValue": "foo" + } + } + testtuplelist.append((json.dumps(event), json.dumps(expectedResponse))) + + test = MFNTest(test_name="Parameter Field Test", workflow_filename="workflow_parameters_test.json" ) + + st = time.time() + test.exec_tests(testtuplelist) + et = time.time() + + print ("test duration (s): %s" % str(et-st)) + diff --git a/tests/asl_Parameters_Task/workflow_parameters_test.json b/tests/asl_Parameters_Task/workflow_parameters_test.json new file mode 100644 index 00000000..3ff20b28 --- /dev/null +++ b/tests/asl_Parameters_Task/workflow_parameters_test.json @@ -0,0 +1,20 @@ +{ + "StartAt": "TaskTest", + "States": { + "TaskTest": + { + "Type":"Task", + "Resource":"TaskTest", + "End":true, + "Parameters": + { + "comment": "Selecting what I care about.", + "MyDetails": { + "size.$": "$.product.details.size", + "exists.$": "$.product.availability", + "StaticValue": "foo"} + } + } + } +} + diff --git a/tests/import_error/test.py b/tests/import_error/test.py index cf789f34..c12c3b10 100644 --- a/tests/import_error/test.py +++ b/tests/import_error/test.py @@ -32,3 +32,5 @@ def test_import_error(self): test.report(True, "import error report success", None, None) else: test.report(False, "import error report failure", expected_error, deployment_error) + + test.undeploy_workflow() diff --git a/tests/mfn_test_utils.py b/tests/mfn_test_utils.py index 8d602d83..8c95be3f 100644 --- a/tests/mfn_test_utils.py +++ b/tests/mfn_test_utils.py @@ -205,6 +205,12 @@ def _get_resource_info_map(self, workflow_description=None, resource_info_map=No for branch in branches: resource_info_map = self._get_resource_info_map(branch, resource_info_map) + if "Type" in state and state["Type"] == "Map": + branch = state['Iterator'] + #print(str(branch)) + resource_info_map = self._get_resource_info_map(branch, resource_info_map) + #print(str(resource_info_map)) + else: print("ERROR: invalid workflow description.") assert False @@ -300,11 +306,11 @@ def deploy_workflow(self): def undeploy_workflow(self): existing_workflows = self._client.workflows for wf in existing_workflows: - #print wf.name, wf.wid, wf.status - if wf.name == self._workflow_name and wf.status == "deployed": - wf.undeploy(self._settings["timeout"]) + if wf.name == self._workflow_name: + if wf.status == "deployed": + wf.undeploy(self._settings["timeout"]) + print("Workflow undeployed.") self._client.delete_workflow(wf) - print("Workflow undeployed.") break existing_resources = self._client.functions
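Taken together, these fixtures pin down the Map-state input pipeline: InputPath narrows the state input, ItemsPath selects the array to iterate, and Parameters builds each branch's input with access to the $$.Map.Item context. The following is a minimal, self-contained sketch of those ASL semantics, not the StateUtils implementation (which resolves full JSONPath via ujsonpath); resolve() below handles only simple "$.a.b" paths and the two $$.Map.Item references these tests use:

    def resolve(path, doc):
        # toy resolver for plain "$.a.b" paths; "$" returns the document itself
        if path in ("$", "$$"):
            return doc
        cur = doc
        for part in path.lstrip("$").strip(".").split("."):
            cur = cur[part]
        return cur

    def apply_parameters(params, item, index, state_input):
        out = {}
        for key, val in params.items():
            if key.endswith(".$"):                      # dynamic field
                if val == "$$.Map.Item.Value":
                    out[key[:-2]] = item
                elif val == "$$.Map.Item.Index":
                    out[key[:-2]] = index
                else:                                   # e.g. "$.delivery-partner"
                    out[key[:-2]] = resolve(val, state_input)
            else:                                       # static field copied as-is
                out[key] = val
        return out

    def map_branch_inputs(state_input, input_path="$", items_path="$", parameters=None):
        effective = resolve(input_path, state_input)    # InputPath
        items = resolve(items_path, effective)          # ItemsPath
        if parameters is None:
            return items
        return [apply_parameters(parameters, item, i, effective)
                for i, item in enumerate(items)]

    # reproduces the expectation in workflow_map_state_context_test.data:
    event = [{"who": "bob"}, {"who": "meg"}, {"who": "joe"}]
    params = {"ContextIndex.$": "$$.Map.Item.Index",
              "ContextValue.$": "$$.Map.Item.Value"}
    assert map_branch_inputs(event, parameters=params) == [
        {"ContextValue": {"who": "bob"}, "ContextIndex": 0},
        {"ContextValue": {"who": "meg"}, "ContextIndex": 1},
        {"ContextValue": {"who": "joe"}, "ContextIndex": 2},
    ]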