In [2]:
import os

import requests

import numpy as np
from numpy.random import normal
import pandas as pd
import altair as alt
import networkx as nx
from ipycytoscape import CytoscapeWidget
import ipywidgets as widgets

from IPython.display import JSON, Markdown, display
def printmd(string):
    """Render ``string`` as Markdown in the notebook output area."""
    rendered = Markdown(string)
    display(rendered)

ModuleNotFoundError: No module named 'altair'

# Setup 🛠

In [None]:
# Base URLs of the two dbt Cloud endpoints queried below.
cloud_api_base_url = "https://cloud.getdbt.com/api/v2"  # accounts / jobs / runs / artifacts
metadata_api_url = "https://metadata.cloud.getdbt.com/graphql"  # GraphQL model metadata

In [None]:
# Read the API token from the DBT_CLOUD_API_KEY environment variable so it never
# has to be pasted into (and saved with) the notebook. When the variable is
# unset this falls back to an empty string, the same placeholder as before.
api_key = os.environ.get("DBT_CLOUD_API_KEY", "")

# Default headers sent with the JSON API requests below.
headers = {
    "Accept": "application/json",
    "Authorization": f"Bearer {api_key}"
}

### Chart configuration

In [None]:
%%html
<style>
@import url('https://fonts.googleapis.com/css2?family=Source+Sans+Pro&family=Roboto+Slab');
</style>

In [None]:
# Named colour palette shared by the chart theme and the DAG visualisations.
# Insertion order is significant: the theme's categorical colour range is built
# from these values in order.
coalesce_colors = dict(
    blue="#345F70",
    baby_blue="#50ADD2",
    orange="#ED7357",
    magenta="#8F4056",
    green="#82B14F",
    dark_green="#327176",
)


def coalesce_theme():
    """Build the chart theme dictionary registered with Altair.

    Returns:
        dict: a single ``"config"`` entry holding view size, title, subtitle,
        legend, X/Y axis settings and the categorical colour range (the values
        of ``coalesce_colors`` in insertion order).
    """
    # Typography
    body_font = "Source Sans Pro, -apple-system, BlinkMacSystemFont, sans-serif"
    heading_font = "Roboto Slab, -apple-system, BlinkMacSystemFont, serif"
    text_color = "#000000"

    # Axis and grid colours
    axis_color = "#000000"
    grid_color = "#DEDDDD"

    # Font-size scale
    size_title = 28
    size_md = 20
    size_sm = 16

    view_config = {"width": 600, "height": 400}

    title_config = {
        "fontSize": size_title,
        "font": heading_font,
        "anchor": "start",
        # NOTE(review): Vega-Lite's title config documents "color", not
        # "fontColor" -- this key may be ignored; confirm before relying on it.
        "fontColor": text_color,
        "offset": 20,
        "subtitleFont": heading_font,
        "subtitleFontSize": size_sm,
        "subtitleFontWeight": 200,
    }

    # NOTE(review): subtitle styling is normally expressed through the
    # "subtitle*" keys of the title config; verify this block has any effect.
    subtitle_config = {
        "fontSize": size_title,
        "font": heading_font,
        "anchor": "start",
        "fontColor": text_color,
    }

    legend_config = {
        "titleFont": body_font,
        "titleColor": text_color,
        "titleFontSize": size_sm,
        "labelFont": body_font,
        "labelColor": text_color,
        "labelFontSize": size_sm,
    }

    # X axis: visible domain line and ticks.
    axis_x_config = {
        "domain": True,
        "domainColor": axis_color,
        "domainWidth": 1,
        "labelFont": body_font,
        "labelFontSize": size_sm,
        "labelAngle": 0,
        "tickColor": axis_color,
        "tickSize": 5,
        "titleFont": body_font,
        "titleFontSize": size_md,
        "titlePadding": 10,
        # Placeholder title; every chart in this notebook sets its own.
        "title": "X Axis Title (units)",
    }

    # Y axis: no domain line or ticks, horizontal grid lines instead.
    axis_y_config = {
        "domain": False,
        "grid": True,
        "gridColor": grid_color,
        "gridWidth": 1,
        "labelFont": body_font,
        "labelFontSize": size_sm,
        "labelAngle": 0,
        "ticks": False,
        "titleFont": body_font,
        "titleFontSize": size_md,
        "titlePadding": 10,
        "title": "Y Axis Title (units)",
    }

    return {
        "config": {
            "view": view_config,
            "title": title_config,
            "subtitle": subtitle_config,
            "legend": legend_config,
            "axisX": axis_x_config,
            "axisY": axis_y_config,
            "range": {
                "category": list(coalesce_colors.values()),
            },
        }
    }

# Register the theme factory under the name "coalesce_theme".
alt.themes.register("coalesce_theme", coalesce_theme)
# Enable the theme that was just registered.
alt.themes.enable("coalesce_theme")

# Job 1. Real-time monitoring ⏱

### Questions
  
You have dbt jobs running periodically in the cloud. Both for your own development purposes and to incorporate into a pipeline, how can we programmatically answer questions like:

**1. What is the status of my jobs right now?**

**2. What are potential model bottlenecks?**

### Get account

In [None]:
def get_accounts():
    """Fetch the accounts visible to the configured API token.

    Returns:
        The value stored under ``"data"`` in the JSON response, or ``None``
        when the response has no such key.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status (for
            example an empty or invalid token). Failing here gives a clear
            message instead of a later error when the result is indexed.
    """
    response = requests.get(f'{cloud_api_base_url}/accounts', headers=headers)
    response.raise_for_status()
    return response.json().get('data')

# Fetch the accounts and keep the id of the first one; later cells use `account_id`.
accounts_result = get_accounts()
account_id = accounts_result[0].get("id")
display(pd.DataFrame(accounts_result))

### Get jobs in project

In [None]:
def get_jobs(account_id):
    """Fetch the jobs defined in an account.

    Args:
        account_id: id of the account whose jobs are listed.

    Returns:
        The value stored under ``"data"`` in the JSON response, or ``None``
        when the response has no such key.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    response = requests.get(f'{cloud_api_base_url}/accounts/{account_id}/jobs', headers=headers)
    response.raise_for_status()
    return response.json().get('data')

jobs = get_jobs(account_id)
display(pd.DataFrame(jobs))

# NOTE(review): this is simply the first job in the response; the request sets
# no ordering, so "most recent" is an assumption -- confirm.
most_recent_job = jobs[0]
job_id = most_recent_job.get("id")
job_name = most_recent_job.get("name")

### Get runs in job

In [3]:
def get_runs_for_job(account_id, job_id, num_runs=100):
    """Fetch runs of one job, newest first.

    Args:
        account_id: id of the account that owns the job.
        job_id: sent as ``job_definition_id`` to filter the runs.
        num_runs: sent as ``limit``.
            NOTE(review): the API may cap the page size below the requested
            value -- confirm before assuming ``num_runs`` rows come back.

    Returns:
        The value stored under ``"data"`` in the JSON response, or ``None``
        when the response has no such key.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    query_params = {
        "job_definition_id": job_id,
        "order_by": '-created_at',  # descending creation time
        "limit": num_runs
    }

    response = requests.get(
        f'{cloud_api_base_url}/accounts/{account_id}/runs',
        params=query_params,
        headers=headers,
    )
    response.raise_for_status()
    return response.json().get("data")

# Request up to `num_runs` runs of the selected job and preview the first rows.
num_runs = 150
runs = get_runs_for_job(account_id, job_id, num_runs=num_runs)
display(pd.DataFrame(runs).head())


def get_run_status(run):
    """Collapse a run's boolean status flags into a single label.

    Args:
        run: mapping with the boolean flags ``in_progress``, ``is_error``,
            ``is_cancelled`` and ``is_complete``. Missing flags count as false.

    Returns:
        The name of the first truthy flag, or ``None`` when none is set.
    """
    # The failure flags are checked before ``is_complete`` so that a finished
    # run which also carries ``is_error`` / ``is_cancelled`` is not reported as
    # a plain completion. If the flags are mutually exclusive the order makes
    # no difference.
    for flag in ("in_progress", "is_error", "is_cancelled", "is_complete"):
        if run.get(flag):
            return flag
    return None
    
def create_job_status_df(runs):
    """Build a frame with one row per run: ``id``, ``created_at`` and ``status``.

    ``status`` is the label produced by ``get_run_status`` for that run.
    """
    rows = []
    for run in runs:
        rows.append({
            "id": run["id"],
            "created_at": run["created_at"],
            "status": get_run_status(run),
        })
    return pd.DataFrame(rows)

# Summarise the fetched runs as one status row each and preview the result.
run_statuses_df = create_job_status_df(runs)
display(run_statuses_df.head())

NameError: name 'account_id' is not defined

### Visualize run statuses

In [None]:
def create_run_status_heatmap(df):
    """Heatmap of run status by hour of day (x) and date (y).

    Args:
        df: frame with ``created_at`` and ``status`` columns.

    Returns:
        An interactive Altair chart. The subtitle reads the module-level
        ``job_name``.
    """
    status_domain = ['in_progress', 'is_complete', 'is_error', 'is_cancelled']
    # One colour per status, in the same order as ``status_domain``.
    status_range = [coalesce_colors["baby_blue"], coalesce_colors["dark_green"], coalesce_colors['magenta'], coalesce_colors['orange']]

    hour_axis = alt.X('hours(created_at):O', title='Run Hour', axis=alt.Axis(grid=False))
    date_axis = alt.Y('date(created_at):O', title='Run Date', axis=alt.Axis(grid=False))
    status_color = alt.Color('status', scale=alt.Scale(domain=status_domain, range=status_range))
    hover = [
        alt.Tooltip('monthdate(created_at)', title='Date'),
        alt.Tooltip('hoursminutes(created_at)', title='Time'),
        alt.Tooltip('status', title='Status'),
    ]

    cells = alt.Chart(df).mark_rect(stroke='white', strokeWidth=1)
    encoded = cells.encode(
        x=hour_axis,
        y=date_axis,
        text=alt.Text('hours(created_at):O'),
        color=status_color,
        tooltip=hover,
    )
    titled = encoded.properties(
        title={
            "text": 'Run Statuses Over Time',
            "subtitle": [f'''Status of scheduled job runs retrieved from the dbt Cloud API for job "{job_name}"''']
        }
    )
    return titled.interactive()

# Build and render the heatmap from the run-status frame.
run_status_heatmap = create_run_status_heatmap(run_statuses_df)
display(run_status_heatmap)

### Getting model execution times

In [None]:
def get_models_for_job(job_id):
    """Query the metadata GraphQL endpoint for the models of a job.

    Args:
        job_id: interpolated into the query as ``jobId``.

    Returns:
        The list found at ``data.models`` in the JSON response.

    Raises:
        requests.HTTPError: if the endpoint answers with a 4xx/5xx status.
        KeyError / TypeError: if the response body has no ``data.models``
            (unchanged from before).
    """
    query = f"""{{
        models(jobId: {job_id}) {{
            name
            uniqueId
            alias
            database
            schema
            runGeneratedAt
            executionTime
            executeStartedAt
            executeCompletedAt
        }}
    }}"""

    response = requests.post(metadata_api_url, data={"query": query}, headers=headers)
    # Surface HTTP failures directly rather than as a lookup error below.
    response.raise_for_status()
    return response.json()['data']['models']

# Fetch model metadata for the selected job and preview the first rows.
models = get_models_for_job(job_id)
display(pd.DataFrame(models).head())

### Visualize model execution times

def get_models_by_execution_time_df(models):
    """Tabulate execution timing for models that report an execution time.

    Args:
        models: iterable of mappings with ``uniqueId``, ``executionTime``,
            ``executeStartedAt`` and ``executeCompletedAt``.

    Returns:
        DataFrame with columns ``model_id``, ``execution_time``,
        ``execute_started_at``, ``execute_completed_at`` and ``model_name``
        (the last dot-separated segment of ``model_id``). Models whose
        ``executionTime`` is falsy (``None`` or ``0``) are left out. An empty
        input yields an empty frame with the same columns instead of raising.
    """
    columns = ["model_id", "execution_time", "execute_started_at", "execute_completed_at"]
    records = [
        {
            "model_id": model["uniqueId"],
            "execution_time": model["executionTime"],
            "execute_started_at": model["executeStartedAt"],
            "execute_completed_at": model["executeCompletedAt"],
        }
        for model in models
        if model["executionTime"]
    ]
    # Passing the column list keeps the schema stable when no record survives.
    models_by_execution_time_df = pd.DataFrame(records, columns=columns)
    models_by_execution_time_df['model_name'] = models_by_execution_time_df['model_id'].str.rsplit('.').str[-1]
    return models_by_execution_time_df

# Keep only models with a reported execution time, as input for the Gantt chart.
models_by_execution_time_df = get_models_by_execution_time_df(models)

def create_model_execution_gantt_chart(df, character_height=16):
    """Gantt chart of model execution windows, coloured by execution time.

    Args:
        df: frame with ``execute_started_at``, ``execute_completed_at``,
            ``model_name`` and ``execution_time`` columns.
        character_height: vertical pixels reserved per distinct model; the
            chart height is ``max(400, character_height * n_models)``.
            Defaults to 16, the value that was previously hard-coded.

    Returns:
        An interactive Altair chart. The subtitle reads the module-level
        ``job_name``.
    """
    model_count = len(pd.unique(df['model_name']))
    chart = alt.Chart(df).mark_bar().encode(
        x=alt.X('execute_started_at:T', title='Execution Start Time'),
        x2=alt.X2('execute_completed_at:T'),
        y=alt.Y('model_name:N', sort='x', title='Model ID'),
        color=alt.Color('execution_time:Q', scale=alt.Scale(scheme="reds")),
        tooltip=[alt.Tooltip('model_name', title='Model Name'), alt.Tooltip('execution_time', title='Execution Time')]
    ).properties(title={
        "text": 'Model Execution Times',
        "subtitle": [f'''Gantt chart of model execution times in sequence for most recent run of job "{job_name}"''']
    }, height=max(400, character_height * model_count)
    ).configure_axisY(
        labelLimit=300,
        titleAngle=0,
        titleY=0,
        titleX=-38
    ).interactive()
    return chart

# Build and render the Gantt chart.
gantt_chart = create_model_execution_gantt_chart(models_by_execution_time_df)
display(gantt_chart)

In [None]:
# Job 2: Impact analysis 🏝

### Questions

**1. You're changing existing models in a dbt DAG, what is the downstream impact of these changes?**
  
**2. You're looking to refactor your dbt jobs because your warehouse consumption is increasing, where are good places to start?**

### Retrieve DAG

In [None]:
# Offer every fetched run id in a dropdown, preselecting the first entry of `runs`.
# Later cells read the choice through `selected_run_id.value`.
run_ids = [r["id"] for r in runs]
selected_run_id = widgets.Dropdown(
    options=run_ids,
    value=run_ids[0],
    description='Run ID:'
)
display(selected_run_id)

In [None]:
def retrieve_artifacts_list_for_run(account_id, run_id):
    """Fetch the artifact listing of one run.

    Args:
        account_id: id of the account that owns the run.
        run_id: id of the run whose artifacts are listed. The URL is now built
            from this argument; previously it was ignored in favour of the
            global dropdown widget.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    response = requests.get(f'{cloud_api_base_url}/accounts/{account_id}/runs/{run_id}/artifacts', headers=headers)
    response.raise_for_status()
    return response.json()

# List the artifacts of the run currently selected in the dropdown.
artifacts = retrieve_artifacts_list_for_run(account_id, selected_run_id.value)
display(JSON(artifacts))

In [None]:
def retrieve_manifest_for_run(account_id, run_id):
    """Download and decode ``manifest.json`` for one run.

    Args:
        account_id: id of the account that owns the run.
        run_id: id of the run whose manifest is fetched.

    Returns:
        The manifest as decoded JSON.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    # Only the Authorization header is sent (no "Accept"). The previous chained
    # assignment also bound a local `headers`, shadowing the module-level dict;
    # that accidental second name is gone.
    artifact_headers = {
        "Authorization": f"Bearer {api_key}"
    }
    response = requests.get(f'{cloud_api_base_url}/accounts/{account_id}/runs/{run_id}/artifacts/manifest.json', headers=artifact_headers)
    response.raise_for_status()
    return response.json()

# Download the manifest of the run currently selected in the dropdown.
manifest = retrieve_manifest_for_run(account_id, selected_run_id.value)
display(JSON(manifest))

### Represent manifest child_map as network

In [None]:
def construct_digraph_from_manifest(manifest, character_width = 6):
    """Turn a manifest's ``nodes``, ``sources`` and ``child_map`` into a DiGraph.

    Args:
        manifest: mapping with ``nodes``, ``sources`` and ``child_map`` entries.
        character_width: multiplier applied to the length of each unique id to
            get the node's ``width`` attribute.

    Returns:
        networkx.DiGraph whose registered nodes carry ``name``, ``node_type``,
        ``color`` and ``width``; edges run parent -> child.
    """
    dag = nx.DiGraph()

    def _register(unique_ids, node_type, color_key):
        # One node per unique id; the display name is the last dotted segment.
        for unique_id in unique_ids:
            dag.add_node(
                unique_id,
                name=unique_id.rsplit('.')[-1],
                node_type=node_type,
                color=coalesce_colors[color_key],
                width=character_width * len(unique_id),
            )

    # Every key of manifest['nodes'] is tagged "model"; sources get their own type.
    _register(manifest['nodes'], "model", "blue")
    _register(manifest['sources'], "source", "green")

    dag.add_edges_from(
        (parent_id, child_id)
        for parent_id, children in manifest['child_map'].items()
        for child_id in children
    )
    return dag

# Build the DAG once; later cells copy it rather than mutating it.
graph = construct_digraph_from_manifest(manifest)

### Visualize DAG

In [None]:
def create_visualization_of_digraph(graph):
    """Wrap a networkx DiGraph in a styled Cytoscape widget.

    Args:
        graph: DiGraph whose nodes carry ``name``, ``color`` and ``width``
            attributes; the stylesheet reads them via ``data(...)``.

    Returns:
        CytoscapeWidget using the "dagre" layout with ``rankDir="LR"``.
    """
    cyto = CytoscapeWidget()
    cyto.graph.add_graph_from_networkx(graph, directed=True)
    cyto.set_layout(
        name="dagre",
        rankDir="LR"
    )
    cyto.set_style([
        {
            'selector': 'node',
            'style': {
                'content': 'data(name)',
                'text-valign': 'center',
                'background-color': 'data(color)',
                'shape': 'rectangle',
                'width': 'data(width)',
                'color': 'white',
                # NOTE(review): 'border-radius' does not appear among the
                # Cytoscape.js node style properties -- confirm it has any effect.
                'border-radius': '5px',
                # Cytoscape.js names the label font property 'font-family';
                # the previous 'font' key is not a recognised property.
                'font-family': "Source Sans Pro"
            }
        },
        {
            "selector": "edge.directed",
            "style": {
                "curve-style": "bezier",
                "target-arrow-shape": "triangle",
            },
        }
    ])
    return cyto
# Render the unmodified DAG.
visualization_of_digraph = create_visualization_of_digraph(graph)
display(visualization_of_digraph)

### Determine successors + predecessors of selected model

In [None]:
def mark_node_successors_in_digraph(graph, selected_model):
    """Return a copy of ``graph`` with everything downstream of a node recoloured.

    Nodes reachable from ``selected_model`` become orange and the selected node
    itself magenta. The input graph is left untouched.
    """
    marked = graph.copy()
    bfs_pairs = list(
        nx.algorithms.traversal.breadth_first_search.bfs_successors(marked, selected_model)
    )

    # Each BFS entry is (node, [children]); gather both sides into one set.
    reachable = set()
    for parent, children in bfs_pairs:
        reachable.add(parent)
        reachable.update(children)

    for node in reachable:
        marked.nodes[node]['color'] = coalesce_colors["orange"]

    # Applied last so the selected node overrides the orange it may have received.
    marked.nodes[selected_model]['color'] = coalesce_colors["magenta"]
    return marked

def mark_node_predecessors_in_digraph(graph, selected_model):
    """Return a copy of ``graph`` with everything upstream of a node recoloured.

    Ancestors of ``selected_model`` become orange and the selected node itself
    magenta. The input graph is left untouched.
    """
    marked = graph.copy()
    upstream = set(nx.algorithms.dag.ancestors(graph, selected_model))
    for node in upstream:
        marked.nodes[node]['color'] = coalesce_colors["orange"]
    marked.nodes[selected_model]['color'] = coalesce_colors["magenta"]
    return marked

### Visualize downstream and upstream impact

In [None]:
# Dropdown over every unique id under manifest['nodes'], in sorted order.
# Later cells read the choice through `selected_model.value`.
model_unique_ids = sorted([model_id for model_id, metadata in manifest['nodes'].items()])
selected_model = widgets.Dropdown(
    options=model_unique_ids,
    value=model_unique_ids[0],
    description='Model:'
)
display(selected_model)

In [None]:
# Direction of the impact analysis: 'successors' (downstream) or 'predecessors' (upstream).
mode = 'successors'

markers_by_mode = {
    'predecessors': mark_node_predecessors_in_digraph,
    'successors': mark_node_successors_in_digraph,
}
# Fail loudly on an unknown mode instead of silently re-rendering whatever
# `graph_with_marked_relations` was left over from an earlier run of this cell.
if mode not in markers_by_mode:
    raise ValueError(f"mode must be one of {sorted(markers_by_mode)}, got {mode!r}")

graph_with_marked_relations = markers_by_mode[mode](graph, selected_model.value)
visualization_of_digraph = create_visualization_of_digraph(graph_with_marked_relations)
display(visualization_of_digraph)

### Visualize number of descendants

In [None]:
def get_number_of_descendants_df(graph):
    """Count the descendants of every node in the DAG.

    Args:
        graph: DiGraph whose nodes may carry a ``node_type`` attribute.

    Returns:
        DataFrame with one row per node: ``node`` (last dot-separated segment
        of the node id), ``num_descendants`` and ``type``. Nodes without a
        ``node_type`` attribute -- e.g. ones created implicitly while adding
        edges -- are reported as ``"unknown"`` instead of raising ``KeyError``.
    """
    records = []
    for node, node_attributes in graph.nodes.items():
        records.append({
            "node": node.rsplit('.')[-1],
            "num_descendants": len(nx.algorithms.dag.descendants(graph, node)),
            "type": node_attributes.get('node_type', 'unknown'),
        })
    return pd.DataFrame(records)

# One row per DAG node with its descendant count.
number_of_descendants_df = get_number_of_descendants_df(graph)

def create_histogram_of_number_of_descendants(df, limit=30):
    """Horizontal bar chart of the nodes with the most descendants.

    Args:
        df: frame with ``node``, ``num_descendants`` and ``type`` columns.
        limit: number of nodes to plot, chosen by largest ``num_descendants``.
            Previously the first ``limit`` rows were taken in frame order, so
            heavily-depended-on nodes late in the frame never appeared.
            ``None`` plots every row.

    Returns:
        An interactive Altair chart. The subtitle reads the module-level
        ``job_name``.
    """
    top_nodes = df if limit is None else df.nlargest(limit, 'num_descendants')
    chart = alt.Chart(top_nodes).mark_bar().encode(
        x=alt.X('num_descendants:Q', title='Number of Descendants'),
        y=alt.Y('node:N', sort='-x', title='Node'),
        color=alt.Color('type:N', title='Node Type', scale=alt.Scale(range=[coalesce_colors["blue"], coalesce_colors["green"]])),
        tooltip=alt.Tooltip(['node', 'num_descendants'])
    ).properties(title={
        "text": 'Distribution of Nodes by Number of Successors',
        "subtitle": [f'''Number of successors for each node (model, source, seed) in dbt job "{job_name}"'''],
    }, height=600
    ).configure_axisY(
        labelLimit=300,
        titleAngle=0,
        titleY=0,
        titleX=-22
    ).interactive()
    return chart

# Build and render the descendants bar chart, limited to 30 nodes.
chart = create_histogram_of_number_of_descendants(number_of_descendants_df, limit=30)
display(chart)

# Job 3: Historical understanding 📈

**1. As you increase the size of your source tables and the complexity of your DAGs, are your job runs starting to take longer?**

**2. If job run times are increasing, which specific models are taking more time?**

### Retrieve run durations over time

In [None]:
# Re-fetch the job's runs with a limit of 100. This rebinds the module-level
# `num_runs` and `runs` that the cells below read.
num_runs = 100
runs = get_runs_for_job(account_id, job_id, num_runs=num_runs)
display(pd.DataFrame(runs).head())

def get_run_time_duration_df(runs):
    """Tabulate the wall-clock duration of every run flagged ``is_complete``.

    Args:
        runs: iterable of mappings with ``id``, ``started_at``,
            ``finished_at`` and ``is_complete``.

    Returns:
        DataFrame with columns ``id``, ``start_time`` (``pd.Timestamp``) and
        ``duration`` (seconds between start and finish), sorted by
        ``start_time`` ascending. When no run qualifies, an empty frame with
        those columns is returned instead of raising ``KeyError``.
    """
    columns = ["id", "start_time", "duration"]
    run_times = []
    for run in runs:
        if not run["is_complete"]:
            continue
        started_at = pd.Timestamp(run["started_at"])
        finished_at = pd.Timestamp(run["finished_at"])
        run_times.append({
            "id": run["id"],
            "start_time": started_at,
            "duration": (finished_at - started_at).total_seconds(),
        })
    # Naming the columns keeps sort_values valid when `run_times` is empty.
    run_time_duration_df = pd.DataFrame(run_times, columns=columns).sort_values("start_time", ascending=True)
    return run_time_duration_df

# Durations of the runs flagged complete, oldest first.
run_time_duration_df = get_run_time_duration_df(runs)

### Visualize job run times over time

In [None]:
def create_run_time_line_chart(df, num_runs=None):
    """Line chart of run duration against run start time.

    Args:
        df: frame with ``id``, ``start_time`` and ``duration`` columns.
        num_runs: only interpolated into the subtitle text.

    Returns:
        An interactive Altair chart. The subtitle reads the module-level
        ``job_name``.
    """
    subtitle = f'''Line chart of job run times for most {num_runs} recent runs of job "{job_name}"'''
    start_axis = alt.X('start_time', title='Run Start Time')
    duration_axis = alt.Y('duration', title='Run Duration (sec)')

    line = alt.Chart(df).mark_line().encode(
        x=start_axis,
        y=duration_axis,
        tooltip=['id', 'start_time', 'duration'],
    )
    titled = line.properties(title={"text": 'Job Run Times', "subtitle": [subtitle]})
    return titled.interactive()

# Build and render the run-duration line chart.
run_time_line_chart = create_run_time_line_chart(run_time_duration_df, num_runs=num_runs)
display(run_time_line_chart)

### Retrieve run results for each run

In [None]:
def get_model_execution_results_by_run(account_id, num_runs=100):
    """Download ``run_results.json`` for recent runs and flatten it per model.

    Reads the module-level ``runs`` list and issues one HTTP request for each
    of its first ``num_runs`` entries.

    Args:
        account_id: id of the account that owns the runs.
        num_runs: maximum number of entries of ``runs`` to fetch results for.

    Returns:
        DataFrame sorted by ``execute_started_at`` with columns ``index``
        (pre-sort row position, from ``reset_index``), ``model_id``,
        ``execution_time``, ``execute_started_at``, ``execute_completed_at``,
        ``run_id``, ``run_started_at`` and ``model_name``. Results without an
        ``execute`` timing entry are kept with null timestamps. If nothing
        could be retrieved the frame is empty but has the same columns.
    """
    limited_headers = {"Authorization": headers['Authorization']}
    columns = [
        "model_id",
        "execution_time",
        "execute_started_at",
        "execute_completed_at",
        "run_id",
        "run_started_at",
    ]

    selected_runs = runs[:num_runs]
    all_model_execution_results_by_run = []
    for position, run in enumerate(selected_runs, start=1):
        # Report against the number of runs actually available, which may be
        # smaller than `num_runs`.
        print(f"Retrieving {position} / {len(selected_runs)} runs")
        run_id = run["id"]
        response = requests.get(f'{cloud_api_base_url}/accounts/{account_id}/runs/{run_id}/artifacts/run_results.json', headers=limited_headers)
        try:
            run_results = response.json()['results']
        except Exception as e:
            # Best effort: a run without a readable artifact is reported and skipped.
            print(e)
            continue

        for run_result in run_results:
            # A result may have no 'execute' timing entry; fall back to an empty
            # mapping instead of letting next() raise StopIteration.
            execution_timing = next(
                (timing for timing in (run_result.get('timing') or []) if timing['name'] == 'execute'),
                {},
            )

            # The unique id is either nested under 'node' or at the top level.
            if 'node' in run_result:
                unique_id = run_result['node']['unique_id']
            else:
                unique_id = run_result['unique_id']

            all_model_execution_results_by_run.append({
                "model_id": unique_id,
                "execution_time": run_result['execution_time'],
                "execute_started_at": execution_timing.get('started_at'),
                "execute_completed_at": execution_timing.get('completed_at'),
                "run_id": run_id,
                "run_started_at": run["started_at"]
            })

    # Naming the columns keeps sort_values valid when nothing was collected.
    model_execution_results_by_run_df = (
        pd.DataFrame(all_model_execution_results_by_run, columns=columns)
        .sort_values('execute_started_at', ascending=True)
        .reset_index()
    )
    model_execution_results_by_run_df['model_name'] = model_execution_results_by_run_df['model_id'].str.rsplit('.').str[-1]
    return model_execution_results_by_run_df

# Only the first 20 entries of `runs` are downloaded here (one HTTP request per run).
model_execution_results_by_run_df = get_model_execution_results_by_run(account_id, num_runs=20)

### Visualize model execution times over time

In [None]:
def filter_only_longest_running_models(model_execution_results_by_run_df, num_long_running_models=10):
    """Keep only rows of the models with the highest mean execution time.

    Args:
        model_execution_results_by_run_df: frame with at least ``model_name``
            and ``execution_time`` columns.
        num_long_running_models: how many models to keep. ``0`` now keeps none;
            the previous slice ``iloc[-0:]`` kept every model.

    Returns:
        The rows of the input whose ``model_name`` is among the selected
        models; all original columns are preserved.
    """
    # Average only the numeric column of interest. Averaging the whole frame
    # raises TypeError on pandas >= 2.0 because of the string columns.
    mean_execution_time = model_execution_results_by_run_df.groupby('model_name')['execution_time'].mean()
    longest_running_models = mean_execution_time.nlargest(num_long_running_models).index
    is_long_running = model_execution_results_by_run_df['model_name'].isin(longest_running_models)
    return model_execution_results_by_run_df[is_long_running]

# Restrict the per-run results to the 5 selected models.
model_execution_results_by_run_for_longest_runs_df = filter_only_longest_running_models(model_execution_results_by_run_df, num_long_running_models=5)

def create_line_chart_of_longest_running_model_execution_times(df, num_runs=None):
    """Line chart of per-model execution time across runs, one colour per model.

    Args:
        df: frame with ``run_started_at``, ``execution_time`` and
            ``model_name`` columns.
        num_runs: only interpolated into the subtitle text.

    Returns:
        An interactive Altair chart. The subtitle reads the module-level
        ``job_name``.
    """
    subtitle = f'''Line chart of model execution times most {num_runs} recent runs of job "{job_name}"'''
    run_axis = alt.X('run_started_at:T', title='Run Start Time')
    time_axis = alt.Y('execution_time:Q', title='Model Execution (seconds)')
    model_color = alt.Color('model_name:N', scale=alt.Scale(scheme='tableau10'))

    lines = alt.Chart(df).mark_line().encode(run_axis, time_axis, model_color)
    titled = lines.properties(title={
        "text": 'Model Execution Times Over Time',
        "subtitle": [subtitle],
    })
    return titled.interactive()

# Report the number of runs actually present in the data. The module-level
# `num_runs` (100) is the size of the run listing, not the number of runs whose
# results were downloaded.
num_runs_plotted = model_execution_results_by_run_for_longest_runs_df['run_id'].nunique()
chart = create_line_chart_of_longest_running_model_execution_times(model_execution_results_by_run_for_longest_runs_df, num_runs=num_runs_plotted)
display(chart)