In [0]:
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone

import pandas as pd
import requests
from pyspark.sql import SparkSession

# Notebook execution context obtained from `dbutils`.
# NOTE(review): this module-level `context` is not referenced by the visible
# cells; `CollectJobMetrics.__init__` fetches its own copy.
context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()

In [0]:
class CollectJobMetrics:
    """Collect per-run metrics of Databricks jobs through the Jobs REST API 2.1.

    The API base URL and token are read from the notebook context, so the class
    only works inside a Databricks notebook where `dbutils` and `spark` exist.

    Typical flow: construct (lists the matching jobs), call `parallel_process()`
    to build the staging table `<name>_metrics_tbl`, then `write_into_table()`
    to append the not-yet-seen runs to `main_<name>_metrics_tbl`, where `<name>`
    is the last dot-separated part of `target_table_name`.
    """

    # `requests` waits forever unless a timeout is passed; bound every REST call.
    REQUEST_TIMEOUT_SECONDS = 60
    # Assumed memory per worker node in GB; only used for the rough
    # `Estimated_Max_Memory_GB` figure, it is not read from the cluster spec.
    MEMORY_PER_WORKER_GB = 64

    def __init__(self, target_table_name):
        """Read API credentials from the notebook context and list the jobs.

        Args:
            target_table_name: Base name for the metrics tables; only the part
                after the last '.' is used when table names are built.
        """
        self.context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
        self.target_table_name = target_table_name
        self.api_url = self.context.apiUrl().get()
        self.api_token = self.context.apiToken().get()
        self.headers = {'Authorization': f'Bearer {self.api_token}'}
        self.all_jobs = self.list_all_jobs()

    def _get_json(self, endpoint, params=None, error_prefix="Request failed"):
        """GET `endpoint` on the workspace API and return the decoded JSON body.

        Args:
            endpoint: API path appended to `self.api_url`.
            params: Optional dict of query-string parameters.
            error_prefix: Text printed in front of the failure details.

        Returns:
            The parsed JSON payload, or None when the request raised a
            `requests` error (including a timeout) or the status was not 200.
            The failure is printed in both cases.
        """
        full_url = f'{self.api_url}{endpoint}'
        try:
            response = requests.get(
                full_url,
                headers=self.headers,
                params=params,
                timeout=self.REQUEST_TIMEOUT_SECONDS,
            )
        except requests.RequestException as exc:
            print(f"{error_prefix}: {exc}")
            return None
        if response.status_code != 200:
            print(f"{error_prefix}: {response.status_code}, {response.text}")
            return None
        return response.json()

    @staticmethod
    def _ms_to_utc_datetime(epoch_ms):
        """Convert epoch milliseconds to a naive UTC datetime (None if falsy)."""
        if not epoch_ms:
            return None
        # Same value `datetime.utcfromtimestamp` produced, without the deprecated call.
        return datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc).replace(tzinfo=None)

    def list_all_jobs(self, limit=25, name_prefix='[dev'):
        """Page through `jobs/list` and keep the jobs whose name has the prefix.

        Args:
            limit: Page size sent to the API.
            name_prefix: Only jobs whose `settings.name` starts with this text
                are returned.

        Returns:
            List of job dicts as returned by the API. If a page request fails,
            the jobs gathered so far are returned.
        """
        offset = 0
        all_jobs = []
        while True:
            data = self._get_json(
                "/api/2.1/jobs/list",
                params={"limit": limit, "offset": offset},
                error_prefix="Failed to fetch jobs",
            )
            if data is None:
                break
            jobs = data.get("jobs", [])
            if not jobs:
                break
            for job in jobs:
                # Tolerate a job entry without settings/name instead of raising KeyError.
                job_name = job.get('settings', {}).get('name') or ''
                if job_name.startswith(name_prefix):
                    all_jobs.append(job)
            offset += len(jobs)
        return all_jobs

    def get_task_names_and_task_run_ids(self, job_id):
        """Return the distinct task run ids of every run of `job_id`.

        All pages of `jobs/runs/list` are followed via `next_page_token`, then
        each run is fetched with `jobs/runs/get` to read its tasks. Tasks whose
        `task_key` ends with '_metrics' are excluded.

        Args:
            job_id: Id of the job whose runs are inspected.

        Returns:
            List of unique task run ids (arbitrary order); empty when the job
            has no runs or the first listing request fails.
        """
        runs = []
        page_token = None
        while True:
            params = {"job_id": job_id}
            if page_token:
                params["page_token"] = page_token
            data = self._get_json(
                "/api/2.1/jobs/runs/list",
                params=params,
                error_prefix="Failed to fetch runs",
            )
            if data is None:
                if not runs:
                    return []
                # A later page failed: continue with the runs already listed.
                break
            runs.extend(data.get('runs', []))
            page_token = data.get('next_page_token')
            if not (data.get('has_more') and page_token):
                break
        if not runs:
            print(f"No runs found for the job ID: {job_id}")
            return []

        task_run_ids = set()
        for run in runs:
            run_id = run['run_id']
            run_info = self._get_json(
                "/api/2.1/jobs/runs/get",
                params={"run_id": run_id},
                error_prefix=f"Failed to fetch run details for run_id {run_id}",
            )
            if run_info is None:
                continue
            for task in run_info.get('tasks', []):
                task_name = task.get('task_key', 'unknown_task')
                task_run_id = task.get('run_id')
                if task_name and task_run_id and not task_name.endswith('_metrics'):
                    task_run_ids.add(task_run_id)
        return list(task_run_ids)

    def generate_run_metrics(self, task_run_id):
        """Fetch one run and flatten it into a metrics record.

        Args:
            task_run_id: Run id passed to `jobs/runs/get`.

        Returns:
            Dict with one entry per metrics column, or `{}` when the run could
            not be fetched.

        NOTE(review): fields missing from the response fall back to the string
        'N/A', so numeric columns such as `Number_of_Workers` can hold either
        an int or 'N/A'. This is kept as-is to avoid changing the schema of an
        already existing metrics table.
        """
        run = self._get_json(
            '/api/2.1/jobs/runs/get',
            params={'run_id': task_run_id},
            error_prefix="Error fetching runs",
        )
        if run is None:
            return {}

        pipeline_name = run.get('run_name', '')
        cluster_id = run.get('cluster_instance', {}).get('cluster_id', 'N/A')
        cluster_spec = run.get('cluster_spec', {}).get('new_cluster', {})
        existing_cluster_id = run.get('existing_cluster_id', 'N/A')
        # Check the existing-cluster marker first: a run on an existing cluster
        # can also report a cluster instance id, which used to label it as new.
        if existing_cluster_id != 'N/A':
            cluster_type = 'Existing Cluster'
            if cluster_id == 'N/A':
                cluster_id = existing_cluster_id
        elif cluster_id != 'N/A':
            cluster_type = 'New Cluster'
        else:
            cluster_type = 'N/A'

        spark_conf = cluster_spec.get('spark_conf', {})
        is_serverless = spark_conf.get('spark.databricks.cluster.source') == 'serverless'

        state = run.get('state', {})
        result_state = state.get('result_state', 'N/A')
        life_cycle_state = state.get('life_cycle_state', 'N/A')

        start_time = self._ms_to_utc_datetime(run.get('start_time', None))
        end_time = self._ms_to_utc_datetime(run.get('end_time', None))
        # Duration in minutes; unknown while either timestamp is missing.
        if start_time is not None and end_time is not None:
            duration_minutes = (end_time - start_time).total_seconds() / 60
        else:
            duration_minutes = None

        num_workers = cluster_spec.get('num_workers', 'N/A')
        if num_workers != 'N/A':
            estimated_max_memory = self.MEMORY_PER_WORKER_GB * int(num_workers)
        else:
            estimated_max_memory = 'N/A'

        # The Jobs API reports the attempt index as `attempt_number` (0 for the
        # first attempt), i.e. how many retries preceded this run. The formerly
        # used `number_of_attempts` key is kept as a fallback only.
        number_of_retries = run.get('attempt_number', run.get('number_of_attempts', 1) - 1)

        autoscale = cluster_spec.get('autoscale', {})
        tasks = run.get('tasks', [])
        # The notebook path is only unambiguous when the run has exactly one task.
        if tasks and len(tasks) == 1:
            notebook_path = tasks[0].get('notebook_task', {}).get('notebook_path', 'N/A')
        else:
            notebook_path = 'N/A'

        run_info = {
            'Job_ID': run['job_id'],
            'Run_ID': run['run_id'],
            'Start_Time': start_time,
            'End_Time': end_time,
            'Duration_Minutes': duration_minutes,
            'Status': result_state if result_state != 'N/A' else life_cycle_state,
            'Cluster_Type': cluster_type,
            'Cluster_ID': cluster_id,
            'Node_Type': cluster_spec.get('node_type_id', 'N/A'),
            'Number_of_Workers': num_workers,
            'Is_Serverless': 'Yes' if is_serverless else 'No',
            'Estimated_Max_Memory_GB': estimated_max_memory,
            'Driver_Node_Type': cluster_spec.get('driver_node_type_id', 'N/A'),
            'Cluster_Runtime_Version': cluster_spec.get('spark_version', 'N/A'),
            'Run_Page_URL': run.get('run_page_url', 'N/A'),
            'Number_of_Retries': number_of_retries,
            'Trigger_Type': run.get('trigger', 'N/A'),
            'Job_Parameters': run.get('run_parameters', {}).get('parameters', 'N/A'),
            'Termination_Reason': run.get('termination_reason', {}).get('code', 'N/A'),
            'Autoscale_Min_Workers': autoscale.get('min_workers', 'N/A'),
            'Autoscale_Max_Workers': autoscale.get('max_workers', 'N/A'),
            'Notebook_Path': notebook_path,
            'Pipeline_Name': pipeline_name
        }
        return run_info

    def write_into_table(self):
        """Append staged runs that are not yet in the main metrics table.

        Reads the staging table `<name>_metrics_tbl` and writes to
        `main_<name>_metrics_tbl`. When the main table exists, only rows whose
        `Run_ID` is absent from it are appended; otherwise the main table is
        created from the staging table.

        Returns:
            The name of the main metrics table.
        """
        targe_tbl = self.target_table_name.split('.')[-1]
        tbl_name = f'{targe_tbl}_metrics_tbl'
        final_tbl_name = f'main_{tbl_name}'
        # The staging table is missing when `parallel_process` had nothing to stage.
        if not spark.catalog.tableExists(tbl_name):
            print(f"Staging table `{tbl_name}` does not exist, nothing to write")
            return final_tbl_name

        staged_df = spark.table(tbl_name)
        if spark.catalog.tableExists(final_tbl_name):
            main_spark_df = spark.table(final_tbl_name)
            # Keep the staged records whose Run_ID is not present in the main table
            new_records = staged_df.join(main_spark_df, on="Run_ID", how="left_anti")
            # Count on the cluster instead of collecting every row to the driver.
            records_count = new_records.count()
            if records_count == 0:
                print("No new records found")
            else:
                new_records.write.format('delta').mode("append").saveAsTable(final_tbl_name)
                print(f"{records_count} new records have been appended successfully into the table `{final_tbl_name}`")
        else:
            print(f"Creating the metrics table `{final_tbl_name}`")
            staged_df.write.format("delta").mode("overwrite").saveAsTable(final_tbl_name)
        return final_tbl_name

    def parallel_process(self, write_as_stage_tbl=True):
        """Collect metrics for all task runs of `self.all_jobs` using threads.

        Args:
            write_as_stage_tbl: When True, the collected metrics are saved as
                the staging table `<name>_metrics_tbl` (and registered as the
                temp view `<name>_metrics_view`).

        Returns:
            List of metrics dicts, one per run that could be fetched.
        """
        max_workers = os.cpu_count()
        # Get all the task run ids
        task_run_ids = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.get_task_names_and_task_run_ids, each_job['job_id'])
                for each_job in self.all_jobs
            ]
            for future in as_completed(futures):
                task_run_ids.extend(future.result())
        # Get all the metrics for each task run
        metrics = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(self.generate_run_metrics, task_rid) for task_rid in task_run_ids]
            for future in as_completed(futures):
                run_metrics = future.result()
                # A failed fetch yields {}; staging it would create an all-null
                # row whose null Run_ID is re-appended on every run.
                if run_metrics:
                    metrics.append(run_metrics)
        if write_as_stage_tbl:
            if not metrics:
                # An empty frame has no inferable schema, so there is nothing to stage.
                print("No run metrics collected, staging table was not refreshed")
                return metrics
            # Creating a temporary staging table
            targe_tbl = self.target_table_name.split('.')[-1]
            staging_tbl = f'{targe_tbl}_metrics_tbl'
            print(f"Creating a temporary table {targe_tbl}_metrics_tbl")
            data_df = pd.DataFrame(data=metrics)
            df = spark.createDataFrame(data=data_df)
            try:
                spark.sql(f"DROP TABLE IF EXISTS {staging_tbl}")
            except Exception as exc:
                # Best effort as before, but report the problem instead of hiding it.
                print(f"Could not drop the staging table `{staging_tbl}`: {exc}")
            # createOrReplaceTempView replaces an existing view, no explicit drop needed.
            df.createOrReplaceTempView(f'{targe_tbl}_metrics_view')
            df.write.saveAsTable(staging_tbl)
        return metrics

In [0]:
# Base name for the metrics tables, read from the notebook widget `metrics_tbl_name`.
target_tbl_name = dbutils.widgets.get('metrics_tbl_name')
# Echo the value so it shows up in the cell output.
print(target_tbl_name)

def main(target_tbl_name):
    """Collect the job run metrics and persist them.

    Builds a `CollectJobMetrics` for `target_tbl_name`, runs `parallel_process()`
    with its default arguments and then calls `write_into_table()`.

    Args:
        target_tbl_name: Base table name handed to `CollectJobMetrics`.
    """
    cjm = CollectJobMetrics(target_table_name=target_tbl_name)
    # The returned metrics list is not needed here, so it is not bound to a name.
    cjm.parallel_process()
    cjm.write_into_table()

main(target_tbl_name=target_tbl_name) # the final table is named `main_<name>_metrics_tbl`: prefix and suffix are added in `write_into_table`


# cjm = CollectJobMetrics(target_table_name='')
# all_jobs = cjm.list_all_jobs()
# len(all_jobs) # 319
# # print(all_jobs[0])
# # job_id_val = all_jobs[0]['job_id']
# job_id_val = '955485745460967'
# task_run_ids = cjm.get_task_names_and_task_run_ids(job_id=job_id_val)
# for rid in task_run_ids:
#     d = cjm.generate_run_metrics(task_run_id=rid)
#     print(d)

No runs found for the job ID: 513698363123541
No runs found for the job ID: 1023533753784164
No runs found for the job ID: 664052615823018
No runs found for the job ID: 271848115835620
No runs found for the job ID: 967719383062836
No runs found for the job ID: 781391283903141
No runs found for the job ID: 892997146798889
No runs found for the job ID: 450433826455728
No runs found for the job ID: 480828747621384
No runs found for the job ID: 280503604504826
No runs found for the job ID: 1024618274823478
No runs found for the job ID: 58578256006795
No runs found for the job ID: 993047400453674
No runs found for the job ID: 974499313924143
No runs found for the job ID: 248234243476786
No runs found for the job ID: 1110474221074196
No runs found for the job ID: 90277210829135
No runs found for the job ID: 711006854130151
No runs found for the job ID: 594973186611553
No runs found for the job ID: 139246716657924
No runs found for the job ID: 513604896842106
No runs found for the job ID: 883

In [0]:
# Build the table name the way `CollectJobMetrics.write_into_table` does: only
# the part after the last '.' of the widget value is used, so a qualified name
# such as `catalog.schema.tbl` still resolves to the table that was written.
final_tbl_name = f"main_{target_tbl_name.split('.')[-1]}_metrics_tbl"
df = spark.table(final_tbl_name)
df.display()

Job_ID,Run_ID,Start_Time,End_Time,Duration_Minutes,Status,Cluster_Type,Cluster_ID,Node_Type,Number_of_Workers,Is_Serverless,Estimated_Max_Memory_GB,Driver_Node_Type,Cluster_Runtime_Version,Run_Page_URL,Number_of_Retries,Trigger_Type,Job_Parameters,Termination_Reason,Autoscale_Min_Workers,Autoscale_Max_Workers,Notebook_Path,Pipeline_Name
854114453671658,30385749067845,2024-10-24T08:08:44.911Z,2024-10-24T08:12:35.114Z,3.8367166666666663,FAILED,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/854114453671658/run/30385749067845,0,RETRY,,,,,/Users/sathesh.subramanium@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_exchangerates,pl_ExchangeRates
854114453671658,603749389549832,2024-10-24T09:17:35.739Z,2024-10-24T09:26:06.104Z,8.506083333333333,SUCCESS,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/854114453671658/run/603749389549832,0,ONE_TIME,,,,,/Users/sathesh.subramanium@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_exchangerates,pl_ExchangeRates
504082234260492,719998975982953,2024-10-24T05:39:56.295Z,2024-10-24T05:41:29.346Z,1.55085,SUCCESS,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/504082234260492/run/719998975982953,0,ONE_TIME,,,,,/Workspace/Users/smitha.pm@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_fact_wms_inventory,pl_fact_wms_inventory_uk
854114453671658,1054471855848039,2024-10-24T08:04:08.357Z,2024-10-24T08:08:44.155Z,4.596633333333333,FAILED,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/854114453671658/run/1054471855848039,0,ONE_TIME,,,,,/Users/sathesh.subramanium@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_exchangerates,pl_ExchangeRates
504082234260492,971808677804043,2024-10-24T05:48:08.664Z,2024-10-24T05:48:39.902Z,0.5206333333333333,FAILED,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/504082234260492/run/971808677804043,0,RETRY,,,,,/Workspace/Users/smitha.pm@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_fact_wms_inventory,pl_fact_wms_inventory_usa
504082234260492,218358347898801,2024-10-24T05:46:56.446Z,2024-10-24T05:47:46.123Z,0.82795,SUCCESS,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/504082234260492/run/218358347898801,0,ONE_TIME,,,,,/Workspace/Users/smitha.pm@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_fact_wms_inventory,pl_fact_wms_inventory_aus
504082234260492,237646161328271,2024-10-24T05:42:53.699Z,2024-10-24T05:43:41.807Z,0.8018,SUCCESS,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/504082234260492/run/237646161328271,0,ONE_TIME,,,,,/Workspace/Users/smitha.pm@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_fact_wms_inventory,pl_fact_wms_inventory_pol
504082234260492,568163116044882,2024-10-24T05:45:19.403Z,2024-10-24T05:46:08.576Z,0.81955,SUCCESS,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/504082234260492/run/568163116044882,0,ONE_TIME,,,,,/Workspace/Users/smitha.pm@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_fact_wms_inventory,pl_fact_wms_inventory_arg
504082234260492,612046137723282,2024-10-24T05:43:42.03Z,2024-10-24T05:44:31.396Z,0.8227666666666666,SUCCESS,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/504082234260492/run/612046137723282,0,ONE_TIME,,,,,/Workspace/Users/smitha.pm@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_fact_wms_inventory,pl_fact_wms_inventory_col
504082234260492,169949873361076,2024-10-24T05:41:29.566Z,2024-10-24T05:42:53.404Z,1.3973,SUCCESS,,,,,No,,,,https://adb-1178028260927946.6.azuredatabricks.net/?o=1178028260927946#job/504082234260492/run/169949873361076,0,ONE_TIME,,,,,/Workspace/Users/smitha.pm@mccain.ca/.bundle/silver_load_asset_bundle/dev/files/src/pl_transform_fact_wms_inventory,pl_fact_wms_inventory_mfe


---