# BanVic Data Pipeline: From Raw CSV to a Full Data Warehouse

This notebook executes the full ELT (Extract, Load, Transform) process for the BanVic project in DuckDB. It handles:
1.  **Extract & Load:** Reading source CSVs into a `raw` layer (`raw_banvic.db`).
2.  **Transform (Staging):** Cleaning and standardizing the raw data into a `staging` layer (`stg_banvic.db`).
3.  **Verify:** Running data quality checks on the `staging` layer.
4.  **Transform (Data Warehouse):** Modeling the clean data into a final Star Schema in the `dw_banvic.db`.

## Step 1: Create and Populate the Raw Layer (`raw_banvic.db`)

The first step is to load the original, untouched CSV files into our `raw` database. This serves as the single source of truth for our pipeline. Each CSV file is loaded into a corresponding table within the `raw` schema.

In [1]:
import duckdb
import os

# --- Configuration ---
# Define the path for the database file
db_file = '../database/raw_banvic.db'
# Define the path to the raw data directory
data_dir = '../data'

# --- Execution ---
# Connect to the database file (it will be created if it doesn't exist)
con = duckdb.connect(db_file)

# Create the schema for the raw data
con.execute("CREATE SCHEMA IF NOT EXISTS raw;")

csv_files = [f for f in os.listdir(data_dir) if f.endswith('.csv')]

# Loop to load each CSV into a table in the 'raw' schema
print("--- Loading raw data into DuckDB ---")
for file in csv_files:
    table_name = file.replace('.csv', '')
    file_path = os.path.join(data_dir, file).replace('\\', '/')
    
    # This query creates a table in the 'raw' schema with the same name as the file
    con.execute(f"""
        CREATE OR REPLACE TABLE raw.{table_name} AS 
        SELECT * FROM read_csv_auto('{file_path}');
    """)
    print(f"Table 'raw.{table_name}' created successfully.")

con.close()
print("\n--- Raw data loading complete. ---")

--- Loading raw data into DuckDB ---
Table 'raw.agencies' created successfully.
Table 'raw.bank_accounts' created successfully.
Table 'raw.clients' created successfully.
Table 'raw.credit_proposal' created successfully.
Table 'raw.employee' created successfully.
Table 'raw.employee_agency' created successfully.
Table 'raw.transactions' created successfully.

--- Raw data loading complete. ---


## Step 2: Create and Populate the Staging Layer (`stg_banvic.db`)

This is the core transformation step. The following script connects to our target staging database (`stg_banvic.db`) and **attaches** the source raw database (`raw_banvic.db`).

It then executes a series of SQL queries to transform the raw data by:
- Renaming columns to an English standard.
- Casting data types correctly (e.g., to `VARCHAR`, `DATE`).
- Standardizing text fields (e.g., `zip_code`).
- Parsing new information from existing columns (e.g., `parsed_state`).
- Filtering out known data integrity issues (e.g., the orphan account).

In [6]:
import duckdb
import os

# --- Configuration ---
# Path to the STAGING database file (where we will write data)
stg_db_file = '../database/stg_banvic.db'
# Path to the RAW database file (where we will read data from)
raw_db_file = '../database/raw_banvic.db'

# --- SQL Queries for Staging Layer ---

# A list to hold all transformation queries
staging_queries = [
    {
        "table_name": "agencies",
        "sql": """
            CREATE OR REPLACE TABLE stg.agencies AS
            SELECT
                cod_agencia::VARCHAR AS agency_id,
                CAST(data_abertura AS DATE) AS opening_date,
                nome AS agency_name,
                endereco AS address,
                cidade AS city,
                uf AS state,
                tipo_agencia AS agency_type
            FROM
                raw_db.raw.agencies; -- Read from the attached raw_db
        """
    },
    {
        "table_name": "clients",
        "sql": """
            CREATE OR REPLACE TABLE stg.clients AS
            SELECT
                cod_cliente::VARCHAR AS client_id,
                CAST(data_inclusao AS DATE) AS inclusion_date,
                CAST(data_nascimento AS DATE) AS birth_date,
                primeiro_nome AS first_name,
                ultimo_nome AS last_name,
                email,
                tipo_cliente AS client_type,
                cpfcnpj::VARCHAR AS cpf_cnpj,
                endereco AS full_address,
                regexp_replace(cep, '[^0-9]', '', 'g')::VARCHAR AS zip_code,
                regexp_extract(endereco, '/ ([A-Z]{2})$') AS parsed_state
            FROM
                raw_db.raw.clients; -- Read from the attached raw_db
        """
    },
    {
        "table_name": "bank_accounts",
        "sql": """
            CREATE OR REPLACE TABLE stg.bank_accounts AS
            SELECT
                CAST(b.num_conta AS VARCHAR) AS account_id,
                CAST(b.cod_cliente AS VARCHAR) AS client_id,
                CAST(b.cod_agencia AS VARCHAR) AS agency_id,
                CAST(b.cod_colaborador AS VARCHAR) AS employee_id,
                CAST(b.data_abertura AS DATE) AS opening_date,
                CAST(b.data_ultimo_lancamento AS DATE) AS last_transaction_date,
                b.tipo_conta AS account_type,
                b.saldo_total AS total_balance,
                b.saldo_disponivel AS available_balance
            FROM
                raw_db.raw.bank_accounts AS b -- Read from the attached raw_db
            INNER JOIN
                raw_db.raw.clients AS c ON b.cod_cliente = c.cod_cliente;
        """
    },
    {
        "table_name": "employee",
        "sql": """
            CREATE OR REPLACE TABLE stg.employee AS
            SELECT
                cod_colaborador::VARCHAR AS employee_id,
                CAST(data_nascimento AS DATE) AS birth_date,
                primeiro_nome AS first_name,
                ultimo_nome AS last_name,
                email,
                cpf::VARCHAR AS cpf,
                endereco AS address,
                regexp_replace(cep, '[^0-9]', '', 'g')::VARCHAR AS zip_code
            FROM
                raw_db.raw.employee; -- Read from the attached raw_db
        """
    },
    {
        "table_name": "employee_agency",
        "sql": """
            CREATE OR REPLACE TABLE stg.employee_agency AS
            SELECT
                cod_colaborador::VARCHAR AS employee_id,
                cod_agencia::VARCHAR AS agency_id
            FROM
                raw_db.raw.employee_agency; -- Read from the attached raw_db
        """
    },
    {
        "table_name": "transactions",
        "sql": """
            CREATE OR REPLACE TABLE stg.transactions AS
            SELECT
                cod_transacao::VARCHAR AS transaction_id,
                num_conta::VARCHAR AS account_id,
                CAST(data_transacao AS TIMESTAMP WITH TIME ZONE) AS transaction_date,
                nome_transacao AS transaction_name,
                valor_transacao AS transaction_amount
            FROM
                raw_db.raw.transactions; -- Read from the attached raw_db
        """
    },
    {
        "table_name": "credit_proposal",
        "sql": """
            CREATE OR REPLACE TABLE stg.credit_proposal AS
            SELECT
                cod_proposta::VARCHAR AS proposal_id,
                cod_cliente::VARCHAR AS client_id,
                cod_colaborador::VARCHAR AS employee_id,
                CAST(data_entrada_proposta AS DATE) AS proposal_date,
                taxa_juros_mensal AS monthly_interest_rate,
                valor_proposta AS proposal_amount,
                valor_financiamento AS financing_amount,
                valor_entrada AS down_payment_amount,
                valor_prestacao AS installment_amount,
                quantidade_parcelas AS number_of_installments,
                carencia AS grace_period,
                status_proposta AS proposal_status
            FROM
                raw_db.raw.credit_proposal; -- Read from the attached raw_db
        """
    }
]

# --- Execution ---
con = None
try:
    # Connect to the STAGING database file (our target)
    con = duckdb.connect(stg_db_file)
    print(f"Successfully connected to '{stg_db_file}'")

    # Attach the RAW database file, giving it an alias 'raw_db'
    con.execute(f"ATTACH '{raw_db_file.replace('\\', '/')}' AS raw_db (READ_ONLY);")
    print(f"Successfully attached '{raw_db_file}' as 'raw_db'")

    # Create the staging schema if it doesn't exist
    con.execute("CREATE SCHEMA IF NOT EXISTS stg;")
    print("Schema 'stg' is ready.")

    # Loop to execute each transformation query
    print("\n--- Running Staging Transformations ---")
    for item in staging_queries:
        table_name = item["table_name"]
        sql_query = item["sql"]
        
        con.execute(sql_query)
        print(f"Table 'stg.{table_name}' created successfully.")

    print("\n--- Staging layer created successfully. ---")

finally:
    # Ensure the connection is closed
    if con:
        con.close()
        print("\nDatabase connection closed.")

Successfully connected to '../database/stg_banvic.db'
Successfully attached '../database/raw_banvic.db' as 'raw_db'
Schema 'stg' is ready.

--- Running Staging Transformations ---
Table 'stg.agencies' created successfully.
Table 'stg.clients' created successfully.
Table 'stg.bank_accounts' created successfully.
Table 'stg.employee' created successfully.
Table 'stg.employee_agency' created successfully.
Table 'stg.transactions' created successfully.
Table 'stg.credit_proposal' created successfully.

--- Staging layer created successfully. ---

Database connection closed.


## Specific Verifications

Here we run specific tests to ensure the cleaning steps were successful

In [3]:
import duckdb
import pandas as pd

# Define the path to the database we want to test
stg_db_file = '../database/stg_banvic.db'

con = None
try:
    # Step 1: Open a new connection for this cell's scope
    # read_only=True is a good practice for test scripts
    con = duckdb.connect(stg_db_file, read_only=True)
    print(f"Successfully connected to '{stg_db_file}' for verification.")
    
    print("\n--- Running Specific Verification Tests ---")
    
    # Test 1: Verify orphan account was filtered from 'bank_accounts'
    # The raw table had 999 rows. The staging table should have 998.
    bank_accounts_count = con.execute("SELECT COUNT(*) FROM stg.bank_accounts;").fetchone()[0]
    if bank_accounts_count == 998:
        print(f"✅ PASSED: 'stg.bank_accounts' has {bank_accounts_count} rows, orphan account correctly filtered.")
    else:
        print(f"❌ FAILED: 'stg.bank_accounts' has {bank_accounts_count} rows, expected 998.")

    # Test 2: Verify zip_code standardization in 'clients'
    # This query should find 0 rows with non-digit characters in zip_code.
    non_digit_zips_clients = con.execute("""
        SELECT zip_code FROM stg.clients WHERE regexp_matches(zip_code, '\D')
    """).fetchall()
    if len(non_digit_zips_clients) == 0:
        print("✅ PASSED: 'zip_code' column in 'stg.clients' is successfully standardized (contains only digits).")
    else:
        print("❌ FAILED: Found zip codes with non-digit characters in 'stg.clients'.")

    # Test 3: Verify parsed_state extraction in 'clients'
    # Let's see the distribution of the new state column.
    print("\n[Value Counts for 'parsed_state' in stg.clients]")
    print(con.execute("""
        SELECT parsed_state, COUNT(*) AS count 
        FROM stg.clients 
        GROUP BY parsed_state 
        ORDER BY count DESC;
    """).fetchdf())

finally:
    # Step 2: Ensure the connection is closed after the tests are done
    if con:
        con.close()
        print("\nDatabase connection for verification closed.")

  SELECT zip_code FROM stg.clients WHERE regexp_matches(zip_code, '\D')


Successfully connected to '../database/stg_banvic.db' for verification.

--- Running Specific Verification Tests ---
✅ PASSED: 'stg.bank_accounts' has 998 rows, orphan account correctly filtered.
✅ PASSED: 'zip_code' column in 'stg.clients' is successfully standardized (contains only digits).

[Value Counts for 'parsed_state' in stg.clients]
   parsed_state  count
0          / AM     53
1          / ES     49
2          / SP     48
3          / MS     47
4          / MA     44
5          / CE     43
6          / RR     41
7          / AL     40
8          / MG     40
9          / AC     38
10         / PB     37
11         / TO     37
12         / SC     37
13         / AP     35
14         / RS     35
15         / RN     34
16         / BA     34
17         / GO     34
18         / PR     33
19         / DF     33
20         / PA     33
21         / RJ     32
22         / PI     31
23         / SE     29
24         / PE     28
25         / MT     28
26         / RO     25

Database co

## Step 4: Create the Final Data Warehouse Layer (`dw_banvic.db`)

This is the final step of the data engineering pipeline. The script connects to a new `dw_banvic.db` file, attaches the verified staging database as a source, and builds the final, analysis-ready Star Schema with dimension (`dim_`) and fact (`fct_`) tables.

In [None]:
import duckdb
import os

# --- Configuration ---
# Path to the final Data Warehouse database file
dw_db_file = '../database/dw_banvic.db'
# Path to the Staging database file (our source)
stg_db_file = '../database/stg_banvic.db'

# --- SQL Queries for Data Warehouse Layer ---
dw_queries = [
    {
        "table_name": "dim_dates",
        "sql": """
            CREATE OR REPLACE TABLE dim_dates AS
            SELECT
              d AS full_date,
              EXTRACT(YEAR FROM d) AS year,
              EXTRACT(QUARTER FROM d) AS quarter,
              EXTRACT(MONTH FROM d) AS month,
              EXTRACT(WEEK FROM d) AS week_of_year,
              EXTRACT(DAY FROM d) AS day,
              EXTRACT(DAYOFYEAR FROM d) AS day_of_year,
              EXTRACT(DAYOFWEEK FROM d) AS day_of_week,
              strftime(d, '%b') AS month_name,
              strftime(d, '%a') AS day_of_week_name,

            -- This is the syntax for generating the date series
            FROM 
              generate_series(
                (SELECT MIN(CAST(transaction_date AS DATE)) FROM stg_db.stg.transactions),
                (SELECT MAX(CAST(transaction_date AS DATE)) FROM stg_db.stg.transactions),
                interval '1 day'
              ) AS t(d);
        """
    },
    {
        "table_name": "dim_agencies",
        "sql": "CREATE OR REPLACE TABLE dim_agencies AS SELECT * FROM stg_db.stg.agencies;"
    },
    {
        "table_name": "dim_employees",
        "sql": """
            CREATE OR REPLACE TABLE dim_employees AS
            SELECT
              e.employee_id,
              e.first_name || ' ' || e.last_name AS full_name,
              e.email,
              e.cpf,
              e.birth_date,
              ea.agency_id
            FROM
              stg_db.stg.employee AS e
            LEFT JOIN
              stg_db.stg.employee_agency AS ea
              ON e.employee_id = ea.employee_id;
        """
    },
    {
        "table_name": "dim_clients",
        "sql": """
            CREATE OR REPLACE TABLE dim_clients AS
            SELECT
              client_id,
              first_name || ' ' || last_name AS full_name,
              email,
              client_type,
              inclusion_date,
              cpf_cnpj,
              birth_date,
              full_address,
              zip_code,
              parsed_state AS state
            FROM
              stg_db.stg.clients;
        """
    },
    {
        "table_name": "fct_transactions",
        "sql": """
            CREATE OR REPLACE TABLE fct_transactions AS
            SELECT
              t.transaction_id,
              b.client_id,
              b.agency_id,
              b.employee_id,
              b.account_id,
              CAST(t.transaction_date AS TIMESTAMP) AS transaction_date_id,
              t.transaction_name,
              t.transaction_amount
            FROM
              stg_db.stg.transactions AS t
            LEFT JOIN
              stg_db.stg.bank_accounts AS b
              ON t.account_id = b.account_id;
        """
    },
    {
        "table_name": "fct_credit_proposals",
        "sql": """
            CREATE OR REPLACE TABLE fct_credit_proposals AS
            SELECT
              cp.proposal_id,
              CAST(cp.proposal_date AS DATE) AS proposal_date_id,
              cp.client_id,
              cp.employee_id,
              cp.proposal_status,
              cp.monthly_interest_rate,
              cp.proposal_amount,
              cp.financing_amount,
              cp.down_payment_amount,
              cp.installment_amount,
              cp.number_of_installments,
              cp.grace_period
            FROM
              stg_db.stg.credit_proposal AS cp;
        """
    }
]

# --- Execution ---
con = None
try:
    # Connect to the DW database file
    dw_db_file = '../database/dw_banvic.db'
    stg_db_file = '../database/stg_banvic.db'
    con = duckdb.connect(dw_db_file)
    print(f"Successfully connected to '{dw_db_file}'")

    # Attach the STAGING database file, giving it an alias 'stg_db'
    con.execute(f"ATTACH '{stg_db_file.replace('\\', '/')}' AS stg_db (READ_ONLY);")
    print(f"Successfully attached '{stg_db_file}' as 'stg_db'")

    # Loop to execute each transformation query
    print("\n--- Building Data Warehouse Layer (dim and fct tables) ---")
    for item in dw_queries:
        table_name = item["table_name"]
        sql_query = item["sql"]
        
        con.execute(sql_query)
        print(f"Table '{table_name}' created successfully.")

    print("\n--- Data Warehouse layer built successfully. ---")

finally:
    # Ensure the connection is closed
    if con:
        con.close()
        print("\nDatabase connection closed.")

Successfully connected to '../database/dw_banvic.db'
Successfully attached '../database/stg_banvic.db' as 'stg_db'

--- Building Data Warehouse Layer (dim and fct tables) ---
Table 'dim_dates' created successfully.
Table 'dim_agencies' created successfully.
Table 'dim_employees' created successfully.
Table 'dim_clients' created successfully.
Table 'fct_transactions' created successfully.
Table 'fct_credit_proposals' created successfully.

--- Data Warehouse layer built successfully. ---

Database connection closed.


## Conclusion

The data engineering pipeline is now complete. We have a robust, three-layer data architecture ready for the next phase: connecting Power BI to the `dw_banvic.db` file for analysis and visualization.