In [None]:
import os
from pyspark.sql import SparkSession
# Show the SPARK_HOME inherited from the environment, repoint it at the local
# Spark installation, then show it again so the cell output records both values.
print(os.getenv('SPARK_HOME'))
os.environ.update(SPARK_HOME='/usr/local/spark')
print(os.getenv('SPARK_HOME'))

/opt/bitnami/spark
/usr/local/spark


In [2]:
!pip install --upgrade spark-nlp

Collecting spark-nlp
  Downloading spark_nlp-5.5.3-py2.py3-none-any.whl (635 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m635.7/635.7 kB[0m [31m12.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: spark-nlp
Successfully installed spark-nlp-5.5.3


## INIT AND DEFINE SCHEMA

In [None]:
def stop_spark():
    """Stop the SparkContext that is active in this Python process, if there is one.

    A one-line status message is printed in every case and no exception is
    propagated, so the function is safe to call before building a new session.

    Returns:
        None.
    """
    try:
        from pyspark import SparkContext

        # Inspect the live context instead of calling getOrCreate(): the latter
        # instantiates a brand-new context when none exists, only for it to be
        # stopped on the next line (and reported as an "existing" one).
        active_context = SparkContext._active_spark_context
        if active_context is None:
            print("No existing SparkContext to stop")
            return
        active_context.stop()
        print("Stopped existing SparkContext")
    except Exception as e:
        # Best-effort cleanup: report the problem but let the caller continue.
        print(f"No existing SparkContext to stop or error occurred: {e}")

In [None]:
# Build (or rebuild) the notebook's Spark session after stopping any existing context.
def init_spark(app_name="Explicit Spark Job Test", extra_conf=None):
    """Create a Hive-enabled Spark session that runs on YARN in client mode.

    Any SparkContext already active in this process is stopped first (via
    ``stop_spark``) before the session is built.

    Args:
        app_name: Application name passed to the session builder. The default
            is the name this notebook has always used.
        extra_conf: Optional mapping of additional Spark configuration keys to
            values. Entries override the defaults defined in this function.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    stop_spark()

    from pyspark.sql import SparkSession

    # NOTE(review): an earlier revision defined a local path to
    # /usr/local/spark/jars/spark-nlp_2.12-5.3.3.jar here but never passed it to
    # Spark. If the session needs that jar, supply it through
    # extra_conf={"spark.jars": ...} and confirm the jar version matches the
    # installed spark-nlp Python package.
    settings = {
        # Cluster / deployment
        "spark.driver.host": "jupyter",
        "spark.submit.deployMode": "client",
        # Resources
        "spark.executor.instances": "3",
        "spark.executor.memory": "4g",
        "spark.executor.cores": "4",
        "spark.driver.memory": "2g",
        # Hive integration
        "spark.sql.warehouse.dir": "hdfs://namenode:9000/user/hive/warehouse",
        "hive.metastore.uris": "thrift://hive-metastore:9083",
        # SQL behaviour
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.caseSensitive": "false",
    }
    if extra_conf:
        settings.update(extra_conf)

    # Create a Spark session with explicit cluster configuration
    builder = SparkSession.builder.appName(app_name).master("yarn")
    for key, value in settings.items():
        builder = builder.config(key, value)
    spark = builder.enableHiveSupport().getOrCreate()

    print(f"Spark version: {spark.version}")
    print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

    return spark

spark = init_spark()


Stopped existing SparkContext


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType, FloatType,
                               ArrayType, LongType, BooleanType, IntegerType,
                               TimestampType)
from pyspark.sql.functions import from_unixtime, year, month, col

# HDFS locations and Hive identifiers for the raw-ingestion stage.
HDFS_RAW_FILE = "hdfs:///data/raw/amazon_reviews/Kindle_Store.jsonl"
HDFS_PROCESSED_DIR = "hdfs:///data/processed/amazon_reviews/kindle_store"  # Directory holding the Parquet partitions
HIVE_DATABASE_NAME = "amazon_data"  # Hive database name (created if it does not exist yet)
HIVE_TABLE_NAME = "kindle_reviews_processed"


# --- 1. Define Schema based on the image ---
# Every field is nullable; `timestamp` is kept as a raw long and converted later.
schema = StructType([
    StructField("rating", FloatType(), True),
    StructField("title", StringType(), True),
    StructField("text", StringType(), True),
    StructField("images", ArrayType(StringType()), True),
    StructField("asin", StringType(), True),
    StructField("parent_asin", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("verified_purchase", BooleanType(), True),
    StructField("helpful_vote", IntegerType(), True)
])
print("Schema Defined.")

# --- 2. Read the raw JSON lines with the explicit schema and cache the result ---
try:
    df_raw = spark.read.schema(schema).json(HDFS_RAW_FILE)
    print(f"Successfully started reading from: {HDFS_RAW_FILE}")
    df_raw.printSchema()
    df_raw.cache()
    # count() is the first action, so it is what actually reads the file and
    # populates the cache.
    print(f"Raw data count: {df_raw.count()}")
except Exception as e:
    print(f"Error reading raw data from {HDFS_RAW_FILE}: {e}")
    spark.stop()
    # Re-raise rather than calling exit(1): inside a notebook, exit() asks the
    # kernel to shut down instead of showing the failure in this cell, whereas
    # re-raising displays the traceback and halts a "Run All".
    raise



Schema Defined.
Successfully started reading from: hdfs:///data/raw/amazon_reviews/Kindle_Store.jsonl
root
 |-- rating: float (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- asin: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- helpful_vote: integer (nullable = true)

Raw data count: 25577616


## DATA PROCESSING

In [None]:
from pyspark.sql.functions import from_unixtime, year, month, dayofmonth, date_format, coalesce, lit, trim, when, array

print("============= CLEANING ============\n")


def _is_non_blank(column_name):
    """Return a predicate that is true when the string column is neither NULL nor whitespace-only."""
    return col(column_name).isNotNull() & (trim(col(column_name)) != "")


df_cleaned = (
    df_raw
    # Drop rows that lack any of the mandatory fields.
    .filter(_is_non_blank("text"))
    .filter(col("rating").isNotNull())
    .filter(_is_non_blank("asin"))
    .filter(_is_non_blank("user_id"))
    # Trim string fields and fill defaults for the optional ones.
    .withColumn("title", trim(coalesce(col("title"), lit("[no title]"))))
    .withColumn("text", trim(col("text")))
    .withColumn("asin", trim(col("asin")))
    .withColumn("parent_asin", trim(col("parent_asin")))
    .withColumn("user_id", trim(col("user_id")))
    .withColumn("helpful_vote", coalesce(col("helpful_vote"), lit(0)).cast(IntegerType()))
    .withColumn("images", coalesce(col("images"), array().cast(ArrayType(StringType()))))
    # `timestamp` is divided by 1000 before conversion (epoch milliseconds -> seconds).
    .withColumn("review_time", from_unixtime(col("timestamp") / 1000).cast(TimestampType()))
    .filter(col("review_time").isNotNull())
    # Derive calendar parts used for partitioning and reporting.
    .withColumn("year", year(col("review_time")))
    .withColumn("month", month(col("review_time")))
    .withColumn("day", dayofmonth(col("review_time")))
    .withColumn("date_str", date_format(col("review_time"), "yyyy-MM-dd"))
    # Keep only reviews whose year falls inside the accepted 1990-2025 window.
    .filter(col("year").isNotNull() & col("year").between(1990, 2025))
)

# Spot-check the timestamp conversion on the first 20 rows, without truncation.
df_cleaned.select("timestamp", "review_time", "year", "month", "date_str").show(n=20, truncate=False)


+-------------+-------------------+----+-----+----------+
|timestamp    |review_time        |year|month|date_str  |
+-------------+-------------------+----+-----+----------+
|1427541413000|2015-03-28 11:16:53|2015|3    |2015-03-28|
|1504226946142|2017-09-01 00:49:06|2017|9    |2017-09-01|
|1644883955777|2022-02-15 00:12:35|2022|2    |2022-02-15|
|1363027885000|2013-03-11 18:51:25|2013|3    |2013-03-11|
|1637557512064|2021-11-22 05:05:12|2021|11   |2021-11-22|
|1637134078567|2021-11-17 07:27:58|2021|11   |2021-11-17|
|1632291278732|2021-09-22 06:14:38|2021|9    |2021-09-22|
|1614145710980|2021-02-24 05:48:30|2021|2    |2021-02-24|
|1599452688091|2020-09-07 04:24:48|2020|9    |2020-09-07|
|1574812541555|2019-11-26 23:55:41|2019|11   |2019-11-26|
|1568214752013|2019-09-11 15:12:32|2019|9    |2019-09-11|
|1567293346345|2019-08-31 23:15:46|2019|8    |2019-08-31|
|1566774264228|2019-08-25 23:04:24|2019|8    |2019-08-25|
|1558889036351|2019-05-26 16:43:56|2019|5    |2019-05-26|
|155833333716

In [None]:
from pyspark.sql.functions import current_timestamp, length


print("=========  VALIDATION ==========\n")

# Validation rules as (failure condition, error label) pairs, applied in order.
# When a row breaks several rules, the label of the LAST matching rule is kept.
validation_rules = [
    ((col("rating") < 1.0) | (col("rating") > 5.0), "Invalid Rating"),
    (col("helpful_vote") < 0, "Negative Helpful Vote"),
    (length(col("asin")) != 10, "Invalid ASIN Length"),
    (col("review_time") > current_timestamp(), "Future Timestamp"),
]

# Start from a NULL string (no error) and wrap it once per rule.
validation_error_expr = lit(None).cast(StringType())
for rule_failed, error_label in validation_rules:
    validation_error_expr = when(rule_failed, error_label).otherwise(validation_error_expr)

df_validated = df_cleaned.withColumn("validation_error", validation_error_expr)

# Split into rows that passed every rule and rows that failed at least one.
validation_error_col = col("validation_error")
df_valid = df_validated.where(validation_error_col.isNull()).drop("validation_error")
df_invalid = df_validated.where(validation_error_col.isNotNull())


# Secondary Silver table: rejected rows, kept for inspection.
HDFS_VALIDATION_FAILURES_DIR = "hdfs:///data/processed/amazon_reviews/kindle_store_validation_failures"
print(f"Writing validation failures to: {HDFS_VALIDATION_FAILURES_DIR}")
(
    df_invalid.write
    .mode("overwrite")
    .partitionBy("year", "month", "validation_error")
    .parquet(HDFS_VALIDATION_FAILURES_DIR)
)

ImportError: cannot import name 'deprecated' from 'typing_extensions' (/opt/conda/lib/python3.10/site-packages/typing_extensions.py)

In [None]:
from pyspark.sql.functions import lower

print("============== STANRDALIZATION ===========\n")
# Append lower-cased copies of the free-text fields; the original columns are kept.
df_standardized = df_valid.select(
    "*",
    lower(col("title")).alias("title_processed"),
    lower(col("text")).alias("text_processed"),
)
    
# df_standardized.count()


Writing validation failures to: hdfs:///data/processed/amazon_reviews/kindle_store_validation_failures


In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Keep one review per (user_id, asin): the most recent one.
# `review_time` is derived through from_unixtime() and therefore only has
# second precision, so ordering on it alone leaves ties whose winner can change
# between runs. The raw millisecond `timestamp` and then `helpful_vote` are
# added as tie-breakers to make the retained row reproducible.
window_spec = Window.partitionBy("user_id", "asin").orderBy(
    col("review_time").desc(),
    col("timestamp").desc(),
    col("helpful_vote").desc(),
)

df_deduplicated = (
    df_standardized
    .withColumn("rn", row_number().over(window_spec))
    .filter(col("rn") == 1)  # rank 1 = newest review in its (user_id, asin) group
    .drop("rn")
)
    
# print(f"Deduplication complete. Count after deduplication: {df_deduplicated.count()}")




25576202

In [None]:
# Alias the deduplicated frame as the final processed dataset; the storage cell
# below selects its output columns from this name.
df_final_processed = df_deduplicated

In [None]:
# Root HDFS folder under which every processed dataset is written.
HDFS_PROCESSED_BASE = "hdfs:///data/processed/amazon_reviews"

# Hive database that holds the processed tables and views.
HIVE_PROCESSED_DB = "processed"

# Main processed reviews: Parquet location and Hive table name.
HDFS_PROCESSED_MAIN_DIR = HDFS_PROCESSED_BASE + "/kindle_store_main"
HIVE_PROCESSED_MAIN_TABLE = "kindle_reviews_main"

# Rows rejected by validation: Parquet location and Hive table name.
HDFS_VALIDATION_FAILURES_DIR = HDFS_PROCESSED_BASE + "/kindle_store_validation_failures"
HIVE_VALIDATION_FAILURES_TABLE = "kindle_reviews_validation_failures"

# Product dimension VIEW name.
HIVE_PRODUCTS_DIM_VIEW = "kindle_products_dim"

# User dimension VIEW name.
HIVE_USERS_DIM_VIEW = "kindle_users_dim"

Deduplication complete. Count after deduplication: 25300905


In [None]:
# Columns persisted to the main processed dataset, in output order.
main_output_columns = [
    "rating", "title_processed", "text_processed", "images", "asin",
    "parent_asin", "user_id", "verified_purchase", "helpful_vote",
    "review_time", "year", "month", "day", "date_str",
]
df_to_save = df_final_processed.select(*main_output_columns)

print(f"Writing main processed data to: {HDFS_PROCESSED_MAIN_DIR}")
# Overwrite the target directory with Parquet files partitioned by year/month.
(
    df_to_save.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(HDFS_PROCESSED_MAIN_DIR)
)

In [None]:
print("\n--- Creating/Updating Hive Tables and VIEWS ---")

# Make sure the target database exists before anything is registered in it.
spark.sql("CREATE DATABASE IF NOT EXISTS " + HIVE_PROCESSED_DB)
print(f"Ensured Hive database '{HIVE_PROCESSED_DB}' exists.")

# External Hive table over the main processed Parquet data.
# NOTE(review): the written data also carries a `day` column that this DDL
# does not declare - confirm the omission is intentional.
main_table_name = f"{HIVE_PROCESSED_DB}.{HIVE_PROCESSED_MAIN_TABLE}"
print(f"Creating Hive TABLE: {main_table_name}")
spark.sql(f"DROP TABLE IF EXISTS {main_table_name}")
create_main_table_sql = f"""
CREATE EXTERNAL TABLE {main_table_name} (
    rating FLOAT,
    title_processed STRING,
    text_processed STRING,
    images ARRAY<STRING>,
    asin STRING,
    parent_asin STRING,
    user_id STRING,
    verified_purchase BOOLEAN,
    helpful_vote INT,
    review_time TIMESTAMP,
    date_str STRING
)
PARTITIONED BY (year INT, month INT)
STORED AS PARQUET
LOCATION '{HDFS_PROCESSED_MAIN_DIR}'
TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""
try:
    spark.sql(create_main_table_sql)
    print(f"Running MSCK REPAIR TABLE for {HIVE_PROCESSED_MAIN_TABLE}...")
    # Register the partition directories already present under LOCATION.
    spark.sql(f"MSCK REPAIR TABLE {main_table_name}")
    print("Successfully created and repaired main processed table.")
except Exception as err:
    # Non-fatal by design: the error is reported and the cell keeps going.
    print(f"ERROR creating/repairing main processed table: {err}")


# Create Hive External TABLE for Validation Failures (kept unchanged)
failures_table_name = f"{HIVE_PROCESSED_DB}.{HIVE_VALIDATION_FAILURES_TABLE}"
print(f"Creating Hive TABLE: {failures_table_name}")
spark.sql(f"DROP TABLE IF EXISTS {failures_table_name}")
create_failures_table_sql = f"""
CREATE EXTERNAL TABLE {failures_table_name} (
    rating FLOAT,
    title STRING,
    text STRING,
    asin STRING,
    parent_asin STRING,
    user_id STRING,
    verified_purchase BOOLEAN,
    helpful_vote INT,
    timestamp LONG,
    review_time TIMESTAMP,
    date_str STRING
)
PARTITIONED BY (year INT, month INT, validation_error STRING)
STORED AS PARQUET
LOCATION '{HDFS_VALIDATION_FAILURES_DIR}'
TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""
try:
    spark.sql(create_failures_table_sql)
    print(f"Running MSCK REPAIR TABLE for {HIVE_VALIDATION_FAILURES_TABLE}...")
    # Register the partition directories already present under LOCATION.
    spark.sql(f"MSCK REPAIR TABLE {failures_table_name}")
    print("Successfully created and repaired validation failures table.")
except Exception as err:
    # Non-fatal by design: the error is reported and the cell keeps going.
    print(f"ERROR creating/repairing validation failures table: {err}")


# Products dimension VIEW: one row per (asin, parent_asin) with review aggregates
# computed from the main processed table.
products_view_name = f"{HIVE_PROCESSED_DB}.{HIVE_PRODUCTS_DIM_VIEW}"
products_source_table = f"{HIVE_PROCESSED_DB}.{HIVE_PROCESSED_MAIN_TABLE}"
print(f"Creating Hive VIEW: {products_view_name}")
spark.sql(f"DROP VIEW IF EXISTS {products_view_name}")
create_products_view_sql = f"""
CREATE VIEW {products_view_name} AS
SELECT
    asin,
    parent_asin,
    MIN(review_time) AS first_review_time,
    MAX(review_time) AS last_review_time,
    AVG(rating) AS avg_rating,
    COUNT(*) AS total_reviews,
    SUM(helpful_vote) AS total_helpful_votes_received
FROM {products_source_table}
GROUP BY asin, parent_asin
"""
try:
    spark.sql(create_products_view_sql)
    print("Successfully created products dimension VIEW.")
except Exception as err:
    # Non-fatal by design: the error is reported and the cell keeps going.
    print(f"ERROR creating products dimension VIEW: {err}")


# Users dimension VIEW: one row per user_id with review aggregates computed
# from the main processed table.
users_view_name = f"{HIVE_PROCESSED_DB}.{HIVE_USERS_DIM_VIEW}"
users_source_table = f"{HIVE_PROCESSED_DB}.{HIVE_PROCESSED_MAIN_TABLE}"
print(f"Creating Hive VIEW: {users_view_name}")
spark.sql(f"DROP VIEW IF EXISTS {users_view_name}")
create_users_view_sql = f"""
CREATE VIEW {users_view_name} AS
SELECT
    user_id,
    MIN(review_time) AS first_review_time,
    MAX(review_time) AS last_review_time,
    COUNT(*) AS total_reviews_written,
    AVG(rating) AS avg_rating_given,
    SUM(helpful_vote) AS total_helpful_votes_on_written_reviews
FROM {users_source_table}
GROUP BY user_id
"""
try:
    spark.sql(create_users_view_sql)
    print("Successfully created users dimension VIEW.")
except Exception as err:
    # Non-fatal by design: the error is reported and the cell keeps going.
    print(f"ERROR creating users dimension VIEW: {err}")


print("\n--- Finished Simplified Storage and Hive Table/View Creation (Phase 3) ---")


In [None]:
# Shut down the session used by the processing stage; the enrichment stage
# further down builds its own session.
spark.stop()

Writing main processed data to: hdfs:///data/processed/amazon_reviews/kindle_store_main


## ENRICH AND JOIN WITH METADATA

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (StringType, ArrayType, StructType,
                               StructField, FloatType, IntegerType, LongType,
                               BooleanType, TimestampType, MapType)
from pyspark.sql.window import Window
from pyspark.storagelevel import StorageLevel
import time
import traceback


# Hive databases: input (processed) and output (curated) layers.
HIVE_PROCESSED_DB = "processed"
HIVE_CURATED_DB = "curated"

# Input tables / files
HIVE_PROCESSED_TABLE = "kindle_reviews_main"
HDFS_RAW_METADATA_PATH = "hdfs:///data/raw/amazon_reviews/meta_Kindle_Store.jsonl"

# Common HDFS parent of every curated dataset written by this stage.
_HDFS_CURATED_ROOT = "hdfs:///data/curated/amazon_reviews"

# Base table
HIVE_CURATED_BASE_TABLE = "kindle_reviews_base"
HDFS_CURATED_BASE_DIR = f"{_HDFS_CURATED_ROOT}/kindle_store_base"

# Materialized summary tables (new); each one is stored in a folder named after the table.
HIVE_PRODUCT_MONTHLY_SUMMARY_TABLE = "product_monthly_summary_materialized"
HDFS_PRODUCT_MONTHLY_SUMMARY_DIR = f"{_HDFS_CURATED_ROOT}/{HIVE_PRODUCT_MONTHLY_SUMMARY_TABLE}"

HIVE_PARENT_PRODUCT_OVERALL_TABLE = "parent_product_overall_summary_materialized"
HDFS_PARENT_PRODUCT_OVERALL_SUMMARY_DIR = f"{_HDFS_CURATED_ROOT}/{HIVE_PARENT_PRODUCT_OVERALL_TABLE}"

HIVE_PRODUCT_OVERALL_SUMMARY_TABLE = "product_overall_summary_materialized"
HDFS_PRODUCT_OVERALL_SUMMARY_DIR = f"{_HDFS_CURATED_ROOT}/{HIVE_PRODUCT_OVERALL_SUMMARY_TABLE}"

HIVE_REVIEW_STATS_MONTHLY_TABLE = "review_stats_monthly_materialized"
HDFS_REVIEW_STATS_MONTHLY_DIR = f"{_HDFS_CURATED_ROOT}/{HIVE_REVIEW_STATS_MONTHLY_TABLE}"

HIVE_MONTHLY_STATS_VIEW = "review_stats_monthly"

# Legacy VIEW names (to be dropped and replaced by the materialized TABLEs)
HIVE_PRODUCT_MONTHLY_VIEW_OLD = "product_monthly_summary"
HIVE_PARENT_PRODUCT_OVERALL_VIEW_OLD = "parent_product_overall_summary"
HIVE_PRODUCT_OVERALL_VIEW_OLD = "product_overall_summary"
HIVE_MONTHLY_STATS_VIEW_OLD = "review_stats_monthly"

# Placeholders used when a review has no matching metadata value
DEFAULT_TITLE = "[Unknown Parent Title]"
DEFAULT_MAIN_CATEGORY = "[Unknown Main Category]"
DEFAULT_CATEGORY = "[Unknown Parent Category]"


--- Creating/Updating Hive Tables and VIEWS ---
Ensured Hive database 'processed' exists.
Creating Hive TABLE: processed.kindle_reviews_main
Running MSCK REPAIR TABLE for kindle_reviews_main...
Successfully created and repaired main processed table.
Creating Hive TABLE: processed.kindle_reviews_validation_failures
Running MSCK REPAIR TABLE for kindle_reviews_validation_failures...
Successfully created and repaired validation failures table.
Creating Hive VIEW: processed.kindle_products_dim
Successfully created products dimension VIEW.
Creating Hive VIEW: processed.kindle_users_dim
Successfully created users dimension VIEW.

--- Finished Simplified Storage and Hive Table/View Creation (Phase 3) ---


In [None]:
# Schema of the metadata file. Every field is declared nullable.
metadata_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("main_category", StringType()),
        ("title", StringType()),
        ("average_rating", FloatType()),
        ("rating_number", IntegerType()),
        ("features", ArrayType(StringType())),
        ("description", ArrayType(StringType())),
        ("price", FloatType()),
        ("images", ArrayType(MapType(StringType(), StringType()))),
        ("videos", ArrayType(MapType(StringType(), StringType()))),
        ("store", StringType()),
        ("categories", ArrayType(StringType())),
        ("details", MapType(StringType(), StringType())),
        ("parent_asin", StringType()),
        ("bought_together", ArrayType(StringType())),
    ]
])

def process_metadata(spark: SparkSession) -> 'pyspark.sql.DataFrame':
    """Read the raw product metadata, reduce it to one row per parent ASIN, and cache it.

    Rows whose ``parent_asin`` is NULL or blank are dropped. The last element of
    ``categories`` (when the array is non-empty) becomes ``parent_category``.
    Missing title / main category / category values are replaced by the
    module-level ``DEFAULT_*`` placeholders, the kept string columns are
    trimmed, and duplicates on ``parent_asin`` are removed.

    Args:
        spark: Session used to read ``HDFS_RAW_METADATA_PATH``.

    Returns:
        A DataFrame with the columns ``parent_asin``, ``parent_title``,
        ``main_category`` and ``parent_category``. It is persisted with
        MEMORY_AND_DISK and a ``count()`` has already been run on it.

    Raises:
        Exception: any error raised while reading or processing is printed
            (message plus traceback) and then re-raised.
    """
    print("--- [Metadata Processing] Starting ---")
    print(f"Reading metadata from: {HDFS_RAW_METADATA_PATH}")
    start_meta = time.time()

    def _text_or_default(source_column, fallback, output_name):
        # NULL -> fallback, then trim, then expose under the output name.
        return F.trim(F.coalesce(F.col(source_column), F.lit(fallback))).alias(output_name)

    try:
        raw_metadata = spark.read.schema(metadata_schema).json(HDFS_RAW_METADATA_PATH)

        # Only rows with a usable join key are kept.
        parent_asin = F.col("parent_asin")
        keyed_metadata = raw_metadata.filter(
            parent_asin.isNotNull() & (F.trim(parent_asin) != "")
        )

        # Take the last element of the categories Array<String> when it is non-empty.
        categories = F.col("categories")
        last_category = F.when(
            F.size(categories) > 0, F.element_at(categories, -1)
        ).otherwise(None)
        categorized_metadata = keyed_metadata.withColumn(
            "parent_category_extracted", last_category
        )

        # Pick the final columns, clean them, fill NULLs and drop duplicate keys.
        df_metadata = categorized_metadata.select(
            F.trim(parent_asin).alias("parent_asin"),
            _text_or_default("title", DEFAULT_TITLE, "parent_title"),
            _text_or_default("main_category", DEFAULT_MAIN_CATEGORY, "main_category"),
            _text_or_default("parent_category_extracted", DEFAULT_CATEGORY, "parent_category"),
        ).dropDuplicates(["parent_asin"])

        # Cache the processed metadata; the count() below is the action that fills the cache.
        df_metadata.persist(StorageLevel.MEMORY_AND_DISK)
        meta_count = df_metadata.count()
        elapsed = time.time() - start_meta
        print(f"Finished processing metadata. Count after deduplication: {meta_count}")
        print(f"Metadata processing took {elapsed:.2f} seconds.")
        print("--- [Metadata Processing] Finished ---")
        return df_metadata

    except Exception as e:
        # Make the failure easy to spot in the cell output, then propagate it.
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print(f"!!! ERROR during Metadata Processing: {e}")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        traceback.print_exc()
        raise e

In [None]:
# Start the enrichment stage from a clean slate: stop whatever context is still
# alive, then build a dedicated session for the curated-layer job.
stop_spark()
spark = None
start_time_job = time.time()
print("==========================================================")
print("=== Create Base Curated & Views (with Metadata) ===")
print("==========================================================")

spark = (
    SparkSession.builder
    .appName("Create Base Curated Data (with Metadata) and Views")
    .master("yarn")
    .config("spark.driver.host", "jupyter")
    .config("spark.submit.deployMode", "client")
    .config("spark.executor.instances", "3")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/user/hive/warehouse")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    # Only replace the partitions present in the data being written.
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .enableHiveSupport()
    .getOrCreate()
)

print(f"Spark Session created. UI: {spark.sparkContext.uiWebUrl}")

# Build (and cache) the metadata lookup on THIS session. The join cell below
# reads `df_metadata`; with this call commented out, a fresh kernel fails there
# with a NameError, and a stale kernel would reuse a frame bound to the session
# that was just stopped.
df_metadata = process_metadata(spark)

In [None]:
# 2. Read the Silver table (reviews)
# Note: reading and joining are lazy, so the timers in this cell only measure
# how long it takes to build the query plan; the real work runs at count().
silver_table_name = f"{HIVE_PROCESSED_DB}.{HIVE_PROCESSED_TABLE}"
print("\n--- [Reviews Reading] Starting ---")
print(f"Reading data from Silver table: {silver_table_name}")
start_read_silver = time.time()
df_silver = spark.sql(f"SELECT * FROM {silver_table_name}")

print(f"Finished reading Silver data in {time.time() - start_read_silver:.2f} seconds.")
print("--- [Reviews Reading] Finished ---")

# 3. Join the Silver data with the metadata (join key: parent_asin)
print("\n--- [Joining Reviews and Metadata] Starting ---")
start_join = time.time()
# Left join keeps every review; the metadata side is sent as a broadcast hint.
df_joined = df_silver.join(F.broadcast(df_metadata), on="parent_asin", how="left")
print(f"Finished joining data in {time.time() - start_join:.2f} seconds.")
print("--- [Joining Reviews and Metadata] Finished ---")


# 4. Select the final columns for the Base Curated table
print("\n--- [Selecting Final Columns] Starting ---")

review_columns = [
    "parent_asin", "asin", "user_id", "review_time",
    "year", "month", "date_str", "rating", "helpful_vote",
    "verified_purchase", "text_processed",
]
df_base_curated = df_joined.select(
    *review_columns,
    # Reviews without a metadata match get the placeholder values.
    F.coalesce(F.col("parent_title"), F.lit(DEFAULT_TITLE)).alias("parent_title"),
    F.coalesce(F.col("main_category"), F.lit(DEFAULT_MAIN_CATEGORY)).alias("main_category"),
    F.coalesce(F.col("parent_category"), F.lit(DEFAULT_CATEGORY)).alias("parent_category"),
    "images",
)


df_base_curated.persist(StorageLevel.MEMORY_AND_DISK)
base_count = df_base_curated.count()  # Triggers the join and fills the cache
print(f"Base Curated DataFrame selected and cached. Approx Count: {base_count}")
print("Schema for Base Curated Table:")
df_base_curated.printSchema()
print("--- [Selecting Final Columns for Base] Finished ---")


DataFrame[]

In [None]:
# 5. Write the Base Curated table to HDFS (ACTION)
print("\n--- [Writing Base Curated Table] Starting ---")
print(f"Writing Base Curated data (with metadata) to: {HDFS_CURATED_BASE_DIR}")
start_write = time.time()
base_writer = df_base_curated.write.mode("overwrite").partitionBy("year", "month")
base_writer.parquet(HDFS_CURATED_BASE_DIR)
print(f"Successfully wrote Base Curated Parquet data in {time.time() - start_write:.2f} seconds.")
print("--- [Writing Base Curated Table] Finished ---")

# 6. Create / refresh the external Hive table over the Base Curated data
print("\n--- [Creating/Repairing Hive Base Table] Starting ---")
start_hive_base = time.time()
base_table_name = f"{HIVE_CURATED_DB}.{HIVE_CURATED_BASE_TABLE}"
spark.sql("CREATE DATABASE IF NOT EXISTS " + HIVE_CURATED_DB)
spark.sql(f"DROP TABLE IF EXISTS {base_table_name}")
create_base_table_sql_updated = f"""
    CREATE EXTERNAL TABLE {base_table_name} (
        parent_asin STRING, asin STRING, user_id STRING, review_time TIMESTAMP,
        date_str STRING, rating FLOAT, helpful_vote INT, verified_purchase BOOLEAN,
        text_processed STRING, parent_title STRING, main_category STRING,
        parent_category STRING, images ARRAY<STRING>
    ) PARTITIONED BY (year INT, month INT) STORED AS PARQUET
    LOCATION '{HDFS_CURATED_BASE_DIR}' 
    TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""

spark.sql(create_base_table_sql_updated)
# Register the year=/month= partition directories that were just written.
spark.sql(f"MSCK REPAIR TABLE {base_table_name}")
print(f"Successfully created/repaired Base Curated table in {time.time() - start_hive_base:.2f} seconds.")
print("--- [Creating/Repairing Hive Base Table] Finished ---")

134

In [None]:
# 7. Compute the Product Monthly Summary
print("\n--- [Calculating Product Monthly Summary] Starting ---")
start_calc_monthly = time.time()
# 1 for a verified purchase, 0 otherwise (NULL counts as 0); summed per group below.
monthly_verified_flag = F.when(F.col("verified_purchase") == True, 1).otherwise(0)
monthly_group_keys = ["year", "month", "parent_asin", "asin"]
df_product_monthly_summary = (
    df_base_curated
    .groupBy(*monthly_group_keys)
    .agg(
        F.count("*").alias("total_reviews"),
        F.avg("rating").alias("avg_rating"),
        F.sum("helpful_vote").alias("total_helpful_votes"),
        F.sum(monthly_verified_flag).alias("total_verified_reviews"),
    )
)
print(f"Calculated Product Monthly Summary in {time.time() - start_calc_monthly:.2f} seconds.")
print("--- [Calculating Product Monthly Summary] Finished ---")

# 8. Write the Product Monthly Summary to HDFS
print("\n--- [Writing Product Monthly Summary Table] Starting ---")
start_write_monthly = time.time()
monthly_writer = df_product_monthly_summary.write.mode("overwrite").partitionBy("year")
monthly_writer.parquet(HDFS_PRODUCT_MONTHLY_SUMMARY_DIR)
print(f"Successfully wrote Product Monthly Summary Parquet data in {time.time() - start_write_monthly:.2f} seconds.")
print("--- [Writing Product Monthly Summary Table] Finished ---")

# 9. Create the Hive table for the Product Monthly Summary
print("\n--- [Creating Hive Product Monthly Summary Table (Partitioned by Year)] Starting ---")
start_hive_monthly = time.time()
monthly_table_name = f"{HIVE_CURATED_DB}.{HIVE_PRODUCT_MONTHLY_SUMMARY_TABLE}"
spark.sql(f"DROP TABLE IF EXISTS {monthly_table_name}")
# The legacy VIEW of the same data is removed as well.
spark.sql(f"DROP VIEW IF EXISTS {HIVE_CURATED_DB}.{HIVE_PRODUCT_MONTHLY_VIEW_OLD}")

create_monthly_summary_table_sql = f"""
    CREATE EXTERNAL TABLE {monthly_table_name} (
        month INT, parent_asin STRING, asin STRING, total_reviews BIGINT,
        avg_rating DOUBLE, total_helpful_votes BIGINT, total_verified_reviews BIGINT
    )
    PARTITIONED BY (year INT)
    STORED AS PARQUET LOCATION '{HDFS_PRODUCT_MONTHLY_SUMMARY_DIR}'
    TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""
spark.sql(create_monthly_summary_table_sql)

print(f"Table created. Running MSCK REPAIR TABLE for {HIVE_PRODUCT_MONTHLY_SUMMARY_TABLE}...")
spark.sql(f"MSCK REPAIR TABLE {monthly_table_name}")
print(f"Successfully created/repaired Product Monthly Summary table in {time.time() - start_hive_monthly:.2f} seconds.")
print("--- [Creating Hive Product Monthly Summary Table] Finished ---")

In [None]:
# 10. Compute the Parent Product Overall Summary
print("\n--- [Calculating Parent Product Overall Summary] Starting ---")
start_calc_parent = time.time()

# 1 for a verified purchase, 0 otherwise (NULL counts as 0); summed per group below.
parent_verified_flag = F.when(F.col("verified_purchase") == True, 1).otherwise(0)

# Rating distribution built as a map: star value -> number of reviews with that rating.
parent_rating_distribution = F.expr("""
         map(
             1.0f, count(case when rating = 1.0 then 1 end),
             2.0f, count(case when rating = 2.0 then 1 end),
             3.0f, count(case when rating = 3.0 then 1 end),
             4.0f, count(case when rating = 4.0 then 1 end),
             5.0f, count(case when rating = 5.0 then 1 end)
         )
     """)

df_parent_overall_summary = (
    df_base_curated
    .groupBy("parent_asin")
    .agg(
        F.first("parent_title").alias("parent_title"),
        F.first("main_category").alias("main_category"),
        F.first("parent_category").alias("parent_category"),
        F.min("review_time").alias("first_review_time"),
        F.max("review_time").alias("last_review_time"),
        F.countDistinct("asin").alias("num_variants"),
        F.count("*").alias("total_reviews"),
        F.avg("rating").alias("avg_rating"),
        F.sum("helpful_vote").alias("total_helpful_votes"),
        F.sum(parent_verified_flag).alias("total_verified_reviews"),
        parent_rating_distribution.alias("rating_distribution"),
    )
)
print(f"Calculated Parent Product Overall Summary in {time.time() - start_calc_parent:.2f} seconds.")
print("--- [Calculating Parent Product Overall Summary] Finished ---")


# 11. Write the Parent Product Overall Summary to HDFS (unpartitioned)
print("\n--- [Writing Parent Product Overall Summary Table] Starting ---")
start_write_parent = time.time()
df_parent_overall_summary.write.mode("overwrite").parquet(HDFS_PARENT_PRODUCT_OVERALL_SUMMARY_DIR)
print(f"Successfully wrote Parent Product Overall Summary Parquet data in {time.time() - start_write_parent:.2f} seconds.")
print("--- [Writing Parent Product Overall Summary Table] Finished ---")

# 12. Create the Hive table for the Parent Product Overall Summary
print("\n--- [Creating Hive Parent Product Overall Summary Table] Starting ---")
start_hive_parent = time.time()
parent_summary_table_name = f"{HIVE_CURATED_DB}.{HIVE_PARENT_PRODUCT_OVERALL_TABLE}"
spark.sql(f"DROP TABLE IF EXISTS {parent_summary_table_name}")
# The legacy VIEW of the same data is removed as well.
spark.sql(f"DROP VIEW IF EXISTS {HIVE_CURATED_DB}.{HIVE_PARENT_PRODUCT_OVERALL_VIEW_OLD}")

create_parent_summary_table_sql = f"""
    CREATE EXTERNAL TABLE {parent_summary_table_name} (
        parent_asin STRING, parent_title STRING, main_category STRING, parent_category STRING,
        first_review_time TIMESTAMP, last_review_time TIMESTAMP, num_variants BIGINT,
        total_reviews BIGINT, avg_rating DOUBLE, total_helpful_votes BIGINT,
        total_verified_reviews BIGINT, rating_distribution MAP<FLOAT, BIGINT>
    ) STORED AS PARQUET LOCATION '{HDFS_PARENT_PRODUCT_OVERALL_SUMMARY_DIR}'
    TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""
spark.sql(create_parent_summary_table_sql)
print(f"Successfully created Parent Product Overall Summary table in {time.time() - start_hive_parent:.2f} seconds.")
print("--- [Creating Hive Parent Product Overall Summary Table] Finished ---")

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         parent_asin|              string|   null|
|       main_category|              string|   null|
|           cat_group|                 int|   null|
|            cat_path|              string|   null|
|# Partition Infor...|                    |       |
|          # col_name|           data_type|comment|
|           cat_group|                 int|   null|
|            cat_path|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             curated|       |
|               Table|       item_metadata|       |
|               Owner|              jovyan|       |
|        Created Time|Wed Apr 02 05:42:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.3.0|       |
|           

In [None]:
# 13. Compute the Product (ASIN) Overall Summary
print("\n--- [Calculating Product (ASIN) Overall Summary] Starting ---")
start_calc_asin = time.time()

# 1 for a verified purchase, 0 otherwise (NULL counts as 0); summed per group below.
asin_verified_flag = F.when(F.col("verified_purchase") == True, 1).otherwise(0)

# Rating distribution built as a map: star value -> number of reviews with that rating.
asin_rating_distribution = F.expr("map(1.0f, count(case when rating = 1.0 then 1 end), 2.0f, count(case when rating = 2.0 then 1 end), 3.0f, count(case when rating = 3.0 then 1 end), 4.0f, count(case when rating = 4.0 then 1 end), 5.0f, count(case when rating = 5.0 then 1 end))")

asin_level_aggregates = df_base_curated.groupBy("parent_asin", "asin").agg(
    F.first("parent_title", ignorenulls=True).alias("parent_title"),
    F.first("parent_category", ignorenulls=True).alias("parent_category"),
    F.min("review_time").alias("first_review_time"),
    F.max("review_time").alias("last_review_time"),
    F.count("*").alias("total_reviews"),
    F.avg("rating").alias("avg_rating"),
    F.sum("helpful_vote").alias("total_helpful_votes"),
    F.sum(asin_verified_flag).alias("total_verified_reviews"),
    asin_rating_distribution.alias("rating_distribution"),
)
# The first character of parent_asin is used as the partition column on write.
df_product_overall_summary = asin_level_aggregates.withColumn(
    "first_letter_parent_asin", F.substring(F.col("parent_asin"), 1, 1)
)
print(f"Calculated Product (ASIN) Overall Summary in {time.time() - start_calc_asin:.2f} seconds.")
print("--- [Calculating Product (ASIN) Overall Summary] Finished ---")

# 13. Write the Product (ASIN) Overall Summary to HDFS
print("\n--- [Writing Product (ASIN) Overall Summary Table (Partitioned by First Letter)] Starting ---")
start_write_asin = time.time()
asin_summary_writer = df_product_overall_summary.write.mode("overwrite").partitionBy("first_letter_parent_asin")
asin_summary_writer.parquet(HDFS_PRODUCT_OVERALL_SUMMARY_DIR)
print(f"Successfully wrote Product (ASIN) Overall Summary Parquet data in {time.time() - start_write_asin:.2f} seconds.")
print("--- [Writing Product (ASIN) Overall Summary Table] Finished ---")


# 14. Hive Table cho Product (ASIN) Overall Summary
print(f"\n--- [Creating Hive Product (ASIN) Overall Summary Table (Partitioned by First Letter)] Starting ---")
start_hive_asin = time.time()
spark.sql(f"DROP TABLE IF EXISTS {HIVE_CURATED_DB}.{HIVE_PRODUCT_OVERALL_SUMMARY_TABLE}")
spark.sql(f"DROP VIEW IF EXISTS {HIVE_CURATED_DB}.{HIVE_PRODUCT_OVERALL_VIEW_OLD}")

create_asin_summary_table_sql = f"""
    CREATE EXTERNAL TABLE {HIVE_CURATED_DB}.{HIVE_PRODUCT_OVERALL_SUMMARY_TABLE} (
        parent_asin STRING, asin STRING, parent_title STRING, parent_category STRING,
        first_review_time TIMESTAMP, last_review_time TIMESTAMP,
        total_reviews BIGINT, avg_rating DOUBLE, total_helpful_votes BIGINT,
        total_verified_reviews BIGINT, rating_distribution MAP<FLOAT, BIGINT>
    )
    PARTITIONED BY (first_letter_parent_asin STRING) -- <<< PARTITIONED BY FIRST LETTER
    STORED AS PARQUET LOCATION '{HDFS_PRODUCT_OVERALL_SUMMARY_DIR}'
    TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""
spark.sql(create_asin_summary_table_sql)


print(f"Table created. Running MSCK REPAIR TABLE for {HIVE_PRODUCT_OVERALL_SUMMARY_TABLE}...")
spark.sql(f"MSCK REPAIR TABLE {HIVE_CURATED_DB}.{HIVE_PRODUCT_OVERALL_SUMMARY_TABLE}")
print(f"Successfully created/repaired Product (ASIN) Overall Summary table in {time.time() - start_hive_asin:.2f} seconds.")
print(f"--- [Creating Hive Product (ASIN) Overall Summary Table] Finished ---")

+-----------+-------------+---------+--------------------+
|parent_asin|main_category|cat_group|            cat_path|
+-----------+-------------+---------+--------------------+
| B004C6S25I| Buy a Kindle|       -2|Kindle Store > Ki...|
| B00499R2TU| Buy a Kindle|       -2|Kindle Store > Ki...|
+-----------+-------------+---------+--------------------+



In [None]:
# Review Stats Monthly: aggregate df_product_monthly_summary to one row per (year, month).
print("\n--- [Calculating Review Stats Monthly] Starting ---")
start_calc_stats = time.time()
df_review_stats_monthly = (
    df_product_monthly_summary
    .groupBy("year", "month")
    .agg(
        F.sum("total_reviews").alias("total_reviews"),
        # Average rating weighted by each input row's review count.
        (
            F.sum(F.col("avg_rating") * F.col("total_reviews"))
            / F.sum("total_reviews")
        ).alias("avg_rating"),
        F.sum("total_helpful_votes").alias("total_helpful_votes"),
    )
)
print(f"Calculated Review Stats Monthly in {time.time() - start_calc_stats:.2f} seconds.")
print("--- [Calculating Review Stats Monthly] Finished ---")


# Persist the monthly stats as Parquet, replacing any previous output.
print("\n--- [Writing Review Stats Monthly Table] Starting ---")
start_write_stats = time.time()
df_review_stats_monthly.write.mode("overwrite").parquet(HDFS_REVIEW_STATS_MONTHLY_DIR)
print(f"Successfully wrote Review Stats Monthly Parquet data in {time.time() - start_write_stats:.2f} seconds.")
print("--- [Writing Review Stats Monthly Table] Finished ---")

# Hive external table over the Parquet files written above.
print("\n--- [Creating Hive Review Stats Monthly Table] Starting ---")
start_hive_stats = time.time()
spark.sql(f"DROP TABLE IF EXISTS {HIVE_CURATED_DB}.{HIVE_REVIEW_STATS_MONTHLY_TABLE}")
spark.sql(f"DROP VIEW IF EXISTS {HIVE_CURATED_DB}.{HIVE_MONTHLY_STATS_VIEW_OLD}")
create_stats_monthly_table_sql = f"""
    CREATE EXTERNAL TABLE {HIVE_CURATED_DB}.{HIVE_REVIEW_STATS_MONTHLY_TABLE} (
        year INT, month INT, total_reviews BIGINT, avg_rating DOUBLE,
        total_helpful_votes BIGINT
    ) STORED AS PARQUET LOCATION '{HDFS_REVIEW_STATS_MONTHLY_DIR}'
    TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""
spark.sql(create_stats_monthly_table_sql)
print(f"Successfully created Review Stats Monthly table in {time.time() - start_hive_stats:.2f} seconds.")
print("--- [Creating Hive Review Stats Monthly Table] Finished ---")


# Best-effort cleanup: release cached DataFrames if they exist in this namespace.
# Any failure is reported as a warning and does not abort the job.
print("\n--- [Cleanup] Starting ---")
try:
    if 'df_metadata' in locals():
        if df_metadata.is_cached:
            print("Unpersisting Metadata DataFrame...")
            df_metadata.unpersist()
    if 'df_base_curated' in locals():
        if df_base_curated.is_cached:
            print("Unpersisting Base Curated DataFrame...")
            df_base_curated.unpersist()
except Exception as cleanup_error:
    print(f"Warning: Error unpersisting data - {cleanup_error}")
print("--- [Cleanup] Finished ---")


# Job footer: total wall-clock time since start_time_job, in seconds and minutes.
end_time_job = time.time()
print("\n==========================================================")
print("=== Execution Completed Successfully ===")
print(f"=== Total Time: {end_time_job - start_time_job:.2f} seconds ({(end_time_job - start_time_job) / 60:.2f} minutes) ===")
print("==========================================================")


+-----------+-------------+--------+
|parent_asin|main_category|cat_path|
+-----------+-------------+--------+
+-----------+-------------+--------+



## TEST CASE

In [None]:
# Row-count checks across the base, summary and monthly-stats tables.
# Each query returns a single labelled count; the results are shown one after another.
queries = [
    "SELECT COUNT(DISTINCT parent_asin) AS distinct_parent_asin_count "
    "FROM curated.kindle_reviews_base",
    "SELECT COUNT(*) AS parent_summary_count "
    "FROM curated.parent_product_overall_summary_materialized",
    "SELECT COUNT(DISTINCT year, month) AS distinct_year_month_count "
    "FROM curated.kindle_reviews_base",
    "SELECT COUNT(*) AS monthly_stats_count "
    "FROM curated.review_stats_monthly",
    "SELECT COUNT(*) AS silver_count "
    "FROM processed.kindle_reviews_main",
    "SELECT COUNT(*) AS base_curated_count "
    "FROM curated.kindle_reviews_base",
]
for q in queries:
    spark.sql(q).show()

+--------------------+-------------+
|       main_category|product_count|
+--------------------+-------------+
|        Buy a Kindle|      1560072|
|                    |        31199|
|                null|           92|
|Magazine Subscrip...|            5|
|            Software|            3|
+--------------------+-------------+



In [None]:
import time

def test_partition_filtering_performance(spark: SparkSession,
                                         table_name: str,
                                         specific_year: int,
                                         specific_month: int,
                                         some_asin: str):
    print(f"--- Starting Test Case TC4aM_18: Partition Filtering Performance ---")
    print(f"Testing table: {table_name}")
    print(f"Partition filter: year={specific_year}, month={specific_month}")
    print(f"Non-partition filter: asin='{some_asin}'")
    print("-" * 60)

    t1 = -1.0
    t2 = -1.0
    count1 = -1
    count2 = -1

    query1 = f"""
        SELECT COUNT(*)
        FROM {table_name}
        WHERE month = {specific_month} 
    """
    print(f"Executing Query 1 (Partition Filtered):\n{query1}")
    try:
        start_time_1 = time.time()
        result1 = spark.sql(query1).first()
        t1 = time.time() - start_time_1
        count1 = result1[0] if result1 else 0
        print(f"Query 1 completed in: {t1:.4f} seconds")
        print(f"Result Count 1: {count1}")
    except Exception as e:
        print(f"!!! ERROR executing Query 1: {e}")
        import traceback
        traceback.print_exc()

    print("-" * 60)

    query2 = f"""
        SELECT COUNT(*)
        FROM {table_name}
        WHERE asin = '{some_asin}'
    """
    print(f"Executing Query 2 (Non-Partition Filtered):\n{query2}")
    try:
        start_time_2 = time.time()
        result2 = spark.sql(query2).first()
        t2 = time.time() - start_time_2
        count2 = result2[0] if result2 else 0
        print(f"Query 2 completed in: {t2:.4f} seconds")
        print(f"Result Count 2: {count2}")
    except Exception as e:
        print(f"!!! ERROR executing Query 2: {e}")
        import traceback
        traceback.print_exc()

    print("-" * 60)


    print("--- Test Result Comparison ---")
    if t1 >= 0 and t2 >= 0: 
        print(f"Time with Partition Filter (T1): {t1:.4f} seconds")
        print(f"Time without Partition Filter (T2): {t2:.4f} seconds")

        if t1 < t2:
            speedup_factor = t2 / t1 if t1 > 0 else float('inf')
            print(f"\nPASSED: Query with partition filter (T1) is faster than without (T2).")
            print(f"Speedup Factor (T2/T1): {speedup_factor:.2f}x (Higher is better)")
            print("Conclusion: Partition pruning appears to be working effectively.")
        elif t1 == t2:
             print(f"\nWARNING: Query times are equal (T1 == T2).")
             print("This might happen if:")
             print("  - The table is very small.")
             print(f"  - The specific ASIN '{some_asin}' only exists within the partition year={specific_year}, month={specific_month}.")
             print("  - Partition pruning might not be as effective as expected in this specific case.")
        else: # t1 > t2
            print(f"\nFAILED: Query with partition filter (T1) is SLOWER than without (T2).")
            print("This is unexpected and indicates a potential issue with partitioning, statistics, or the query plan.")
            print("Please investigate the query plans and table statistics.")

    else:
        print("\nSKIPPED COMPARISON: One or both queries failed to execute.")

    print(f"--- Finished Test Case TC4aM_18 ---")



# Inputs for the partition-filter timing test.
TARGET_TABLE = "curated.kindle_reviews_base"
TEST_YEAR = 2014
TEST_MONTH = 6
TEST_ASIN = "B000FA5KKA"

# Probe the table first. Only a failure of this probe is reported as an access
# error; the test itself runs in the `else` branch so that an unexpected error
# inside it is not mislabelled as "Error accessing table".
try:
    spark.sql(f"SELECT 1 FROM {TARGET_TABLE} LIMIT 1").collect()
except Exception as setup_err:
    print(f"Error accessing table {TARGET_TABLE}. Cannot run test. Error: {setup_err}")
else:
    print(f"Table {TARGET_TABLE} found. Proceeding with test...")
    test_partition_filtering_performance(spark, TARGET_TABLE, TEST_YEAR, TEST_MONTH, TEST_ASIN)


+-------------+--------------------+
|main_category|            cat_path|
+-------------+--------------------+
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|         null|Kindle Store > Ki...|
|             |Kindle Store > Ki...|
|             |Kindle Store > Ki...|
|             |Kindle Store > Ki...|
|             |Kindle Store > Ki...|
|             |Kindle Store > Ki...|
|             |Kindle Store > Ki...|
+-------------+--------------------+
only showing top 20 rows



In [31]:
# Distinct products (parent_asin) per main_category in curated.item_metadata,
# keeping only categories with more than 50 products, largest first.
spark.sql("""
    SELECT main_category, COUNT(DISTINCT parent_asin) AS product_count
    FROM curated.item_metadata
    GROUP BY main_category
    HAVING product_count > 50
    ORDER BY product_count DESC;
""").show()

+-------------+-------------+
|main_category|product_count|
+-------------+-------------+
| Buy a Kindle|      1560072|
|             |        31199|
|         null|           92|
+-------------+-------------+



In [32]:
# Count rows per (main_category, cat_path) in curated.item_metadata and list the
# combinations that occur more than once, ordered by category and then by
# descending count.
spark.sql("""
    WITH cat_path_count AS (
      SELECT main_category, cat_path, COUNT(*) AS path_count
      FROM curated.item_metadata
      GROUP BY main_category, cat_path
    )
    SELECT main_category, cat_path, path_count
    FROM cat_path_count
    WHERE path_count > 1
    ORDER BY main_category, path_count DESC;
""").show()

+-------------+--------------------+----------+
|main_category|            cat_path|path_count|
+-------------+--------------------+----------+
|         null|Kindle Store > Ki...|        32|
|         null|Kindle Store > Ki...|         6|
|         null|Kindle Store > Ki...|         5|
|         null|Kindle Store > Ki...|         5|
|         null|Kindle Store > Ki...|         5|
|         null|Kindle Store > Ki...|         4|
|         null|Kindle Store > Ki...|         4|
|         null|Kindle Store > Ki...|         3|
|         null|Kindle Store > Ki...|         3|
|         null|Kindle Store > Ki...|         3|
|         null|Kindle Store > Ki...|         3|
|         null|Kindle Store > Ki...|         3|
|         null|Kindle Store > Ki...|         2|
|         null|Kindle Store > Ki...|         2|
|             |Kindle Store > Ki...|     26300|
|             |Kindle Store > Ki...|      1951|
|             |Kindle Store > Ki...|       796|
|             |Kindle Store > Ki...|    

## Sentiment Analysis and LDA

In [None]:
from pyspark.sql import SparkSession
import time
import traceback

# Hive database and table whose rows are counted by run_record_count_check().
HIVE_CURATED_DB = "curated"
HIVE_CURATED_BASE_TABLE = "kindle_reviews_base"
# Inclusive year range used in the count query (currently a single year).
START_YEAR = 2023
END_YEAR = 2023

def count_records_in_years(spark: SparkSession, db_name: str, table_name: str, start_year: int, end_year: int) -> int:
    """
    Count the rows of ``db_name.table_name`` whose ``year`` lies in
    [start_year, end_year] (both bounds inclusive).

    The count is obtained with a single ``SELECT COUNT(*)`` issued through
    ``spark.sql``. Progress and timing are printed.

    Returns:
        The row count, or -1 if anything raised while building or running the
        query (the error and its traceback are printed).
    """
    print(f"--- Counting records in {db_name}.{table_name} from year {start_year} to {end_year} ---")
    start_time = time.time()
    try:
        # Range predicate on `year` only.
        count_query = f"""
        SELECT COUNT(*) as record_count
        FROM {db_name}.{table_name}
        WHERE year >= {start_year} AND year <= {end_year}
        """
        print(f"Executing query: {count_query}")

        first_row = spark.sql(count_query).first()
        count_value = first_row["record_count"]

        print("Query executed successfully.")
        print(f"Counting took {time.time() - start_time:.2f} seconds.")
        return count_value
    except Exception as e:
        # Report and signal failure with the -1 sentinel instead of raising.
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print(f"!!! ERROR during record counting: {e}")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        traceback.print_exc()
        return -1

def run_record_count_check():
    """Count the rows of the curated base table for START_YEAR..END_YEAR and advise.

    Uses the module-level ``spark``, ``HIVE_CURATED_DB``,
    ``HIVE_CURATED_BASE_TABLE``, ``START_YEAR`` and ``END_YEAR``. Prints the
    count, its size relative to a 2,000,000-row target, and a suggestion on
    whether the year range should be kept, widened or narrowed. Returns None;
    any exception is printed with its traceback rather than propagated.
    """
    start_time_job = time.time()
    print("======================================================")
    print("=== Starting Job: Count Records for Recent Years ===")
    print("======================================================")

    try:
        record_count = count_records_in_years(spark, HIVE_CURATED_DB, HIVE_CURATED_BASE_TABLE, START_YEAR, END_YEAR)

        if record_count < 0:
            # count_records_in_years signals failure with a negative value.
            print("\nError occurred during counting. Please check the logs.")
        else:
            print("\n------------------------------------------------------")
            print(f"RESULT: Found {record_count:,} records between {START_YEAR} and {END_YEAR} (inclusive).")
            print("------------------------------------------------------")

            target_sample_size = 2_000_000
            if target_sample_size > 0:
                percentage_of_target = (record_count / target_sample_size) * 100
            else:
                percentage_of_target = 0
            print(f"This represents roughly {percentage_of_target:.2f}% of the target sample size (~2 million).")
            print("\nNext Step Considerations:")

            # "Close" means within 25% of the target in either direction.
            relative_gap = abs(record_count - target_sample_size) / target_sample_size
            if relative_gap <= 0.25:
                print(f"-> The record count is close to the target. Consider using years {START_YEAR}-{END_YEAR} directly for Job 4b.")
            elif record_count < target_sample_size:
                print("-> The record count is significantly smaller than the target. Consider expanding the year range (e.g., start from 2016 or earlier) or combining with sampling from older years.")
            else:
                print(f"-> The record count is significantly larger than the target. Consider narrowing the year range (e.g., 2019-2023) or applying additional sampling (e.g., `.sample(fraction=...)`) on the {START_YEAR}-{END_YEAR} data.")

        end_time_job = time.time()
        print("\n======================================================")
        print("=== Job Execution Finished ===")
        print(f"=== Total Time: {end_time_job - start_time_job:.2f} seconds ===")
        print("======================================================")

    except Exception:
        print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print("!!! FATAL ERROR occurred during job execution !!!")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        traceback.print_exc()
        
# Entry point: run the record-count check when this code executes as the main module.
if __name__ == "__main__":
    run_record_count_check()

--- Phase 4: Data Enrichment (DistilBERT + LDA) ---
Target Hardware Profile: 4 Cores, 16GB RAM
Sentiment Model: sentiment_tinybert
LDA Topics: 10, Vocab Size: 5000
------------------------------


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (StringType, ArrayType, StructType,
                               StructField, FloatType, IntegerType, LongType,
                               BooleanType, TimestampType, MapType, DoubleType)
from pyspark.storagelevel import StorageLevel
from pyspark.ml import Pipeline as SparkMLPipeline

import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *

from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA, LDAModel

import time
import traceback

# --- Input: Hive database and base table the sample is read from ---
HIVE_CURATED_DB = "curated"


HIVE_CURATED_BASE_TABLE = "kindle_reviews_base"

# Output Enriched Sample Table (HDFS location and Hive table name)
HDFS_CURATED_ENRICHED_SAMPLE_DIR = "hdfs:///data/curated/amazon_reviews/kindle_store_enriched_sample"
HIVE_CURATED_ENRICHED_SAMPLE_TABLE = "kindle_reviews_enriched_sample"

# --- Sampling Parameters ---
INCLUDE_YEARS = [2023, 2022]  # only reviews whose `year` is in this list are kept
FILTER_3_STAR = True  # when True, restrict ratings to RATINGS_TO_KEEP
RATINGS_TO_KEEP = [1.0, 2.0, 4.0, 5.0]
MIN_REVIEWS_THRESHOLD = 20 # Minimum number of filtered reviews a parent_asin needs to be kept


# Pretrained sentiment model name (used in the sentiment step's log output)
SENTIMENT_MODEL_NAME = "sentimentdl_use_imdb"
# LDA Parameters
NUM_TOPICS = 15  # k passed to LDA
LDA_MAX_ITER = 20  # maxIter passed to LDA
LDA_VOCAB_SIZE = 10000  # vocabSize passed to CountVectorizer
LDA_MIN_DF = 5  # minDF passed to CountVectorizer


# --- Fallback values ---
# NOTE(review): the three metadata defaults below are presumably fill-ins for
# missing parent metadata; their use is not visible in this part of the notebook.
DEFAULT_TITLE = "[Unknown Parent Title]"
DEFAULT_MAIN_CATEGORY = "[Unknown Main Category]"
DEFAULT_CATEGORY = "[Unknown Parent Category]"
DEFAULT_SENTIMENT = "neutral" # Fallback label when the model output is neither positive nor negative
DEFAULT_TOPIC = -1  # dominant topic used when the topic distribution is null
DEFAULT_KEYWORDS = "N/A"  # keywords used when a topic has no keyword entry


--- Step 2: Loading Data from Silver Layer ---
Reading from Hive table: processed.kindle_reviews_main
Schema of loaded data:
root
 |-- rating: float (nullable = true)
 |-- title_processed: string (nullable = true)
 |-- text_processed: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- asin: string (nullable = true)
 |-- parent_asin: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- verified_purchase: boolean (nullable = true)
 |-- helpful_vote: integer (nullable = true)
 |-- review_time: timestamp (nullable = true)
 |-- date_str: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

Successfully loaded data schema.


In [None]:
# Job 4b bootstrap: build a Spark session sized for the Spark NLP workload.
start_time_job = time.time()
print("====================================================================")
print("=== Starting Job 4b: Create Enriched Sample Table (Sentiment+Topic) ===")
print("====================================================================")


# Maven coordinates handed to spark.jars.packages when the session is launched.
spark_nlp_version = "5.5.3"
scala_version = "2.12"

hadoop_aws_version = "3.3.1"
aws_sdk_version = "1.12.262"

packages = [
    f"com.johnsnowlabs.nlp:spark-nlp_{scala_version}:{spark_nlp_version}",
    f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version}",
    f"com.amazonaws:aws-java-sdk-bundle:{aws_sdk_version}"
]
packages_str = ",".join(packages)
print(f"Using packages: {packages_str}")


# getOrCreate() returns a session that is already running instead of building a
# new one, in which case launch-time settings below (executor sizing,
# spark.jars.packages, serializer) would not take effect. Merely rebinding
# `spark = None` does not stop that session, so stop it explicitly first.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    print("Stopping existing SparkSession so the new configuration is applied...")
    active_session.stop()
spark = None

# NOTE(review): spark.yarn.executor.memoryOverhead appears to be the legacy name
# of spark.executor.memoryOverhead - consider switching once confirmed for this
# Spark version.
spark = SparkSession.builder \
    .appName("SparkNLP_AmazonReviews") \
    .master("yarn") \
    .config("spark.driver.host", "jupyter") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.executor.instances", "9") \
    .config("spark.executor.memory", "10g") \
    .config("spark.executor.cores", "3") \
    .config("spark.driver.memory", "8g") \
    .config("spark.yarn.executor.memoryOverhead", "2g") \
    .config("spark.kryoserializer.buffer.max", "1g") \
    .config("spark.driver.maxResultSize", "4G") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.default.parallelism", "30") \
    .config("spark.sql.shuffle.partitions", "30") \
    .config("spark.sql.files.maxPartitionBytes", "256m") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/user/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://hive-metastore:9083") \
    .config("spark.jars.packages", packages_str) \
    .enableHiveSupport() \
    .getOrCreate()



# Echo the effective session settings so a stale or misconfigured session is easy to spot.
print("\n--- SparkSession Initialized ---")
print(f"Spark version: {spark.version}")
try:
    print(f"Spark NLP version: {sparknlp.version()}")
except Exception as e:
    print(f"Could not get Spark NLP version via sparknlp.version(): {e}")

print(f"Master: {spark.sparkContext.master}")
print(f"Deploy Mode: {spark.sparkContext.getConf().get('spark.submit.deployMode')}")
print(f"Spark Packages Config: {spark.sparkContext.getConf().get('spark.jars.packages')}") # Check packages
print("---------------------------------")


--- Step 3: Sentiment Analysis (DistilBERT) ---
Setting up DistilBERT pipeline using 'sentiment_tinybert'...
ERROR during Sentiment Analysis: 
Possible causes: OutOfMemoryError (check Spark UI), incorrect model name, network issues.


In [None]:
# 1. Open the base curated Hive table (the timer only brackets the spark.table call).
print("\n--- [Reading Base Curated Table] Starting ---")
start_read_base = time.time()
df_base_read = spark.table(HIVE_CURATED_DB + "." + HIVE_CURATED_BASE_TABLE)
print(f"Finished reading Base Curated data in {time.time() - start_read_base:.2f} seconds.")
print("--- [Reading Base Curated Table] Finished ---")

# 2. Apply the purposeful sampling logic
print("\n--- [Applying Purposeful Sampling Logic] Starting ---")
start_sampling = time.time()

# 2.1 Keep only the configured years
print(f"Filtering by years: {INCLUDE_YEARS}")
df_filtered_time = df_base_read.filter(F.col("year").isin(INCLUDE_YEARS))

# 2.2 Optionally restrict to the configured ratings
if not FILTER_3_STAR:
    print("Keeping all ratings.")
    df_filtered_rating = df_filtered_time
else:
    print(f"Filtering ratings to keep: {RATINGS_TO_KEEP}")
    df_filtered_rating = df_filtered_time.filter(F.col("rating").isin(RATINGS_TO_KEEP))

# Cache the filtered rows (reused by the following steps); count() triggers the work.
filtered_rating_count = df_filtered_rating.persist(StorageLevel.MEMORY_AND_DISK).count()
print(f"Count after time and rating filter: {filtered_rating_count}")

In [None]:
# 2.3 Count reviews per parent_asin on the filtered data
print("Calculating review counts per parent_asin on filtered data...")
parent_counts = (
    df_filtered_rating
    .groupBy("parent_asin")
    .agg(F.count("*").alias("review_count"))
)

# 2.4 Identify the parent_asins that meet the minimum review count
print(f"Identifying parent_asins with >= {MIN_REVIEWS_THRESHOLD} reviews...")
eligible_parents = (
    parent_counts
    .filter(F.col("review_count") >= MIN_REVIEWS_THRESHOLD)
    .select("parent_asin")
)
eligible_parent_count = eligible_parents.count()
print(f"Found {eligible_parent_count} eligible parent_asins.")

# 2.5 Keep only reviews of eligible parents. A semi join filters the left side
# without adding columns; the eligible-parent list is sent as a broadcast hint.
print("Filtering final sample using semi join...")
df_sample = df_filtered_rating.join(
    F.broadcast(eligible_parents),
    on="parent_asin",
    how="semi"
)

df_sample.persist(StorageLevel.MEMORY_AND_DISK)
final_sample_count = df_sample.count()  # action: triggers the computation of df_sample

# Release the cached intermediate only after the count above has run on df_sample.
df_filtered_rating.unpersist()

print(f"Finished purposeful sampling. Final sample size: {final_sample_count}")
print(f"Sampling took {time.time() - start_sampling:.2f} seconds.")
print("Schema of the sample:")
df_sample.printSchema()
print("--- [Applying Purposeful Sampling Logic] Finished ---")


if final_sample_count == 0:
    print("WARNING: Sample size is 0! Check filtering logic and data. Stopping job.")
    spark.stop()
    # `exit()` is an interactive-shell helper and is not meant for program code.
    # Raising SystemExit directly keeps exit status 1 when run as a script and
    # aborts this cell when run in a notebook.
    raise SystemExit(1)
elif final_sample_count < 1000:
    print(f"WARNING: Sample size ({final_sample_count}) is very small.")

In [None]:
# 3. Sentiment Analysis 
print(f"\n--- [Sentiment Analysis (USE + sentimentdl_use_imdb)] Starting ---")
start_sentiment = time.time()

document_assembler_sent = DocumentAssembler() \
    .setInputCol("text_processed") \
    .setOutputCol("document_sent")

# Load Universal Sentence Encoder
use_embeddings = UniversalSentenceEncoder.pretrained('tfhub_use', lang="en") \
    .setInputCols(["document_sent"])\
    .setOutputCol("sentence_embeddings")

# Load SentimentDL model pretrained on IMDB using USE embeddings
sentiment_classifier_use = SentimentDLModel().pretrained('sentimentdl_use_imdb', 'en') \
    .setInputCols(["sentence_embeddings"]) \
    .setOutputCol("sentiment_class_use") \
    .setThreshold(0.6) 

sentiment_pipeline_use = SparkMLPipeline(stages=[
    document_assembler_sent,
    use_embeddings,
    sentiment_classifier_use
])

print("Applying USE + SentimentDL pipeline to the sample...")

df_sentiment_result = sentiment_pipeline_use.fit(df_sample).transform(df_sample)

--- Initializing SparkSession via Packages (includes Spark NLP + AWS deps) ---
Using packages: com.johnsnowlabs.nlp:spark-nlp_2.12:5.3.3,org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk-bundle:1.12.262

--- SparkSession Initialized ---
Spark version: 3.3.0
Spark NLP version: 5.5.3
Master: yarn
Deploy Mode: client
Spark Packages Config: com.johnsnowlabs.nlp:spark-nlp_2.12:5.3.3,org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk-bundle:1.12.262
---------------------------------


In [None]:

# Flatten the SentimentDL output into two plain columns:
#   sentiment_label - "positive"/"negative", otherwise DEFAULT_SENTIMENT
#   sentiment_score - the metadata entry keyed by that label, cast to double
#                     (presumably the model's confidence for the label -
#                     confirm against the Spark NLP SentimentDL documentation)
df_sentiment_added = (
    df_sentiment_result
    # Label and metadata of the first annotation in the output array.
    .withColumn("sentiment_label_raw", F.col("sentiment_class_use.result")[0])
    .withColumn("sentiment_meta", F.col("sentiment_class_use.metadata")[0])
    .withColumn(
        "sentiment_label",
        F.when(F.col("sentiment_label_raw") == "positive", "positive")
         .when(F.col("sentiment_label_raw") == "negative", "negative")
         .otherwise(DEFAULT_SENTIMENT),
    )
    .withColumn(
        "sentiment_score",
        F.when(F.col("sentiment_label") == "positive", F.col("sentiment_meta.positive").cast(DoubleType()))
         .when(F.col("sentiment_label") == "negative", F.col("sentiment_meta.negative").cast(DoubleType()))
         .when(F.col("sentiment_label") == "neutral", F.col("sentiment_meta.neutral").cast(DoubleType()))
         .otherwise(0.0)
         .alias("sentiment_score", metadata={'confidence': True}),
    )
)

# Drop the intermediate annotation columns. The previous
# select("*", "sentiment_label", "sentiment_score") listed the two new columns a
# second time ("*" already contains them), producing duplicate column names that
# make later references to sentiment_label / sentiment_score ambiguous.
df_sentiment_processed = df_sentiment_added.drop(
    "document_sent", "sentence_embeddings", "sentiment_class_use",
    "sentiment_label_raw", "sentiment_meta"
)

# Cache the result; count() triggers the sentiment pipeline on the whole sample.
df_sentiment_processed.persist(StorageLevel.MEMORY_AND_DISK)
sentiment_count = df_sentiment_processed.count()

print(f"Sentiment Analysis completed. Processed {sentiment_count} records.")
print(f"Sentiment analysis took {time.time() - start_sentiment:.2f} seconds.")
print(f"--- [Sentiment Analysis ({SENTIMENT_MODEL_NAME})] Finished ---")


In [None]:
# 4. Topic Modeling (LDA)
print(f"\n--- [Topic Modeling (LDA k={NUM_TOPICS})] Starting ---")
start_topic = time.time()

# 4.1. NLP Preprocessing for LDA
print("Setting up NLP pipeline for LDA...")
document_assembler_lda = DocumentAssembler() \
    .setInputCol("text_processed") \
    .setOutputCol("document_lda")
tokenizer_lda = Tokenizer() \
    .setInputCols(["document_lda"]) \
    .setOutputCol("token_lda")
normalizer_lda = Normalizer() \
    .setInputCols(["token_lda"]) \
    .setOutputCol("normalized_lda") \
    .setLowercase(True) \
    .setCleanupPatterns(["""[^\w\d\s]"""])
lemmatizer_lda = LemmatizerModel.pretrained("lemma_antbnc", "en") \
    .setInputCols(["normalized_lda"]) \
    .setOutputCol("lemma_lda")
stopwords_cleaner_lda = StopWordsCleaner.pretrained("stopwords_en", "en") \
    .setInputCols(["lemma_lda"]) \
    .setOutputCol("clean_lemma_lda") \
    .setCaseSensitive(False)
finisher_lda = Finisher() \
    .setInputCols(["clean_lemma_lda"]) \
    .setOutputCols(["finished_tokens_lda"]) \
    .setOutputAsArray(True) \
    .setCleanAnnotations(False)
nlp_pipeline_lda = SparkMLPipeline(stages=[
    document_assembler_lda, tokenizer_lda, normalizer_lda,
    lemmatizer_lda, stopwords_cleaner_lda, finisher_lda
])
print("Applying NLP pipeline for LDA...")
df_lda_tokens = nlp_pipeline_lda.fit(df_sentiment_processed).transform(df_sentiment_processed)
df_lda_tokens = df_lda_tokens.filter(F.size(F.col("finished_tokens_lda")) > 0) 

Setting up common NLP pipeline...
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
stopwords_en download started this may take some time.
Approximate size to download 2.9 KB
[OK!]
Applying common NLP pipeline...


In [None]:
# 4.2. CountVectorizer
print(f"Applying CountVectorizer (vocabSize={LDA_VOCAB_SIZE}, minDF={LDA_MIN_DF})...")
cv = CountVectorizer(inputCol="finished_tokens_lda", outputCol="features_countvec",
                     vocabSize=LDA_VOCAB_SIZE, minDF=LDA_MIN_DF)
cv_model = cv.fit(df_lda_tokens)
df_vectorized_lda = cv_model.transform(df_lda_tokens)
vocabulary = cv_model.vocabulary
print(f"Actual vocabulary size: {len(vocabulary)}")

Applying TF-IDF for Sentiment Analysis...


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
# 4.3. LDA Training & Transform
print(f"Training LDA model (k={NUM_TOPICS}, maxIter={LDA_MAX_ITER})...")
lda = LDA(k=NUM_TOPICS, maxIter=LDA_MAX_ITER, featuresCol="features_countvec",
          seed=42, optimizer="online")
lda_model: LDAModel = lda.fit(df_vectorized_lda) 
print("Applying LDA model to get topic distributions...")
df_topics = lda_model.transform(df_vectorized_lda)

In [None]:
# 4.4. Extract Dominant Topic and Keywords
print("Extracting dominant topic and keywords...")

@F.udf(returnType=IntegerType())
def get_dominant_topic_udf(topic_dist):
    if topic_dist is None: return DEFAULT_TOPIC
    return int(topic_dist.argmax())

df_topics = df_topics.withColumn("dominant_topic", get_dominant_topic_udf(F.col("topicDistribution")))

In [None]:
###########################         Get keywords
# describeTopics() gives, per topic, the indices of its top terms; those
# indices are translated to words through the CountVectorizer vocabulary.
topic_indices = lda_model.describeTopics(maxTermsPerTopic=7)
topic_keywords_map = {
    row['topic']: ", ".join(vocabulary[i] for i in row['termIndices'])
    for row in topic_indices.collect()
}

# Create the lookup DataFrame and join it onto the per-review topics
topic_keywords_list = list(topic_keywords_map.items())
if topic_keywords_list:
    df_topic_keywords = spark.createDataFrame(topic_keywords_list, ["dominant_topic_ref", "topic_keywords"])
    df_enriched_sample = (
        df_topics
        .join(
            F.broadcast(df_topic_keywords),
            df_topics["dominant_topic"] == df_topic_keywords["dominant_topic_ref"],
            "left",
        )
        .drop("dominant_topic_ref")
        # Rows without a matching topic get the default keywords.
        .fillna({"topic_keywords": DEFAULT_KEYWORDS})
    )
else:
    print("WARNING: No topics described by LDA model!")
    df_enriched_sample = df_topics.withColumn("topic_keywords", F.lit(DEFAULT_KEYWORDS))

print("Topic Modeling and Keyword Mapping completed.")
df_enriched_sample.select("text_processed", "dominant_topic", "topic_keywords").show(5, truncate=80)
print(f"Topic modeling took {time.time() - start_topic:.2f} seconds.")
print(f"--- [Topic Modeling (LDA k={NUM_TOPICS})] Finished ---")

In [None]:
# Preview five enriched rows (text, dominant topic, keywords), truncating long values to 80 characters.
df_enriched_sample.select("text_processed", "dominant_topic", "topic_keywords").show(5, truncate=80)

In [None]:
# 5. Select Final Columns for Enriched Sample Table
print("\n--- [Selecting Final Columns for Enriched Sample] Starting ---")

# Original columns carried over unchanged from the base table.
base_columns = [
    "parent_asin", "asin", "user_id", "review_time",
    "year", "month", "date_str", "rating", "helpful_vote",
    "verified_purchase", "text_processed", "parent_title", "main_category",
    "parent_category", "images",
]
# Enrichment columns, each paired with the value substituted for nulls.
enrichment_defaults = [
    ("sentiment_label", DEFAULT_SENTIMENT),
    ("sentiment_score", 0.0),
    ("dominant_topic", DEFAULT_TOPIC),
    ("topic_keywords", DEFAULT_KEYWORDS),
]
df_final_enriched = df_enriched_sample.select(
    *base_columns,
    *[
        F.coalesce(F.col(column_name), F.lit(fallback)).alias(column_name)
        for column_name, fallback in enrichment_defaults
    ],
)
print("Final Schema for Enriched Sample Table:")
df_final_enriched.printSchema()
print("--- [Selecting Final Columns for Enriched Sample] Finished ---")


print("\n--- [Writing Enriched Sample Table] Starting ---")
print(f"Writing Enriched Sample data to: {HDFS_CURATED_ENRICHED_SAMPLE_DIR}")
start_write_enriched = time.time()
# Overwrite the target directory with Parquet files partitioned by year/month.
enriched_writer = df_final_enriched.write.partitionBy("year", "month").mode("overwrite")
enriched_writer.parquet(HDFS_CURATED_ENRICHED_SAMPLE_DIR)
print(f"Successfully wrote Enriched Sample Parquet data in {time.time() - start_write_enriched:.2f} seconds.")
print("--- [Writing Enriched Sample Table] Finished ---")

In [None]:
# Create the Hive external table for the enriched sample
print("\n--- [Creating Hive Enriched Sample Table] Starting ---")
start_hive_enriched = time.time()

# Fully qualified table name, reused by the DROP, CREATE and REPAIR statements.
enriched_table_fqn = f"{HIVE_CURATED_DB}.{HIVE_CURATED_ENRICHED_SAMPLE_TABLE}"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {HIVE_CURATED_DB}")
spark.sql(f"DROP TABLE IF EXISTS {enriched_table_fqn}")

# year and month are declared as partition columns, not in the column list.
create_enriched_sample_table_sql = f"""
    CREATE EXTERNAL TABLE {enriched_table_fqn} (
        parent_asin STRING, asin STRING, user_id STRING, review_time TIMESTAMP,
        date_str STRING, rating FLOAT, helpful_vote INT, verified_purchase BOOLEAN,
        text_processed STRING, parent_title STRING, main_category STRING,
        parent_category STRING, images ARRAY<STRING>,
        sentiment_label STRING, sentiment_score DOUBLE,
        dominant_topic INT, topic_keywords STRING
    )
    PARTITIONED BY (year INT, month INT)
    STORED AS PARQUET
    LOCATION '{HDFS_CURATED_ENRICHED_SAMPLE_DIR}'
    TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""
spark.sql(create_enriched_sample_table_sql)

# Run MSCK REPAIR TABLE on the newly created table.
print(f"Table created. Running MSCK REPAIR TABLE for {HIVE_CURATED_ENRICHED_SAMPLE_TABLE}...")
spark.sql(f"MSCK REPAIR TABLE {enriched_table_fqn}")

print(f"Successfully created Enriched Sample table in {time.time() - start_hive_enriched:.2f} seconds.")
print("--- [Creating Hive Enriched Sample Table] Finished ---")

In [None]:
print("\n--- [Cleanup] Starting ---")
try:
    # Release cached DataFrames; a name that was never defined is skipped.
    for df_name, df_label in (
        ("df_sample", "Sample"),
        ("df_sentiment_processed", "Sentiment Processed"),
    ):
        if df_name in locals() and locals()[df_name].is_cached:
            print(f"Unpersisting {df_label} DataFrame...")
            locals()[df_name].unpersist()
except Exception as unpersist_err:
    # A failed unpersist is reported as a warning and does not stop the cell.
    print(f"Warning: Error unpersisting data - {unpersist_err}")
print("--- [Cleanup] Finished ---")

end_time_job = time.time()
total_job_seconds = end_time_job - start_time_job
print("\n====================================================================")
print("=== Job 4b (Enrich Sample) Execution Completed Successfully ===")
print(f"=== Total Time: {total_job_seconds:.2f} seconds ({total_job_seconds / 60:.2f} minutes) ===")
print(f"=== Final Sample Size Written: {final_sample_count} ===")
print("====================================================================")


## OPTIMIZE WAREHOUSE

In [None]:
# Hive database that holds the curated tables.
HIVE_CURATED_DB = "curated"


# HDFS directory of the enriched sample Parquet data (input of this section).
HDFS_CURATED_ENRICHED_SAMPLE_DIR = "hdfs:///data/curated/amazon_reviews/kindle_store_enriched_sample"

# HDFS directory and Hive table name for the materialized monthly sentiment summary.
HDFS_MONTHLY_SENTIMENT_SUMMARY_SAMPLE_DIR = "hdfs:///data/curated/amazon_reviews/monthly_sentiment_summary_sample"
HIVE_MONTHLY_SENTIMENT_SUMMARY_SAMPLE_TABLE = "monthly_sentiment_summary_sample"

In [None]:
# --- Load the stored enriched sample ---
print("\n--- [Reading Stored Enriched Sample Data] Starting ---")
print(f"Reading Parquet data from: {HDFS_CURATED_ENRICHED_SAMPLE_DIR}")
start_read_enriched = time.time()

try:
    df_final_enriched_loaded = spark.read.parquet(HDFS_CURATED_ENRICHED_SAMPLE_DIR)
    # Counting inside the try block routes read failures to the handler below.
    enriched_count = df_final_enriched_loaded.count()
    print(f"Successfully read {enriched_count} records from stored enriched sample.")
    print("Schema of loaded data:")
    df_final_enriched_loaded.printSchema()
except Exception as read_err:
    error_banner = "!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"
    print(error_banner)
    print(f"!!! ERROR Reading enriched data from {HDFS_CURATED_ENRICHED_SAMPLE_DIR}: {read_err}")
    print("!!! Please ensure Job 4b ran successfully and data exists at this path.")
    print(error_banner)
    traceback.print_exc()
    # Re-raise so the rest of the cell does not run without input data.
    raise read_err

print(f"Reading stored enriched data took {time.time() - start_read_enriched:.2f} seconds.")
print("--- [Reading Stored Enriched Sample Data] Finished ---")


# --- Materialize Monthly Sentiment Summary ---
print("\n--- [Materializing Monthly Sentiment Summary] Starting ---")
start_mat_monthly_sent = time.time()

# 2. Compute monthly sentiment counts: one row per (year, month, sentiment_label)
print("Calculating monthly sentiment counts...")
monthly_sentiment_groups = df_final_enriched_loaded.groupBy("year", "month", "sentiment_label")
df_monthly_sentiment_summary = monthly_sentiment_groups.agg(F.count("*").alias("review_count"))

# Persist the summary before it is counted and then written.
df_monthly_sentiment_summary.persist(StorageLevel.MEMORY_AND_DISK)
monthly_sent_count = df_monthly_sentiment_summary.count()
print(f"Calculated Monthly Sentiment Summary (Count: {monthly_sent_count})")

# 3. Write the materialized table to HDFS (partitioned by year)
print(f"Writing Materialized Monthly Sentiment Summary to: {HDFS_MONTHLY_SENTIMENT_SUMMARY_SAMPLE_DIR}")
start_write_monthly_sent = time.time()
monthly_summary_writer = df_monthly_sentiment_summary.write.partitionBy("year").mode("overwrite")
monthly_summary_writer.parquet(HDFS_MONTHLY_SENTIMENT_SUMMARY_SAMPLE_DIR)
print(f"Successfully wrote Materialized Monthly Sentiment Summary in {time.time() - start_write_monthly_sent:.2f} seconds.")


In [None]:

# 4. Create the Hive table for the materialized summary
monthly_summary_table_fqn = f"{HIVE_CURATED_DB}.{HIVE_MONTHLY_SENTIMENT_SUMMARY_SAMPLE_TABLE}"
print(f"Creating Hive table: {monthly_summary_table_fqn}")
start_hive_monthly_sent = time.time()
spark.sql(f"CREATE DATABASE IF NOT EXISTS {HIVE_CURATED_DB}")
spark.sql(f"DROP TABLE IF EXISTS {monthly_summary_table_fqn}")

# year is declared as the partition column, not in the column list.
create_monthly_sent_summary_table_sql = f"""
    CREATE EXTERNAL TABLE {monthly_summary_table_fqn} (
        month INT,
        sentiment_label STRING,
        review_count BIGINT
    )
    PARTITIONED BY (year INT)
    STORED AS PARQUET LOCATION '{HDFS_MONTHLY_SENTIMENT_SUMMARY_SAMPLE_DIR}'
    TBLPROPERTIES ('parquet.compression'='SNAPPY')
"""
spark.sql(create_monthly_sent_summary_table_sql)

# Run MSCK REPAIR TABLE on the newly created table.
print(f"Table created. Running MSCK REPAIR TABLE for {HIVE_MONTHLY_SENTIMENT_SUMMARY_SAMPLE_TABLE}...")
spark.sql(f"MSCK REPAIR TABLE {monthly_summary_table_fqn}")
print(f"Successfully created/repaired {HIVE_MONTHLY_SENTIMENT_SUMMARY_SAMPLE_TABLE} in {time.time() - start_hive_monthly_sent:.2f} seconds.")

# Unpersist the summary DataFrame now that the table has been created and repaired.
df_monthly_sentiment_summary.unpersist()

print(f"Materializing Monthly Sentiment Summary took {time.time() - start_mat_monthly_sent:.2f} seconds.")
print("--- [Materializing Monthly Sentiment Summary] Finished ---")

## MIGRATE DATA FROM HIVE TO POSTGRES

In [None]:
# Restart Spark with a session configured for the Hive -> PostgreSQL migration.
stop_spark()

# NOTE(review): the Spark configuration docs state that spark.driver.memory
# cannot be set from the application in client mode because the driver JVM is
# already running. The same may apply to spark.jars.packages when the JVM from
# the previous session is reused. Confirm the PostgreSQL driver is actually on
# the classpath; otherwise restart the kernel or pass these at launch time.
spark = (
    SparkSession.builder
    .appName("SparkNLP_AmazonReviews_YARN_Packages")
    .master("yarn")
    .config("spark.driver.host", "jupyter")
    .config("spark.submit.deployMode", "client")
    # Executor / driver sizing
    .config("spark.executor.instances", "3")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "4")
    .config("spark.driver.memory", "2g")
    # SQL behaviour (each key is set once; the original chain repeated
    # spark.sql.adaptive.enabled and spark.sql.caseSensitive with the same values)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.caseSensitive", "false")
    # Serialization, result size and partitioning
    .config("spark.kryoserializer.buffer.max", "1000M")
    .config("spark.driver.maxResultSize", "2G")
    .config("spark.default.parallelism", "10")
    .config("spark.sql.shuffle.partitions", "10")
    .config("spark.sql.files.maxPartitionBytes", "128m")
    # Hive warehouse / metastore
    .config("spark.sql.warehouse.dir", "hdfs://namenode:9000/user/hive/warehouse")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    # PostgreSQL JDBC driver used by the JDBC writes
    .config("spark.jars.packages", 'org.postgresql:postgresql:42.7.3')
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# --- PostgreSQL Connection Configuration ---
# Each setting is read from an environment variable of the same name so that
# credentials do not have to be stored in the notebook. When a variable is not
# set, the previous hard-coded value is used as the fallback.
print("\n--- Configuring PostgreSQL Connection ---")

PG_HOST = os.environ.get("PG_HOST", "postgres")
PG_PORT = os.environ.get("PG_PORT", "5432")
PG_DATABASE = os.environ.get("PG_DATABASE", "amazon_reviews_curated")
PG_USER = os.environ.get("PG_USER", "spark_writer")
# NOTE(review): the fallback password is the original development value;
# set PG_PASSWORD in the environment for any shared or production database.
PG_PASSWORD = os.environ.get("PG_PASSWORD", "password")

In [None]:
# JDBC driver class, connection URL and connection properties for PostgreSQL.
PG_DRIVER = "org.postgresql.Driver"
PG_URL = "jdbc:postgresql://{}:{}/{}".format(PG_HOST, PG_PORT, PG_DATABASE)
PG_PROPERTIES = dict(
    user=PG_USER,
    password=PG_PASSWORD,
    driver=PG_DRIVER,
    stringtype="unspecified",
)
# Only the URL and user are echoed; the password is not printed.
print(f"PostgreSQL URL: {PG_URL}")
print(f"PostgreSQL User: {PG_USER}")
In [None]:
def load_hive_to_postgres(spark_session, hive_db, hive_table, pg_table, pg_url_conn, pg_props, save_mode="overwrite"):
    """Read one Hive table and write it to a PostgreSQL table over JDBC.

    The load is best-effort: any exception is printed with its traceback and
    is not re-raised, so a caller looping over several tables keeps going.
    The outcome is reported through the return value.

    Args:
        spark_session: Session used to read the Hive table.
        hive_db: Hive database name.
        hive_table: Hive table name inside ``hive_db``.
        pg_table: Target PostgreSQL table name.
        pg_url_conn: JDBC URL of the PostgreSQL database.
        pg_props: JDBC connection properties (user, password, driver, ...).
        save_mode: Write mode passed to the JDBC writer; defaults to "overwrite".

    Returns:
        bool: True when the table was written, False when any step failed.
    """
    full_hive_table = f"{hive_db}.{hive_table}"
    print(f"\n--- [Loading {full_hive_table} to PG {pg_table}] Starting ---")
    start_load_time = time.time()
    succeeded = False
    try:
        # Read from Hive table
        print(f"Reading from Hive table: {full_hive_table}...")
        df_hive = spark_session.table(full_hive_table)
        read_count = df_hive.count()
        print(f"Read {read_count} records from {full_hive_table}.")

        # Schema handling and transformations
        df_to_write = df_hive

        # Handle rating_distribution (MAP -> JSONB): a MAP column is
        # serialized to a JSON string before the JDBC write.
        if 'rating_distribution' in df_to_write.columns:
            rating_type = df_to_write.schema['rating_distribution'].dataType
            if isinstance(rating_type, MapType):
                print("Converting 'rating_distribution' column from MAP to JSON string...")
                df_to_write = df_to_write.withColumn("rating_distribution", F.to_json(F.col("rating_distribution")))
            else:
                print("'rating_distribution' column is likely already String/JSON, skipping conversion.")

        # This table's Hive partition column is dropped before the write.
        if hive_table == "product_overall_summary_materialized" and "first_letter_parent_asin" in df_to_write.columns:
            print("Dropping Hive partition column 'first_letter_parent_asin' before writing to PG.")
            df_to_write = df_to_write.drop("first_letter_parent_asin")

        # Write to PostgreSQL
        print(f"Writing {read_count} records to PostgreSQL table: {pg_table} (mode: {save_mode})...")
        start_write = time.time()
        df_to_write.write.jdbc(
            url=pg_url_conn,
            table=pg_table,
            mode=save_mode,
            properties=pg_props,
        )
        write_duration = time.time() - start_write
        print(f"Successfully wrote to PostgreSQL table '{pg_table}' in {write_duration:.2f} seconds.")
        succeeded = True

    except Exception as e:
        # Deliberately not re-raised: report the failure and let the caller
        # decide what to do based on the returned flag.
        print(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print(f"!!! ERROR loading {full_hive_table} to PG {pg_table}: {e}")
        print(f"!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        traceback.print_exc()
    finally:
        load_duration = time.time() - start_load_time
        print(f"--- [Loading {full_hive_table} to PG {pg_table}] Finished in {load_duration:.2f} seconds ---")
    return succeeded

In [None]:
# Hive table name -> PostgreSQL target table name, consumed by the load loop.
# NOTE(review): "monthly_sentiment_summary_sample" is not listed here, so that
# table is not migrated; confirm whether this is intentional.
table_mappings = {
    "kindle_reviews_base": "kindle_reviews_base_pg",
    "product_monthly_summary_materialized": "product_monthly_summary_pg",
    "parent_product_overall_summary_materialized": "parent_product_overall_summary_pg",
    "product_overall_summary_materialized": "product_overall_summary_pg",
    "review_stats_monthly_materialized": "review_stats_monthly_pg",
    "kindle_reviews_enriched_sample": "kindle_reviews_enriched_sample_pg"
}

In [None]:
# Hive database the migrated tables are read from.
HIVE_DB_SOURCE = "curated"

print("\n==========================================================")
print("=== Starting Data Loading from Hive Curated to PostgreSQL ===")
print("==========================================================")
overall_start_time = time.time()

# Load every mapped table in turn, using the default save mode.
for source_table, target_table in table_mappings.items():
    load_hive_to_postgres(
        spark,
        HIVE_DB_SOURCE,
        source_table,
        target_table,
        PG_URL,
        PG_PROPERTIES,
    )

overall_end_time = time.time()
overall_elapsed_seconds = overall_end_time - overall_start_time
print("\n==========================================================")
print("=== All Data Loading Jobs Completed ===")
print(f"=== Total Execution Time: {overall_elapsed_seconds:.2f} seconds ({overall_elapsed_seconds / 60:.2f} minutes) ===")
print("==========================================================")


print("\nStopping Spark Session...")
stop_spark()
print("Spark Session stopped.")