In [None]:
from graphlib import TopologicalSorter
from threading import Thread, Lock
import queue

from pyspark.sql.functions import explode_outer

from datetime import datetime as dt

#########################################################################
# properties to customize
# Load timestamp handed to every child notebook run (local time, microseconds).
bg_loadtimestamp = f"{dt.now():%Y-%m-%d %H:%M:%S.%f}"
logger = spark._jvm.org.apache.log4j.Logger.getLogger('com.bigenius-x.application')
# Number of worker threads running notebooks in parallel.
threads_amount = 4
# NOTE(review): looks like a code-generation placeholder that is substituted with
# the path of the generated dependency JSON file — confirm against the generator.
execution_dependencies_file_path = '{notebook_orchestration#notebook_orchestration#execution_dependencies_file_path}'
#########################################################################

# Serializes console output of the worker threads; without it, messages printed
# concurrently by different threads can end up on the same line.
LOCK = Lock()

# Dependency graph of all tasks; it decides which tasks may run next.
topological_sorter = TopologicalSorter()

# Task names ready to be run by a worker (a None item tells a worker to exit).
task_queue = queue.Queue()
# Task names a worker has finished handling, successfully or not.
finalized_tasks_queue = queue.Queue()

# Names of tasks that failed or were disabled because a predecessor failed.
failed_tasks: list = []

# Reverse dependency map: task name -> names of the tasks depending on it.
dependencies: dict = {}


def setup_dependencies():
    """Load the task dependency graph from ``execution_dependencies_file_path``.

    Reads the JSON file, explodes its ``task_dependencies`` array and registers
    every ``(task_name, dependent_on_task_name)`` pair on ``topological_sorter``
    and in the reverse map ``dependencies`` (task -> tasks depending on it).
    ``set_task_failed`` uses that map to disable the successors of a failed task.
    """
    dependencies_json_df = (
        spark.read.format("json")
        .option("multiline", "true")
        .load(execution_dependencies_file_path)
    )
    dependency_rows = (
        dependencies_json_df
        .select(explode_outer("task_dependencies").alias('task_dependency'))
        .select(
            'task_dependency.task_name',
            'task_dependency.dependent_on_task_name',
        )
        .collect()
    )
    for row in dependency_rows:
        add_task_dependency(
            row['task_name'],
            row['dependent_on_task_name'],
            topological_sorter,
            dependencies,
        )


def add_task_dependency(task_name, dependent_on_task_name, sorter, successors):
    """Register that ``task_name`` depends on ``dependent_on_task_name``.

    Args:
        task_name: the dependent task; rows where it is ``None`` are ignored
            (``explode_outer`` yields such a row for an empty or null array).
        dependent_on_task_name: the predecessor task; when ``None``, ``task_name``
            is added as a task without predecessors. ``None`` must never become a
            graph node, because a ``None`` item on ``task_queue`` is the workers'
            termination signal.
        sorter: the ``TopologicalSorter`` to add the edge to.
        successors: reverse map (predecessor -> list of dependent tasks) that is
            updated in place so that failures can be propagated to successors.
    """
    if task_name is None:
        return
    if dependent_on_task_name is None:
        sorter.add(task_name)
        return

    sorter.add(task_name, dependent_on_task_name)
    # prepare dependencies for being able to deactivate dependent objects on errors
    successors.setdefault(dependent_on_task_name, []).append(task_name)


def info(targetName, message):
    """Write ``message`` for ``targetName`` to the log4j logger and to stdout.

    The stdout copy is prefixed with the current local time. Printing happens
    while holding ``LOCK`` so output from concurrent worker threads does not
    interleave.
    """
    entry = f'{targetName}: {message}'
    logger.info(entry)
    with LOCK:
        timestamp = dt.now().strftime('%Y/%m/%d, %H:%M:%S')
        print(f"{timestamp} - {entry}")

def set_task_failed(task_name, root_task_name):
    """Mark ``task_name`` and, transitively, every task depending on it as failed.

    ``root_task_name`` is the task whose failure started the cascade; it only
    changes the wording of the log message. Successors come from the reverse
    map ``dependencies`` and are visited depth-first.
    """
    failed_tasks.append(task_name)

    if task_name != root_task_name:
        message = f'disabling task {task_name} as successor of {root_task_name}'
    else:
        message = f'disabling task {task_name} and all its depending tasks (successors)'
    info(task_name, message)

    for dependent in dependencies.get(task_name, ()):
        # Checked per iteration: an earlier sibling's subtree may already have
        # disabled this task.
        if dependent in failed_tasks:
            continue
        set_task_failed(dependent, root_task_name)

def worker():
    while True:
        item = task_queue.get()
        
        if item is None:
            info("worker", "Termination signal received. Exiting thread.")
            task_queue.task_done()
            break
        
        if item != 'begin' and item != 'end' and item not in failed_tasks:

            info(item, 'starting task')

            try:
                return_value = dbutils.notebook.run(item, {notebook_orchestration#notebook_orchestration#run_notebook_timeout}, {"bg_loadtimestamp": bg_loadtimestamp})
                info(item, return_value)
                info(item, 'finished task')
            except Exception as e:
                if "FAILED: Unable to access the notebook" in str(e):
                    info(item, f'task not found: {e}')
                info(item, 'task failed')
                set_task_failed(item, item)        

        else:
            info(item, 'task skipped, nothing to do here')
        
        task_queue.task_done()
        finalized_tasks_queue.put(item)

setup_dependencies()

# Start the worker pool. Each worker blocks on task_queue until work arrives;
# daemon threads do not keep the process alive on their own.
threads = [Thread(target=worker, daemon=True) for _ in range(threads_amount)]
for thread in threads:
    thread.start()

# Schedule tasks in dependency order. While the sorter is active, at least one
# task has been handed out but not yet marked done, so waiting on
# finalized_tasks_queue eventually returns. Marking a task done may release its
# successors, which the next get_ready() call hands to the workers.
topological_sorter.prepare()
while topological_sorter.is_active():
    for ready_task in topological_sorter.get_ready():
        task_queue.put(ready_task)
    topological_sorter.done(finalized_tasks_queue.get())

# Send one termination signal per worker, then wait until every queued item
# has been acknowledged and every thread has exited.
for _ in threads:
    task_queue.put(None)

task_queue.join()

for thread in threads:
    thread.join()

info('all', 'work completed')

# Exit code of the orchestration notebook: 1 if any task failed or was disabled.
return_value = 1 if failed_tasks else 0
dbutils.notebook.exit(return_value)

