# Creación de tablas

Este notebook crea las tablas donde se van a guardar los datos en las capas silver y gold (dos tablas en cada capa). En la capa bronze no se crea ninguna tabla, ya que los datos se encuentran en formato raw.


In [0]:
%sql
-- Make sure the silver schema exists before any table is created in it.
CREATE SCHEMA IF NOT EXISTS silver;

-- Silver-layer table ProductInfo: one row of descriptive and pricing
-- attributes per product, plus the timestamp at which the row was ingested.
-- Stored as an external Delta table partitioned by category.
CREATE TABLE IF NOT EXISTS silver.ProductInfo (
    product_id STRING,
    product_name STRING,
    category STRING,
    actual_price DOUBLE,
    discounted_price DOUBLE,
    discount_percentage DOUBLE,
    img_link STRING,
    product_link STRING,
    about_product STRING,
    ingestion_timestamp TIMESTAMP)
USING DELTA
PARTITIONED BY (category)
-- The original statement used LOCATION '', which is not a usable storage path
-- and makes the CREATE TABLE fail. An explicit path is required for an
-- external table.
-- NOTE(review): path assumed from the /mnt/<layer>/... mount layout used by
-- the load code in this notebook -- confirm against the real storage account.
LOCATION '/mnt/silver/ProductInfo';

-- Silver-layer table ProductReviews: one row per review, keyed by review_id
-- and linked to its product (product_id) and author (user_id).
-- Stored as an external Delta table.
CREATE TABLE IF NOT EXISTS silver.ProductReviews (
    review_id STRING,
    product_id STRING,
    user_id STRING,
    user_name STRING,
    rating DOUBLE,
    rating_count INT,
    review_title STRING,
    review_content STRING,
    review_timestamp TIMESTAMP)
USING DELTA
-- NOTE(review): product_id looks like a high-cardinality identifier; a
-- partition per product usually produces many tiny files. Kept as originally
-- declared -- consider dropping the partitioning or using a coarser column.
PARTITIONED BY (product_id)
-- The original statement used LOCATION '', which is not a usable storage path
-- and makes the CREATE TABLE fail. An explicit path is required for an
-- external table.
-- NOTE(review): path assumed from the /mnt/<layer>/... mount layout used by
-- the load code in this notebook -- confirm against the real storage account.
LOCATION '/mnt/silver/ProductReviews';

-- Make sure the gold schema exists before any table is created in it.
CREATE SCHEMA IF NOT EXISTS gold;

-- Gold-layer table ProductSummary: per-product aggregates (average rating,
-- review and rating counts, timestamp of the latest review).
-- Stored as an external Delta table partitioned by category.
CREATE TABLE IF NOT EXISTS gold.ProductSummary (
    product_id STRING,
    product_name STRING,
    category STRING,
    average_rating DOUBLE,
    total_reviews INT,
    total_rating_count INT,
    most_recent_review TIMESTAMP)
USING DELTA
PARTITIONED BY (category)
-- The original statement used LOCATION '', which is not a usable storage path
-- and makes the CREATE TABLE fail. An explicit path is required for an
-- external table.
-- NOTE(review): path assumed by analogy with the /mnt/bronze and /mnt/silver
-- mounts used by the load code in this notebook -- confirm that a /mnt/gold
-- mount exists and that this is the intended directory.
LOCATION '/mnt/gold/ProductSummary';

-- Gold-layer table UserActivity: per-user aggregates (number of reviews
-- written, average rating given, most frequently reviewed category).
-- Stored as an unpartitioned external Delta table.
CREATE TABLE IF NOT EXISTS gold.UserActivity (
    user_id STRING,
    user_name STRING,
    total_reviews_written INT,
    average_rating_given DOUBLE,
    most_frequent_category STRING)
USING DELTA
-- The original statement used LOCATION '', which is not a usable storage path
-- and makes the CREATE TABLE fail. An explicit path is required for an
-- external table.
-- NOTE(review): path assumed by analogy with the /mnt/bronze and /mnt/silver
-- mounts used by the load code in this notebook -- confirm that a /mnt/gold
-- mount exists and that this is the intended directory.
LOCATION '/mnt/gold/UserActivity';


In [0]:
# Import the PySpark column-function library
from pyspark.sql import functions as F


def _cast_columns(spec):
    """Return one ``F.col(name).cast(dtype)`` expression per (name, dtype) pair, in order."""
    return [F.col(name).cast(dtype) for name, dtype in spec]


def _overwrite_delta_table(frame, path, table):
    """Overwrite ``table`` with ``frame``, stored in Delta format at ``path``."""
    frame.write.saveAsTable(table, format="delta", mode="overwrite", path=path)


# Read the raw dataset from the bronze layer. The first line is treated as a
# header and schema inference is turned off, so the explicit casts below are
# what decide each column's type.
bronze_path = "/mnt/bronze/amazon_product_data.csv"
df_bronze = spark.read.options(header=True, inferSchema=False).csv(bronze_path)

# Projection for the "product_info" table: (source column, target Spark type).
df_product_info = df_bronze.select(
    *_cast_columns(
        [
            ("product_id", "string"),
            ("product_name", "string"),
            ("category", "string"),
            ("discounted_price", "double"),
            ("actual_price", "double"),
            ("discount_percentage", "double"),
            ("rating", "double"),
            ("rating_count", "int"),
            ("about_product", "string"),
            ("img_link", "string"),
            ("product_link", "string"),
        ]
    )
)

# Projection for the "product_reviews" table: (source column, target Spark type).
df_product_reviews = df_bronze.select(
    *_cast_columns(
        [
            ("product_id", "string"),
            ("user_id", "string"),
            ("user_name", "string"),
            ("review_id", "string"),
            ("review_title", "string"),
            ("review_content", "string"),
        ]
    )
)

# Storage paths for the two silver-layer tables.
silver_product_info_path = "/mnt/silver/product_info"
silver_product_reviews_path = "/mnt/silver/product_reviews"

# Persist both projections as Delta tables in the silver layer, replacing any
# existing contents.
# NOTE(review): these table names (and column sets) differ from the
# silver.ProductInfo / silver.ProductReviews tables declared in the SQL cell
# of this notebook -- confirm which tables are the intended targets.
_overwrite_delta_table(df_product_info, silver_product_info_path, "silver.product_info")
_overwrite_delta_table(df_product_reviews, silver_product_reviews_path, "silver.product_reviews")
