# Python packages used along the way

In [None]:
import requests
import base64
import os
import math

# Constants

## Job generation related

In [None]:
# Keys used at the top level of the generated job definition.
JOB_NAME = "name"
TASKS = "tasks"
EMAIL_NOTIFICATION = "email_notifications"

# Keys used inside each task entry.
TASK_KEY = "task_key"
TASK_DEPENDS_ON = "depends_on"
TASK_EXISTING_CLUSTER_ID = "existing_cluster_id"
TASK_NOTEBOOK_TASK = "notebook_task"

# Keys and values used inside the notebook task entry.
TASK_NOTEBOOK_PATH = "notebook_path"
TASK_NOTEBOOK_SOURCE = "source"  # WORKSPACE or GIT
TASK_NOTEBOOK_SOURCE_WORKSPACE = "WORKSPACE"

# Job format key/value; not referenced by the functions visible in this file.
TASK_FORMAT = "format"
TASK_FORMAT_MULTITASK = "MULTI_TASK"

# Failure notification settings: key name and the recipient address.
EMAIL_NOTIFICATION_ON_FAILURE = "on_failure"
EMAIL_NOTIFICATION_ON_FAILURE_MAIL = "rm.gogloza@gmail-dot.com"

## Cluster related

In [None]:
# Upper bound on the chunk size create_job_instance uses when splitting the
# workspace listing into jobs.
DBX_JOB_LIMIT = 100
# Divisor used by create_job_instance to spread the listing across jobs.
DBX_INSTANCE_NBR_OF_WORKERS = 2
# Cluster id passed to every generated task; read from the environment, so it
# is None when the variable is not set. Example: '1234-567890-abcdef12'
TASK_CLUSTER_ID = os.environ.get("DBX_JOB_CLUSTER_ID")

## REST API related

In [None]:
# Base URL that every REST call is built on; None when the variable is unset.
# Example: 'https://adb-1234567890.1.azuredatabricks.net'
DBX_INSTANCE = os.environ.get("DBX_INSTANCE")
# Token sent in the Authorization header; None when the variable is unset.
# Example: 'dapif12scvde34ffda34tm34n123n1m-2'
DBX_TOKEN = os.environ.get("DBX_TOKEN")

# REST API related functions

In [None]:
class BearerAuth(requests.auth.AuthBase):
    """Authenticate a request by adding a ``Bearer`` authorization header.

    Parameters
    ----------
    token : str, optional
        Token to send. When omitted, the module-level ``DBX_TOKEN`` is used.

    Raises
    ------
    ValueError
        If neither ``token`` nor ``DBX_TOKEN`` provides a non-empty value.
    """

    def __init__(self, token=None):
        self.token = DBX_TOKEN if token is None else token
        # Fail here with a clear message rather than later with a TypeError
        # from concatenating "Bearer " with None.
        if not self.token:
            raise ValueError(
                "No API token available: set the DBX_TOKEN environment "
                "variable or pass a token explicitly."
            )

    def __call__(self, r):
        # Called by requests with the prepared request before it is sent.
        r.headers["authorization"] = "Bearer " + self.token
        return r

In [None]:
def list_workspace(path: str, timeout: float = 60) -> requests.Response:
    """
    Request the listing of a workspace path.

    Parameters
    ----------
    path : str
        Workspace path to list. Example: /Shared/01 bronze/
    timeout : float
        Seconds to wait for the server before the request is abandoned.
        Without a timeout the call could block indefinitely.

    Returns
    -------
    requests.Response
        the raw HTTP response; call ``.json()`` on it to decode the body
    """
    return requests.get(
        f"{DBX_INSTANCE}/api/2.0/workspace/list",
        auth=BearerAuth(),
        json={'path': path},
        timeout=timeout,
    )

In [None]:
def create_job(job_json: dict, timeout: float = 60) -> requests.Response:
    """
    Submit a job definition to the jobs/create endpoint.

    Parameters
    ----------
    job_json : dict
        Job definition, sent as the JSON body of the request
    timeout : float
        Seconds to wait for the server before the request is abandoned.
        Without a timeout the call could block indefinitely.

    Returns
    -------
    requests.Response
        the raw HTTP response; call ``.json()`` on it to decode the body
    """
    return requests.post(
        f"{DBX_INSTANCE}/api/2.1/jobs/create",
        auth=BearerAuth(),
        json=job_json,
        timeout=timeout,
    )

In [None]:
def start_job(job_id: int, timeout: float = 60) -> dict:
    """
    Trigger a run of an existing job through the jobs/run-now endpoint.

    Parameters
    ----------
    job_id : int
        Identifier of the job to run, e.g. the ``job_id`` value taken from
        the decoded jobs/create response
    timeout : float
        Seconds to wait for the server before the request is abandoned.
        Without a timeout the call could block indefinitely.

    Returns
    -------
    dict
        the decoded JSON body of the response
    """
    response = requests.post(
        f"{DBX_INSTANCE}/api/2.1/jobs/run-now",
        auth=BearerAuth(),
        json={'job_id': job_id},
        timeout=timeout,
    )
    return response.json()

# Processing functions to generate jobs

## Helper functions
- generating task list definition
- generating job definition

In [None]:
def generate_task_definition(notebooks: list, cluster_id: str) -> list:
    """
    Build the task list of a job from workspace listing entries.

    One task is created per entry whose ``object_type`` is ``NOTEBOOK``;
    other entries are ignored. Every task except the first depends on the
    task created just before it, so the tasks form a sequential chain.

    Parameters
    ----------
    notebooks : list
        Workspace listing entries (dicts with ``object_type`` and ``path``)
    cluster_id : str
        Cluster instance where jobs will be executed. Example: 1234-567890-abcdef12

    Returns
    -------
    list
        a list of task definitions (dicts), in the order of ``notebooks``
    """
    tasks = []
    previous_task_key = None
    for notebook in notebooks:
        if notebook["object_type"] != 'NOTEBOOK':
            continue

        # The task key is the last path segment, i.e. the notebook name.
        # NOTE(review): the Jobs API may restrict the characters allowed in a
        # task key - confirm notebook names comply.
        task_key = notebook['path'].split('/')[-1]

        task_def = {
            TASK_KEY: task_key,
            TASK_NOTEBOOK_TASK: {
                TASK_NOTEBOOK_PATH: notebook['path'],
                TASK_NOTEBOOK_SOURCE: TASK_NOTEBOOK_SOURCE_WORKSPACE,
            },
            TASK_EXISTING_CLUSTER_ID: cluster_id,
        }

        # Chain this task behind the previous one; the first task has none.
        if previous_task_key:
            task_def[TASK_DEPENDS_ON] = [{TASK_KEY: previous_task_key}]
        previous_task_key = task_key

        tasks.append(task_def)
    return tasks

In [None]:
def generate_job_definition(job_name: str, tasks: list) -> dict:
    """
    Build a job definition that wraps the given tasks.

    The definition also carries an on-failure email notification addressed
    to ``EMAIL_NOTIFICATION_ON_FAILURE_MAIL``.

    Parameters
    ----------
    job_name : str
        name of a job
    tasks : list
        list of task definitions

    Returns
    -------
    dict
        the structure of a job
    """
    return {
        JOB_NAME: job_name,
        TASKS: tasks,
        EMAIL_NOTIFICATION: {
            EMAIL_NOTIFICATION_ON_FAILURE: [EMAIL_NOTIFICATION_ON_FAILURE_MAIL],
        },
    }

## Core function - creating job instance

In [None]:

def create_job_instance(job_name: str, notebook_path: str) -> list:
    """Create one or more jobs that run the notebooks found under a path.

    The notebooks are split into consecutive chunks and one job is created
    per chunk, named ``<job_name>_<index>``. The chunk size is the notebook
    count divided by ``DBX_INSTANCE_NBR_OF_WORKERS`` (rounded up) and capped
    at ``DBX_JOB_LIMIT``.

    Simplifying: if you have 2 workers set in the constants you will usually
    have 2 jobs created.

    Parameters
    ----------
    job_name : str
        Name of a job. Example next-level-bronze
    notebook_path : str
        Notebooks path location. Example /01-bronze/

    Returns
    -------
    list
        the decoded JSON responses of the job creation calls, one per job;
        empty when the path contains no notebooks

    Raises
    ------
    requests.HTTPError
        If listing the workspace path fails with an HTTP error status.
    """
    response = list_workspace(notebook_path)
    # Surface HTTP errors here instead of treating an error body as a listing.
    response.raise_for_status()

    # Tolerate a body without 'objects' (presumably what an empty folder
    # yields - TODO confirm) and keep notebooks only, so that other object
    # types do not skew the distribution or produce jobs without tasks.
    objects = response.json().get('objects', [])
    notebooks = [obj for obj in objects if obj.get('object_type') == 'NOTEBOOK']
    if not notebooks:
        # Nothing to schedule; also avoids a zero chunk size below.
        return []

    # Distribute tasks among the workers
    chunk_len = min(
        DBX_JOB_LIMIT,
        math.ceil(len(notebooks) / DBX_INSTANCE_NBR_OF_WORKERS),
    )

    # Creating jobs
    jobs = []
    for index, start in enumerate(range(0, len(notebooks), chunk_len)):
        tasks = generate_task_definition(
            notebooks[start:start + chunk_len], TASK_CLUSTER_ID
        )
        job_def = generate_job_definition(f"{job_name}_{index}", tasks)
        jobs.append(create_job(job_def).json())
    return jobs

In [None]:
# Keep the creation results in `jobs` so that later cells can refer to them,
# then echo them as the cell output.
jobs = create_job_instance('next-level-bronze', '/Shared/01 bronze/')
jobs

In [None]:
# Bonus ;) To run the jobs that have just been created, store the list returned
# by create_job_instance(...) in a variable named `jobs` and uncomment the line below.
#[start_job(job["job_id"]) for job in jobs]