# SQL Abfragen der *movie* Daten

Using: https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

## Aufgabe 1
Bearbeite das zur Verfügung gestellte Tutorial und wende das Gelernte an, um aus den Movie-Daten der letzten Vorlesung einen *kombinierten* Dataframe aufzubauen den man dann gut mit *traditionellen* SQL-Abfragen bearbeiten und speicher diesen direkt als *JSON-File*, dass man dann z.B. in MongoDB importieren kann.

Demonstriere, dass alles funktioniert an Hand von beispielhaften SQL-Abfragen und importiere es wirklich in MongoDB/Compass.

In [2]:
# Wir wollen DataFrames nutzen und dafür brauchen wir eine SparkSession
# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder\
    .master("local[1]")\
    .appName("Datenbanken mit Spark")\
    .getOrCreate() # getOrCreate liefert existierende Session, wenn es schon eine gibt

In [4]:
spark

Erstellen von Daten-Frames durch Einlesen als *.tsv* und Ausgabe als *.csv*.
</br>
</br>
**Note**: 
   - Mit *"inferSchema"* werden beispielsweise *Integer* als solches erkannt, auch wenn diese als *String* hinterlegt sind
   - Durch *"sep"* wird die logische Trennung zwischen den Spalteninhalten festgelegt, wenn eine *.csv*-Datei gelesen werden soll, es sich allerdings um ein *.tsv*-Format handelt. Hier wird der *Tabulator* als Trennung verwendet

In [13]:
# Creates a spark data frame for .tsv
name_basic = spark.read.format("csv")\
.option("inferSchema", "true")\
.option("sep","\t")\
.option("header", "true")\
.load("../movie/name_basic.tsv")

name_basic.printSchema()

root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)
 |-- deathYear: string (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)



Duplizierte Werte innerhalb einer Tabelle können mittels der *dropDuplicates()* Funktion entfernt werden.

In [14]:
name_basic.show(10)

+---------+---------------+---------+---------+--------------------+--------------------+
|   nconst|    primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+---------------+---------+---------+--------------------+--------------------+
|nm0000001|   Fred Astaire|     1899|     1987|soundtrack,actor,...|tt0031983,tt00723...|
|nm0000002|  Lauren Bacall|     1924|     2014|  actress,soundtrack|tt0038355,tt01170...|
|nm0000003|Brigitte Bardot|     1934|       \N|actress,soundtrac...|tt0049189,tt00573...|
|nm0000004|   John Belushi|     1949|     1982|actor,soundtrack,...|tt0077975,tt00787...|
|nm0000005| Ingmar Bergman|     1918|     2007|writer,director,a...|tt0050986,tt00608...|
|nm0000006| Ingrid Bergman|     1915|     1982|actress,soundtrac...|tt0038109,tt00387...|
|nm0000007|Humphrey Bogart|     1899|     1957|actor,soundtrack,...|tt0037382,tt00432...|
|nm0000008|  Marlon Brando|     1924|     2004|actor,soundtrack,...|tt0068646,tt00787...|
|nm0000009

In [15]:
name_basic_drop = name_basic.dropDuplicates()
name_basic_drop.show(10)

+---------+--------------------+---------+---------+--------------------+--------------------+
|   nconst|         primaryName|birthYear|deathYear|   primaryProfession|      knownForTitles|
+---------+--------------------+---------+---------+--------------------+--------------------+
|nm0000141|      David Duchovny|     1960|       \N|actor,producer,di...|tt0443701,tt09042...|
|nm0001115|         Ossie Davis|     1917|     2005|actor,writer,dire...|tt0281686,tt01094...|
|nm0001349|Jennifer Love Hewitt|     1979|       \N|actress,producer,...|tt0130018,tt01193...|
|nm0001445|       Martin Landau|     1928|     2017|actor,producer,mi...|tt0053125,tt00963...|
|nm0001718|       Kyra Sedgwick|     1965|       \N|actress,producer,...|tt0361127,tt01144...|
|nm0001931|       Binnie Barnes|     1903|     1998|actress,soundtrac...|tt0038108,tt00390...|
|nm0002054|       Sheena Easton|     1959|       \N|  actress,soundtrack|tt0082398,tt01155...|
|nm0002146|     James Wong Howe|     1899|     197

In [69]:
name_basic = name_basic_drop

Mit dem Spaltenattribut (*"primaryName"*) und auch mit dem Index (*"name_basic["primaryName"]"*) ist es möglich selektierte Datensätze mittels der *select* Funktion zu erhalten.

In [70]:
name_basic.select("primaryName").show(20)
name_basic.select("primaryName", "birthYear", "deathYear").show(20)

+--------------------+
|         primaryName|
+--------------------+
|      David Duchovny|
|         Ossie Davis|
|Jennifer Love Hewitt|
|       Martin Landau|
|       Kyra Sedgwick|
|       Binnie Barnes|
|       Sheena Easton|
|     James Wong Howe|
|     Edvin Adolphson|
|Nusrat Fateh Ali ...|
|       David Jackson|
|        Larry Lerner|
|    Pedro de Cordoba|
|    Anne-Marie Blanc|
|          J.A. Steel|
|          Kirk Bloom|
|         Eric Hester|
|     Marco Sacerdoti|
|       Pamela Bowman|
|        Deana Carter|
+--------------------+
only showing top 20 rows

+--------------------+---------+---------+
|         primaryName|birthYear|deathYear|
+--------------------+---------+---------+
|      David Duchovny|     1960|       \N|
|         Ossie Davis|     1917|     2005|
|Jennifer Love Hewitt|     1979|       \N|
|       Martin Landau|     1928|     2017|
|       Kyra Sedgwick|     1965|       \N|
|       Binnie Barnes|     1903|     1998|
|       Sheena Easton|     1959|   

Im nachfolgenden Beispiel wird eine *when* Bedingung aufgestellt, die alle Schauspieler ausgibt deren Geburtsjahr über 1920 liegt.

In [16]:
from pyspark.sql.functions import when
name_basic.select("primaryName",when(name_basic.birthYear > "1920",1).otherwise(0)).show(5)

+---------------+----------------------------------------------+
|    primaryName|CASE WHEN (birthYear > 1920) THEN 1 ELSE 0 END|
+---------------+----------------------------------------------+
|   Fred Astaire|                                             0|
|  Lauren Bacall|                                             1|
|Brigitte Bardot|                                             1|
|   John Belushi|                                             1|
| Ingmar Bergman|                                             0|
+---------------+----------------------------------------------+
only showing top 5 rows



Innerhalb der *like* Funktion stellt der *%* Charakter eine Art Filterung dar.</br>
Im folgenden Beispiel wird nach alle Personen gesucht, die *"James"* im Namen haben.

In [72]:
name_basic.select("primaryName", "primaryProfession", name_basic.primaryName.like("%James%")).show(15)

+--------------------+--------------------+------------------------+
|         primaryName|   primaryProfession|primaryName LIKE %James%|
+--------------------+--------------------+------------------------+
|      David Duchovny|actor,producer,di...|                   false|
|         Ossie Davis|actor,writer,dire...|                   false|
|Jennifer Love Hewitt|actress,producer,...|                   false|
|       Martin Landau|actor,producer,mi...|                   false|
|       Kyra Sedgwick|actress,producer,...|                   false|
|       Binnie Barnes|actress,soundtrac...|                   false|
|       Sheena Easton|  actress,soundtrack|                   false|
|     James Wong Howe|cinematographer,c...|                    true|
|     Edvin Adolphson|actor,director,so...|                   false|
|Nusrat Fateh Ali ...|soundtrack,music_...|                   false|
|       David Jackson|director,writer,p...|                   false|
|        Larry Lerner|assistant_di

*startswith* scant den beginnenden Kontext auf ein definiertes Kriterium.</br>
*endswith* verarbeitet parallel den selben Prozess nur mit den letzten Kontext.</br>
**Note**: Beide Funktionen sind Case-Sensitive!

In [73]:
name_basic.select("primaryName", "primaryProfession", name_basic.primaryProfession.startswith("actor")).show(5)
name_basic.select("primaryName", "primaryProfession", name_basic.primaryProfession.endswith("miscellaneous")).show(5)

+--------------------+--------------------+------------------------------------+
|         primaryName|   primaryProfession|startswith(primaryProfession, actor)|
+--------------------+--------------------+------------------------------------+
|      David Duchovny|actor,producer,di...|                                true|
|         Ossie Davis|actor,writer,dire...|                                true|
|Jennifer Love Hewitt|actress,producer,...|                               false|
|       Martin Landau|actor,producer,mi...|                                true|
|       Kyra Sedgwick|actress,producer,...|                               false|
+--------------------+--------------------+------------------------------------+
only showing top 5 rows

+--------------------+--------------------+------------------------------------------+
|         primaryName|   primaryProfession|endswith(primaryProfession, miscellaneous)|
+--------------------+--------------------+-----------------------------

Die *substr* Operation extrahiert den Text innerhalb eines vordefinierten Index.

In [74]:
name_basic.select(name_basic.primaryName.substr(1, 3).alias("primaryName")).show(5)
name_basic.select(name_basic.primaryName.substr(3, 6).alias("primaryName")).show(5)
name_basic.select(name_basic.primaryName.substr(1, 6).alias("primaryName")).show(5)

+-----------+
|primaryName|
+-----------+
|        Dav|
|        Oss|
|        Jen|
|        Mar|
|        Kyr|
+-----------+
only showing top 5 rows

+-----------+
|primaryName|
+-----------+
|     vid Du|
|     sie Da|
|     nnifer|
|     rtin L|
|     ra Sed|
+-----------+
only showing top 5 rows

+-----------+
|primaryName|
+-----------+
|     David |
|     Ossie |
|     Jennif|
|     Martin|
|     Kyra S|
+-----------+
only showing top 5 rows



Anhand folgender Beispiele werden die *Add*, *Update* und *Remove* Funktionen genauer aufgezeigt.

In [17]:
from pyspark.sql.functions import lit
name_basic = name_basic.withColumn("new_column", lit("This is a new column"))

display(name_basic)

DataFrame[nconst: string, primaryName: string, birthYear: string, deathYear: string, primaryProfession: string, knownForTitles: string, new_column: string]

In [77]:
name_basic = name_basic.withColumnRenamed('primaryName', 'Name')

display(name_basic)

DataFrame[nconst: string, Name: string, birthYear: string, deathYear: string, primaryProfession: string, knownForTitles: string, new_column: string]

In [82]:
name_basic = name_basic.drop("new_column")
#name_basic.drop(name_basic.new_column).show()

display(name_basic)

DataFrame[nconst: string, Name: string, birthYear: string, deathYear: string, primaryProfession: string, knownForTitles: string]

Im folgenden werden diverse Funktionstypen aufgeführt, um Daten genauer zu inspizieren.

In [83]:
# Returns dataframe column names and data types
name_basic.dtypes

[('nconst', 'string'),
 ('Name', 'string'),
 ('birthYear', 'string'),
 ('deathYear', 'string'),
 ('primaryProfession', 'string'),
 ('knownForTitles', 'string')]

In [85]:
# Displays the content of dataframe
    #name_basic.show()
    
# Return first n rows
    #name_basic.head()
    
# Returns first row
    #name_basic.first()
    
# Return first n rows
    #name_basic.take(5)
    
# Returns columns of dataframe
name_basic.columns


['nconst',
 'Name',
 'birthYear',
 'deathYear',
 'primaryProfession',
 'knownForTitles']

In [88]:
# Counts the number of rows in dataframe
name_basic.count()

12126677

In [89]:
# Counts the number of distinct rows in dataframe
name_basic.distinct().count()

# Gleiches Ergebnis wie der obrige Befehl, da alle Duplikate bereits entfernt wurden. Daher sind alle Zeilen nur noch einmalig vorhanden

12126677

In [91]:
# Prints plans including physical and logical
name_basic.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[primaryName#620, knownForTitles#624, birthYear#621, nconst#619, deathYear#622, primaryProfession#623], functions=[])
   +- Exchange hashpartitioning(primaryName#620, knownForTitles#624, birthYear#621, nconst#619, deathYear#622, primaryProfession#623, 200), ENSURE_REQUIREMENTS, [plan_id=1322]
      +- HashAggregate(keys=[primaryName#620, knownForTitles#624, birthYear#621, nconst#619, deathYear#622, primaryProfession#623], functions=[])
         +- FileScan csv [nconst#619,primaryName#620,birthYear#621,deathYear#622,primaryProfession#623,knownForTitles#624] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/name_basic.tsv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<nconst:string,primaryName:string,birthYear:string,deathYear:string,primaryProfession:strin...




In [92]:
# Computes summary statistics
name_basic.describe("birthYear").show()

+-------+------------------+
|summary|         birthYear|
+-------+------------------+
|  count|          12126677|
|   mean|1952.2512455666272|
| stddev| 34.84414710193421|
|    min|              0004|
|    max|                \N|
+-------+------------------+



Mithilfe der *groupBy()* Funktion können Datensätze anhand eines Spaltennamens zusammengefasst bzw. gruppiert werden.

In [93]:
name_basic.groupBy("Name").count().show(10)

+--------------------+-----+
|                Name|count|
+--------------------+-----+
|   Kimberly Cossette|    1|
|Billie Greenbaum ...|    1|
|          Al Harsten|    1|
|     Patricia Hülser|    1|
|      Adriana Ionica|    1|
|       Cory Jamieson|    1|
|          Kelly Jaye|    1|
|      Jeffrey Kalmus|    1|
|        Richard King|   79|
|    Jill Kooyoomjian|    1|
+--------------------+-----+
only showing top 10 rows



Die *filter()* Funktion ist eine Erweiterung der *like* Operation mit dem *%* Charakter.</br>
**Note**: Die Funktion ist Case-Sensitive!

In [94]:
name_basic.filter(name_basic["primaryProfession"] == 'actor').show(5)

+---------+----------------+---------+---------+-----------------+--------------------+
|   nconst|            Name|birthYear|deathYear|primaryProfession|      knownForTitles|
+---------+----------------+---------+---------+-----------------+--------------------+
|nm0003124|Pedro de Cordoba|     1881|     1950|            actor|tt0032245,tt00352...|
|nm0008480|   Mohammed Abel|       \N|       \N|            actor|           tt0199850|
|nm0009732| Yamandú Acevedo|     1963|     2020|            actor|tt0224941,tt02614...|
|nm0009929|    Eugene Acker|     1889|     1971|            actor|tt0166832,tt00145...|
|nm0010869|    Dallas Adams|     1947|     1991|            actor|tt0056751,tt00700...|
+---------+----------------+---------+---------+-----------------+--------------------+
only showing top 5 rows



Mithilfe des *join()* Befehls können mehrere Collections zu einer logischen zusammengefügt werden. Im nachfolgenden Beispiel, wird die *title_basic* mit der *title_rating* auf die *tconst*-ID verbunden.

In [5]:
# Creates a spark data frame for .tsv 
title_basic = spark.read.format("csv")\
.option("inferSchema", "true")\
.option("sep","\t")\
.option("header", "true")\
.load("../movie/title_basic.tsv")

title_basic.printSchema()


title_ratings = spark.read.format("csv")\
.option("inferSchema", "true")\
.option("sep","\t")\
.option("header", "true")\
.load("../movie/title_ratings.tsv")

title_ratings.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)

root
 |-- tconst: string (nullable = true)
 |-- averageRating: double (nullable = true)
 |-- numVotes: integer (nullable = true)



In [32]:
title_combined = title_basic.join(title_ratings,title_basic.tconst ==  title_ratings.tconst,"inner").drop(title_basic.tconst)

In [33]:
title_combined.write.save("../movie/title_combined.json",format="json")