In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, explode, from_json
from pyspark.sql.types import (
    ArrayType,
    DateType,
    FloatType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# Create (or reuse) the Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("TMDB Data Aggregation")
    .getOrCreate()
)

# Explicit schema for tmdb_5000_movies.csv, listed in the file's column order.
# `genres` is kept as a plain string here; it holds a JSON array that a later
# cell parses with from_json.
# NOTE(review): Spark file readers generally treat columns as nullable no
# matter what `nullable` says here, so the flags are documentation only -
# confirm against the Spark version in use.
TMDB_Schema = StructType([
    StructField('budget', IntegerType(), nullable=False),
    StructField('genres', StringType(), nullable=False),
    StructField('homepage', StringType(), nullable=True),
    StructField('id', IntegerType(), nullable=False),
    StructField('keywords', StringType(), nullable=False),
    StructField('original_language', StringType(), nullable=False),
    StructField('original_title', StringType(), nullable=False),
    StructField('overview', StringType(), nullable=True),
    StructField('popularity', FloatType(), nullable=False),
    StructField('production_companies', StringType(), nullable=False),
    StructField('production_countries', StringType(), nullable=True),
    StructField('release_date', DateType(), nullable=True),
    # IntegerType is a 32-bit signed int (max 2,147,483,647). Box-office
    # revenue can exceed that, and such values fail to parse and are lost,
    # so revenue is read as a 64-bit LongType instead.
    StructField('revenue', LongType(), nullable=False),
    # NOTE(review): runtime is read as a string; if the values are numeric a
    # numeric type may be more appropriate - confirm against the data.
    StructField('runtime', StringType(), nullable=True),
    StructField('spoken_languages', StringType(), nullable=False),
    StructField('status', StringType(), nullable=False),
    StructField('tagline', StringType(), nullable=True),
    StructField('title', StringType(), nullable=False),
    StructField('vote_average', FloatType(), nullable=False),
    StructField('vote_count', IntegerType(), nullable=False),
])

# quote/escape are both '"' so that doubled quotes inside quoted fields are
# unescaped correctly; multiLine lets a quoted field span several lines.
data = spark.read.csv(
    'tmdb_5000_movies.csv',
    schema=TMDB_Schema,
    header=True,
    quote='"',
    escape='"',
    multiLine=True,
)

data.show(5)

+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|   budget|              genres|            homepage|    id|            keywords|original_language|      original_title|            overview|popularity|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|
+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|237000000|[{"id": 28, "name...

In [62]:
import os
import subprocess

# Build the path with os.path.join rather than a string literal: in a normal
# (non-raw) string, ".\tmdb..." contains "\t", which Python parses as a TAB
# character instead of a backslash followed by "t".
local_file_path = os.path.join(".", "tmdb_5000_movies.csv")

# Make sure the docker-compose stack is running before executing this cell.
# Copies the CSV into /tmp/ of the "namenode" container. check=True raises
# CalledProcessError if `docker cp` exits with a non-zero status, so a failed
# copy does not go unnoticed.
subprocess.run(["docker", "cp", local_file_path, "namenode:/tmp/"], check=True)

CompletedProcess(args=['docker', 'cp', '.\\tmdb_5000_movies.csv', 'namenode:/tmp/'], returncode=0)

In [60]:
# Highest popularity score observed for each original language.
popular_films = (
    data.groupBy(data.original_language)
    .agg({"popularity": "max"})
    .withColumnRenamed("max(popularity)", "max_popularity")
)

# Slim projection of the movies. The language column is aliased so it has a
# distinct name from the grouping key once the two frames are joined.
data_selected = data.select(
    data.original_language.alias("language"),
    data.original_title.alias("title"),
    data.popularity,
)

# Keep only the movie(s) whose popularity equals the maximum of their language.
join_condition = (
    (popular_films.original_language == data_selected.language)
    & (popular_films.max_popularity == data_selected.popularity)
)
top_popular_films = popular_films.join(data_selected, join_condition)

# Final report: one line per language/title, most popular first.
top_popular_films_selected = (
    top_popular_films
    .select("original_language", "title", "popularity")
    .orderBy(desc("popularity"))
)

# Persist the full result locally, then preview the first rows.
top_popular_films_selected.toPandas().to_csv("popular_film_per_lan.csv", index=False)

top_popular_films_selected.show(10)


+-----------------+--------------------+----------+
|original_language|               title|popularity|
+-----------------+--------------------+----------+
|               en|             Minions|  875.5813|
|               ja|    千と千尋の神隠し| 118.96856|
|               es|El laberinto del ...|  90.80941|
|               it|Il buono, il brut...| 88.377075|
|               fr|Le fabuleux desti...| 73.720245|
|               ko|         Snowpiercer| 64.238686|
|               zh|            卧虎藏龙| 45.742252|
|               pt|      Cidade de Deus| 44.356712|
|               id|        Serbuan maut| 40.285095|
|               da|What Happens in V...| 38.100487|
+-----------------+--------------------+----------+
only showing top 10 rows



In [16]:
# Display the schema Spark holds for `data`.
# NOTE(review): the saved output of this cell lists every column as a nullable
# StringType, and its execution count (16) is lower than that of the cell that
# loads `data` with TMDB_Schema (19) - the output looks stale; re-run this cell
# after the load cell to confirm the applied types.
data.schema

StructType([StructField('budget', StringType(), True), StructField('genres', StringType(), True), StructField('homepage', StringType(), True), StructField('id', StringType(), True), StructField('keywords', StringType(), True), StructField('original_language', StringType(), True), StructField('original_title', StringType(), True), StructField('overview', StringType(), True), StructField('popularity', StringType(), True), StructField('production_companies', StringType(), True), StructField('production_countries', StringType(), True), StructField('release_date', StringType(), True), StructField('revenue', StringType(), True), StructField('runtime', StringType(), True), StructField('spoken_languages', StringType(), True), StructField('status', StringType(), True), StructField('tagline', StringType(), True), StructField('title', StringType(), True), StructField('vote_average', StringType(), True), StructField('vote_count', StringType(), True)])

In [20]:
# Restrict the catalogue to movies whose original language is English.
allEnglishMovies = data.where(col("original_language") == "en")

# Rank them from most to least popular and preview the top five.
sortedEnglishMovies = allEnglishMovies.sort(desc("popularity"))
sortedEnglishMovies.show(5)

+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|   budget|              genres|            homepage|    id|            keywords|original_language|      original_title|            overview|popularity|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|
+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
| 74000000|[{"id": 10751, "n...

In [3]:
# Each element of the `genres` JSON array is an object with an id and a name;
# both are read as strings.
genre_struct = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
])
genre_schema = ArrayType(genre_struct)

# Parse the JSON text into an array of structs, then emit one row per genre.
df_parsed = data.withColumn("genres_array", from_json(col("genres"), genre_schema))
df_exploded = df_parsed.withColumn("genre", explode(col("genres_array")))

# count("id") counts the movie-level `id` column, i.e. the number of movies
# tagged with each genre; the result is sorted with the largest genre first.
genres_aggregated = (
    df_exploded
    .groupBy("genre.id", "genre.name")
    .agg(count("id").alias("number_of_movies"))
    .orderBy(desc("number_of_movies"))
)

# Persist the full aggregation locally, then preview the top rows.
genres_aggregated.toPandas().to_csv("Genres_Agggregations.csv", index=False)

genres_aggregated.show(5)


+-----+--------+----------------+
|   id|    name|number_of_movies|
+-----+--------+----------------+
|   18|   Drama|            2297|
|   35|  Comedy|            1722|
|   53|Thriller|            1274|
|   28|  Action|            1154|
|10749| Romance|             894|
+-----+--------+----------------+
only showing top 5 rows

