Import the libraries used throughout the notebook.

In [None]:
import pandas as pd
import numpy as np
import duckdb
import os

### 0. SQL QUERIES

In [None]:
# Query 1: how many products were purchased in at least three consecutive
# calendar months (a "gaps and islands" problem).
#
# The query reads the raw export, which still carries the original column name
# `purchase_release_datetime`, so it points at the raw CSV rather than the
# cleaned one. Both the path and the DuckDB connection are created here so the
# cell runs on a fresh kernel.
csv_path = 'purchases.csv'
con = duckdb.connect()

# Pipeline of CTEs:
#   MonthlyPurchases - one row per (product, month with at least one purchase)
#   LaggedMonths     - attaches the previous active month of the same product
#   StreakIdentifier - flags a row with 1 when it does NOT directly follow the
#                      previous active month (i.e. a new streak starts)
#   StreakGroups     - running sum of the flags = streak id within the product
# The final SELECT keeps streaks of 3+ months and counts distinct products.
query1 = f"""
WITH MonthlyPurchases AS (
    SELECT DISTINCT
        product_id,
        DATE_TRUNC('month', CAST(purchase_release_datetime AS TIMESTAMP)) AS purchase_month
    FROM '{csv_path}'
    WHERE product_id IS NOT NULL
),

LaggedMonths AS (
    SELECT
        product_id,
        purchase_month,
        LAG(purchase_month, 1) OVER (PARTITION BY product_id ORDER BY purchase_month) AS previous_month
    FROM MonthlyPurchases
),

StreakIdentifier AS (
    SELECT
        product_id,
        purchase_month,
        CASE
            WHEN purchase_month - INTERVAL 1 MONTH = previous_month THEN 0
            ELSE 1
        END AS is_new_streak
    FROM LaggedMonths
),
StreakGroups AS (
    SELECT
        product_id,
        purchase_month,
        SUM(is_new_streak) OVER (PARTITION BY product_id ORDER BY purchase_month) AS streak_group
    FROM StreakIdentifier
)

SELECT
    COUNT(DISTINCT product_id) AS products_with_3_consecutive_months
FROM (
    SELECT
        product_id,
        streak_group
    FROM StreakGroups
    GROUP BY product_id, streak_group
    HAVING COUNT(purchase_month) >= 3
);
"""

result1 = con.execute(query1).df()
print("Result for Query 1:")
print(result1)

In [None]:
# Query 2: share of total GMV per (buyer country, payment method) combination.
#
# `csv_path` and `con` must already be defined in the session when this cell
# runs. The CTE rescales `purchase_gmv` with the assumed mean (150) and
# standard deviation (75) and drops rows missing either grouping column; the
# outer query divides each combination's GMV by the grand total of the CTE and
# sorts by that percentage, largest first. `purchase_payment_method` is exposed
# under the alias `source`.
# NOTE(review): unlike the data-cleaning step, negative de-normalized values
# are not floored at 0 here, so shares may differ from figures computed on the
# cleaned file - confirm which behaviour is intended.
query2 = f"""
WITH DenormalizedGMV AS (
    -- Assumption: mean = 150, std = 75.
    SELECT
        user_buyer_country,
        purchase_payment_method,
        (purchase_gmv * 75 + 150) AS denormalized_gmv

    FROM '{csv_path}'

    WHERE user_buyer_country IS NOT NULL AND purchase_payment_method IS NOT NULL
)

SELECT
    user_buyer_country,
    purchase_payment_method AS source,
    SUM(denormalized_gmv) AS gmv_for_combination,
    (SUM(denormalized_gmv) * 100.0 / (SELECT SUM(denormalized_gmv) FROM DenormalizedGMV)) AS gmv_share_percentage

FROM DenormalizedGMV

GROUP BY user_buyer_country, source

ORDER BY gmv_share_percentage DESC;
"""

# Run the query and show the result as a pandas DataFrame.
result2 = con.execute(query2).df()
print("\nResult for Query 2:")
print(result2)

## 1. DATA CLEANING

As we are dealing with a huge dataset, the only smart way to deal with it is as a database, specifically a SQL database. But before creating our database, let's clean up some of the mess.

In [None]:
input_file_path = 'purchases.csv'
output_file_path = 'purchases_cleaned.csv'

# Assumed parameters of the normalization applied to the raw GMV column:
# - Average: $150.00
# - Std. deviation: $75.00
mean_gmv = 150.0
std_gmv = 75.0


def clean_purchases(raw, mean=mean_gmv, std=std_gmv):
    """Return a cleaned copy of the raw purchases frame.

    Steps:
      1. Rename the verbose raw columns to the shorter names used downstream.
      2. Parse ``purchase_datetime``; unparseable values become ``NaT``.
      3. Add ``gmv_denormalized = gmv_normalized * std + mean``, floored at 0.

    Args:
        raw: DataFrame holding the raw export; it is not modified.
        mean: Mean used to undo the GMV normalization.
        std: Standard deviation used to undo the GMV normalization.

    Returns:
        A new DataFrame with the renamed columns plus ``gmv_denormalized``.
    """
    cleaned = raw.rename(columns={
        'purchase_release_datetime': 'purchase_datetime',
        'user_buyer_country': 'buyer_country',
        'user_creator_country': 'creator_country',
        'purchase_gmv': 'gmv_normalized',
    })
    cleaned['purchase_datetime'] = pd.to_datetime(cleaned['purchase_datetime'], errors='coerce')

    # Vectorised floor at zero. Unlike a per-row `max(0, x)`, `clip` leaves a
    # missing GMV missing instead of silently reporting it as a 0-value sale.
    cleaned['gmv_denormalized'] = (cleaned['gmv_normalized'] * std + mean).clip(lower=0)
    return cleaned


print(f"Filepath to input file: {input_file_path}")

try:
    # 1-3. Load, rename, parse dates and de-normalize the GMV.
    df = clean_purchases(pd.read_csv(input_file_path))
    print("Colunas renomeadas.")

    # 4. Save the dataframe to a new .csv file.
    df.to_csv(output_file_path, index=False)
    print(f"Cleaned file saved to: '{output_file_path}'")

    print("\nFinal dataframe summary:")
    df.info()

    # "\n" here: the original "\D" was an invalid escape that printed a
    # literal backslash instead of a blank line.
    print("\nDataframe sample:")
    print(df[['purchase_datetime', 'buyer_country', 'creator_country', 'gmv_normalized', 'gmv_denormalized']].head())


except FileNotFoundError:
    print(f"ERRO: The file '{input_file_path}' was not found.")
except Exception as e:
    print(f"Error: {e}")

Looking at the data, we have some items to look at.

To determine if we have a recurring purchase, we look at the `purchase_recurrency_type` column: if it's 'Single', it's a single purchase; if it's 'Recurring', it's a recurring purchase.

However, a closer look reveals that `purchase_recurrency_type`, `purchase_parent_id`, and `purchase_recurrency_number` are not consistent with one another. Before relying on them, let's diagnose the data.

1.  **Orphaned recurrences:** a significant number of transactions marked as 'Recurring' do not have a `purchase_parent_id`, losing the link to the original purchase.

2.  **Mislabeled child records:** transactions that have a `purchase_parent_id` (and are "child" records) are incorrectly marked as 'Single' or do not have a defined recurrence type (Null).

3.  **Incomplete data:** a considerable volume of purchases that are structurally unique (they don't have a `purchase_parent_id`) has a null `purchase_recurrency_type` field, representing a gap in the data entry.

To skip the chit-chat, let's use DuckDB to run a SQL query directly on the .csv file and see what's going on.

In [None]:
# Diagnostic cross-tab of the three recurrence-related fields in the cleaned
# file. Each output row is one combination of:
#   purchase_parent     - 'None' when purchase_parent_id is NULL, else 'Existing'
#   recurrency_type     - 'Recurrent' when purchase_recurrency_number > 1,
#                         otherwise 'Single' (a NULL number also lands here)
#   purchase_recurrency - the declared purchase_recurrency_type, as stored
# together with the number of purchases showing that combination.
csv_file_path = 'purchases_cleaned.csv'
con = duckdb.connect()

# The declared type is selected directly: the previous CASE returned the same
# column in both branches. ORDER BY makes the printed table reproducible.
query = f"""
SELECT
    CASE
        WHEN purchase_parent_id IS NULL THEN 'None'
        ELSE 'Existing'
    END AS purchase_parent,

    CASE
        WHEN purchase_recurrency_number > 1 THEN 'Recurrent'
        ELSE 'Single'
    END AS recurrency_type,

    purchase_recurrency_type AS purchase_recurrency,

    COUNT(*) AS total

FROM '{csv_file_path}'

GROUP BY
    purchase_parent,
    recurrency_type,
    purchase_recurrency

ORDER BY
    purchase_parent,
    recurrency_type,
    purchase_recurrency
"""

result_df = con.execute(query).df()
print(result_df)

Here we find some inconsistencies. For example, a purchase cannot have a recurrency number while lacking a parent purchase. What are we going to do now?

* **Link reconstruction:** for the orphaned recurrences, the script will find the original "parent" transaction based on the purchase history of the same user for the same product, filling in the missing `purchase_parent_id`.

* **Type correction:** for "child" transactions with an incorrect or NULL type, the `purchase_recurrency_type` will be duly adjusted to 'Recurring', reflecting their true nature.

* **Filling gaps:** for unique purchases with a NULL `purchase_recurrency_type`, the field will be populated with 'Single', completing the missing information based on the transaction's structure.

In [None]:
# Repairs the recurrence bookkeeping of 'purchases_cleaned.csv' and writes the
# result back to the same file. The steps depend on each other and must run in
# this order: clear recurrence number 1, fix/fill the recurrence type, rebuild
# missing parent links, then fill missing recurrence numbers.
import pandas as pd
import numpy as np

# --- 1. INITIAL SETUP ---
print("Loading and preparing data...")
df = pd.read_csv('purchases_cleaned.csv')
df['purchase_datetime'] = pd.to_datetime(df['purchase_datetime'])
print("Data loaded.")


# --- 2. NULLIFY RECURRENCE NUMBER '1' (As Requested) ---
# This step clears recurrence number '1', treating those rows as single purchases.
print("\n--- Applying New Rule: Nullifying Recurrence Number 1 ---")
condition_is_one = df['purchase_recurrency_number'] == 1
count_ones = condition_is_one.sum()
if count_ones > 0:
    df.loc[condition_is_one, 'purchase_recurrency_number'] = np.nan
    print(f"FIXED: {count_ones} records with recurrence number 1 have been set to null.")


# --- 3. FIXING INCONSISTENT RECURRENCE TYPES ---
# The remaining logic operates on the data already adjusted by step 2.
print("\n--- Type Correction Step ---")

# After step 2 this condition matches fewer (or zero) records, which is expected:
# only 'Single' rows that still carry a recurrence number are relabelled.
cond_mislabeled_single = (df['purchase_recurrency_type'] == 'Single') & (df['purchase_recurrency_number'].notnull())
if cond_mislabeled_single.sum() > 0:
    df.loc[cond_mislabeled_single, 'purchase_recurrency_type'] = 'Recurring'
    print(f"FIXED: {cond_mislabeled_single.sum()} records that were 'Single' have been correctly changed to 'Recurring'.")
else:
    print("INFO: No 'Single' records with a recurrence number were found to fix.")

# Fill null types based on whether a parent id is present:
# no parent -> 'Single', has parent -> 'Recurring'.
cond_fill_single = (df['purchase_parent_id'].isnull()) & (df['purchase_recurrency_type'].isnull())
df.loc[cond_fill_single, 'purchase_recurrency_type'] = 'Single'
cond_fill_recurring = (df['purchase_parent_id'].notnull()) & (df['purchase_recurrency_type'].isnull())
df.loc[cond_fill_recurring, 'purchase_recurrency_type'] = 'Recurring'
print("FIXED: Null recurrence types have been filled.")


# --- 4. RECONSTRUCTING PARENT_ID LINKS ---
print("\n--- Link Reconstruction Step ---")
# Sort chronologically so that 'first' below is the earliest purchase of each
# (buyer, product) pair; that purchase is taken as the candidate parent.
df.sort_values(by=['buyer_id', 'product_id', 'purchase_datetime'], inplace=True)
df['true_parent_id'] = df.groupby(['buyer_id', 'product_id'])['purchase_id'].transform('first')
# Orphan = marked 'Recurring', no parent id, and not itself the earliest purchase.
cond_orphan_recurrence = (df['purchase_parent_id'].isnull()) & (df['purchase_recurrency_type'] == 'Recurring') & (df['purchase_id'] != df['true_parent_id'])
if cond_orphan_recurrence.sum() > 0:
    # The right-hand Series is aligned on the index, so each masked row
    # receives its own candidate parent.
    df.loc[cond_orphan_recurrence, 'purchase_parent_id'] = df['true_parent_id']
    print(f"FIXED: {cond_orphan_recurrence.sum()} 'parent_id' links were reconstructed.")
# The helper column is only needed for this step.
df.drop(columns=['true_parent_id'], inplace=True)


# --- 5. FILLING IN MISSING RECURRENCE NUMBERS ---
print("\n--- Number Filling Step ---")
# Sort so that cumcount() below numbers the rows of each parent chronologically.
df.sort_values(by=['purchase_parent_id', 'purchase_datetime'], inplace=True)
cond_missing_number = (df['purchase_recurrency_type'] == 'Recurring') & (df['purchase_recurrency_number'].isnull())
if cond_missing_number.sum() > 0:
    # Position (1-based) of each row among all rows sharing its parent id.
    valid_parents_df = df[df['purchase_parent_id'].notnull()]
    sequential_count = valid_parents_df.groupby('purchase_parent_id').cumcount() + 1
    # Index-aligned assignment: masked rows without a parent id have no entry
    # in sequential_count and therefore stay null.
    # NOTE(review): filled numbers start at 1 although step 2 clears every 1;
    # confirm whether children should start at 2 instead.
    df.loc[cond_missing_number, 'purchase_recurrency_number'] = sequential_count
    # NOTE(review): this count includes masked rows that stayed null.
    print(f"FIXED: {cond_missing_number.sum()} recurrence numbers have been filled.")


# --- 6. SAVING THE FINAL RESULT ---
# Overwrites the file that was read in step 1.
output_filename = 'purchases_cleaned.csv'
df.to_csv(output_filename, index=False)
print(f"\nData treatment finished. The clean file has been saved as '{output_filename}'.")

## 2. DATA AGGREGATION

Now we are going to transform our large, cleaned transactional file (`purchases_cleaned.csv`, with ~1.8 million rows) into a single, lightweight summary file (`sellers_summary.csv`). This summary file will contain the key business indicators (KPIs) for **sellers**, already pre-calculated across the dimensions required for the executive dashboard (daily, by country, and by source).

In [None]:
# Aggregates the cleaned purchases into one row per (day, buyer country,
# payment method) and exports the result as a lightweight CSV for the dashboard.
input_csv_path = 'purchases_cleaned.csv'
summary_output_path = 'sellers_summary.csv'

con = duckdb.connect()

print(f"Reading file '{input_csv_path}' and starting aggregation...")

# SQL query to aggregate the data by day, country, and source
# (`source` is the payment method; GROUP BY 1, 2, 3 refers to the first three
# select-list columns).
query = f"""
CREATE TABLE daily_summary AS
SELECT
    DATE_TRUNC('day', CAST(purchase_datetime AS TIMESTAMP)) AS purchase_date,
    buyer_country,
    purchase_payment_method AS source,
    SUM(gmv_denormalized) AS total_gmv,
    COUNT(DISTINCT buyer_id) AS unique_buyers,
    COUNT(purchase_id) AS total_purchases
FROM read_csv_auto('{input_csv_path}')
GROUP BY 1, 2, 3;
"""

# try/finally guarantees the connection is released even when the aggregation
# or the export fails.
try:
    con.execute(query)

    print(f"Aggregation complete. Exporting the summary to '{summary_output_path}'...")
    con.execute(f"COPY daily_summary TO '{summary_output_path}' (HEADER, DELIMITER ',');")
finally:
    con.close()

print("Process finished successfully!")

This second file focuses on the **customers** and their specific KPIs.

In [None]:
# Builds one summary row per buyer (lifetime KPIs plus installment metrics)
# from the cleaned purchases and exports it to CSV.

# Path to your main cleaned CSV file
input_csv_path = 'purchases_cleaned.csv'
# Name of the enriched customer summary file
output_customer_summary_path = 'customer_summary.csv'

# Connect to DuckDB
con = duckdb.connect()

print(f"Reading file '{input_csv_path}' to create the enriched customer summary...")

# SQL query modified to include installment metrics.
# ANY_VALUE picks one buyer_country per buyer; if a buyer appears with several
# countries, which one is kept is not specified.
query = f"""
CREATE TABLE customer_summary AS
SELECT
    buyer_id,
    ANY_VALUE(buyer_country) as country,
    MIN(CAST(purchase_datetime AS TIMESTAMP)) AS first_purchase_date,
    MAX(CAST(purchase_datetime AS TIMESTAMP)) AS last_purchase_date,

    -- Original KPIs
    COUNT(purchase_id) AS lifetime_purchases,
    SUM(gmv_denormalized) AS lifetime_gmv,
    AVG(gmv_denormalized) AS average_purchase_value,

    -- >>> NEW INSTALLMENT METRICS <<<
    -- Counts how many purchases had more than 1 installment
    COUNT(CASE WHEN purchase_installment_number > 1 THEN purchase_id END) AS installment_purchases_count,

    -- Sums the GMV only from purchases with more than 1 installment
    SUM(CASE WHEN purchase_installment_number > 1 THEN gmv_denormalized ELSE 0 END) AS lifetime_gmv_installments,

    -- Calculates the average number of installments chosen, only for installment purchases
    AVG(CASE WHEN purchase_installment_number > 1 THEN purchase_installment_number END) AS avg_installments_chosen

FROM read_csv_auto('{input_csv_path}')
WHERE buyer_id IS NOT NULL
GROUP BY buyer_id;
"""

# try/finally guarantees the connection is released even when the aggregation
# or the export fails.
try:
    # Execute the aggregation
    con.execute(query)

    # Export the aggregated result to the new CSV
    print(f"Enriched customer aggregation complete. Exporting to '{output_customer_summary_path}'...")
    con.execute(f"COPY customer_summary TO '{output_customer_summary_path}' (HEADER, DELIMITER ',');")
finally:
    con.close()

print(f"Process finished! The file '{output_customer_summary_path}' has been created.")

The main trade-off of this highly efficient approach is the loss of **drill-down capability**. Because the dashboard will only contain this summarized data, it won't be possible to click on a chart (e.g., a specific day's sales) to see the underlying list of individual transactions. For the context of a high-level executive dashboard, this is often an acceptable trade-off to achieve performance and deliver key insights quickly.

## 3. BONUS: DATA MODELLING

In the previous steps, we successfully cleaned and standardized our raw data, resulting in a single, large .csv file. But a single large table suffers from two main problems:

1.  **Data redundancy:** information is repeated unnecessarily. For example, the details of a single product (like its `product_niche`) are stored again in every single row for every purchase of that product.

2.  **Performance issues:** BI tools are slower when they have to scan and aggregate millions of rows from a wide, text-heavy table.

To solve this, we will now transform our single flat file into a **Star Schema** divided into: a central **fact table** containing the quantitative measurements of our business (the purchases) and several surrounding **dimension tables**, each describing a specific business entity (who, what, where, when).

To do so, we will use `duckdb` to execute SQL queries directly on our cleaned data. It will create and then export each table of our Star Schema into its own separate CSV file.

1.  **Dimension Tables (`dim_`):**
    * **Purpose:** To create small, clean, and fast lookup tables for the descriptive attributes of our data.
    * **Process:** For each dimension (Users, Creators, Products, Sources), the script uses `SELECT DISTINCT` to get a unique list of entities, eliminating all redundancy.
    * **Surrogate Keys:** For each dimension table, we generate a `_key` column (e.g., `user_key`) using `ROW_NUMBER()`. This creates a simple, unique integer ID for each record. Using these small integer keys for joining tables in a database is significantly faster than using the original long, text-based IDs.
    * **`dim_date` (Special Case):** A proper date dimension is created separately using Pandas. This is crucial because it guarantees a complete calendar with no missing days, which is essential for accurate time-based analysis (e.g., comparing sales on days with no activity).

2.  **Fact Table (`fct_purchases`):**
    * **Purpose:** This is the core of our model, containing the transactional events.
    * **Grain:** Each row represents a single purchase.
    * **Process:** The script joins the original `purchases` table with the newly created dimension tables. The goal of these `LEFT JOIN` operations is to look up the integer `_key` from each dimension and place it in the fact table.
    * **Result:** The final `fct_purchases` table is mostly composed of numbers: the business metrics (`gmv_denormalized`, etc.) and the foreign keys (`user_key`, `product_key`, etc.) that link to our dimension tables.

3.  **Export to CSV:**
    * Finally, the script saves each of the newly created tables (`dim_users`, `fct_purchases`, etc.) into its own dedicated `.csv` file. This set of interconnected files represents our final, analytics-ready data model.

In [None]:
# --- Step 1: File definition and validation ---
# Make sure this is the correct name of your final CSV file
input_csv_path = 'purchases_cleaned.csv'

# One entry per SQL-built dimension:
# (table name, surrogate key column, natural key column, descriptive attributes)
DIMENSION_SPECS = [
    ('dim_users', 'user_key', 'buyer_id', ['buyer_country']),
    ('dim_creators', 'creator_key', 'creator_id', ['creator_country']),
    ('dim_products', 'product_key', 'product_id', ['product_format', 'product_niche']),
    ('dim_sources', 'source_key', 'purchase_payment_method', []),
]


def dimension_table_sql(table, key_column, natural_key, attributes=()):
    """Return the CREATE TABLE statement for one dimension table.

    The dimension holds exactly one row per non-null ``natural_key``. A plain
    ``SELECT DISTINCT key, attributes`` would emit several rows for a key whose
    attributes vary between purchases, and the fact-table join on the natural
    key would then duplicate purchases. Grouping by the key and taking
    ``ANY_VALUE`` of each attribute prevents that fan-out.

    Args:
        table: Name of the dimension table to create.
        key_column: Name of the generated integer surrogate key.
        natural_key: Column of ``purchases`` identifying the entity.
        attributes: Descriptive columns to carry into the dimension.

    Returns:
        The SQL text, reading from a relation named ``purchases``.
    """
    inner_select = ",\n            ".join(
        [natural_key] + [f"ANY_VALUE({column}) AS {column}" for column in attributes]
    )
    # Ordering the window by the natural key makes surrogate keys reproducible
    # across runs on the same data.
    outer_select = ",\n        ".join(
        [f"ROW_NUMBER() OVER (ORDER BY {natural_key}) AS {key_column}", natural_key, *attributes]
    )
    return f"""
    CREATE TABLE {table} AS
    SELECT
        {outer_select}
    FROM (
        SELECT
            {inner_select}
        FROM purchases
        WHERE {natural_key} IS NOT NULL
        GROUP BY {natural_key}
    );
    """


def build_date_dimension(first_timestamp, last_timestamp):
    """Return a gap-free daily calendar covering both timestamps' dates.

    Args:
        first_timestamp: Earliest purchase timestamp.
        last_timestamp: Latest purchase timestamp.

    Returns:
        DataFrame with columns ``full_date``, ``date_key`` (YYYYMMDD integer),
        ``year``, ``month``, ``day``, ``day_of_week_name`` and ``quarter``.
    """
    calendar = pd.DataFrame(
        pd.date_range(first_timestamp.date(), last_timestamp.date(), freq='D'),
        columns=['full_date'],
    )
    full_date = calendar['full_date']
    calendar['date_key'] = full_date.dt.strftime('%Y%m%d').astype(int)
    calendar['year'] = full_date.dt.year
    calendar['month'] = full_date.dt.month
    calendar['day'] = full_date.dt.day
    calendar['day_of_week_name'] = full_date.dt.day_name()
    calendar['quarter'] = full_date.dt.quarter
    return calendar


if not os.path.exists(input_csv_path):
    print("--- ERRO CRÍTICO ---")
    print(f"O arquivo de entrada '{input_csv_path}' não foi encontrado.")
    print("Por favor, verifique se o nome do arquivo está correto e se ele está na mesma pasta que o seu notebook.")
else:
    # --- Step 2: Load the data and prepare the connection ---
    print("Iniciando o processo de modelagem de dados...")

    df_purchases = pd.read_csv(input_csv_path)
    df_purchases['purchase_datetime'] = pd.to_datetime(df_purchases['purchase_datetime'])
    print(f"Arquivo '{input_csv_path}' carregado com sucesso.")

    con = duckdb.connect()
    # try/finally releases the connection even if a statement below fails.
    try:
        con.register('purchases', df_purchases)

        # --- Step 3: Create the dimension tables ---
        print("\n--- Criando Tabelas de Dimensão ---")
        for table, key_column, natural_key, attributes in DIMENSION_SPECS:
            con.execute(dimension_table_sql(table, key_column, natural_key, attributes))
            print(f"Tabela '{table}' criada.")

        # dim_date is built in pandas so the calendar has no missing days.
        min_date, max_date = df_purchases['purchase_datetime'].min(), df_purchases['purchase_datetime'].max()
        df_dates = build_date_dimension(min_date, max_date)
        con.register('dim_date_df', df_dates)
        con.execute("CREATE TABLE dim_date AS SELECT * FROM dim_date_df;")
        print("Tabela 'dim_date' criada.")

        # --- Step 4: Create the fact table ---
        # Each LEFT JOIN looks up the surrogate key of one dimension; because
        # every dimension has one row per natural key, the purchase grain is
        # preserved. date_key is derived directly as a YYYYMMDD integer.
        print("\n--- Criando Tabela Fato ---")
        con.execute("""
        CREATE TABLE fct_purchases AS
        SELECT
            p.purchase_id, p.purchase_parent_id,
            u.user_key, c.creator_key, pr.product_key, s.source_key,
            CAST(strftime(CAST(p.purchase_datetime AS DATE), '%Y%m%d') AS INTEGER) AS date_key,
            p.gmv_denormalized, p.purchase_installment_number, p.purchase_recurrency_number,
            p.purchase_has_coupon, p.purchase_commission_affiliate, p.purchase_commission_cocreator
        FROM
            purchases p
        LEFT JOIN dim_users u ON p.buyer_id = u.buyer_id
        LEFT JOIN dim_creators c ON p.creator_id = c.creator_id
        LEFT JOIN dim_products pr ON p.product_id = pr.product_id
        LEFT JOIN dim_sources s ON p.purchase_payment_method = s.purchase_payment_method;
        """)
        print("Tabela 'fct_purchases' criada.")

        # --- Step 5: Save every table to its own CSV file ---
        print("\n--- Salvando Arquivos Finais ---")
        tables_to_save = [spec[0] for spec in DIMENSION_SPECS] + ['dim_date', 'fct_purchases']
        for table in tables_to_save:
            con.execute(f"COPY {table} TO '{table}.csv' (HEADER, DELIMITER ',');")
            print(f"Arquivo '{table}.csv' salvo com sucesso.")
    finally:
        con.close()

    print("\nProcesso concluído!")