# 🧹 Masterclass de Limpieza de Datos con Pandas - Parte 3

## Contenido
1. [Técnicas Avanzadas de Imputación](#1)
2. [Normalización y Escalado](#2)
3. [Caso Práctico: Análisis de Ventas](#3)
4. [Automatización de la Limpieza](#4)

En esta tercera parte exploraremos técnicas más sofisticadas de limpieza de datos y veremos un caso práctico completo.

In [None]:
import pandas as pd
import numpy as np
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# Configuraciones
pd.set_option('display.max_columns', None)
plt.style.use('seaborn')

<a id='1'></a>
## 1. Técnicas Avanzadas de Imputación

Vamos a explorar diferentes métodos de imputación para datos faltantes:

In [None]:
# Crear dataset con valores faltantes
np.random.seed(42)
n = 1000

# Generar datos correlacionados
x = np.random.normal(0, 1, n)
y = 2 * x + np.random.normal(0, 0.5, n)
z = x - y + np.random.normal(0, 0.3, n)

df_missing = pd.DataFrame({
    'x': x,
    'y': y,
    'z': z
})

# Introducir valores faltantes aleatoriamente
for col in df_missing.columns:
    mask = np.random.random(n) < 0.2  # 20% de valores faltantes
    df_missing.loc[mask, col] = np.nan

print("Valores faltantes por columna:")
print(df_missing.isnull().sum())

# Visualizar patrones de valores faltantes
plt.figure(figsize=(10, 6))
sns.heatmap(df_missing.isnull(), cmap='viridis', yticklabels=False)
plt.title('Patrón de Valores Faltantes')
plt.show()

In [None]:
# 1. Imputación Simple
imputer_mean = SimpleImputer(strategy='mean')
df_imputed_mean = pd.DataFrame(
    imputer_mean.fit_transform(df_missing),
    columns=df_missing.columns
)

# 2. Imputación KNN
imputer_knn = KNNImputer(n_neighbors=5)
df_imputed_knn = pd.DataFrame(
    imputer_knn.fit_transform(df_missing),
    columns=df_missing.columns
)

# 3. Imputación por interpolación
df_imputed_interp = df_missing.interpolate(method='cubic')

# Comparar resultados
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Comparación de Métodos de Imputación')

# Datos originales con valores faltantes
sns.scatterplot(data=df_missing, x='x', y='y', ax=axes[0,0])
axes[0,0].set_title('Datos Originales con Valores Faltantes')

# Imputación por media
sns.scatterplot(data=df_imputed_mean, x='x', y='y', ax=axes[0,1])
axes[0,1].set_title('Imputación por Media')

# Imputación KNN
sns.scatterplot(data=df_imputed_knn, x='x', y='y', ax=axes[1,0])
axes[1,0].set_title('Imputación KNN')

# Imputación por interpolación
sns.scatterplot(data=df_imputed_interp, x='x', y='y', ax=axes[1,1])
axes[1,1].set_title('Imputación por Interpolación')

plt.tight_layout()
plt.show()

<a id='2'></a>
## 2. Normalización y Escalado

Vamos a explorar diferentes técnicas de normalización y escalado de datos:

In [None]:
# Crear datos con diferentes escalas
np.random.seed(42)
n = 1000

df_scale = pd.DataFrame({
    'pequeña_escala': np.random.normal(0, 1, n),
    'gran_escala': np.random.normal(1000, 100, n),
    'con_outliers': np.concatenate([
        np.random.normal(10, 2, n-10),
        np.random.normal(50, 5, 10)  # outliers
    ])
})

# Aplicar diferentes escaladores
scalers = {
    'standard': StandardScaler(),
    'minmax': MinMaxScaler(),
    'robust': RobustScaler()
}

scaled_dfs = {}
for name, scaler in scalers.items():
    scaled_dfs[name] = pd.DataFrame(
        scaler.fit_transform(df_scale),
        columns=df_scale.columns
    )

# Visualizar resultados
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Comparación de Métodos de Escalado')

# Datos originales
df_scale.boxplot(ax=axes[0,0])
axes[0,0].set_title('Datos Originales')

# Standard Scaler
scaled_dfs['standard'].boxplot(ax=axes[0,1])
axes[0,1].set_title('Standard Scaler')

# MinMax Scaler
scaled_dfs['minmax'].boxplot(ax=axes[1,0])
axes[1,0].set_title('MinMax Scaler')

# Robust Scaler
scaled_dfs['robust'].boxplot(ax=axes[1,1])
axes[1,1].set_title('Robust Scaler')

plt.tight_layout()
plt.show()

<a id='3'></a>
## 3. Caso Práctico: Análisis de Ventas

Vamos a crear un caso práctico completo de limpieza de datos:

In [None]:
# Crear dataset de ventas con problemas típicos
np.random.seed(42)
n = 1000

# Generar fechas
fechas_base = pd.date_range(start='2022-01-01', end='2023-12-31', periods=n)
fechas = [f.strftime(random.choice(['%Y-%m-%d', '%d/%m/%Y', '%d-%m-%y'])) for f in fechas_base]

# Generar productos y categorías
productos = ['Laptop', 'Smartphone', 'Tablet', 'Smartwatch', 'Auriculares']
categorias = ['Electrónica', 'electronica', 'ELECTRONICA', 'Electrnica']

# Crear DataFrame
df_ventas = pd.DataFrame({
    'fecha': fechas,
    'producto': np.random.choice(productos, n),
    'categoria': np.random.choice(categorias, n),
    'precio': np.random.uniform(100, 1000, n),
    'cantidad': np.random.randint(1, 10, n),
    'id_cliente': [f'CLI_{i:04d}' for i in range(n)],
    'email': [f'cliente{i}@ejemplo.com' if random.random() > 0.1 else 'invalido@.com' for i in range(n)]
})

# Introducir problemas típicos
# 1. Valores nulos
df_ventas.loc[np.random.choice(n, 50), 'precio'] = None
df_ventas.loc[np.random.choice(n, 30), 'email'] = None

# 2. Outliers en precios
df_ventas.loc[np.random.choice(n, 10), 'precio'] = np.random.uniform(5000, 10000, 10)

# 3. Duplicados
df_ventas = pd.concat([df_ventas, df_ventas.sample(n=50)])

print("Estado inicial del dataset:")
print(df_ventas.info())
print("\nValores nulos:")
print(df_ventas.isnull().sum())

In [None]:
class LimpiadorDatos:
    def __init__(self, df):
        self.df_original = df.copy()
        self.df = df.copy()
        
    def limpiar_fechas(self):
        """Estandarizar fechas a formato ISO"""
        def parse_fecha(fecha_str):
            formatos = ['%Y-%m-%d', '%d/%m/%Y', '%d-%m-%y']
            for fmt in formatos:
                try:
                    return pd.to_datetime(fecha_str, format=fmt)
                except:
                    continue
            return pd.NaT
        
        self.df['fecha'] = self.df['fecha'].apply(parse_fecha)
        return self
    
    def estandarizar_categorias(self):
        """Estandarizar nombres de categorías"""
        mapping = {
            'electronica': 'Electrónica',
            'ELECTRONICA': 'Electrónica',
            'Electrnica': 'Electrónica'
        }
        self.df['categoria'] = self.df['categoria'].replace(mapping)
        return self
    
    def limpiar_precios(self):
        """Limpiar y validar precios"""
        # Eliminar precios negativos
        self.df.loc[self.df['precio'] < 0, 'precio'] = np.nan
        
        # Detectar y tratar outliers
        Q1 = self.df['precio'].quantile(0.25)
        Q3 = self.df['precio'].quantile(0.75)
        IQR = Q3 - Q1
        
        self.df.loc[self.df['precio'] > Q3 + 1.5*IQR, 'precio'] = Q3 + 1.5*IQR
        
        # Imputar valores faltantes con la mediana
        self.df['precio'] = self.df['precio'].fillna(self.df['precio'].median())
        return self
    
    def validar_emails(self):
        """Validar formato de emails"""
        patron = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        self.df['email_valido'] = self.df['email'].str.match(patron)
        return self
    
    def eliminar_duplicados(self):
        """Eliminar registros duplicados"""
        self.df = self.df.drop_duplicates()
        return self
    
    def generar_metricas(self):
        """Calcular métricas de ventas"""
        self.df['total_venta'] = self.df['precio'] * self.df['cantidad']
        return self
    
    def obtener_reporte_limpieza(self):
        """Generar reporte de cambios realizados"""
        reporte = {
            'registros_originales': len(self.df_original),
            'registros_finales': len(self.df),
            'duplicados_eliminados': len(self.df_original) - len(self.df),
            'emails_invalidos': (~self.df['email_valido']).sum(),
            'categorias_unicas': self.df['categoria'].nunique()
        }
        return reporte

# Aplicar limpieza
limpiador = LimpiadorDatos(df_ventas)
df_limpio = (limpiador
    .limpiar_fechas()
    .estandarizar_categorias()
    .limpiar_precios()
    .validar_emails()
    .eliminar_duplicados()
    .generar_metricas()
    .df
)

# Mostrar reporte
reporte = limpiador.obtener_reporte_limpieza()
print("\nReporte de Limpieza:")
for k, v in reporte.items():
    print(f"{k}: {v}")

# Visualizar resultados
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Análisis de Datos Limpios')

# Ventas por categoría
df_limpio.groupby('categoria')['total_venta'].sum().plot(kind='bar', ax=axes[0,0])
axes[0,0].set_title('Ventas por Categoría')

# Distribución de precios
sns.histplot(data=df_limpio, x='precio', ax=axes[0,1])
axes[0,1].set_title('Distribución de Precios')

# Ventas por mes
df_limpio.set_index('fecha')['total_venta'].resample('M').sum().plot(ax=axes[1,0])
axes[1,0].set_title('Ventas Mensuales')

# Top productos
df_limpio.groupby('producto')['cantidad'].sum().sort_values(ascending=False).plot(kind='bar', ax=axes[1,1])
axes[1,1].set_title('Productos más Vendidos')

plt.tight_layout()
plt.show()

<a id='4'></a>
## 4. Automatización de la Limpieza

Vamos a crear una clase para automatizar el proceso de limpieza:

In [None]:
class AutoLimpiador:
    def __init__(self, df):
        self.df_original = df.copy()
        self.df = df.copy()
        self.log = []
        
    def detectar_tipo_columna(self, columna):
        """Detectar el tipo de datos de una columna"""
        muestra = self.df[columna].dropna()
        if muestra.empty:
            return 'unknown'
            
        # Intentar convertir a datetime
        try:
            pd.to_datetime(muestra.iloc[0])
            return 'fecha'
        except:
            pass
        
        # Verificar si es numérico
        if pd.api.types.is_numeric_dtype(muestra):
            return 'numerico'
        
        # Verificar si es categórico
        if muestra.nunique() / len(muestra) < 0.05:
            return 'categorico'
            
        # Verificar si parece un email
        if muestra.iloc[0].count('@') == 1:
            return 'email'
            
        return 'texto'
    
    def limpiar_columna(self, columna):
        """Aplicar limpieza según el tipo de columna"""
        tipo = self.detectar_tipo_columna(columna)
        self.log.append(f"Limpiando columna {columna} (tipo: {tipo})")
        
        if tipo == 'fecha':
            self.df[columna] = pd.to_datetime(self.df[columna], errors='coerce')
            
        elif tipo == 'numerico':
            # Convertir a numérico y manejar outliers
            self.df[columna] = pd.to_numeric(self.df[columna], errors='coerce')
            Q1 = self.df[columna].quantile(0.25)
            Q3 = self.df[columna].quantile(0.75)
            IQR = Q3 - Q1
            self.df[columna] = self.df[columna].clip(Q1 - 1.5*IQR, Q3 + 1.5*IQR)
            
        elif tipo == 'categorico':
            # Estandarizar categorías
            self.df[columna] = self.df[columna].str.strip().str.title()
            
        elif tipo == 'email':
            # Validar emails
            patron = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
            mascara = ~self.df[columna].str.match(patron, na=False)
            self.df.loc[mascara, columna] = None
            
        # Imputar valores faltantes
        if self.df[columna].isnull().any():
            if tipo == 'numerico':
                self.df[columna] = self.df[columna].fillna(self.df[columna].median())
            elif tipo in ['categorico', 'texto']:
                self.df[columna] = self.df[columna].fillna('DESCONOCIDO')
                
        return self
    
    def limpiar_automaticamente(self):
        """Limpiar todas las columnas automáticamente"""
        for columna in self.df.columns:
            self.limpiar_columna(columna)
            
        # Eliminar duplicados
        n_antes = len(self.df)
        self.df = self.df.drop_duplicates()
        n_duplicados = n_antes - len(self.df)
        self.log.append(f"Eliminados {n_duplicados} registros duplicados")
        
        return self
    
    def obtener_reporte(self):
        """Generar reporte de cambios"""
        reporte = {
            'registros_originales': len(self.df_original),
            'registros_finales': len(self.df),
            'valores_nulos_original': self.df_original.isnull().sum().sum(),
            'valores_nulos_final': self.df.isnull().sum().sum(),
            'log_operaciones': self.log
        }
        return reporte

# Ejemplo de uso
auto_limpiador = AutoLimpiador(df_ventas)
df_auto_limpio = auto_limpiador.limpiar_automaticamente().df

# Mostrar reporte
reporte = auto_limpiador.obtener_reporte()
print("Reporte de Limpieza Automática:")
for k, v in reporte.items():
    if k != 'log_operaciones':
        print(f"{k}: {v}")
print("\nLog de operaciones:")
for log in reporte['log_operaciones']:
    print(f"- {log}")

## Conclusiones

En esta serie de notebooks hemos cubierto una amplia gama de técnicas de limpieza de datos:

1. **Parte 1**: Conceptos básicos y manejo inicial de datos
   - Exploración inicial
   - Manejo de valores nulos
   - Eliminación de duplicados
   - Corrección de tipos de datos

2. **Parte 2**: Técnicas avanzadas
   - Limpieza de fechas con regex
   - Estandarización de categorías
   - Detección y manejo de outliers
   - Validación de datos

3. **Parte 3**: Técnicas especializadas y automatización
   - Técnicas avanzadas de imputación
   - Normalización y escalado
   - Caso práctico completo
   - Automatización del proceso

### Recomendaciones Finales

1. Siempre realizar una exploración inicial detallada de los datos
2. Documentar todos los pasos de limpieza realizados
3. Validar los resultados después de cada transformación
4. Mantener una copia de los datos originales
5. Automatizar procesos repetitivos de limpieza
6. Considerar el impacto de cada transformación en el análisis posterior