In [13]:
!pip install deltalake --quiet
!pip install pyspark==3.4.1 --quiet
!pip install pyarrow --quiet


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.6/56.6 MB[0m [31m10.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.8/2.8 MB[0m [31m32.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 0.8.3 requires pyspark[connect]~=3.5.1, but you have pyspark 3.4.1 which is incompatible.[0m[31m
[0m

In [14]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("fakestore_project")
    .getOrCreate()
)

spark


In [15]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Creamos las carpetas de tu Data Lake (Bronze / Silver / Gold)

In [16]:
import os

base_path = "/content/drive/MyDrive/fakestore_datalake"

bronze_path = os.path.join(base_path, "bronze")
silver_path = os.path.join(base_path, "silver")
gold_path   = os.path.join(base_path, "gold")

for p in [bronze_path, silver_path, gold_path]:
    os.makedirs(p, exist_ok=True)

base_path


'/content/drive/MyDrive/fakestore_datalake'

Creamos función EXTRACT para API FakeStore

In [17]:
import requests
import pandas as pd

def extract_fakestore_products():
    url = "https://fakestoreapi.com/products"
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        print("Error en la extracción:", e)
        return None


Google Drive no permite la operación “rename” que Delta Lake necesita para escribir el _delta_log, por lo tanto lo guardamos en una carpeta local y despues o coopio al drive

In [20]:
local_bronze_path = "/content/fakestore_bronze_local"
os.makedirs(local_bronze_path, exist_ok=True)

local_bronze_path


'/content/fakestore_bronze_local'

 Guardamos BRONZE en Delta Lake localmente

In [21]:
from deltalake import write_deltalake
import pyarrow as pa

# Extraer datos
data = extract_fakestore_products()

if data:
    # Convertir a pandas
    pdf = pd.DataFrame(data)

    # Convertir a PyArrow Table
    table = pa.Table.from_pandas(pdf)

    # Ruta donde se va a guardar la tabla bronze
    bronze_local_products = os.path.join(local_bronze_path, "products")

    # Guardar como tabla delta usando delta-rs
    write_deltalake(bronze_local_products, table, mode="overwrite")

    print("Bronze creado en:", bronze_local_products)
else:
    print("No se pudo extraer información")


Bronze creado en: /content/fakestore_bronze_local/products


Copiamos carpeta completa a Google Drive

In [22]:
import shutil

# destino en Google Drive
bronze_products_drive = os.path.join(bronze_path, "products")

# si ya existe, la borramos
if os.path.exists(bronze_products_drive):
    shutil.rmtree(bronze_products_drive)

# copiamos la carpeta con _delta_log y parquet
shutil.copytree(bronze_local_products, bronze_products_drive)

print("Bronze copiado a Drive:", bronze_products_drive)


Bronze copiado a Drive: /content/drive/MyDrive/fakestore_datalake/bronze/products


Leemos Bronze con delta-rs (sin spark, porque Spark no pude leer Derta Lake) para ver si funciona correctamante

In [24]:
from deltalake import DeltaTable

bronze_products_drive = "/content/drive/MyDrive/fakestore_datalake/bronze/products"

dt_bronze = DeltaTable(bronze_products_drive)

pdf_bronze = dt_bronze.to_pandas()
pdf_bronze.head()


Unnamed: 0,id,title,price,description,category,image,rating
0,1,"Fjallraven - Foldsack No. 1 Backpack, Fits 15 ...",109.95,Your perfect pack for everyday use and walks i...,men's clothing,https://fakestoreapi.com/img/81fPKd-2AYL._AC_S...,"{'count': 120, 'rate': 3.9}"
1,2,Mens Casual Premium Slim Fit T-Shirts,22.3,"Slim-fitting style, contrast raglan long sleev...",men's clothing,https://fakestoreapi.com/img/71-3HjGNDUL._AC_S...,"{'count': 259, 'rate': 4.1}"
2,3,Mens Cotton Jacket,55.99,great outerwear jackets for Spring/Autumn/Wint...,men's clothing,https://fakestoreapi.com/img/71li-ujtlUL._AC_U...,"{'count': 500, 'rate': 4.7}"
3,4,Mens Casual Slim Fit,15.99,The color could be slightly different between ...,men's clothing,https://fakestoreapi.com/img/71YXzeOuslL._AC_U...,"{'count': 430, 'rate': 2.1}"
4,5,John Hardy Women's Legends Naga Gold & Silver ...,695.0,"From our Legends Collection, the Naga was insp...",jewelery,https://fakestoreapi.com/img/71pWzhdJNwL._AC_U...,"{'count': 400, 'rate': 4.6}"


Creamos SILVER (limpieza y normalización):
Convertimo precios a float, separamos “rating” en rate y count, Convertimos id a int, ordenamos y limpiamos columnas, normalizamos categorías (minúsculas, sin espacios).

In [26]:
import pandas as pd

df_silver = pdf_bronze.copy()

# Convertir tipos
df_silver["id"] = df_silver["id"].astype(int)
df_silver["price"] = df_silver["price"].astype(float)

# Extraer rating
df_silver["rating_rate"] = df_silver["rating"].apply(lambda x: x["rate"])
df_silver["rating_count"] = df_silver["rating"].apply(lambda x: x["count"])

# Normalizar categorías
df_silver["category"] = df_silver["category"].str.lower().str.strip()

# Eliminar columna rating original
df_silver = df_silver.drop(columns=["rating"])

df_silver.head()


Unnamed: 0,id,title,price,description,category,image,rating_rate,rating_count
0,1,"Fjallraven - Foldsack No. 1 Backpack, Fits 15 ...",109.95,Your perfect pack for everyday use and walks i...,men's clothing,https://fakestoreapi.com/img/81fPKd-2AYL._AC_S...,3.9,120
1,2,Mens Casual Premium Slim Fit T-Shirts,22.3,"Slim-fitting style, contrast raglan long sleev...",men's clothing,https://fakestoreapi.com/img/71-3HjGNDUL._AC_S...,4.1,259
2,3,Mens Cotton Jacket,55.99,great outerwear jackets for Spring/Autumn/Wint...,men's clothing,https://fakestoreapi.com/img/71li-ujtlUL._AC_U...,4.7,500
3,4,Mens Casual Slim Fit,15.99,The color could be slightly different between ...,men's clothing,https://fakestoreapi.com/img/71YXzeOuslL._AC_U...,2.1,430
4,5,John Hardy Women's Legends Naga Gold & Silver ...,695.0,"From our Legends Collection, the Naga was insp...",jewelery,https://fakestoreapi.com/img/71pWzhdJNwL._AC_U...,4.6,400


Guardamos SILVER como Delta Lake (local)

In [27]:
import os
import pyarrow as pa
from deltalake import write_deltalake

# Crear carpeta local de silver
silver_local_path = "/content/fakestore_silver_local"
os.makedirs(silver_local_path, exist_ok=True)

silver_local_products = os.path.join(silver_local_path, "products")

# Convertimos a PyArrow Table
table_silver = pa.Table.from_pandas(df_silver)

# Guardamos con Delta Lake (delta-rs)
write_deltalake(silver_local_products, table_silver, mode="overwrite")

print("Silver creado en local:", silver_local_products)


Silver creado en local: /content/fakestore_silver_local/products


Copiamos SILVER a Google Drive

In [28]:
import shutil

silver_products_drive = "/content/drive/MyDrive/fakestore_datalake/silver/products"

# borrar antes si existe
if os.path.exists(silver_products_drive):
    shutil.rmtree(silver_products_drive)

# copiar delta completa
shutil.copytree(silver_local_products, silver_products_drive)

print("Silver copiado a Drive:", silver_products_drive)


Silver copiado a Drive: /content/drive/MyDrive/fakestore_datalake/silver/products


(Gold)  metricas:
1.  Precio promedio por categoría
2.  Rating promedio
3.  Total de reviews
4.  Cantidad de productos

In [29]:
gold_df = df_silver.groupby("category").agg({
    "price": "mean",
    "rating_rate": "mean",
    "rating_count": "sum",
    "id": "count"
}).reset_index()

gold_df.columns = ["category", "avg_price", "avg_rating", "total_reviews", "product_count"]

gold_df


Unnamed: 0,category,avg_price,avg_rating,total_reviews,product_count
0,electronics,332.498333,3.483333,1782,6
1,jewelery,220.995,3.35,970,4
2,men's clothing,51.0575,3.7,1309,4
3,women's clothing,26.286667,3.683333,1675,6


Guardamos Gold como Delta Lake

In [30]:
gold_local_path = "/content/fakestore_gold_local"
os.makedirs(gold_local_path, exist_ok=True)

gold_local_products = os.path.join(gold_local_path, "category_metrics")

table_gold = pa.Table.from_pandas(gold_df)

write_deltalake(gold_local_products, table_gold, mode="overwrite")

print("Gold creado en local:", gold_local_products)


Gold creado en local: /content/fakestore_gold_local/category_metrics


Copiamos a DRIVE

In [31]:
gold_products_drive = "/content/drive/MyDrive/fakestore_datalake/gold/category_metrics"

if os.path.exists(gold_products_drive):
    shutil.rmtree(gold_products_drive)

shutil.copytree(gold_local_products, gold_products_drive)

print("Gold copiado a Drive:", gold_products_drive)


Gold copiado a Drive: /content/drive/MyDrive/fakestore_datalake/gold/category_metrics
