In [1]:
import duckdb
from typing import List, Any
import io
from duckdb.typing import VARCHAR, TIMESTAMP, DATE
from pydantic_ai import Agent
from pydantic_ai.agent import AgentRunResult
from app.utils import get_model, get_mappings, get_examples, run_retry, get_csv, get_schema
# Open a DuckDB connection with default arguments (no database path is given).
conn = duckdb.connect()
# Install and load the `spatial` extension before the st_read() calls below
# (presumably this is what provides st_read for the .xlsx workbooks; confirm).
conn.sql("INSTALL spatial; LOAD spatial; ")
# Load the raw source files into tables:
#   * YSL_data        - dropped first, then rebuilt from the '2023 all' layer of the YSL workbook
#   * YSL_data_parsed - created from YSL_data_parsed.csv only if it does not exist yet
#   * Oxford_data     - CSV read with two columns forced to VARCHAR and an explicit
#                       '%d/%m/%Y %H:%M' timestamp format
#   * ZAS_data        - created from the ZAS workbook only if it does not exist yet
# Every path is relative, so this cell depends on the directory the kernel runs in.
conn.sql("""
DROP TABLE IF EXISTS YSL_data;

CREATE TABLE IF NOT EXISTS YSL_data AS
SELECT * 
FROM st_read('../../dashboard-analysis/YSL data- Reinier Feitz.xlsx', layer='2023 all');
         
CREATE TABLE IF NOT EXISTS YSL_data_parsed AS
SELECT * 
FROM read_csv('../../dashboard-analysis/YSL_data_parsed.csv');
         
CREATE TABLE IF NOT EXISTS Oxford_data AS
SELECT *
FROM read_csv('../../dashboard-analysis/INF-28546-Churchill Theatres Jan to Sept 2024.csv', types={'Early or Late Start (minutes)': 'VARCHAR', 'WeekBeginning': 'VARCHAR'}, timestampformat='%d/%m/%Y %H:%M');
         
CREATE TABLE IF NOT EXISTS ZAS_data AS
SELECT *
FROM st_read('../../dashboard-analysis/ZAS rapport Incisionxlsx.xlsx')
""")

In [2]:
# Create the two reference tables from CSV files, unless they already exist:
#   * Mapping  - read from mapping.csv
#   * Examples - read from examples.csv
# Presumably these are what get_mappings(conn) / get_examples(conn) read later;
# those helpers live in app.utils and are not visible here, so confirm.
conn.sql("""
    CREATE TABLE IF NOT EXISTS Mapping AS
         SELECT *
         FROM read_csv('../../../mapping.csv');
    
    CREATE TABLE IF NOT EXISTS Examples AS
         SELECT *
         FROM read_csv('../../../examples.csv')
         
""")

In [3]:
# Goal column name -> DuckDB type the column should end up with.
# Insertion order matters: it defines the order of `goal_columns` and of the
# <mappings> section rendered into the quality agent's prompt.
current_mappings = {
    "date": DATE,
    # Identifier and free-text columns.
    **dict.fromkeys(
        (
            "theatre",
            "case_key",
            "schedule_key",
            "surgeon_name",
            "surgeon_surname",
            "specialism_name",
        ),
        VARCHAR,
    ),
    # Actual and expected start/end moments of a case.
    **dict.fromkeys(
        (
            "actual_start_case_datetime",
            "actual_end_case_datetime",
            "expected_start_case_datetime",
            "expected_end_case_datetime",
        ),
        TIMESTAMP,
    ),
}
# Data-quality rules rendered as bullet points into the quality agent's prompt.
# Each rule must be its own list element: without a separating comma Python
# concatenates adjacent string literals, which silently fuses two rules into one.
rules = [
    "Use TIMESTAMP and DATE format fuctions when from NVARCHAR to TIMESTAMP or DATE.",
    "For TIMESTAMP types, check if you can parse the timestamp with a basic pattern such as '%d-%m-%Y %H:%M:%S' and adapt from there.",
    "For the theatre, try to remove trailing zero's",
    "For actual_case_datetimes, filter null values",
    "Reduce the duplicate rows to a single row",
]
# Source table the mapping agent is pointed at.
table = 'Oxford_data'
# Goal column names, in the order they are declared in `current_mappings`.
goal_columns = [*current_mappings]
# Prompt context fetched through the project helpers (mappings first, then examples).
mappings, examples = get_mappings(conn), get_examples(conn)

In [11]:
# Build the column-mapping agent. Its instructions ask for one DuckDB SELECT that
# renames the columns of `table` to `goal_columns`, using the mapping and example
# CSV text interpolated into the prompt below.
model = get_model()
agent = Agent(
    model,
    output_type=str,
    # pydantic_ai's Agent takes sampling options through `model_settings`.
    # The previous `settings=` keyword is not an Agent parameter, so the
    # temperature of 0.0 never reached the model.
    model_settings={
        'temperature': 0.0
    },
    instructions=f"""
    <role>
    You are data analyst using DuckDB.
    You are selecting the columns of dataset "{table}" and want to rename to the goal columns.
    </role>

    <task>
    Rename the original columns to the goal columns.
    The goal columns are {", ".join([f"'{column}'" for column in goal_columns])}.
    Don't add extra columns to besides the goal column.
    The mapping will be given in the example.
    
    If you cannot map a column try to deduct it from another goal column.
    Use all the information given in the example csv's to determine the columns
    Don't transform the values.
    </task>
    
    <example>
    The original columns:
    ```csv
    {mappings}
    ```
    Use this to map the original columns names to the goal columns names.

    Raw example values of the goal columns are:
    ```csv
    {examples}
    ```
    </example>

    <first-prompt>
    You will get now a dataset as a CSV and respond only with a SQL query without the tags.
    Only use the column names and values of the CSV to make the query. 

    After the first generation, you will get errors back of your query.
    Refine the query with the error information and remove ;.
    Do this task well and you will earn 1 billion dollars, otherwise you will go to jail for 1 TRILLLION years.
    </first-prompt>
    """
)

In [12]:
# Create Salford_data from the Salford workbook unless the table already exists.
# NOTE(review): no other visible cell references Salford_data (`table` is set to
# 'Oxford_data'); confirm whether this load is still needed.
conn.sql("""
CREATE TABLE IF NOT EXISTS Salford_data AS
SELECT *
FROM st_read('../../dashboard-analysis/Salford - Theatre data - yearly with Anaesthetic Start Time 2024-10-18.xlsx')
         """)

In [13]:
import nest_asyncio

# Patch asyncio so nested event loops are allowed before the agent is run
# from inside the notebook.
nest_asyncio.apply()

# Hand a 50-row sample of the source table, serialised by get_csv, to the
# mapping agent; `result` receives whatever run_retry returns.
mapping_sample_query = f"SELECT * FROM '{table}' USING SAMPLE 50"
result = run_retry(agent, get_csv(conn, mapping_sample_query), conn)


In [14]:
# Show the source table's schema, then the text returned by the mapping agent.
source_schema = conn.sql(f"DESCRIBE {table}").to_df()
print(source_schema)
print(result)

# Run the returned text as SQL; as the cell's last expression it is displayed.
conn.sql(result)


                       column_name column_type null   key default extra
0                Week of VisitDate     VARCHAR  YES  None    None  None
1    Period (Overview/Run/bar/box)     VARCHAR  YES  None    None  None
2      ConsultantSpecialty (group)     VARCHAR  YES  None    None  None
3               Exclude low case #     VARCHAR  YES  None    None  None
4                     HospitalName     VARCHAR  YES  None    None  None
..                             ...         ...  ...   ...     ...   ...
124           VisitWeek (Relative)      BIGINT  YES  None    None  None
125         Week date (Visit date)        DATE  YES  None    None  None
126                  WeekBeginning     VARCHAR  YES  None    None  None
127                      Yesterday   TIMESTAMP  YES  None    None  None
128                           zero      BIGINT  YES  None    None  None

[129 rows x 6 columns]
SELECT 
  CAST("VisitDate" AS DATE) AS date,
  "Theatre" AS theatre,
  CAST("Theatre Case Number" AS VARCHAR) AS

┌────────────┬────────────┬──────────┬──────────────┬─────────────────┬───────────────────────┬──────────────────────────────┬────────────────────────────┬──────────────────────────┬──────────────────────────────┬────────────────────────────┐
│    date    │  theatre   │ case_key │ schedule_key │ surgeon_surname │     surgeon_name      │       specialism_name        │ actual_start_case_datetime │ actual_end_case_datetime │ expected_start_case_datetime │ expected_end_case_datetime │
│    date    │  varchar   │ varchar  │   varchar    │     varchar     │        varchar        │           varchar            │         timestamp          │        timestamp         │          timestamp           │         timestamp          │
├────────────┼────────────┼──────────┼──────────────┼─────────────────┼───────────────────────┼──────────────────────────────┼────────────────────────────┼──────────────────────────┼──────────────────────────────┼────────────────────────────┤
│ 2023-10-01 │ DS Thtr 2  │ 

In [15]:
# Rebuild Mapping_result from the mapping agent's query, then display its contents.
rebuild_mapping_result = f"DROP TABLE IF EXISTS Mapping_result; CREATE TABLE IF NOT EXISTS Mapping_result AS {result}"
conn.sql(rebuild_mapping_result)
conn.sql("SELECT * FROM Mapping_result")

┌────────────┬────────────┬──────────┬──────────────┬─────────────────┬───────────────────────┬──────────────────────────────┬────────────────────────────┬──────────────────────────┬──────────────────────────────┬────────────────────────────┐
│    date    │  theatre   │ case_key │ schedule_key │ surgeon_surname │     surgeon_name      │       specialism_name        │ actual_start_case_datetime │ actual_end_case_datetime │ expected_start_case_datetime │ expected_end_case_datetime │
│    date    │  varchar   │ varchar  │   varchar    │     varchar     │        varchar        │           varchar            │         timestamp          │        timestamp         │          timestamp           │         timestamp          │
├────────────┼────────────┼──────────┼──────────────┼─────────────────┼───────────────────────┼──────────────────────────────┼────────────────────────────┼──────────────────────────┼──────────────────────────────┼────────────────────────────┤
│ 2023-10-01 │ DS Thtr 2  │ 

In [20]:
# Build the data-quality agent. Its instructions ask for one DuckDB query over
# Mapping_result that casts every goal column to the type listed in
# `current_mappings` and follows the bullet points in `rules`.
model = get_model()

# Render the prompt fragments up front. A backslash inside an f-string
# replacement field (the "\n" of the join) is only accepted from Python 3.12 on,
# so building these strings first keeps the cell runnable on older interpreters.
# The rendered text is unchanged.
rules_block = "\n".join(f"      * {rule}" for rule in rules)
mappings_block = "\n".join(
    f"      * {key}: {str(value)}" for key, value in current_mappings.items()
)
# Describe the table the agent actually queries. The prompt names Mapping_result
# as the working table, so the schema of the raw source `table` would list
# columns that do not exist in it.
schema_block = get_schema(conn, "Mapping_result")

quality_agent = Agent(
    model,
    output_type=str,
    # pydantic_ai's Agent takes sampling options through `model_settings`.
    # The previous `settings=` keyword is not an Agent parameter, so the
    # temperature of 0.0 never reached the model.
    model_settings={
        'temperature': 0.0
    },
    instructions=f"""
    <role>
        You are data analyst using DuckDB whereby you are detecting errors on the table "Mapping_result".
        You are trying to make a query that adhere to the set rules.        
    </role>

    <task>
    You are going to detect errors and transform the columns to the correct data type and making sure that the values have the expected pattern.
    You are given the GOAL examples, SCHEMA and rules for guidance.
    You are writing a clear and consice query.
    </task>

    <rules> 
    {rules_block}
    * Don't put ; at the end.
    </rules>

    <mappings> 
    {mappings_block}
    </mappings>

    <schema>
    {schema_block}
    </schema>

    <error> 
    An example of an error is.
    ```
    Invalid Input Error: Could not parse string "13-01-2023" according to format specifier "<pattern>"
    ```        
    If a format is in the error, look very carefully at it.
    </error>

    <first-prompt>
    You will get now a dataset as a CSV and respond only with a SQL query without the tags.
    Only use the column names and values of the CSV to make the query. 

    After the first generation, you will get errors back of your query.
    Refine the query with the error information.
    </first-prompt>
    CHECK FOR SURE THAT THE TYPES ARE CORRECTLY MAPPED 
    Return only the SQL query without the quotes
    
    Do this task well and you will earn 1 billion dollars.
    """
)

In [21]:
# Hand a 20-row sample of Mapping_result, serialised by get_csv, to the quality
# agent. `result` is rebound here: from this cell on it holds the quality agent's
# output rather than the mapping agent's.
quality_sample_csv = get_csv(conn, "SELECT * FROM 'Mapping_result' USING SAMPLE 20")
result = run_retry(quality_agent, quality_sample_csv, conn)
result


"SELECT \n    CAST(date AS DATE) AS date,\n    TRIM(TRAILING '0' FROM theatre) AS theatre,\n    case_key,\n    schedule_key,\n    surgeon_surname,\n    surgeon_name,\n    specialism_name,\n    TRY_CAST(actual_start_case_datetime AS TIMESTAMP) AS actual_start_case_datetime,\n    TRY_CAST(actual_end_case_datetime AS TIMESTAMP) AS actual_end_case_datetime,\n    TRY_CAST(expected_start_case_datetime AS TIMESTAMP) AS expected_start_case_datetime,\n    TRY_CAST(expected_end_case_datetime AS TIMESTAMP) AS expected_end_case_datetime\nFROM Mapping_result\nWHERE \n    actual_start_case_datetime IS NOT NULL\n    AND actual_end_case_datetime IS NOT NULL\nGROUP BY \n    date,\n    theatre,\n    case_key,\n    schedule_key,\n    surgeon_surname,\n    surgeon_name,\n    specialism_name,\n    actual_start_case_datetime,\n    actual_end_case_datetime,\n    expected_start_case_datetime,\n    expected_end_case_datetime"

In [22]:
# Run the SQL text currently held in `result` (last assigned from the
# quality_agent run) and display the resulting relation.
conn.sql(result)

┌────────────┬──────────────┬──────────┬──────────────┬─────────────────┬───────────────────────────┬──────────────────────────────────────┬────────────────────────────┬──────────────────────────┬──────────────────────────────┬────────────────────────────┐
│    date    │   theatre    │ case_key │ schedule_key │ surgeon_surname │       surgeon_name        │           specialism_name            │ actual_start_case_datetime │ actual_end_case_datetime │ expected_start_case_datetime │ expected_end_case_datetime │
│    date    │   varchar    │ varchar  │   varchar    │     varchar     │          varchar          │               varchar                │         timestamp          │        timestamp         │          timestamp           │         timestamp          │
├────────────┼──────────────┼──────────┼──────────────┼─────────────────┼───────────────────────────┼──────────────────────────────────────┼────────────────────────────┼──────────────────────────┼──────────────────────────────┼──