In [1]:
# ============================================================
# 02_etl_pyspark.ipynb
# Làm sạch & chuẩn hóa dữ liệu phim trước khi load vào Neo4j — clean & normalize movie data before loading into Neo4j
# ============================================================

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# ------------------------------------------------------------
# 1) Create the Spark session
# ------------------------------------------------------------
# Settings applied to the builder, in this order. Note: if a session already
# exists in this kernel, getOrCreate() returns it and static settings such as
# driver memory are not re-applied.
_SPARK_CONF = {
    "spark.driver.memory": "6g",
    "spark.executor.memory": "6g",
    "spark.sql.shuffle.partitions": "200",
}

_builder = SparkSession.builder.appName("MovieLens_ETL_PySpark")
for _key, _value in _SPARK_CONF.items():
    _builder = _builder.config(_key, _value)
spark = _builder.getOrCreate()

print("‚úÖ SparkSession s·∫µn s√†ng!")

✅ SparkSession sẵn sàng!


In [2]:
# ------------------------------------------------------------
# 2) Declare input/output paths
# ------------------------------------------------------------
# Raw CSVs are read from the data lake; cleaned Parquet goes to the warehouse.
# Both are plain strings ending in "/" because later cells build file paths
# by string concatenation / f-strings.
DATALAKE_PATH, WAREHOUSE_PATH = "../data/datalake/", "../data/warehouse/"

# Make sure the warehouse directory exists; no error if it already does.
os.makedirs(name=WAREHOUSE_PATH, exist_ok=True)
print(f"üìÅ Data Warehouse l∆∞u t·∫°i: {WAREHOUSE_PATH}")


📁 Data Warehouse lưu tại: ../data/warehouse/


In [3]:
# ------------------------------------------------------------
# 3) Load raw CSVs from the data lake
# ------------------------------------------------------------
# No inferSchema: every column arrives as a string and is cast explicitly
# in the cleaning step.
# NOTE(review): reviews.csv holds free text; if it contains quoted newlines
# or "" escapes it may need multiLine=True / escape='"' — verify.

def _read_lake_csv(table):
    """Read ``<DATALAKE_PATH><table>.csv`` (with a header row) as a DataFrame."""
    return spark.read.csv(f"{DATALAKE_PATH}{table}.csv", header=True)

metadata = _read_lake_csv("metadata")
ratings = _read_lake_csv("ratings")
reviews = _read_lake_csv("reviews")
survey_answers = _read_lake_csv("survey_answers")
tag_count = _read_lake_csv("tag_count")
tags = _read_lake_csv("tags")

print("‚úÖ ƒê√£ load d·ªØ li·ªáu t·ª´ Data Lake!")

✅ Đã load dữ liệu từ Data Lake!


In [4]:
# ------------------------------------------------------------
# 4) Quick look at each table's schema and first rows
# ------------------------------------------------------------

# (label, DataFrame) pairs, previewed in this order.
for _label, _frame in (
    ("metadata", metadata),
    ("ratings", ratings),
    ("reviews", reviews),
    ("survey_answers", survey_answers),
    ("tag_count", tag_count),
    ("tags", tags),
):
    print(f"\nüìå Schema: {_label}")
    _frame.printSchema()
    # show() runs a small Spark job to fetch the first rows.
    _frame.show(5, truncate=False)

print("\n‚úÖ Ho√†n t·∫•t ki·ªÉm tra schema v√† m·∫´u d·ªØ li·ªáu!")


📌 Schema: metadata
root
 |-- avgRating: string (nullable = true)
 |-- dateAdded: string (nullable = true)
 |-- directedBy: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- starring: string (nullable = true)
 |-- title: string (nullable = true)

+---------+---------+---------------+-------+-------+---------------------------------------------------------------------------------------------------------------------------------------+----------------------------------+
|avgRating|dateAdded|directedBy     |imdbId |item_id|starring                                                                                                                               |title                             |
+---------+---------+---------------+-------+-------+---------------------------------------------------------------------------------------------------------------------------------------+----------------------------------+
|3.89146  |NULL   

In [5]:
# ============================================================
# 5) Clean the data
# ============================================================
# All raw columns are strings. Each table is cast to proper types FIRST and
# only then filtered for null keys: with ANSI mode off, a value that fails to
# parse becomes null after .cast(), so dropping nulls before casting would let
# those rows slip through.

# Movies (dimension table)
metadata_df = (
    metadata
    .withColumnRenamed("avgRating", "avg_rating")
    .withColumnRenamed("directedBy", "directed_by")
    .withColumnRenamed("starring", "actors")
    .withColumnRenamed("imdbId", "imdb_id")
    .withColumn("item_id", F.col("item_id").cast("int"))
    .withColumn("avg_rating", F.col("avg_rating").cast("float"))
    # Drop after casting so unparseable item_ids (now null) are removed too.
    .dropna(subset=["item_id", "title"])
    .fillna({"directed_by": "Unknown", "actors": "Unknown"})
    # Keeps one arbitrary row per item_id if duplicates exist.
    .dropDuplicates(["item_id"])
)

# Ratings (fact table)
ratings_df = (
    ratings
    .withColumn("item_id", F.col("item_id").cast("int"))
    .withColumn("user_id", F.col("user_id").cast("int"))
    .withColumn("rating", F.col("rating").cast("float"))
    # Drop after casting: with ANSI mode off, unparseable values become null.
    .dropna(subset=["item_id", "user_id", "rating"])
    # One rating per (user, movie). The raw data does repeat pairs, and which
    # duplicate survives here is not deterministic.
    .dropDuplicates(["user_id", "item_id"])
)

# Tags (dimension table)
tags_df = (
    tags
    .withColumn("tag_id", F.col("id").cast("int"))
    # Drop after casting so ids that failed to parse (now null) are removed too.
    .dropna(subset=["tag_id", "tag"])
    .withColumn("tag_name", F.lower(F.col("tag")))
    # Keep only ASCII letters, digits and whitespace (e.g. "sci-fi" -> "scifi").
    .withColumn("tag_name", F.regexp_replace("tag_name", r"[^a-z0-9\s]", ""))
    # Collapse whitespace runs left by the stripping ("a - b" -> "a b"), then trim.
    .withColumn("tag_name", F.trim(F.regexp_replace("tag_name", r"\s+", " ")))
    # If stripping emptied the name (tag made only of punctuation / non-ASCII),
    # fall back to the lower-cased raw tag so this tag_id keeps a usable name.
    .withColumn(
        "tag_name",
        F.when(F.col("tag_name") == "", F.lower(F.trim(F.col("tag"))))
        .otherwise(F.col("tag_name")),
    )
    .drop("id", "tag")
    .dropDuplicates(["tag_id"])
)

# Reviews (fact table)
# Reviews with at most this many characters (after trimming) are dropped,
# presumably as too short to carry useful text.
SHORT_REVIEW_MAX_CHARS = 30

reviews_df = (
    reviews
    .withColumn("item_id", F.col("item_id").cast("int"))
    .withColumn("txt", F.trim(F.col("txt")))
    # Drop after casting so item_ids that failed to parse (now null) go too.
    .dropna(subset=["item_id", "txt"])
    .filter(F.length("txt") > SHORT_REVIEW_MAX_CHARS)
    # A movie can have many reviews; only exact (movie, text) repeats are removed.
    .dropDuplicates(["item_id", "txt"])
)

# Tag counts per movie (fact table)
tag_count_df = (
    tag_count
    .withColumn("item_id", F.col("item_id").cast("int"))
    .withColumn("tag_id", F.col("tag_id").cast("int"))
    .withColumn("num", F.col("num").cast("int"))
    # Drop after casting so values that failed to parse (now null) are removed too.
    .dropna(subset=["item_id", "tag_id", "num"])
)

# Survey answers (drop the score = -1 sentinel)
survey_df = (
    survey_answers
    .withColumn("user_id", F.col("user_id").cast("int"))
    .withColumn("item_id", F.col("item_id").cast("int"))
    .withColumn("tag_id", F.col("tag_id").cast("int"))
    .withColumn("score", F.col("score").cast("int"))
    # Drop after casting so values that failed to parse (now null) are removed too.
    .dropna(subset=["user_id", "item_id", "tag_id", "score"])
    # `> 0` removes the -1 sentinel (and would also remove a 0 score if present).
    .filter(F.col("score") > 0)
)


In [6]:
# ============================================================
# Data quality check
# ============================================================

def check_quality(df, df_name, key_cols=None):
    """Print a basic data-quality report for a Spark DataFrame.

    Reports row/column counts, per-column null counts, the number of exact
    duplicate rows and, when ``key_cols`` is given, up to five key values
    that occur more than once.

    Args:
        df: Spark DataFrame to profile.
        df_name: label used in the printed header.
        key_cols: column name or list of column names expected to be unique;
            ``None`` (or empty) skips the key check.
    """
    print(f"\n\n===== üîç DATA QUALITY CHECK: {df_name} =====")

    # 1) Row and column counts. count() is a full scan, so compute it once
    #    and reuse it for the duplicate check below.
    n_rows = df.count()
    n_cols = len(df.columns)
    print(f"üìå Rows: {n_rows}  |  Columns: {n_cols}")

    # 2) Nulls per column: when() yields a non-null value only for null cells,
    #    and count() counts non-null values.
    print("\nüß© Missing values per column:")
    df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show(truncate=False)

    # 3) Exact duplicate rows.
    dup_count = n_rows - df.dropDuplicates().count()
    print(f"‚ôªÔ∏è Duplicate rows: {dup_count}")

    # 4) Key uniqueness: list up to five key values that occur more than once.
    if key_cols:
        print(f"\nüîë Ki·ªÉm tra tr√πng kh√≥a ch√≠nh {key_cols}:")
        df.groupBy(key_cols).count().filter("count > 1").show(5, truncate=False)

    print("‚úÖ Done!")

# Profile each RAW input table (not the cleaned *_df frames), so the report
# shows what the cleaning step has to deal with. Keys mirror the dedup keys
# used during cleaning: reviews are deduplicated on (item_id, txt), so
# item_id alone is not expected to be unique there.
check_quality(metadata, "metadata", key_cols=["item_id"])
check_quality(ratings, "ratings", key_cols=["user_id", "item_id"])
check_quality(tags, "tags", key_cols=["id"])
check_quality(reviews, "reviews", key_cols=["item_id", "txt"])
check_quality(tag_count, "tag_count", key_cols=["item_id", "tag_id"])
check_quality(survey_answers, "survey_answers", key_cols=["user_id", "item_id", "tag_id"])



===== 🔍 DATA QUALITY CHECK: metadata =====
📌 Rows: 84661  |  Columns: 7

🧩 Missing values per column:
+---------+---------+----------+------+-------+--------+-----+
|avgRating|dateAdded|directedBy|imdbId|item_id|starring|title|
+---------+---------+----------+------+-------+--------+-----+
|0        |6518     |3150      |0     |0      |6883    |0    |
+---------+---------+----------+------+-------+--------+-----+

♻️ Duplicate rows: 0

🔑 Kiểm tra trùng khóa chính ['item_id']:
+-------+-----+
|item_id|count|
+-------+-----+
+-------+-----+

✅ Done!


===== 🔍 DATA QUALITY CHECK: ratings =====
📌 Rows: 28490116  |  Columns: 3

🧩 Missing values per column:
+-------+------+-------+
|item_id|rating|user_id|
+-------+------+-------+
|0      |0     |0      |
+-------+------+-------+

♻️ Duplicate rows: 51667

🔑 Kiểm tra trùng khóa chính ['user_id', 'item_id']:
+-------+-------+-----+
|user_id|item_id|count|
+-------+-------+-----+
|606239 |588    |2

In [7]:
# ------------------------------------------------------------
# 6) Write cleaned data to the warehouse as Parquet
# ------------------------------------------------------------

def save(df, name, base_path=None, mode="overwrite", compression="snappy"):
    """Write ``df`` as a Parquet dataset directory ``name`` under the warehouse.

    Args:
        df: Spark DataFrame to write.
        name: dataset directory name, e.g. ``"dim_movies"``.
        base_path: parent directory; ``None`` means ``WAREHOUSE_PATH``, looked
            up at call time.
        mode: Spark save mode; ``"overwrite"`` replaces any previous output.
        compression: Parquet compression codec passed to the writer.
    """
    if base_path is None:
        base_path = WAREHOUSE_PATH
    # os.path.join works whether or not base_path ends with a separator.
    target = os.path.join(base_path, name)
    (
        df.write
        .mode(mode)
        .option("compression", compression)
        .parquet(target)
    )

# Warehouse dataset name -> cleaned DataFrame, written in this order.
_WAREHOUSE_TABLES = (
    ("dim_movies", metadata_df),
    ("fact_ratings", ratings_df),
    ("dim_tags", tags_df),
    ("fact_reviews", reviews_df),
    ("fact_tag_count", tag_count_df),
    ("fact_survey", survey_df),
)
for _table_name, _table_df in _WAREHOUSE_TABLES:
    save(_table_df, _table_name)

print("\nüéØ DONE ‚Üí Data Warehouse ƒë√£ ƒë∆∞·ª£c t·∫°o th√†nh c√¥ng!")



🎯 DONE → Data Warehouse đã được tạo thành công!


In [9]:
print("\nüì¶ Warehouse Storage:")

for folder in os.listdir(WAREHOUSE_PATH):
    full_path = os.path.join(WAREHOUSE_PATH, folder)

    # B·ªè qua file (ch·ªâ x√©t th∆∞ m·ª•c)
    if not os.path.isdir(full_path):
        continue

    size = sum(
        os.path.getsize(os.path.join(full_path, f))
        for f in os.listdir(full_path)
    ) / (1024 * 1024)

    print(f" - {folder:20} ‚Üí {size:8.2f} MB")

print("\n‚úÖ Warehouse ki·ªÉm tra ho√†n t·∫•t!")



📦 Warehouse Storage:
 - dim_movies           →     8.49 MB
 - dim_tags             →     0.02 MB
 - fact_ratings         →   138.11 MB
 - fact_reviews         →  2396.82 MB
 - fact_survey          →     0.17 MB
 - fact_tag_count       →     0.75 MB

✅ Warehouse kiểm tra hoàn tất!


In [11]:
# Drop all cached tables/DataFrames, then stop the session and its
# underlying SparkContext, in that order.
for _release in (spark.catalog.clearCache, spark.stop):
    _release()
print("üõë Spark Session stopped. RAM has been released.")


🛑 Spark Session stopped. RAM has been released.
