In [1]:
import os
import sys
import json
import logging
import traceback
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    col, from_json, explode, to_date, date_format,
    dayofweek, dayofmonth, dayofyear, weekofyear,
    month, quarter, year, when, unix_timestamp,
    expr, sum, size, length, lit, to_timestamp, current_timestamp, max, udf
)
from pyspark.sql.types import (
    ArrayType, StructType, StructField, BooleanType,
    StringType, IntegerType, DateType, FloatType,
    DoubleType, LongType
)
from delta.tables import DeltaTable


In [2]:

spark = SparkSession.builder \
    .appName("TestJupyter") \
    .config("spark.jars", "jars/hadoop-aws-3.3.4.jar,jars/spark-sql-kafka-0-10_2.12-3.2.1.jar,jars/aws-java-sdk-bundle-1.12.262.jar,jars/delta-core_2.12-2.2.0.jar,jars/delta-storage-2.2.0.jar")\
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "conbo123") \
    .config("spark.hadoop.fs.s3a.secret.key", "123conbo") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .config("delta.enable-non-concurrent-writes", "true") \
    .config('spark.sql.warehouse.dir', "s3a://lakehouse/") \
    .getOrCreate()



25/06/08 13:48:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [8]:
gold_base_path = "s3a://lakehouse/gold/"

tables = [
    "fact_movies",
    "dim_movie",
    "dim_cast",
    "dim_crew",
    "dim_genre",
    "dim_date",
    "dim_keyword",
    "movie_cast",
    "movie_crew",
    "movie_genre",
    "movie_keyword"
]

for table in tables:
    df = spark.read.format("delta").load(f"{gold_base_path}{table}")
    count = df.count()
    print(f"{table}: {count} rows")

fact_movies: 45347 rows
dim_movie: 45347 rows
dim_cast: 205688 rows
dim_crew: 159659 rows
dim_genre: 32 rows
dim_date: 146098 rows
dim_keyword: 19956 rows
movie_cast: 560238 rows
movie_crew: 418831 rows
movie_genre: 90969 rows
movie_keyword: 156602 rows


In [9]:
gold_base_path = "s3a://lakehouse/gold/"

tables = [
    "fact_movies",
    "dim_movie",
    "dim_cast",
    "dim_crew",
    "dim_genre",
    "dim_date",
    "dim_keyword",
    "movie_cast",
    "movie_crew",
    "movie_genre",
    "movie_keyword"
]

for table in tables:
    df = spark.read.format("delta").load(f"{gold_base_path}{table}")
    count = df.count()
    print(f"{table}: {count} rows")

fact_movies: 45384 rows
dim_movie: 45384 rows
dim_cast: 206274 rows
dim_crew: 160512 rows


                                                                                

dim_genre: 32 rows
dim_date: 146098 rows
dim_keyword: 20010 rows
movie_cast: 561118 rows
movie_crew: 420358 rows
movie_genre: 91062 rows
movie_keyword: 156779 rows


In [6]:
Bronze_base_path = "s3a://lakehouse/bronze/"

tables = [
    "Bronze_Movies_API",
    "Bronze_Crews_API",
    "Bronze_Keywords_API"
]

for table in tables:
    df = spark.read.format("delta").load(f"{Bronze_base_path}{table}")
    count = df.count()
    print(f"{table}: {count} rows")

Bronze_Movies_API: 40 rows
Bronze_Crews_API: 40 rows


                                                                                

Bronze_Keywords_API: 40 rows


In [3]:
gold_base_path = "s3a://lakehouse/gold/"

tables = [
    "fact_movies",
    "dim_movie",
    "dim_cast",
    "dim_crew",
    "dim_genre",
    "dim_date",
    "dim_keyword",
    "movie_cast",
    "movie_crew",
    "movie_genre",
    "movie_keyword"
]

for table in tables:
    df = spark.read.format("delta").load(f"{gold_base_path}{table}")
    count = df.count()
    print(f"{table}: {count} rows")

25/05/25 11:42:15 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/05/25 11:42:21 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

fact_movies: 46490 rows
dim_movie: 46490 rows


                                                                                

dim_cast: 221257 rows
dim_crew: 194325 rows
dim_genre: 32 rows
dim_date: 146098 rows
dim_keyword: 21359 rows
movie_cast: 588545 rows


                                                                                

movie_crew: 551702 rows
movie_genre: 93085 rows
movie_keyword: 164894 rows


In [10]:
gold_base_path = "s3a://lakehouse/gold/"

tables = [
    "fact_movies",
    "dim_movie",
    "dim_cast",
    "dim_crew",
    "dim_genre",
    "dim_date",
    "dim_keyword",
    "movie_cast",
    "movie_crew",
    "movie_genre",
    "movie_keyword"
]

for table in tables:
    df = spark.read.format("delta").load(f"{gold_base_path}{table}")
    
    if table == "fact_movies":
        count = df.select("id").distinct().count()
        print(f"{table}: {count} distinct movie IDs")
    else:
        count = df.count()
        print(f"{table}: {count} rows")


                                                                                

fact_movies: 45384 distinct movie IDs
dim_movie: 45384 rows
dim_cast: 206274 rows
dim_crew: 160512 rows
dim_genre: 32 rows
dim_date: 146098 rows
dim_keyword: 20010 rows
movie_cast: 561118 rows
movie_crew: 420358 rows
movie_genre: 91062 rows
movie_keyword: 156779 rows


In [4]:
gold_base_path = "s3a://lakehouse/gold/"

tables = [
    "fact_movies",
    "dim_movie",
    "dim_cast",
    "dim_crew",
    "dim_genre",
    "dim_date",
    "dim_keyword",
    "movie_cast",
    "movie_crew",
    "movie_genre",
    "movie_keyword"
]

for table in tables:
    df = spark.read.format("delta").load(f"{gold_base_path}{table}")
    count = df.count()
    print(f"{table}: {count} rows")

                                                                                

fact_movies: 46289 rows


                                                                                

dim_movie: 46289 rows
dim_cast: 214598 rows


                                                                                

dim_crew: 181226 rows
dim_genre: 32 rows
dim_date: 45640 rows
dim_keyword: 20814 rows
movie_cast: 574835 rows
movie_crew: 500748 rows


                                                                                

movie_genre: 92477 rows
movie_keyword: 160507 rows
25/05/24 19:11:56 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 769948 ms exceeds timeout 120000 ms


In [40]:
df = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Movies_API")



                                                                                

In [41]:
df.count()

5293

In [4]:
from pyspark.sql.functions import col

df = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Movies_API")
df.select(col("read_time")).tail(40)

[Row(read_time=datetime.datetime(2025, 5, 25, 10, 5, 20, 666000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 32, 73000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 32, 73000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 20, 422000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 20, 422000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 43, 197000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 43, 197000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 43, 197000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 43, 197000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 15, 38, 43, 197000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 14, 22, 51, 939000)),
 Row(read_time=datetime.datetime(2025, 5, 24, 14, 22, 51, 939000)),
 Row(read_time=datetime.datetime(2025, 5, 25, 10, 7, 26, 209000)),
 Row(read_time=datetime.datetime(2025, 5, 25, 10, 7, 26, 209000)),
 Row(read_time=datetime.datetime(2025, 5, 25, 10, 7, 

In [4]:
df_Crews = spark.read.format("delta").load("s3a://lakehouse/gold/dim_cast")


                                                                                

In [5]:
df_Crews.count()

205735

In [84]:
dim_cast_df = df_Crews.select(
    explode(col("cast")).alias("cast")
    ).select(
        col("cast.id").alias("id"),
        col("cast.name").alias("name"),
        col("cast.profile_path").alias("profile_path"),
        col("cast.gender").alias("gender")
)

In [87]:
dim_cast_df.groupBy("id").count().filter("count > 1").show()
dim_cast_df = dim_cast_df.dropDuplicates(["id"])

+---+-----+
| id|count|
+---+-----+
+---+-----+



In [None]:
dim_cast_df.groupBy("id").count().filter("count > 1").show()

In [88]:
dim_cast = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_cast")
dim_cast.alias("target").merge(
    dim_cast_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

25/05/22 20:51:13 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
25/05/22 20:51:13 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
25/05/22 20:51:13 WARN HintErrorLogger: Hint (strategy=broadcast) is not supported in the query: build left for full outer join.
25/05/22 20:51:13 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

In [69]:
dim_cast_df = df_Crews.select(
    explode(col("cast")).alias("cast")
    ).select(
        col("cast.id").alias("id"),
        col("cast.name").alias("name"),
        col("cast.profile_path").alias("profile_path"),
        col("cast.gender").alias("gender")
)

try:
    dim_cast = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_cast")
    dim_cast.alias("target").merge(
        dim_cast_df.alias("source"),
        "target.id = source.id"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
except:
    dim_cast_df.write.format("delta").save("s3a://lakehouse/gold/dim_cast")

AnalysisException: Cannot write to already existent path s3a://lakehouse/gold/dim_cast without setting OVERWRITE = 'true'.

In [75]:
dim_cast_df = df_Crews.select(
    explode(col("cast")).alias("cast")
    ).select(
        col("cast.id").alias("id"),
        col("cast.name").alias("name"),
        col("cast.profile_path").alias("profile_path"),
        col("cast.gender").alias("gender")
)
dim_cast_df.count()

46972

In [76]:
dim_cast_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- gender: integer (nullable = true)



In [79]:
df_Movies = spark.read.format("delta").load("s3a://lakehouse/gold/dim_cast")

In [80]:
df_Movies.count()

229376

In [74]:
df_Movies.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- id: integer (nullable = true)



In [6]:
import os
import sys
import traceback
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import  explode,col, expr,when,to_date, sum, from_json,size,length, lit, to_timestamp,current_timestamp, max
from pyspark.sql.types import  ArrayType,StructType, StructField, BooleanType, StringType, IntegerType, DateType, FloatType,DoubleType, LongType
from pyspark.sql.functions import (
    col, from_json, explode, to_date, date_format,
    dayofweek, dayofmonth, dayofyear, weekofyear,
    month, quarter, year, when, unix_timestamp
)
from delta.tables import DeltaTable
from pyspark.sql import functions as F

from datetime import datetime, timedelta

In [None]:
df_Movies = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Movies_API")
df_Movies = df_Movies.filter(f"read_time > '{last_read_time_movie}'")
df_Crews = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Crews_API")
df_Crews = df_Crews.filter(f"read_time > '{last_read_time_crew}'")
df_Keywords = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Keywords_API")
df_Keywords = df_Keywords.filter(f"read_time > '{last_read_time_keyword}'")

In [47]:
try:
    readTime = spark.read.format("delta").load("s3a://lakehouse/ReadTime")
except:
    spark.sql("""
CREATE TABLE IF NOT EXISTS delta.`s3a://lakehouse/ReadTime` (
    task_id STRING,
    last_read_time TIMESTAMP
) USING DELTA
""")
    spark.sql("""
    INSERT INTO delta.`s3a://lakehouse/ReadTime`
    VALUES 
      ('BatchApi_Process_Movies', '1970-01-01 00:00:00'),
      ('BatchApi_Process_Crews', '1970-01-01 00:00:00'),
      ('BatchApi_Process_Keywords', '1970-01-01 00:00:00')
""")
    readTime = spark.read.format("delta").load("s3a://lakehouse/ReadTime")

# result = readTime.filter(f"task_id = 'BatchApi_Process'").select("last_read_time").collect()
# last_read_time = result[0][0]
result_movie = readTime.filter(f"task_id = 'BatchApi_Process_Movies'").select("last_read_time").collect()
result_crew = readTime.filter(f"task_id = 'BatchApi_Process_Crews'").select("last_read_time").collect()
result_keyword = readTime.filter(f"task_id = 'BatchApi_Process_Keywords'").select("last_read_time").collect()

last_read_time_movie = result_movie[0][0]
last_read_time_crew = result_crew[0][0]
last_read_time_keyword = result_keyword[0][0]


df_Movies = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Movies_API")
df_Movies = df_Movies.filter(f"read_time > '{last_read_time_movie}'")
df_Crews = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Crews_API")
df_Crews = df_Crews.filter(f"read_time > '{last_read_time_crew}'")
df_Keywords = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Keywords_API")
df_Keywords = df_Keywords.filter(f"read_time > '{last_read_time_keyword}'")

schema_movie = StructType([
    StructField("id", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("original_title", StringType(), True),
    StructField("original_language", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("runtime", DoubleType(), True),
    StructField("budget", IntegerType(), True),
    StructField("revenue", DoubleType(), True),
    StructField("popularity", DoubleType(), True),
    StructField("vote_average", DoubleType(), True),
    StructField("vote_count", DoubleType(), True),
    StructField("tagline", StringType(), True),
    StructField("status", StringType(), True),
    StructField("homepage", StringType(), True),
    StructField("genres", ArrayType(StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])), True)
])
schema_crew = StructType([
    StructField("id", IntegerType(), True),
    StructField("cast", ArrayType(
        StructType([
            StructField("adult", BooleanType(), True),
            StructField("gender", IntegerType(), True),
            StructField("id", IntegerType(), True),
            StructField("known_for_department", StringType(), True),
            StructField("name", StringType(), True),
            StructField("original_name", StringType(), True),
            StructField("popularity", DoubleType(), True),
            StructField("profile_path", StringType(), True),
            StructField("cast_id", IntegerType(), True),
            StructField("character", StringType(), True),
            StructField("credit_id", StringType(), True),
            StructField("order", IntegerType(), True)
        ])
    ), True),
    StructField("crew", ArrayType(
        StructType([
            StructField("adult", BooleanType(), True),
            StructField("gender", IntegerType(), True),
            StructField("id", IntegerType(), True),
            StructField("known_for_department", StringType(), True),
            StructField("name", StringType(), True),
            StructField("original_name", StringType(), True),
            StructField("popularity", DoubleType(), True),
            StructField("profile_path", StringType(), True),
            StructField("credit_id", StringType(), True),
            StructField("department", StringType(), True),
            StructField("job", StringType(), True)
        ])
    ), True)
])
schema_keyword = StructType([
    StructField("id", IntegerType(), True),
    StructField("keywords", ArrayType(
        StructType([
            StructField("id", IntegerType(), True),
            StructField("name", StringType(), True)
        ])
    ), True)
])
df_Movies = df_Movies.withColumn("data", from_json(col("raw_json"), schema_movie)) \
                             .select("data.*", "read_time")

df_Movies = df_Movies.select(
    col("id").cast("integer"),
    col("budget").cast("integer"),
    col("popularity").cast("double").alias("popularity"),
    col("revenue").cast("double").alias("revenue"),
    col("vote_average").cast("double").alias("vote_average"),
    col("vote_count").cast("double").alias("vote_count"),
    date_format(col("release_date"), "yyyyMMdd").cast("integer").alias("date_id"),
    col("title"),                                                # Giữ nguyên kiểu string
    col("original_title"),                                       # Giữ nguyên kiểu string
    col("original_language").alias("language"),                  # Đổi tên trường: original_language -> language
    col("overview"),                                             # Giữ nguyên kiểu string
    col("runtime").cast("double").alias("runtime"),              # Ép về double
    col("tagline"),                                              # Giữ nguyên kiểu string
    col("status"),                                               # Giữ nguyên kiểu string
    col("homepage"),
    col("genres"),
    col("release_date"),
    col("read_time")
)


df_Keywords = df_Keywords.withColumn("data", from_json(col("raw_json"), schema_keyword)) \
                             .select("data.*", "read_time")
df_Crews = df_Crews.withColumn("data", from_json(col("raw_json"), schema_crew)) \
                             .select("data.*", "read_time")

# last_read_time = Variable.get("last_read_time", default_var="1970-01-01T00:00:00")
# last_read_time_ts = to_timestamp(lit(last_read_time), "yyyy-MM-dd HH:mm:ss")


# df = df.filter(
#                                 (col("read_time") >= last_read_time_ts)
#                             )

# last_read_time = Variable.get("last_read_time", default_var="1970-01-01T00:00:00")

# # Chuyển đổi thành kiểu timestamp nếu cần
# filtered_df = df.filter(col("read_time") > lit(last_read_time).cast("timestamp"))



# df_Movies = df_Movies.filter(f"read_time > '{last_read_time}'")
# df_Crews = df_Crews.filter(f"read_time > '{last_read_time}'")
# df_Keywords = df_Keywords.filter(f"read_time > '{last_read_time}'")

try:
    tb_movie = DeltaTable.forPath(spark, "s3a://lakehouse/silver/Silver_Movies_API")
    tb_movie.alias("target").merge(
        df_Movies.alias("source"),
        "target.id = source.id"
    ).whenMatchedUpdateAll() .whenNotMatchedInsertAll().execute()
except:
    df_Movies.write.format("delta").save("s3a://lakehouse/silver/Silver_Movies_API")

try:
    tb_crew = DeltaTable.forPath(spark, "s3a://lakehouse/silver/Silver_Crews_API")
    tb_crew.alias("target").merge(
        df_Crews.alias("source"),
        "target.id = source.id"
    ).whenMatchedUpdateAll() .whenNotMatchedInsertAll().execute()
except:
    df_Crews.write.format("delta").save("s3a://lakehouse/silver/Silver_Crews_API")


try:
    tb_keyword = DeltaTable.forPath(spark, "s3a://lakehouse/silver/Silver_Keywords_API")
    tb_keyword.alias("target").merge(
        df_Keywords.alias("source"),
        "target.id = source.id"
    ).whenMatchedUpdateAll() .whenNotMatchedInsertAll().execute()
except:
    df_Keywords.write.format("delta").save("s3a://lakehouse/silver/Silver_Keywords_API")


25/05/22 20:35:13 WARN DeltaLog: Change in the table id detected while updating snapshot. 
Previous snapshot = Snapshot(path=s3a://lakehouse/silver/Silver_Movies_API/_delta_log, version=1, metadata=Metadata(51dda7bb-d218-4077-b63b-1a70ad987856,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"budget","type":"integer","nullable":true,"metadata":{}},{"name":"popularity","type":"double","nullable":true,"metadata":{}},{"name":"revenue","type":"double","nullable":true,"metadata":{}},{"name":"vote_average","type":"double","nullable":true,"metadata":{}},{"name":"vote_count","type":"double","nullable":true,"metadata":{}},{"name":"date_id","type":"integer","nullable":true,"metadata":{}},{"name":"title","type":"string","nullable":true,"metadata":{}},{"name":"original_title","type":"string","nullable":true,"metadata":{}},{"name":"language","type":"string","nullable":true,"metadata":{}},{"name":"overview","type":"string

In [20]:
df_Keywords.count()

1744

In [22]:
df_Keywords.head()

Row(id=985939, keywords=[Row(id=3298, name='hallucination'), Row(id=9743, name='stranded'), Row(id=33787, name='tower'), Row(id=197623, name='free climbing'), Row(id=209439, name='sport climbing'), Row(id=234505, name='adrenaline junkie'), Row(id=235677, name='dehydration'), Row(id=309974, name='absurd'), Row(id=316832, name='tense'), Row(id=336497, name='survival thriller')], read_time=datetime.datetime(2025, 5, 22, 19, 56, 11, 862000))

In [12]:
df_Movies.show()

+-------+---------+----------+-------------+------------+----------+--------+--------------------+--------------------+--------+--------------------+-------+--------------------+--------+--------------------+--------------------+------------+--------------------+
|     id|   budget|popularity|      revenue|vote_average|vote_count| date_id|               title|      original_title|language|            overview|runtime|             tagline|  status|            homepage|              genres|release_date|           read_time|
+-------+---------+----------+-------------+------------+----------+--------+--------------------+--------------------+--------+--------------------+-------+--------------------+--------+--------------------+--------------------+------------+--------------------+
|1233620|        0|  108.2867|          0.0|         6.8|       4.0|20250227| ¡Qué Huevos, Sofía!| ¡Qué Huevos, Sofía!|      es|Sofia gives every...|  106.0|                    |Released|                    |

In [17]:
df_Crews = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_Keywords_API")

In [18]:
df_Crews.head(1)

[Row(raw_json='{"id": 985939, "keywords": [{"id": 3298, "name": "hallucination"}, {"id": 9743, "name": "stranded"}, {"id": 33787, "name": "tower"}, {"id": 197623, "name": "free climbing"}, {"id": 209439, "name": "sport climbing"}, {"id": 234505, "name": "adrenaline junkie"}, {"id": 235677, "name": "dehydration"}, {"id": 309974, "name": "absurd"}, {"id": 316832, "name": "tense"}, {"id": 336497, "name": "survival thriller"}]}', topic='tmdb_keywords', read_time=datetime.datetime(2025, 5, 22, 19, 56, 11, 862000))]

In [46]:
dim_cast = spark.read.format("delta").load("s3a://lakehouse/gold/dim_cast")
dim_cast.count()

46972

In [42]:
dim_cast.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- id: integer (nullable = true)



In [48]:
dim_cr = spark.read.format("delta") \
    .load("s3a://lakehouse/gold/dim_crew")
dim_cr.count()

132269

In [49]:
dim_cr.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- gender: integer (nullable = true)



In [37]:
dim_cr.show()

+-------+-----------------+--------------------+------+
|     id|             name|        profile_path|gender|
+-------+-----------------+--------------------+------+
|  11874|   Neal H. Moritz|/xgCTFHhbQh3QyIT7...|     2|
|  93364|      Jeff Fowler|/wExdubFgeBkEUP8M...|     2|
|  64227|    Brandon Trost|/ojlCBUXBaueoNV0d...|     2|
|  76054|    Luke Freeborn|                null|     2|
|1908574|        Al LeVine|                null|     2|
|1538618|         AJ Riach|/enK8bxdYmBPj6Oy3...|     2|
|  56827|   Tom Holkenborg|/oRxT9sySUewNqcfm...|     2|
|4527293|  Deanna Marshall|                null|     1|
|1634438|  Allan Padelford|                null|     2|
|2441229|     Oleg Podobin|                null|     2|
|  63453|    Patrick Casey|/tAhWI0rojGBVf9ik...|     2|
|  63449|      Josh Miller|/m9dCtaDDXycswi47...|     2|
|1492059| John Whittington|/oaaQ6BsmryuEeBlf...|     2|
|1486976|    Christina Low|/c8mwAXsOPFxM6sXS...|     1|
|1378696|Ethan Van der Ryn|/cee7w7XpNMSXyFv0...|

In [35]:
facr = spark.read.format("delta") \
    .load("s3a://lakehouse/gold/fact_movies")

from pyspark.sql.functions import col

facr.filter((col("date_id") / 10000).cast("int") == 2025).count()

432

In [36]:
facr.printSchema()

root
 |-- id: integer (nullable = true)
 |-- budget: integer (nullable = true)
 |-- popularity: double (nullable = true)
 |-- revenue: double (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- date_id: long (nullable = true)



In [38]:
dim_g = spark.read.format("delta").load("s3a://lakehouse/gold/movie_cast")
dim_g.count()

46972

In [40]:
dim_g .printSchema()

root
 |-- cast_id: integer (nullable = true)
 |-- character: string (nullable = true)
 |-- movie_id: integer (nullable = true)
 |-- order: integer (nullable = true)



In [108]:
df_Movies = spark.read.format("delta").load("s3a://lakehouse/silver/Silver_Movies_API")
df_Crews = spark.read.format("delta").load("s3a://lakehouse/silver/Silver_Crews_API")
df_Keywords = spark.read.format("delta").load("s3a://lakehouse/silver/Silver_Keywords_API")
# last_read_time = Variable.get("last_read_time_SG", default_var="1970-01-01T00:00:00")
# last_read_time_ts = to_timestamp(lit(last_read_time), "yyyy-MM-dd HH:mm:ss")
try:
    readTime = spark.read.format("delta").load("s3a://lakehouse/ReadTime")
except:
    spark.sql("""
CREATE TABLE IF NOT EXISTS delta.`s3a://lakehouse/ReadTime` (
    task_id STRING,
    last_read_time TIMESTAMP
) USING DELTA
""")
    spark.sql("""
    INSERT INTO delta.`s3a://lakehouse/ReadTime`
    VALUES 
      ('BatchApi_Process_Movies', '1970-01-01 00:00:00'),
      ('BatchApi_Process_Crews', '1970-01-01 00:00:00'),
      ('BatchApi_Process_Keywords', '1970-01-01 00:00:00')
""")
    readTime = spark.read.format("delta").load("s3a://lakehouse/ReadTime")

result_movie = readTime.filter(f"task_id = 'BatchApi_Process_Movies'").select("last_read_time").collect()
result_crew = readTime.filter(f"task_id = 'BatchApi_Process_Crews'").select("last_read_time").collect()
result_keyword = readTime.filter(f"task_id = 'BatchApi_Process_Keywords'").select("last_read_time").collect()

last_read_time_movie = result_movie[0][0]
last_read_time_crew = result_crew[0][0]
last_read_time_keyword = result_keyword[0][0]


df_Movies = df_Movies.filter(f"read_time > '{last_read_time_movie}'")
df_Crews = df_Crews.filter(f"read_time > '{last_read_time_crew}'")

In [109]:
df_Crews.show()

+---+----+----+---------+
| id|cast|crew|read_time|
+---+----+----+---------+
+---+----+----+---------+



In [110]:
dim_cast_df = df_Crews.select(
    explode(col("cast")).alias("cast")
    ).select(
        col("cast.id").alias("cast_id"),
        col("cast.name").alias("name"),
        col("cast.profile_path").alias("profile_path"),
        col("cast.gender").alias("gender")
)



In [112]:
dim_cast = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_cast")
dim_cast.alias("target").merge(
    dim_cast_df.alias("source"),
    "target.cast_id = source.cast_id"
).whenNotMatchedInsertAll().execute()

AnalysisException: cannot resolve target.cast_id in search condition given columns target.name, target.gender, target.profile_path, target.id, source.cast_id, source.name, source.profile_path, source.gender; line 1 pos 0

In [None]:

try:
    dim_cast = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_cast")
    dim_cast.alias("target").merge(
        dim_cast_df.alias("source"),
        "target.cast_id = source.cast_id"
    ).whenNotMatchedInsertAll().execute()
except:
    dim_cast_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/gold/dim_cast")

In [103]:
dim_cast = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_cast")

In [104]:
dim_cast.count()

AttributeError: 'DeltaTable' object has no attribute 'count'

In [105]:
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/gold/dim_cast")

In [113]:
df = spark.read.format("delta") \
    .load("s3a://lakehouse/gold/dim_cast")


In [114]:
df.count()

205735

In [97]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- id: integer (nullable = true)



In [32]:
df.show()

+-------+-------------------+--------------------+------+-----+
|cast_id|               name|        profile_path|gender|order|
+-------+-------------------+--------------------+------+-----+
|5443785|   Braedon Trujillo|/tJrzXw3sIJBSqbSb...|     2|    0|
|1258966|    Iqbaal Ramadhan|/AoETtoy7K04cRCtP...|     2|    0|
|2034251|      Angga Yunanda|/1M3cLNTQLL1Vy6Ub...|     2|    1|
|1878346|      Aghniny Haque|/5pFyssmQGxxCu5ge...|     1|    2|
|1972045|      Rachel Amanda|/c2Ts2WgLSUUZV6xT...|     1|    3|
|1825362|          Ari Irham|/1GL3vMkA2MedpOsk...|     2|    4|
|2110619|        Umay Shahab|/6iRD80jucvDQWIyY...|     2|    5|
|1157592|   Atiqah Hasiholan|/fmvlEKtxQQLHzmFH...|     1|    6|
|1155281|     Tio Pakusadewo|/uAbKPJPbXVg1pi1b...|     2|    7|
|1158143|         Dwi Sasono|/3l5aJxHeDI2wMajt...|     2|    8|
|1503670|      Ganindra Bimo|/6nsI1LCCadVaMKjK...|     2|    9|
|1571858|        Andrea Dian|/7G9jaBQ437o4GSbe...|     1|   10|
|2095812|      Muhammad Khan|/lJjEBY9c23

In [17]:
id_counts = df.groupBy("profile_path").agg(F.count("*").alias("count"))

# Lọc ra các id xuất hiện nhiều hơn 1 lần
duplicate_ids = id_counts.filter(F.col("count") > 1)

duplicate_ids.show()

+--------------------+------+
|        profile_path| count|
+--------------------+------+
|/7MLLqCXPFEF9PwnV...|     2|
|                null|128430|
|/924xXlQnyah5S7IX...|     2|
|/zw5NqIuPMJ9dOcat...|     2|
|/XOBozygTE94ERu1g...|     2|
|/uS4a3epqXVtjTUGR...|     2|
|/vXC9xIRPuI60wN2P...|     2|
|/zUqyn3aQXTzeP1n8...|     3|
|/cC4ogX4QInjkCSSk...|     2|
|/qV7OgJ3rcRmwKPkg...|     3|
|/eA6I3oUAR7iyIgxD...|     2|
|/njnqNhDGKxM2qLye...|     2|
|/miYaN1tdR41HqoxF...|     2|
|/67ezLWzb4bV3Z6IO...|     3|
+--------------------+------+



In [9]:
df.movi

[Row(name='Harrison Ford', gender=2, profile_path='/7CcoVFTogQgex2kJkXKMe8qHZrC.jpg', id=3),
 Row(name='Peter Cushing', gender=2, profile_path='/fg7ufC0IMr6VasQzzdmTtX5ycQF.jpg', id=5),
 Row(name='Allison Janney', gender=1, profile_path='/fido6hwI8tFSZNt6HtP2DZH2eu6.jpg', id=19),
 Row(name='Elizabeth Perkins', gender=1, profile_path='/bkdWnnrMADJcFpKAOitMjQH3uEQ.jpg', id=20),
 Row(name='Sally Field', gender=1, profile_path='/ymhpsxujOO3a9qaGYSpkenCt9Le.jpg', id=35),
 Row(name='Alan Silvestri', gender=2, profile_path='/chEsfnDEtRmv1bfOaNAoVEzhCc6.jpg', id=37),
 Row(name='Orson Welles', gender=2, profile_path='/2DF3e98c7GGa1uJJvpgIiMPg0h2.jpg', id=40),
 Row(name='Sean Bean', gender=2, profile_path='/iIxP2IzvcLgr5WaTBD4UfSqaV3q.jpg', id=48),
 Row(name='Maurice Roeves', gender=2, profile_path='/qXYjnIHClkey4QQxPG5D5JD7Y74.jpg', id=54),
 Row(name='Gary Oldman', gender=2, profile_path='/v4qJEX4TEgEt2Zghldbd71AFjbV.jpg', id=64),
 Row(name='Robert August', gender=2, profile_path=None, id=92),


In [19]:
df.count()

560238

In [20]:
df = spark.read.format("delta").load("s3a://lakehouse/silver/Silver_Movies_API")


In [23]:
df = spark.read.format("delta").load("s3a://lakehouse/gold/dim_movie")


In [24]:
df.count()

45434

In [16]:
df.count()

136418

In [18]:
df.count()

1133

In [17]:
def fix_json_format(crew_str):
    if crew_str is None:
        return None
    try:
        # Dùng json.loads để kiểm tra nếu hợp lệ, nếu không thì sửa
        fixed_json = json.dumps(eval(crew_str))  # Chuyển đổi thành JSON chuẩn
        return fixed_json
    except Exception as e:
        return None  # Trả về None nếu có lỗi
fix_json_udf = udf(fix_json_format, StringType()) 
df_dim_cast = spark.read.format("delta").load("s3a://lakehouse/silver/credit")
df_dim_cast= df_dim_cast.withColumn("cast", fix_json_udf(col("cast")))


cast_schema = ArrayType(
    StructType([
        StructField("cast_id", IntegerType(), True),
        StructField("character", StringType(), True),
        StructField("credit_id", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("order", IntegerType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df_dim_cast.withColumn("cast", from_json(col("cast"), cast_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("cast", explode(col("cast")))

# Chọn các trường cần thiết
df_selected_Dim = df_exploded.select(
    col("cast.name"),
    col("cast.gender"),
    col("cast.profile_path"),
    col("cast.id")
)
df_selected_Bridge = df_exploded.select(
    col("cast.id").alias("cast_id"),
    col("cast.character"),
    col("id").alias("movie_id"),
    col("cast.order")

)

print("df_exploded" ,df_exploded.count())
try:
    dim_cast = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_cast")
    dim_cast.alias("target").merge(
        df_selected_Dim.alias("source"),
        "target.id = source.id"
    ).whenNotMatchedInsertAll().execute()
except:
    df_selected_Dim.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/gold/dim_cast")
    
try:
    movie_cast = DeltaTable.forPath(spark, "s3a://lakehouse/gold/movie_cast")
    movie_cast.alias("target").merge(
        df_selected_Bridge.alias("source"),
        "target.movie_id = source.movie_id AND target.cast_id = source.cast_id AND target.character = source.character"
    ).whenNotMatchedInsertAll().execute()
except:
    df_selected_Bridge.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/gold/movie_cast")

df_exploded 560668


In [16]:
df_selected_Bridge.count()

560668

In [15]:
df.count()

607386

In [23]:
df.head(20)

[Row(id=31357, comb='singlemother interracialrelationship chickflick divorce basedonnovel WhitneyHouston AngelaBassett LorettaDevine ForestWhitaker Comedy Drama Romance'),
 Row(id=46785, comb='celebration money goldfish fish AidaMohammadkhani FereshtehSadreOrafaiy MohsenKafili JafarPanahi Drama Family'),
 Row(id=9614, comb='lossofmother taxes golf lossoffather sport BenStiller AdamSandler ChristopherMcDonald DennisDugan Comedy'),
 Row(id=51352, comb='typhus burglary fascism victim prejudice auschwitz rabbit interview GlennClose KennethBranagh JonBlair Foreign Drama Documentary'),
 Row(id=73067, comb='flashback ireland boy writer troopship CorbanWalker MattDillon GabrielByrne MichaelLindsay-Hogg War Drama Romance'),
 Row(id=2292, comb="loser salesclerk aftercreditsstinger BrianO'Halloran JasonMewes JeffAnderson KevinSmith Comedy"),
 Row(id=9271, comb='visualeffect computerprogram hologram virtualreality artificialintelligence policetraining ex-cop android DenzelWashington KellyLynch Rus

In [10]:
df = spark.read.format("delta").load("s3a://lakehouse/silver/Silver_API_1")
# last_read_time = Variable.get("last_read_time_SG", default_var="1970-01-01T00:00:00")
# last_read_time_ts = to_timestamp(lit(last_read_time), "yyyy-MM-dd HH:mm:ss")
try:
    readTime = spark.read.format("delta").load("s3a://lakehouse/ReadTime")
except:
    spark.sql("""
CREATE TABLE IF NOT EXISTS delta.`s3a://your-bucket/processing_state` (
    task_id STRING,
    last_read_time TIMESTAMP
) USING DELTA
""")
    spark.sql("""
    INSERT INTO delta.`s3a://lakehouse/ReadTime`
    VALUES ('BatchApi_Process', '1970-01-01 00:00:00')
    """)
    readTime = spark.read.format("delta").load("s3a://lakehouse/ReadTime")

result = readTime.filter(f"task_id = 'BatchApi_Process'").select("last_read_time").collect()
last_read_time = result[0][0]

df.filter(f"read_time > '{last_read_time}'")
# --------------------------------------------------
# FACT TABLE: fact_movie
# --------------------------------------------------
fact_movie_df = df.select(
    col("id"),
    col("budget"),
    col("popularity"),
    col("revenue"),
    col("vote_average"),
    col("vote_count"),
    col("date_id")
).dropDuplicates(["id"])

try:

    fact_movie = DeltaTable.forPath(spark, "s3a://lakehouse/gold/fact_movies")

    fact_movie.alias("target").merge(
        fact_movie_df.alias("source"),
        "target.id = source.id"
    ).whenNotMatchedInsertAll().execute()
except :
    fact_movie_df.write.format("delta").mode("overwrite").save("s3a://lakehouse/gold/fact_movies")



# fact_movie_query = fact_movie_df.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://lakehouse/check/fact_movies") \
#     .option("path", "s3a://lakehouse/gold/fact_movies") \
#     .start()

# --------------------------------------------------
# DIMENSION TABLE: dim_movie
# Lưu ý: Đổi tên các cột để khớp với schema của Delta table hiện có:
# Schema mong đợi: id, title, original_title, language, overview, runtime, tagline, status, homepage
# --------------------------------------------------
dimmovie_df = df.select(
    col("id"),                        # Ép sang long
    col("title"),                                                # Giữ nguyên kiểu string
    col("original_title"),                                       # Giữ nguyên kiểu string
    col("language"),                  # Đổi tên trường: original_language -> language
    col("overview"),                                             # Giữ nguyên kiểu string
    col("runtime"),              # Ép về double
    col("tagline"),                                              # Giữ nguyên kiểu string
    col("status"),                                               # Giữ nguyên kiểu string
    col("homepage")                                              # Giữ nguyên kiểu string
).dropDuplicates(["id"])


try:
    dimmovie = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_movie")
    dimmovie.alias("target").merge(
        dimmovie_df.alias("source"),
        "target.id = source.id"
    ).whenNotMatchedInsertAll().execute()
except:
    dimmovie_df.write.format("delta").mode("overwrite").save("s3a://lakehouse/gold/dim_movie")
# dimmovie_query = dimmovie_df.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://lakehouse/check/dim_movie") \
#     .option("path", "s3a://lakehouse/gold/dim_movie") \
#     .start()

# --------------------------------------------------
# DIMENSION TABLE: dim_date
# --------------------------------------------------
dimdate_df = df.withColumn("release_date", to_date(col("release_date"), "yyyy-MM-dd")) \
    .select(
        date_format(col("release_date"), "yyyy-MM-dd").alias("release_date"),
        dayofweek(col("release_date")).alias("DayOfWeek"),
        date_format(col("release_date"), "EEEE").alias("DayName"),
        dayofmonth(col("release_date")).alias("DayOfMonth"),
        dayofyear(col("release_date")).alias("DayOfYear"),
        weekofyear(col("release_date")).alias("WeekOfYear"),
        date_format(col("release_date"), "MMMM").alias("MonthName"),
        month(col("release_date")).alias("MonthOfYear"),
        quarter(col("release_date")).alias("Quarter"),
        year(col("release_date")).alias("Year"),
        when(dayofweek(col("release_date")).between(2, 6), True).otherwise(False).alias("IsWeekDay"),
        col("date_id")
    ).dropDuplicates(["date_id"])

try:
    dimdate = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_date")
    dimdate.alias("target").merge(
        dimdate_df.alias("source"),
        "target.date_id = source.date_id"
    ).whenNotMatchedInsertAll().execute()
except:
    dimdate_df.write.format("delta").mode("overwrite").save("s3a://lakehouse/gold/dim_date")

# dim_date_query = dimdate_df.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://lakehouse/check/dim_date") \
#     .option("path", "s3a://lakehouse/gold/dim_date") \
#     .start()

# --------------------------------------------------
# DIMENSION TABLE: dim_genre
# --------------------------------------------------
dim_genre_df = df.select(
    explode(col("genres")).alias("genre")
).select(
    col("genre.id").cast("integer").alias("id"),
    col("genre.name").alias("name")
).dropDuplicates(["id"])

try:
    dim_genre = DeltaTable.forPath(spark, "s3a://lakehouse/gold/dim_genre")
    dim_genre.alias("target").merge(
        dim_genre_df.alias("source"),
        "target.id = source.id"
    ).whenNotMatchedInsertAll().execute()
except:
    dim_genre_df.write.format("delta").mode("overwrite").save("s3a://lakehouse/gold/dim_genre")

# dim_genre_query = dim_genre_df.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://lakehouse/check/dim_genre") \
#     .option("path", "s3a://lakehouse/gold/dim_genre") \
#     .start()

# --------------------------------------------------
# BRIDGE TABLE: movie_genres
# --------------------------------------------------
movie_genre_df = df.select(
    col("id").alias("movie_id"),
    explode(col("genres")).alias("genre")
).select(
    col("genre.id").cast("integer").alias("genres_id"),
    col("movie_id").alias("id")
)

try:
    movie_genre = DeltaTable.forPath(spark, "s3a://lakehouse/gold/movie_genre")
    movie_genre.alias("target").merge(
        movie_genre_df.alias("source"),
        "target.id = source.id AND target.genres_id = source.genres_id"
    ).whenNotMatchedInsertAll().execute()
except:
    movie_genre_df.write.format("delta").mode("overwrite").save("s3a://lakehouse/gold/movie_genres")

# movie_genres_query = movie_genre_df.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://lakehouse/check/movie_genres") \
#     .option("path", "s3a://lakehouse/gold/movie_genres") \
#     .start()


# date = spark.sql("SELECT current_timestamp() AS ts")

# # Lấy giá trị timestamp từ DataFrame
# new_last_read_time = date.collect()[0]["ts"]

# # Chuyển timestamp thành chuỗi ISO format (nếu cần)
# new_last_read_time_str = new_last_read_time.isoformat()

# # Lưu vào Airflow Variable
# Variable.set("last_read_time_SG", new_last_read_time_str)

# new_last_read_time = datetime.now().isoformat()
# Variable.set("last_read_time_SG", new_last_read_time)


max_read_time_row = df.agg(max("read_time")).collect()
max_read_time = max_read_time_row[0]["max(read_time)"] if max_read_time_row else None


readTime = spark.read.format("delta").load("s3a://lakehouse/ReadTime")
    
    # Cập nhật hoặc chèn bản ghi mới
updated_df = readTime.filter(f"task_id != 'BatchApi_Process'").union(
    spark.createDataFrame([("BatchApi_Process", max_read_time)], ["task_id", "last_read_time"])
)

# Ghi đè Delta Table
updated_df.write.format("delta").mode("overwrite").save("s3a://your-bucket/processing_state")

ValueError: read_time

In [3]:
df = spark.read.format("delta").load("s3a://lakehouse/bronze/Bronze_API_1")


In [5]:
from pyspark.sql.functions import current_timestamp, expr

filtered_df = df.filter(col("read_time") >= expr("current_timestamp() - interval 15 hour"))

# Hiển thị dữ liệu đã lọc
filtered_df.show()


+------+--------------------+--------------------+-----------------+--------------------+-------+-------+--------+--------------------+------+----------+-------+------------+----------+------------+----------------+-------------------+
|    id|      original_title|               title|original_language|            overview|runtime|tagline|  status|            homepage|budget|popularity|revenue|vote_average|vote_count|release_date|          genres|          read_time|
+------+--------------------+--------------------+-----------------+--------------------+-------+-------+--------+--------------------+------+----------+-------+------------+----------+------------+----------------+-------------------+
| 46704|Shakira: MTV Unpl...|Shakira: MTV Unpl...|               en|MTV Unplugged is ...|   51.0|       |Released|http://www.shakir...|     0|      0.36|      0|         7.6|         8|  2000-02-29|[{10402, Music}]|2025-02-23 08:16:52|
|853991|Battle of the Bot...|Battle of the Bot...|      

In [5]:
df = spark.read.format("delta").load("s3a://lakehouse/silver/Silver_API_1")
df.head(5)

[Row(id=605475, budget=0, popularity=0.006, revenue=0.0, vote_average=0.0, vote_count=0.0, date_id=20160106, title='At-issue', original_title='At-issue', language='en', overview='Puppet Bartholomew makes a change to his life.', runtime=10.0, tagline='', status='Released', homepage='', genres=[], release_date=datetime.date(2016, 1, 6), read_time=datetime.datetime(2025, 2, 22, 15, 52, 24, 948000)),
 Row(id=861458, budget=0, popularity=0.075, revenue=0.0, vote_average=1.0, vote_count=1.0, date_id=20020408, title='Nouvelles neuves du monde de Moustic', original_title='Nouvelles neuves du monde de Moustic', language='fr', overview='', runtime=0.0, tagline='', status='Released', homepage='', genres=[Row(id=35, name='Comedy')], release_date=datetime.date(2002, 4, 8), read_time=datetime.datetime(2025, 2, 22, 15, 52, 24, 948000)),
 Row(id=724202, budget=0, popularity=0.009, revenue=0.0, vote_average=0.0, vote_count=0.0, date_id=20170905, title='Erdogan - Vom Demokraten zum Despoten', original_t

In [13]:
df.head(5)

[Row(id=912401, original_title='레전드 오브 갱스터', title='The Gods', original_language='ko', overview='In a modern-day Romeo and Juliet, Mikey James, a street hustler raised on the streets with his two brothers, goes to work for the Fasano family as he desperately tries to find a way out of his hustling lifestyle.', runtime=0.0, tagline='', status='Released', homepage='', budget=0, popularity=0.006, revenue=0, vote_average=0.0, vote_count=0, release_date=datetime.date(2017, 1, 6), genres=[], read_time=datetime.datetime(2025, 2, 22, 16, 7, 44, 742000)),
 Row(id=674221, original_title='Una Noche En Mexico', title='Una Noche En Mexico', original_language='es', overview='One story, one script, one cast, one location two languages. A unique project that talks to two cultures. These films are dedicated to the \'Dreamers\' in the US that is stuck between two countries and two cultures. Pablo is a first generation Mexican/American born in the United States, his parents gave him a great education and

In [16]:
df.select("read_time").collect()

[Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 15, 59, 14, 44000)),
 Row(read_time=datetime.datetime(2025, 2, 22, 16, 8, 45, 12700

In [7]:
df = spark.read.format("delta").load("s3a://lakehouse/gold/movie_genre")
df.printSchema()

root
 |-- genres_id: integer (nullable = true)
 |-- id: integer (nullable = true)



In [31]:
import boto3
import pandas as pd
def connect_minio():
    try:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id="conbo123",
            aws_secret_access_key="123conbo",
            endpoint_url='http://minio:9000' 
        )
        return s3_client
    except Exception as e:
        logging.error(f"Error connecting to MinIO: {str(e)}")
        raise e

def get_data_from_raw(name):
    try:
        client = connect_minio()
        response = client.get_object(Bucket="lakehouse", Key=f'raw/{name}.csv')
        df = pd.read_csv(response.get("Body"), low_memory=False)
        return df
    except Exception as e:
        logging.error(f"Error getting data from MinIO: {str(e)}")
        raise e

# Hàm ghi dữ liệu vào MinIO
def save_data_to_bronze(df, name):
    try:
        client = connect_minio()
        # Sử dụng BytesIO để lưu trữ dữ liệu dưới dạng Parquet
        parquet_buffer = BytesIO()
        df.to_parquet(parquet_buffer, index=False)
        parquet_buffer.seek(0)  # Reset buffer position
        client.put_object(Bucket="lakehouse", Key=f'bronze/{name}.parquet', Body=parquet_buffer.getvalue())
        logging.info(f"Data saved to bronze/{name}.parquet successfully.")
    except Exception as e:
        logging.error(f"Error saving data to MinIO: {str(e)}")
        raise e

In [None]:
df = get_data_from_raw('ratings')
save_data_to_bronze(df, 'ratings')

In [3]:
df = spark.read.format("csv").load("s3a://lakehouse/raw/ratings.csv")

In [4]:
df.write.format("parquet").save("s3a://lakehouse/bronze/ratings.parquet")

# DIM MOVIE
### ["movie_id", "title", "original_title", "language", "overview","runtime", "tagline", "status", "homepage"]

In [20]:
df_silver_movies = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")

In [21]:
df_silver_movies.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: boolean (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (n

In [4]:
df_silver_movies = df_silver_movies.dropna(subset=["title", "release_date", "budget"])
df_silver_movies = df_silver_movies.dropDuplicates()


NameError: name 'df_silver_movies' is not defined

In [21]:
df_silver_movies = df_silver_movies.withColumn("id", col("id").cast(LongType()))

df_dim_movie = df_silver_movies.select(
    "id",  
    "title", "original_title", col("spoken_languages").alias("language"), "overview", 
    "runtime", "tagline", "status", "homepage"
)
df_dim_movie.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_movie")

# DIM KEYWORD 

In [14]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/keywords.parquet")
df = df.filter((col("keywords").isNotNull()) & (col("keywords") != "[]"))
df.filter(col("id") == 13685).show(truncate=False)


+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id   |keywords                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
+-----+-----------------------------------------------

In [3]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/keywords.parquet")
keyword_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)
df_parsed = df.withColumn("keywords", from_json(col("keywords"), keyword_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("keywords", explode(col("keywords")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("keywords.name"),
    col("keywords.id")
)
df_selected.dropDuplicates(["id"])
# Hiển thị kết quả
df_selected.show(truncate=False)

+------------------------+------+
|name                    |id    |
+------------------------+------+
|jealousy                |931   |
|toy                     |4290  |
|boy                     |5202  |
|friendship              |6054  |
|friends                 |9713  |
|rivalry                 |9823  |
|boy next door           |165503|
|new toy                 |170722|
|toy comes to life       |187065|
|board game              |10090 |
|disappearance           |10941 |
|based on children's book|15101 |
|new home                |33467 |
|recluse                 |158086|
|giant insect            |158091|
|fishing                 |1495  |
|best friend             |12392 |
|duringcreditsstinger    |179431|
|old men                 |208510|
|based on novel          |818   |
+------------------------+------+
only showing top 20 rows



In [4]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_keyword")

# DIM CAST
### cast_id, name, gender, profile_path


In [4]:
df_dim_cast = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet")
df_dim_cast.printSchema()

root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: long (nullable = true)



In [6]:
cast_schema = ArrayType(
    StructType([
        StructField("cast_id", IntegerType(), True),
        StructField("character", StringType(), True),
        StructField("credit_id", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("order", IntegerType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df_dim_cast.withColumn("cast", from_json(col("cast"), cast_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("cast", explode(col("cast")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("cast.name"),
    col("cast.gender"),
    col("cast.profile_path"),
    col("cast.credit_id"),
    col("cast.id")
)

# Hiển thị kết quả
df_selected.show(truncate=False)

+------------------+------+--------------------------------+------------------------+-------+
|name              |gender|profile_path                    |credit_id               |id     |
+------------------+------+--------------------------------+------------------------+-------+
|Walter Matthau    |2     |/xJVkvprOnzP5Zdh5y63y8HHniDZ.jpg|52fe466a9251416c75077a8d|6837   |
|Jack Lemmon       |2     |/chZmNRYMtqkiDlatprGDH4BzGqG.jpg|52fe466a9251416c75077a91|3151   |
|Ann-Margret       |1     |/jx5lTaJ5VXZHYB52gaOTAZ9STZk.jpg|52fe466a9251416c75077a95|13567  |
|Sophia Loren      |1     |/emKLhbji1c7BjcA2DdbWf0EP9zH.jpg|52fe466a9251416c75077a99|16757  |
|Daryl Hannah      |1     |/4LLmp6AQdlj6ueGCRbVRSGvvFSt.jpg|52fe466a9251416c75077a9d|589    |
|Burgess Meredith  |2     |/lm98oKloU33Q7QDIIMSyc4Pr2jA.jpg|53e5fcc2c3a3684430000d65|16523  |
|Kevin Pollak      |2     |/kwu2T8CDnThZTzE88uiSgJ5eHXf.jpg|53e5fcd4c3a3684433000e1a|7166   |
|Whitney Houston   |1     |/69ouDnXnmklYPr4sMJXWKYz81AL.jpg|

In [7]:
df_dim_cast  = df_selected.dropDuplicates(["id"]).filter(col("id").isNotNull())


In [8]:
df_dim_cast.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_cast")

# DIM CREW
### id, name,  department, job, gender, credit_ID

In [15]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet") 
df_filtered = df.filter((col("cast") != "[]") & (col("crew") != "[]"))
df_filtered.show()

+--------------------+--------------------+-----+
|                cast|                crew|   id|
+--------------------+--------------------+-----+
|[{'cast_id': 14, ...|[{'credit_id': '5...|  862|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 8844|
|[{'cast_id': 2, '...|[{'credit_id': '5...|15602|
|[{'cast_id': 1, '...|[{'credit_id': '5...|31357|
|[{'cast_id': 1, '...|[{'credit_id': '5...|11862|
|[{'cast_id': 25, ...|[{'credit_id': '5...|  949|
|[{'cast_id': 1, '...|[{'credit_id': '5...|11860|
|[{'cast_id': 2, '...|[{'credit_id': '5...|45325|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9091|
|[{'cast_id': 1, '...|[{'credit_id': '5...|  710|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9087|
|[{'cast_id': 9, '...|[{'credit_id': '5...|12110|
|[{'cast_id': 1, '...|[{'credit_id': '5...|21032|
|[{'cast_id': 1, '...|[{'credit_id': '5...|10858|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 1408|
|[{'cast_id': 4, '...|[{'credit_id': '5...|  524|
|[{'cast_id': 6, '...|[{'credit_id': '5...| 4584|


In [19]:
df.filter(col("id") == 42981).show(truncate=False)

+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|cast|crew                                                                                                                                                                 |id   |
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[]  |[{'credit_id': '52fe4624c3a36847f80ef0a5', 'department': 'Directing', 'gender': 2, 'id': 129216, 'job': 'Director', 'name': 'Theodore Thomas', 'profile_path': None}]|42981|
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+



In [15]:
df_dim_crew = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet")


In [16]:
crew_schema = ArrayType(
    StructType([
        StructField("credit_id", StringType(), True),
        StructField("department", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", LongType(), True),
        StructField("job", StringType(), True),
        StructField("name", StringType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df_dim_crew.withColumn("crew", from_json(col("crew"), crew_schema))
# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("crew", explode(col("crew")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("crew.name"),
    col("crew.gender"),
    col("crew.profile_path"),
    col("crew.credit_id"),
    col("crew.id"),
    col("crew.job"),
    col("crew.department")
)
df_selected = df_selected.dropDuplicates(["id"]).filter(col("id").isNotNull())
# Hiển thị kết quả
df_selected.show(truncate=False)

+--------------------+------+--------------------------------+------------------------+---+-----------------------+----------+
|name                |gender|profile_path                    |credit_id               |id |job                    |department|
+--------------------+------+--------------------------------+------------------------+---+-----------------------+----------+
|Mark Hamill         |2     |/ws544EgE5POxGJqq9LUfhnDrHtV.jpg|52fe44dcc3a368484e03b025|2  |Director               |Directing |
|Carrie Fisher       |1     |/pbleNurCYdrLFQMEnlQB2nkOR1O.jpg|52fe4440c3a368484e01852d|4  |Novel                  |Writing   |
|Albert Brooks       |2     |/kahlMTdygrPJ28VYRhKPavYD9hs.jpg|52fe44e1c3a368484e03c4dd|13 |Director               |Directing |
|Ellen DeGeneres     |1     |/4LG2bFkqOzxzR1kpnoDcwIVuQTG.jpg|52fe44fdc3a368484e0426ed|14 |Writer                 |Writing   |
|Barry Humphries     |2     |/ccJHmzU8wzOe4sAmeVeScu5mygl.jpg|52fe4480c3a368484e026db9|22 |Writer              

In [17]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_crew")

# DIM DATE

In [6]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")
df = df.dropna(subset=["title", "release_date", "budget"])
df = df.dropDuplicates()
df = df.withColumn("parsed_date", F.to_date(F.col("release_date"), "yyyy-MM-dd"))

result = df.select(
    F.col("release_date"),
    F.dayofweek("parsed_date").alias("DayOfWeek"),
    F.date_format("parsed_date", "EEEE").alias("DayName"),
    F.dayofmonth("parsed_date").alias("DayOfMonth"),
    F.dayofyear("parsed_date").alias("DayOfYear"),
    F.weekofyear("parsed_date").alias("WeekOfYear"),
    F.date_format("parsed_date", "MMMM").alias("MonthName"),
    F.month("parsed_date").alias("MonthOfYear"),
    F.quarter("parsed_date").alias("Quarter"),
    F.year("parsed_date").alias("Year"),
    F.when((F.dayofweek("parsed_date") >= 2) & (F.dayofweek("parsed_date") <= 6), True).otherwise(False).alias("IsWeekDay")
)
result = result.withColumn(
    "DATE_ID",
    (F.col("Year") * 10000 + F.col("MonthOfYear") * 100 + F.col("DayOfMonth")).cast("long")
)

result.show()



+------------+---------+---------+----------+---------+----------+---------+-----------+-------+----+---------+--------+
|release_date|DayOfWeek|  DayName|DayOfMonth|DayOfYear|WeekOfYear|MonthName|MonthOfYear|Quarter|Year|IsWeekDay| DATE_ID|
+------------+---------+---------+----------+---------+----------+---------+-----------+-------+----+---------+--------+
|  1996-10-25|        6|   Friday|        25|      299|        43|  October|         10|      4|1996|     true|19961025|
|  1988-08-28|        1|   Sunday|        28|      241|        34|   August|          8|      3|1988|    false|19880828|
|  1988-05-11|        4|Wednesday|        11|      132|        19|      May|          5|      2|1988|     true|19880511|
|  1984-02-16|        5| Thursday|        16|       47|         7| February|          2|      1|1984|     true|19840216|
|  1986-05-01|        5| Thursday|         1|      121|        18|      May|          5|      2|1986|     true|19860501|
|  1995-01-01|        1|   Sunda

In [8]:
result.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_date")

# DIM GENRES
### genre_id, name

In [8]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")

In [13]:
genres_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)
df_parsed = df.withColumn("genres", from_json(col("genres"), genres_schema))
df_exploded = df_parsed.withColumn("genres", explode(col("genres")))

df_selected = df_exploded.select(
    col("genres.id"),
    col("genres.name")
)
df_selected = df_selected.dropDuplicates(subset=["id"])

AnalysisException: Cannot resolve column name "id" among (CAST(genres.id AS INT), name)

In [13]:
df_selected.show()

+-----+--------------------+
|   id|                name|
+-----+--------------------+
|   12|           Adventure|
|   14|             Fantasy|
|   16|           Animation|
|   18|               Drama|
|   27|              Horror|
|   28|              Action|
|   35|              Comedy|
|   36|             History|
|   37|             Western|
|   53|            Thriller|
|   80|               Crime|
|   99|         Documentary|
|  878|     Science Fiction|
| 2883|             Aniplex|
| 7759|             GoHands|
| 7760|           BROSTA TV|
| 7761|Mardock Scramble ...|
| 9648|             Mystery|
|10402|               Music|
|10749|             Romance|
+-----+--------------------+
only showing top 20 rows



In [14]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_genres")

# complete DIM

In [9]:
df_dimmovie = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_movie")
df_dimmovie.printSchema()

root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- language: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- runtime: double (nullable = true)
 |-- tagline: string (nullable = true)
 |-- status: string (nullable = true)
 |-- homepage: string (nullable = true)



In [10]:
df_cast = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_cast")
df_cast.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- id: long (nullable = true)



In [11]:
df_crew = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_crew")
df_crew.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- id: long (nullable = true)
 |-- job: string (nullable = true)
 |-- department: string (nullable = true)



In [12]:
df_genres = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_genres")
df_genres.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [13]:
df_date = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_date")
df_date.printSchema()

root
 |-- release_date: string (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DayName: string (nullable = true)
 |-- DayOfMonth: integer (nullable = true)
 |-- DayOfYear: integer (nullable = true)
 |-- WeekOfYear: integer (nullable = true)
 |-- MonthName: string (nullable = true)
 |-- MonthOfYear: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- IsWeekDay: boolean (nullable = true)
 |-- DATE_ID: long (nullable = true)



# Cast_Movie

In [18]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet")
cast_schema = ArrayType(
    StructType([
        StructField("cast_id", IntegerType(), True),
        StructField("character", StringType(), True),
        StructField("credit_id", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("order", IntegerType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df.withColumn("cast", from_json(col("cast"), cast_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("cast", explode(col("cast")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("cast.id").alias("cast_id"),
    col("cast.character"),
    col("id").alias("movie_id")
)

# Hiển thị kết quả
df_selected.show(truncate=False)


+-------+----------------------------+--------+
|cast_id|character                   |movie_id|
+-------+----------------------------+--------+
|6837   |Max Goldman                 |15602   |
|3151   |John Gustafson              |15602   |
|13567  |Ariel Gustafson             |15602   |
|16757  |Maria Sophia Coletta Ragetti|15602   |
|589    |Melanie Gustafson           |15602   |
|16523  |Grandpa Gustafson           |15602   |
|7166   |Jacob Goldman               |15602   |
|8851   |Savannah 'Vannah' Jackson   |31357   |
|9780   |Bernadine 'Bernie' Harris   |31357   |
|18284  |Gloria 'Glo' Matthews       |31357   |
|51359  |Robin Stokes                |31357   |
|66804  |Marvin King                 |31357   |
|352    |Kenneth Dawkins             |31357   |
|87118  |John Harris, Sr.            |31357   |
|34     |Troy                        |31357   |
|1276777|Joseph                      |31357   |
|10814  |James Wheeler               |31357   |
|67773  |George Banks                |11

In [19]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/movie_cast")

# movie_crew

In [20]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet")
df.printSchema()
crew_schema = ArrayType(
    StructType([
        StructField("department", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", LongType(), True),
        StructField("job", StringType(), True),
        StructField("name", StringType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df.withColumn("crew", from_json(col("crew"), crew_schema))
# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("crew", explode(col("crew")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("crew.id").alias("crew_id"),
    col("crew.job"),
    col("crew.department"),
    col("id").alias("movie_id")
)

df_selected.show(truncate=False)


root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: long (nullable = true)

+-------+------------+----------+--------+
|crew_id|job         |department|movie_id|
+-------+------------+----------+--------+
|6210   |Writer      |Writing   |16420   |
|56710  |Director    |Directing |16420   |
|56710  |Adaptation  |Writing   |16420   |
|33315  |Director    |Directing |31174   |
|6210   |Theatre Play|Writing   |31174   |
|1327   |Writer      |Writing   |31174   |
|33315  |Writer      |Writing   |31174   |
|16862  |Director    |Directing |48750   |
|37127  |Writer      |Writing   |48750   |
|16862  |Writer      |Writing   |48750   |
|119294 |Writer      |Writing   |46785   |
|120229 |Director    |Directing |46785   |
|117075 |Director    |Directing |188588  |
|14692  |Director    |Directing |47475   |
|114997 |Director    |Directing |55475   |
|114997 |Writer      |Writing   |55475   |
|5281   |Director    |Directing |20649   |
|5281   |Screenplay  |Writing 

In [21]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/movie_crew")

# movie genres

In [14]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")
genres_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)
df_parsed = df.withColumn("genres", from_json(col("genres"), genres_schema))
df_exploded = df_parsed.withColumn("genres", explode(col("genres")))

df_selected = df_exploded.select(
    col("genres.id").alias("genres_id"),
    col("id").cast("Integer")
)

In [15]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/movie_genres")

# Movie Keyword

In [5]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/keywords.parquet")
keyword_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)
df_parsed = df.withColumn("keywords", from_json(col("keywords"), keyword_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("keywords", explode(col("keywords")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("keywords.name"),
    col("keywords.id").alias("keyword_id"),
    col("id")
                             
)

# Hiển thị kết quả
df_selected.show(truncate=False)

+------------------------+----------+-----+
|name                    |keyword_id|id   |
+------------------------+----------+-----+
|jealousy                |931       |862  |
|toy                     |4290      |862  |
|boy                     |5202      |862  |
|friendship              |6054      |862  |
|friends                 |9713      |862  |
|rivalry                 |9823      |862  |
|boy next door           |165503    |862  |
|new toy                 |170722    |862  |
|toy comes to life       |187065    |862  |
|board game              |10090     |8844 |
|disappearance           |10941     |8844 |
|based on children's book|15101     |8844 |
|new home                |33467     |8844 |
|recluse                 |158086    |8844 |
|giant insect            |158091    |8844 |
|fishing                 |1495      |15602|
|best friend             |12392     |15602|
|duringcreditsstinger    |179431    |15602|
|old men                 |208510    |15602|
|based on novel          |818   

In [6]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/movie_keyword")

# Fact Movie

In [28]:
df= spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")
df = df.withColumn("popularity", col("popularity").cast("double"))
df.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: boolean (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (n

In [7]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")
df_fact = df.withColumn(
    "date_id",
    (F.year("release_date") * 10000 + F.month("release_date") * 100 + F.dayofmonth("release_date")).cast("long")
)

df_fact = df_fact.select(
    col("id").cast("Integer"),
    col("budget"),
    col("popularity"),
    col("revenue"),
    col("vote_average"),
    col("vote_count"),
    col("date_id")
)
df_fact.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/fact_movies")

In [36]:
df = spark.read.format("delta").load("s3a://lakehouse/silver_T/fact_movies")
df.show()

+-----+--------+----------+------------+------------+----------+--------+
|   id|  budget|popularity|     revenue|vote_average|vote_count| date_id|
+-----+--------+----------+------------+------------+----------+--------+
|  862|30000000| 21.946943|3.73554033E8|         7.7|    5415.0|19951030|
| 8844|65000000| 17.015539|2.62797249E8|         6.9|    2413.0|19951215|
|15602|       0|   11.7129|         0.0|         6.5|      92.0|19951222|
|31357|16000000|  3.859495| 8.1452156E7|         6.1|      34.0|19951222|
|11862|       0|  8.387519| 7.6578911E7|         5.7|     173.0|19950210|
|  949|60000000| 17.924927|1.87436818E8|         7.7|    1886.0|19951215|
|11860|58000000|  6.677277|         0.0|         6.2|     141.0|19951215|
|45325|       0|  2.561161|         0.0|         5.4|      45.0|19951222|
| 9091|35000000|   5.23158| 6.4350171E7|         5.5|     174.0|19951222|
|  710|58000000| 14.686036|3.52194034E8|         6.6|    1194.0|19951116|
| 9087|62000000|  6.318445|1.07879496E