In [None]:
# Reload imported modules automatically so edits to local code are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
import sys, pathlib
# Add the directory two levels above the current working directory to the import path
# (presumably the repository root, so that `optiml` can be imported -- confirm).
sys.path.append(str(pathlib.Path.cwd().parent.parent))

In [None]:
%reload_ext dotenv

%dotenv ../../env/.env

import warnings
from pandas import Timedelta
# from optiml.utils import sf
import time
from optiml.utils.sf import logger, sql_to_df, run_sql, conn, session
import pandas as pd
warnings.filterwarnings('ignore')

try:
    %load_ext autotime
except:
    !pip install ipython-autotime
    %load_ext autotime

import plotly
plotly.offline.init_notebook_mode()

In [None]:
# To rerun this analysis:

# in the *staging* app, rerun dbt in the knot account to pull the latest staging data [<=10 min]
# run `grant select on all tables in schema optiml_share.optiml to database role optiml_share_role` in the query admin console
# run this notebook, connecting to KNOT_SHARE.OPTIML

# For the counterfactual analysis, additional steps are needed:

# use the knot-dba notebook to copy all *tables* from the share into a target schema [in our account]
# run dbt: `dbt seed && dbt run --exclude staging daily_rates`
# run the cluster autosuspend simulation (smart-suspend-simulate) on that schema
# rerun dbt: `dbt run -s warehouse_era_simulated+`
# run this notebook connected to the target schema:

# rerun dbt, selecting only the views: `dbt run -s config.materialized:view` (?)

In [None]:
# SQL session variables that the queries below read as $lookback_days,
# $wh_name and $date_part. Statements are issued in the order listed.
session_settings = [
    "set lookback_days=60",
    # "set wh_name='SEGMENT_LOAD_WH' ",
    "set wh_name='XOGRP_DEV_WH' ",
    "set date_part = 'hour'; ",
]
for statement in session_settings:
    run_sql(statement)

# Shared CTE chain that is prepended to both queries built from it below.
# Every CTE filters on $wh_name and the last $lookback_days days; the
# aggregating CTEs bucket rows with date_trunc($date_part, ...).
#   whp      - per bucket: compute dollars, query minutes, idle minutes and
#              pct_idle from warehouse_profile_by_hour
#   whe      - per bucket: completed SUSPEND_WAREHOUSE events, split by whether
#              role_name is 'OPTIML_SVC' ("aero") or not
#   qh       - individual queries with execution_time divided by 1000
#              (presumably ms -> s; confirm) and the cache-scan percentage
#   qh_stats - per bucket: execution-time and cache statistics over qh; buckets
#              with 1000 or fewer queries are removed by the HAVING clause
#   merged   - whp left-joined to whe and qh_stats on the bucket timestamp
# NOTE(review): because of the left joins, a bucket with no row in whe has NULL
# suspension columns and therefore a NULL aero_on, and a bucket missing from
# qh_stats has NULL query statistics.
merge_cte = """
with whp as (
select 
    date_trunc($date_part, hour_start) as ts,
    sum(dollars_used_compute) as dollars_used_compute,
    sum(active_hours - idle_hours)*60 as query_minutes,
    sum(idle_hours) * 60 as idle_minutes,
    100*sum(idle_hours)/sum(active_hours) as pct_idle

from warehouse_profile_by_hour
where warehouse_name = $wh_name
and hour_start > dateadd('days',-$lookback_days, current_timestamp())
group by 1
),
whe as (
    select 
        date_trunc($date_part, timestamp) as ts,
        -- date_trunc(hour, timestamp) as day,    
        -- event_reason || ':' || nvl(role_name, 'null') as source,
        count(*) as num_suspensions,
        count_if(role_name='OPTIML_SVC') as num_aero_suspensions,
        num_suspensions - num_aero_suspensions num_nonaero_suspensions,
        100*num_aero_suspensions/num_suspensions as pct_aero_suspensions
    from stg_warehouse_events_history 
    where event_name = 'SUSPEND_WAREHOUSE'
    and event_state = 'COMPLETED'
    and warehouse_name=$wh_name
    and timestamp > dateadd('days',-$lookback_days, current_timestamp())
    group by 1
),
qh as 
(
    select
        start_time,
        execution_time/1000 as execution_time, 
        PERCENTAGE_SCANNED_FROM_CACHE
    from 
        stg_query_history
    where start_time > dateadd('days',-$lookback_days, current_timestamp())
    and warehouse_name = $wh_name
    and warehouse_size is not null
),
qh_stats as (
    select 
        date_trunc($date_part, start_time) as ts,
        count(*) as num_queries,
        avg(execution_time) avg_execution_time,
        median(execution_time) median_execution_time,
        min(execution_time) min_execution_time,
        max(execution_time) max_execution_time,
        percentile_cont(.90) within group(order by execution_time) as p90_execution_time,
        percentile_cont(.75) within group(order by execution_time) as p75_execution_time,
        percentile_cont(.25) within group(order by execution_time) as p25_execution_time,
        percentile_cont(.10) within group(order by execution_time) as p10_execution_time,
        avg(PERCENTAGE_SCANNED_FROM_CACHE) as avg_pct_scanned_cache,
        count_if(PERCENTAGE_SCANNED_FROM_CACHE > .50) num_queries_majority_cached,
        100*num_queries_majority_cached/num_queries pct_queries_majority_cached
    from
       qh
    group by 1
    having num_queries > 1000
    order by 1 desc
),
merged as (
select
    whp.ts as timestamp,
    pct_aero_suspensions > 0 as aero_on,
    dollars_used_compute,
    query_minutes,
    idle_minutes,
    pct_idle,
    num_suspensions,
    num_aero_suspensions,
    num_nonaero_suspensions,
    pct_aero_suspensions,
    num_queries,
    avg_execution_time,
    median_execution_time,
    min_execution_time,
    max_execution_time,
    p90_execution_time,
    p75_execution_time,
    p25_execution_time,
    p10_execution_time,
    avg_pct_scanned_cache,
    num_queries_majority_cached,
    pct_queries_majority_cached
from whp
left join whe
on whp.ts = whe.ts
left join qh_stats q
on whp.ts = q.ts
)
"""

# Every bucket of `merged`, oldest first. The bucket size comes from the
# $date_part session variable at execution time, so this same string is
# reusable after $date_part is changed.
sql = f"""
{merge_cte}
select * from merged
order by timestamp asc
"""

# One row per aero_on value, aggregating the buckets of `merged`.
# The *_hourly_* aliases are only accurate while $date_part is 'hour'.
summary_sql = f"""
{merge_cte}
select 
    pct_aero_suspensions > 0 as aero_on,
    count(*) as num_periods,
    avg(query_minutes) as avg_hourly_query_minutes,
    avg(idle_minutes) as avg_hourly_idle_minutes,
    100*sum(idle_minutes)/(sum(query_minutes) + sum(idle_minutes)) as pct_idle,
    avg(dollars_used_compute) avg_hourly_dollars_used_compute,
    median(dollars_used_compute) median_hourly_dollars_used_compute,
    stddev(dollars_used_compute) stddev_hourly_dollars_used_compute,
    100*sum(num_queries_majority_cached)/sum(num_queries) as pct_queries_majority_cached
from 
    merged
group by 1;
"""

# Run both queries with the session variables currently set.
wpdf_hour = sql_to_df(sql)

summary = sql_to_df(summary_sql)

In [None]:
summary

In [None]:
import plotly.express as px

# Draw one horizontal bar chart per summary metric, with a bar for each
# aero_on value and the metric value printed inside the bar.
metric_columns = [name for name in summary.columns if name != 'aero_on']
for metric in metric_columns:
    bar_fig = px.bar(
        summary,
        x=metric,
        y='aero_on',
        orientation='h',
        text=metric,
        height=300,
        width=1000,
    )
    bar_fig.update_traces(textposition='inside')
    bar_fig.show()

In [None]:
import plotly.graph_objects as go

import numpy as np

# For each metric, overlay the distribution over buckets with aero suspensions
# (green) on the distribution over buckets without them (red).
for hist_col in ['dollars_used_compute', 'pct_idle', 'idle_minutes', 'query_minutes', 'median_execution_time', 'pct_queries_majority_cached']:
    # Rows whose aero_on is missing match neither comparison and are left out.
    x0 = wpdf_hour[hist_col][wpdf_hour.aero_on==False]
    x1 = wpdf_hour[hist_col][wpdf_hour.aero_on==True]

    fig = go.Figure()
    # histnorm='percent' makes the bar heights of each trace sum to 100, which
    # is what the "percent of hours" axis label promises ('probability' would
    # plot fractions between 0 and 1 instead).
    fig.add_trace(go.Histogram(x=x0, histnorm='percent', marker=dict(color='red'), name='aero off'))
    fig.add_trace(go.Histogram(x=x1, histnorm='percent', marker=dict(color='green'), name='aero on'))

    # Overlay both histograms and lower the opacity so both stay visible.
    fig.update_layout(barmode='overlay', height=400, width=600)
    fig.update_traces(opacity=0.75)
    fig.update_layout(xaxis_title_text = hist_col)
    fig.update_layout(yaxis_title_text = 'percent of hours')
    fig.show()

In [None]:
# Mean, median and sample size of the two groups behind the LAST histogram
# drawn above: x0 (aero off), x1 (aero on) and hist_col are whatever the final
# iteration of that loop left behind. Shown as one labeled table instead of
# six bare numbers.
pd.DataFrame(
    {
        'mean': [x0.mean(), x1.mean()],
        'median': [x0.median(), x1.median()],
        'num_hours': [len(x0), len(x1)],
    },
    index=['aero off', 'aero on'],
).rename_axis(hist_col)

In [None]:
# Switch the bucket size to one day and rerun the per-bucket query; `sql`
# reads $date_part when it executes, so the string itself is unchanged.
run_sql("set date_part = 'day'; ")
wpdf = sql_to_df(sql)

# Hourly averages of the running / queued / blocked load columns of
# stg_warehouse_load_history for $wh_name over the lookback window.
whload = f"""
select 
    date_trunc(hour, start_time) start_hour,
    avg(avg_running) as avg_running,
    avg(avg_queued_load) as avg_queued_load,
    avg(avg_queued_provisioning) as avg_queued_provisioning,
    avg(avg_blocked) as avg_blocked
from stg_warehouse_load_history 
where warehouse_name = $wh_name 
    and start_time > dateadd('days',-$lookback_days, current_timestamp())
    group by 1
    order by start_hour desc;
"""

load_hour_df = sql_to_df(whload)

# Hourly statistics on suspend_lag for $wh_name over the lookback window.
#   eras     - query_era and warehouse_era rows combined into one set, tagged
#              by `type` ('query' / 'warehouse')
#   enriched - lag() over era_end per warehouse_id; suspend_lag is the number
#              of seconds between the end of a query era and the end of the
#              warehouse era that comes next in era_end order
#   suspends - the rows that have a suspend_lag, carrying the sizes and
#              cluster count of the preceding query era
#   suspension_hour_stats - per hour: count plus avg / median / min / p99 / max
#              of suspend_lag, and max_saved_idle_seconds
#   suspension_savings    - converts max_saved_idle_seconds to credits through
#              warehouse_credits, multiplies by 3 for dollars, and keeps a
#              running total ordered by hour
# NOTE(review): the 60 in max_saved_idle_seconds and the 3 in max_dollar_diff
# are unexplained constants (possibly a 60 s auto-suspend baseline and a
# $3-per-credit rate) -- confirm.
# NOTE(review): the second branch of the union lists max_cluster_number and
# warehouse_sizes in the opposite order from the first branch; both are NULL
# there, so the result is unaffected.
suspension_stats = f"""
with eras as (
    select
        'query' as type,
        warehouse_id,
        warehouse_name,
        warehouse_sizes,
        max_cluster_number,
        era_start,
        era_end
    from query_era
    where era_end <= (select max(era_end) from warehouse_era)
    
    union

    select
        'warehouse'as type,
        warehouse_id,
        warehouse_name,
        null as max_cluster_number,
        null as warehouse_sizes,
        era_start,
        era_end
    from warehouse_era
    where era_start > (select min(era_start) from query_era)
    and era_end <= (select max(era_end) from query_era)
),
enriched as (
    select 
        row_number() over(order by warehouse_id, era_end) as era_id,
        *,
        -- max(era_end) over (partition by warehouse_id)
        lag(type) over (partition by warehouse_id order by era_end) as previous_ending_type,
        lag(era_end) over (partition by warehouse_id order by era_end) as previous_ending_time,
        lag(max_cluster_number) over (partition by warehouse_id order by era_end) as previous_max_cluster_number,
        lag(warehouse_sizes) over (partition by warehouse_id order by era_end) as prevous_wh_sizes,
        case when type = 'warehouse' and previous_ending_type = 'query' then timediff(milliseconds, previous_ending_time, era_end)/1000 else null end as suspend_lag,
        case when type = 'query' and previous_ending_type = 'query' then timediff(milliseconds, previous_ending_time, era_start)/1000 else null end as time_since_last_query,
        timediff(seconds, era_start, era_end) as era_seconds
    from eras
)
-- select * from enriched order by era_end desc limit 10;
,
suspends as (
select
    warehouse_id,
    warehouse_name,
    prevous_wh_sizes as warehouse_sizes,
    previous_max_cluster_number,
    era_end as suspend_time,
    suspend_lag
from enriched
where suspend_lag is not null
),
suspension_hour_stats as (
-- select * from suspends limit 10;
select
	warehouse_id,
    warehouse_name,
    date_trunc('hour', suspend_time) as hour,
    array_union_agg(warehouse_sizes) as sizes,
    sizes[0]::text as size,
    max(previous_max_cluster_number) as clusters,
    count(*) as num_suspensions,
    sum(previous_max_cluster_number*(60 - suspend_lag)) as max_saved_idle_seconds,
    avg(suspend_lag) suspend_lag_avg,
    median(suspend_lag) suspend_lag_median,
    min(suspend_lag) suspend_lag_min,
    percentile_cont(.99) within group(order by suspend_lag) as "99_pctile",
    max(suspend_lag) suspend_lag_max
from suspends s
where warehouse_name = $wh_name
and suspend_time > dateadd('days',-$lookback_days, current_timestamp())
group by 1,2,3
),
suspension_savings as (
select 
    s.*,
    max_saved_idle_seconds * wc.credits_per_hour / 3600 as max_credit_diff,
    max_credit_diff * 3 as max_dollar_diff,
    sum(max_dollar_diff) over(order by hour asc) as cum_savings
from 
    suspension_hour_stats s
left join warehouse_credits wc
on s.size = wc.size
)
select * from suspension_savings
order by hour desc;
"""

sdf = sql_to_df(suspension_stats)
# itables.show(sdf)


In [None]:
wpdf.columns

In [None]:
import plotly.express as px
from pandas import Timedelta
# Bucket start timestamps of the daily frame where aero suspensions occurred.
aero_on_mask = wpdf.aero_on == True
aero_on_ts = wpdf.loc[aero_on_mask, 'timestamp'].to_list()

def color_ontime(fig, on_times=None, span=Timedelta(days = 1)):
    """Shade the periods during which aero was on.

    Adds one translucent green vertical rectangle to ``fig`` per start time,
    covering ``[start, start + span]`` on the x axis. The figure is modified
    in place and nothing is returned.

    Parameters
    ----------
    fig : object with an ``add_vrect`` method (e.g. a plotly figure)
    on_times : iterable of timestamps, optional
        Start of each period to shade. Defaults to the notebook-level
        ``aero_on_ts`` list, looked up when the function is called.
    span : Timedelta, optional
        Width of each shaded period; one day by default.
    """
    if on_times is None:
        on_times = aero_on_ts
    for on_time in on_times:
        fig.add_vrect(
            x0=on_time,
            x1=on_time + span,
            fillcolor="green",
            opacity=0.5,
            line_width=0,
        )

# Cast the columns that are plotted together to float64 so each group shares
# one numeric dtype. pd.to_numeric converts missing values (e.g. a day with no
# suspension events) to NaN, where float(None) would raise.
for mistyped_col in ['query_minutes', 'idle_minutes', "num_nonaero_suspensions", "num_aero_suspensions"]:
    wpdf[mistyped_col] = pd.to_numeric(wpdf[mistyped_col]).astype(float)

# Daily bar charts; a list entry puts several columns on the same chart.
bar_series = [
    ["num_nonaero_suspensions", "num_aero_suspensions"],
    'dollars_used_compute',
    'pct_idle',
    ['query_minutes', 'idle_minutes'],
    'num_queries',
    'avg_pct_scanned_cache',
    'pct_queries_majority_cached',
]
for col in bar_series:
    fig = px.bar(wpdf, x='timestamp', y=col)
    color_ontime(fig)
    fig.show()

# Daily execution-time percentiles.
fig = px.line(wpdf, x='timestamp', y=['p90_execution_time', 'p75_execution_time', 'p25_execution_time', 'p10_execution_time'])
color_ontime(fig)
fig.show()

# Hourly suspend-lag statistics, shaded with the same daily aero-on periods.
fig = px.line(sdf, x='hour', y=["suspend_lag_avg", 'suspend_lag_max', 'suspend_lag_min', 'suspend_lag_median'])
color_ontime(fig)
fig.show()

In [None]:
wpdf.dtypes

In [None]:
# %%sh 
# jupyter nbconvert --to html autosuspend_monitoring_simplified.ipynb --output xo_grp_dev_wh

In [None]:
# %%sh 
# jupyter nbconvert --to html smart-suspend-analysis.ipynb --no-input --output test

In [None]:
# Shell command, not Python: left as a bare line it makes this cell fail with a
# SyntaxError. Run it from a terminal (or uncomment it as a %%sh cell) to clear
# the outputs of every notebook in the current directory in place.
# %%sh
# jupyter nbconvert --ClearOutputPreprocessor.enabled=True --inplace *.ipynb