<a href="https://colab.research.google.com/github/jaimehdzgt/ETL-y-Aseguramiento-de-Calidad-de-Datos/blob/main/etl_cleaning_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Proyecto ETL End-to-End: Limpieza y Generación de Insights desde Datos Sucios

**Objetivo:** Convertir el dataset `etl_dirty_orders.xlsx` en una tabla confiable para análisis, documentando reglas de limpieza, validación y enriquecimiento; y entregar un dataset final + reporte de calidad + visualizaciones clave.

> **Instrucciones:** Coloca el archivo **`etl_dirty_orders.xlsx`** en la misma carpeta del notebook (o ajusta la variable `DATA_PATH`). El archivo incluye:
> - Hoja **`raw_orders`** (datos sucios)
> - Hoja **`lookup_states`** (normalización de estados)
> - Hoja **`lookup_subcats`** (corrección de subcategorías)


## 1) Configuración e importaciones

In [None]:

import os, re
from typing import Tuple
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

pd.set_option('display.max_columns', 100)
plt.rcParams['figure.figsize'] = (8, 5)

DATA_PATH = 'etl_dirty_orders.xlsx'  # coloca el Excel junto al .ipynb
SHEET_RAW = 'raw_orders'
SHEET_STATES = 'lookup_states'
SHEET_SUBCATS = 'lookup_subcats'


## 2) Carga del dataset y snapshot inicial

In [None]:

# === Montar Google Drive ===
from google.colab import drive
drive.mount('/content/drive')

# === Rutas y hojas ===
import os
import pandas as pd

DATA_PATH     = "/content/drive/MyDrive/Colab Notebooks/datasets/etl_dirty_orders.xlsx"  # <-- tu archivo en Drive
SHEET_RAW     = "raw_orders"
SHEET_STATES  = "lookup_states"
SHEET_SUBCATS = "lookup_subcats"

# === Carga con validación ===
assert os.path.exists(DATA_PATH), f"No se encontró {DATA_PATH}. Verifica la ruta exacta en tu Drive."
xls = pd.ExcelFile(DATA_PATH)
df            = pd.read_excel(xls, SHEET_RAW)
lookup_states = pd.read_excel(xls, SHEET_STATES)
lookup_subcats= pd.read_excel(xls, SHEET_SUBCATS)

print("Forma inicial:", df.shape)
display(df.head(10))

nulls = df.isna().sum().sort_values(ascending=False)
types = df.dtypes
print("\nNulos por columna (top 15):"); display(nulls.head(15).to_frame("nulls"))
print("\nTipos detectados:"); display(types.to_frame("dtype"))




## 3) Estandarización de esquema (nombres y columnas duplicadas)

- Normalizamos encabezados a `snake_case` y trim de espacios.
- Resolución de columnas duplicadas/solapadas (por ejemplo: `TOTAL $` vs `TOTAL`). Guardamos **una** columna final y documentamos la decisión.


In [None]:

def normalize_colname(c: str) -> str:
    c = c.strip()
    c = c.replace('%','_pct').replace('$','_usd').replace('#','num')
    c = re.sub(r'[^0-9a-zA-Z]+', '_', c)
    c = re.sub(r'__+', '_', c).strip('_')
    return c.lower()

df.columns = [normalize_colname(c) for c in df.columns]

# Unificar TOTAL si existe variante
if 'total_usd' in df.columns and 'total' in df.columns:
    df['total'] = df['total'].fillna(df['total_usd'])
    df.drop(columns=['total_usd'], inplace=True, errors='ignore')

# Unificar posibles duplicados de order_id
dup_cols = [c for c in df.columns if c.startswith('order_id') and c != 'order_id']
for c in dup_cols:
    df['order_id'] = df['order_id'].fillna(df[c]) if 'order_id' in df.columns else df[c]
    df.drop(columns=[c], inplace=True, errors='ignore')

print('Columnas tras normalización:', list(df.columns))



## 4) Limpieza de filas no-datos (SUBTOTAL / TOTAL GENERAL)

Removemos filas que son subtotales o totales incrustadas en el cuerpo de la tabla.


In [None]:

before = df.shape[0]
mask_bad = df['order_id'].astype(str).str.upper().isin(['SUBTOTAL', 'TOTAL GENERAL', 'TOTAL_GENERAL'])
df = df[~mask_bad].copy()
after = df.shape[0]
print(f'Filas removidas (no-datos): {before - after}')



## 5) Tipos y formatos (fechas, moneda, porcentajes)

- **Fechas:** múltiples formatos → parseo robusto.
- **Moneda:** normalizamos a `float` (admite `$`, `MXN`, comas como separador decimal y miles).
- **Porcentajes:** unificamos a fracción (0–1).


In [None]:

def parse_date_series(s: pd.Series) -> pd.Series:
    def _parse_one(x):
        if pd.isna(x) or str(x).strip()=='' or str(x).lower()=='none':
            return pd.NaT
        x = str(x).strip()
        v = pd.to_datetime(x, errors='coerce', dayfirst=True, infer_datetime_format=True)
        if pd.isna(v):
            try:
                v = pd.to_datetime(x, errors='coerce', format='%m/%d/%y')
            except:
                v = pd.NaT
        return v
    return s.apply(_parse_one)

money_regex = re.compile(r'[^0-9,.-]')
def parse_money_series(s: pd.Series) -> pd.Series:
    def _to_float(x):
        if pd.isna(x):
            return np.nan
        x = str(x).strip()
        if x.lower() in ['n/a','na','incluido','']:
            return 0.0 if x.lower()=='incluido' else np.nan
        x = money_regex.sub('', x)
        if x.count(',') > 0 and x.count('.') > 0:
            if x.rfind(',') > x.rfind('.'):
                x = x.replace('.', '').replace(',', '.')
            else:
                x = x.replace(',', '')
        else:
            if x.count(',') == 1 and x.count('.') == 0:
                x = x.replace(',', '.')
            else:
                x = x.replace(',', '')
        try:
            return float(x)
        except:
            return np.nan
    return s.apply(_to_float)

def parse_percent_series(s: pd.Series) -> pd.Series:
    def _pct(x):
        if pd.isna(x):
            return np.nan
        x = str(x).strip().replace('%','')
        try:
            v = float(x)
        except:
            return np.nan
        return v/100 if v>1 else (v if 0<=v<=1 else np.nan)
    return s.apply(_pct)

for col in ['order_date','ship_date','payment_date']:
    if col in df.columns:
        df[col] = parse_date_series(df[col])

for col in ['unit_price','tax','shipping','profit','total']:
    if col in df.columns:
        df[col] = parse_money_series(df[col])

for col in ['discount_pct','discount']:
    if col in df.columns:
        df[col] = parse_percent_series(df[col])

if 'discount_pct' in df.columns and 'discount' in df.columns:
    df['discount_pct'] = df['discount_pct'].fillna(df['discount'])
    df.drop(columns=['discount'], inplace=True, errors='ignore')
elif 'discount' in df.columns and 'discount_pct' not in df.columns:
    df.rename(columns={'discount':'discount_pct'}, inplace=True)



## 6) Limpieza general de texto

- `strip()` de espacios, normalización de mayúsculas/minúsculas donde aplique.
- Columnas objetivo: nombres, ciudad, estado, email, teléfono, categoría, subcategoría, segmento, dirección.


In [None]:

def trim_str(s: pd.Series) -> pd.Series:
    return s.astype(str).str.strip().replace({'': np.nan})

rename_map = {
    'phone': 'phone_num',
    'customer_email': 'email',
    'customername': 'customer_name',
}
df.rename(columns={k:v for k,v in rename_map.items() if k in df.columns}, inplace=True)

for c in [col for col in ['customer_name','email','phone_num','state','city','category','sub_category','segment','address','order_priority','payment_method','order_status','product_name'] if col in df.columns]:
    df[c] = trim_str(df[c])

for c in ['customer_name','state','city','category','sub_category','segment','order_status','order_priority','payment_method']:
    if c in df.columns:
        df[c] = df[c].str.replace('\s+', ' ', regex=True).str.title()

if 'email' in df.columns:
    df['email'] = df['email'].str.lower()



## 7) Normalización de estados y subcategorías (lookups)

- Unificamos variantes de **estado** usando `lookup_states`.
- Corregimos **typos de subcategoría** con `lookup_subcats`.
- Unificamos valores de **segmento** (e.g., `Home-Office` → `Home Office`, `Consumr` → `Consumer`).


In [None]:

# Estados
if {'state_variant','state_clean'}.issubset(lookup_states.columns):
    lk = lookup_states.copy()
    lk['state_variant_norm'] = lk['state_variant'].astype(str).str.strip().str.title().str.replace('\s+', ' ', regex=True)
    state_map = dict(zip(lk['state_variant_norm'], lk['state_clean']))
    if 'state' in df.columns:
        state_norm = df['state'].astype(str).str.strip().str.title().str.replace('\s+', ' ', regex=True)
        df['state'] = state_norm.map(state_map).fillna(state_norm)

# Subcategorías
if {'dirty_subcategory','clean_subcategory'}.issubset(lookup_subcats.columns):
    sub_map = dict(zip(lookup_subcats['dirty_subcategory'].astype(str).str.title(), lookup_subcats['clean_subcategory']))
    if 'sub_category' in df.columns:
        df['sub_category'] = df['sub_category'].replace(sub_map)

# Segmento
if 'segment' in df.columns:
    df['segment'] = df['segment'].replace({'Home-Office': 'Home Office', 'Consumr': 'Consumer'})



## 8) Reglas de integridad de negocio

- `ship_date >= order_date` y `payment_date >= order_date` → marcamos excepciones.
- `qty` mínimo 1; tratamos 0/negativos como inválidos y los reportamos.
- Recalculamos `total_calc = (unit_price * qty) * (1 - discount_pct) + tax + shipping` (shipping nulo → 0).
- **Flag** si `total` reportado difiere de `total_calc`.


In [None]:

if 'qty' in df.columns:
    df['qty'] = pd.to_numeric(df['qty'], errors='coerce')

df['flag_ship_before_order'] = False
df['flag_payment_before_order'] = False
if {'ship_date','order_date'}.issubset(df.columns):
    df['flag_ship_before_order'] = (df['ship_date'] < df['order_date']) & df['ship_date'].notna() & df['order_date'].notna()
if {'payment_date','order_date'}.issubset(df.columns):
    df['flag_payment_before_order'] = (df['payment_date'] < df['order_date']) & df['payment_date'].notna() & df['order_date'].notna()

if 'shipping' in df.columns:
    df['shipping'] = df['shipping'].fillna(0.0)
if 'discount_pct' in df.columns:
    df['discount_pct'] = df['discount_pct'].fillna(0.0)

for col in ['unit_price','qty','tax','shipping','discount_pct']:
    if col not in df.columns:
        df[col] = 0.0

df['total_calc'] = (df['unit_price'] * df['qty']) * (1 - df['discount_pct']) + df['tax'].fillna(0) + df['shipping'].fillna(0)

if 'total' in df.columns:
    df['flag_total_discrepancy'] = (np.round(df['total'],2) != np.round(df['total_calc'],2))
else:
    df['total'] = df['total_calc']
    df['flag_total_discrepancy'] = False

df['flag_qty_invalid'] = df['qty'].fillna(0) <= 0
print('Resumen flags:')
display(df[['flag_ship_before_order','flag_payment_before_order','flag_total_discrepancy','flag_qty_invalid']].sum())



## 9) Geodatos: detección de lat/long intercambiados

- Asumimos México: latitudes entre ~14 y 33, longitudes entre ~-118 y -86.
- Si detectamos coordenadas fuera de rango pero su inversión cae en rango, **intercambiamos**.


In [None]:

def fix_latlon(row):
    lat, lon = row.get('latitude', np.nan), row.get('longitude', np.nan)
    if pd.isna(lat) or pd.isna(lon):
        return lat, lon, False
    def in_mx(a, b):
        return (14 <= a <= 33) and (-118 <= b <= -86)
    ok = in_mx(lat, lon)
    swapped_ok = in_mx(lon, lat)
    if (not ok) and swapped_ok:
        return lon, lat, True
    return lat, lon, False

if {'latitude','longitude'}.issubset(df.columns):
    fixed = df.apply(fix_latlon, axis=1, result_type='expand')
    df[['latitude','longitude','flag_latlon_swapped']] = fixed
    print('Coordenadas corregidas (swaps):', int(df['flag_latlon_swapped'].sum()))



## 10) Duplicados

- Definimos llave de granularidad (`order_id`, `productid`, `order_date` si existen).
- Eliminamos duplicados conservando la primera ocurrencia y reportamos el impacto.


In [None]:

keys = [k for k in ['order_id','productid','order_date'] if k in df.columns]
before = df.shape[0]
if keys:
    df = df.sort_values(keys).drop_duplicates(subset=keys, keep='first')
after = df.shape[0]
print('Duplicados eliminados:', before - after)



## 11) Reporte de calidad (antes/después por regla)

Generamos un pequeño panel con:
- Nulos por columna
- Conteo de flags
- Filas afectadas por cada regla


In [None]:

quality = {}
quality['nulls_total'] = int(df.isna().sum().sum())

for flag in ['flag_ship_before_order','flag_payment_before_order','flag_total_discrepancy','flag_qty_invalid','flag_latlon_swapped']:
    if flag in df.columns:
        quality[flag] = int(df[flag].sum())

quality_df = pd.DataFrame({'metric': list(quality.keys()), 'value': list(quality.values())})
print('Resumen de calidad:'); display(quality_df)

OUT_CSV = 'fact_orders_clean.csv'
OUT_QUALITY = 'quality_report.csv'
df.to_csv(OUT_CSV, index=False)
quality_df.to_csv(OUT_QUALITY, index=False)
print(f'Archivos exportados: {OUT_CSV}, {OUT_QUALITY}')



## 12) Insights rápidos (visualizaciones)

- Ventas (total_calc) por categoría
- Margen (profit) por categoría
- Distribución de descuento efectivo


In [None]:

for col in ['total_calc','category','profit','discount_pct','qty']:
    if col not in df.columns:
        df[col] = np.nan

sales_cat = df.groupby('category', dropna=False)['total_calc'].sum().sort_values(ascending=False)
sales_cat.plot(kind='bar', title='Ventas por categoría')
plt.xlabel('Categoría'); plt.ylabel('Ventas'); plt.show()

if 'profit' in df.columns:
    profit_cat = df.groupby('category', dropna=False)['profit'].sum().sort_values(ascending=False)
    profit_cat.plot(kind='bar', title='Margen por categoría')
    plt.xlabel('Categoría'); plt.ylabel('Margen'); plt.show()

df['discount_pct'].dropna().plot(kind='hist', bins=20, title='Distribución de descuento efectivo')
plt.xlabel('Descuento (fracción)'); plt.ylabel('Frecuencia'); plt.show()


## 13) Vista previa del dataset limpio

In [None]:

display(df.head(10))
print('Forma final:', df.shape)
