In [1]:
import sqlite3
import os
import pandas as pd
import datetime

# Path of the SQLite database file the notebook connects to.
db_path = "sample_crm.db"
# Folder that is scanned for the *.sql files holding the test queries.
tests_folder = "tests/generated"

In [2]:
# Connect to the SQLite database at db_path.

# Both objects stay at notebook scope: `cursor` runs the test queries and
# `conn` is handed to pandas when the aggregated results are written back.
conn = sqlite3.connect(db_path)
cursor = conn.cursor()

In [9]:
# Fetch all SQL queries from the tests folder into a dictionary that maps
# the file name (without its ".sql" extension) to the query text.

sql_dict = {}
sql_suffix = ".sql"

# Sort the listing so the queries are loaded (and later evaluated) in a
# reproducible order; os.listdir makes no ordering guarantee.
for filename in sorted(os.listdir(tests_folder)):
    if not filename.endswith(sql_suffix):
        continue
    with open(os.path.join(tests_folder, filename), "r") as f:
        sql = f.read()
    # Strip only the trailing extension. str.replace would also remove a
    # ".sql" appearing earlier in the name (e.g. "check.sql_v2.sql").
    sql_dict[filename[:-len(sql_suffix)]] = sql

In [11]:
# Evaluate all SQL queries and store the results in a dictionary of
# DataFrames keyed by query id.

sql_res_dict = {}

for sql_query_id, sql in sql_dict.items():
    cursor.execute(sql)
    # cursor.description is None when the statement produced no result set
    # (e.g. it was not a SELECT). Fail with the offending query id instead of
    # an opaque "'NoneType' object is not iterable".
    if cursor.description is None:
        raise ValueError(
            f"SQL query '{sql_query_id}' did not return a result set"
        )
    columns = [desc[0] for desc in cursor.description]
    results = cursor.fetchall()
    df = pd.DataFrame(results, columns=columns)
    sql_res_dict[sql_query_id] = df

In [30]:
# Aggregate the evaluation results: for every query, count its PASS and FAIL
# rows and attach the run timestamp and the name of the test.

result_columns = ['sql_query_id', 'timestamp', 'test_result', 'count']

# One timestamp for the whole run, so all rows written by this run can be
# grouped together afterwards.
run_timestamp = datetime.datetime.now().isoformat()

summaries = []
for sql_query_id, query_df in sql_res_dict.items():
    # Always report both outcomes, in PASS/FAIL order; an outcome that does
    # not occur in the query result gets an integer count of 0.
    counts = (
        query_df['test_result']
        .value_counts()
        .reindex(['PASS', 'FAIL'], fill_value=0)
    )
    summaries.append(pd.DataFrame({
        'sql_query_id': sql_query_id,
        'timestamp': run_timestamp,
        'test_result': list(counts.index),
        'count': counts.tolist(),
    })[result_columns])

# Concatenate once instead of growing `rf` inside the loop; pd.concat raises
# on an empty list, so fall back to an empty frame with the expected columns.
if summaries:
    rf = pd.concat(summaries, ignore_index=True)
else:
    rf = pd.DataFrame(columns=result_columns)


In [None]:
# Persist the aggregated results; every run adds its rows to the same table.
table_name = 'test_results_aggregated'

# if_exists='append' inserts into the table when it exists and creates it
# when it does not, so no separate sqlite_master existence check is needed.
rf.to_sql(table_name, conn, if_exists='append', index=False)