![Task's flow in job](files/sanjivsingh/job_flows.png)

In [0]:
import logging
import os
import inspect
import time
import random


class Job:
    """Run a fixed graph of tasks (``step1`` .. ``step11``) and track their status.

    Flow wired by the step methods below::

        step1 -> step2 -> step3 -> step5 -> step6 --+
                      |        \-> step7 -> step8 --+-> step10 -> step11
                      \-> step4 -> step9 -----------+

    ``step3`` picks one of its two branches at random and marks the other
    branch SKIPPED. ``step10`` is a join: it waits until ``step6``,
    ``step8`` and ``step9`` all have a recorded status.

    Every step method has two modes:

    * ``wrapper=True``  -- go through ``__execute_task``, which records the
      status and either runs the step or propagates SKIPPED/BLOCKED.
    * ``wrapper=False`` -- run the step's own logic, mark it COMPLETED and
      trigger its successors; on error record it and mark the step ERRORED.
    """

    TRIGGERED = "TRIGGERED"
    SKIPPED = "SKIPPED"
    COMPLETED = "COMPLETED"
    ERRORED = "ERRORED"
    BLOCKED = "BLOCKED"

    def __init__(self):
        # Per-instance state: as class attributes these dicts would be
        # shared (and polluted) across every Job instance.
        # task name -> one of the status constants above
        self.task_status_map = {}
        # task name -> formatted exception text for tasks whose logic raised
        self.task_error_map = {}

    def __update_task_end_status(self, task_name, task_status=COMPLETED):
        """Record the status of a task whose logic has finished."""
        self.task_status_map[task_name] = task_status

    def __update_task_start_status(self, task_name, task_status=TRIGGERED):
        """Record the status a task is entered with (TRIGGERED, SKIPPED, ...)."""
        self.task_status_map[task_name] = task_status

    def __handleException(self, task_name, ex):
        """Print the exception raised by a task and store it in the error map."""
        print(f"..........Exception in {task_name} :  {ex=}, {type(ex)=}")
        self.task_error_map[task_name] = f"{ex=}"

    def __is_runnable(self, previous_tasks):
        """Return True when no parent is missing, ERRORED or BLOCKED.

        ``previous_tasks`` may be ``None`` or empty, meaning "no parents".
        """
        for previous_task in previous_tasks or ():
            # A parent that has no recorded status has not been reached yet.
            if previous_task not in self.task_status_map:
                return False
            # A parent that is ERRORED or BLOCKED prevents this task from running.
            if self.task_status_map[previous_task] in (self.ERRORED, self.BLOCKED):
                return False
        return True

    def __execute_task(
        self,
        task_name,
        task_args,
        previous_tasks=None,
        next_tasks=None,
        trigger_type=TRIGGERED,
    ):
        """Record ``task_name`` with ``trigger_type`` and run or propagate it.

        Args:
            task_name: Name of the step method to execute.
            task_args: Argument object handed through to every step.
            previous_tasks: Parents that must all have a recorded status
                before this task is processed (``None`` for no parents).
            next_tasks: Names of successor steps that receive the
                SKIPPED/BLOCKED status when this task does not run.
            trigger_type: TRIGGERED to run the task, otherwise the status
                to record (SKIPPED, ERRORED or BLOCKED).
        """
        parents = previous_tasks or ()

        # Join handling: wait until every parent has reported a status.
        if any(parent not in self.task_status_map for parent in parents):
            return

        # All parents reported but at least one failed: this task cannot run.
        # Record it as BLOCKED (instead of silently dropping it) so that it
        # and its successors show up in the status map.
        if not self.__is_runnable(parents):
            trigger_type = self.BLOCKED

        self.__update_task_start_status(task_name, trigger_type)
        print(f"{trigger_type} {task_name}")

        if trigger_type == self.TRIGGERED:
            # Run the step's own logic (wrapper=False mode).
            task_func = getattr(self, task_name)
            task_func(task_args)
            return

        # An errored task blocks its successors; SKIPPED/BLOCKED pass through.
        if trigger_type == self.ERRORED:
            trigger_type = self.BLOCKED

        for next_task in next_tasks or ():
            next_task_func = getattr(self, next_task)
            next_task_func(task_args, wrapper=True, trigger_type=trigger_type)

    def executeflow(self):
        """Run the flow starting at ``step1`` and print the status and error maps."""
        task_args = {}
        self.step1(task_args, wrapper=True)
        print(self.task_status_map)
        print(self.task_error_map)

    def step1(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """Entry task; on success triggers ``step2``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name, task_args, next_tasks=["step2"], trigger_type=trigger_type
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step1(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            # Outside the try so that a downstream failure is never
            # attributed to this (already completed) step.
            self.__update_task_end_status(task_name)
            # trigger next
            self.step2(task_args, wrapper=True, trigger_type=trigger_type)

    def step2(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """Fan-out task; on success triggers ``step3`` and then ``step4``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name,
                task_args,
                next_tasks=["step3", "step4"],
                trigger_type=trigger_type,
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step2(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next
            self.step3(task_args, wrapper=True, trigger_type=trigger_type)
            self.step4(task_args, wrapper=True, trigger_type=trigger_type)

    def step3(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """Branching task; runs either ``step5`` or ``step7`` and skips the other."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name,
                task_args,
                next_tasks=["step5", "step7"],
                trigger_type=trigger_type,
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            # Random 0/1 selecting which branch runs.
            status = random.randint(10, 20) % 2
            # To exercise the error path, raise here,
            # e.g. raise Exception('spam', 'eggs')
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step3(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next: the chosen branch first, then skip the other one
            if status == 0:
                self.step5(task_args, wrapper=True, trigger_type=trigger_type)
                self.step7(task_args, wrapper=True, trigger_type=self.SKIPPED)
            else:
                self.step7(task_args, wrapper=True, trigger_type=trigger_type)
                self.step5(task_args, wrapper=True, trigger_type=self.SKIPPED)

    def step4(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """On success triggers ``step9``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name, task_args, next_tasks=["step9"], trigger_type=trigger_type
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step4(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next
            self.step9(task_args, wrapper=True, trigger_type=trigger_type)

    def step5(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """On success triggers ``step6``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name, task_args, next_tasks=["step6"], trigger_type=trigger_type
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step5(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next
            self.step6(task_args, wrapper=True, trigger_type=trigger_type)

    def step6(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """On success triggers the join task ``step10``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name, task_args, next_tasks=["step10"], trigger_type=trigger_type
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step6(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next
            self.step10(task_args, wrapper=True, trigger_type=trigger_type)

    def step7(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """On success triggers ``step8``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name, task_args, next_tasks=["step8"], trigger_type=trigger_type
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step7(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next
            self.step8(task_args, wrapper=True, trigger_type=trigger_type)

    def step8(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """On success triggers the join task ``step10``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name, task_args, next_tasks=["step10"], trigger_type=trigger_type
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step8(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next
            self.step10(task_args, wrapper=True, trigger_type=trigger_type)

    def step9(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """On success triggers the join task ``step10``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name, task_args, next_tasks=["step10"], trigger_type=trigger_type
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step9(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next
            self.step10(task_args, wrapper=True, trigger_type=trigger_type)

    def step10(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """Join task: needs ``step6``, ``step8`` and ``step9``; then triggers ``step11``."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(
                task_name,
                task_args,
                previous_tasks=["step6", "step8", "step9"],
                next_tasks=["step11"],
                trigger_type=trigger_type,
            )
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step10(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)
            # trigger next
            self.step11(task_args, wrapper=True, trigger_type=trigger_type)

    def step11(self, task_args, wrapper=False, trigger_type=TRIGGERED):
        """Final task; has no successors."""
        task_name = inspect.currentframe().f_code.co_name
        if wrapper:
            self.__execute_task(task_name, task_args, trigger_type=trigger_type)
            return
        try:
            # ------LOGIC---------------
            print(f"..........executing {task_name} logic")
            time.sleep(random.randint(0, 2))
            print(f"..........completed {task_name} logic")
            # --------------------------
        except Exception as ex:
            self.__handleException(task_name, ex)
            self.step11(task_args, wrapper=True, trigger_type=self.ERRORED)
        else:
            self.__update_task_end_status(task_name)

In [0]:
# Build a job and run the whole flow starting at step1; executeflow() prints
# the final task status map and the task error map when the flow returns.
job = Job()
job.executeflow()

TRIGGERED step1
..........executing step1 logic
..........completed step1 logic
TRIGGERED step2
..........executing step2 logic
..........completed step2 logic
TRIGGERED step3
..........executing step3 logic
..........completed step3 logic
TRIGGERED step5
..........executing step5 logic
..........completed step5 logic
TRIGGERED step6
..........executing step6 logic
..........completed step6 logic
SKIPPED step7
SKIPPED step8
TRIGGERED step4
..........executing step4 logic
..........completed step4 logic
TRIGGERED step9
..........executing step9 logic
..........completed step9 logic
TRIGGERED step10
..........executing step10 logic
..........completed step10 logic
TRIGGERED step11
..........executing step11 logic
..........completed step11 logic
{'step1': 'COMPLETED', 'step2': 'COMPLETED', 'step3': 'COMPLETED', 'step5': 'COMPLETED', 'step6': 'COMPLETED', 'step7': 'SKIPPED', 'step8': 'SKIPPED', 'step4': 'COMPLETED', 'step9': 'COMPLETED', 'step10': 'COMPLETED', 'step11': 'COMPLETED'}
{}
