In [1]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session shared by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("Movies data joining")
    .getOrCreate()
)

In [2]:
from pyspark.sql.types import  *
from pyspark.sql.functions import from_json, regexp_replace

In [3]:
# Root HDFS directory of the project; the dataset paths in later cells are
# built from it with f-strings.
hadoop_base_directory = 'hdfs://192.168.56.101:9000/obligatorio'

# Movies

In [4]:
# Location of the movies metadata CSV under the project base directory.
hadoop_movies_data = f'{hadoop_base_directory}/datasets/movies_metadata.csv'

In [5]:
# Load the movies metadata CSV: first line is the header, malformed rows are
# dropped, and the double quote is used both as quote and as escape character.
movies = (
    spark.read.format("csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .option("escape", "\"")
    .option("quote", "\"")
    .load(hadoop_movies_data)
)

In [6]:
# Row count right after loading, before any cleaning (45572 in the saved output).
movies.count()

45572

In [7]:
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
# Window ordered by a constant literal: every row compares equal, so the
# numbering below follows whatever order Spark happens to produce.
# NOTE(review): a window without partitionBy presumably pulls all rows into a
# single partition - confirm this is acceptable for the dataset size.
w = Window().orderBy(lit('A'))
# Attach a sequential "row_num" column. It is not listed in `relevant_fields`,
# so the later projection `movies[relevant_fields]` discards it.
movies = movies.withColumn("row_num", row_number().over(w))

In [8]:
# Column-name constants used throughout the notebook.
# Note that `a_id` is "movie_id" (a derived column), not the CSV's "id".
a_adult = "adult"
a_belongs_to = "belongs_to_collection"
a_budget = "budget"
a_genres = "genres"
a_id = "movie_id"
a_original_language = "original_language"
a_original_title = "original_title"
a_overview = "overview"
a_popularity = "popularity"
a_prod_companies = "production_companies"
a_production_countries = "production_countries"
a_release_date = "release_date"
a_revenue = "revenue"
a_spoken_languages = "spoken_languages"
a_title = "title"
a_vote_average = "vote_average"
a_vote_count = "vote_count"

# Columns kept in the final movies projection, in output order.
# `a_belongs_to`, `a_vote_average` and `a_vote_count` are intentionally absent
# from this list.
relevant_fields = [
    a_adult,
    a_budget,
    a_genres,
    a_id,
    a_original_language,
    a_original_title,
    a_overview,
    a_popularity,
    a_prod_companies,
    a_production_countries,
    a_release_date,
    a_revenue,
    a_spoken_languages,
    a_title,
]

In [9]:
# Schemas for the list-of-objects columns parsed with from_json.
# Field ORDER matters: later cells extract values by position
# (index 1 for genres/countries/languages, index 0 for production companies).

# genres: [{id, name}, ...]
genres_schema = ArrayType(
    StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
    ])
)

# production_companies: [{name, id}, ...]
prod_companies_schema = ArrayType(
    StructType([
        StructField("name", StringType()),
        StructField("id", IntegerType()),
    ])
)

# production_countries: [{iso_3166_1, name}, ...]
prod_countries_schema = ArrayType(
    StructType([
        StructField("iso_3166_1", StringType()),
        StructField("name", StringType()),
    ])
)

# spoken_languages: [{iso_639_1, name}, ...]
spoken_languages_schema = ArrayType(
    StructType([
        StructField("iso_639_1", StringType()),
        StructField("name", StringType()),
    ])
)

In [10]:
# Turn the raw string columns into typed columns and parse the list-like
# columns into arrays of structs using the schemas defined above.
# `a_id` ("movie_id") is a NEW column derived from the CSV's `id`; the original
# `id` column is left in place.
# `budget` and `revenue` are cast to Long rather than Integer: a 32-bit Integer
# tops out at 2,147,483,647, so larger amounts would not survive the cast
# (null in Spark's default, non-ANSI mode) and the affected movies would then
# be removed by the null-dropping step that follows.
movies = (
    movies.withColumn(a_adult, (movies.adult).cast("Boolean"))
    .withColumn(a_id, (movies.id).cast("Integer"))
    .withColumn(a_budget, (movies.budget).cast("Long"))
    .withColumn(a_genres, from_json(movies.genres, genres_schema))
    .withColumn(a_prod_companies, from_json(movies.production_companies, prod_companies_schema))
    .withColumn(a_production_countries, from_json(movies.production_countries, prod_countries_schema))
    .withColumn(a_spoken_languages, from_json(movies.spoken_languages, spoken_languages_schema))
    .withColumn(a_popularity, (movies.popularity).cast("Float"))
    .withColumn(a_release_date, (movies.release_date).cast("Date"))
    .withColumn(a_revenue, (movies.revenue).cast("Long"))
    .withColumn(a_vote_average, (movies.vote_average).cast("Float"))
    .withColumn(a_vote_count, (movies.vote_count).cast("Integer"))
)

In [11]:
# Discard every row in which any of the typed/parsed columns is null.
# `a_release_date` is not in the subset, so rows with a null date are kept.
movies = movies.na.drop(
    how="any",
    subset=[
        a_adult,
        a_id,
        a_budget,
        a_genres,
        a_prod_companies,
        a_production_countries,
        a_spoken_languages,
        a_popularity,
        a_revenue,
        a_vote_average,
        a_vote_count,
    ],
)

In [12]:
# Keep a single row per movie_id.
# NOTE(review): which of the duplicate rows survives is presumably arbitrary - confirm that is acceptable.
movies = movies.dropDuplicates(subset=[a_id])

In [None]:
# Row count after dropping null rows and duplicate movie_ids.
movies.count()

In [None]:
#from pyspark.sql.functions import row_number,lit
#from pyspark.sql.window import Window
#w = Window().orderBy(lit('A'))
#movies = movies.withColumn("row_num", row_number().over(w))

In [13]:
# Inspect the parsed production_companies column next to the original id and title.
movies.select(["id","title", "production_companies"]).show(truncate = False)

+----+-----------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id  |title                                                                              |production_companies                                                                                                                                                                                                                |
+----+-----------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|148 |The Secret Life of Words          

In [14]:
# Preview the first 10 rows with all columns, untruncated.
movies.show(10, truncate = False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-------------------------------------------------------+-------------------------------------------------------------+---+---------+-----------------+--------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
# Sanity check: no row should have a null vote_count, since vote_count is in
# the subset of the earlier na.drop (the saved output is indeed empty).
movies.filter(movies['vote_count'].isNull()).select(["movie_id","production_companies","release_date"]).show()

+--------+--------------------+------------+
|movie_id|production_companies|release_date|
+--------+--------------------+------------+
+--------+--------------------+------------+



In [None]:
# Row count of the cleaned movies frame, before the struct arrays are flattened.
movies.count()

In [16]:
def get_element_in_pos(struct_list, pos):
    """Return the `pos`-th component of every item in `struct_list`.

    Args:
        struct_list: Iterable of indexable items (tuples, lists, Spark Rows),
            or None.
        pos: Index to read from each item.

    Returns:
        A new list with one entry per item, in the same order. A None input
        yields None, and a None item yields a None entry, so that null arrays
        or null elements do not raise inside a UDF.
    """
    if struct_list is None:
        return None
    return [None if item is None else item[pos] for item in struct_list]

In [17]:
from pyspark.sql.functions import udf

# UDF returning, for an array of structs, the list of each struct's field at
# position 1 ("name" in the genres, countries and languages schemas).
extract_name_udf = udf(lambda z: get_element_in_pos(z, 1), ArrayType(StringType()))
# Same idea for production companies, whose schema lists "name" first (position 0).
extract_prod_company_name = udf(lambda z: get_element_in_pos(z, 0), ArrayType(StringType()))

In [18]:
# Replace each array-of-structs column with a plain array of names.
# The chain is parenthesised instead of using backslash continuations: the
# previous form ended with a dangling "\" and only parsed when a blank line
# happened to follow it.
movies = (
    movies.withColumn("genres", extract_name_udf(movies.genres))
    .withColumn("production_companies", extract_prod_company_name(movies.production_companies))
    .withColumn("production_countries", extract_name_udf(movies.production_countries))
    .withColumn("spoken_languages", extract_name_udf(movies.spoken_languages))
)

In [None]:
# Check the column types after the struct arrays were replaced by name arrays.
movies.dtypes

In [19]:
# Inspect the flattened spoken_languages column (now a list of language names).
movies.select(["id","title", "spoken_languages"]).show(truncate = False)

+----+-----------------------------------------------------------------------------------+-----------------------------+
|id  |title                                                                              |spoken_languages             |
+----+-----------------------------------------------------------------------------------+-----------------------------+
|148 |The Secret Life of Words                                                           |[English, Français, Español] |
|471 |Bandyta                                                                            |[Polski, English]            |
|496 |Borat: Cultural Learnings of America for Make Benefit Glorious Nation of Kazakhstan|[English, עִבְרִית, қазақ]   |
|833 |Umberto D.                                                                         |[Italiano]                   |
|1088|Whale Rider                                                                        |[English, ]                  |
|1580|Rope                      

In [None]:
# Row count after flattening; the withColumn calls above do not add or remove rows.
movies.count()

In [None]:
# Column types right before projecting down to `relevant_fields`.
movies.dtypes

In [20]:
# Keep only the columns listed in `relevant_fields`; everything else
# (e.g. the original "id", "row_num", the vote columns) is discarded here.
movies = movies.select(relevant_fields)

# Ratings

In [27]:
# HDFS locations of the ratings data and of the links CSV.
# NOTE(review): the ratings path has no file extension and is read with an explicit schema and no header - presumably the output directory of an earlier job; confirm.
hadoop_ratings_addr = f'{hadoop_base_directory}/ratings'
hadoop_links_addr = f'{hadoop_base_directory}/datasets/links.csv'

In [33]:
# Explicit schema for the ratings files: three nullable columns.
# The first column is named `a_id` ("movie_id") so it can be joined with links.
ratings_schema = StructType([
    StructField(a_id, IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("vote_count", IntegerType(), True)
])

In [34]:
# Ratings: read WITHOUT a header row, using the explicit `ratings_schema`.
# The header setting is given exactly once. The earlier form set it to "true"
# via .option() and to False via the load() keyword on the same reader; the
# keyword was applied last, so header-less reading is the behaviour preserved here.
ratings = (
    spark.read.format("csv")
    .option("header", "false")
    .schema(ratings_schema)
    .load(hadoop_ratings_addr)
)

# Links: regular CSV whose first line is the header; all columns are read as strings.
links = (
    spark.read.format("csv")
    .option("header", "true")
    .load(hadoop_links_addr)
)

In [None]:
# Remove ratings rows without a movie_id, then keep one row per movie_id.
ratings = (
    ratings.na.drop(subset=["movie_id"])
    .dropDuplicates(subset=['movie_id'])
)

In [None]:
# Build the id mapping table: `movie_id` (from the CSV's movieId) -> `tmdbId`,
# both as integers; all other columns of links are dropped.
links = (
    links.withColumn(a_id, links.movieId.cast("Integer"))
    .withColumn("tmdbId", links.tmdbId.cast("Integer"))
    .select(a_id, "tmdbId")
)

In [None]:
# Preview the movie_id -> tmdbId mapping.
links.show(10)

In [None]:
# Number of mapping rows before removing nulls and duplicates.
links.count()

In [None]:
# Make the mapping one-to-one: drop rows missing either id, then keep a single
# row per movie_id and, after that, a single row per tmdbId.
links = (
    links.na.drop(subset=[a_id, "tmdbId"])
    .dropDuplicates(subset=[a_id])
    .dropDuplicates(subset=["tmdbId"])
)

In [None]:
# Number of mapping rows after removing nulls and duplicates.
links.count()

In [None]:
# Join on movie_id (no `how` given, so the join type is the default) to attach
# the tmdbId to every rating row.
ratings_links = ratings.join(links, on=[a_id])

In [None]:
# Preview the ratings joined with their tmdbId.
ratings_links.show()

In [None]:
# Re-key the ratings: overwrite movie_id with the tmdbId so it matches the
# movie_id used by the movies frame, and keep only movie_id and rating
# (the ratings' vote_count column is dropped here).
ratings = (
    ratings_links.withColumn(a_id, ratings_links.tmdbId.cast("Integer"))
    .withColumn("rating", ratings_links.rating.cast("Float"))
    .select(a_id, "rating")
)

In [None]:
# Number of rating rows keyed by the re-mapped movie_id.
ratings.count()

In [None]:
# Movie row count before joining ratings (compare with the count after the join).
movies.count()

In [None]:
# Left join: every movie is kept; movies without a rating get a null "rating".
movies = movies.join(ratings, on=[a_id], how='left')

In [None]:
# Movie row count after the left join with ratings.
movies.count()

# Keywords - Para búsqueda por indexación

In [None]:
# Location of the keywords CSV under the project base directory.
hadoop_keywords_addr = f'{hadoop_base_directory}/datasets/keywords.csv'

In [None]:
# Load the keywords CSV (header row present, double quote as escape character).
keywords = (
    spark.read.format("csv")
    .option("header", "true")
    .option("escape", "\"")
    .load(hadoop_keywords_addr)
)

In [None]:
# Name of the keywords column in the keywords and movies frames.
a_keywords = "keywords"

# keywords: [{id, name}, ...] - "name" sits at position 1, which is the
# position the name-extraction UDF reads.
keywords_schema = ArrayType(
    StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
    ])
)

In [None]:
# NOTE(review): this re-creates `extract_name_udf` with the same expression used
# in the movies section (position 1 of each struct); the duplicate looks removable.
extract_name_udf = udf(lambda z: get_element_in_pos(z, 1), ArrayType(StringType()))

In [None]:
# Derive an integer movie_id from the CSV's id, parse the keywords column into
# an array of structs, and keep only those two columns.
keywords = (
    keywords.withColumn(a_id, keywords.id.cast("Integer"))
    .withColumn(a_keywords, from_json(keywords.keywords, keywords_schema))
    .select(a_id, a_keywords)
)

In [None]:
# Drop rows with a null in any column, then keep one row per movie_id.
keywords = keywords.dropna().dropDuplicates(subset=[a_id])

In [None]:
# Number of keyword rows after removing nulls and duplicate movie_ids.
keywords.count()

In [None]:
# Replace the array of {id, name} structs with a plain array of keyword names.
keywords = keywords.withColumn(a_keywords, extract_name_udf(keywords.keywords))

In [None]:
# Row count after flattening; withColumn does not add or remove rows.
keywords.count()

In [None]:
# Preview the flattened keyword lists.
keywords.show(10, truncate = False)

In [None]:
# Left join: every movie is kept; movies without keywords get a null "keywords".
movies = movies.join(keywords, on=[a_id], how='left')

In [None]:
# Movie row count after the left join with keywords.
movies.count()

In [None]:
# Inspect the joined keywords next to movie_id and title.
movies.select([a_id, a_title, a_keywords]).show(truncate=False)

# Obtener Cast (Actores) de Credits

In [None]:
# Location of the credits CSV under the project base directory.
hadoop_credits_addr = f'{hadoop_base_directory}/datasets/credits.csv'

In [None]:
# Load the credits CSV and, in the "cast" column, rewrite every ": None" as
# ": ''" before it is parsed.
# NOTE(review): presumably needed because from_json cannot parse the bare token None - confirm.
credits = (
    spark.read.format("csv")
    .option("header", "true")
    .option("escape", "\"")
    .load(hadoop_credits_addr)
    .withColumn('cast', regexp_replace('cast', ': None', ": ''"))
)

In [None]:
# Schema of one entry of the "cast" list.
# Field ORDER matters: the actor-name UDF reads position 5, which is "name".
cast_schema = ArrayType(
    StructType([
        StructField("cast_id", IntegerType()),
        StructField("character", StringType()),
        StructField("credit_id", StringType(), True),
        StructField("gender", IntegerType(), True),
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("order", IntegerType(), True),
        StructField("profile_path", StringType(), True),
    ])
)

In [None]:
# Derive an integer movie_id from the CSV's id, parse "cast" into an array of
# structs stored as "cast_actors", and keep only those two columns.
credits = (
    credits.withColumn(a_id, credits.id.cast("Integer"))
    .withColumn("cast_actors", from_json(credits.cast, cast_schema))
    .select(a_id, "cast_actors")
)

In [None]:
# Show credits rows whose id could not be cast to an integer (null movie_id).
credits.filter(credits[a_id].isNull()).show()

In [None]:
# Preview the parsed cast structs.
credits.select(["cast_actors"]).show()

In [None]:
# Drop rows whose cast was null after parsing, then keep one row per movie_id.
# Only cast_actors is checked for nulls here; movie_id is not.
credits = (
    credits.na.drop(subset=['cast_actors'])
    .dropDuplicates(subset=[a_id])
)

In [None]:
# UDF returning position 5 of every cast struct, i.e. the "name" field in `cast_schema`.
extract_cast_name_udf = udf(lambda z: get_element_in_pos(z, 5), ArrayType(StringType()))

In [None]:
# Replace the array of cast structs with a plain array of actor names.
credits = credits.withColumn("cast_actors", extract_cast_name_udf(credits.cast_actors))

In [None]:
# Left join: every movie is kept; movies without credits get a null "cast_actors".
movies = movies.join(credits, on=[a_id], how='left')

In [None]:
# Final column types of the denormalized movies frame.
movies.dtypes

In [None]:
# Row count of the final frame; count() counts rows, including those where cast_actors is null.
movies.select(['cast_actors']).count()

In [None]:
from pyspark.sql.functions import to_json, spark_partition_id, collect_list, col, struct

! rm -r movies_denormalized

movies.select(to_json(struct(*movies.columns)).alias("json"))\
    .groupBy(spark_partition_id())\
    .agg(collect_list("json").alias("json_list"))\
    .select(col("json_list").cast("string"))\
    .write.mode('overwrite').text('movies_denormalized')

In [None]:
# Run the project's to_json.sh script from the notebook's working directory.
# NOTE(review): presumably post-processes the movies_denormalized export written above - confirm against the script.
! ./to_json.sh

In [None]:
# Destination of the aggregated movies export on HDFS.
# No trailing slash: the write cell joins folder and directory with '/', so a
# trailing slash here would produce "obligatorio//movies_aggregated".
# This is the same location as `hadoop_base_directory`; keep the two in sync.
hadoop_dest_folder_csv = 'hdfs://192.168.56.101:9000/obligatorio'
movies_aggregated_dir = 'movies_aggregated'

In [None]:
# Write the final movies frame as JSON, replacing any previous output.
# Note: the destination variable is named *_csv, but the format written is JSON.
movies.write.mode('overwrite').json(f'{hadoop_dest_folder_csv}/{movies_aggregated_dir}')

In [None]:
# Shut down the Spark session; no Spark operation can run after this cell.
spark.stop()