In [None]:
#hideInput
import zipfile
with zipfile.ZipFile("utils.zip", "r") as zip_ref:
    zip_ref.extractall()

In [None]:
#hideInput
import pandas as pd
import visualization as vis
from taskreview.learning_module import LearningModule
lm = LearningModule('taskReviewSpark.db')

<img src="Images/Apache_Spark_logo.svg" width="300">

<br>
<br>

<div style="background-color: #3c3a3e ; padding: 5px; "></div>

# Lernmodul zur Bearbeitung großer Datenmengen mit Apache Spark

Große Datenmengen fallen heutzutage in jedem Bereich unseres Lebens an. Sei es ein Einkauf in einem Onlineshop oder ein Besuch auf einer Social Media Plattform. All diese Aktionen führen dazu, dass rund um die Uhr Daten entstehen und es häufen sich gigantische Datenmengen an. Durch Datenanalysen können daraus Erkenntnisse gewonnen werden, die beispielsweise das Verhalten von Nutzern repräsentieren.

Daten können sowohl in strukturierter, semi-strukturierter als auch unstrukturierter Form vorliegen. Die Analyse von großen Datenmengen umfasst zwei grundlegende Bereiche. Der Begriff Analytics vereint die Bereiche Analyse, Reporting und Visualisierung und beschreibt die Aufbereitung der Daten, auf Basis derer Entscheidungen getroffen werden können. Ein wachsender Bereich beschäftigt sich außerdem mit der Analyse von großen Datenmengen durch Machine Learning Technologien. Dabei werden Systeme mit Hilfe der gesammelten Daten trainiert und können im Anschluss daran dabei helfen Entscheidungsprozesse zu unterstützen, wie es beispielsweise bei relevanten Produktempfehlungen in Online-Shops der Fall ist.

Für die Analyse solcher Datenmengen gibt es verschiedene Technologien und Frameworks. Ein viel verwendetes Framework in diesem Bereich ist Apache Spark, welches als Unified Analytics Engine mehrere Bereiche der Datenanalyse vereint. In diesem Lernmodul soll der Umgang mit Apache Spark, insbesondere PySpark gelernt werden. Bei PySpark handelt es sich um eine Spark-Bibliothek, die die Möglichkeit bietet Spark-Anwendungen mit Python-Programmcode zu erstellen und auszuführen. Im Laufe des Lernmoduls sollen vor allem die Besonderheiten der Spark-Architektur theoretisch aufgezeigt, die Kerntechnologie von Spark erklärt und kennengelernt, und der Umgang mit den Spark-Datenstrukturen gelernt werden. Es wird eine Einführung in das Thema gegeben und anhand eines Datensatzes, der Taxidaten aus New York enthält, der Umgang mit strukturierten Daten in PySpark gelernt.

### Lernziele:

In diesem Lernmodul werden Grundkenntnisse im Umgang mit dem Framework Apache Spark, im Besonderen mit der Python-Bibliothek PySpark erworben. Das Grundkonzept und die charakteristischen Merkmale der Datenstrukturen in PySpark sind bekannt. Es können Daten importiert und exportiert werden. Außerdem können Daten untersucht und Datensätze bearbeitet werden. Es könnte bestehende Daten verändert und neue Daten ermittelt und hinzugefügt werden. 

Apache Spark wird häufig für Cluster-Computing verwendet, um große Datenmengen effizient zu verarbeiten. Im Idealfall wird dazu ein Cluster aus mehreren Maschinen aufgebaut. Die Erstellung und Verwendung eines solchen Clusters wird in diesem Lernmodul nicht behandelt. Die im Lernmodul vermittelten Grundlagen und Anwendungen lassen sich aber auf eine Anwendung von Spark im Cluster übertragen.

### Verwendeter Datensatz:

In diesem Lernmodul wird ein Datensatz verwendet, der Daten von Taxifahrten aus New York beinhaltet. Die Daten stammen dabei aus dem Jahr 2016. Im Laufe des Lernmoduls wird der Datensatz vor allem auf zeitliche und räumliche Faktoren untersucht.

Quelle: [https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

### Hinweis:

Die Aufgaben und Lernabschnitte dieses Lernmoduls bauen aufeinander auf. Deshalb ist es wichtig, dass diese nacheinander in der richtigen Reihenfolge bearbeitet werden. Falls dies nicht der Fall ist, werden die Aufgaben eventuell fälschlicherweise als falsch markiert, da die Lösung auf den vorherigen Abschnitten aufbaut.

# Inhalt
1. [Apache Spark](#1-Apache-Spark)  
    1.1 [Unified Analytics Engine](#1.1-Unified-Analytics-Engine)  
    1.2 [Spark Session](#1.2-Spark-Session)  
    1.3 [Zusammenfassung: Apache Spark](#1.3-Zusammenfassung:-Apache-Spark)  
    1.4 [Aufgaben: Apache Spark](#1.4-Aufgaben:-Apache-Spark)  
2. [Das Grundkonzept von Spark - Resilient Distributed Datasets](#2-Das-Grundkonzept-von-Spark---Resilient-Distributed-Datasets)  
    2.1 [Erstellung von RDDs](#2.1-Erstellung-von-RDDs)  
    2.2 [Anzeigen von RDDs](#2.2-Anzeigen-von-RDDs)  
    2.3 [PairRDDs](#2.3-PairRDDs)  
    2.4 [Zusammenfassung: Das Grundkonzept von Spark - Resilient Distributed Datasets](#2.4-Zusammenfassung:-Das-Grundkonzept-von-Spark---Resilient-Distributed-Datasets)  
    2.5 [Aufgaben: Das Grundkonzept von Spark - Resilient Distributed Datasets](#2.5-Aufgaben:-Das-Grundkonzept-von-Spark---Resilient-Distributed-Datasets)  
3. [Strukturierte Daten - SparkSQL und DataFrames](#3-Strukturierte-Daten---SparkSQL-und-DataFrames)  
    3.1 [Erstellung von DataFrames](#3.1-Erstellung-von-DataFrames)  
    3.2 [Anzeigen von DataFrames](#3.2-Anzeigen-von-DataFrames)  
    3.3 [Eigenschaften und Strukturen von DataFrames](#3.3-Eigenschaften-und-Strukturen-von-DataFrames)  
    3.4 [Sortieren und Gruppieren von DataFrames](#3.4-Sortieren-und-Gruppieren-von-DataFrames)  
    3.5 [SQL-Queries in Spark](#3.5-SQL-Queries-in-Spark)  
    3.6 [Zusammenführen von DataFrames](#3.6-Zusammenführen-von-DataFrames)  
    3.7 [Exportieren von DataFrames](#3.7-Exportieren-von-DataFrames)  
    3.8 [Zusammenfassung: Strukturierte Daten - SparkSQL und DataFrames](#3.8-Zusammenfassung:-Strukturierte-Daten---SparkSQL-und-DataFrames)  
    3.9 [Aufgaben: Strukturierte Daten - SparkSQL und DataFrames](#3.9-Aufgaben:-Strukturierte-Daten---SparkSQL-und-DataFrames)  
4. [Explorative Analyse und Bearbeitung von strukturierten Daten](#4-Analyse-und-Bearbeitung-von-strukturierten-Daten)  
    4.1 [Filterung von DataFrames](#4.1-Filterung-von-DataFrames)  
    4.2 [Bereinigung von strukturierten Daten](#4.2-Bereinigung-von-strukturierten-Daten)  
    4.3 [Zusammenfassung: Analyse und Bereinigung von strukturierten Daten](#4.3-Zusammenfassung:-Analyse-und-Bereinigung-von-strukturierten-Daten)  
    4.4 [Aufgaben: Analyse und Bereinigung von strukturierten Daten](#4.4-Aufgaben:-Analyse-und-Bereinigung-von-strukturierten-Daten)  
    4.5 [Manipulation von Spalten](#4.5-Manipulation-von-Spalten)  
    4.6 [User Defined Functions](#4.6-User-Defined-Functions)  
    4.7 [Neue Daten ermitteln und hinzufügen](#4.7-Neue-Daten-ermitteln-und-hinzufügen)  
    4.8 [Zusammenfassung: Bearbeitung und Manipulation von strukturierten Daten](#4.8-Zusammenfassung:-Bearbeitung-und-Manipulation-von-strukturierten-Daten)  
    4.9 [Aufgaben: Bearbeitung und Manipulation von strukturierten Daten](#4.9-Aufgaben:-Bearbeitung-und-Manipulation-von-strukturierten-Daten)  
5. [Fazit](#5-Fazit)  
6. [Weiterführende Informationen](#6-Weiterführende-Informationen)

## 1 Apache Spark

Apache Spark begann als Forschungsprojekt am UC Berkeley AMPLab im Jahr 2009. Ursprünglich sollte Spark die Verarbeitung in Hadoop-Systemen beschleunigen und eine Alternative zu MapReduce bieten. MapReduce ist ein Programmiermodell, das nebenläufige Berechnungen über große Datenmengen auf Computerclustern durchführt. Viele der Projektmitarbeiter waren vorher auch an der Entwicklung des Hadoop-MapReduce-Algorithmus beteiligt gewesen und mit dessen Effizienz unzufrieden. Wie auch MapReduce kann Spark mit verteilten Knoten in einem Cluster arbeiten. Innerhalb des Clustern übernimmt ein Driver-Node die Koordination und einer oder mehrere Worker-Nodes die Ausführung von Aufgaben, wie beispielsweise Berechnungen auf Daten. Durch In-Memory-Datenverarbeitung, bei der die zu analysierenden Daten direkt im Arbeitsspeicher der Cluster-Knoten gespeichert und verarbeitet werden, bietet Spark Geschwindigkeitsvorteile gegenüber konventionellen Systemen, bei denen die Daten zuerst vom Festspeicher geladen werden müssen. Ab einem bestimmten Punkt, wenn die Datenmengen zu groß werden, lagert aber auch Spark diese auf die Festplatten aus.

[Hier](https://www.youtube.com/watch?v=p8FGC49N-zM&feature=emb_logo) ist ein interessantes Video zu finden, in dem Matei Zaharia, der Co-Founder and Chief Techologist von Databricks, eine kurze Vorstellung zu Apache Spark gibt.

### 1.1 Unified Analytics Engine

Apache Spark wird als Unified Analytics Engine bezeichnet, die verschiedene Felder für die Analyse von Big Data vereint. Dadurch können einzelne Schritte der Datenanalyse, wie die Aufbereitung, Analyse, Transformation und Visualisierung der Daten, in einer Umgebung ausgeführt werden. Innerhalb von Spark übernehmen die vier Bibliotheken SparkSQL, SparkStreaming, MLlib und GraphX verschiedene Teilaufgaben. Die Koordination dieser Teilaspekte erfolgt über die sogenannte **Spark Core API**, die die Basis von Spark bildet. Bei ihr handelt es sich um die zentrale Ausführungsengine, die unter anderem die Planung von Aufträgen und die Verteilung und Überwachung der Aufgaben auf den Worker-Nodes übernimmt. Die Core API ist außerdem für die Speicherverwaltung zuständigt und übernimmt unter anderem die Interaktion mit verschiedenen Speichersystemen, die als Quelle bzw. Ziel der Daten innerhalb der Anwendung dienen.

<img src="Images/Spark_Oekosystem.png" width="450">
<br>
<center>
    orientiert an: 
    <a href="https://databricks.com/de/spark/about">"Ökosystem" von Apache Spark</a>
</center>

Wie zuvor bereits kurz angesprochen, übernehmen in Spark vier Bibliotheken verschiedene Teilaufgaben. 

**SparkSQL** ist ein Modul, das dazu dient strukturierte Daten in Form von sogenannten DataFrames (s. Abschnitt [3 Strukturierte Daten - SparkSQL und DataFrames](#strukturiertedaten)) zu analysieren und durch Anwendung von verschiedenen Operationen zu manipulieren. Es ermöglicht die Ausführung von SQL-ähnlichen Abfragen auf Daten zusammen mit komplexen Programmen. 

**SparkStreaming** dient dazu Live-Datenströme zu verarbeiten. Die Daten können aus verschiedenen Quellen eingelesen und mit komplexen Algorithmen verarbeitet werden. Anschließend können die verarbeiteten Daten an Dateisysteme, Datenbanken oder auch Live-Dashboard ausgegeben werden. 

**MLlib** dient dazu Algorithmen für maschinelles Lernen in Spark verfügbar zu machen. Mit Hilfe dieser Algorithmen können Analysen wie Korrelationen durchgeführt werden, aber auch komplexere Anwendungen, wie Klassifikation, Regression, Clustering werden über MLlib zugänglich gemacht. Dabei kann das Konzept der sogenannten Pipeline verwendet werden, um einen bestimmten Ablauf der Verarbeitungsschritte der Daten zu rationalisieren. Die ML-Pipeline ermöglicht es wiederverwendbare Sequenzen von Transformationen zu definieren und macht zwischengeschaltete Lese-/Schreibzugriffe überflüssig, da die Ausgaben einer Stufe direkt als Eingabe an die nächste gegeben werden. 

Zuletzt gibt es noch die **GraphX**-Bibliothek, die zur Graphenverarbeitung, zur Manipulation von Graphen und zur Durchführung paralleler Graphenberechnungen dient. Graphen stellen die Zusammenhänge von Daten innerhalb eines Datensatzes visuell dar.

### 1.2 Spark Session

Seit Spark Version 2.0 gibt es die sogenannte **Spark Session**, die als Einstiegspunkt in die zugrunde liegende PySpark-Funktionalität dient, um mit RDDs und DataFrames (s. Abschnitt [2 Das Grundkonzept von Spark - Resilient Distributed Datasets](#grundkonzept) und [3 Strukturierte Daten - SparkSQL und DataFrames](#strukturiertedaten)) zu arbeiten. Die Spark Session erzeugt intern eine Variable namens sparkContext. Der **Spark Context** wird vom Driver-Prozess dazu verwendet, um eine Kommunikation mit dem Cluster und den Ressourcenmanagern herzustellen und diente vor Spark 2.0 als Einstiegspunkt. Mittlerweile wird der Spark Context mit anderen Kontexten (SQL Context, Streaming Context, Hive Context) in der Spark Session vereint.

Eine SparkSession wird mit einer Builder-Methode erstellt, der folgende Attribute übergeben werden können:

| Attribute |  |
| :--- | :--- |
| appName(*name*) | legt einen Namen für die Anwendung fest; wenn kein Name festgelegt wird, wird ein zufälliger generiert |
| config(*key*, *value*, *conf*) | setzt eine Konfigurationsoption mit einem Schlüsselnamen (*key*), einem Wert (*value*) und einer Instanz von SparkConf (*conf*) |
| enableHiveSupport() | ermöglicht die Unterstützung mit [Hive](https://hive.apache.org/) |
| getOrCreate() | erstellt eine SparkSession, basierend auf den im Builder festgelegten Optionen; wenn bereits eine SparkSession vorhanden ist, wird diese abgerufen |
| master(*master*) | legt die Spark-Master-URL fest, mit der eine Verbindung hergestellt werden soll; wenn eine lokale SparkSession ohne Cluster gestartet wird, wird local[n] verwendet: n steht für die Anzahl der Partitionen, die erstellt werden |

In [None]:
# Notwendiges Modul für SparkSession importieren
from pyspark.sql import SparkSession

# SparkSession erstellen und konfigurieren
spark = SparkSession.builder.master("local[2]").appName("Lernmodul-Spark").config('spark.driver.extraClassPath', './jars/sqlite-jdbc-3.34.0.jar').getOrCreate()

### 1.3 Zusammenfassung: Apache Spark

| Beschreibung | Code |
| :--- | :--- |
| Starten einer Spark Session namens *name* in einem Cluster mit *clustertype* | `spark = SparkSession.builder.master(*clustertype*).appName(*name*).getOrCreate()` |
| Stoppen einer Spark Session | `spark.stop()` |

<br>
<br>

<div style="background-color: #e25a1c ; padding: 5px; "></div>

### 1.4 Aufgaben: Apache Spark

**Was bezeichnet der Begriff In-Memory-Datenverarbeitung?**

In [None]:
#hideInput
lm.show_task(11)

**Kann SparkSQL direkt mit relationalen Datenbanken kommunizieren, um die dort gespeicherten Daten zu manipulieren?**

In [None]:
#hideInput
lm.show_task(12)

**Was bedeutet die 1 in dem Ausdruck "local[1]"?**

In [None]:
#hideInput
lm.show_task(13)

**Wie kann eine lokale SparkSession mit einer Partition, dem Variablenname "spark_lm" und der Bezeichnung "Lernmodul-Spark" erstellt werden?**

In [None]:
#hideInput
lm.show_task(14)

<br>
<br>

<div style="background-color: #3c3a3e ; padding: 5px; "></div>

## 2 Das Grundkonzept von Spark - Resilient Distributed Datasets

Der **Resilient Distributed Dataset** (kurz: RDD) ist die grundlegende Datenstruktur von Apache Spark. RDDs sind fehlertolerante, unveränderliche verteilte Sammlungen von Elementen. Jeder Datensatz in Form eines RDDs wird in logische Partitionen unterteilt, die auf verschiedenen Knoten des Clusters berechnet werden können. In RDDs werden vor allem unstrukturierte Daten gespeichert. 

Auf RDDs können verschiedene Operationen vorgenommen werden. Dabei unterscheidet man zwischen Aktionen und Transformationen. Transformationen dienen dazu, die Daten aus einem RDD zu verändern. Da es sich bei RDDs um unveränderliche Sammlungen handelt, entstehen aus Transformationen stets neue RDDs. Die Transformationen in Spark werden "lazy" genannt, da sie erst ausgeführt werden, wenn eine Aktion auf den transformierten RDD ausgeführt wird. Zu den Aktionen gehören Operationen, wie das Anzeigen von RDDs.

<img src="Images/RDD_Prinzip.png" width="600">
<br>
<center>
    orientiert an: 
    <a href="https://medium.com/@lavishj77/spark-fundamentals-part-2-a2d1a78eff73">Spark fundamentals - Lavesh Jain</a>
</center>

### 2.1 Erstellung von RDDs

Ein RDD kann unter anderem durch Parallelisierung einer bestehenden Sammlung (Collection) oder durch das Laden eines Datensatzes aus einer externen Datei erstellt werden. Um einen RDD aus einer vorhandenen Collection zu erstellen, wird die Funktion `parallelize(collection)` verwendet und die Collection übergeben. Im Python-Kontext kann es sich beispielsweise um eine Liste handeln. Die Funktion wird vom Spark Context aus aufgerufen, der für die Kommunikation mit dem Cluster und den Ressourcenmanagern zuständig ist. RDDs, die durch Parallelisierung aus einer Collection erstellt werden, werden auch ParallelizeCollectionRDD genannt.

In [None]:
# Erstellung einer Liste
data_list = ["a", "b", "c", "d", "e", "f", "g", "h"]

# Erzeugung eines RDD aus der Liste
list_rdd = spark.sparkContext.parallelize(data_list)

Wie zuvor beschrieben, handelt es sich bei RDDs um verteilte Sammlungen, die aus mehreren Partitionen bestehen. Wenn eine Spark Session lokal gestartet wird, kann ein RDD auch aus einer Partition bestehen. Dies ist immer abhängig davon, welche Anzahl bei der Erstellung der Spark Session an `master("local[n]")` übergeben wurde. Mit der Funktion `getNumPartitions()` kann die Anzahl der Partitionen eines RDDs ermittelt werden.

In [None]:
# Ausgabe der Anzahl der Partitionen des RDDs list_rdd
list_rdd.getNumPartitions()

Für die Neupartitionierung von RDDs können zwei Funktionen verwendet werden. Mit `repartition(n)` werden aus dem RDD n Partitionen erstellt. Die Anzahl der Partitionen kann mit dieser Funktion also sowohl verkleinert, als auch vergrößert werden. Mit `coalesce()` wird die Anzahl der Partitionen durch PySpark auf effiziente Weise verringert.

### 2.2 Anzeigen von RDDs

Für die Anzeige eines RDDs können mehrere Funktionen verwendet werden. Bei allen diesen Operationen handelt es sich um Aktionen. Wenn also zuvor eine Transformation auf einen RDD vorgenommen wurde, kann diese durch Ausführen einer solchen Aktion wirksam gemacht werden.

Mit der Funktion `collect()` wird ein gesamter RDD ausgegeben. Dafür wird der gesamte RDD als Array auf dem Treiberknoten gesammelt. Der Treiberknoten ist immer die Instanz, auf der die Spark Session läuft. Bei einem großen Datensatz, kann dieses Sammeln dazu führen, dass dem Treiberknoten der Speicher ausgeht. Diese Aktion sollte also eher nach Filterungen oder ähnlichen Transformationen gewählt werden, die eine ausreichend kleine Teilmenge der Daten zurückgeben.

In [None]:
# Anzeige des gesamten RDDs mit collect()
list_rdd.collect()

Sollen nur ein Paar Elemente des RDDs angezeigt werden, kann die Funktion `take(n)` verwendet werden. Dabei muss über n immer angegeben werden, wie viele Elemente angezeigt werden sollen. Es müssen dann nur die ersten n Elemente des Datensatzes von den Driver Nodes gesammelt werden. Wenn nur das erste Element angezeigt werden soll, kann `take(1)` oder `first()` verwendet werden.

In [None]:
# Anzeige der ersten drei Einträge des RDDs mit take(3)
list_rdd.take(3)

In [None]:
# Anzeige des ersten Eintrags des RDDs mit first()
list_rdd.first()

### 2.3 PairRDDs

Es gibt verschiedene Transformationen, die auf RDDs ausgeführt werden können. Eine Übersicht ist [hier](https://sparkbyexamples.com/pyspark-rdd#rdd-transformations) zu finden. Zu diesen Transformationen zählt unter anderem die Funktion `map(function)`, die dazu verwendet wird Daten hinzuzufügen. Häufig werden mit Map-Transformationen PairRDDs erstellt. PairRDDs enthalten Schlüssel-Wert-Paare, die dem RDD etwas mehr Struktur geben können. Der Schlüsselwert muss nicht einzigartig sein und kann beispielsweise ein numerischer Wert oder auch ein Stringwert sein. Anhand der Schlüsselwerte kann ein RDD sortiert oder gruppiert werden. Dafür werden `sortByKey()`und `groupByKey()`verwendet. 

In [None]:
# Erstellung eines PairRDD durch Hinzufügen eines Schlüssels
list_rdd_indexed = list_rdd.zipWithIndex()
list_rdd_indexed = list_rdd_indexed.map(lambda x: (x[1],x[0]))
list_rdd_indexed.collect()

# Sortieren des RDDs anhand des Schlüssels in absteigender Reihenfolge
list_rdd_indexed.sortByKey(ascending=False).collect()

### 2.4 Zusammenfassung: Das Grundkonzept von Spark - Resilient Distributed Datasets

| Beschreibung | Code |
| :--- | :--- |
| RDD aus Liste erstellen (ParallelizeCollectionRDD) | `spark.sparkContext.parallelize(list)` |
| Anzahl Partitionen eines RDD ausgeben | `rdd.getNumPartitions()` |
| Gesamtes RDD anzeigen | `rdd.collect()` |
| Die ersten n Zeilen eines RDDs anzeigen | `rdd.take(n)` |
| Sortieren eines RDDs (PairRDD) | `rdd.sortByKey()` |
| Sortieren eines RDDs (PairRDD) | `rdd.groupByKey()` |

<br>
<br>

<div style="background-color: #e25a1c ; padding: 5px; "></div>

### 2.5 Aufgaben: Das Grundkonzept von Spark - Resilient Distributed Datasets

**Welche Daten werden meist in RDDs gespeichert?**

In [None]:
#hideInput
lm.show_task(21)

**Wieso kann es sinnvoll sein, ein RDD in ein PairRDD umzuwandeln?**

In [None]:
#hideInput
lm.show_task(22)

<br>
<br>

<div style="background-color: #3c3a3e ; padding: 5px; "></div>

## 3 Strukturierte Daten - SparkSQL und DataFrames

Ab Version 1.3 wurde Apache Spark durch eine weitere Datenstruktur ergänzt. DataFrames basierend teilweise auf RDDs und werden als Partitionen im Cluster verteilt. Sie besitzen jedoch im Gegensatz zu den RDDs eine klare Struktur, die dem Konzept von relationalen Datenbanken entspricht, in denen Daten in benannten Spalten organisiert werden. In diesem Punkt beruhen sie zum Teil auf den gleichnamigen Datenstrukturen in R und Python (pandas).

DataFrames sind die Kernstruktur der Spark-Bibliothek SparkSQL, die unter anderem dazu dient SQL-Abfragen auf DataFrames auszuführen, die als Ergebnis wieder einen DataFrame zurückgeben. Dadurch wird eine einfache Durchführung von Selektionen, Projektionen, Joins, Gruppierungen und weiteren Anwendungen ermöglicht. Außerdem ist die Anwendung von SQL-Syntax für viele Entwickler im Bereich der Datenanalyse bekannt und somit kann der Einstieg oder Übergang zu Spark erleichtert werden.

### 3.1 Erstellung von DataFrames

DataFrames können aus verschiedenen Datenquellen erstellt werden. 

Wie RDDs können auch DataFrames aus einer Collection erstellt werden. Dabei unterscheiden sie sich von RDDs, da sie eine Struktur besitzen. DataFrames bestehen aus einer bestimmten Anzahl von Spalten, die vordefinierte Variablen beinhalten. Um also einen DataFrame aus einer Collection erstellen zu können, muss ein Schema erstellt werden. Das Schema kann entweder nur aus den Spaltennamen bestehen oder zusätzlich noch Informationen über den Datentyp der Spalte geben. `createDataFrame(data=data, schema=schema)` erstellt einen DataFrame aus einer Collection und wird durch die aktuelle Spark Session aufgerufen.

In [None]:
# Erstellung der Daten und des Schemas für einen DataFrame
data_list = [("1", "a"), ("2", "b"), ("3", "c"), ("4", "d"), ("5", "e"), ("6", "f"), ("7", "g"), ("8", "h")]
schema = ["id", "value"]

# Erzeugung eines DataFrames aus den Daten und dem Schema
test_df = spark.createDataFrame(data=data_list, schema=schema)

Um einen DataFrame aus Daten einer Textdateien, wie CSV- oder JSON-Dateien zu erstellen, wird der sogenannte DataFrameReader verwendet, der über `spark.read` verwendet werden kann. Mit `csv(filepath, *option*)` wird ein DataFrame aus der CSV-Datei mit dem Dateipfad "filepath" erstellt. Beim Importieren der Datei können verschiedene Optionen gesetzt werden. Häufig wird `inferSchema=True` und `header=True` gesetzt. Header=true signalisiert, dass die erste Zeile der Datei für die Spaltenüberschriften des DataFrames verwendet werden soll. Durch inferSchema=True wird das Schema der Datei geladen. Die Daten innerhalb der Spalten besitzen also bereits vordefinierte Datentypen (bspw. Int, Long, Double etc.). Wird diese Option nicht gesetzt, werden alle Werte als String importiert. inferSchema wirkt nur dann, wenn auch header=True ist.

An dieser Stelle kann auch ein eigenes Schema erstellt und verwendet werden. Dazu muss ein StructType-Objekt erstellt werden, das Informationen zu jeder Spalte enthält. Innerhalb dieses Structs wird eine Spalte in Form eines StructFields dargestellt. Das StructField enthält den Spaltennamen, den Datentyp der Spalte und die Information, ob die Spalte Nullwerte enthalten kann. Wenn diese beiden Instanzen zusammengefügt werden, entsteht `StructType([StructField(columnname, type, nullable), ..])` als Anweisung für die Erstellung eines Schemas.

In [None]:
# Erzeugung eines DataFrames aus der Datei nyc_taxi_juni_part1.csv
csv_df = spark.read.csv("data/nyc_taxi_juni.csv", inferSchema=True, header=True)

Der DataFrameReader kann auch für andere Arten des Datenimports verwendet werden. Für das Lesen von Daten aus Datenbanktabellen wird `spark.read.jdbc(url, tablename)` verwendet. Die Verbindung zur Datenbank wird mit JDBC aufgebaut (s. [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)). Um JDBC in einer Spark-Anwendung nutzen zu können, muss ein Treiber installiert werden. Es gibt JDBC-Treiber für einige gängige Datenbanksysteme. Dazu gehören unter anderem SQLite, MySQL und PostgreSQL.

### 3.2 Anzeigen von DataFrames

DataFrames sind im Gegensatz zu RDDs wie Tabellen in relationalen Datenbanken aufgebaut. Sie bestehen also aus Spalten, die Variablen beschreiben und Zeilen, die einzelne Einträge beschreiben. Durch die Ausgabe eines DataFrames mit `show()` kann diese Struktur sichtbar gemacht werden. Dabei kann, wie zuvor bei take() angegeben werden, wie viele Einträge ausgegeben werden sollen. In DataFrames wird dadurch die Ausgabe der Zeilen bestimmt. Im Unterschied zu take() ist diese Angabe nicht zwingend notwendig. Wenn kein Wert angegeben wird, werden maximal die ersten 20 Einträge angezeigt.

In [None]:
# Anzeigen der Daten eines DataFrames mit show()
test_df.show()

Es können auch nur gewisse Teile eines DataFrames angezeigt werden. Zuvor wurde schon gezeigt wie die Anzahl der angezeigten Zeilen beeinflusst werden kann. Auch für die Spalten gibt es eine solche Option. Mit `select(columnname(s))` kann eine oder mehrere Spalten ausgewählt werden. Mit * können alle Spalten gewählt werden. Es gibt zwei Alternativen, um hier die Spalten eines DataFrames auszuwählen. Es kann entweder `df.select("columnname")` oder `df.select(df.columnname)` geschrieben werden. Diese beiden Formulierungen führen zu dem gleichen Ergebnis und können auch in anderen Funktionen alternativ zueinander verwendet werden.

select() dient allerdings nur dazu die Spalten auszuwählen. Zum Anzeigen muss anschließend noch show() verwendet werden.

In [None]:
# Anzeigen bestimmter Spalten eines DataFrames
csv_df.select("Vendor", "passenger_count", "total_amount").show()

### 3.3 Eigenschaften und Strukturen von DataFrames

Die Spalten von DataFrames beinhalten verschiedene Informationen. Mit `printSchema()` kann das zuvor bereits erläuterte Schema eines DataFrames dargestellt werden. Die Funktion gibt eine Auflistung der Spalten, mit den dazugehörigen Datentypen zurück. Dazu wird außerdem angegeben, ob die Spalte Nullwerte enthalten kann. Es werden also genau die Informationen ausgegeben, die zuvor bei der Erstellung eines Schemas festgelegt werden konnten.

In [None]:
# Ausgabe des Schemas eines DataFrames mit printSchema()
csv_df.printSchema()

Zu jeder Spalte eines DataFrames können statistische Informationen ausgegeben werden. Dazu wird die Funktion `describe(columnname(s))` durch einen DataFrame aufgerufen und eine Liste der zu analysierenden Spaltennamen übergeben. Falls Informationen zu allen Spalten ermittelt werden sollen, kann auch hier wieder * benutzt werden. Die Funktion gibt die folgenden Werte zurück:
- Anzahl der Einträge (Zeilen) - count
- Mittelwert - mean
- Standardabweichung - stddev
- Minimum - min
- Maximum - max

Werte wie der Mittelwert können nicht für Spalten ermittelt werden, die Stringwerte enthalten. An diesen Stellen wird null zurückgegeben.

Für die Ermittlung einiger dieser Werte gibt es in PySpark auch separate Funktionen. Die Anzahl der Einträge eines DataFrames kann beispielsweise auch mit `count()` ermittelt werden.

In [None]:
# Ausgabe von statistischen Informationen eines DataFrames
csv_df.describe(["passenger_count"]).show()

### 3.4 Sortieren und Gruppieren von DataFrames

DataFrames können wie PairRDDs auch sortiert werden. Da DataFrames bereits eine Struktur besitzen, muss hier kein Schlüsselwert hinzugefügt werden. Zum Sortieren oder Gruppieren von DataFrames können die Spalten verwendet werden. In PySpark gibt es dafür die Funktionen `sort(columnname)`, `orderBy(columnname)` und `groupBy(columnname)`. Standardmäßig werden DataFrames in PySpark absteigend sortiert. Um eine aufsteigende Sortierung zu erhalten, muss `ascending=False` an die Funktion übergeben werden.

Nach dem Gruppieren der Daten werden sogenannte Aggregate-Funktionen angewendet. Aggregate Funktionen operieren auf einer Gruppe von Zeilen und berechnen einen einzelnen Rückgabewert für jede Gruppe. Sie akzeptieren als Eingabe den Spaltennamen als Zeichenkette und verschiedene andere Argumente, die auf der Funktion und dem Rückgabespaltentyp basieren. In PySpark sind die Aggregate-Funktionen als "agg_funcs" gruppiert. Eine Übersicht über alle Aggregate-Funktionen ist [hier](https://sparkbyexamples.com/pyspark/pyspark-aggregate-functions) zu finden. 

Gruppierte Daten werden auch durch Window-Funktionen erzeugt und verwendet. Diese Window-Funktionen operieren auf einer Gruppe von Zeilen und geben einen einzelnen Wert für jede Eingabezeile zurück. Um eine Operation auf einer Gruppe durchzuführen, müssen die Daten zuerst mit `Window.partitionBy()` partitioniert werden.

In [None]:
# Sortieren eines DataFrames mit sort()
csv_df.select("passenger_count", "total_amount").sort("passenger_count", ascending=False).show()

# Gruppieren eines DataFrames mit groupBy() 
# Ausgabe eines Mittelwertes mit der Aggregate Function mean()
csv_df.groupBy("passenger_count").mean("total_amount").show()

### 3.5 SQL-Queries in Spark

SparkSQL bieten die Möglichkeit jegliche Abfragen auf DataFrames in SQL-Syntax auszuführen. Dazu wird `sql(query)` durch die Spark Session ausgeführt. Diese Abfrage kann allerdings nicht direkt auf einem DataFrame ausgeführt werden. Aus dem DataFrame muss zuerst mit `createOrReplaceTempView(viewname)` eine temporäre View erstellt werden.

In [None]:
# Erzeugung einer temprären View aus einem DataFrame
csv_df.createOrReplaceTempView("df_view")

# Anwendung einer SQL-Anweisung auf die gerade erstellte View
spark.sql("SELECT Ratecode, trip_distance, total_amount FROM df_view").show()

### 3.6 Zusammenführen von DataFrames

Zwei oder mehr DataFrames können unter bestimmten Bedingungen zusammengefügt werden. Um die Daten von DataFrames sinnvoll zusammenfügen zu können, müssen sie entweder in einer oder in allen Spalten übereinstimmen. Wenn dies der Fall ist, können sie mit `join()` oder `union()` zusammengefasst werden.

### 3.6.1 Join

Join dient dazu zwei DataFrames zusammenzufügen, die in einer Spalte übereinstimmen. Joins in PySpark entsprechen dem Konzept von Joins in relationalen Datenbanken. Es werden alle grundlegenden Join-Optionen unterstützt. Dazu gehören (Inner) Joins, Left (Outer) Joins, Right (Outer) Joins und Full (Outer) Joins (für ergänzende Erläuterungen zu SQL-Joins s. [SQL Joins](https://www.w3schools.com/sql/sql_join.asp)). 

Der Join von zwei DataFrames erfolgt über `df1.join(df2, on, *how*)`. Es muss angegeben werden, über welcher Spalte die DataFrames zusammengefügt werden sollen. Optional kann außerdem die Art des Joins angegeben werden. Standardmäßig werden Inner Joins durchgeführt.

### 3.6.2 Union

Union wird dazu verwendet, DataFrames zusammenzufügen, die gleich aufgebaut sind und in allen Spalten übereinstimmen. Durch `df1.union(df2)` werden zwei DataFrames zusammengefügt. Der resultierende DataFrame beinhaltet die Daten der beiden DataFrames. Dabei werden die Daten des zweiten DataFrames unten an die Daten des ersten angehängt. Die ursprünglichen Reihenfolgen werden beibehalten.

### 3.7 Exportieren von DataFrames

DataFrames können unter anderem in Textdateien und Datenbanktabellen exportiert werden. Äquivalent zum DataFrameReader für den Import existiert ein sogenannter DataFrameWriter für den Export. Der Export in eine CSV-Datei beispielsweise erfolgt durch `spark.write.csv(filepath)`. Um einen DataFrame mit den Spaltenüberschriften zu exportieren, kann auch hier, nach dem Dateipfad, `header=True` angegeben werden. Beim Export in Textdateien, wird für jede Partition eine eigene Textdatei erstellt, die sich dann alle in einem Ordner befinden. Um eine Textdatei für den gesamten Datensatz zu erzeugen, kann nach spark.write `repatition(1)` (s. Abschnitt [2.1 Erstellung von RDDs](#erstellungrdd)) angegeben werden. Der Export in Datenbanktabellen erfolgt über `spark.write.jdbc()` (ergänzende Informationen sind [hier](https://spark.apache.org/docs/2.3.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter.jdbc) zu finden).

### 3.8 Zusammenfassung: Strukturierte Daten - SparkSQL und DataFrames

| Beschreibung | Code |
| :--- | :--- |
| DataFrame aus Collection |`spark.createDataFrame(data=data, schema=schema)` |
| DataFrame aus CSV-Datei |`spark.read.csv(filepath, inferSchema=True, header=True)` |
| DataFrame aus Datenbanktabelle |`spark.read.jdbc(url, tablename)` |
| Schema erzeugen |`StructType([StructField(columnname, type, nullable], ...)` |
| DataFrame anzeigen |`df.show()` |
| Spalte(n) eines DataFrames auswählen |`df.select(columnname(s))` |
| Schema eines DataFrames anzeigen |`df.printSchema()` |
| Statistische Informationen zu einer Spalte/mehreren Spalten anzeigen |`df.describe(columnname(s))` |
| Anzahl der Zeilen eines DataFrame ausgeben |`df.count()` |
| Sortieren eines DataFrames |`df.sort(columnname)` / `df.orderBy(columnname)` |
| Gruppieren eines DataFrames |`df.groupBy(columnname)` |
| SQL-Abfrage auf temporäre View eines DataFrames |`df.createOrReplaceTempView(viewname)` <br> `spark.sql(query)` |
| Zusammenfügen von DataFrames über einer Spalte |`df1.join(df2, df1.columnname == df2.columnname)` |
| Zusammenfügen von DataFrames mit gleichem Schema |`df1.union(df2)` |

<br>
<br>

<div style="background-color: #e25a1c ; padding: 5px; "></div>

### 3.9 Aufgaben: Strukturierte Daten - SparkSQL und DataFrames

**Im Ordner "data" befinden sich zwei Dateien namens "nyc_taxi_januar_part1.csv" und "nyc_taxi_januar_part2.csv". Importiere diese Datensätze und erstelle daraus zwei DataFrames. Achte beim Import darauf, das Schema und die Spaltennamen der Datei zu übernehmen. Füge die beiden DataFrames im Anschluss per Join zusammen. Überprüfe dazu anhand der Spalten der beiden DataFrames über welcher Spalte sie zusammengefügt werden sollten. Entferne diese Spalte anschließend mit [drop()](https://sparkbyexamples.com/pyspark/pyspark-drop-column-from-dataframe/) aus dem Ergebnis-DataFrame. Sie enthält nur aufsteigende Identifizierer und wurde lediglich für den Join der beiden DataFrames hinzugefügt.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden
januar = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(31, januar)

**Wie viele Einträge beinhaltet der gerade erstellte DataFrame? Verwende zur Beantwortung der Aufgabe eine der zuvor vorgestellten Funktionen, die die Anzahl der Zeilen zurückgeben.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(32)

**Wie viele Spalten beinhaltete der gerade erstellte DataFrame?**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(33)

**Im Ordner "data" befindet sich noch eine SQLite-Datenbank namens "NYCTaxi.db". Erzeuge aus den Daten der Datenbanktabelle "nyc_taxi_april" einen DataFrame. Verwende für die Verbindung zur Datenbank die Variablen url = jdbc:sqlite:filepath. Ein Treiber muss nicht installiert werden, da dies bereits beim Erstellen der Spark Session gemacht wurde.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden
april = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(34, april)

**Welche der folgenden Spalten befinden sich in dem gerade erstellten DataFrame?**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(35)

**Nun haben wir die Daten für Taxifahrten aus Januar und April vorliegen. Füge die DataFrames januar und april jetzt zu einem DataFrame namens taxi zusammen. Durch die vorherigen Ausgaben der DataFrames sollte ersichtlich sein, wie sie zusammengefügt werden müssen.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden
taxi = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(36, taxi)

**Sortiere den Datensatz anhand der Entfernung der Fahrt (trip_distance). Wie viele Meilen ging die weiteste Fahrt?**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(37)

<br>
<br>

<div style="background-color: #3c3a3e ; padding: 5px; "></div>

## 4 Analyse und Bearbeitung von strukturierten Daten

Spark wird unter anderem dazu verwendet Daten zu analysieren, zu untersuchen und zu bearbeiten/manipulieren. Dazu werden in diesem Abschnitt einige hilfreiche Funktionen und deren Anwendung vorgestellt. Es werden strukturierte Daten betrachtet, wir verwenden also DataFrames. Um den Umgang mit DataFrames zu trainieren, werden Taxidaten aus New York bereinigt und untersucht und vor allem zeitliche und räumliche Faktoren betrachtet. Der Datensatz beinhaltet die folgenden Variablen:

| Variable | Beschreibung |
| :--- | :--- |
| VendorID | Code, der den TPEP-Anbieter angibt, der den Datensatz bereitgestellt hat |
| tpep_pickup_datetime | Datum und Uhrzeit, zu der der Zähler gestartet wurde |
| tpep_dropoff_datetime | Datum und Uhrzeit, zu der der Zähler gestoppt wurde |
| passenger_count | Anzahl der Fahrgäste im Fahrzeug (durch Fahrer eingegeben) |
| trip_distance | Fahrstecke in Meilen, die vom Taxmeter gemeldet wird |
| pickup_longitude | Längengrad, in dem der Zähler gestartet wurde |
| pickup_latitude | Breitengrad, in dem der Zähler gestartet wurde |
| Ratecode | Tarifoce, der am Ende der Reise gilt |
| store_and_fwd_flag | Zeigt an, ob der Fahrtdatensatz vor dem Senden an den Anbieter im Fahrzeug gespeichert wurde, weil das Fahrzeug keine Verbindung zum Server hatte |
| dropoff_longitude | Längengrad, in dem der Zähler gestoppt wurde |
| dropoff_latitude | Breitengrad, in dem der Zähler gestoppt wurde |
| payment_type | Gibt an, wie der Fahrgast die Fahrt bezahlt hat |
| fare_amount | Vom Zähler berechneter Zeit- und Entfernungstarif |
| extra | Verschiedene Extras und Zuschläge |
| mta_tax | MTA-Steuer (0,50 Dollar), die automatisch auf Grundlage des verwendeten Zählertarifs ausgelöst wird |
| tip_amount | Trinkgeldbeitrag - wird automatisch für Kreditkarten-Tripps ausgefüllt, Bargeldtripps werden nicht berücksichtigt |
| tolls_amount | Gesamtbeitrag aller in der Fahrt gezahlten Mautgebühren |
| improvement_surcharge | Verbesserungszuschlag (0,30 Dollar)  |
| total_amount | Gesamtbetrag, der Fahrgästen berechnet wird (enthält kein Trinkgeld in bar) |

### 4.1 Filterung von DataFrames

Das Filtern von Daten dient dazu, nur gewisse Teile eines Datensatzes auszuwählen. Eine Filterfunktion bestimmt dabei, welche Daten erhalten bleiben. In PySpark werden die Funktionen `filter(function)` und `where(function)` für das Filtern von DataFrames benutzt. Die beiden Funktion sind synonym zu betrachten und führen zu dem gleichen Ergebnis. Wenn mehrere Filterbedingungen verwendet werden, müssen sie jeweils geklammert werden und mit & (logisches und) oder | (logisches oder) verknüpft werden.

Da es sich bei SparkSQL um eine Bibliothek handelt, die unter anderem die Brücke zwischen Spark und dem Konzept von relationalen Datenbanken schlägt, wurde die Funktion where() als Alternative zu filter() hinzugefügt. where() entspricht dem Befehl beim Filtern von Datenbanktabellen mit SQL-Abfragen.

Beim Ausführen der Codezelle ist zu erkennen, dass beide Filtervorgänge die Fahrten herausfiltern, die zum "John F. Kennedy" Flughafen gingen und deshalb den Tarifcode "JFK" haben.

In [None]:
# Filterung eines DataFrames mit filter()
taxi.select("tpep_pickup_datetime", "tpep_dropoff_datetime", "Ratecode").filter(taxi.Ratecode == "JFK").show()

# Filterung eines DataFrames mit where()
taxi.select("tpep_pickup_datetime", "tpep_dropoff_datetime", "Ratecode").where(taxi.Ratecode == "JFK").show()

### 4.2 Bereinigung von strukturierten Daten

Häufig kommt es vor, dass Daten verunreinigt sind. Die Datenbereinigung dient dazu fehlerhafte, doppelte, inkonsistente, falsch formatierte oder irrelevante Daten zu finden und sie zu korrigieren. Dabei werden Daten beispielsweise ergänzt, gelöscht, umformatiert oder angepasst. Es werden Duplikate entfernt, leere Datenfelder gefüllt oder entfernt und fehlerhafte Daten gelöscht.

### 4.2.1 Umgang mit Duplikaten

Beispielsweise durch Fehler in der Aufzeichnung können Duplikate entstehen. Dabei handelt es sich um exakt identische Zeilen in einem DataFrame. Es ist eher unrealistisch mehrfach genau die gleichen Daten zu haben. In unserem Taxi-Datensatz würde das bedeuten, dass es mindestens zwei Fahrten gab, die genau zur gleichen Zeit, an der gleichen Position begonnen und geendet haben. Dabei sind genau gleich viele Personen mitgefahren und es entstand genau der gleiche Preis. Da solche Doppelungen unrealistisch sind und weitere Analysen dadurch beeinflusst werden können, sollten Duplikate entfernt werden.

Mit `dropDuplicates()` werden Duplikate entfernt, die in allen Variablen übereinstimmen. Dabei wird der erste Eintrag behalten und alle weiteren, die diesem Eintrag entsprechen, entfernt. Es ist auch möglich nur Duplikate in gewissen Spalten zu suchen. Wenn ein Datensatz eine Spalte enthält, in der einzigartige Werte stehen müssen, dürfen dort natürlich keine Doppelungen vorliegen.

### 4.2.2 Umgang mit fehlenden Werten

Manchmal kommt es vor, dass einzelne Daten innerhalb eines Datensatzes fehlen, also null sind. Je nachdem wie man mit diesen Daten umgehen möchte, können sie entfernt oder beibehalten werden. Wenn sie erhalten bleiben sollen, ist es sinnvoll sie zu ersetzen. Mit den Funktionen `fillna(replacement)` oder `na.fill(replacement)` können Nullwerte in einem DataFrame durch 0, eine leere Zeichenkette, ein Leerzeichen oder ein konstantes Literal ersetzt werden. Wenn dagegen alle Zeilen mit Nullwerten entfernt werden sollen, werden die Funktionen `dropna()` oder `na.drop()` verwendet. 

Die Funktion `isNull()` wird dazu verwendet einen DataFrame auf Nullwerte hin zu untersuchen. Sie kann auf bestimmte Spalten angewendet werden. Der DataFrame wird dann anhand der Bedingung gefiltert, dass die Spalte Nullwerte enthält. Eine Filterung über alle Spalten kann mit Hilfe einer For-Schleife durchgeführt werden. 

In [None]:
# Filterung der Spalte Vendor auf fehlende Werte
taxi.filter(taxi.Vendor.isNull()).show()

In [None]:
# Anzeigen der Anzahl fehlender Werte für jede Spalte eines DataFrames
# mit taxi.columns wird eine Liste der Spaltennamen erstellt
for col in taxi.columns:
    print(col, "beinhaltet " + str(taxi.filter(taxi[col].isNull()).count()) + " Nullwerte")

### 4.2.3 Umgang mit Ausreißern

Bei der Datenbereinigung werden häufig auch Daten entfernt, die nicht innerhalb eines bestimmten Definitionsraumes liegen. Bei diesen Werten handelt es sich um Ausreißer, die zwar dem Datentyp der jeweiligen Spalte entsprechen, aber in Bezug auf den Anwendungsfall unrealistisch sind. Da solche Werte eine Analyse der Daten beeinflussen können und beispielsweise bei der Untersuchung der Daten durch Machine Learning Algorithmen zu Fehlern führen können, sollten sie entfernt werden. Die genaue Filterung ist dabei natürlich immer abhängig vom Anwendungsfall. 

In einem New Yorker Taxi sind beispielsweise maximal 5 Fahrgäste erlaubt. Alle Einträge mit mehr als 5 Fahrgästen sollten also herausgefiltert werden. Da jedoch sehr viele Einträge die Zahl 6 beinhalten, werden wir erst einmal nur die Einträge mit mehr als 6 Fahrgästen entfernen. Da es sich bei der Zahl der Fahrgäste um einen Eintrag handelt, den der Fahrer des Taxis selber eingibt, kann es sein, dass einige der Fahrer die Gesamtanzahl an Personen innerhalb des Taxis angegeben haben, sich selbst also mitgezählt haben.

In [None]:
# Entfernen aller Einträge, die außerhalb eines bestimmten vordefinierten Bereichs liegen
taxi = taxi.filter(taxi.passenger_count <= 6)

Mit Hilfe der Funktion `distinct(columnname)` können die eindeutigen Werte einer Spalte gefunden werden. Das ist vor allem sinnvoll, wenn kategorische Werte vorliegen. Es wird dann jeder Wert, der innerhalb der Spalte vertreten ist, einmal ausgegeben. Dadurch können Ausreißer gut erkannt werden. Die Anwendung dieser Funktion ist allerdings nur sinnvoll, wenn es sich dabei um einige wenige eindeutige Werte handelt, da sonst schnell der Überblick verloren werden kann. 

In [None]:
# Ausgabe der Anbieter der Aufzeichnungssysteme, die Daten bereitgestellt haben
taxi.select("Vendor").distinct().show()

### 4.3 Zusammenfassung: Analyse und Bereinigung von strukturierten Daten

| Beschreibung | Code |
| :--- | :--- |
| Filtern von DataFrames | `df.filter(function)` / `df.where(functions)` |
| Duplikate entfernen | `df.dropDuplicates()` |
| Nullwerte ersetzen | `df.fillna(replacement)` / `df.na.fill(replacement)` |
| Nullwerte entfernen | `df.dropna()` / `df.na.drop()` |
| Einzigartige Werte einer Spalte ausgeben | `df.select(columnname).distinct()` | 

<br>
<br>

<div style="background-color: #e25a1c ; padding: 5px; "></div>

### 4.4 Aufgaben: Analyse und Bereinigung von strukturierten Daten

**Welche Tarifcodes für Taxifahrten in New York gibt es basierend auf dem Datensatz taxi? Kreuze die richtigen Antworten an.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(41)

**In der letzten Aufgabe ist ein Ausreißer aufgefallen, bei dem es sich nicht um einen Ratecode handelt. Entferne diesen aus dem Datensatz.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden
taxi_ratecode_cleaned = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(42, taxi_ratecode_cleaned)

**Welche Spalten des Taxi-Datensatzes taxi_ratecode_cleaned beinhalten den Wert 0? Dabei sollen keine Nullwerte gesucht werden. Es geht hier darum, welche Spalten den numerischen Wert 0 beinhalten.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(43)

**Bei welchen dieser Spalten ist es inhaltlich sinnvoll, die Einträge mit dem Wert 0 zu entfernen? Überlege dazu, bei welchen Variablen es unrealsitisch ist, eine 0 zu haben.**

In [None]:
#hideInput
lm.show_task(44)

**Entferne die Einträge mit Nullen in den Spalten, die in der letzten Aufgabe als überflüssig festgestellt wurden aus dem DataFrame namens taxi_ratecode_cleaned.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden
taxi_without_null = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(45, taxi_without_null)

**Im Datensatz sind die Längen- und Breitengrade des Abfahrtsorts und des Ankunftsortes gesammelt. New York befindet sich in etwa zwischen den Längengraden -74,26 und -73,7 und zwischen den Breitengraden 40,916 und 40,477.**

**Passe taxi_without_null so an, dass nur noch Längen- und Breitengrade innerhalb des "erlaubten" Rahmens vorhanden sind. Nehme diese Anassung sowohl für die Abfahrtsorte als auch für die Ankunftsorte vor und speichere den bereinigten Datensatz in einem DataFrame namens taxi_borders_cleaned.**

<img src="Images/New_York_BoundaryBox.png" width="400">

In [None]:
# Hier kann die Aufgabe bearbeitet werden
taxi_borders_cleaned = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(46, taxi_borders_cleaned)

Im folgenden befindet sich eine Übersicht über die, nach der Anpassung, noch vorhandenen Abfahrts- und Ankunftsorte.

Längen- und Breitengrade der Abfahrtsorte:

In [None]:
vis.visualize_map(taxi_borders_cleaned, "pickup_longitude", "pickup_latitude")

Längen- und Breitengrade der Ankunftsorte:

In [None]:
vis.visualize_map(taxi_borders_cleaned, "dropoff_longitude", "dropoff_latitude")

<br>
<br>

<div style="background-color: #3c3a3e ; padding: 5px; "></div>

### 4.5 Manipulation von Spalten

Wenn in Spark mit DataFrames gearbeitet wird und sie verändert werden, werden häufig Änderungen an den Spalten vorgenommen. Sie können entfernt, umbenannt, ihr Inhalt und Datentyp verändert und neue Spalten hinzugefügt werden. Wenn Änderungen an einer Spalte vorgenommen werden, wird diese häufig mit `withColumn(columnname, function)` angesprochen und durch die übergebene Funktion angepasst.

### 4.5.1 Inhalte und Datentypen von Spalten ändern

Manchmal kann es sinnvoll sein, die Datentypen der Inhalte von DataFrames zu ändern. Wenn ein Datensatz beispielsweise für das Training eines Machine-Learning Algorithmus verwendet werden soll, sollten alle Werte als numerische Werte vorliegen. Häufig kommt es dabei vor, dass Spalten kategorische Werte beinhalten, die als Strings vorliegen. Um diese in numerische Werte umzuwandeln, erfolgt zuerst eine Ersetzung der Stringwerte durch Zahlen als Stringwerte mit `regexp_replace(columnname, oldvalue, newvalue)`. Anschließend werden sie mit der Funktion `cast(type)` in numerische Werte umgewandelt. Um direkt numerische Werte zu ersetzen, wird `replace()` verwendet.

In [None]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType

# Ersetzen von Stringwerten in einer Spalte durch Integerwerte
taxi_regex_replace = taxi_borders_cleaned.withColumn('store_and_fwd_flag', regexp_replace('store_and_fwd_flag', 'Y', '1'))
taxi_regex_replace = taxi_regex_replace.withColumn('store_and_fwd_flag', regexp_replace('store_and_fwd_flag', 'N', '0'))

# Ändern des Datentyps einer Spalte
taxi_regex_replace = taxi_regex_replace.withColumn("store_and_fwd_flag", taxi_regex_replace.store_and_fwd_flag.cast(IntegerType()))

# Ausgabe des Schemas eines DataFrames
taxi_regex_replace.printSchema()

### 4.5.2 Spalten umbenennen

Manchmal kommt es vor, dass DataFrames keine aussagekräftigen Spaltennamen besitzen. Wenn beim Import aus einer CSV-Datei beispielsweise nicht header=True verwendet wurde, setzt PySpark alternative Spaltennamen. Die Spalten werden dann mit "_ c" und einer darauffolgenden aufsteigenden Nummerierung bezeichnet. Dies ist natürlich nicht hilfreich, wenn ein Datensatz untersucht werden soll. Spalten werden durch `withColumnRenamed(oldname, newname)` umbenannt.

Im Taxidatensatz gibt es zwei Spalten, die "tpep_pickup_datetime" und "tpep_dropoff_datetime" heißen. TPEP steht dabei für "Taxicap Passenger Enhancement Program" und signalisiert, dass es sich bei der Abfahrts- und Ankunftszeit, um die durch das elektronische System aufgezeichnete Zeit handelt. Da diese Information für die Analyse nicht unbedingt von Bedeutung ist und zu Verwirrungen führen kann, können die Spalten umbenannt werden.

In [None]:
# Umbenennen der Spalten tpep_pickup_datetime und tpep_dropoff_datetime
taxi_columns_renamed = taxi_regex_replace.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
taxi_columns_renamed = taxi_columns_renamed.withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

# Ausgabe des Schemas eines DataFrames
taxi_columns_renamed.printSchema()

### 4.5.3 Spalten hinzufügen

In Aufgabenteil [3.7](#aufgabenstrukturiertedaten) haben wir bereits die Funktion `drop(columnname)` zum Entfernen von Spalten kennengelernt. Basierend auf einer übergebenen Funktion können mit `withColumn()` allerdings auch neue Spalten hinzugefügt werden. Dazu wird innerhalb der Klammer eine Vorschrift für die Erzeugung der Spalte erstellt. Beispielsweise kann eine neue Spalte aus der Addition der Werte zweier anderer Spalten erstellt werden. Wird eine neue Spalte mit der `select()`Funktion erzeugt, dann muss die anschließend mit `alias(columnname)` benannt werden. Das Erzeugen einer Spalte mit select erfolgt in der Form `df.select(columnname(s), functions.alias(columnname))`. Mit Hilfe der Funktionen `lit()` und `typedLit()` in select() können Spalten hinzugefügt werden, die ein Literal oder einen konstanten Wert enthalten. 

Im Taxidatensatz gibt es einige Spalten, die Informationen in Bezug auf den Preis der Fahrt beinhalten. Die Spalte "total_amount" sollte die Summe der Spalten "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount" und "improvement_sucharge" beinhalten. Dies kann Stichproben-artig kontrolliert werden, indem eine Spalte erstellt wird, die diese Summe beinhaltet.

In [None]:
# Hinzufügen einer Spalte zu einem DataFrame basierend auf einer Berechnung
taxi_sum = taxi_columns_renamed.withColumn("price_sum", taxi_columns_renamed.fare_amount+taxi_columns_renamed.extra
                                           +taxi_columns_renamed.mta_tax+taxi_columns_renamed.tip_amount
                                           +taxi_columns_renamed.tolls_amount+taxi_columns_renamed.improvement_surcharge)

# Anzeigen der ersten drei Zeilen des gerade erstellten DataFrames
taxi_sum.select("total_amount", "price_sum").show(5)

### 4.6 User Defined Functions

Die sogenannten UDFs werden dazu verwendet, um den Funktionsumfang von PySpark zu erweitern. In PySpark können Funktionen in Python-Syntax erstellt werden und in der Methode `udf()` verpackt und somit als UDF registiert werden. Danach kann diese neue Funktion auf DataFrames angewendet werden. Es ist allerdings immer empfehlenswert, zuerst zu schauen, ob es eine entsprechende Funktion bereits gibt. Der Funktionsumfang von PySpark erweitert sich mit jeder Version. Deswegen kann es gut sein, dass eine gewünschte Funktionalität bereits implementiert ist. Außerdem muss bei dem Entwurf von UDFs darauf geachtet werden sorgfältig vorzugehen, um eventuelle Optimierungs- und Leistungsprobleme zu verhindern.

User Defined Functions können beispielsweise dazu genutzt werden, die Einträge eines Datensatzes anzupassen. Im Beispiel werden Wörter, die mit einem Großbuchstaben anfangen so umgewandelt, dass sie mit einem Kleinbuchstaben beginnen.

In [None]:
# Notwendige Packages importieren
from pyspark.sql.functions import udf

# Funktion, die einen String erhält und den ersten Buchstaben jedes Wortes in einen Kleinbuchstaben umwandelt
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
        resStr = resStr + x[0:1].lower() + x[1:len(x)] + " "
    return resStr

# Konvertierung der Python-Funktion in eine User Defined Function
convertUDF = udf(lambda z: convertCase(z))

# Anwendung der UDF auf einen DataFrame und Anzeige der ersten drei Zeilen
taxi_columns_renamed.select(convertUDF("Vendor").alias("Vendor")).show(5)

### 4.7 Neue Daten ermitteln und hinzufügen

Häufig kommt es bei der Analyse von Datenmengen vor, dass Zusammenhänge erkannt werden oder aus bereits vorhandenen Daten neue Informationen gewonnen werden können, die für eine weitere Analyse interessant sind. In einem solchen Fall ist es sinnvoll, diese Informationen zu extrahieren und an anderer Stelle zu speichern, z.B. als ergänzende Spalte im Datensatz. Diese Werte können bspw. bei einer anschließenden Untersuchung des Datensatzes mit Machine Learning Algorithmen helfen, wichtige Erkenntnisse zu gewinnen.

Im Taxi-Datensatz gibt es zu jedem Eintrag, also zu jeder Fahrt, Ortsangaben. Diese Angaben bestehen aus den Längen- und Breitengrade des Abfahrts- und Ankunftsortes. Mithilfe eines **k-Means-Clustering Algorithmus** (weitere Informationen [hier](https://www.bigdata-insider.de/was-ist-der-k-means-algorithmus-a-734637/)) aus der Spark-Bibliothek MLlib können diese so geclustert werden, dass New York in Zonen aufgeteilt wird, denen Werte von 0 bis 14 zugeordnet werden.  Dadurch ist es später einfacher zu erkennen, in welchem Bereich der Stadt eine Fahrt begann und endete, ohne die Längen- und Breitengrade betrachten zu müssen. 

Wie genau das Modell arbeitet ist an dieser Stelle nicht von Bedeutung. Es soll verdeutlicht werden, wie reibungslos die verschiedenen Spark-Bibliotheken miteinander arbeiten können. Es ist zu erkennen, dass einige der bereits kennengelernten Funktionen mit neuen Funktionen der MLlib-Bibliothek verknüpft werden. 

In [None]:
# Packages für Machine Learning installieren
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Zusammenführen der Spalten "dropoff_latitude" und "dropoff_longitude" zu einer Vektorspalte namens "features"
vecAssembler = VectorAssembler(inputCols=["dropoff_latitude", "dropoff_longitude"], outputCol="features")
new_df = vecAssembler.transform(taxi_columns_renamed)

# Trainieren eines k-Means-Clustering Algorithmus mit 15 Clustern und der zuvor erstellen "features"-Spalte
# Ziel ist es, New York anhand der Längen- und Breitengrade in Cluster einzuteilen
kmeans = KMeans().setK(15).setSeed(1)
model = kmeans.fit(new_df.select('features'))

# Anwenden des traierten Modells auf die Längen- und Breitengrade der Abfahrtsorte um ein Cluster für diese zu erstellen
vecAssembler = VectorAssembler(inputCols=["pickup_latitude", "pickup_longitude"], outputCol="features")
new_df = vecAssembler.transform(taxi_columns_renamed)
df = model.transform(new_df)

# Aus der vorherigen Anwendung des Modells auf die Abfahrtsorte ist eine Spalte prediction entstanden, die die jeweilige
# Clusternummer zum Ort beinhaltet. Diese Spalte wird in diesem Schritt umbenannt und die "features"-Spalte entfernt,
# damit auch für die Ankunftsorte ein Cluster erstellt werden kann
df = df.withColumnRenamed('prediction', 'pickup_cluster')
df = df.drop('features')

# Anwenden des traierten Modells auf die Längen- und Breitengrade der Ankunftsorte, um ein Cluster für diese zu erstellen
vecAssembler = VectorAssembler(inputCols=["dropoff_latitude", "dropoff_longitude"], outputCol="features")
new_df = vecAssembler.transform(df)
df = model.transform(new_df)

# Umbennen der Spalte "prediction" in "dropoff_cluster" und entfernen der vektorisierten "features"-Spalte
df = df.withColumnRenamed('prediction', 'dropoff_cluster')
df = df.drop('features')

# Umbenennen des Ergebnis-DataFrames in taxi_cluster
taxi_cluster = df

Cluster der Abfahrtsorte:

In [None]:
# Visualisierung des Clusters der Abfahrtsorte
vis.visualize_cluster(taxi_cluster, "pickup_latitude", "pickup_longitude", "pickup_cluster")

Cluster der Ankunftsorte:

In [None]:
# Visualisierung des Clusters der Ankunftsorte
vis.visualize_cluster(taxi_cluster, "dropoff_latitude", "dropoff_longitude", "dropoff_cluster")

**In welchem Bereich beginnen die meisten Fahrten?**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(47)

### 4.8 Zusammenfassung: Bearbeitung und Manipulation von strukturierten Daten

| Beschreibung | Code |
| :--- | :--- |
| Spalten eines DataFrames verändern | `df.withColumn(columnname, function)` |
| Datentyp einer Spalte ändern | `df.columnname.cast(type)` |
| Stringwerte ersetzen | `df.withColumn(columnname, regexp_replace(columnname, oldvalue, newvalue))` |
| Numerische Werte ersetzen | `df.withColumn(columnname, replace(columnname, oldvalue, newvalue))` |
| Spalte eines DataFrames umbenennen | `df.withColumnRenamed(oldname, newname)` |
| Spalte eines DataFrames entfernen | `df.drop(columnname)` |
| Einem DataFrame eine Spalte hinzufügen | `df.select(columnname(s), functions.alias(columnname))` |
| User Defined Function erstellen | `udf(lambda function)` |

<br>
<br>

<div style="background-color: #e25a1c ; padding: 5px; "></div>

### 4.9 Aufgaben: Bearbeitung und Manipulation von strukturierten Daten

**Numerische Werte für Kategorien sind zwar für ML-Algorithmen notwendig, aber nicht besonders anschaulich. Ändere deshalb die Einträge der Spalte payment_type so, dass folgende Ersetzungen erfolgen. Verwende dafür den DataFrame taxi_columns_renamed.**
- 1 = Credit card
- 2 = Cash
- 3 = No charge
- 4 = Dispute
- 5 = Unknown
- 6 = Voided trip

In [None]:
from pyspark.sql.types import StringType

# Hier kann die Aufgabe bearbeitet werden
taxi_paymenttype_string = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(51, taxi_paymenttype_string)

**Auf welche Art wird am häufigsten gezahlt?**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(52)

**Erstelle einen DataFrame aus taxi_columns_renamed. Dem neuen DataFrame soll eine Spalte namens journey_time_seconds hinzugefügt werden, die die Fahrtdauer in Sekunden enthält. Verwende zur Ermittlung der Fahrtdauer die Spalten pickup_datetime und dropoff_datetime. Verwende für die Lösung der Aufgabe [to_timestamp()](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html) aus den pyspark.sql.functions.**

In [None]:
from pyspark.sql.types import LongType

# Hier kann die Aufgabe bearbeitet werden
taxi_journeytime= 

# Hier wird die Aufgabe kontrolliert
lm.show_task(53, taxi_journeytime)

**In New York ist es den Taxifahrern erlaubt maximal 10 Stunden am Stück zu fahren. Fahrten die länger gedauert haben, sind also vermutlich fehlerhaft aufzgezeichnet. Wie viele "fehlerhafte" Fahrten gibt es, nach dieser Definition, in dem Datensatz?**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(54)

**Um weitere Erkenntnisse aus dem Datensatz gewinnen zu können, ist es außerdem interessant, sich die zeitlichen Faktoren anzuschauen. Erzeuge aus taxi_columns_renamed einen DataFrame mit zwei neuen Spalten namens weekday und hour (in dieser Reihenfolge), die den Wochentag des Fahrtbeginns als Stringwert in der Form Mon/Tue/Wed/Thu/Fri/Sat/Sun und die Stunde, in der die Fahrt begonnen hat, beinhalten. Verwende dazu die Funktionen [date_format()](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html) und [hour()](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html) aus den pyspark.sql.functions.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden
taxi_hour_weekday = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(55, taxi_hour_weekday)

**Wie viele Fahrten wurden zwischen 11 und 12 Uhr Montags gefahren? Filter den Datensatz, sodass nur noch die Fahrten übrig sind, die zu dieser Zeit durchgeführt wurden und ermittle deren Anzahl.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(56)

**Erzeuge aus taxi_hour_weekday einen DataFrame mit einer Spalte, die in jeder Zeile die Anzahl der Fahrten in der jeweiligen Stunde beinhaltet. Nenne die Spalte rides_per_hour. Verwende dazu [Window](https://sparkbyexamples.com/pyspark/pyspark-window-functions/) aus pyspark.sql und [count()](https://spark.apache.org/docs/3.1.1/sql-getting-started.html) aus pyspark.sql.functions.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden
taxi_ridesperhour = 

# Hier wird die Aufgabe kontrolliert
lm.show_task(57, taxi_ridesperhour)

**Zu welcher Uhrzeit wurden die meisten Fahrten gefahren.**

In [None]:
# Hier kann die Aufgabe bearbeitet werden


In [None]:
#hideInput
lm.show_task(58)

<br>
<br>

<div style="background-color: #3c3a3e ; padding: 5px; "></div>

## 5 Fazit

In diesem Lernmodul wurde das Datenanalyse-Framework Apache Spark vorgestellt und der Umgang mit der Python-Bibliothek PySpark gelernt. Die Idee hinter Spark wurde vorgestellt, die Besonderheiten der parallelen Verarbeitung von Daten auf dem Arbeitsspeicher von Knoten innerhalb eines Cluster wurde erklärt und die Vorteile einer solchen Verarbeitung in Bezug auf die Performance beschrieben. Das Grundkonzept hinter Apache Spark, die Resilient Distributed Datasets als verteilte Sammlungen von Daten wurden vorgestellt und die Brücke zu strukturierten Daten in DataFrames geschlagen. Der Unterschied von RDDs und DataFrames sollte deutlich geworden sein und wird hier noch einmal zusammengefasst:

| Eigenschaften | RDD | DataFrame |
| :--- | :--- | :--- |
| Definition | verteilte Collection von Daten | verteilte Tabelle mit Spaltennamen |
| Schema | Nein | Ja |
| Unveränderlich | Ja | Ja |
| Typsicherung | Ja | Nein |
| Query-Optimierung | Nein | Ja |
| Level | Low | High (basiert auf RDD) |
| Lazy Transformation | Ja | Ja |

<br>

Anschließend wurde der Umgang mit DataFrames praktisch ahand von Taxidaten aus New York erlernt und dabei deren Eigenschaften kennengelernt. Es wurden neue Daten hinzugefügt und dabei der grundlegende Umgang mit Algorithmen für maschinelles Lernen in Spark aufgezeigt. Wir haben dies in eher kleinem und einfachem Maße mit der Erstellung von Clustern und dem Extrahieren von Information gemacht. Dabei ging es vor allem um zeitliche und räumliche Eigenschaften und Zusammenhänge. 

Interessante Projekte für weitere Analysen mit diesem Datensatz wären beispielsweise die Anwendung eines ML-Algorithmus, um die Dauer oder den Preis einer Taxifahrt vorherzusagen. Dabei sollte aber darauf geachtet werden, dass während des Trainings und Testings des Modells für die Features nur Daten vorliegen sollten, die zu Beginn einer Fahrt vorhanden sein können. Dafür müsste der Datensatz also angepasst werden.

Die Zusammenfassungen der einzelnen Abschnitte sind hier noch einmal in einer Datei gesammelt: <a href="Pyspark_Handout.pdf">Download</a>

## 6 Weiterführende Informationen

[Ausführen einer Spark-Anwendung in einem Cluster](https://spark.apache.org/docs/latest/cluster-overview.html)  
[PySpark API Documentation](https://spark.apache.org/docs/latest/api/python/)