### Loaded: Databricks Functions

for getting:
- Avg. cluster startup time
- Avg. nodes dropped on running clusters
- Databricks availability
- Clusters with unsupported runtimes
- Number of interactive clusters vs. jobs per day
- AWS cost per DBU
- Number of teams active on Databricks
- Number of users active on Databricks

In [0]:
#Module Import
import concurrent.futures
import datetime
import requests
import pandas as pd #Only being used to make date list

In [0]:
#sub-functions used almost entirely for getting avg. cluster startup time + nodes lost events
#-------------------
def get_events(databricks_headers,
               cluster_id,
               start_time=None,
               end_time=None,
               order=None,
               event_types=None,
               offset=None,
               limit=None):
    """Request a single page of cluster events from the Databricks REST API.

    Each argument other than ``databricks_headers`` is copied into the JSON
    request body under its own name, but only when it is not ``None``.

    Args:
        databricks_headers: HTTP headers sent with the request.
        cluster_id, start_time, end_time, order, event_types, offset, limit:
            Optional request-body fields; ``None`` values are omitted.

    Returns:
        The decoded JSON body of the HTTP response.
    """
    # Field order mirrors the order in which the request body is assembled.
    candidate_fields = (
        ('cluster_id', cluster_id),
        ('start_time', start_time),
        ('end_time', end_time),
        ('order', order),
        ('event_types', event_types),
        ('offset', offset),
        ('limit', limit),
    )
    payload = {name: value for name, value in candidate_fields if value is not None}

    reply = requests.post(
        'https://zalando-dummy.cloud.databricks.com/api/2.0/clusters/events',
        headers=databricks_headers,
        json=payload,
    )
    return reply.json()

#-------------------
def get_cluster_event(databricks_headers,cluster_id, start_time_ts, end_time_ts, event_types):
    """Collect every event of one cluster in a time window, following pagination.

    Pages of up to 500 events are requested through ``get_events`` in
    ascending order, advancing ``offset`` by the number of events received.

    Args:
        databricks_headers: HTTP headers forwarded to ``get_events``.
        cluster_id: Identifier of the cluster whose events are fetched.
        start_time_ts: Lower bound of the window, forwarded as ``start_time``.
        end_time_ts: Upper bound of the window, forwarded as ``end_time``.
        event_types: Event type names forwarded as ``event_types``.

    Returns:
        A list with the ``events`` entries of all pages, in the order received.
        A response without an ``events`` key contributes nothing.
    """
    cluster_event_list = []
    offset = 0

    while True:
        page = get_events(databricks_headers=databricks_headers,
                          cluster_id=cluster_id,
                          start_time=start_time_ts, end_time=end_time_ts,
                          event_types=event_types, offset=offset,
                          limit=500, order='ASC')

        page_events = page.get('events') or []
        # extend() avoids re-copying the accumulated list on every page.
        cluster_event_list.extend(page_events)
        offset += len(page_events)

        # No 'next_page' means the API has nothing more to return.
        if 'next_page' not in page:
            break
        # A page that advertises 'next_page' but carries no events would leave
        # the offset unchanged and repeat the same request forever.
        if not page_events:
            break
        # Stop once everything the API reported has been read; a missing
        # 'total_count' simply means we rely on 'next_page' alone.
        total_count = page.get('total_count')
        if total_count is not None and offset >= total_count:
            break

    return cluster_event_list

#-------------------
def get_node_lost_count(event_list):
    """Count NODES_LOST events for a cluster that shows signs of having run.

    Args:
        event_list: Iterable of event dicts, each with a ``'type'`` key.

    Returns:
        The number of ``NODES_LOST`` events, or ``None`` when the list holds
        neither a ``NODES_LOST`` nor a ``RUNNING`` event.
    """
    lost_total = 0
    saw_relevant_event = False

    for event in event_list:
        kind = event['type']
        if kind == 'NODES_LOST':
            lost_total += 1
            saw_relevant_event = True
        elif kind == 'RUNNING':
            # A RUNNING event alone is enough to report a count (of zero).
            saw_relevant_event = True

    return lost_total if saw_relevant_event else None

#-------------------
def get_avg_startup_time(event_list):
    """Average the time between a start-type event and the next RUNNING event.

    A ``CREATING``, ``RESTARTING`` or ``STARTING`` event records a pending start
    (a later start-type event overwrites an earlier pending one). The next
    ``RUNNING`` event closes it and yields one startup duration. ``RUNNING``
    events without a pending start and all other event types are ignored.
    Events are processed in the order given.

    Args:
        event_list: Iterable of event dicts with ``'type'`` and, for the event
            types above, a ``'timestamp'`` in epoch milliseconds.

    Returns:
        A ``datetime.timedelta`` with the mean startup duration, or ``None``
        when no start/RUNNING pair was found.
    """
    start_event_types = ('CREATING', 'RESTARTING', 'STARTING')
    pending_start_ms = None
    startup_times = []

    for event in event_list:
        event_type = event['type']

        if event_type in start_event_types:
            pending_start_ms = event['timestamp']

        elif event_type == 'RUNNING' and pending_start_ms is not None:
            # Subtract the epoch values directly: going through naive local
            # datetimes would skew the duration by the UTC-offset change when a
            # startup spans a daylight-saving transition.
            startup_times.append(
                datetime.timedelta(milliseconds=event['timestamp'] - pending_start_ms)
            )
            pending_start_ms = None  # Resetting so old values are not re-used.

    if not startup_times:
        return None

    return sum(startup_times, datetime.timedelta()) / len(startup_times)

#-------------------
def get_cluster_list(headers):
    """Return the ``'clusters'`` entry of the Databricks clusters/list response.

    Args:
        headers: HTTP headers sent with the request.

    Returns:
        Whatever the decoded JSON response holds under ``'clusters'``.

    Raises:
        KeyError: If the decoded response has no ``'clusters'`` key.
    """
    endpoint = "https://zalando-dummy.cloud.databricks.com/api/2.0/clusters/list"
    payload = requests.get(endpoint, headers=headers).json()
    return payload['clusters']

#-------------------
# def get_time_pairs(first_date,last_date):
#     dates = pd.date_range(start=first_date, end=last_date,
#                           freq="1D").to_pydatetime().tolist()
#     #Again shifting - should definitely be utility function
#     shift = [date + pd.Timedelta(days=1) for date in dates]
#     return list(zip(dates, shift))

In [0]:
#Avg. cluster startup time per starting cluster
#Avg. number of Node_Lost events per running cluster

def get_cluster_startup_and_nodes_lost(time_pairs, databricks_clusters, databricks_headers):
    """Build per-window totals for cluster startup time and NODES_LOST events.

    For every ``(start, end)`` pair the events of all clusters are fetched
    concurrently via ``get_cluster_event`` and reduced with
    ``get_node_lost_count`` and ``get_avg_startup_time``. Only totals and
    counts are exported; averages at any granularity can be derived from them
    (total / count), so they are not stored.

    Args:
        time_pairs: Iterable of ``(start_date, end_date)`` pairs. Both must
            provide ``.timestamp()``; ``start_date`` must provide ``.strftime()``.
        databricks_clusters: Iterable of dicts with a ``'cluster_id'`` key.
        databricks_headers: HTTP headers forwarded to the API helper.

    Returns:
        A list with one dict per time pair, holding:
        ``date`` (start date as ``YYYY-MM-DD``), ``nodes_lost_total``,
        ``nodes_lost_cluster_count`` (clusters for which a node-lost count was
        available), ``startup_time_total_seconds`` (sum of the per-cluster
        average startup times) and ``startup_time_cluster_count`` (clusters
        for which a startup time was available).
    """
    # Loop-invariant: the same event types are requested for every window.
    event_types = ['CREATING', 'RESTARTING', 'STARTING', 'RUNNING', 'NODES_LOST']
    startup_node_lost_kpis = []

    for start_date, end_date in time_pairs:
        # Epoch milliseconds as integers; `.timestamp() * 1000` alone is a
        # float and would be serialized as e.g. 1609459200000.0.
        start_time_ts = int(start_date.timestamp() * 1000)
        end_time_ts = int(end_date.timestamp() * 1000)

        # Step 1: fetch the events of every cluster for this window in parallel.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(get_cluster_event,
                                databricks_headers, cluster_id=cluster['cluster_id'],
                                start_time_ts=start_time_ts, end_time_ts=end_time_ts,
                                event_types=event_types)
                for cluster in databricks_clusters
            ]
            # Completion order is irrelevant: results are only aggregated.
            events = [future.result()
                      for future in concurrent.futures.as_completed(futures)]

        # Step 2: reduce the events to KPIs.
        # Nodes lost: None marks a cluster without a node-lost count; skip it.
        node_lost_counts = [get_node_lost_count(cluster_events) for cluster_events in events]
        cleaned_node_lost_count = [count for count in node_lost_counts if count is not None]
        total_node_lost_events = sum(cleaned_node_lost_count)

        # Startup times: None marks a cluster without a measurable startup.
        startup_times = [get_avg_startup_time(cluster_events) for cluster_events in events]
        cleaned_startup_times = [duration for duration in startup_times if duration is not None]
        total_startup_time = sum(cleaned_startup_times, datetime.timedelta())

        # Step 3: export.
        startup_node_lost_kpis.append({
            "date": start_date.strftime("%Y-%m-%d"),
            "nodes_lost_total": total_node_lost_events,
            "nodes_lost_cluster_count": len(cleaned_node_lost_count),
            "startup_time_total_seconds": total_startup_time.total_seconds(),
            "startup_time_cluster_count": len(cleaned_startup_times),
        })

    return startup_node_lost_kpis

In [0]:
#Number of clusters using unsupported runtimes
#
#Note: Cannot be run historically - simply queries clusters as they are at runtime.
#Therefore, if we want this data on **DAY** granularity, then this code will need to be run every day.
#Alternatively, if we settle for *week* granularity, then this can be run once per week
#

def get_clusters_unsupported_runtimes(cluster_list, databricks_headers):
    """Count clusters whose runtime is not in the API's list of Spark versions.

    The ``versions`` entries returned by the spark-versions endpoint provide
    the accepted ``key`` values. A cluster is flagged when its
    ``spark_version`` is neither ``'dlt'`` nor one of those keys.

    Args:
        cluster_list: Sequence of cluster dicts with ``'spark_version'`` and
            ``'cluster_name'`` keys.
        databricks_headers: HTTP headers sent with the request.

    Returns:
        A dict with ``dbricks_runtime_clusters_unsupported`` (number of flagged
        clusters) and ``dbricks_runtime_clusters_total`` (``len(cluster_list)``).
    """
    reply = requests.get('https://zalando-e2.cloud.databricks.com/api/2.0/clusters/spark-versions', headers=databricks_headers)
    known_version_keys = [entry['key'] for entry in reply.json()['versions']]

    # (name, runtime) of every flagged cluster; only the count is exported.
    flagged_clusters = [
        (cluster['cluster_name'], cluster['spark_version'])
        for cluster in cluster_list
        if cluster['spark_version'] != 'dlt'
        and cluster['spark_version'] not in known_version_keys
    ]

    # Date and percentage are intentionally not exported: the KPI is unlikely to
    # be analyzed at day granularity and both can be derived by the consumer.
    return {
        "dbricks_runtime_clusters_unsupported": len(flagged_clusters),
        "dbricks_runtime_clusters_total": len(cluster_list),
    }

In [0]:
#Number of interactive clusters vs jobs per day

def get_interactive_clusters_or_jobs(first_date_str,last_date_str):
    """Return per-day sums of interactive-cluster and job usage rows.

    A usage row counts as interactive when its ``cluster_name`` matches
    ``'API_%'`` or its ``db_sku`` matches ``'PREMIUM_ALL_PURPOSE_COMPUTE%'``,
    and as a job when its ``db_sku`` matches ``'PREMIUM_JOBS_COMPUTE%'``.

    Args:
        first_date_str: Lower bound for ``start_date`` (inclusive, SQL BETWEEN).
        last_date_str: Upper bound for ``start_date`` (inclusive, SQL BETWEEN).

    Returns:
        The result of ``spark.sql`` with columns ``dbricks_interactive_sum``,
        ``dbricks_job_sum`` and ``date``, ordered by date.

    Relies on the notebook-provided ``spark`` and ``sc`` globals.
    """
    spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

    # Register the usage Delta table under the view name the query refers to.
    usage_df = spark.read.format("delta").load("s3://zalando-datalake-binary/observability/data/databricks-usage/")
    usage_df.createOrReplaceTempView("dbx_usage_table")

    daily_counts_query = f'''
        WITH jobs_table AS (
            SELECT CASE
                    WHEN cluster_name LIKE 'API_%' THEN 1
                    WHEN db_sku LIKE 'PREMIUM_ALL_PURPOSE_COMPUTE%' THEN 1
                    ELSE 0
                END AS interactive
                ,CASE
                    WHEN db_sku LIKE 'PREMIUM_JOBS_COMPUTE%' THEN 1
                    ELSE 0
                END AS job
                ,start_date

            from dbx_usage_table
            where start_date between "{first_date_str}" and "{last_date_str}"
        )
        SELECT SUM(interactive) dbricks_interactive_sum
        ,SUM(job) dbricks_job_sum
        ,start_date date
        FROM jobs_table
        GROUP BY start_date
        ORDER BY start_date
    '''
    return spark.sql(daily_counts_query)

In [0]:
# Get AWS Cost Per DBU

def get_aws_spend_over_dbus_ratio(first_date_str,last_date_str):
    """Return per-day DBU totals next to the AWS cost attributed to teams.

    AWS cost is summed over rows whose ``team_name`` is not ``'unknown'``;
    DBUs are summed over the ``'zalando'`` and ``'zalando-newton'`` workspaces.
    Only dates present in both aggregates are returned (inner join). The ratio
    itself is not computed here.

    Args:
        first_date_str: Lower bound for ``date`` (inclusive).
        last_date_str: Upper bound for ``date`` (inclusive).

    Returns:
        The result of ``spark.sql`` with columns ``dbu_total_nucleo_databricks``,
        ``aws_cost_attributed_to_teams`` and ``date``.

    Relies on the notebook-provided ``spark`` and ``sc`` globals.
    """
    spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

    # Both Delta tables live under the same S3 prefix.
    data_root = "s3a://zalando-datalake-binary/observability/data"
    dbx_daily_df = spark.read.format("delta").load(data_root + "/databricks-usage-day/")
    aws_daily_df = spark.read.format("delta").load(data_root + "/databricks-aws-billing-day")

    # Register the view names the query refers to.
    dbx_daily_df.createOrReplaceTempView("team_dbx_usage_daily")
    aws_daily_df.createOrReplaceTempView("team_aws_usage_daily")

    cost_vs_dbu_query = f"""
        WITH aws_costs AS (
            --no need to filter by account since aws costs are only processed for nucleo-databricks right now
            SELECT SUM(aws_cost) aws_cost_attributed_to_teams
            ,date
            FROM team_aws_usage_daily
            WHERE date >= "{first_date_str}" AND date <= "{last_date_str}"
            AND team_name != 'unknown'
            GROUP BY date
        ), db_costs as (
            SELECT SUM(dbu_total) dbu_total_nucleo_databricks
            ,date
            FROM team_dbx_usage_daily
            WHERE date >= "{first_date_str}" AND date <= "{last_date_str}"
            AND workspace_name in ('zalando', 'zalando-newton')
            GROUP BY date
        )
        SELECT
               dc.dbu_total_nucleo_databricks --newton + main
               ,ac.aws_cost_attributed_to_teams --nucleo-databricks costs that has z_team tag
               --,ac.aws_cost_attributed_to_teams/dc.dbu_total_nucleo_databricks ratio_per_day
               ,dc.date
        FROM db_costs dc
        JOIN aws_costs ac on dc.date = ac.date
        ORDER BY 1 ASC;
      """
    return spark.sql(cost_vs_dbu_query)

In [0]:
# Unique Active Users over given time span

def get_active_databricks_users(first_date_str,last_date_str):
    """Count distinct user emails seen in the audit data within a date range.

    Only emails matching ``'%@zalando.%'`` or ``'%@tradebyte.%'`` are counted;
    emails matching any of the three ``not like`` patterns in the query
    (``'+'``, escaped ``'_'``, ``'-'``) are excluded.

    Args:
        first_date_str: Lower bound for ``date`` (inclusive, SQL BETWEEN).
        last_date_str: Upper bound for ``date`` (inclusive, SQL BETWEEN).

    Returns:
        The result of ``spark.sql`` with the single column
        ``databricks_user_count``.

    Relies on the notebook-provided ``spark`` and ``sc`` globals.
    """
    spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

    # Register the audit Delta table under the view name the query refers to.
    audit_df = spark.read.format("delta").load("s3a://zalando-datalake-binary/observability/data/databricks-audit")
    audit_df.createOrReplaceTempView("audit_table")

    # NOTE(review): get_active_databricks_teams reads the same location but
    # references the email as `useridentity_email` — confirm which form
    # matches the table schema.
    user_count_query = f'''
        select count(distinct useridentity['email']) as databricks_user_count
        from audit_table
        where 1=1
        and date between "{first_date_str}" and "{last_date_str}"
        and (useridentity['email'] like '%@zalando.%' or useridentity['email'] like '%@tradebyte.%')
        and useridentity['email'] not like '%+%'
        and useridentity['email'] not like '%+_%' escape '+'
        and useridentity['email'] not like '%-%';
    '''
    return spark.sql(user_count_query)

In [0]:
# Active Teams
#
#Note: This is a complex KPI to report because we want to de-duplicate our answer - requiring us to
#ONLY query the timeframe we need (i.e., if we queried on a day level and got '12','14',and '9', it is
#likely that there would be duplicate teams between those 3 numbers, but we wouldn't know how many - making
#it hard to aggregate further (e.g., on a week level)
#

def get_active_databricks_teams(first_date_str, last_date_str):
    """Count distinct teams with at least one active user in a date range.

    Distinct user emails are taken from the audit data (same email filters as
    in the query below), joined to the employee table on email, and the
    distinct ``team_team_name`` values of the matches are counted. Users
    without a matching employee row drop out of the inner join.

    Args:
        first_date_str: Lower bound for ``date`` (inclusive, SQL BETWEEN).
        last_date_str: Upper bound for ``date`` (inclusive, SQL BETWEEN).

    Returns:
        The result of ``spark.sql`` with the single column ``active_teams``.

    Relies on the notebook-provided ``spark`` and ``sc`` globals.
    """
    spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

    # Load both Delta tables first, then register the view names the query uses.
    audit_df = spark.read.format("delta").load("s3a://zalando-datalake-binary/observability/data/databricks-audit")
    employees_df = spark.read.format("delta").load("s3a://zalando-data-governance/prod/datalake/zalando_employees")

    audit_df.createOrReplaceTempView("audit_table")
    employees_df.createOrReplaceTempView("z_employees_table")

    active_teams_query = f'''
        WITH db_activeusers AS
            (SELECT distinct(useridentity_email) AS userid
            FROM audit_table
            WHERE 1=1
            AND date BETWEEN "{first_date_str}" AND "{last_date_str}"
            AND (useridentity_email like '%@zalando.%' OR useridentity_email like '%@tradebyte.%')
            AND useridentity_email not like '%+%'
            AND useridentity_email not like '%+_%' ESCAPE '+'
            AND useridentity_email not like '%-%'),
        employee_info AS
            (SELECT email, team_team_name
            FROM z_employees_table),
        z_activeusers_employee AS
            (SELECT db.userid AS user_email, ef.team_team_name AS team_name
            FROM db_activeusers db
            JOIN employee_info ef ON db.userid = ef.email)
        SELECT count(DISTINCT team_name) AS active_teams
        FROM z_activeusers_employee;
    '''
    return spark.sql(active_teams_query)

In [0]:
# Availability of Databricks
#Queried from Databricks audit logs- analysing if there is any minute without logs

def get_databricks_availability(first_date_str,last_date_str,minimum_downtime_in_minutes = 1):
    """Estimate per-day downtime from gaps between consecutive audit-log rows.

    Rows of the audit table within the date range are ordered by ``timestamp``;
    every gap to the previous row that exceeds ``minimum_downtime_in_minutes``
    is counted as downtime and summed per ``date`` of the later row.

    Args:
        first_date_str: First date of the range (inclusive), as a string that
            Spark SQL can cast to DATE (e.g. ``'2021-01-31'``).
        last_date_str: Last date of the range (inclusive), same format.
        minimum_downtime_in_minutes: Gaps up to this length are not counted.

    Returns:
        The result of ``spark.sql`` with columns
        ``dbricks_downtime_in_whole_mins`` and ``date``.

    Relies on the notebook-provided ``spark`` global.
    """
    audit_data = spark.read.format("delta").load("s3a://zalando-datalake-binary/observability/data/databricks-audit")
    audit_data.createOrReplaceTempView("audit_table")

    # The date bounds come from the function's own parameters; the query
    # previously interpolated `first_date` / `last_date`, names this function
    # does not define, so the arguments were ignored (or a NameError raised).
    # NOTE(review): the /1000/60 conversion presumes `timestamp` is in epoch
    # milliseconds — confirm against the table schema.
    table = spark.sql(f'''
        WITH timestamp_gaps AS (
            SELECT timestamp,
            LAG(timestamp) OVER (ORDER BY timestamp ASC) as prev_timestamp,
            date
            FROM audit_table
            WHERE date BETWEEN CAST('{first_date_str}' AS DATE)
            AND CAST('{last_date_str}' AS DATE)
            ORDER BY timestamp
        )
        SELECT SUM(
            CASE
                WHEN (timestamp - prev_timestamp)/1000/60 > {minimum_downtime_in_minutes}
                THEN (timestamp - prev_timestamp)/1000/60
                ELSE 0
            END
        ) as dbricks_downtime_in_whole_mins,
        date
        FROM timestamp_gaps
        GROUP BY date
    ''')

    return table