<H1>📘 Module 1: Ingestion and Preparation (STG1)

🎯 Module Objectives

In this module we simulate the raw ingestion layer of a real ETL flow.

Here we do NOT transform data; we only load it exactly as it arrives from the source (Google Drive) into our staging database.

By the end you will be able to:

Mount Google Drive in Colab and read CSV files.

Create a SQLite database to simulate a staging environment.

Insert raw data into STG1_CLIENTES.

Record process metadata in ETL_LOGS.

Validate the load with SQL queries.


<H1>📚 Theoretical Background
What is STG1 (Staging Layer 1)?

In enterprise ETL architectures, STG1 is the first staging layer, where:

Data arrives untransformed from the sources

The original format is preserved for auditing

Process metadata is recorded (timestamp, record count, source)

It serves as a recovery point when downstream steps fail
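
The properties above can be sketched with an in-memory SQLite table. This is a minimal illustration and not the load used later in this module: the table `stg1_demo`, the helper `stage_raw`, and the sample values are all hypothetical.

```python
import sqlite3
from datetime import datetime


def stage_raw(conn, rows, source_file):
    """Insert rows untouched, adding only load metadata (hypothetical helper)."""
    load_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn.execute(
        """CREATE TABLE IF NOT EXISTS stg1_demo (
               customer_id TEXT,
               engagement_score TEXT,
               load_timestamp TEXT,
               source_file TEXT)"""
    )
    # Every business column is TEXT, so nothing is cast, trimmed, or rounded.
    conn.executemany(
        "INSERT INTO stg1_demo VALUES (?, ?, ?, ?)",
        [(cid, score, load_ts, source_file) for cid, score in rows],
    )
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
# Made-up "dirty" values: leading zeros, stray spaces, a non-numeric score.
raw_rows = [("0042", " 458.83 "), ("9965", "n/a")]
loaded = stage_raw(conn, raw_rows, "demo.csv")
stored = conn.execute(
    "SELECT customer_id, engagement_score FROM stg1_demo ORDER BY rowid"
).fetchall()
print(loaded, stored)
```

The values read back are the same strings that went in, which is what makes the layer usable for audits and as a recovery point.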


**PowerCenter analogy:**

It is like a mapping in which the Source Qualifier feeds a staging table directly, with no transformation logic in between.

Why SQLite?
SQLite is a good fit for this exercise because:

It needs no server (a local .db file)

It uses largely standard SQL syntax

It is well suited to prototypes and local development

It is conceptually similar to loading into Teradata/SQL Server in real environments
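
As a small illustration of the "no server" point, the sketch below creates a database file in a temporary directory just by connecting to it. The file name `demo_staging.db` and the table `demo` are assumptions made for the example.

```python
import os
import sqlite3
import tempfile

# Hypothetical location; any writable path works the same way.
db_path = os.path.join(tempfile.mkdtemp(), "demo_staging.db")
existed_before = os.path.exists(db_path)

# Connecting is enough to create the file: there is no server to start.
conn = sqlite3.connect(db_path)
conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY, note TEXT)")
conn.execute("INSERT INTO demo (note) VALUES (?)", ("hello staging",))
conn.commit()
row = conn.execute("SELECT id, note FROM demo").fetchone()
conn.close()

print("File existed before connect:", existed_before)
print("File exists after connect:", os.path.exists(db_path))
print("Row read back:", row)
print("SQLite engine version:", sqlite3.sqlite_version)
```

`sqlite3.sqlite_version` reports the version of the SQLite engine bundled with Python, which is usually the number of interest when checking SQL feature support.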

In [1]:
# Step 1: Environment setup

# Import the required libraries
import pandas as pd
import numpy as np
import sqlite3
from datetime import datetime
import os
from google.colab import drive

# Check the versions of the tools we will use
print(f"Pandas version:  {pd.__version__}")
# Note: _deprecated_version is a private attribute holding the legacy version of the
# sqlite3 module, not of the SQLite engine; sqlite3.sqlite_version reports the engine.
print(f"sqlite3 module version: {sqlite3._deprecated_version}")


Pandas version:  2.2.2
sqlite3 module version: 2.6.0


In [2]:
# Step 2: Mount Google Drive
drive.mount('/content/drive')

# Verify that the drive mounted correctly
!ls "/content/drive/MyDrive"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
'Adiciones Framework Replicación'       Python_Pandas
'Colab Notebooks'		        UNAD_JHON_2025
 data_engineering_project_practice      UNAD_JHON_2025.rar
 Ejemplos_desarrollos_teradata	        Untitled0.ipynb
 PROYECTOS_MEJORA_PERFIL_DATA_ENGINER
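
If the notebook also needs to run outside Colab, the mount can be guarded. The sketch below is one possible approach, not part of the original flow: `in_colab`, `resolve_base_dir`, and the local fallback directory `./data` are hypothetical names.

```python
import importlib.util
import os


def in_colab():
    """Return True when the google.colab package can be imported."""
    try:
        return importlib.util.find_spec("google.colab") is not None
    except (ImportError, ValueError):
        # Raised when the parent "google" package is missing or malformed.
        return False


def resolve_base_dir(local_dir="./data"):
    """Mount Drive in Colab; otherwise fall back to a local folder (assumption)."""
    if in_colab():
        from google.colab import drive

        drive.mount("/content/drive")
        return "/content/drive/MyDrive"
    return os.path.abspath(local_dir)


base_dir = resolve_base_dir()
print("Base directory:", base_dir)
```

File paths built later with `os.path.join(base_dir, ...)` would then work in both environments, provided the same folder layout exists locally.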


In [3]:
# Step 3: Load the raw dataset

# Define the path
file_path = "/content/drive/MyDrive/Python_Pandas/Synthetic_Customer_Practic/CARGA_STG_1/synthetic_customer_dirty_data.csv"

# Verify that the file exists
if not os.path.exists(file_path):
    raise FileNotFoundError(f"File not found at: {file_path}")
print("File found at: ", file_path)

# Load the CSV without transforming it
# dtype='object' keeps every value exactly as it arrives from the source

df_customers = pd.read_csv(
    file_path,
    dtype='object',         # everything as a string to preserve the original format
    na_values=[''],         # only empty strings are treated as NULL
    keep_default_na=False,  # do not treat "NA", "null", etc. as missing
)

print("Dataset loaded successfully")
rows, cols = df_customers.shape
print(f"The dataset has {rows:,} rows and {cols} columns")

print("First three records")
display(df_customers.head(3))


File found at:  /content/drive/MyDrive/Python_Pandas/Synthetic_Customer_Practic/CARGA_STG_1/synthetic_customer_dirty_data.csv
Dataset loaded successfully
The dataset has 20,400 rows and 9 columns
First three records


Unnamed: 0,Customer ID,Name,Address,Engagement Score,Satisfaction Level,Role within Service,Action Intent,Has Bought,Date
0,9965,William Kramer,"6933 Melissa Rest, Andersonbury, MA 20335",458.8309521,72.04137133,owner,lawyer,True,2752013
1,1044,Mary Harrington,"731 Carlson Village, New Laurenburgh, NH 14166",658.8748471,51.52114233,myself,go,False,2692012
2,5265,Tara Carter,"5270 Bryant Point Apt. 422, East Tonyborough, ...",972.420271,24.02560241,when,visit,False,2662013
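
The rule used in this step (every value stays a string, and only empty fields become NULL) can be illustrated without pandas. The sketch below uses the standard `csv` module on a made-up two-row sample; it mirrors the idea behind `dtype='object'`, `na_values=['']` and `keep_default_na=False`, and is not how pandas implements it.

```python
import csv
import io

# Made-up sample: a leading-zero ID, a literal "NA" name, and empty fields.
sample = io.StringIO(
    "Customer ID,Name,Has Bought,Date\n"
    "0042,NA,True,2752013\n"
    "1044,,False,\n"
)

rows = []
for record in csv.DictReader(sample):
    # Keep every value as the original string; map only "" to None (NULL).
    rows.append({col: (val if val != "" else None) for col, val in record.items()})

for row in rows:
    print(row)
```

The ID keeps its leading zeros and the literal text "NA" survives as data, while the truly empty fields become `None`. Deciding what those values mean is left to later layers.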


In [5]:
# Step 4: Create the SQLite database

DB_NAME = "/content/drive/MyDrive/Python_Pandas/Synthetic_Customer_Practic/CARGA_STG_1/CLIENTES.db"

# Open the SQLite connection (the file is created automatically if it does not exist)

conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()

print(f"Database: {DB_NAME} connected/created")

# Start from a clean slate; note that this also discards any previous log history
cursor.execute("""DROP TABLE IF EXISTS STG1_CLIENTES""")
cursor.execute("""DROP TABLE IF EXISTS ETL_LOGS""")
# Create the log table
cursor.execute("""
CREATE TABLE IF NOT EXISTS ETL_LOGS(
  LOG_ID INTEGER PRIMARY KEY AUTOINCREMENT,
  PROCESS_NAME TEXT,
  START_TIME TEXT,
  END_TIME TEXT,
  RECORDS_PROCESSED INTEGER,
  STATUS TEXT,
  SOURCE_FILE TEXT,
  COMMENTS TEXT
)""")

print("Log table created")

# Create the customers table; every source column is TEXT because the data is loaded raw.
# SQLite only enforces the foreign key when PRAGMA foreign_keys = ON is set on the connection.
cursor.execute("""
CREATE TABLE IF NOT EXISTS STG1_CLIENTES (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    customer_ID TEXT,
    name TEXT,
    address TEXT,
    engagement_score TEXT,
    satisfaction_level TEXT,
    role_within_service TEXT,
    action_intent TEXT,
    has_bought TEXT,
    date TEXT,
    load_timestamp TEXT,
    source_file TEXT,
    batch_id INTEGER,
    FOREIGN KEY (batch_id) REFERENCES ETL_LOGS (log_id)  -- Link to the log table
)
""")

print("Table STG1_CLIENTES created")

conn.commit()

Database: /content/drive/MyDrive/Python_Pandas/Synthetic_Customer_Practic/CARGA_STG_1/CLIENTES.db connected/created
Log table created
Table STG1_CLIENTES created
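
A note on the `FOREIGN KEY` clause: SQLite parses it, but by default it does not enforce it. The sketch below, using hypothetical demo tables in an in-memory database, shows the constraint being enforced once `PRAGMA foreign_keys = ON` is issued on the connection.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Must be set per connection, and outside of an open transaction.
conn.execute("PRAGMA foreign_keys = ON")

conn.execute(
    "CREATE TABLE etl_logs_demo (log_id INTEGER PRIMARY KEY AUTOINCREMENT, status TEXT)"
)
conn.execute(
    """CREATE TABLE stg1_demo (
           id INTEGER PRIMARY KEY AUTOINCREMENT,
           customer_id TEXT,
           batch_id INTEGER,
           FOREIGN KEY (batch_id) REFERENCES etl_logs_demo (log_id))"""
)

# A batch that exists in the log table: accepted.
conn.execute("INSERT INTO etl_logs_demo (status) VALUES ('IN_PROGRESS')")
conn.execute("INSERT INTO stg1_demo (customer_id, batch_id) VALUES ('9965', 1)")

# A batch id with no matching log row: rejected while enforcement is on.
try:
    conn.execute("INSERT INTO stg1_demo (customer_id, batch_id) VALUES ('1044', 999)")
    orphan_rejected = False
except sqlite3.IntegrityError as exc:
    orphan_rejected = True
    print("Rejected:", exc)

fk_enabled = conn.execute("PRAGMA foreign_keys").fetchone()[0]
print("foreign_keys pragma:", fk_enabled)
```

Without the pragma, the second insert would succeed and leave an orphan `batch_id` in staging.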


In [6]:
# ============================================================
# STEP 5: LOAD WITH BATCH_ID
# ============================================================
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()

# Take a single timestamp so the log and the loaded rows agree
start_time = datetime.now()
load_timestamp = start_time.strftime('%Y-%m-%d %H:%M:%S')

# ============================================================
# 1. FIRST: Create the record in etl_logs (this generates the batch_id)
# ============================================================
cursor.execute("""
INSERT INTO etl_logs (
    process_name, start_time, status, source_file, comments
) VALUES (?, ?, ?, ?, ?)
""", (
    'STG1_INGESTA_RAW',
    load_timestamp,
    'IN_PROGRESS',
    os.path.basename(file_path),
    'Load started'
))
conn.commit()

# The log row now exists, so its id can be used as the batch_id
batch_id = cursor.lastrowid
print(f"🏷️  Batch ID generated: {batch_id}")

# ============================================================
# 2. THEN: Add batch_id and load metadata to the DataFrame
# ============================================================
df_customers['batch_id'] = batch_id
df_customers['load_timestamp'] = load_timestamp
df_customers['source_file'] = os.path.basename(file_path)

# Match the table's column names (e.g. "Customer ID" -> "customer_id")
df_customers.columns = df_customers.columns.str.lower().str.replace(' ', '_')

# ============================================================
# 3. TRUNCATE AND LOAD
# ============================================================
print("\n🔄 Truncating table STG1_CLIENTES...")
cursor.execute("DELETE FROM STG1_CLIENTES")  # SQLite has no TRUNCATE statement
conn.commit()

try:
    df_customers.to_sql('STG1_CLIENTES', conn, if_exists='append', index=False)

    records_loaded = len(df_customers)
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()
    status = 'SUCCESS'
    comments = f'Batch {batch_id}: {records_loaded:,} records in {duration:.2f}s'

    print(f"✅ {records_loaded:,} records inserted")

except Exception as e:
    conn.rollback()  # discard any partially written rows
    status = 'FAILED'
    comments = f'Error: {str(e)}'
    records_loaded = 0
    end_time = datetime.now()
    print(f"❌ Error: {e}")

# ============================================================
# 4. UPDATE the log with the final result
# ============================================================
cursor.execute("""
UPDATE etl_logs
SET
    end_time = ?,
    records_processed = ?,
    status = ?,
    comments = ?
WHERE log_id = ?
""", (
    end_time.strftime('%Y-%m-%d %H:%M:%S'),
    records_loaded,
    status,
    comments,
    batch_id
))
conn.commit()

# Report the real outcome instead of always printing success
if status == 'SUCCESS':
    print(f"✅ Batch {batch_id} completed and logged")
else:
    print(f"❌ Batch {batch_id} failed and logged")
conn.close()

🏷️  Batch ID generated: 1

🔄 Truncating table STG1_CLIENTES...
✅ 20,400 records inserted
✅ Batch 1 completed and logged
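
One variation worth considering for this step is to run the DELETE and the inserts in a single transaction, so that a failed load leaves the previous contents of the staging table intact. The sketch below shows that idea with plain `sqlite3` on hypothetical demo tables (`etl_logs_demo`, `stg1_demo`) and a hypothetical helper `run_batch`; it is an illustration under those assumptions, not a drop-in replacement for the cell above.

```python
import sqlite3
from datetime import datetime


def now_text():
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def create_demo_schema(conn):
    conn.execute(
        """CREATE TABLE etl_logs_demo (
               log_id INTEGER PRIMARY KEY AUTOINCREMENT,
               process_name TEXT, start_time TEXT, end_time TEXT,
               records_processed INTEGER, status TEXT, comments TEXT)"""
    )
    # NOT NULL is only here so the demo can provoke a failing load.
    conn.execute(
        "CREATE TABLE stg1_demo (customer_id TEXT NOT NULL, name TEXT, batch_id INTEGER)"
    )
    conn.commit()


def run_batch(conn, process_name, rows):
    # 1. Open the log row first, so the batch id exists before any data moves.
    with conn:
        cur = conn.execute(
            "INSERT INTO etl_logs_demo (process_name, start_time, status) "
            "VALUES (?, ?, 'IN_PROGRESS')",
            (process_name, now_text()),
        )
    batch_id = cur.lastrowid

    # 2. Truncate and load in ONE transaction: an error rolls back the DELETE too.
    try:
        with conn:
            conn.execute("DELETE FROM stg1_demo")
            conn.executemany(
                "INSERT INTO stg1_demo (customer_id, name, batch_id) VALUES (?, ?, ?)",
                [(cid, name, batch_id) for cid, name in rows],
            )
        status, loaded, comments = "SUCCESS", len(rows), f"Batch {batch_id} loaded"
    except sqlite3.Error as exc:
        status, loaded, comments = "FAILED", 0, f"Error: {exc}"

    # 3. Close the log row with the final outcome.
    with conn:
        conn.execute(
            "UPDATE etl_logs_demo SET end_time = ?, records_processed = ?, "
            "status = ?, comments = ? WHERE log_id = ?",
            (now_text(), loaded, status, comments, batch_id),
        )
    return batch_id, status


conn = sqlite3.connect(":memory:")
create_demo_schema(conn)
first = run_batch(conn, "DEMO_LOAD", [("9965", "A"), ("1044", "B")])
second = run_batch(conn, "DEMO_LOAD", [("5265", "C"), (None, "D")])  # violates NOT NULL
remaining = conn.execute("SELECT COUNT(*) FROM stg1_demo").fetchone()[0]
print(first, second, remaining)
```

Using the connection as a context manager commits on success and rolls back on an exception, which is why the rows from the first batch survive the failed second batch.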


In [7]:
# Step 6: Validate the load
conn = sqlite3.connect(DB_NAME)
print("Validation of data loaded into STG1")
print("="*60)

# Query 1: Total row count (take the scalar instead of printing the whole DataFrame)
query_count = "SELECT COUNT(*) AS total FROM STG1_CLIENTES"
total = pd.read_sql_query(query_count, conn).iloc[0, 0]

print(f"\n1️⃣ Total records loaded: {total:,}")

# Query 2: Sample of the data
query_sample = """
SELECT
    *
FROM STG1_CLIENTES
LIMIT 5
"""
df_sample = pd.read_sql_query(query_sample, conn)
print(f"\n2️⃣ Sample of records:")
display(df_sample)

# Query 3: Check the logs
query_logs = "SELECT * FROM etl_logs ORDER BY log_id DESC LIMIT 1"
df_logs = pd.read_sql_query(query_logs, conn)
print(f"\n3️⃣ Latest record in ETL_LOGS:")
print(df_logs.T)  # Transpose for easier reading

# Close the connection
conn.close()
print("\n✅ Connection closed. Module 1 completed successfully.")

Validation of data loaded into STG1
============================================================

1️⃣ Total records loaded: 20,400

2️⃣ Sample of records:


Unnamed: 0,id,customer_ID,name,address,engagement_score,satisfaction_level,role_within_service,action_intent,has_bought,date,load_timestamp,source_file,batch_id
0,1,9965,William Kramer,"6933 Melissa Rest, Andersonbury, MA 20335",458.8309521,72.04137133,owner,lawyer,True,2752013,2025-10-12 00:06:46,synthetic_customer_dirty_data.csv,1
1,2,1044,Mary Harrington,"731 Carlson Village, New Laurenburgh, NH 14166",658.8748471,51.52114233,myself,go,False,2692012,2025-10-12 00:06:46,synthetic_customer_dirty_data.csv,1
2,3,5265,Tara Carter,"5270 Bryant Point Apt. 422, East Tonyborough, ...",972.420271,24.02560241,when,visit,False,2662013,2025-10-12 00:06:46,synthetic_customer_dirty_data.csv,1
3,4,4853,Teresa Perkins,"111 John Ways, South Davidfurt, MA 39654",132.5001248,93.77232927,begin,support,True,1102018,2025-10-12 00:06:46,synthetic_customer_dirty_data.csv,1
4,5,1303,Savannah Bowers,"USS Hall, FPO AP 14539",664.7009999,16.46063055,pick,degree,True,1582018,2025-10-12 00:06:46,synthetic_customer_dirty_data.csv,1



3️⃣ Latest record in ETL_LOGS:
                                                   0
LOG_ID                                             1
PROCESS_NAME                        STG1_INGESTA_RAW
START_TIME                       2025-10-12 00:06:46
END_TIME                         2025-10-12 00:06:46
RECORDS_PROCESSED                              20400
STATUS                                       SUCCESS
SOURCE_FILE        synthetic_customer_dirty_data.csv
COMMENTS            Batch 1: 20,400 records in 0.39s

✅ Connection closed. Module 1 completed successfully.
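
The checks above are read by eye. A small reconciliation function can compare the count stored in the log with the rows actually present for that batch. The sketch below is a hedged example on hypothetical demo tables; `reconcile_batch` is not part of the notebook.

```python
import sqlite3


def reconcile_batch(conn, batch_id):
    """Compare the logged record count with the rows staged for one batch."""
    logged = conn.execute(
        "SELECT records_processed FROM etl_logs_demo WHERE log_id = ?", (batch_id,)
    ).fetchone()
    staged = conn.execute(
        "SELECT COUNT(*) FROM stg1_demo WHERE batch_id = ?", (batch_id,)
    ).fetchone()[0]
    logged_count = logged[0] if logged else None
    return {
        "batch_id": batch_id,
        "logged": logged_count,
        "staged": staged,
        "match": logged_count == staged,
    }


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE etl_logs_demo (log_id INTEGER PRIMARY KEY, records_processed INTEGER)"
)
conn.execute("CREATE TABLE stg1_demo (customer_id TEXT, batch_id INTEGER)")
# Made-up data: batch 1 is consistent, batch 2 logged 3 rows but staged only 1.
conn.executemany("INSERT INTO etl_logs_demo VALUES (?, ?)", [(1, 2), (2, 3)])
conn.executemany(
    "INSERT INTO stg1_demo VALUES (?, ?)", [("9965", 1), ("1044", 1), ("5265", 2)]
)
conn.commit()

ok_report = reconcile_batch(conn, 1)
bad_report = reconcile_batch(conn, 2)
print(ok_report)
print(bad_report)
```

A mismatch like the one in batch 2 would be a reason to stop before any downstream layer reads from staging.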


In [8]:
# Validate the STG1_CLIENTES table: look for repeated customer IDs

conn = sqlite3.connect(DB_NAME)

# Customer IDs that appear more than once
conteo_por_llave = ("""
SELECT customer_id, count(*) as conteo
FROM STG1_CLIENTES
GROUP BY customer_id
HAVING conteo > 1""")

result = pd.read_sql_query(conteo_por_llave, conn)
display(result)

# Inspect every row stored for one of the repeated IDs
customer_dup = ("""SELECT * FROM STG1_CLIENTES
WHERE customer_id = '1000'""")
result = pd.read_sql_query(customer_dup,conn)
display(result)

# Optional check (disabled): count of fully distinct rows
# registros_com_dup = ("""SELECT COUNT(*) AS CONTEO FROM (SELECT DISTINCT * FROM STG1_CLIENTES) A """)
# result = pd.read_sql_query(registros_com_dup,conn)
# display(result)

conn.close()

Unnamed: 0,customer_ID,conteo
0,1000,3
1,1001,2
2,1002,3
3,1004,4
4,1005,4
...,...,...
5894,9993,3
5895,9994,2
5896,9995,3
5897,9996,2


Unnamed: 0,id,customer_ID,name,address,engagement_score,satisfaction_level,role_within_service,action_intent,has_bought,date,load_timestamp,source_file,batch_id
0,5474,1000,Jeremiah Johnson,"0179 Scott Mall, North Richard, KY 78506",552.5220139,83.37004038,blood,region,True,2212021,2025-10-12 00:06:46,synthetic_customer_dirty_data.csv,1
1,7488,1000,Kaitlyn Marshall,"55295 Schwartz Station, Pearsonmouth, CA 25286",424.925153,11.99095653,Congress,science,False,2492017,2025-10-12 00:06:46,synthetic_customer_dirty_data.csv,1
2,9260,1000,Richard Jones,"USNV Nolan, FPO AE 99039",972.0592342,23.98961569,doctor,stock,False,1052011,2025-10-12 00:06:46,synthetic_customer_dirty_data.csv,1
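
The output above shows the same `customer_id` attached to different people, which is a different problem from a row that was simply loaded twice. The sketch below separates the two cases with two GROUP BY queries. It runs on a made-up in-memory table; the names and addresses are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg1_demo (customer_id TEXT, name TEXT, address TEXT)")
conn.executemany(
    "INSERT INTO stg1_demo VALUES (?, ?, ?)",
    [
        ("1000", "Person A", "addr A"),  # same key, different person
        ("1000", "Person B", "addr B"),
        ("1001", "Ann Lee", "addr C"),   # exact copy of the next row
        ("1001", "Ann Lee", "addr C"),
        ("1003", "Sam Roe", "addr D"),   # unique
    ],
)

# Keys that appear more than once, whatever the rest of the row says.
repeated_keys = conn.execute(
    """SELECT customer_id, COUNT(*) AS conteo
       FROM stg1_demo
       GROUP BY customer_id
       HAVING COUNT(*) > 1
       ORDER BY customer_id"""
).fetchall()

# Rows that are identical across every business column.
exact_duplicates = conn.execute(
    """SELECT customer_id, name, address, COUNT(*) AS copies
       FROM stg1_demo
       GROUP BY customer_id, name, address
       HAVING COUNT(*) > 1
       ORDER BY customer_id"""
).fetchall()

print("Repeated keys:", repeated_keys)
print("Exact duplicates:", exact_duplicates)
```

Keys that appear in the first result but not in the second are key collisions, which usually need a business rule to resolve. Exact duplicates can typically be removed mechanically in a later layer. STG1 itself keeps both, since nothing is transformed here.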


In [9]:
# Review your work
conn = sqlite3.connect(DB_NAME)

print("🏆 YOUR WORK TODAY:")
print(f"Batches processed: {pd.read_sql_query('SELECT COUNT(*) FROM etl_logs', conn).iloc[0,0]}")
print(f"Records in STG1: {pd.read_sql_query('SELECT COUNT(*) FROM STG1_CLIENTES', conn).iloc[0,0]:,}")

conn.close()

🏆 YOUR WORK TODAY:
Batches processed: 1
Records in STG1: 20,400
