<a href="https://colab.research.google.com/github/jana-nf/Estagio_CBL_CienciaDeDados_HDL/blob/main/Estagio_CBL_CienciaDeDados_HDL_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Adjust file paths
Subtask:
Create the necessary directory, move the customer CSV file into it, and then execute the CSV to JSON conversion function.

Reasoning: The previous attempt to convert CSV to JSON failed because the input file was not found in the RAW_DATA_PATH. This step will create the RAW_DATA_PATH directory if it doesn't exist, move the olist_customers_dataset.csv file into it, and then call the convert_csv_to_json() function to ensure the conversion happens with the correct file path.

In [1]:
import shutil

# Create the raw data directory if it doesn't exist
os.makedirs(RAW_DATA_PATH, exist_ok=True)
print(f"Ensured directory exists: {RAW_DATA_PATH}")

# Define source and destination paths for the customer CSV file
source_path = os.path.join(BASE_DIR, INPUT_CSV)
destination_path = os.path.join(RAW_DATA_PATH, INPUT_CSV)

# Check if the file needs to be moved
if os.path.exists(source_path) and not os.path.exists(destination_path):
    shutil.move(source_path, destination_path)
    print(f"Moved '{INPUT_CSV}' from '{BASE_DIR}' to '{RAW_DATA_PATH}'.")
elif os.path.exists(destination_path):
    print(f"'{INPUT_CSV}' already exists in '{RAW_DATA_PATH}'. Skipping move.")
elif not os.path.exists(source_path):
    print(f"Error: '{INPUT_CSV}' not found at '{BASE_DIR}'. Cannot move file.")

# Execute the CSV to JSON conversion function
convert_csv_to_json()

NameError: name 'os' is not defined

Adjust file paths and create SQLite DB for order items
Subtask:
Create the necessary directory, move the order items CSV file into it, and then execute the SQLite database creation function.

Reasoning: To resolve the FileNotFoundError for 'olist_order_items_dataset.csv', I will ensure the target directory exists, move the CSV file into it if it's not already there, and then call the create_sqlite_db() function to process the file and create the SQLite database.

In [None]:
import shutil

# Create the raw data directory if it doesn't exist
os.makedirs(RAW_DATA_PATH, exist_ok=True)
print(f"Ensured directory exists: {RAW_DATA_PATH}")

# Define source and destination paths for the order items CSV file
source_path_items = os.path.join(BASE_DIR, INPUT_CSV_ITEMS)
destination_path_items = os.path.join(RAW_DATA_PATH, INPUT_CSV_ITEMS)

# Check if the file needs to be moved
if os.path.exists(source_path_items) and not os.path.exists(destination_path_items):
    shutil.move(source_path_items, destination_path_items)
    print(f"Moved '{INPUT_CSV_ITEMS}' from '{BASE_DIR}' to '{RAW_DATA_PATH}'.")
elif os.path.exists(destination_path_items):
    print(f"'{INPUT_CSV_ITEMS}' already exists in '{RAW_DATA_PATH}'. Skipping move.")
elif not os.path.exists(source_path_items):
    print(f"Error: '{INPUT_CSV_ITEMS}' not found at '{BASE_DIR}'. Please ensure the file is present before attempting to move it.")

# Execute the SQLite database creation function
create_sqlite_db()

Integrate core data sources
Subtask:
Load customer data from JSON, order items from SQLite, and begin merging with the orders DataFrame.

Reasoning: The subtask requires loading customer data from a JSON file, order items from an SQLite database, and then merging these with the existing orders DataFrame. This code block will perform all these data loading and merging operations sequentially, finally displaying the head of the integrated DataFrame.

In [None]:
import pandas as pd
import json
import sqlite3
import os

# 1. Load customer data from JSON
path_customer_json = os.path.join(RAW_DATA_PATH, OUTPUT_JSON)
print(f"Loading customer data from: {path_customer_json}")
with open(path_customer_json, 'r') as f:
    customers_data = json.load(f)
df_customers_json = pd.DataFrame(customers_data)

# 2. Connect to the SQLite database and 3. Read the order_items table
path_sqlite_db = os.path.join(RAW_DATA_PATH, OUTPUT_DB_NAME)
print(f"Connecting to SQLite database: {path_sqlite_db}")
conn = sqlite3.connect(path_sqlite_db)
df_order_items_sql = pd.read_sql_query(f"SELECT * FROM {SQL_TABLE_NAME}", conn)

# 4. Close the SQLite database connection
conn.close()
print("SQLite connection closed.")

# Ensure all DataFrames are loaded
print(f"df_orders shape: {df_orders.shape}")
print(f"df_customers_json shape: {df_customers_json.shape}")
print(f"df_order_items_sql shape: {df_order_items_sql.shape}")

# 5. Merge df_orders with df_customers_json on 'customer_id'
df_merged_data = pd.merge(df_orders, df_customers_json, on='customer_id', how='left')
print(f"Shape after merging df_orders and df_customers_json: {df_merged_data.shape}")

# 6. Merge df_merged_data with df_order_items_sql on 'order_id'
df_merged_data = pd.merge(df_merged_data, df_order_items_sql, on='order_id', how='left')
print(f"Shape after merging with df_order_items_sql: {df_merged_data.shape}")

# 7. Display the first few rows of df_merged_data
print("\nFirst few rows of the integrated DataFrame (df_merged_data):")
df_merged_data.head()

Integrate remaining order-related data
Subtask:
Merge df_order_payments and df_order_reviews into the df_merged_data DataFrame.

Reasoning: To integrate the remaining order-related data, I will merge df_order_payments and df_order_reviews into the df_merged_data DataFrame sequentially, and then display its shape and head.

In [None]:
print(f"Shape before merging df_order_payments: {df_merged_data.shape}")
df_merged_data = pd.merge(df_merged_data, df_order_payments, on='order_id', how='left')
print(f"Shape after merging df_order_payments: {df_merged_data.shape}")

print(f"Shape before merging df_order_reviews: {df_merged_data.shape}")
df_merged_data = pd.merge(df_merged_data, df_order_reviews, on='order_id', how='left')
print(f"Shape after merging df_order_reviews: {df_merged_data.shape}")

print("\nFirst few rows of the integrated DataFrame (df_merged_data) after all merges:")
df_merged_data.head()

Integrate product and seller data
Subtask:
Merge df_products, df_product_category_name_translation, and df_sellers into the df_merged_data DataFrame.

Reasoning: To integrate the remaining product and seller data, I will merge df_products, df_product_category_name_translation, and df_sellers into the df_merged_data DataFrame sequentially, and then display its shape and head.

In [None]:
print(f"Shape before merging df_products: {df_merged_data.shape}")
df_merged_data = pd.merge(df_merged_data, df_products, on='product_id', how='left')
print(f"Shape after merging df_products: {df_merged_data.shape}")

print(f"Shape before merging df_product_category_name_translation: {df_merged_data.shape}")
df_merged_data = pd.merge(df_merged_data, df_product_category_name_translation, on='product_category_name', how='left')
print(f"Shape after merging df_product_category_name_translation: {df_merged_data.shape}")

print(f"Shape before merging df_sellers: {df_merged_data.shape}")
df_merged_data = pd.merge(df_merged_data, df_sellers, on='seller_id', how='left')
print(f"Shape after merging df_sellers: {df_merged_data.shape}")

print("\nFirst few rows of the integrated DataFrame (df_merged_data) after all merges:")
df_merged_data.head()

Clean and preprocess merged data
Subtask:
Handle missing values and convert data types in the df_merged_data DataFrame.

Reasoning: The first step is to inspect the df_merged_data DataFrame for missing values and current data types using .info() and .isnull().sum() to understand the extent of missingness and guide subsequent cleaning steps, as per the instructions.

In [None]:
print("DataFrame Info:")
df_merged_data.info()

print("\nMissing values per column:")
print(df_merged_data.isnull().sum())

In [None]:
datetime_cols = [
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
    'shipping_limit_date',
    'review_creation_date',
    'review_answer_timestamp'
]

for col in datetime_cols:
    # Using errors='coerce' will turn unparseable dates into NaT (Not a Time)
    df_merged_data[col] = pd.to_datetime(df_merged_data[col], errors='coerce')

print("Date and time columns converted to datetime objects.")
print("Updated DataFrame Info:")
df_merged_data.info()

Reasoning: Now that datetime columns are handled, I will proceed to address missing values and convert data types for numerical columns, specifically focusing on product dimensions, payment details, and review scores. This involves identifying the columns, analyzing their current data types, and applying a suitable imputation strategy as outlined in the instructions.

In [None]:
numerical_cols_to_clean = [
    'product_name_lenght',
    'product_description_lenght',
    'product_photos_qty',
    'product_weight_g',
    'product_length_cm',
    'product_height_cm',
    'product_width_cm',
    'payment_sequential',
    'payment_installments',
    'payment_value',
    'review_score',
    'order_item_id',
    'price',
    'freight_value',
    'seller_zip_code_prefix' # Assuming this is treated numerically for simplicity or conversion later
]

for col in numerical_cols_to_clean:
    if df_merged_data[col].isnull().any():
        # For counts/lengths, median might be more robust to outliers than mean
        # For IDs or sequential numbers, fill with 0 or a placeholder if appropriate,
        # but for these, median/mean is often fine if they represent a quantity.
        # Let's use median for most, and 0 for payment_sequential if a missing value implies no payment info.
        if col == 'payment_sequential':
            df_merged_data[col] = df_merged_data[col].fillna(0).astype(int) # 0 for missing payment seq
        elif col == 'order_item_id':
            df_merged_data[col] = df_merged_data[col].fillna(1).astype(int) # Assuming 1 item if missing
        elif col in ['product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm']:
            df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].median()).astype(int)
        elif col == 'review_score':
            df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].median()).astype(int) # Fill with median score
        elif col in ['price', 'freight_value']:
            df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].mean()).astype(float) # Mean for monetary values
        elif col == 'seller_zip_code_prefix':
            df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].mode()[0]).astype(int) # Mode for zip code
    else:
        # Convert to appropriate type if no NaNs, or if NaNs were already handled by previous steps
        if col in ['product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm', 'payment_sequential', 'payment_installments', 'review_score', 'order_item_id', 'seller_zip_code_prefix']:
            # Ensure they are integer type after handling potential NaNs
            df_merged_data[col] = df_merged_data[col].astype(int)
        elif col in ['price', 'freight_value', 'payment_value']:
            df_merged_data[col] = df_merged_data[col].astype(float)


print("Numerical columns processed (NaNs imputed and types converted).")
print("Updated DataFrame Info:")
df_merged_data.info()

Reasoning: Following the instructions, I will now address the missing values in categorical columns. For product_category_name, product_category_name_english, review_comment_title, and review_comment_message, I will fill NaN values with the placeholder 'unknown' to retain information and prevent data loss, which is a common practice for categorical features

In [None]:
categorical_cols_to_clean = [
    'product_category_name',
    'product_category_name_english',
    'review_comment_title',
    'review_comment_message',
    'payment_type',
    'product_id',
    'seller_id',
    'seller_city',
    'seller_state',
    'review_id'
]

for col in categorical_cols_to_clean:
    if df_merged_data[col].isnull().any():
        if col == 'product_id':
            df_merged_data[col] = df_merged_data[col].fillna('unknown_product')
        elif col == 'seller_id':
            df_merged_data[col] = df_merged_data[col].fillna('unknown_seller')
        elif col == 'payment_type':
            df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].mode()[0])
        elif col == 'review_id':
            df_merged_data[col] = df_merged_data[col].fillna('unknown_review') # Placeholder for review_id
        else:
            df_merged_data[col] = df_merged_data[col].fillna('unknown')

print("Categorical columns processed (NaNs imputed).")
print("Updated DataFrame Info:")
df_merged_data.info()

print("\nMissing values per column after categorical processing:")
print(df_merged_data[categorical_cols_to_clean].isnull().sum())


Reasoning: The previous step handled categorical column NaNs. Now, I will address the remaining missing values in numerical columns (payment_installments, payment_value) by filling them with their respective medians and converting their data types. I will also fill the missing values in datetime columns using the mode, as these represent specific event times that would be best approximated by the most frequent occurrence when missing, ensuring all critical columns are clean.

In [None]:
import numpy as np

# Handle remaining numerical NaNs
# payment_installments: fill with median and convert to int
if df_merged_data['payment_installments'].isnull().any():
    df_merged_data['payment_installments'] = df_merged_data['payment_installments'].fillna(df_merged_data['payment_installments'].median()).astype(int)

# payment_value: fill with mean and convert to float
if df_merged_data['payment_value'].isnull().any():
    df_merged_data['payment_value'] = df_merged_data['payment_value'].fillna(df_merged_data['payment_value'].mean()).astype(float)

# Handle remaining datetime NaNs
datetime_cols_with_nans = [
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'shipping_limit_date',
    'review_creation_date',
    'review_answer_timestamp'
]

for col in datetime_cols_with_nans:
    if df_merged_data[col].isnull().any():
        # Fill with the mode (most frequent datetime) for each column
        # mode() returns a Series, take the first element if there are multiple modes
        mode_value = df_merged_data[col].mode()[0]
        df_merged_data[col] = df_merged_data[col].fillna(mode_value)


print("Remaining NaNs in numerical and datetime columns handled.")
print("Updated DataFrame Info:")
df_merged_data.info()

print("\nMissing values per column after all cleaning steps:")
print(df_merged_data.isnull().sum()[df_merged_data.isnull().sum() > 0])

Carregar o dataset limpo em um arquivo final (ex: clean_data.csv)
Subtask:
Save the cleaned and merged DataFrame (df_merged_data) to a CSV file.

Reasoning: To complete the subtask, I will define the output path and filename for the cleaned data, then save the df_merged_data DataFrame to a CSV file at that location, and finally print a confirmation message.

In [None]:
OUTPUT_CLEAN_CSV = 'clean_data.csv'
PATH_CLEAN_CSV = os.path.join(RAW_DATA_PATH, OUTPUT_CLEAN_CSV)

# Save the cleaned and merged DataFrame to a CSV file
df_merged_data.to_csv(PATH_CLEAN_CSV, index=False)

print(f"Successfully saved cleaned data to: {PATH_CLEAN_CSV}")

Automação: Garantir que este processo seja executado por um único script modular.
Subtask:
Consolidate all ETL steps (extraction, transformation, loading) into a single, modular Python script.

Reasoning: To consolidate all ETL steps into a single, modular Python script as requested, I will provide the complete Python code for etl_pipeline.py. This script will include all necessary imports, global variables, data loading from various sources (CSV, JSON, SQLite), data merging, cleaning, feature engineering, and finally, saving the processed data to a new CSV file. This fulfills the subtask's requirement to create a unified and executable ETL pipeline.

In [None]:
import pandas as pd
import json
import sqlite3
import os
import shutil
import numpy as np

# --- Global Configurations ---
BASE_DIR = os.getcwd()
RAW_DATA_PATH = os.path.join(BASE_DIR, 'data', '01_raw')

# Input/Output file names and table names
INPUT_CUSTOMERS_CSV = 'olist_customers_dataset.csv'
INPUT_ORDER_ITEMS_CSV = 'olist_order_items_dataset.csv'
INPUT_ORDERS_CSV = 'olist_orders_dataset.csv'
INPUT_ORDER_PAYMENTS_CSV = 'olist_order_payments_dataset.csv'
INPUT_ORDER_REVIEWS_CSV = 'olist_order_reviews_dataset.csv'
INPUT_PRODUCTS_CSV = 'olist_products_dataset.csv'
INPUT_SELLERS_CSV = 'olist_sellers_dataset.csv'
INPUT_PRODUCT_CATEGORY_TRANSLATION_CSV = 'product_category_name_translation.csv'

OUTPUT_CUSTOMERS_JSON = 'clientes_api.json'
OUTPUT_ORDER_ITEMS_DB = 'estoque.db'
SQL_TABLE_NAME = 'order_items'
OUTPUT_CLEAN_CSV = 'clean_data.csv'

def ensure_raw_data_path_and_move_files():
    """Ensures the RAW_DATA_PATH exists and moves specific CSVs into it."""
    os.makedirs(RAW_DATA_PATH, exist_ok=True)
    print(f"Ensured directory exists: {RAW_DATA_PATH}")

    files_to_move = [
        INPUT_CUSTOMERS_CSV,
        INPUT_ORDER_ITEMS_CSV,
        INPUT_ORDERS_CSV,
        INPUT_ORDER_PAYMENTS_CSV,
        INPUT_ORDER_REVIEWS_CSV,
        INPUT_PRODUCTS_CSV,
        INPUT_SELLERS_CSV,
        INPUT_PRODUCT_CATEGORY_TRANSLATION_CSV
    ]

    for filename in files_to_move:
        source_path = os.path.join(BASE_DIR, filename)
        destination_path = os.path.join(RAW_DATA_PATH, filename)
        if os.path.exists(source_path) and not os.path.exists(destination_path):
            shutil.move(source_path, destination_path)
            print(f"Moved '{filename}' from '{BASE_DIR}' to '{RAW_DATA_PATH}'.")
        elif os.path.exists(destination_path):
            print(f"'{filename}' already exists in '{RAW_DATA_PATH}'. Skipping move.")
        elif not os.path.exists(source_path):
            print(f"Warning: '{filename}' not found at '{BASE_DIR}'. Cannot move file.")

def convert_csv_to_json():
    """Lê o CSV de clientes e salva como JSON para simular uma fonte de API."""

    path_input_csv = os.path.join(RAW_DATA_PATH, INPUT_CUSTOMERS_CSV)
    path_output_json = os.path.join(RAW_DATA_PATH, OUTPUT_CUSTOMERS_JSON)

    print(f"Lendo CSV de clientes em: {path_input_csv}")

    try:
        df_customers = pd.read_csv(path_input_csv)
        df_customers.to_json(
            path_output_json,
            orient='records',
            date_format='iso'
        )
        print(f"Sucesso! {len(df_customers)} registros convertidos para JSON.")
        print(f"Arquivo JSON salvo em: {path_output_json}")

    except Exception as e:
        print(f"Ocorreu um erro durante a conversão CSV para JSON: {e}")

def create_sqlite_db():
    """Lê o CSV de itens e carrega os dados em uma tabela SQLite local."""

    path_input_csv = os.path.join(RAW_DATA_PATH, INPUT_ORDER_ITEMS_CSV)
    path_output_db = os.path.join(RAW_DATA_PATH, OUTPUT_ORDER_ITEMS_DB)

    print(f"Lendo CSV de itens em: {path_input_csv}")

    try:
        df_items = pd.read_csv(path_input_csv)
        conn = sqlite3.connect(path_output_db)

        print(f"Carregando {len(df_items)} registros na tabela '{SQL_TABLE_NAME}'...")

        df_items.to_sql(
            name=SQL_TABLE_NAME,
            con=conn,
            if_exists='replace',
            index=False
        )

        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {SQL_TABLE_NAME}")
        count = cursor.fetchone()[0]
        conn.close()

        print(f"Sucesso! Banco de dados '{OUTPUT_ORDER_ITEMS_DB}' criado com {count} linhas na tabela.")
        print(f"Arquivo DB salvo em: {path_output_db}")

    except Exception as e:
        print(f"Ocorreu um erro ao criar o banco de dados SQLite: {e}")

def run_etl_pipeline():
    """Executes the complete ETL pipeline."""
    print("\n--- Starting ETL Pipeline ---")

    # Step 1: Ensure directory and move raw files
    ensure_raw_data_path_and_move_files()

    # Step 2: Extraction - Create JSON and SQLite from raw CSVs
    convert_csv_to_json()
    create_sqlite_db()

    # Step 3: Load all data sources into DataFrames
    print("\nLoading dataframes...")
    try:
        df_orders = pd.read_csv(os.path.join(RAW_DATA_PATH, INPUT_ORDERS_CSV))
        df_order_payments = pd.read_csv(os.path.join(RAW_DATA_PATH, INPUT_ORDER_PAYMENTS_CSV))
        df_order_reviews = pd.read_csv(os.path.join(RAW_DATA_PATH, INPUT_ORDER_REVIEWS_CSV))
        df_products = pd.read_csv(os.path.join(RAW_DATA_PATH, INPUT_PRODUCTS_CSV))
        df_sellers = pd.read_csv(os.path.join(RAW_DATA_PATH, INPUT_SELLERS_CSV))
        df_product_category_name_translation = pd.read_csv(os.path.join(RAW_DATA_PATH, INPUT_PRODUCT_CATEGORY_TRANSLATION_CSV))

        path_customer_json = os.path.join(RAW_DATA_PATH, OUTPUT_CUSTOMERS_JSON)
        with open(path_customer_json, 'r') as f:
            customers_data = json.load(f)
        df_customers_json = pd.DataFrame(customers_data)

        path_sqlite_db = os.path.join(RAW_DATA_PATH, OUTPUT_ORDER_ITEMS_DB)
        conn = sqlite3.connect(path_sqlite_db)
        df_order_items_sql = pd.read_sql_query(f"SELECT * FROM {SQL_TABLE_NAME}", conn)
        conn.close()
        print("All dataframes loaded successfully.")

    except Exception as e:
        print(f"Error loading initial dataframes: {e}")
        return

    # Step 4: Merging all DataFrames
    print("\nMerging dataframes...")
    df_merged_data = pd.merge(df_orders, df_customers_json, on='customer_id', how='left')
    df_merged_data = pd.merge(df_merged_data, df_order_items_sql, on='order_id', how='left')
    df_merged_data = pd.merge(df_merged_data, df_order_payments, on='order_id', how='left')
    df_merged_data = pd.merge(df_merged_data, df_order_reviews, on='order_id', how='left')
    df_merged_data = pd.merge(df_merged_data, df_products, on='product_id', how='left')
    df_merged_data = pd.merge(df_merged_data, df_product_category_name_translation, on='product_category_name', how='left')
    df_merged_data = pd.merge(df_merged_data, df_sellers, on='seller_id', how='left')
    print(f"Dataframes merged. Final shape: {df_merged_data.shape}")

    # Step 5: Data Cleaning and Preprocessing
    print("\nCleaning and preprocessing data...")
    # Convert datetime columns
    datetime_cols = [
        'order_purchase_timestamp',
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_delivered_customer_date',
        'order_estimated_delivery_date',
        'shipping_limit_date',
        'review_creation_date',
        'review_answer_timestamp'
    ]
    for col in datetime_cols:
        df_merged_data[col] = pd.to_datetime(df_merged_data[col], errors='coerce')

    # Handle numerical NaNs
    numerical_cols_to_clean = [
        'product_name_lenght',
        'product_description_lenght',
        'product_photos_qty',
        'product_weight_g',
        'product_length_cm',
        'product_height_cm',
        'product_width_cm',
        'payment_sequential',
        'payment_installments',
        'payment_value',
        'review_score',
        'order_item_id',
        'price',
        'freight_value',
        'seller_zip_code_prefix'
    ]

    for col in numerical_cols_to_clean:
        if df_merged_data[col].isnull().any():
            if col == 'payment_sequential':
                df_merged_data[col] = df_merged_data[col].fillna(0).astype(int)
            elif col == 'order_item_id':
                df_merged_data[col] = df_merged_data[col].fillna(1).astype(int)
            elif col in ['product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm']:
                df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].median()).astype(int)
            elif col == 'review_score':
                df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].median()).astype(int)
            elif col in ['price', 'freight_value']:
                df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].mean()).astype(float)
            elif col == 'seller_zip_code_prefix':
                df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].mode()[0]).astype(int)
        else:
            if col in ['product_name_lenght', 'product_description_lenght', 'product_photos_qty', 'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm', 'payment_sequential', 'payment_installments', 'review_score', 'order_item_id', 'seller_zip_code_prefix']:
                df_merged_data[col] = df_merged_data[col].astype(int)
            elif col in ['price', 'freight_value', 'payment_value']:
                df_merged_data[col] = df_merged_data[col].astype(float)

    # Handle categorical NaNs
    categorical_cols_to_clean = [
        'product_category_name',
        'product_category_name_english',
        'review_comment_title',
        'review_comment_message',
        'payment_type',
        'product_id',
        'seller_id',
        'seller_city',
        'seller_state',
        'review_id'
    ]
    for col in categorical_cols_to_clean:
        if df_merged_data[col].isnull().any():
            if col == 'product_id':
                df_merged_data[col] = df_merged_data[col].fillna('unknown_product')
            elif col == 'seller_id':
                df_merged_data[col] = df_merged_data[col].fillna('unknown_seller')
            elif col == 'payment_type':
                df_merged_data[col] = df_merged_data[col].fillna(df_merged_data[col].mode()[0])
            elif col == 'review_id':
                df_merged_data[col] = df_merged_data[col].fillna('unknown_review')
            else:
                df_merged_data[col] = df_merged_data[col].fillna('unknown')

    # Handle remaining numerical and datetime NaNs
    if df_merged_data['payment_installments'].isnull().any():
        df_merged_data['payment_installments'] = df_merged_data['payment_installments'].fillna(df_merged_data['payment_installments'].median()).astype(int)
    if df_merged_data['payment_value'].isnull().any():
        df_merged_data['payment_value'] = df_merged_data['payment_value'].fillna(df_merged_data['payment_value'].mean()).astype(float)

    datetime_cols_with_nans = [
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_delivered_customer_date',
        'shipping_limit_date',
        'review_creation_date',
        'review_answer_timestamp'
    ]
    for col in datetime_cols_with_nans:
        if df_merged_data[col].isnull().any():
            mode_value = df_merged_data[col].mode()[0]
            df_merged_data[col] = df_merged_data[col].fillna(mode_value)

    print("Data cleaning and preprocessing complete.")

    # Step 6: Calculate delivery time and product value metrics
    print("\nCalculating new features...")
    df_merged_data['delivery_time_days'] = (df_merged_data['order_delivered_customer_date'] - df_merged_data['order_purchase_timestamp']).dt.days
    df_merged_data['product_total_value'] = df_merged_data['price'] * df_merged_data['order_item_id']
    print("New features calculated.")

    # Step 7: Save the final cleaned and transformed DataFrame
    path_clean_csv = os.path.join(RAW_DATA_PATH, OUTPUT_CLEAN_CSV)
    df_merged_data.to_csv(path_clean_csv, index=False)
    print(f"\nSuccessfully saved cleaned data to: {path_clean_csv}")
    print("--- ETL Pipeline Finished ---")

if __name__ == "__main__":
    run_etl_pipeline()


Reasoning: Now that datetime columns are handled, I will proceed to address missing values and convert data types for numerical columns, specifically focusing on product dimensions, payment details, and review scores. This involves identifying the columns, analyzing their current data types, and applying a suitable imputation strategy as outlined in the instructions.