# Dataset

### Wieso haben Sie sich für diesen Datensatz entschieden?

Wir haben im Internet (kaggle.com) nach Datensätzen recherchiert, welche einem «Big-Data» Ansatz entsprechen. Wir mussten aber feststellen, dass reine Text Datensätze, welche für unseren Cluster passen würden, schwer zu finden sind. Zusätzlich wollten wir ein Datensatz, der ein Thema beinhaltet, welches uns allen gängig ist. 

Daher haben wir uns für das «Airline on-time Performance Data» Dataset entschieden, da dieses mit einer Grösse von ca. 12 GB perfekt zu unser Cluster Grösse passt und wir die Begriffe im Datensat korrekt verstanden. 

### Wie haben Sie den Datensatz heruntergeladen? (api, save as, csv, xml, json, ...)

Wir haben die Daten von kaggle.com mitels eines API heruntergeladen. Die Daten werden als .csv Files auf der Master-VM zwischengespeichert. Anschliessend wurden die CSV-Files auf HDFS gespeichert und dort mittels SPARK in einzelne Parquet Files umformatiert. Schlussendlich wurden die alten CSV-Files gelöscht.

In [1]:
from kaggle.api.kaggle_api_extended import KaggleApi
from zipfile import ZipFile

In [2]:
api = KaggleApi()
api.authenticate()

Herunterladen und entpacken der Files:

In [3]:
api.dataset_download_file('bulter22/airline-data', 'carriers.csv')

True

In [4]:
api.dataset_download_file('bulter22/airline-data', 'airline.csv.shuffle')

True

In [5]:
zf = ZipFile('airline.csv.shuffle.zip')
zf.extractall()
zf.close()

### Wie haben Sie die Daten ins HDFS geladen? Musste die Blocksize von HDFS angepasst werden?

Aus den .csv Files erstellten wir Parquet-Files, welcher ebenfall in HDFS verteilt werden. Da die Default-HDFS-Blocksize bereits 128MB beträgt und auch Parquet diese Blockgrösse standardmässig aufweist, musste nichts weiteres veranlasst werden. Somit kann ein ganzer Block vom Parquet File von einem Block des HDFS gelesen werden.

Für die Partitionierung der Parquet-Files haben wir den Faktor 55 gewählt. Dieser Faktor wurde gewählt, da wir für SPARK gemäss Topologie insgesamt 55 CPU's für die Berechnungen zur Verfügung haben. Dadurch werden die SPARK-Jobs auf alle CPU's verteilt, was eine optimale Parallelisierung ermöglicht.

HDFS vorbereiten und Files kopieren:

In [6]:
!hdfs dfs -ls /

Found 1 items
drwxr-xr-x   - cluster supergroup          0 2022-05-05 12:04 /user


In [7]:
!hdfs dfs -mkdir /airline-data

In [8]:
!hdfs dfs -put ~/BDLC-Project-G01/airline.csv.shuffle /airline-data
!hdfs dfs -put ~/BDLC-Project-G01/carriers.csv /airline-data

In [9]:
!hdfs dfs -ls /airline-data

Found 2 items
-rw-r--r--   2 cluster supergroup 12029207752 2022-05-29 13:38 /airline-data/airline.csv.shuffle
-rw-r--r--   2 cluster supergroup       43758 2022-05-29 13:38 /airline-data/carriers.csv


Konvertieren der Files zu Parquet:

In [10]:
from pyspark.sql import SparkSession

In [11]:
spark = SparkSession.builder.appName('ConvertToParquet').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-29 13:38:56,652 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [12]:
spark.read.option('encoding', 'us-ascii').csv('/airline-data/carriers.csv', header=True).repartition(55).write.parquet('/airline-data/carriers.parquet')

                                                                                

In [13]:
spark.read.option('inferSchema', True).option('encoding', 'iso-8859-1').csv('/airline-data/airline.csv.shuffle', header=True).repartition(55).write.parquet('/airline-data/airline.parquet')

2022-05-29 13:39:59,790 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2022-05-29 13:40:49,178 WARN scheduler.TaskSetManager: Lost task 68.0 in stage 6.0 (TID 216) (10.177.124.99 executor 9): java.io.IOException: No space left on device
	at java.io.FileOutputStream.writeBytes(Native Method)
	at java.io.FileOutputStream.write(FileOutputStream.java:326)
	at org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:59)
	at org.apache.spark.io.MutableCheckedOutputStream.write(MutableCheckedOutputStream.scala:43)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
	at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:223)
	at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:176)
	at java.io.BufferedOutpu

HDFS airline-data überprüfen:

In [14]:
!hdfs dfs -ls /airline-data

Found 4 items
-rw-r--r--   2 cluster supergroup 12029207752 2022-05-29 13:38 /airline-data/airline.csv.shuffle
drwxr-xr-x   - cluster supergroup           0 2022-05-29 13:42 /airline-data/airline.parquet
-rw-r--r--   2 cluster supergroup       43758 2022-05-29 13:38 /airline-data/carriers.csv
drwxr-xr-x   - cluster supergroup           0 2022-05-29 13:39 /airline-data/carriers.parquet


Originale löschen

In [15]:
!hdfs dfs -rm /airline-data/carriers.csv
!hdfs dfs -rm /airline-data/airline.csv.shuffle

Deleted /airline-data/carriers.csv
Deleted /airline-data/airline.csv.shuffle


In [16]:
!hdfs dfs -ls /airline-data

Found 2 items
drwxr-xr-x   - cluster supergroup          0 2022-05-29 13:42 /airline-data/airline.parquet
drwxr-xr-x   - cluster supergroup          0 2022-05-29 13:39 /airline-data/carriers.parquet


In [17]:
spark.stop()

### Ist Ihr Projekt ein Big-Data-Problem?

Da erst ab einer Grösse im Terabyte Bereich von Big-Data gesprochen wird und die airline-Daten mit wenigen Ausnahmen einheitlich sind, kann man hier nicht von einem Big-Data-Problem sprechen. Um ein Big-Data-Problem auszuwerten ist unser Cluster im EnterpriseLab leider zu klein, es ist aber trotzdem eine spannende Übung, da das File doch rund 123 Millionen Datensätze enthält. Mit dieser Anzahl Datensätzen und der vorhandenen Maschinen-Kapazität kann doch eine aussagekräftige und performante Analyse gemacht werden.

### Erklären Sie Ihre Daten. Was bedeuten die Felder, welches Schema sollten die Daten haben, gibt es primary keys und foreign keys?

Die Daten des «Airline on-time Performance Data» Dataset beinhalten die Aufzeichnungen der Inlandflüge in den USA von 1987 - 2008. Untenstehende Tabellen erklärt die Felder und das Schema. Die zwei Tabellen "Airlinedata" und "Carrier" werden mit dem Primarschlüssel "Code" (bei Carrier)  und dem Fremdschlüssel "UniqueCarrier" (bei Airlinedata) verbunden.

#### Airlinedata
| Name | Beschreibung| Datentyp |
| ------- | -------- | -------- |
| Year | Jahr | int |
| Month | Monat | int |
| Day of Month | Tag des Monat (1-31) | int |
| DayOfWeek | Wochentag (1 = Monday - 7 = Sunday) | int |
| DepTime | Reale Abflugszeit (lokal, hhmm) | int |
| CRSDepTime | Geplante Abflugszeit (lokal, hhmm) | int |
| ArrTime | Reale Ankunftszeit (lokal, hhmm) | int |
| CRSArrTime | Geplante Ankunftszeit (lokal, hhmm) | int |
| UniqueCarrier | Fluggesellschafts Code | string |
| FlightNum | Flugnummer | string |
| TailNum | Flugzeugnummer | string |
| ActualElapsedTime | Tatsächliche Zeitdauer (in Minuten) |  int |
| CRSElapsedTime | Geplante Zeitdauer (in Minuten) | int |
| AirTime | Flugdauer (in Minuten) | int |
| ArrDelay | Ankunft Verspätung (in Minuten) | int |
| DepDelay | Abflug Verspätung (in Minuten) | int |
| Origin | Startort IATA Flughafen Code | string |
| Dest | Zielort IATA Flughafen Code | string |
| Distance | Distanz (in Meilen) |  int |
| TaxiIn | Zeit in Bewegung am Boden bei Abflug (in Minuten) | int |
| TaxiOut | Zeit in Bewegung am Boden bei Ankunft (in Minuten) | int |
| Cancelled | Wurde der Flug gestrichen? | string |
| CancellationCode | Ursache der streichnung des Flugs (A = carrier, B = weather, C = NAS, D = security) | string |
| Diverted | Flug wurde Umgeleitet (1 = ja, 0 = nein) | int |
| CarrierDelay | Verspätung wegen des Gepäcks (in Minuten) | int | 
| WeatherDelay | Verspätung wegen des Wetters (in Minuten) | int |
| NASDelay | "National Aviation System" Verspätung (in Minuten) | int |
| SecurityDelay | Verspätung wegen der Sicherheit (in Minuten) | int |
| LateAircraftDelay | Verspätung wegen dem Flugzeug (in Minuten) | int |

#### Carrier
| Name | Beschreibung| Datentyp |
| ------- | -------- | -------- |
| Code | Fluggesellschafts Code | string |
| Description | Name der Airline | string |