In [1]:
from pyspark.sql import SparkSession, functions as F, types as T

spark = (
    SparkSession.builder
    .appName("ETL_Amazon_Electronics_2023")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))  # <-- en string
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# Rutas HDFS
landing_path = "hdfs://localhost:9000/datalake/landing/Electronics.jsonl"
bronze_2023  = "hdfs://localhost:9000/datalake/bronze/amazon/electronics/reviews/curated_2023/"
silver_2023  = "hdfs://localhost:9000/datalake/silver/amazon/electronics/reviews_clean_2023/"

25/11/17 20:16:30 WARN Utils: Your hostname, vbox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/17 20:16:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/17 20:16:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql import functions as F, types as T

landing_path = "hdfs://localhost:9000/datalake/landing/Electronics.jsonl"  # o .jsonl
bronze_2023_path = "hdfs://localhost:9000/datalake/bronze/amazon/electronics/reviews/curated_2023/"

# --- Schema explícito (incluye _corrupt_record) ---
schema = T.StructType([
    T.StructField("asin", T.StringType(), True),
    T.StructField("helpful_vote", T.LongType(), True),
    T.StructField("images", T.ArrayType(T.StructType([
        T.StructField("large_image_url",  T.StringType(), True),
        T.StructField("medium_image_url", T.StringType(), True),
        T.StructField("small_image_url",  T.StringType(), True),
        T.StructField("attachment_type",  T.StringType(), True),
    ])), True),
    T.StructField("parent_asin", T.StringType(), True),
    T.StructField("rating", T.DoubleType(), True),
    T.StructField("text", T.StringType(), True),
    T.StructField("timestamp", T.LongType(), True),     # s o ms
    T.StructField("title", T.StringType(), True),
    T.StructField("user_id", T.StringType(), True),
    T.StructField("verified_purchase", T.BooleanType(), True),
    T.StructField("_corrupt_record", T.StringType(), True),  # <-- clave para filtrar
])

df_raw = (
    spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema)
      .json(landing_path)
)
print("raw rows:", df_raw.count())
df_raw.printSchema()



raw rows: 43886944
root
 |-- asin: string (nullable = true)
 |-- helpful_vote: long (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- large_image_url: string (nullable = true)
 |    |    |-- medium_image_url: string (nullable = true)
 |    |    |-- small_image_url: string (nullable = true)
 |    |    |-- attachment_type: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- title: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- _corrupt_record: string (nullable = true)



                                                                                

In [3]:
# Límites de 2023 en epoch segundos (UTC)
ts_min = 1672531200   # 2023-01-01 00:00:00
ts_max = 1704067199   # 2023-12-31 23:59:59
epoch_ms_threshold = F.lit(10**12)  # si timestamp > 10^12, asumimos milisegundos

# Normalizar a segundos
ts_sec = F.when(F.col("timestamp").isNotNull() & (F.col("timestamp") > epoch_ms_threshold),
                (F.col("timestamp")/1000).cast("long")
         ).otherwise(F.col("timestamp").cast("long"))

df_bronze_2023 = (
    df_raw
      .filter(F.col("_corrupt_record").isNull())         # <--- evita filas dañadas
      .withColumn("ts_sec", ts_sec)
      .filter(F.col("ts_sec").between(ts_min, ts_max))   # <--- filtro 2023
      .withColumn("ts_review", F.to_timestamp(F.from_unixtime("ts_sec")))
      .withColumn("dt", F.date_format("ts_review", "yyyy-MM"))  # partición
      .withColumn("asin", F.upper("asin"))
)

print("bronze 2023 rows:", df_bronze_2023.count())
df_bronze_2023.select(
    F.min("ts_review").alias("min_fecha"),
    F.max("ts_review").alias("max_fecha")
).show(truncate=False)

                                                                                

bronze 2023 rows: 1912874


                                                                                

+-------------------+-------------------+
|min_fecha          |max_fecha          |
+-------------------+-------------------+
|2022-12-31 19:00:06|2023-09-13 12:26:21|
+-------------------+-------------------+



In [4]:
(df_bronze_2023
   .repartition("dt")
   .write.mode("overwrite")
   .partitionBy("dt")
   .parquet(bronze_2023_path)
)

print("Escrito en:", bronze_2023_path)



Escrito en: hdfs://localhost:9000/datalake/bronze/amazon/electronics/reviews/curated_2023/


                                                                                

In [5]:
df_b = spark.read.parquet(bronze_2023_path)
print("files:", df_b.count())
df_b.select("dt").groupBy("dt").count().orderBy("dt").show(12, False)
df_b.select(F.min("ts_review").alias("min_ts"),
            F.max("ts_review").alias("max_ts")).show(truncate=False)

files: 1912874


                                                                                

+-------+------+
|dt     |count |
+-------+------+
|2022-12|2897  |
|2023-01|469002|
|2023-02|377300|
|2023-03|423348|
|2023-04|255710|
|2023-05|147819|
|2023-06|95917 |
|2023-07|80960 |
|2023-08|56344 |
|2023-09|3577  |
+-------+------+

+-------------------+-------------------+
|min_ts             |max_ts             |
+-------------------+-------------------+
|2022-12-31 19:00:06|2023-09-13 12:26:21|
+-------------------+-------------------+



In [6]:
from pyspark.sql import functions as F
from pyspark.sql import Window as W

# Paths
bronze_2023_path = "hdfs://localhost:9000/datalake/bronze/amazon/electronics/reviews/curated_2023/"
silver_2023_path = "hdfs://localhost:9000/datalake/silver/amazon/electronics/reviews_clean_2023/"

# 1) Leer Bronze y E S T A N D A R I Z A R nombres
df_b = spark.read.parquet(bronze_2023_path)

# Mapea nombres Bronze -> estándar
df_std = (
    df_b
    .select(
        F.col("asin"),
        F.col("text").alias("reviewText"),          # <-- text -> reviewText
        F.col("title"),
        F.col("rating").alias("overall"),           # <-- rating -> overall
        F.col("helpful_vote").alias("helpful_votes"),
        F.col("user_id"),
        F.col("verified_purchase"),
        F.col("ts_review"),
        F.col("dt")
    )
)

# 2) Limpieza de texto (sin UDF)
txt = F.col("reviewText").cast("string")
txt_clean = F.lower(txt)
txt_clean = F.regexp_replace(txt_clean, r"http[s]?://\S+|www\.\S+", "")      # URLs
txt_clean = F.regexp_replace(txt_clean, r"[\p{C}\p{So}\p{Sk}]+", " ")        # control/emojis/símbolos
txt_clean = F.regexp_replace(txt_clean, r"\s+", " ")                         # espacios

df_norm = (
    df_std
    .withColumn("reviewText", txt_clean)
    .withColumn("overall", F.col("overall").cast("int"))
    .withColumn("helpful_votes", F.col("helpful_votes").cast("long"))
)

# 3) Flags y causas simples
flag_return = F.expr("""IF(reviewText RLIKE '(devolv|devolver|return(ed)?|refund(ed)?|reembolso)', 1, 0)""")
cause = (
    F.when(F.col("reviewText").rlike(r"\b(no funciona|not work(s|ed)?|does.?t work|dead|defect)"), "no_funciona")
     .when(F.col("reviewText").rlike(r"\b(no compatible|incompatible|not compatible|does.?t fit)"), "no_compatible")
     .when(F.col("reviewText").rlike(r"\b(baja calidad|poor qualit(y)?|cheap|frágil|fragil|broken)"), "baja_calidad")
     .otherwise(F.lit(None))
)

df_enriched = (
    df_norm
    .withColumn("flag_return", flag_return.cast("int"))
    .withColumn("cause", cause)
)

# 4) Deduplicación determinística
df_key = (
    df_enriched
    .withColumn(
        "review_id",
        F.md5(F.concat_ws("|",
            F.coalesce(F.col("asin"), F.lit("")),
            F.coalesce(F.col("user_id"), F.lit("")),
            F.date_format(F.col("ts_review"), "yyyy-MM-dd"),
            F.substring(F.coalesce(F.col("reviewText"), F.lit("")), 1, 256)
        ))
    )
    .withColumn("len_text", F.length("reviewText"))
)

w = W.partitionBy("review_id").orderBy(F.col("len_text").desc(), F.col("ts_review").desc())
df_dedup = (
    df_key
    .withColumn("rn", F.row_number().over(w))
    .filter(F.col("rn") == 1)
    .drop("rn","len_text")
)

# 5) Escribir Silver (Parquet particionado por dt)
(df_dedup
 .repartition("dt")
 .write.mode("overwrite")
 .partitionBy("dt")
 .parquet(silver_2023_path)
)

print("Silver Clean escrito en:", silver_2023_path)

# 6) Validaciones rápidas
df_s = spark.read.parquet(silver_2023_path)
print("Filas Silver:", df_s.count())
df_s.groupBy("dt").count().orderBy("dt").show(12, False)
df_s.select(F.min("ts_review").alias("min_fecha"),
            F.max("ts_review").alias("max_fecha")).show(truncate=False)

                                                                                

Silver Clean escrito en: hdfs://localhost:9000/datalake/silver/amazon/electronics/reviews_clean_2023/
Filas Silver: 1884789
+-------+------+
|dt     |count |
+-------+------+
|2022-12|2865  |
|2023-01|464076|
|2023-02|373165|
|2023-03|418434|
|2023-04|251695|
|2023-05|144514|
|2023-06|93516 |
|2023-07|78673 |
|2023-08|54422 |
|2023-09|3429  |
+-------+------+

+-------------------+-------------------+
|min_fecha          |max_fecha          |
+-------------------+-------------------+
|2022-12-31 19:00:06|2023-09-13 12:26:21|
+-------------------+-------------------+



In [7]:
from pyspark.sql import functions as F

silver_path = "hdfs://localhost:9000/datalake/silver/amazon/electronics/reviews_clean_2023/"
df_s = spark.read.parquet(silver_path).cache()
print(df_s.columns)  # para que veas los nombres reales

# Resolver nombres de columnas según existan
cols = set(df_s.columns)
text_col    = "text" if "text" in cols else ("reviewText" if "reviewText" in cols else None)
rating_col  = "rating" if "rating" in cols else ("overall" if "overall" in cols else None)
helpful_col = "helpful_votes" if "helpful_votes" in cols else ("helpful_vote" if "helpful_vote" in cols else None)

assert text_col is not None,  "No encuentro la columna de texto (text/reviewText)."
assert rating_col is not None, "No encuentro la columna de rating (rating/overall)."

# Renombrar a nombres estándar que usaremos en todo el pipeline
df_std = df_s
if text_col != "text":
    df_std = df_std.withColumnRenamed(text_col, "text")
if rating_col != "rating":
    df_std = df_std.withColumnRenamed(rating_col, "rating")
if helpful_col and helpful_col != "helpful_votes":
    df_std = df_std.withColumnRenamed(helpful_col, "helpful_votes")

df_std = (
    df_std
    .withColumn("rating_int", F.col("rating").cast("int"))
    .withColumn("len_text",  F.length(F.col("text").cast("string")))
    .withColumn("len_title", F.length(F.col("title").cast("string")))
    .withColumn("vp", (F.col("verified_purchase")==True).cast("int"))
)

['asin', 'reviewText', 'title', 'overall', 'helpful_votes', 'user_id', 'verified_purchase', 'ts_review', 'flag_return', 'cause', 'review_id', 'dt']


In [8]:
df_bin = (
    df_std
    .filter(F.col("rating_int").isin(1,2,3,4,5))
    .withColumn(
        "label_bin",
        F.when(F.col("rating_int").isin(1,2), F.lit(1))
         .when(F.col("rating_int").isin(4,5), F.lit(0))
    )
    .filter(F.col("label_bin").isNotNull())
    .select("asin","dt","ts_review","text","title","rating_int","label_bin",
            "len_text","len_title","vp",
            *(["helpful_votes"] if "helpful_votes" in df_std.columns else []))
).cache()

df_bin.groupBy("label_bin").count().show()

25/11/17 20:25:16 WARN MemoryStore: Not enough space to cache rdd_98_3 in memory! (computed 60.1 MiB so far)
25/11/17 20:25:16 WARN BlockManager: Persisting block rdd_98_3 to disk instead.
25/11/17 20:25:17 WARN MemoryStore: Not enough space to cache rdd_98_2 in memory! (computed 120.3 MiB so far)
25/11/17 20:25:17 WARN BlockManager: Persisting block rdd_98_2 to disk instead.
25/11/17 20:25:19 WARN MemoryStore: Not enough space to cache rdd_98_2 in memory! (computed 120.3 MiB so far)
25/11/17 20:25:22 WARN MemoryStore: Not enough space to cache rdd_98_3 in memory! (computed 60.1 MiB so far)
25/11/17 20:25:24 WARN MemoryStore: Not enough space to cache rdd_104_0 in memory! (computed 49.4 MiB so far)
25/11/17 20:25:24 WARN BlockManager: Persisting block rdd_104_0 to disk instead.
25/11/17 20:25:25 WARN MemoryStore: Not enough space to cache rdd_104_1 in memory! (computed 94.8 MiB so far)
25/11/17 20:25:25 WARN BlockManager: Persisting block rdd_104_1 to disk instead.
25/11/17 20:25:28 WA

+---------+-------+
|label_bin|  count|
+---------+-------+
|        1| 389777|
|        0|1372263|
+---------+-------+



                                                                                

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, NGram, CountVectorizer, IDF, VectorAssembler

tok = RegexTokenizer(inputCol="text", outputCol="tokens", pattern=r"[^a-zA-Z0-9]+", minTokenLength=2, toLowercase=True)
rm  = StopWordsRemover(inputCol="tokens", outputCol="tokens_sw")
ng2 = NGram(n=2, inputCol="tokens_sw", outputCol="bi")

# Unigramas
cv_u  = CountVectorizer(inputCol="tokens_sw", outputCol="tf_u", vocabSize=100000, minDF=20)
idf_u = IDF(inputCol="tf_u", outputCol="tfidf_u")

# Bigramas
cv_b  = CountVectorizer(inputCol="bi", outputCol="tf_b", vocabSize=100000, minDF=20)
idf_b = IDF(inputCol="tf_b", outputCol="tfidf_b")

num_feats = [c for c in ["len_text","len_title","vp","helpful_votes"] if c in df_bin.columns]
assembler = VectorAssembler(inputCols=["tfidf_u","tfidf_b"] + num_feats, outputCol="features")

pipeline = Pipeline(stages=[tok, rm, ng2, cv_u, idf_u, cv_b, idf_b, assembler])

train, test = df_bin.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train)

gold_train = model.transform(train).select("asin","dt","ts_review","rating_int","label_bin","features")
gold_test  = model.transform(test ).select("asin","dt","ts_review","rating_int","label_bin","features")

# (opcional) guarda los vocabularios para interpretabilidad:
from pyspark.ml.feature import CountVectorizerModel
cv_u_model = next(s for s in model.stages if isinstance(s, CountVectorizerModel) and s.getOutputCol()=="tf_u")
cv_b_model = next(s for s in model.stages if isinstance(s, CountVectorizerModel) and s.getOutputCol()=="tf_b")

25/11/17 20:25:42 WARN MemoryStore: Not enough space to cache rdd_104_0 in memory! (computed 95.5 MiB so far)
25/11/17 20:25:42 WARN MemoryStore: Not enough space to cache rdd_104_1 in memory! (computed 94.8 MiB so far)
25/11/17 20:26:17 WARN MemoryStore: Not enough space to cache rdd_104_0 in memory! (computed 95.5 MiB so far)
25/11/17 20:26:50 WARN MemoryStore: Not enough space to cache rdd_104_1 in memory! (computed 48.9 MiB so far)
25/11/17 20:27:24 WARN MemoryStore: Not enough space to cache rdd_104_3 in memory! (computed 90.4 MiB so far)
25/11/17 20:28:08 WARN DAGScheduler: Broadcasting large task binary with size 1534.5 KiB
                                                                                

In [10]:
gold_base = "hdfs://localhost:9000/datalake/gold/amazon/electronics"
gold_feat_path = f"{gold_base}/reviews_gold_features_2023"

# particiona por dt para consultas cómodas
(gold_train
 .repartition("dt")
 .write.mode("overwrite")
 .partitionBy("dt")
 .parquet(f"{gold_feat_path}/train"))

(gold_test
 .repartition("dt")
 .write.mode("overwrite")
 .partitionBy("dt")
 .parquet(f"{gold_feat_path}/test"))

print("Gold features ->", gold_feat_path)

25/11/17 20:28:57 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
25/11/17 20:28:58 WARN MemoryStore: Not enough space to cache rdd_104_1 in memory! (computed 48.9 MiB so far)
25/11/17 20:30:00 WARN DAGScheduler: Broadcasting large task binary with size 14.8 MiB
25/11/17 20:30:18 WARN DAGScheduler: Broadcasting large task binary with size 10.2 MiB
25/11/17 20:30:37 WARN DAGScheduler: Broadcasting large task binary with size 14.8 MiB

Gold features -> hdfs://localhost:9000/datalake/gold/amazon/electronics/reviews_gold_features_2023


                                                                                

In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Read_Gold_2023")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.sql.files.maxPartitionBytes", str(128*1024*1024))
    # opcional si tu VM aguanta:
    # .config("spark.driver.memory", "4g")
    .getOrCreate()
)

25/11/17 20:31:01 WARN Utils: Your hostname, vbox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/17 20:31:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/17 20:31:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
gold_base = "hdfs://localhost:9000/datalake/gold/amazon/electronics"
gold_feat_path = f"{gold_base}/reviews_gold_features_2023"

gold_train_path = f"{gold_feat_path}/train"
gold_test_path  = f"{gold_feat_path}/test"

df_train = spark.read.parquet(gold_train_path) \
    .select("asin","dt","rating_int","label_bin","features")
df_test  = spark.read.parquet(gold_test_path) \
    .select("asin","dt","rating_int","label_bin","features")

df_train.printSchema()
df_train.limit(5).select("asin","rating_int","label_bin").show(truncate=False)

                                                                                

root
 |-- asin: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- rating_int: integer (nullable = true)
 |-- label_bin: integer (nullable = true)
 |-- features: vector (nullable = true)



                                                                                

+----------+----------+---------+
|asin      |rating_int|label_bin|
+----------+----------+---------+
|0062970704|5         |0        |
|B09Y1ZST8V|5         |0        |
|0511189877|1         |1        |
|B09Y1ZST8V|5         |0        |
|0594450233|5         |0        |
+----------+----------+---------+



In [3]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# ============================
# 1) (Opcional) Muestrear para evitar OOM
# ============================
# Si esto va bien, luego puedes subir el 0.3 a 0.5 o incluso quitar el sample.
train_small = df_train.sample(False, 0.3, seed=42)
test_small  = df_test.sample(False, 0.3, seed=42)

print("train_small:", train_small.count(), "test_small:", test_small.count())

train_small: 423561 test_small: 106088


In [4]:
# ============================
# 2) Modelo: Regresión Logística binaria
# ============================
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label_bin",
    predictionCol="prediction",
    probabilityCol="probability",
    rawPredictionCol="rawPrediction",
    maxIter=20,
    regParam=0.1,
    elasticNetParam=0.0
)

lr_model = lr.fit(train_small)

# ============================
# 3) Predicciones y métricas
# ============================
pred_test = lr_model.transform(test_small)

e_auc = BinaryClassificationEvaluator(
    labelCol="label_bin",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
e_acc = MulticlassClassificationEvaluator(
    labelCol="label_bin",
    predictionCol="prediction",
    metricName="accuracy"
)
e_f1 = MulticlassClassificationEvaluator(
    labelCol="label_bin",
    predictionCol="prediction",
    metricName="f1"
)

auc = e_auc.evaluate(pred_test)
acc = e_acc.evaluate(pred_test)
f1  = e_f1.evaluate(pred_test)

print(f"AUC: {auc:.3f}  |  Accuracy: {acc:.3f}  |  F1: {f1:.3f}")

# Una tablita rápida de matriz de confusión
pred_test.groupBy("label_bin", "prediction").count().orderBy("label_bin", "prediction").show()

25/11/17 20:31:51 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:31:59 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/11/17 20:31:59 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:32:44 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:33:37 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:34:32 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:35:27 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:36:22 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:37:19 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:38:15 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:39:08 WARN DAGScheduler: Broadcasting large task binary with size 6.7 MiB
25/11/17 20:40:01 WARN DAGSche

AUC: 0.961  |  Accuracy: 0.903  |  F1: 0.897


25/11/17 20:49:43 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB

+---------+----------+-----+
|label_bin|prediction|count|
+---------+----------+-----+
|        0|       0.0|80796|
|        0|       1.0| 1555|
|        1|       0.0| 8697|
|        1|       1.0|15040|
+---------+----------+-----+



25/11/17 20:49:47 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB
                                                                                

In [5]:
model_path_hdfs = "hdfs://localhost:9000/datalake/gold/models/lr_reviews_2023_v1"

lr_model.write().overwrite().save(model_path_hdfs)
print("Modelo guardado en:", model_path_hdfs)

Modelo guardado en: hdfs://localhost:9000/datalake/gold/models/lr_reviews_2023_v1


In [6]:
from pyspark.ml.classification import LogisticRegressionModel

model_path_hdfs = "hdfs://localhost:9000/datalake/gold/models/lr_reviews_2023_v1"

lr_loaded = LogisticRegressionModel.load(model_path_hdfs)

# Prueba rápida sobre un pequeño sample de gold_test
pred_sample = lr_loaded.transform(df_test.limit(10))
pred_sample.select("asin","rating_int","label_bin","probability","prediction").show(truncate=False)

25/11/17 20:51:37 WARN DAGScheduler: Broadcasting large task binary with size 7.6 MiB


+----------+----------+---------+-----------------------------------------+----------+
|asin      |rating_int|label_bin|probability                              |prediction|
+----------+----------+---------+-----------------------------------------+----------+
|0547152469|4         |0        |[0.9425286205305041,0.057471379469495876]|0.0       |
|0823430499|4         |0        |[0.9790312279554321,0.02096877204456793] |0.0       |
|0972683275|5         |0        |[0.966049698360309,0.03395030163969104]  |0.0       |
|1426320965|5         |0        |[0.8587313589781541,0.1412686410218459]  |0.0       |
|1449410243|5         |0        |[0.9288930662075883,0.07110693379241173] |0.0       |
|1449410243|1         |1        |[0.6865070801728896,0.31349291982711036] |0.0       |
|1529427975|5         |0        |[0.8495914618664331,0.15040853813356692] |0.0       |
|1593278551|5         |0        |[0.9072620092994566,0.09273799070054345] |0.0       |
|4757567812|2         |1        |[0.8968952

In [7]:
from pyspark.sql import functions as F
from pyspark.ml.classification import LogisticRegressionModel

# --- Rutas en HDFS ---
gold_base = "hdfs://localhost:9000/datalake/gold/amazon/electronics"
gold_feat_path = f"{gold_base}/reviews_gold_features_2023"

model_path_hdfs = "hdfs://localhost:9000/datalake/gold/models/lr_reviews_2023_v1"

# 1) Cargar modelo entrenado
lr_model = LogisticRegressionModel.load(model_path_hdfs)

# 2) Cargar Gold (train + test) y unirlos
df_train = (
    spark.read.parquet(f"{gold_feat_path}/train")
         .select("asin", "dt", "rating_int", "label_bin", "features")
)

df_test = (
    spark.read.parquet(f"{gold_feat_path}/test")
         .select("asin", "dt", "rating_int", "label_bin", "features")
)

df_gold = df_train.unionByName(df_test)

print("Gold rows:", df_gold.count())
df_gold.printSchema()
df_gold.show(5, truncate=False)

                                                                                

Gold rows: 1762040
root
 |-- asin: string (nullable = true)
 |-- dt: string (nullable = true)
 |-- rating_int: integer (nullable = true)
 |-- label_bin: integer (nullable = true)
 |-- features: vector (nullable = true)



25/11/17 20:52:22 WARN DAGScheduler: Broadcasting large task binary with size 13.3 MiB
                                                                                

+----------+-------+----------+---------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|asin      |dt     |rating_int|label_bin|features      

In [9]:
from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array

# 3) Scoring sobre TODA la Gold
pred_all = lr_model.transform(df_gold)

# Pasar de VectorUDT -> array<double>
pred_all = pred_all.withColumn("prob_array", vector_to_array("probability"))

# probability es [p_clase0, p_clase1]; usamos p_clase1 = prob de "riesgo" (1–2 estrellas)
pred_all = pred_all.withColumn("prob_neg", F.col("prob_array")[1].cast("double")) \
                   .drop("prob_array")

pred_all.select("asin", "dt", "label_bin", "prob_neg", "prediction").show(5, truncate=False)

# 4) KPI 1: riesgo mensual por ASIN
#   - n_reviews: total
#   - n_neg: # de reseñas negativas (label_bin=1)
#   - pct_neg: porcentaje de negativas
#   - avg_prob_neg: promedio de probabilidad de clase 1 en las reseñas negativas

kpi1 = (
    pred_all
    .groupBy("asin", "dt")
    .agg(
        F.count("*").alias("n_reviews"),
        F.sum(F.when(F.col("label_bin") == 1, 1).otherwise(0)).alias("n_neg"),
        F.avg(F.when(F.col("label_bin") == 1, F.col("prob_neg")).otherwise(None)).alias("avg_prob_neg")
    )
    .withColumn("pct_neg", F.col("n_neg") / F.col("n_reviews"))
)

# (Opcional) ranking dentro de cada mes, por riesgo
from pyspark.sql.window import Window
w = Window.partitionBy("dt").orderBy(F.col("avg_prob_neg").desc())
kpi1 = kpi1.withColumn("risk_rank", F.row_number().over(w))

kpi1.show(10, truncate=False)

# 5) Guardar KPI1 en HDFS (Zona Gold / results)
risk_path = "hdfs://localhost:9000/datalake/gold/results/ranking_riesgo_mensual"

(
  kpi1
  .repartition("dt")
  .write.mode("overwrite")
  .partitionBy("dt")
  .parquet(risk_path)
)

print("KPI1 guardado en:", risk_path)

25/11/17 20:59:22 WARN DAGScheduler: Broadcasting large task binary with size 14.3 MiB
                                                                                

+----------+-------+---------+--------------------+----------+
|asin      |dt     |label_bin|prob_neg            |prediction|
+----------+-------+---------+--------------------+----------+
|0062970704|2023-01|0        |0.12710706265746718 |0.0       |
|B09Y1ZST8V|2023-01|0        |0.017681150651766786|0.0       |
|0511189877|2023-01|1        |0.5003681915851966  |1.0       |
|B09Y1ZST8V|2023-01|0        |0.03836144070524661 |0.0       |
|0594450233|2023-01|0        |0.040787941720065835|0.0       |
+----------+-------+---------+--------------------+----------+
only showing top 5 rows



25/11/17 20:59:24 WARN DAGScheduler: Broadcasting large task binary with size 14.3 MiB
25/11/17 20:59:38 WARN DAGScheduler: Broadcasting large task binary with size 14.3 MiB
25/11/17 20:59:41 WARN DAGScheduler: Broadcasting large task binary with size 14.3 MiB
                                                                                

+----------+-------+---------+-----+------------------+-------+---------+
|asin      |dt     |n_reviews|n_neg|avg_prob_neg      |pct_neg|risk_rank|
+----------+-------+---------+-----+------------------+-------+---------+
|B09FH46J9P|2022-12|1        |1    |0.9999999751244969|1.0    |1        |
|B0BLGW32NR|2022-12|1        |1    |0.9998835808584299|1.0    |2        |
|B0BL2B4PNG|2022-12|1        |1    |0.9998786730860699|1.0    |3        |
|B097CNBDX2|2022-12|1        |1    |0.9998529800145185|1.0    |4        |
|B07QKXM2D3|2022-12|1        |1    |0.9997692749529943|1.0    |5        |
|B07QZF1XHJ|2022-12|1        |1    |0.9996100703065052|1.0    |6        |
|B09V2JG9G1|2022-12|1        |1    |0.9994693387092841|1.0    |7        |
|B0BFDXCRP9|2022-12|1        |1    |0.9984915743536167|1.0    |8        |
|B0B3XF97HV|2022-12|1        |1    |0.9983995069234456|1.0    |9        |
|B0836GXKKB|2022-12|1        |1    |0.9979716660263701|1.0    |10       |
+----------+-------+---------+-----+--

25/11/17 20:59:43 WARN DAGScheduler: Broadcasting large task binary with size 14.3 MiB
25/11/17 20:59:54 WARN DAGScheduler: Broadcasting large task binary with size 14.3 MiB
25/11/17 20:59:55 WARN DAGScheduler: Broadcasting large task binary with size 14.5 MiB

KPI1 guardado en: hdfs://localhost:9000/datalake/gold/results/ranking_riesgo_mensual


                                                                                

In [10]:
# 6) Leer Silver 2023 para el mapa de causas
silver_path = "hdfs://localhost:9000/datalake/silver/amazon/electronics/reviews_clean_2023/"

df_silver = spark.read.parquet(silver_path).select("asin", "dt", "cause")

# Nos quedamos solo con filas que tienen causa
df_ca = df_silver.filter(F.col("cause").isNotNull())

# 7) KPI 2: conteo de causas por ASIN y mes
# Puedes dejarlo "largo" (asin, dt, cause, count)...
kpi2_long = (
    df_ca.groupBy("asin", "dt", "cause")
         .count()
)

# ...o pivotear para tener columnas por causa (no_funciona, no_compatible, baja_calidad)
kpi2 = (
    kpi2_long
    .groupBy("asin", "dt")
    .pivot("cause", ["no_funciona", "no_compatible", "baja_calidad"])
    .sum("count")
    .fillna(0)
)

kpi2.show(10, truncate=False)

causas_path = "hdfs://localhost:9000/datalake/gold/results/mapa_causas_mensual"

(
  kpi2
  .repartition("dt")
  .write.mode("overwrite")
  .partitionBy("dt")
  .parquet(causas_path)
)

print("KPI2 guardado en:", causas_path)

+----------+-------+-----------+-------------+------------+
|asin      |dt     |no_funciona|no_compatible|baja_calidad|
+----------+-------+-----------+-------------+------------+
|B0B3SB76HB|2023-01|0          |0            |1           |
|B07QGVMCJG|2023-01|0          |1            |4           |
|B098FKXT8L|2023-01|7          |0            |4           |
|B0BGKD69FH|2023-01|0          |0            |1           |
|B09SNQ6FQ2|2023-01|1          |0            |1           |
|B0BGJ2NNJ2|2023-01|2          |0            |5           |
|B08X4Y6CGH|2023-01|1          |0            |1           |
|B09WTNWC29|2023-01|3          |0            |0           |
|B09KLB9JMN|2023-01|1          |0            |0           |
|B09JFTS3PF|2023-01|3          |0            |2           |
+----------+-------+-----------+-------------+------------+
only showing top 10 rows



[Stage 95:>                                                         (0 + 1) / 1]

KPI2 guardado en: hdfs://localhost:9000/datalake/gold/results/mapa_causas_mensual


                                                                                