In [1]:
%pip install pyspark -q
import pyspark
print(pyspark.__version__)

Note: you may need to restart the kernel to use updated packages.
4.0.1


In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session in local mode; "local[*]" runs Spark
# inside this process using all available cores.
spark = (
    SparkSession.builder
    .appName("Medallion - Capa Bronze")
    .master("local[*]")
    .getOrCreate()
)

print(f"Spark Version: {spark.version}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/22 21:18:11 WARN Utils: Your hostname, Mac-mini.local, resolves to a loopback address: 127.0.0.1; using 192.168.1.1 instead (on interface en1)
25/12/22 21:18:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/22 21:18:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Version: 4.0.1


In [2]:
# Source: the raw CSV generated in Stage 1. Target: the Bronze-layer Parquet dataset.
input_path = "../../etapa1-fundamentos/data/raw_sales.csv"
output_path = "../data/bronze/sales_raw.parquet"

# Load the raw CSV without any row-level cleaning. The first line is treated as
# the header, and inferSchema lets Spark derive column types from the data.
df_bronze = (
    spark.read
    .format("csv")
    .options(header=True, inferSchema=True)
    .load(input_path)
)

# Print the inferred schema and a few untruncated rows as a visual check.
df_bronze.printSchema()
df_bronze.show(n=5, truncate=False)

# Persist as Parquet (columnar), replacing any previous output at this path.
df_bronze.write.parquet(output_path, mode="overwrite")
print(f"Datos guardados en capa Bronze: {output_path}")

root
 |-- sale_date: date (nullable = true)
 |-- store_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- total: double (nullable = true)
 |-- status: string (nullable = true)

+----------+--------+------------------------------------+----------+--------+-------+-------+---------+
|sale_date |store_id|customer_id                         |product   |quantity|price  |total  |status   |
+----------+--------+------------------------------------+----------+--------+-------+-------+---------+
|2025-07-27|NW-166  |8b1ff910-f298-4568-8f85-d7879a818f90|Smartphone|2       |1190.42|2380.84|Completed|
|2025-01-15|NW017   |03afe752-6931-46bf-a7f2-f1d57ddba376|Headphones|1       |1026.98|1026.98|Completed|
|2025-03-12|NW055   |7d3928fa-7308-4afb-9c1f-0759e812d758|Monitor   |1       |NULL   |575.75 |Returned |
|2025-03-14|NW-142  |9670544b-48ea-4a5d-946b-1798f

In [4]:
# Read the Bronze Parquet dataset back to confirm the write round-trips.
df_bronze_loaded = spark.read.parquet(output_path)

# Row counts: the Parquet copy is expected to match the original CSV.
csv_rows = df_bronze.count()
print("Filas en CSV original:", csv_rows)
parquet_rows = df_bronze_loaded.count()
print("Filas en Parquet Bronze:", parquet_rows)

# Spot-check that null prices and the inconsistent store_id formats are still
# present, i.e. nothing was cleaned on the way into Bronze.
rows_without_price = df_bronze_loaded.where(df_bronze_loaded["price"].isNull())
rows_without_price.show(5)

distinct_store_ids = df_bronze_loaded.select("store_id").distinct()
distinct_store_ids.show(n=20, truncate=False)

Filas en CSV original: 5000
Filas en Parquet Bronze: 5000
+----------+--------+--------------------+----------+--------+-----+-------+---------+
| sale_date|store_id|         customer_id|   product|quantity|price|  total|   status|
+----------+--------+--------------------+----------+--------+-----+-------+---------+
|2025-03-12|   NW055|7d3928fa-7308-4af...|   Monitor|       1| NULL| 575.75| Returned|
|2025-06-16|  011-NW|f0f8ec8b-3553-498...|   Monitor|       2| NULL|1506.66|Completed|
|2025-10-05|  NW-122|d4b325bb-7cdf-441...|    Tablet|       4| NULL|3618.72|Completed|
|2025-10-18|   NW-59|32515277-cdc5-452...|Smartphone|       4| NULL|3921.12|Completed|
|2025-09-02|   NW062|3cd230cb-10f4-491...|    Tablet|       4| NULL|4605.52|Completed|
+----------+--------+--------------------+----------+--------+-----+-------+---------+
only showing top 5 rows
+--------+
|store_id|
+--------+
|NW-156  |
|NW-49   |
|NW095   |
|NW-175  |
|152-NW  |
|058-NW  |
|NW-60   |
|NW010   |
|NW-33   |
|06

In [5]:
# Build a small extra batch that simulates records arriving after the initial
# load, then append it to the Bronze Parquet dataset.
from faker import Faker
import numpy as np
import pandas as pd

# Seed BOTH generators. NumPy drives store_id, quantity and price; Faker drives
# the dates and UUIDs, so seeding NumPy alone leaves the batch non-reproducible.
# NOTE: sale_date is drawn relative to "today", so dates still shift with the
# day the notebook is run.
Faker.seed(42)
fake = Faker()
np.random.seed(42)

late_columns = ["sale_date", "store_id", "customer_id", "product", "quantity", "price", "total", "status"]

late_data = []
for _ in range(500):
    sale_date = fake.date_between(start_date="-30d", end_date="today")
    store_id = f"NW-{np.random.randint(1, 181):03d}"  # store numbers 1..180, zero-padded
    customer_id = fake.uuid4()
    product = "Extra Batch Product"
    quantity = np.random.randint(1, 6)  # 1..5 units
    price = round(np.random.uniform(100, 500), 2)
    # Round the product as well: a raw float multiplication can yield values
    # such as 300.29999999999995, inconsistent with the 2-decimal price.
    total = round(quantity * price, 2)
    status = "Completed"

    late_data.append([sale_date, store_id, customer_id, product, quantity, price, total, status])

df_late = pd.DataFrame(late_data, columns=late_columns)

# Stage the batch as a CSV file, mimicking how a late file would arrive.
late_csv = "../data/bronze/late_sales.csv"
df_late.to_csv(late_csv, index=False)

# Reuse the schema already stored in Bronze instead of inferring one from this
# 500-row file, so the appended Parquet files always have matching column types.
# The CSV columns are written in the same order as the Bronze schema.
bronze_schema = spark.read.parquet(output_path).schema
df_late_spark = spark.read.csv(late_csv, header=True, schema=bronze_schema)

# Append to Bronze. This is NOT idempotent: re-running this cell without first
# re-running the overwrite cell adds the batch again.
df_late_spark.write.mode("append").parquet(output_path)

print("Datos tardíos agregados a Bronze. Total filas ahora:", spark.read.parquet(output_path).count())

Datos tardíos agregados a Bronze. Total filas ahora: 5500
