In [None]:
#| output: false
%pip install sqlalchemy
# be sure to adjust this pin if the duckdb version used by the dbt driver differs
%pip install duckdb==0.5.1
%pip install duckdb-engine
%pip install psycopg2
%pip install python-dotenv

import matplotlib.pylab as plt
plt.rcParams['figure.dpi'] = 100

from IPython.display import display, Markdown, HTML
from datetime import datetime, timezone
import pandas as pd
from dotenv import load_dotenv
import os
from pathlib import Path 

In [None]:
#| tags: [parameters]

# Read settings from a .env file; override=True lets values from .env replace
# variables that are already set in the process environment.
load_dotenv(override=True)
# SQLAlchemy URL of the database that holds the quality_metrics table.
connection_string = os.environ.get('DB_CONNECTION_STRING')
# Intentionally NOT echoed as the cell output: a connection URL can embed a
# username/password and would be stored in the saved notebook / rendered report.

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

# Build the engine every later cell queries through. NullPool disables
# connection pooling, so connections are opened on demand and closed on release.
con = create_engine(
    connection_string,
    poolclass=NullPool,
)

In [None]:
from sqlalchemy import text

# Timestamp of the most recent metrics run (None when the table has no rows).
# An explicit connection plus text() is used because calling Engine.execute()
# with a raw string was removed in SQLAlchemy 2.0; this form also runs on 1.x.
with con.connect() as connection:
    as_of_time = connection.execute(
        text("SELECT MAX(run_started_at) FROM quality_metrics")
    ).scalar()

if as_of_time:
    # Depending on the column type/driver the value arrives either as an
    # ISO-8601 string or already as a datetime; accept both.
    if isinstance(as_of_time, datetime):
        run_dt = as_of_time
    else:
        run_dt = datetime.fromisoformat(str(as_of_time))
    # Naive timestamps are treated as UTC; a value that already carries an
    # offset keeps it instead of being relabelled.
    if run_dt.tzinfo is None:
        run_dt = run_dt.replace(tzinfo=timezone.utc)
    # Convert to the local timezone of the machine rendering the report.
    utc_dt = run_dt.astimezone(tz=None)
    # "%-I" (un-padded hour) is a glibc extension and fails on other platforms,
    # so the 12-hour value is computed by hand.
    hour_12 = utc_dt.hour % 12 or 12
    display(Markdown(f"## Run at {utc_dt:%b %d, %Y} at {hour_12}:{utc_dt:%M:%S %p}"))
else:
    display(Markdown("## No quality metrics found"))

### Error threshold settings

In [None]:
# Error rate at or above which a metric is reported; used for any metric that
# has no explicit entry below.
default_error_threshold = 0.05

# Per-metric thresholds (currently every listed metric uses the default).
error_thresholds = dict.fromkeys(
    ["q_obs_comp_value_range", "q_obs_value_range"],
    default_error_threshold,
)

# Render the thresholds as an inline SQL relation (one SELECT per metric,
# glued with UNION ALL) so later queries can join against it in a CTE.
error_threshold_sql_fragments = [
    f"SELECT '{metric}' AS metric_name, {limit} AS error_threshold"
    for metric, limit in error_thresholds.items()
]
error_threshold_sql = " UNION ALL ".join(error_threshold_sql_fragments)

# Show the configured thresholds as an HTML table.
thresholds = pd.read_sql(error_threshold_sql, con)
output = thresholds.to_html(index=False)
display(HTML(output))

## Summary 
#### Error rate (by metric and criteria)

In [None]:
# Error rate per (metric, criteria) for the latest run, keeping only the rows
# whose rate is at or above the metric's threshold (default when none is set).
# NULLIF(denominator, 0) turns a zero denominator into NULL so the division
# cannot raise a division-by-zero error; such rows have no defined rate and are
# dropped by the WHERE clause.
sql = f"""
    WITH error_thresholds AS ({error_threshold_sql}),
    summary_metrics AS (
        SELECT 
            metric_name,
            criteria,
            SUM(error_count)::numeric AS error_count,
            SUM(denominator)::numeric AS denominator
        FROM quality_metrics
        WHERE run_started_at = (SELECT MAX(run_started_at) FROM quality_metrics)
        GROUP BY 1,2
    )

    SELECT
        summary_metrics.metric_name, 
        criteria, 
        error_count::INTEGER AS errors, 
        denominator::INTEGER AS denominator,
        (error_count/NULLIF(denominator, 0)) AS "error rate"
    FROM summary_metrics
    LEFT JOIN error_thresholds ON
        summary_metrics.metric_name = error_thresholds.metric_name
    WHERE
        (error_count/NULLIF(denominator, 0))::DECIMAL >= COALESCE(error_threshold, {default_error_threshold})
"""
summary = pd.read_sql(sql, con)

# Thousands separators for the counts, whole percentages for the rate.
output = summary.to_html(formatters={
    'errors': '{:,}'.format, 
    'denominator': '{:,}'.format, 
    'error rate': '{:,.0%}'.format
}, index=False)

display(HTML(output))

#### Error Rate over Time (by metric)

In [None]:
import matplotlib.dates as mdates

# Error rate (in percent, rounded) per run and metric across all recorded runs.
# NULLIF guards the division: a run whose denominator sums to zero yields NULL
# (a gap in the line) instead of a division-by-zero error.
sql = """
SELECT 
    run_started_at::TIMESTAMP AS run_started_at,
    metric_name,
    ROUND((SUM(error_count)::NUMERIC/NULLIF(SUM(denominator), 0)::NUMERIC)*100) AS error_rate
FROM quality_metrics
GROUP BY 1,2
"""
metric_errors = pd.read_sql(sql, con)

# One column per metric, one row per run, so each metric becomes its own line.
pivot = metric_errors.pivot(index="run_started_at", columns="metric_name", values="error_rate")

# The plotted value is a percentage rate, so the axis is labelled accordingly.
chart = pivot.plot.line(marker="o", xlabel="run started at", ylabel="error rate (%)", title="Errors")
chart.legend(title="metric")
chart.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%b-%d %H:%M'))


### Errors Above Threshold (by metric and criteria)

In [None]:
def metric_to_table(metric_name: str) -> str:
    """Render the above-threshold criteria of one metric as an HTML table.

    Queries the latest run in ``quality_metrics`` for ``metric_name``, computes
    the error rate per (criteria, error_examples) group, and keeps the groups
    whose rate is at or above the metric's configured threshold
    (``default_error_threshold`` when the metric has no explicit entry).

    Args:
        metric_name: Value of the ``metric_name`` column to report on.

    Returns:
        HTML markup of a table with the columns criteria, errors, denominator,
        error rate and examples (no index column).
    """
    # The name is embedded in a SQL string literal, so double any single quote
    # to keep a name containing an apostrophe from breaking out of the literal.
    metric_literal = str(metric_name).replace("'", "''")

    # NULLIF(denominator, 0) makes a zero denominator produce NULL rather than
    # a division-by-zero error; those groups fail the WHERE test and are omitted.
    sql = f"""
        WITH error_thresholds AS ({error_threshold_sql}),
        summary_metrics AS (
            SELECT 
                metric_name,
                criteria,
                error_examples,
                SUM(error_count)::numeric AS error_count,
                SUM(denominator)::numeric AS denominator
            FROM quality_metrics
            WHERE run_started_at = (SELECT MAX(run_started_at) FROM quality_metrics)
                AND metric_name = '{metric_literal}'
            GROUP BY 1,2,3
        )

        SELECT
            criteria, 
            error_count::INTEGER AS errors, 
            denominator::INTEGER AS denominator,
            (error_count/NULLIF(denominator, 0)) AS "error rate",
            error_examples AS examples
        FROM summary_metrics
        LEFT JOIN error_thresholds ON
            summary_metrics.metric_name = error_thresholds.metric_name
        WHERE
            (error_count/NULLIF(denominator, 0))::DECIMAL >= COALESCE(error_threshold, {default_error_threshold})
    """
    detail = pd.read_sql(sql, con)
    # Thousands separators for the counts, whole percentages for the rate.
    return detail.to_html(formatters={
        'errors': '{:,}'.format, 
        'denominator': '{:,}'.format, 
        'error rate': '{:,.0%}'.format
    }, index=False)


# Every metric name ever recorded. ORDER BY makes the order of the report
# sections stable between runs (GROUP BY alone guarantees no ordering).
sql = """
SELECT 
    metric_name
FROM quality_metrics
GROUP BY 1
ORDER BY 1
"""
metrics = pd.read_sql(sql, con)

# One heading plus one detail table per metric.
for metric_name in metrics["metric_name"]:
    display(Markdown("### " + metric_name))
    display(HTML(metric_to_table(metric_name)))

In [None]:
#| output: false
# All queries above have run; dispose of the engine created earlier in the notebook.
con.dispose()