In [1]:
import os

import findspark
findspark.init("/opt/spark")
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *

In [2]:
! pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
# Build (or reuse) the SparkSession used by every cell below.
# - hadoop-aws supplies the s3a:// filesystem used for the object store.
# - delta-core / delta-storage plus the two spark.sql.* settings enable Delta Lake.
#
# The object-store credentials and endpoint are read from the environment so
# they can be changed without editing (or committing secrets into) the
# notebook. The fallbacks are the values this notebook used previously.
# NOTE(review): the fallbacks look like local development credentials; confirm,
# and remove them before pointing this notebook at any shared object store.
s3_access_key = os.environ.get("S3A_ACCESS_KEY", "root")
s3_secret_key = os.environ.get("S3A_SECRET_KEY", "root12345")
s3_endpoint = os.environ.get("S3A_ENDPOINT", "http://minio:9000")

spark = (
    SparkSession.builder
    .appName("final project")
    .master("local[2]")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,io.delta:delta-core_2.12:2.4.0,io.delta:delta-storage:2.4.0")
    .config("spark.hadoop.fs.s3a.access.key", s3_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", s3_secret_key)
    # Path-style access (endpoint/bucket/key) instead of virtual-host style.
    .config("spark.hadoop.fs.s3a.path.style.access", True)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
io.delta#delta-core_2.12 added as a dependency
io.delta#delta-storage added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-03f56ed0-34c7-466c-9293-f8734dafa7a7;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.375 in central
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 2363ms :: artifacts dl 16ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.375 from central in [default]
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.0 from central in [

# READ CREDITS

In [3]:
# Load the bronze-layer credits dataset (parquet) from object storage.
credits_path = 's3a://tmdb-bronze/credits'
credits_df = spark.read.parquet(credits_path)

24/03/23 02:05:56 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [4]:
# Preview the raw credits rows; `cast` and `crew` are still JSON strings here.
credits_df.show()

                                                                                

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|               title|                cast|                crew|          event_time|
+--------+--------------------+--------------------+--------------------+--------------------+
|   19995|              Avatar|[{"cast_id": 242,...|[{"credit_id": "5...|2024-03-22 23:14:...|
|     285|Pirates of the Ca...|[{"cast_id": 4, "...|[{"credit_id": "5...|2024-03-22 23:14:...|
|  206647|             Spectre|[{"cast_id": 1, "...|[{"credit_id": "5...|2024-03-22 23:14:...|
|   49026|The Dark Knight R...|[{"cast_id": 2, "...|[{"credit_id": "5...|2024-03-22 23:14:...|
|   49529|         John Carter|[{"cast_id": 5, "...|[{"credit_id": "5...|2024-03-22 23:14:...|
|     559|        Spider-Man 3|[{"cast_id": 30, ...|[{"credit_id": "5...|2024-03-22 23:14:...|
|   38757|             Tangled|[{"cast_id": 34, ...|[{"credit_id": "5...|2024-03-22 23:14:...|
|   99861|Avengers: Age of ...|[{"cast_id": 76, ..

In [5]:
# Inspect the column types of the raw credits data before parsing.
credits_df.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



### CAST & CREW

In [6]:
# Schema of a single element of the JSON array stored in the `cast` column.
# Declared as (field name, Spark type) pairs, in the order the fields appear
# in the resulting struct.
cast_fields = [
    ("cast_id", IntegerType()),
    ("character", StringType()),
    ("credit_id", StringType()),
    ("gender", IntegerType()),
    ("id", IntegerType()),
    ("name", StringType()),
    ("order", IntegerType()),
]
cast_schema = StructType([StructField(field_name, field_type) for field_name, field_type in cast_fields])

In [7]:
# Schema of a single element of the JSON array stored in the `crew` column.
# Declared as (field name, Spark type) pairs, in the order the fields appear
# in the resulting struct.
crew_fields = [
    ("credit_id", StringType()),
    ("department", StringType()),
    ("gender", IntegerType()),
    ("id", IntegerType()),
    ("job", StringType()),
    ("name", StringType()),
]
crew_schema = StructType([StructField(field_name, field_type) for field_name, field_type in crew_fields])

In [8]:
# Parse the JSON-encoded `cast` and `crew` string columns into arrays of
# structs, replacing the original string columns in place.
cast_parsed = F.from_json(F.col("cast"), ArrayType(cast_schema))
crew_parsed = F.from_json(F.col("crew"), ArrayType(crew_schema))
credits_sch = (
    credits_df
    .withColumn("cast", cast_parsed)
    .withColumn("crew", crew_parsed)
)

In [9]:
# Preview five rows to confirm `cast` and `crew` now hold parsed structs.
credits_sch.show(5)

                                                                                

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|               title|                cast|                crew|          event_time|
+--------+--------------------+--------------------+--------------------+--------------------+
|   19995|              Avatar|[{242, Jake Sully...|[{52fe48009251416...|2024-03-22 23:14:...|
|     285|Pirates of the Ca...|[{4, Captain Jack...|[{52fe4232c3a3684...|2024-03-22 23:14:...|
|  206647|             Spectre|[{1, James Bond, ...|[{54805967c3a3682...|2024-03-22 23:14:...|
|   49026|The Dark Knight R...|[{2, Bruce Wayne ...|[{52fe4781c3a3684...|2024-03-22 23:14:...|
|   49529|         John Carter|[{5, John Carter,...|[{52fe479ac3a3684...|2024-03-22 23:14:...|
+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [10]:
# Confirm `cast` and `crew` are arrays of structs with the declared fields.
credits_sch.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cast_id: integer (nullable = true)
 |    |    |-- character: string (nullable = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- order: integer (nullable = true)
 |-- crew: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- credit_id: string (nullable = true)
 |    |    |-- department: string (nullable = true)
 |    |    |-- gender: integer (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- job: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



In [11]:
# One output row per cast member. explode_outer (rather than explode) keeps a
# row, with a null `cast` struct, for movies whose cast array is empty or null.
cast_exp = credits_sch.select(
    F.col("movie_id"),
    F.col("title"),
    F.explode_outer(F.col("cast")).alias("cast"),
)

In [12]:
# Preview the exploded rows: one `cast` struct per (movie, cast member).
cast_exp.show(5)

[Stage 3:>                                                          (0 + 1) / 1]

+--------+------+--------------------+
|movie_id| title|                cast|
+--------+------+--------------------+
|   19995|Avatar|{242, Jake Sully,...|
|   19995|Avatar|{3, Neytiri, 52fe...|
|   19995|Avatar|{25, Dr. Grace Au...|
|   19995|Avatar|{4, Col. Quaritch...|
|   19995|Avatar|{5, Trudy Chacon,...|
+--------+------+--------------------+
only showing top 5 rows



                                                                                

In [13]:
# Fields of the `cast` struct that become top-level columns of the cast table.
# NOTE(review): `order` is declared in cast_schema but is not listed here, so it
# is not carried into the cast table; confirm the omission is intentional.
cast_cols = [
    'cast_id',
    'character',
    'credit_id',
    'gender',
    'id',
    'name',
]

In [15]:
# Flatten the exploded `cast` struct into one top-level column per field.
# A single select() replaces the previous withColumn() loop: each withColumn()
# call adds another projection to the query plan, and the loop also rebound
# `cast_exp`, which made its earlier preview stale. `cast_exp` is now left
# untouched, so this cell can be re-run on its own.
cast_field_columns = [F.col("cast").getItem(field).alias(field) for field in cast_cols]

# Replace null credit_id values (for example the rows explode_outer emits for
# movies without cast entries) with a fixed placeholder.
cast_df = cast_exp.select("*", *cast_field_columns).na.fill("0000000000", ["credit_id"])

In [16]:
# The struct column is no longer needed once its fields are top-level columns.
credits_cast = cast_df.drop('cast')

In [17]:
# Preview the flattened cast table without truncating long values.
credits_cast.show(4, truncate=False)

[Stage 9:>                                                          (0 + 1) / 1]

+--------+------+-------+-------------------+------------------------+------+-----+----------------+
|movie_id|title |cast_id|character          |credit_id               |gender|id   |name            |
+--------+------+-------+-------------------+------------------------+------+-----+----------------+
|19995   |Avatar|242    |Jake Sully         |5602a8a7c3a3685532001c9a|2     |65731|Sam Worthington |
|19995   |Avatar|3      |Neytiri            |52fe48009251416c750ac9cb|1     |8691 |Zoe Saldana     |
|19995   |Avatar|25     |Dr. Grace Augustine|52fe48009251416c750aca39|1     |10205|Sigourney Weaver|
|19995   |Avatar|4      |Col. Quaritch      |52fe48009251416c750ac9cf|2     |32747|Stephen Lang    |
+--------+------+-------+-------------------+------------------------+------+-----+----------------+
only showing top 4 rows



                                                                                

In [18]:
# Number of distinct movies represented in the flattened cast table.
credits_cast.select("movie_id").distinct().count()

                                                                                

4803

In [19]:
# Check the final column types of the cast table.
credits_cast.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- cast_id: integer (nullable = true)
 |-- character: string (nullable = true)
 |-- credit_id: string (nullable = false)
 |-- gender: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [20]:
# Final cast table; this name is what the Delta write cell's `tables` dict uses.
cast = credits_cast

In [21]:
# One output row per crew member. explode_outer (rather than explode) keeps a
# row, with a null `crew` struct, for movies whose crew array is empty or null.
crew_exp = credits_sch.select(
    F.col("movie_id"),
    F.col("title"),
    F.explode_outer(F.col("crew")).alias("crew"),
)

In [22]:
# Fields of the `crew` struct that become top-level columns of the crew table.
crew_cols = [
    'credit_id',
    'department',
    'gender',
    'id',
    'job',
    'name',
]

In [23]:
# Flatten the exploded `crew` struct into one top-level column per field.
# A single select() replaces the previous withColumn() loop: each withColumn()
# call adds another projection to the query plan, and the loop also rebound
# `crew_exp`. `crew_exp` is now left untouched, so this cell can be re-run on
# its own.
crew_field_columns = [F.col("crew").getItem(field).alias(field) for field in crew_cols]

# Replace null credit_id values (for example the rows explode_outer emits for
# movies without crew entries) with a fixed placeholder.
crew_df = crew_exp.select("*", *crew_field_columns).na.fill("0000000000", ["credit_id"])

In [24]:
# The struct column is no longer needed once its fields are top-level columns.
credits_crew = crew_df.drop("crew")

In [25]:
# Preview the flattened crew table without truncating long values.
credits_crew.show(4, truncate=False)

[Stage 16:>                                                         (0 + 1) / 1]

+--------+------+------------------------+----------+------+----+------------------------+-----------------+
|movie_id|title |credit_id               |department|gender|id  |job                     |name             |
+--------+------+------------------------+----------+------+----+------------------------+-----------------+
|19995   |Avatar|52fe48009251416c750aca23|Editing   |0     |1721|Editor                  |Stephen E. Rivkin|
|19995   |Avatar|539c47ecc3a36810e3001f87|Art       |2     |496 |Production Design       |Rick Carter      |
|19995   |Avatar|54491c89c3a3680fb4001cf7|Sound     |0     |900 |Sound Designer          |Christopher Boyes|
|19995   |Avatar|54491cb70e0a267480001bd0|Sound     |0     |900 |Supervising Sound Editor|Christopher Boyes|
+--------+------+------------------------+----------+------+----+------------------------+-----------------+
only showing top 4 rows



                                                                                

In [26]:
# Number of distinct movies represented in the flattened crew table.
credits_crew.select('movie_id').distinct().count()

                                                                                

4803

In [27]:
# Check the final column types of the crew table.
credits_crew.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- credit_id: string (nullable = false)
 |-- department: string (nullable = true)
 |-- gender: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- name: string (nullable = true)



In [28]:
# Final crew table; this name is what the Delta write cell's `tables` dict uses.
crew = credits_crew

# READ MOVIES

In [29]:
# Load the bronze-layer movies dataset (parquet) from object storage.
movies_path = 's3a://tmdb-bronze/movies'
df_movies = spark.read.parquet(movies_path)

                                                                                

In [30]:
# Inspect the raw movies columns; the nested attributes are JSON strings here.
df_movies.printSchema()

root
 |-- budget: long (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)
 |-- event_time: timestamp_ntz (nullable = true)



### MOVIES

In [31]:
# Keep only the scalar (non-JSON) movie attributes, with `id` exposed as
# `movie_id` so it matches the key name used by the other tables.
movie_scalar_cols = [
    "movie_id",
    "title",
    "budget",
    "homepage",
    "original_language",
    "original_title",
    "overview",
    "popularity",
    "release_date",
    "revenue",
    "runtime",
    "status",
    "tagline",
    "vote_average",
    "vote_count",
]
select_movies = df_movies.withColumnRenamed("id", "movie_id").select(*movie_scalar_cols)

In [32]:
# Column types of the selected movie attributes, before any casting.
select_movies.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- homepage: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: double (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)



In [33]:
# Cast the movie attributes to their silver-layer types.
# NOTE(review): movie_id becomes a string here (and in the genres / keywords /
# companies / countries / languages tables), while the cast and crew tables
# keep it as a long; confirm downstream joins expect that difference.
movies_df = (
    select_movies
    .withColumn("movie_id", F.col("movie_id").cast("string"))
    .withColumn("budget", F.col("budget").cast("double"))
    .withColumn("popularity", F.col("popularity").cast("float"))
    # release_date arrives as a string; parse it into a proper date.
    .withColumn("release_date", F.to_date("release_date", "yyyy-MM-dd"))
    .withColumn("revenue", F.col("revenue").cast("double"))
    .withColumn("runtime", F.col("runtime").cast("int"))
    .withColumn("vote_average", F.col("vote_average").cast("float"))
    .withColumn("vote_count", F.col("vote_count").cast("int"))
)

In [34]:
# Preview the typed movies table.
movies_df.show()

[Stage 24:>                                                         (0 + 1) / 1]

+--------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+------------+-------------+-------+--------+--------------------+------------+----------+
|movie_id|               title|budget|            homepage|original_language|      original_title|            overview|popularity|release_date|      revenue|runtime|  status|             tagline|vote_average|vote_count|
+--------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+------------+-------------+-------+--------+--------------------+------------+----------+
|   19995|              Avatar|2.37E8|http://www.avatar...|               en|              Avatar|In the 22nd centu...| 150.43758|  2009-12-10|2.787965087E9|    162|Released|Enter the World o...|         7.2|     11800|
|     285|Pirates of the Ca...| 3.0E8|http://disney.go....|               en|Pirates of the Ca...|Captain Barbossa,...| 

                                                                                

In [35]:
# Verify that the casts above produced the intended column types.
movies_df.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- budget: double (nullable = true)
 |-- homepage: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)



In [36]:
# Final movies table; this name is what the Delta write cell's `tables` dict uses.
movies = movies_df

In [37]:
# Movie key plus the JSON-string columns that are each exploded into their own
# table in the sections below.
nested_json_cols = [
    "genres",
    "keywords",
    "production_companies",
    "production_countries",
    "spoken_languages",
]
movies_tables = df_movies.withColumnRenamed("id", "movie_id").select("movie_id", *nested_json_cols)
movies_tables.limit(5).toPandas()

                                                                                

Unnamed: 0,movie_id,genres,keywords,production_companies,production_countries,spoken_languages
0,19995,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...","[{""id"": 1463, ""name"": ""culture clash""}, {""id"":...","[{""name"": ""Ingenious Film Partners"", ""id"": 289...","[{""iso_3166_1"": ""US"", ""name"": ""United States o...","[{""iso_639_1"": ""en"", ""name"": ""English""}, {""iso..."
1,285,"[{""id"": 12, ""name"": ""Adventure""}, {""id"": 14, ""...","[{""id"": 270, ""name"": ""ocean""}, {""id"": 726, ""na...","[{""name"": ""Walt Disney Pictures"", ""id"": 2}, {""...","[{""iso_3166_1"": ""US"", ""name"": ""United States o...","[{""iso_639_1"": ""en"", ""name"": ""English""}]"
2,206647,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...","[{""id"": 470, ""name"": ""spy""}, {""id"": 818, ""name...","[{""name"": ""Columbia Pictures"", ""id"": 5}, {""nam...","[{""iso_3166_1"": ""GB"", ""name"": ""United Kingdom""...","[{""iso_639_1"": ""fr"", ""name"": ""Fran\u00e7ais""},..."
3,49026,"[{""id"": 28, ""name"": ""Action""}, {""id"": 80, ""nam...","[{""id"": 849, ""name"": ""dc comics""}, {""id"": 853,...","[{""name"": ""Legendary Pictures"", ""id"": 923}, {""...","[{""iso_3166_1"": ""US"", ""name"": ""United States o...","[{""iso_639_1"": ""en"", ""name"": ""English""}]"
4,49529,"[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""nam...","[{""id"": 818, ""name"": ""based on novel""}, {""id"":...","[{""name"": ""Walt Disney Pictures"", ""id"": 2}]","[{""iso_3166_1"": ""US"", ""name"": ""United States o...","[{""iso_639_1"": ""en"", ""name"": ""English""}]"


In [38]:
# Shared schema for the JSON arrays whose elements are {"id": ..., "name": ...}
# objects; used below for genres, keywords and production companies.
id_name_struct = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
])
schema = ArrayType(id_name_struct)

### GENRES

In [39]:
# Parse the `genres` JSON string and emit one row per (movie, genre).
# explode_outer keeps a row with a null struct for movies without genres.
genres_parsed = (
    movies_tables
    .withColumn('movie_id', F.col("movie_id").cast(StringType()))
    .withColumn('genres', F.from_json(F.col('genres'), schema))
)
genres_exp = genres_parsed.select('movie_id', F.explode_outer('genres').alias('genres'))

In [40]:
# Peek at the first five exploded (movie_id, genre struct) rows.
genres_exp.limit(5).toPandas()


                                                                                

Unnamed: 0,movie_id,genres
0,19995,"(28, Action)"
1,19995,"(12, Adventure)"
2,19995,"(14, Fantasy)"
3,19995,"(878, Science Fiction)"
4,285,"(12, Adventure)"


In [41]:
# Struct fields shared by the genres, keywords and production_companies
# elements; each becomes a top-level column when those structs are flattened.
cols = [
    "id",
    "name",
]

In [42]:
# Flatten the `genres` struct into `id` / `name` columns with a single select()
# instead of a withColumn() loop (each withColumn() adds another projection to
# the plan). `genres_exp` is no longer rebound, so its earlier preview stays
# accurate and this cell can be re-run on its own.
genres = (
    genres_exp
    .select("movie_id", *[F.col("genres").getItem(field).alias(field) for field in cols])
    # Null ids (e.g. movies without any genre) get a sentinel value.
    .na.fill(value=-9999, subset=["id"])
)

In [43]:
# Preview the flattened genres table.
genres.show(5)

[Stage 27:>                                                         (0 + 1) / 1]

+--------+---+---------------+
|movie_id| id|           name|
+--------+---+---------------+
|   19995| 28|         Action|
|   19995| 12|      Adventure|
|   19995| 14|        Fantasy|
|   19995|878|Science Fiction|
|     285| 12|      Adventure|
+--------+---+---------------+
only showing top 5 rows



                                                                                

### KEYWORDS

In [44]:
# Parse the `keywords` JSON string and emit one row per (movie, keyword).
# explode_outer keeps a row with a null struct for movies without keywords.
keywords_parsed = (
    movies_tables
    .withColumn('movie_id', F.col("movie_id").cast(StringType()))
    .withColumn('keywords', F.from_json(F.col('keywords'), schema))
)
keyword_exp = keywords_parsed.select('movie_id', F.explode_outer('keywords').alias('keywords'))
keyword_exp.limit(5).toPandas()

                                                                                

Unnamed: 0,movie_id,keywords
0,19995,"(1463, culture clash)"
1,19995,"(2964, future)"
2,19995,"(3386, space war)"
3,19995,"(3388, space colony)"
4,19995,"(3679, society)"


In [45]:
# Flatten the `keywords` struct into `id` / `name` columns with a single
# select() instead of a withColumn() loop (each withColumn() adds another
# projection to the plan). `keyword_exp` is no longer rebound, so its earlier
# preview stays accurate and this cell can be re-run on its own.
keywords = (
    keyword_exp
    .select("movie_id", *[F.col("keywords").getItem(field).alias(field) for field in cols])
    # Null ids (e.g. movies without any keyword) get a sentinel value.
    .na.fill(value=-9999, subset=["id"])
)

In [46]:
# Preview the flattened keywords table.
keywords.show(5)

[Stage 29:>                                                         (0 + 1) / 1]

+--------+----+-------------+
|movie_id|  id|         name|
+--------+----+-------------+
|   19995|1463|culture clash|
|   19995|2964|       future|
|   19995|3386|    space war|
|   19995|3388| space colony|
|   19995|3679|      society|
+--------+----+-------------+
only showing top 5 rows



                                                                                

In [47]:
# Check the column types of the keywords table.
keywords.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



### PRODUCTION COMPANIES

In [48]:
# Parse the `production_companies` JSON string and emit one row per
# (movie, company). explode_outer keeps a row with a null struct for movies
# without any production company.
companies_parsed = (
    movies_tables
    .withColumn('movie_id', F.col("movie_id").cast(StringType()))
    .withColumn('production_companies', F.from_json(F.col('production_companies'), schema))
)
companies = companies_parsed.select(
    'movie_id',
    F.explode_outer('production_companies').alias('production_companies'),
)
companies.limit(5).toPandas()

                                                                                

Unnamed: 0,movie_id,production_companies
0,19995,"(289, Ingenious Film Partners)"
1,19995,"(306, Twentieth Century Fox Film Corporation)"
2,19995,"(444, Dune Entertainment)"
3,19995,"(574, Lightstorm Entertainment)"
4,285,"(2, Walt Disney Pictures)"


In [49]:
# Flatten the `production_companies` struct into `id` / `name` columns with a
# single select() instead of a withColumn() loop (each withColumn() adds
# another projection to the plan).
# NOTE: this cell rebinds `companies` to the flattened frame, which no longer
# has a `production_companies` column, so re-running it requires re-running the
# cell that builds the exploded `companies` frame first.
companies = (
    companies
    .select("movie_id", *[F.col("production_companies").getItem(field).alias(field) for field in cols])
    # Null ids (e.g. movies without any production company) get a sentinel value.
    .na.fill(value=-9999, subset=["id"])
)

In [50]:
# Preview the flattened production companies table without truncation.
companies.show(5,truncate=False)

[Stage 31:>                                                         (0 + 1) / 1]

+--------+---+--------------------------------------+
|movie_id|id |name                                  |
+--------+---+--------------------------------------+
|19995   |289|Ingenious Film Partners               |
|19995   |306|Twentieth Century Fox Film Corporation|
|19995   |444|Dune Entertainment                    |
|19995   |574|Lightstorm Entertainment              |
|285     |2  |Walt Disney Pictures                  |
+--------+---+--------------------------------------+
only showing top 5 rows



                                                                                

In [51]:
# Check the column types of the production companies table.
companies.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [52]:
# Final production companies table; this name is what the Delta write cell's
# `tables` dict uses.
production_companies = companies

### PRODUCTION COUNTRIES

In [53]:
# Schema of the `production_countries` JSON array: each element carries an
# `iso_3166_1` code and a `name`, both strings.
country_struct = StructType([
    StructField('iso_3166_1', StringType()),
    StructField('name', StringType()),
])
countries_schema = ArrayType(country_struct)

In [55]:
# Parse the `production_countries` JSON string and emit one row per
# (movie, country). explode_outer keeps a row with a null struct for movies
# without any production country.
countries_parsed = (
    movies_tables
    .withColumn('movie_id', F.col("movie_id").cast(StringType()))
    .withColumn('production_countries', F.from_json(F.col('production_countries'), countries_schema))
)
countries_exp = countries_parsed.select(
    'movie_id',
    F.explode_outer('production_countries').alias('production_countries'),
)

In [56]:
# Fields of the `production_countries` struct that become top-level columns.
countries_cols = ["iso_3166_1", "name"]

# Flatten with a single select() instead of a withColumn() loop (each
# withColumn() adds another projection to the plan). `countries_exp` is no
# longer rebound, so this cell can be re-run on its own.
countries = (
    countries_exp
    .select("movie_id", *[F.col("production_countries").getItem(field).alias(field) for field in countries_cols])
    # Null country codes (e.g. movies without any country) get a placeholder.
    .na.fill(value="XX", subset=["iso_3166_1"])
)

In [57]:
# Preview the flattened production countries table without truncation.
countries.show(5, truncate=False)

[Stage 32:>                                                         (0 + 1) / 1]

+--------+----------+------------------------+
|movie_id|iso_3166_1|name                    |
+--------+----------+------------------------+
|19995   |US        |United States of America|
|19995   |GB        |United Kingdom          |
|285     |US        |United States of America|
|206647  |GB        |United Kingdom          |
|206647  |US        |United States of America|
+--------+----------+------------------------+
only showing top 5 rows



                                                                                

In [58]:
# Final production countries table; this name is what the Delta write cell's
# `tables` dict uses.
production_countries = countries

### SPOKEN LANGUAGES

In [59]:
# Schema of the `spoken_languages` JSON array: each element carries an
# `iso_639_1` code and a `name`, both strings.
language_struct = StructType([
    StructField('iso_639_1', StringType()),
    StructField('name', StringType()),
])
languages_schema = ArrayType(language_struct)

In [60]:
# Parse the `spoken_languages` JSON string and emit one row per
# (movie, language). explode_outer keeps a row with a null struct for movies
# without any spoken language.
languages_parsed = (
    movies_tables
    .withColumn('movie_id', F.col("movie_id").cast(StringType()))
    .withColumn('spoken_languages', F.from_json(F.col('spoken_languages'), languages_schema))
)
languages_exp = languages_parsed.select(
    'movie_id',
    F.explode_outer('spoken_languages').alias('spoken_languages'),
)

In [61]:
# Fields of the `spoken_languages` struct that become top-level columns.
languages_cols = ["iso_639_1", "name"]

# Flatten with a single select() instead of a withColumn() loop (each
# withColumn() adds another projection to the plan). `languages_exp` is no
# longer rebound, so this cell can be re-run on its own.
languages = (
    languages_exp
    .select("movie_id", *[F.col("spoken_languages").getItem(field).alias(field) for field in languages_cols])
    # Null language codes (e.g. movies without any language) get a placeholder.
    .na.fill(value="XX", subset=["iso_639_1"])
)

In [62]:
# Preview the flattened spoken languages table without truncation.
languages.show(5, truncate=False)

[Stage 33:>                                                         (0 + 1) / 1]

+--------+---------+--------+
|movie_id|iso_639_1|name    |
+--------+---------+--------+
|19995   |en       |English |
|19995   |es       |Español |
|285     |en       |English |
|206647  |fr       |Français|
|206647  |en       |English |
+--------+---------+--------+
only showing top 5 rows



                                                                                

In [63]:
# Final spoken languages table; this name is what the Delta write cell's
# `tables` dict uses.
spoken_languages = languages

### WRITE TO DELTA

In [64]:
# Map each silver table name to the DataFrame built for it above.
tables = {
    'cast': cast,
    'crew': crew,
    'movies': movies,
    'genres': genres,
    'keywords': keywords,
    'production_companies': production_companies,
    'production_countries': production_countries,
    'spoken_languages': spoken_languages,
}

# Write every table in Delta format under s3a://tmdb-silver/<table_name>,
# replacing whatever a previous run stored at that path.
for table_name, table in tables.items():
    deltaPath = f"s3a://tmdb-silver/{table_name}"
    delta_writer = table.write.format("delta").mode("overwrite")
    delta_writer.save(deltaPath)

24/03/22 18:42:26 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                