In [1]:
%load_ext sql

In [2]:
# Set configurations on jupysql to directly output data to Pandas and to simplify the output that is printed to the notebook.

%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False

In [3]:
%sql duckdb:///mydb.duckdb

In [4]:
import dlt
import duckdb
import pandas as pd
import numpy as np
import requests
import random
import os
from datetime import datetime
from dotenv import load_dotenv

load_dotenv()

print("All libraries imported successfully!")
print(f"Current working directory: {os.getcwd()}")

All libraries imported successfully!
Current working directory: C:\Users\victoria\Desktop\DE\zz_de


In [5]:
# creating dlt pipeline
pipeline = dlt.pipeline(
    pipeline_name='insurance_payments_orchestration',
    destination='duckdb',
    dataset_name='insurance_payments_data'
)

print("dlt pipeline initilised sucessfully!")
print(f"pipeline_name: {pipeline.pipeline_name}")

dlt pipeline initilised sucessfully!
pipeline_name: insurance_payments_orchestration


In [6]:
def load_core_payments_api_with_transformations():
    # Calling the API 
    url = "https://my.api.mockaroo.com/core_payments.json?key=e1e5c550"
    response = requests.get(url)
    
    # Check if request worked
    if response.status_code == 200:
        data = response.json()   # Parse JSON
    
        # Convert to DataFrame
        df_core_payments = pd.DataFrame(data)
        
        # getting a brief look at the data
        print(df_core_payments.head())

        # Convert string money fields to numeric
        df_core_payments['payment_amount'] = pd.to_numeric(df_core_payments['payment_amount'].str.replace('$', '').str.replace(',', ''), errors='coerce')
        df_core_payments['processing_fee'] = pd.to_numeric(df_core_payments['processing_fee'].str.replace('$', '').str.replace(',', ''), errors='coerce')

        # Data quality checks
        print("\n Data Quality Checks")
        print(f" Null values: {df_core_payments.isnull().sum().sum()}")
        print(f" Duplicate rows: {df_core_payments.duplicated().sum()}")
        print(f" Date range: {df_core_payments['transaction_timestamp'].min()} to {df_core_payments['transaction_timestamp'].max()}")
    
        # Basic transformations for insurance payments
        df_core_payments['transaction_timestamp'] = pd.to_datetime(df_core_payments['transaction_timestamp'])
        df_core_payments['total_cost'] = df_core_payments['payment_amount'] + df_core_payments['processing_fee']
        df_core_payments['month'] = df_core_payments['transaction_timestamp'].dt.month
        df_core_payments['day_of_week'] = df_core_payments['transaction_timestamp'].dt.day_name()
        df_core_payments['is_completed'] = df_core_payments['payment_status'] == 'Completed'
    
        # Adding data quality assertions for insurance payments
        assert df_core_payments['payment_amount'].min() > 0, "Payment amount should be positive"
        assert df_core_payments['processing_fee'].min() >= 0, "Processing fee should be non-negative"
        assert df_core_payments['total_cost'].min() > 0, "Total cost should be positive"
        assert df_core_payments['retry_count'].min() >= 0, "Retry count should be non-negative"
    
        print("Data transformations completed successfully!")
    
        # yielding records for dlt
        for record in df_core_payments.to_dict(orient="records"):
            yield record
    else:
        print("Error:", response.status_code)


# Load data into dlt pipeline
info = pipeline.run(
    load_core_payments_api_with_transformations(),
    table_name="sample_data",
    write_disposition="replace"
)

print(f"\n Data loaded successfully!")
print(f"Load info: {info}")


# Get the database path and connect directly to DuckDB
db_path = pipeline.sql_client().credentials.database
print(f"\n Database path: {db_path}")

# Connect to DuckDB and check tables with proper schema handling
con = duckdb.connect(db_path)

all_tables = con.execute("""
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_type = 'BASE TABLE'
      AND table_schema != 'information_schema'
""").fetchdf()


print(f"\n Available tables:")
print(all_tables)


# Find the data table (exclude dlt internal tables)
data_tables = all_tables[~all_tables['table_name'].str.startswith('_dlt')]
if not data_tables.empty:
    table_schema = data_tables.iloc[0]['table_schema']
    table_name = data_tables.iloc[0]['table_name']
    full_table_name = f"{table_schema}.{table_name}"
    
    row_count = con.execute(f"SELECT COUNT(*) FROM {full_table_name}").fetchone()[0]
    print(f"\n Records in table '{full_table_name}': {row_count}")
    
    # Show sample data
    sample = con.execute(f"SELECT * FROM {full_table_name} LIMIT 3").fetchdf()
    print(f"\n Sample data:")
    print(sample)
else:
    print("\n No data tables found in the database")

con.close()

   id first_name   last_name                             email  gender  \
0   1     Cobbie      Legier                 clegier0@1688.com    Male   
1   2      Roley      Aslott          raslott1@kickstarter.com    Male   
2   3       Yves      Aveson                   yaveson2@un.org    Male   
3   4      Ogdan  Littledike  olittledike3@theglobeandmail.com    Male   
4   5   Carmelle    Kmietsch            ckmietsch4@dropbox.com  Female   

       ip_address   claim_id policy_number payment_type payment_amount  \
0  26.157.123.116  CLM-00019    POL-789012        Claim      $45338.29   
1  150.243.59.111  CLM-00014    POL-456789      Premium      $37554.73   
2  143.104.38.199  CLM-00014    POL-024689       Refund      $18000.17   
3   253.76.121.85  CLM-00005    POL-802467      Premium      $21884.43   
4   50.208.96.238  CLM-00015    POL-468023   Adjustment      $15019.83   

  payment_currency payment_status payment_method payer_id   payee_id  \
0           Shekel         Failed     

In [7]:
# Connect to DuckDB and query the loaded data
# Get the database path from the pipeline
db_path = pipeline.sql_client().credentials.database
print(f" Connected to DuckDB at: {db_path}")

# Connect to DuckDB
con = duckdb.connect(db_path)

# Get available tables with proper schema handling
all_tables = con.execute("""
    SELECT table_schema, table_name 
    FROM information_schema.tables 
    WHERE table_type = 'BASE TABLE' AND table_schema != 'information_schema'
""").fetchdf()

print(f"\n Available tables:")
print(all_tables)

# Find our data table (exclude dlt internal tables)
data_tables = all_tables[~all_tables['table_name'].str.startswith('_dlt')]

if data_tables.empty:
    print("No data tables found. Please run the data loading cell first.")
else:
    table_schema = data_tables.iloc[0]['table_schema']
    table_name = data_tables.iloc[0]['table_name']
    full_table_name = f"{table_schema}.{table_name}"
    
    print(f"\n Using table: {full_table_name}")
    
    # Query 1: Basic summary statistics
    print("\n Summary Statistics:")
    summary_query = f"""
    SELECT 
        COUNT(*) as total_records,
        COUNT(DISTINCT policy_number) as unique_policies,
        SUM(payment_amount) as total_payment_amount,
        SUM(processing_fee) as total_processing_fees,
        SUM(total_cost) as total_cost_with_fees,
        AVG(payment_amount) as avg_payment_amount,
        AVG(processing_fee) as avg_processing_fee,
        MIN(transaction_timestamp) as earliest_transaction,
        MAX(transaction_timestamp) as latest_transaction,
        COUNT(CASE WHEN payment_status = 'Completed' THEN 1 END) as completed_payments,
        COUNT(CASE WHEN payment_status = 'Failed' THEN 1 END) as failed_payments,
        COUNT(CASE WHEN payment_status = 'Pending' THEN 1 END) as pending_payments
    FROM {full_table_name}
    """
    
    summary_result = con.execute(summary_query).fetchdf()
    print(summary_result.to_string(index=False))

    # Query 2: Payments by Type 
    print("\nPayments by Type:")
    type_query = f"""
    SELECT 
        payment_type,
        COUNT(*) as record_count,
        SUM(payment_amount) as total_payment_amount,
        SUM(processing_fee) as total_processing_fees,
        SUM(total_cost) as total_cost_with_fees,
        AVG(payment_amount) as avg_payment_amount,
        AVG(processing_fee) as avg_processing_fee
    FROM {full_table_name}
    GROUP BY payment_type
    ORDER BY total_payment_amount DESC
    """
    type_result = con.execute(type_query).fetchdf()
    print(type_result)

    # Query 3: Payments by Gateway 
    print("\nPayments by Gateway:")
    gateway_query = f"""
    SELECT 
        payment_gateway,
        COUNT(*) as record_count,
        SUM(payment_amount) as total_payment_amount,
        SUM(processing_fee) as total_processing_fees,
        AVG(payment_amount) as avg_payment_amount,
        COUNT(CASE WHEN payment_status = 'Completed' THEN 1 END) as successful_payments,
        COUNT(CASE WHEN payment_status = 'Failed' THEN 1 END) as failed_payments,
        ROUND(COUNT(CASE WHEN payment_status = 'Failed' THEN 1 END) * 100.0 / COUNT(*), 2) as failure_rate_percent
    FROM {full_table_name}
    GROUP BY payment_gateway
    ORDER BY total_payment_amount DESC
    """
    gateway_result = con.execute(gateway_query).fetchdf()
    print(gateway_result)

    # Query 4: Payments by Status
    print("\nPayments by Status:")
    status_query = f"""
    SELECT 
        payment_status,
        COUNT(*) as record_count,
        SUM(payment_amount) as total_payment_amount,
        AVG(payment_amount) as avg_payment_amount,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage_of_total
    FROM {full_table_name}
    GROUP BY payment_status
    ORDER BY record_count DESC
    """
    
    status_result = con.execute(status_query).fetchdf()  
    print(status_result)

con.close()
print("\n DuckDB connection closed")

 Connected to DuckDB at: C:\Users\victoria\Desktop\DE\zz_de\insurance_payments_orchestration.duckdb

 Available tables:
              table_schema           table_name
0  insurance_payments_data          sample_data
1  insurance_payments_data           _dlt_loads
2  insurance_payments_data  _dlt_pipeline_state
3  insurance_payments_data         _dlt_version

 Using table: insurance_payments_data.sample_data

 Summary Statistics:
 total_records  unique_policies  total_payment_amount  total_processing_fees  total_cost_with_fees  avg_payment_amount  avg_processing_fee      earliest_transaction        latest_transaction  completed_payments  failed_payments  pending_payments
          1000               20            25609900.8               13509.85           25623410.65          25609.9008            13.50985 2024-01-01 01:00:00+01:00 2025-08-28 01:00:00+01:00                 212              189               211

Payments by Type:
  payment_type  record_count  total_payment_amount  tota

In [8]:
# Create output directory if it doesn't exist
os.makedirs("../output", exist_ok=True)

# Get database path and connect
db_path = pipeline.sql_client().credentials.database
con = duckdb.connect(db_path)

# Get table name with proper schema handling
all_tables = con.execute("""
    SELECT table_schema, table_name 
    FROM information_schema.tables 
    WHERE table_type = 'BASE TABLE' AND table_schema != 'information_schema'
""").fetchdf()

data_tables = all_tables[~all_tables['table_name'].str.startswith('_dlt')]

if not data_tables.empty:
    table_schema = data_tables.iloc[0]['table_schema']
    table_name = data_tables.iloc[0]['table_name']
    full_table_name = f"{table_schema}.{table_name}"
    
    # Export summary results to CSV
    summary_query = f"""
    SELECT 
        COUNT(*) as total_records,
        COUNT(DISTINCT policy_number) as unique_policies,
        SUM(payment_amount) as total_payment_amount,
        SUM(processing_fee) as total_processing_fees,
        AVG(payment_amount) as avg_payment_amount,
        SUM(payment_amount + processing_fee) as total_cost
    FROM {full_table_name}
    """
    summary_result = con.execute(summary_query).fetchdf()
    summary_result.to_csv("C:/Users/victoria/Desktop/DE/zz_de/files/payment_summary_statistics.csv", index=False)

    # Payments by Type Analysis
    payment_type_query = f"""
    SELECT 
        payment_type,
        COUNT(*) as record_count,
        SUM(payment_amount) as total_amount,
        AVG(payment_amount) as avg_amount
    FROM {full_table_name}
    GROUP BY payment_type
    ORDER BY total_amount DESC
    """
    payment_type_result = con.execute(payment_type_query).fetchdf()
    payment_type_result.to_csv("C:/Users/victoria/Desktop/DE/zz_de/files/payments_by_type.csv", index=False)

    # Payments by Status Analysis  
    payment_status_query = f"""
    SELECT 
        payment_status,
        COUNT(*) as record_count,
        SUM(payment_amount) as total_amount,
        COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () as percentage
    FROM {full_table_name}
    GROUP BY payment_status
    ORDER BY record_count DESC
    """
    payment_status_result = con.execute(payment_status_query).fetchdf()
    payment_status_result.to_csv("C:/Users/victoria/Desktop/DE/zz_de/files/payments_by_status.csv", index=False)

    # Payments by Gateway Analysis 
    gateway_query = f"""
    SELECT 
        payment_gateway,
        COUNT(*) as record_count,
        SUM(payment_amount) as total_amount,
        COUNT(CASE WHEN payment_status = 'Completed' THEN 1 END) as successful_payments,
        COUNT(CASE WHEN payment_status = 'Failed' THEN 1 END) as failed_payments
    FROM {full_table_name}
    GROUP BY payment_gateway
    ORDER BY total_amount DESC
    """
    gateway_result = con.execute(gateway_query).fetchdf()
    gateway_result.to_csv("C:/Users/victoria/Desktop/DE/zz_de/files/payments_by_gateway.csv", index=False)
    

    print("Files created:")
    print("  - summary_statistics.csv")
    print("  - payment_type_result.csv")
    print("  - payment_status_result.csv")
    print("  - gateway_result.csv")
else:
    print("No data tables found. Please run the data loading cell first.")

con.close()

Files created:
  - summary_statistics.csv
  - payment_type_result.csv
  - payment_status_result.csv
  - gateway_result.csv
