# Big Data Storage Exam - Individual Task Timo Heiß
**Note**: Data import/preparation and storage in parquet files are simply copied from the group task in order to be able to perform the query on the same tables as in the group task. My individual task begins with the heading "Individual Task - Timo Heiß"

## Data Import and Preparation
(Copied from Group Task, hide cells for better overview)

In [0]:
# imports:
from pyspark.sql.types import *
from pyspark.sql.functions import explode, expr, col, avg, broadcast, year
import json
import ast

In [0]:
# create schema for movies dataframe
movies_schema = StructType([ 
    StructField("budget", IntegerType(),True), 
    StructField("genres", StringType(),True),  # nested columns are stored as strings and normalized later (same for other nested columns)
    StructField("homepage", StringType(),True), 
    StructField("id", StringType(), True), # ids are stored as string (same for all ids)
    StructField("keywords", StringType(), True), 
    StructField("original_language", StringType(), True),
    StructField("original_title", StringType(), True), 
    StructField("overview", StringType(), True), 
    StructField("popularity", FloatType(), True), 
    StructField("production_companies", StringType(), True), 
    StructField("production_countries", StringType(), True), 
    StructField("release_date", DateType(), True), 
    StructField("revenue", LongType(), True),  # values too big for int, therefore used dtype Long
    StructField("runtime", IntegerType(), True), 
    StructField("spoken_languages", StringType(), True), 
    StructField("status", StringType(), True), 
    StructField("tagline", StringType(), True), 
    StructField("title", StringType(), True)
  ])

# create schema for credits dataframe
credits_schema = StructType([ 
    StructField("movie_id", StringType(),True), 
    StructField("title", StringType(),True), 
    StructField("cast", StringType(),True), 
    StructField("crew", StringType(), True), 
  ])

# create schema for recommendations dataframe
recommendations_schema = StructType([ 
    StructField("movie_id", StringType(),True), 
    StructField("user_id", StringType(),True), 
    StructField("vote", IntegerType(),True)
  ])

In [0]:
# load data from CSVs into Data-Frames 
df_movies_in = (spark.read.format("csv")
                          .schema(movies_schema) # apply schema
                          .option("header", "true")
                          .option("escape",'"') # additional quotes are ignored
                          .option("mode", "DROPMALFORMED") # drops malformed rows in csv --> PLEASE NOTE THAT THIS WILL LEAD TO SOME MOVIES NOT BEING CONSIDERED
                          .load("dbfs:/FileStore/shared_uploads/tiheiss@gmail.com/groupC/groupC/movies_groupC.csv")) # please insert your own link to the movies csv here

df_credits_in = spark.read.format("csv") \
                          .schema(credits_schema) \
                          .option("header", "true") \
                          .option("escape",'"') \
                          .load("dbfs:/FileStore/shared_uploads/tiheiss@gmail.com/groupC/groupC/credits_groupC.csv") # please insert your own link to the credits csv here

df_recommendations_in = spark.read.format("csv") \
                                  .schema(recommendations_schema) \
                                  .option("header", "true") \
                                  .load("dbfs:/FileStore/shared_uploads/tiheiss@gmail.com/groupC/groupC/recommendations_groupC.csv") # please insert your own link to the recommendations csv here

In [0]:
# Pre-Processing:
# credits dataframe

"""
normalize credit dataframe --> leads to one dataframe for cast and one for crew
json.loads() to convert the string into a list of dictionaries
explode() to normalize the column --> 1.NF
Note: for some reason, every value in the dictionary resulting from json.loads() is stored with the same dtype; the dtype is derived from the first element in the dict, therefore cast_id is 'converted' to string by adding quotes in the json-string (otherwise all values would be stored as integer, which leads to NULL in case of the name)
"""

df_cast = df_credits_in.rdd.map(lambda x: (x[0],                    # adding quotes in json string to "convert" cast_id to string  
                                           json.loads(x[2].replace('\"cast_id\":','\"cast_id\":\"').replace(', \"character\"','\", \"character\"')) 
                                           )
                               ).toDF(["movie_id", "cast"])

rdd = df_cast.select(df_cast.movie_id, explode(df_cast.cast)).rdd.map(lambda x: (x[0], 
                                                                                 x[1]["name"], 
                                                                                 x[1]["id"], 
                                                                                 x[1]["order"], 
                                                                                 x[1]["character"], 
                                                                                 x[1]["credit_id"], 
                                                                                 x[1]["cast_id"], 
                                                                                 x[1]["gender"]
                                                                                )
                                                                     )
cast_schema = StructType([ # schema for new cast dataframe
    StructField("movie_id", StringType(),True), 
    StructField("name", StringType(),True), 
    StructField("id", StringType(),True), 
    StructField("order", StringType(), True), 
    StructField("character", StringType(),True), 
    StructField("credit_id", StringType(),True), 
    StructField("cast_id", StringType(),True), 
    StructField("gender", StringType(),True)  # cast to IntegerType not possible here, has to be done separately (see below)
  ])

df_cast = spark.createDataFrame(rdd, cast_schema)
df_cast = df_cast.withColumn("gender", df_cast.gender.cast('int')).withColumn("order", df_cast.order.cast('int')) # convert gender and order to integer manually
df_cast.limit(3).display() # show top 3 rows
df_cast.printSchema() # print schema


df_crew = df_credits_in.rdd.map(lambda x: (x[0], json.loads(x[3]))).toDF(["movie_id", "crew"])
rdd = df_crew.select(df_crew.movie_id, explode(df_crew.crew)).rdd.map(lambda x: (x[0], 
                                                                                 x[1]["name"], 
                                                                                 x[1]["job"], 
                                                                                 x[1]["department"], 
                                                                                 x[1]["id"], 
                                                                                 x[1]["credit_id"], 
                                                                                 x[1]["gender"]
                                                                                )
                                                                     )
crew_schema = StructType([ # schema for new crew dataframe
    StructField("movie_id", StringType(),True), 
    StructField("name", StringType(),True), 
    StructField("job", StringType(),True), 
    StructField("department", StringType(), True), 
    StructField("id", StringType(),True), 
    StructField("credit_id", StringType(),True), 
    StructField("gender", StringType(),True)
  ])

df_crew = spark.createDataFrame(rdd, crew_schema)
df_crew = df_crew.withColumn("gender",df_crew.gender.cast('int')) # cast gender to integer
df_crew.limit(3).display()
df_crew.printSchema()

movie_id,name,id,order,character,credit_id,cast_id,gender
19995,Sam Worthington,65731,0,Jake Sully,5602a8a7c3a3685532001c9a,242,2
19995,Zoe Saldana,8691,1,Neytiri,52fe48009251416c750ac9cb,3,1
19995,Sigourney Weaver,10205,2,Dr. Grace Augustine,52fe48009251416c750aca39,25,1


root
 |-- movie_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- order: integer (nullable = true)
 |-- character: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- cast_id: string (nullable = true)
 |-- gender: integer (nullable = true)



movie_id,name,job,department,id,credit_id,gender
19995,Stephen E. Rivkin,Editor,Editing,1721,52fe48009251416c750aca23,0
19995,Rick Carter,Production Design,Art,496,539c47ecc3a36810e3001f87,2
19995,Christopher Boyes,Sound Designer,Sound,900,54491c89c3a3680fb4001cf7,0


root
 |-- movie_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- job: string (nullable = true)
 |-- department: string (nullable = true)
 |-- id: string (nullable = true)
 |-- credit_id: string (nullable = true)
 |-- gender: integer (nullable = true)



In [0]:
# Pre-Processing:
# recommendations dataframe
df_recommendations = df_recommendations_in # already clean and normalized
df_recommendations.limit(3).display()
df_recommendations.printSchema()

movie_id,user_id,vote
76493,70090,4
20764,47695,1
71547,57703,4


root
 |-- movie_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- vote: integer (nullable = true)



In [0]:
# Pre-Processing:
# movies dataframe
"""
# extract genres from movie dataframe and normalize the column in a separate dataframe
# json.loads() does not work here, therefore ast.literal_eval() is used to convert the json-string to a dictionary list (does basically the same as json.loads())
"""

df_genres = df_movies_in.withColumnRenamed("id", "movie_id").rdd.map(lambda x: (x[3], 
                                                                                ast.literal_eval(x[1].replace('\"id\": ','\"id\": \"') # add quotes -> "convert" id to string
                                                                                                     .replace(', \"name\"','\", \"name\"'))
                                                                               ) 
                                                                    ).toDF(["movie_id", "genres"])

rdd = df_genres.select(df_genres.movie_id, explode(df_genres.genres)).rdd.map(lambda x: (x[0], 
                                                                                         x[1]["name"], 
                                                                                         x[1]["id"]
                                                                                        )
                                                                             )
genres_schema = StructType([ # schema for new genre dataframe
    StructField("movie_id", StringType(),True), 
    StructField("name", StringType(),True), 
    StructField("id", StringType(),True)
  ])

df_genres = spark.createDataFrame(rdd, genres_schema)
df_genres.limit(3).display()
df_genres.printSchema()

# note: other columns (such as "keywords") could also be normalized, but since they are not used in the tasks below, we refrain from doing so (would be the same procedere as for genres)

df_movies = df_movies_in.drop("genres") # genres column can now be dropped as it is already in the newly created dataframe
df_movies.limit(3).display()
df_movies.printSchema()

movie_id,name,id
19995,Action,28
19995,Adventure,12
19995,Fantasy,14


root
 |-- movie_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- id: string (nullable = true)



budget,homepage,id,keywords,original_language,original_title,overview,popularity,production_companies,production_countries,release_date,revenue,runtime,spoken_languages,status,tagline,title
237029119,http://www.avatarmovie.com/,19995,"[{""id"": 1463, ""name"": ""culture clash""}, {""id"": 2964, ""name"": ""future""}, {""id"": 3386, ""name"": ""space war""}, {""id"": 3388, ""name"": ""space colony""}, {""id"": 3679, ""name"": ""society""}, {""id"": 3801, ""name"": ""space travel""}, {""id"": 9685, ""name"": ""futuristic""}, {""id"": 9840, ""name"": ""romance""}, {""id"": 9882, ""name"": ""space""}, {""id"": 9951, ""name"": ""alien""}, {""id"": 10148, ""name"": ""tribe""}, {""id"": 10158, ""name"": ""alien planet""}, {""id"": 10987, ""name"": ""cgi""}, {""id"": 11399, ""name"": ""marine""}, {""id"": 13065, ""name"": ""soldier""}, {""id"": 14643, ""name"": ""battle""}, {""id"": 14720, ""name"": ""love affair""}, {""id"": 165431, ""name"": ""anti war""}, {""id"": 193554, ""name"": ""power relations""}, {""id"": 206690, ""name"": ""mind and soul""}, {""id"": 209714, ""name"": ""3d""}]",en,Avatar,"In the 22nd century, a paraplegic Marine is dispatched to the moon Pandora on a unique mission, but becomes torn between following orders and protecting an alien civilization.",150.43758,"[{""name"": ""Ingenious Film Partners"", ""id"": 289}, {""name"": ""Twentieth Century Fox Film Corporation"", ""id"": 306}, {""name"": ""Dune Entertainment"", ""id"": 444}, {""name"": ""Lightstorm Entertainment"", ""id"": 574}]","[{""iso_3166_1"": ""US"", ""name"": ""United States of America""}, {""iso_3166_1"": ""GB"", ""name"": ""United Kingdom""}]",2009-12-10,2787954796,162,"[{""iso_639_1"": ""en"", ""name"": ""English""}, {""iso_639_1"": ""es"", ""name"": ""Espa\u00f1ol""}]",Released,Enter the World of Pandora.,Avatar
300022446,http://disney.go.com/disneypictures/pirates/,285,"[{""id"": 270, ""name"": ""ocean""}, {""id"": 726, ""name"": ""drug abuse""}, {""id"": 911, ""name"": ""exotic island""}, {""id"": 1319, ""name"": ""east india trading company""}, {""id"": 2038, ""name"": ""love of one's life""}, {""id"": 2052, ""name"": ""traitor""}, {""id"": 2580, ""name"": ""shipwreck""}, {""id"": 2660, ""name"": ""strong woman""}, {""id"": 3799, ""name"": ""ship""}, {""id"": 5740, ""name"": ""alliance""}, {""id"": 5941, ""name"": ""calypso""}, {""id"": 6155, ""name"": ""afterlife""}, {""id"": 6211, ""name"": ""fighter""}, {""id"": 12988, ""name"": ""pirate""}, {""id"": 157186, ""name"": ""swashbuckler""}, {""id"": 179430, ""name"": ""aftercreditsstinger""}]",en,Pirates of the Caribbean: At World's End,"Captain Barbossa, long believed to be dead, has come back to life and is headed to the edge of the Earth with Will Turner and Elizabeth Swann. But nothing is quite as it seems.",139.08261,"[{""name"": ""Walt Disney Pictures"", ""id"": 2}, {""name"": ""Jerry Bruckheimer Films"", ""id"": 130}, {""name"": ""Second Mate Productions"", ""id"": 19936}]","[{""iso_3166_1"": ""US"", ""name"": ""United States of America""}]",2007-05-19,961027991,169,"[{""iso_639_1"": ""en"", ""name"": ""English""}]",Released,"At the end of the world, the adventure begins.",Pirates of the Caribbean: At World's End
244999130,http://www.sonypictures.com/movies/spectre/,206647,"[{""id"": 470, ""name"": ""spy""}, {""id"": 818, ""name"": ""based on novel""}, {""id"": 4289, ""name"": ""secret agent""}, {""id"": 9663, ""name"": ""sequel""}, {""id"": 14555, ""name"": ""mi6""}, {""id"": 156095, ""name"": ""british secret service""}, {""id"": 158431, ""name"": ""united kingdom""}]",en,Spectre,"A cryptic message from Bond’s past sends him on a trail to uncover a sinister organization. While M battles political forces to keep the secret service alive, Bond peels back the layers of deceit to reveal the terrible truth behind SPECTRE.",107.376785,"[{""name"": ""Columbia Pictures"", ""id"": 5}, {""name"": ""Danjaq"", ""id"": 10761}, {""name"": ""B24"", ""id"": 69434}]","[{""iso_3166_1"": ""GB"", ""name"": ""United Kingdom""}, {""iso_3166_1"": ""US"", ""name"": ""United States of America""}]",2015-10-26,880650185,148,"[{""iso_639_1"": ""fr"", ""name"": ""Fran\u00e7ais""}, {""iso_639_1"": ""en"", ""name"": ""English""}, {""iso_639_1"": ""es"", ""name"": ""Espa\u00f1ol""}, {""iso_639_1"": ""it"", ""name"": ""Italiano""}, {""iso_639_1"": ""de"", ""name"": ""Deutsch""}]",Released,A Plan No One Escapes,Spectre


root
 |-- budget: integer (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)



In [0]:
# split genres table into fact and dimension table (there are only few distinct genres in the entire dataset, so this process reduces size significantly as data is not stored repeatedly; the dimension table can later be broadcasted)

# fact table: connects movies and genres table
df_movie_genres = df_genres.select(["movie_id", col("id").alias("genre_id")])
df_movie_genres.limit(3).display()
df_movie_genres.printSchema()

# genre dimension table
df_genres = df_genres.select(["id", "name"]).distinct()
df_genres.limit(3).display()
df_genres.printSchema()

movie_id,genre_id
19995,28
19995,12
19995,14


root
 |-- movie_id: string (nullable = true)
 |-- genre_id: string (nullable = true)



id,name
28,Action
12,Adventure
14,Fantasy


root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



## Data Storage
(Copied from Group Task, hide cells for better overview)

In [0]:
# store data in parquet files (reasons for parquet: see respective individual tasks)
df_movies.write.mode('overwrite').parquet("/tmp/out/movies.parquet")
df_cast.write.mode('overwrite').parquet("/tmp/out/cast.parquet")
df_crew.write.mode('overwrite').parquet("/tmp/out/crew.parquet")
df_recommendations.write.mode('overwrite').parquet("/tmp/out/recommendations.parquet")
df_genres.write.mode("overwrite").parquet("/tmp/out/genres.parquet")
df_movie_genres.write.mode('overwrite').parquet("/tmp/out/movie_genres.parquet")

## Individual Task - Timo Heiß

In [0]:
df_crew = spark.read.parquet("/tmp/out/crew.parquet")

### Teil 1
How many movies where written by a female writer?

In [0]:
solution = (df_crew.where((df_crew.job == "Writer") & (df_crew.gender == 1)) # Assumption gender=1 for female
                   .select("movie_id")
                   .distinct() # because some movies are written by two or more women
                   .count())

print(f"Insgesamt wurden {solution} Filme von einer Frau geschrieben.")

Insgesamt wurden 139 Filme von einer Frau geschrieben.


**Hinweis:** Diese Aufgabe könnte alternativ unter Nutzung eines Accumulators durchgeführt werden. Darauf wird hier jedoch mit derselben Begründung wie in der Gruppenaufgabe verzichtet (--> mangelnde Zuverlässigkeit von Accumulator).

### Teil 2
Explain what data storage structure you used to store the information and why. When storing the information how can you speed up the information retrieval if you know you are interested in looking at the gender of the writer? Why does it speed up the information retrieval when you store the data differently?

Als Speicherstruktur wurden Apache Parquet Files verwendet. Apache Parquet ist ein spaltenorientiertes Datendateiformat und wurde für die effiziente Speicherung und Abfrage von Daten entwickelt. Es ist für die Verarbeitung komplexer Daten in großen Mengen optimiert und bietet effiziente Datenkompressions- und Kodierungsverfahren. Parquet Files werden in einem Vezeichnis gespeichert, dass eine oder mehrere Dateien enthalten und beliebig tief verschachtelt sein kann. Da es eine Anforderung dieser Aufgabe ist, dass die verwendete Speicherstruktur auch mit sehr großen Datenmengen umgehen können soll, bieten sich Parquet Files hier an. Im Detail lassen sich die Vorteile von Parquet wie folgt beschreiben: <br>

- In den meisten Speicherformaten werden Daten zeilenorientiert gespeichert (bspw. csv-Dateien) und sind damit für die Arbeit mit jeweils einem ganzen Datensatz geeignet. Das spaltenorientierte Format von Parquet führt nun dazu, dass nur Daten in den tatsächlich benötigten Spalten gelesen werden müssen. Dies ist vorteilhaft, da in Anwendungsfällen (bspw. diese Aufgabe) häufig nur ausgewählte Spalten abgefragt werden. Außerdem können die Daten im spaltenorientierten Format besser durch geeignete Komprimierungsalgorithmen komprimiert werden, da die Daten einer Spalte meist homogener sind.
- Parquet legt die Werte in jeder Spalte physisch an zusammenhängenden Arbeitsspeicherplätzen ab. Dabei setzt es auf ein hybrides physikalischen Speicherlayout (Kombination aus zeilen- und spaltenweise). So ermöglicht Parquet eine effiziente Suche auch in riesigen Datensätzen.
- Parquet Files können beliebig komprimiert werden (bspw. mit GZIP, LZO, Snappy).
- Es werden verschiedene Kodierungsverfahren eingesetzt (z.B. *RLE_DICTIONARY* zum Kodieren von sich wiederholenden Werten), wodurch ebenfalls die File-Größe reduziert wird.
- Die Daten werden u.a. mithilfe von Row Groups organisiert. Für jede Row Group stehen Metadaten wie Minimum, Maximum und Anzahl (count) zur Verfügung. Der Predicate Pushdown erlaubt in diesem Zusammenhang, dass bei Abfragen nur Row Groups gelesen werden, in denen ein abgefragter Wert auch vorkommen kann (weil er zwischen min und max der Row Group liegt). 
- Parquet unterstützt die Speicherung geschachtelter Daten (dieses Feature wird hier zwar nicht genutzt, da die Daten zunächst zu weiten Teilen normalisiert wurden, wäre hier aber durchaus denkbar, da die ursprünglichen, denormalisierten Daten in geschachtelter Form vorliegen).

Parquet Files sind mit ihren verschiedenen Optimierungen für die Speicherung großer Datenmengen und schnellen Abfragen darauf ideal für die vorliegende Aufgabe geeignet. Über Nachteile von Parquet wie die vergleichsweise geringe Schreibgeschwindigkeit (Write Speed) kann in diesem Fall hinweggesehen werden. Es ist davon auszugehen, dass Datensätze wie neue Filme oder Bewertungen lediglich einmal in das entsprechende File geschrieben werden müssen, dann jedoch häufig im Zuge verschiedener Abfragen gelesen werden ("Write once, read forever"). Somit steht die hohe Lesegeschwindigkeit, die Parquet mit sich bringt, hier im Vordergrund.
<br>

Eine Möglichkeit, die Parquet bietet, um die Abfrage bestimmter Informationen zu beschleunigen, ist das sogenannte Partitioning. Die Daten können dabei nach beliebigen Spalten (auch nach mehreren) partitioniert werden. Partitionen sind Verzeichnisse im Dateisystem, unter denen die Daten gespeichert werden, wobei die Verzeichnisstruktur auf dem Partitionierungsschema basiert. Partitionierung führt zu einer Beschleunigung der Abfrage, da Daten nur aus den für die Abfrage relevanten Partitionen gelesen werden müssen und nicht die gesamte Menge durchsucht werden muss, wenn eben nur ein Teil der Daten von Interesse ist. Dies ergibt dann Sinn, wenn die Abfragen immer einem ähnlichen Muster folgen: Ist beispielsweise bekannt, dass man sich meist für ein bestimmtes Geschlecht interessiert, kann nach dieser Spalte partitioniert werden. Da auch nach mehreren Spalten partitioniert werden kann, könnte im vorliegenden Fall auch nach Job und Gender partitioniert werden, wenn man am Geschlecht der Autoren (job=Writer) interessiert ist.

#### Beispiel

In [0]:
### partitioning by job and gender
df_crew.write.partitionBy("job","gender").mode("overwrite").parquet("/tmp/out/crew_part.parquet") # this will take some time (roughly 10 minutes)

### alternative: partitioning only by gender because job may produce a large number of files
# df_crew.write.partitionBy("gender").mode("overwrite").parquet("/tmp/out/crew_part.parquet")

In [0]:
df_crew_partitioned = spark.read.parquet("/tmp/out/crew_part.parquet")
df_crew = spark.read.parquet("/tmp/out/crew.parquet")

In [0]:
# perform query again, but on partitioned data
import time
start_time = time.time()
solution = df_crew_partitioned.where((df_crew_partitioned.job == "Writer") & (df_crew_partitioned.gender == 1)) \
                              .select("movie_id") \
                              .distinct() \
                              .count()
print("Execution time: %s seconds" % round(time.time() - start_time, 4))
print(f"Result: Insgesamt wurden {solution} Filme von einer Frau geschrieben.")

Execution time: 1.0421 seconds
Result: Insgesamt wurden 139 Filme von einer Frau geschrieben.


In [0]:
# comparison to query without partitioning
start_time = time.time()
solution = df_crew.where((df_crew.job == "Writer") & (df_crew.gender == 1)) \
                  .select("movie_id") \
                  .distinct() \
                  .count()
print("Execution time: %s seconds" % round(time.time() - start_time, 4))
print(f"Result: Insgesamt wurden {solution} Filme von einer Frau geschrieben.")

Execution time: 1.9222 seconds
Result: Insgesamt wurden 139 Filme von einer Frau geschrieben.


Der obige Beispielvergleich zeigt, dass Partitioning die vorliegende Abfrage schneller machen kann (natürlich kann man das nach einer Ausführung noch nicht sicher sagen, soll aber als Demonstration genügen).