In [1]:
import os
import sys
import traceback
import logging
from pyspark.sql import SparkSession
from pyspark.sql.functions import  explode,col, expr,when,to_date, sum, from_json
from pyspark.sql.types import  ArrayType,StructType, StructField, BooleanType, StringType, IntegerType, DateType, FloatType,DoubleType, LongType

from pyspark.sql import functions as F

In [2]:

spark = SparkSession.builder \
    .appName("MinIO with Delta Lake") \
    .config("spark.jars", "jars/hadoop-aws-3.3.4.jar,jars/aws-java-sdk-bundle-1.12.262.jar,jars/delta-core_2.12-2.2.0.jar,jars/delta-storage-2.2.0.jar")\
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "conbo123") \
    .config("spark.hadoop.fs.s3a.secret.key", "123conbo") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .config("delta.enable-non-concurrent-writes", "true") \
    .config('spark.sql.warehouse.dir', "s3a://lakehouse/") \
    .getOrCreate()

# DIM MOVIE
### ["movie_id", "title", "original_title", "language", "overview","runtime", "tagline", "status", "homepage"]

In [18]:
df_silver_movies = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")

In [19]:
df_silver_movies.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: boolean (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (n

In [4]:
df_silver_movies = df_silver_movies.dropna(subset=["title", "release_date", "budget"])
df_silver_movies = df_silver_movies.dropDuplicates()


NameError: name 'df_silver_movies' is not defined

In [21]:
df_silver_movies = df_silver_movies.withColumn("id", col("id").cast(LongType()))

df_dim_movie = df_silver_movies.select(
    "id",  
    "title", "original_title", col("spoken_languages").alias("language"), "overview", 
    "runtime", "tagline", "status", "homepage"
)
df_dim_movie.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_movie")

# DIM KEYWORD 

In [3]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/keywords.parquet")
keyword_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)
df_parsed = df.withColumn("keywords", from_json(col("keywords"), keyword_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("keywords", explode(col("keywords")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("keywords.name"),
    col("keywords.id")
)
df_selected.dropDuplicates(["id"])
# Hiển thị kết quả
df_selected.show(truncate=False)

+------------------------+------+
|name                    |id    |
+------------------------+------+
|jealousy                |931   |
|toy                     |4290  |
|boy                     |5202  |
|friendship              |6054  |
|friends                 |9713  |
|rivalry                 |9823  |
|boy next door           |165503|
|new toy                 |170722|
|toy comes to life       |187065|
|board game              |10090 |
|disappearance           |10941 |
|based on children's book|15101 |
|new home                |33467 |
|recluse                 |158086|
|giant insect            |158091|
|fishing                 |1495  |
|best friend             |12392 |
|duringcreditsstinger    |179431|
|old men                 |208510|
|based on novel          |818   |
+------------------------+------+
only showing top 20 rows



In [4]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_keyword")

# DIM CAST
### cast_id, name, gender, profile_path


In [4]:
df_dim_cast = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet")
df_dim_cast.printSchema()

root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: long (nullable = true)



In [6]:
cast_schema = ArrayType(
    StructType([
        StructField("cast_id", IntegerType(), True),
        StructField("character", StringType(), True),
        StructField("credit_id", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("order", IntegerType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df_dim_cast.withColumn("cast", from_json(col("cast"), cast_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("cast", explode(col("cast")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("cast.name"),
    col("cast.gender"),
    col("cast.profile_path"),
    col("cast.credit_id"),
    col("cast.id")
)

# Hiển thị kết quả
df_selected.show(truncate=False)

+------------------+------+--------------------------------+------------------------+-------+
|name              |gender|profile_path                    |credit_id               |id     |
+------------------+------+--------------------------------+------------------------+-------+
|Walter Matthau    |2     |/xJVkvprOnzP5Zdh5y63y8HHniDZ.jpg|52fe466a9251416c75077a8d|6837   |
|Jack Lemmon       |2     |/chZmNRYMtqkiDlatprGDH4BzGqG.jpg|52fe466a9251416c75077a91|3151   |
|Ann-Margret       |1     |/jx5lTaJ5VXZHYB52gaOTAZ9STZk.jpg|52fe466a9251416c75077a95|13567  |
|Sophia Loren      |1     |/emKLhbji1c7BjcA2DdbWf0EP9zH.jpg|52fe466a9251416c75077a99|16757  |
|Daryl Hannah      |1     |/4LLmp6AQdlj6ueGCRbVRSGvvFSt.jpg|52fe466a9251416c75077a9d|589    |
|Burgess Meredith  |2     |/lm98oKloU33Q7QDIIMSyc4Pr2jA.jpg|53e5fcc2c3a3684430000d65|16523  |
|Kevin Pollak      |2     |/kwu2T8CDnThZTzE88uiSgJ5eHXf.jpg|53e5fcd4c3a3684433000e1a|7166   |
|Whitney Houston   |1     |/69ouDnXnmklYPr4sMJXWKYz81AL.jpg|

In [7]:
df_dim_cast  = df_selected.dropDuplicates(["id"]).filter(col("id").isNotNull())


In [8]:
df_dim_cast.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_cast")

# DIM CREW
### id, name,  department, job, gender, credit_ID

In [15]:
df_dim_crew = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet")


In [16]:
crew_schema = ArrayType(
    StructType([
        StructField("credit_id", StringType(), True),
        StructField("department", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", LongType(), True),
        StructField("job", StringType(), True),
        StructField("name", StringType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df_dim_crew.withColumn("crew", from_json(col("crew"), crew_schema))
# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("crew", explode(col("crew")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("crew.name"),
    col("crew.gender"),
    col("crew.profile_path"),
    col("crew.credit_id"),
    col("crew.id"),
    col("crew.job"),
    col("crew.department")
)
df_selected = df_selected.dropDuplicates(["id"]).filter(col("id").isNotNull())
# Hiển thị kết quả
df_selected.show(truncate=False)

+--------------------+------+--------------------------------+------------------------+---+-----------------------+----------+
|name                |gender|profile_path                    |credit_id               |id |job                    |department|
+--------------------+------+--------------------------------+------------------------+---+-----------------------+----------+
|Mark Hamill         |2     |/ws544EgE5POxGJqq9LUfhnDrHtV.jpg|52fe44dcc3a368484e03b025|2  |Director               |Directing |
|Carrie Fisher       |1     |/pbleNurCYdrLFQMEnlQB2nkOR1O.jpg|52fe4440c3a368484e01852d|4  |Novel                  |Writing   |
|Albert Brooks       |2     |/kahlMTdygrPJ28VYRhKPavYD9hs.jpg|52fe44e1c3a368484e03c4dd|13 |Director               |Directing |
|Ellen DeGeneres     |1     |/4LG2bFkqOzxzR1kpnoDcwIVuQTG.jpg|52fe44fdc3a368484e0426ed|14 |Writer                 |Writing   |
|Barry Humphries     |2     |/ccJHmzU8wzOe4sAmeVeScu5mygl.jpg|52fe4480c3a368484e026db9|22 |Writer              

In [17]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_crew")

# DIM DATE

In [6]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")
df = df.dropna(subset=["title", "release_date", "budget"])
df = df.dropDuplicates()
df = df.withColumn("parsed_date", F.to_date(F.col("release_date"), "yyyy-MM-dd"))

result = df.select(
    F.col("release_date"),
    F.dayofweek("parsed_date").alias("DayOfWeek"),
    F.date_format("parsed_date", "EEEE").alias("DayName"),
    F.dayofmonth("parsed_date").alias("DayOfMonth"),
    F.dayofyear("parsed_date").alias("DayOfYear"),
    F.weekofyear("parsed_date").alias("WeekOfYear"),
    F.date_format("parsed_date", "MMMM").alias("MonthName"),
    F.month("parsed_date").alias("MonthOfYear"),
    F.quarter("parsed_date").alias("Quarter"),
    F.year("parsed_date").alias("Year"),
    F.when((F.dayofweek("parsed_date") >= 2) & (F.dayofweek("parsed_date") <= 6), True).otherwise(False).alias("IsWeekDay")
)
result = result.withColumn(
    "DATE_ID",
    (F.col("Year") * 10000 + F.col("MonthOfYear") * 100 + F.col("DayOfMonth")).cast("long")
)

result.show()



+------------+---------+---------+----------+---------+----------+---------+-----------+-------+----+---------+--------+
|release_date|DayOfWeek|  DayName|DayOfMonth|DayOfYear|WeekOfYear|MonthName|MonthOfYear|Quarter|Year|IsWeekDay| DATE_ID|
+------------+---------+---------+----------+---------+----------+---------+-----------+-------+----+---------+--------+
|  1996-10-25|        6|   Friday|        25|      299|        43|  October|         10|      4|1996|     true|19961025|
|  1988-08-28|        1|   Sunday|        28|      241|        34|   August|          8|      3|1988|    false|19880828|
|  1988-05-11|        4|Wednesday|        11|      132|        19|      May|          5|      2|1988|     true|19880511|
|  1984-02-16|        5| Thursday|        16|       47|         7| February|          2|      1|1984|     true|19840216|
|  1986-05-01|        5| Thursday|         1|      121|        18|      May|          5|      2|1986|     true|19860501|
|  1995-01-01|        1|   Sunda

In [8]:
result.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_date")

# DIM GENRES
### genre_id, name

In [8]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")

In [13]:
genres_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)
df_parsed = df.withColumn("genres", from_json(col("genres"), genres_schema))
df_exploded = df_parsed.withColumn("genres", explode(col("genres")))

df_selected = df_exploded.select(
    col("genres.id"),
    col("genres.name")
)
df_selected = df_selected.dropDuplicates(subset=["id"])

AnalysisException: Cannot resolve column name "id" among (CAST(genres.id AS INT), name)

In [13]:
df_selected.show()

+-----+--------------------+
|   id|                name|
+-----+--------------------+
|   12|           Adventure|
|   14|             Fantasy|
|   16|           Animation|
|   18|               Drama|
|   27|              Horror|
|   28|              Action|
|   35|              Comedy|
|   36|             History|
|   37|             Western|
|   53|            Thriller|
|   80|               Crime|
|   99|         Documentary|
|  878|     Science Fiction|
| 2883|             Aniplex|
| 7759|             GoHands|
| 7760|           BROSTA TV|
| 7761|Mardock Scramble ...|
| 9648|             Mystery|
|10402|               Music|
|10749|             Romance|
+-----+--------------------+
only showing top 20 rows



In [14]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/dim_genres")

# complete DIM

In [9]:
df_dimmovie = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_movie")
df_dimmovie.printSchema()

root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- language: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- runtime: double (nullable = true)
 |-- tagline: string (nullable = true)
 |-- status: string (nullable = true)
 |-- homepage: string (nullable = true)



In [10]:
df_cast = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_cast")
df_cast.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- id: long (nullable = true)



In [11]:
df_crew = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_crew")
df_crew.printSchema()

root
 |-- name: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- profile_path: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- id: long (nullable = true)
 |-- job: string (nullable = true)
 |-- department: string (nullable = true)



In [12]:
df_genres = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_genres")
df_genres.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [13]:
df_date = spark.read.format("delta").load("s3a://lakehouse/silver_T/dim_date")
df_date.printSchema()

root
 |-- release_date: string (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DayName: string (nullable = true)
 |-- DayOfMonth: integer (nullable = true)
 |-- DayOfYear: integer (nullable = true)
 |-- WeekOfYear: integer (nullable = true)
 |-- MonthName: string (nullable = true)
 |-- MonthOfYear: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- IsWeekDay: boolean (nullable = true)
 |-- DATE_ID: long (nullable = true)



# Cast_Movie

In [18]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet")
cast_schema = ArrayType(
    StructType([
        StructField("cast_id", IntegerType(), True),
        StructField("character", StringType(), True),
        StructField("credit_id", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", LongType(), True),
        StructField("name", StringType(), True),
        StructField("order", IntegerType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df.withColumn("cast", from_json(col("cast"), cast_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("cast", explode(col("cast")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("cast.id").alias("cast_id"),
    col("cast.character"),
    col("id").alias("movie_id")
)

# Hiển thị kết quả
df_selected.show(truncate=False)


+-------+----------------------------+--------+
|cast_id|character                   |movie_id|
+-------+----------------------------+--------+
|6837   |Max Goldman                 |15602   |
|3151   |John Gustafson              |15602   |
|13567  |Ariel Gustafson             |15602   |
|16757  |Maria Sophia Coletta Ragetti|15602   |
|589    |Melanie Gustafson           |15602   |
|16523  |Grandpa Gustafson           |15602   |
|7166   |Jacob Goldman               |15602   |
|8851   |Savannah 'Vannah' Jackson   |31357   |
|9780   |Bernadine 'Bernie' Harris   |31357   |
|18284  |Gloria 'Glo' Matthews       |31357   |
|51359  |Robin Stokes                |31357   |
|66804  |Marvin King                 |31357   |
|352    |Kenneth Dawkins             |31357   |
|87118  |John Harris, Sr.            |31357   |
|34     |Troy                        |31357   |
|1276777|Joseph                      |31357   |
|10814  |James Wheeler               |31357   |
|67773  |George Banks                |11

In [19]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/movie_cast")

# movie_crew

In [20]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/credits.parquet")
df.printSchema()
crew_schema = ArrayType(
    StructType([
        StructField("department", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", LongType(), True),
        StructField("job", StringType(), True),
        StructField("name", StringType(), True),
        StructField("profile_path", StringType(), True)
    ])
)
df_parsed = df.withColumn("crew", from_json(col("crew"), crew_schema))
# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("crew", explode(col("crew")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("crew.id").alias("crew_id"),
    col("crew.job"),
    col("crew.department"),
    col("id").alias("movie_id")
)

df_selected.show(truncate=False)


root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: long (nullable = true)

+-------+------------+----------+--------+
|crew_id|job         |department|movie_id|
+-------+------------+----------+--------+
|6210   |Writer      |Writing   |16420   |
|56710  |Director    |Directing |16420   |
|56710  |Adaptation  |Writing   |16420   |
|33315  |Director    |Directing |31174   |
|6210   |Theatre Play|Writing   |31174   |
|1327   |Writer      |Writing   |31174   |
|33315  |Writer      |Writing   |31174   |
|16862  |Director    |Directing |48750   |
|37127  |Writer      |Writing   |48750   |
|16862  |Writer      |Writing   |48750   |
|119294 |Writer      |Writing   |46785   |
|120229 |Director    |Directing |46785   |
|117075 |Director    |Directing |188588  |
|14692  |Director    |Directing |47475   |
|114997 |Director    |Directing |55475   |
|114997 |Writer      |Writing   |55475   |
|5281   |Director    |Directing |20649   |
|5281   |Screenplay  |Writing 

In [21]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/movie_crew")

# movie genres

In [14]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")
genres_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)
df_parsed = df.withColumn("genres", from_json(col("genres"), genres_schema))
df_exploded = df_parsed.withColumn("genres", explode(col("genres")))

df_selected = df_exploded.select(
    col("genres.id").alias("genres_id"),
    col("id").cast("Integer")
)

In [15]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/movie_genres")

# Movie Keyword

In [5]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/keywords.parquet")
keyword_schema = ArrayType(
    StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True)
    ])
)
df_parsed = df.withColumn("keywords", from_json(col("keywords"), keyword_schema))

# Explode cột cast để có nhiều dòng
df_exploded = df_parsed.withColumn("keywords", explode(col("keywords")))

# Chọn các trường cần thiết
df_selected = df_exploded.select(
    col("keywords.name"),
    col("keywords.id").alias("keyword_id"),
    col("id")
                             
)

# Hiển thị kết quả
df_selected.show(truncate=False)

+------------------------+----------+-----+
|name                    |keyword_id|id   |
+------------------------+----------+-----+
|jealousy                |931       |862  |
|toy                     |4290      |862  |
|boy                     |5202      |862  |
|friendship              |6054      |862  |
|friends                 |9713      |862  |
|rivalry                 |9823      |862  |
|boy next door           |165503    |862  |
|new toy                 |170722    |862  |
|toy comes to life       |187065    |862  |
|board game              |10090     |8844 |
|disappearance           |10941     |8844 |
|based on children's book|15101     |8844 |
|new home                |33467     |8844 |
|recluse                 |158086    |8844 |
|giant insect            |158091    |8844 |
|fishing                 |1495      |15602|
|best friend             |12392     |15602|
|duringcreditsstinger    |179431    |15602|
|old men                 |208510    |15602|
|based on novel          |818   

In [6]:
df_selected.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/movie_keyword")

# Fact Movie

In [28]:
df= spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")
df = df.withColumn("popularity", col("popularity").cast("double"))
df.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: boolean (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (n

In [7]:
df = spark.read.format("parquet").load("s3a://lakehouse/bronze/movies.parquet")
df_fact = df.withColumn(
    "date_id",
    (F.year("release_date") * 10000 + F.month("release_date") * 100 + F.dayofmonth("release_date")).cast("long")
)

df_fact = df_fact.select(
    col("id").cast("Integer"),
    col("budget"),
    col("popularity"),
    col("revenue"),
    col("vote_average"),
    col("vote_count"),
    col("date_id")
)
df_fact.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("s3a://lakehouse/silver_T/fact_movies")

In [36]:
df = spark.read.format("delta").load("s3a://lakehouse/silver_T/fact_movies")
df.show()

+-----+--------+----------+------------+------------+----------+--------+
|   id|  budget|popularity|     revenue|vote_average|vote_count| date_id|
+-----+--------+----------+------------+------------+----------+--------+
|  862|30000000| 21.946943|3.73554033E8|         7.7|    5415.0|19951030|
| 8844|65000000| 17.015539|2.62797249E8|         6.9|    2413.0|19951215|
|15602|       0|   11.7129|         0.0|         6.5|      92.0|19951222|
|31357|16000000|  3.859495| 8.1452156E7|         6.1|      34.0|19951222|
|11862|       0|  8.387519| 7.6578911E7|         5.7|     173.0|19950210|
|  949|60000000| 17.924927|1.87436818E8|         7.7|    1886.0|19951215|
|11860|58000000|  6.677277|         0.0|         6.2|     141.0|19951215|
|45325|       0|  2.561161|         0.0|         5.4|      45.0|19951222|
| 9091|35000000|   5.23158| 6.4350171E7|         5.5|     174.0|19951222|
|  710|58000000| 14.686036|3.52194034E8|         6.6|    1194.0|19951116|
| 9087|62000000|  6.318445|1.07879496E