# Big Data Analytics Praktikum 3

**Abfragen mit Apache Spark** 

Machen Sie sich mit Apache Spark vertraut. Bearbeiten Sie die Aufgaben indem Sie **Spark RDD** entsprechend transformieren. 

## Arbeitsanweisung
Nutzen Sie die markierten Zellen im vorliegenden Notebook `BDA1_A3_Spark.ipynb` für Ihre Lösungen und laden Sie es in Ilias hoch. In den Zellen muss ausführbarer python code vorliegen. Die Ausgabe soll unterhalb der jeweiligen Zellen produziert werden.
Liefern Sie auch aussagekräftiges Markdown zu Ihrem Code (Vorgehen, Quellen, etc) ab.

**Hinweis**: Verwenden Sie für diese Aufgaben *nicht* Spark SQL und *keine* Dataframes. 

----

## Vorbereitung
* Verwenden Sie immer den vorgegeben Spark Master um Inkonsistenzen der python3 Versionen zwischen Worker und Client zu verhindern.
* Ändern Sie nicht die SparkContext Konfiguration und beenden Sie bitte den SparkContext nachdem Sie die Bearbeitung beenden, um die Resourcen wieder frei zu geben! (`stop_sc1()`)
* Stellen Sie sicher, dass `pyspark` installiert ist (pip install)

In [1]:
%pip install pyspark



Für diese Aufgabe steht ein HDFS mit dem Namenode `namenode` unter Port `19000` bereit. 

Sie finden die folgenden Dateien darin:

* **`/data/bda1/co2data.tsv`**<br>Datensatz von Messungen verschiedener CO<sub>2</sub>-Sensoren
* **`/data/bda1/co2data_pm.tsv`**<br>Datensatz von Messungen verschiedener CO<sub>2</sub>-Sensoren mit Partikelmessung, pm ist die Partikeldichte, npm die Partikelanzahl bestimmter Größen

In [2]:
import pyspark
from pprint import pprint
import os
import sys

print(sys.version)  # python3 version

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'ipython3'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'
def stop_sc1():
    """stop spark context if exists"""
    try:
        sc1.stop()
        print('Spark Context stopped')
    except Exception as ex1:
        print(f'No context stopped: {ex1}')

3.9.6 | packaged by conda-forge | (default, Jul 11 2021, 03:39:48) 
[GCC 9.3.0]


### Spark Context erzeugen
Die Session mit dem vorgegebenen Spark Master öffnen.

In [3]:
stop_sc1()

# never ever change these lines!
config = pyspark.SparkConf().setAll([('spark.executor.memory', '1g'), ('spark.executor.cores', '1'), ('spark.cores.max', '2'), ('spark.driver.memory','1g'), ("spark.app.name", os.environ['JUPYTERHUB_CLIENT_ID'])])
sc1 = pyspark.SparkContext(master='spark://jupiter.bigdata.fh-aachen.de:17077', conf=config)

# Festlegen der Dateipfade
file_path_co2data = "hdfs://namenode:19000/data/bda1/co2data.tsv"
file_path_co2data_pm = "hdfs://namenode:19000/data/bda1/co2data_pm.tsv"

# Laden der Daten in RDDs
co2data = sc1.textFile(file_path_co2data)
co2data_pm = sc1.textFile(file_path_co2data_pm)

No context stopped: name 'sc1' is not defined


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/20 02:59:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Aufgabe 3 a

Untersuchen Sie die Sensordaten unter ``hdfs://namenode/data/bda1/*``. Beantworten Sie die folgenden Fragen, indem Sie jeweils geeignete RDD Operationen in Spark ausführen:

1. Wieviele Messungen sind der Datenmenge `co2data.tsv` zu finden? Wieviele sind es in `co2data_pm.tsv` ?
2. Wie lauten die Attributnamen der Datenmenge `co2data_pm.tsv` ? Geben Sie sie zeilenweise aus.
3. Wieviele verschiedene Sensoren (angegeben im Feld _serial_number_) enhält die Datenmenge `co2data.tsv` ?  Beachten Sie den Hinweis unter 3a)
4. Wieviele Datenpunkte je Sensor liegen vor in `co2data.tsv` ? Geben Sie sowohl Sensor als auch Anzahl aus und beachten Sie den Hinweis unter 3a)
5. Was ist der höchste, und was der niedrigste Temperaturwert in der Datenmenge `co2data_pm.tsv` ? 
6. Was ist der durchschnittliche CO<sub>2</sub>-Wert je Sensor in der Datenmenge `co2data_pm.tsv` ? Runden Sie gerne auf drei Nachkommastellen und beachten Sie den Hinweis unter 3a)

**Hinweis:**<br>Die Serial Number besteht aus drei Komponenten: `s_` als Präfix, die eindeutige MAC-Adresse des Sensors und einer Zahl. Die Eindeutigkeit wird nur über die MAC-Adresse bestimmt. Nutzen Sie die MAC-Adresse.

## Nr.3a 1) Zählung von Messungen in den Datensätzen
- Quelle count: https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis 
- Die Count Funktion zählt die Anzahl der Elemente im RDD, somit kann die Anzahl der Messungen gezählt werden

In [None]:
# Aufgabe 3a 1

#Zeilen zählen
#num_rows = co2data.count()
#print(f'The total number of rows is {num_rows}')

#Überblick über Datensatz(nicht lösung für Aufgabe)
#first_five = co2data_pm.take(200)
#first_five = co2data.take(200)
#for row in first_five:
#print(row)
    
num_measurements_co2data = co2data.count()
num_measurements_co2data_pm = co2data_pm.count()

print(f"Anzahl der Messungen in co2data.tsv: {num_measurements_co2data}")
print(f"Anzahl der Messungen in co2data_pm.tsv: {num_measurements_co2data_pm}")



## Nr.3a 2) Ausgabe der Attributnamen
- Quelle split: https://spark.apache.org/examples.html
- Quelle first: https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis
- first() gibt erste Element des RDDs zurück(in unserem Fall die Zeile mit den Attributnamen)
- split() trennt die Attributnamen am Tabulator als Element

In [None]:
# Aufgabe 3a 2
attribute_names_co2data_pm = co2data_pm.first()
print("Attributnamen der Datenmenge co2data_pm.tsv:")
for attribute in attribute_names_co2data_pm.split("\t"):
    print(attribute)

## Nr.3a 3) Zählung der verschiedenen Sensoren

- Quelle map, distinct, filter: https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis
- Entfernen des headers des RDD co2data
- Extrahieren der MAC - Adresse aus dem serial_number feld
- Nutzung der distinct() Methode um einzigartige MAC-Adressen zu ermitteln
- Zählung der Adressen miot count

In [None]:
# Aufgabe 3a 3
# Header entfernen
header = co2data.first() 
co2data_noheader = co2data.filter(lambda line: line != header)

# Durchführung der Mac Adressen Filterung
mac_addresses = co2data_noheader.map(lambda x: x.split("\t")[1].split("s_")[1].split("_")[0])
sensors = mac_addresses.distinct().count()

print(f"Anzahl der verschiedenen Sensoren in co2data.tsv: {sensors}")

## Nr.3a 4) Zählung der Datenpunkte je Sensor
Quellen: collect, reduceBykey: https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis

- entfernen des haders des RDD co2data
- extrahieren der MAC-Adresse aus dem serial_number Feld
- zusammenfassung der Datenpunkte pro MAC-Adresse mit reduceByKey()

In [None]:
# Header entfernen
header = co2data.first() 
co2data_noheader = co2data.filter(lambda line: line != header)

sensor_counts = co2data_noheader.map(lambda x: (x.split("\t")[1].split("s_")[1].split("_")[0], 1))
sensor_counts = sensor_counts.reduceByKey(lambda a, b: a + b)

print("Anzahl der Datenpunkte je Sensor:")
for sensor, count in sensor_counts.collect():
    print(f"Sensor {sensor}: {count} Datenpunkte")

## Nr.3a 5) Suchen des höchsten und niedrigsten CO2- Wertes
- Quelle min, max: https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis 
- mit split werden die Zeilen in Elemente aufgeteilt wo ein Tabulatorzeichen ist und dann sechste Element bzw. an der fünften Stelle ausgewählt
- mit strip werden die Anführungszeichen entfernt
- mit filter werden die ungültigen null-Werte herausgefiltert 
- mit map(float) werden die Daten in Fließkommazahlen umgewandelt
- mit den min und max Funktionen wird schließlich der höchste und niedrigste Temperaturwert herausgefiltert

In [None]:
# Aufgabe 3a 5

# extrahiert den Header
header = co2data_pm.first()  

# filtert den Header aus den Daten heraus
co2data_pm_no_header = co2data_pm.filter(lambda row: row != header)  

# Filtern der ungültigen Einträge und konvertieren der gültigen Einträge in float
temperatures = co2data_pm_no_header.map(lambda x: x.split("\t")[5].strip('"')).filter(lambda x: x != 'null').map(float)

min_temperature = temperatures.min()
max_temperature = temperatures.max()

print(f"Der niedrigste Temperaturwert in der Datenmenge co2data_pm.tsv: {min_temperature}")
print(f"Der höchste Temperaturwert in der Datenmenge co2data_pm.tsv: {max_temperature}")

## Nr.3a 6) Berechnung des durchschnittlichen C02-Wertes je Sensor
- Quelle  collect, mapValues: https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis
- extract serial number teilt string an jedem unterstrich und gibt das zweite element zurück(mac adresse)
- valid_sensors_and co2 filtert null werte heraus, erzeugt die form der macadressen, entfernt anführungszeichen
- sum_counts berechnet Summe und Anzahl der co2 werte
- average_co2 berechnet den durschnittlichen co2 wert für jede serial number indem die summe der co2 Werte durch die Anzahl geteilt wird

In [None]:
# Entfernen vom Header
header = co2data_pm.first()
co2data_pm_no_header = co2data_pm.filter(lambda line: line != header)

# Funktion zur Extraktion der Seriennummer
def extract_serial_number(serial_number):
    return serial_number.split('_')[1]

# neues RDD mit gültigen Sensoren und CO2-Werten
valid_sensors_and_co2 = co2data_pm_no_header \
    .map(lambda x: (extract_serial_number(x.split("\t")[3].strip('"')), x.split("\t")[4].strip('"'))) \
    .filter(lambda x: x[1] != 'null')

# berechnung der co2- werte in float
valid_sensors_and_co2 = valid_sensors_and_co2.mapValues(float)

# Berechnung Summe und Anzahl der CO2-Werte für jede Seriennummer
sum_counts = valid_sensors_and_co2 \
    .mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))

# Berechnung Durchschnitt für jede Seriennummer
average_co2 = sum_counts \
    .mapValues(lambda x: x[0]/x[1]) \
    .collect()

# Ausgabe der Ergebnisse
for serial, avg in average_co2:
    print(f"Mac adress {serial} has an average CO2 value of {avg}")


## Aufgabe 3 b

Machen Sie Aussagen in Markdown zu den beiden Datenmengen, nachdem Sie sich die Information aus Spark gezogen haben.

* Kommen Sensoren (MAC-Adresse in Serial Number) in beiden Datenmengen vor? Wenn ja, welche?
* Anhand welchen Attributs können Sie auf vorhandene Werte in den Attributen `pm*` und `npm*` filtern? Geben Sie ein Beispiel.
* Enthält eine Datenmenge "fehlerhafte" CO<sub>2</sub>-Messungen? Wenn ja, wieviele Messungen sind betroffen? Fehlerhaft ist ein Wert `null`


## Nr.3b 1) Ausgabe der gemeinsamen Sensoren aus beiden Datensätzen
-Gemeinsame Sensoren:
Sensor: e8db84c5f33d
Sensor: 8caab57c89c3
Sensor: a848fac03782
Sensor: 8caab57c9751
Sensor: 8caab57d01da
Sensor: e8db84c62ab4
Sensor: 8caab57d0593
Sensor: 3c6105d45469
Sensor: a848fac0a9df
Sensor: 3c6105d381e8
Sensor: 308398824a5e
Sensor: a848fac03ff8
Sensor: 3c6105d36eb0
Sensor: 3c6105d3908f
Sensor: 40915150b550
Sensor: 8caab57ccaaf
Sensor: d8bfc014724e
Sensor: e8db84c5f771
Sensor: 308398a2f790
Sensor: 308398805c0b
Sensor: 3c6105d414f3
Sensor: ac0bfbd85271
Sensor: 4091514f0284
Sensor: 308398b552c4
Sensor: 308398a2fb52
Sensor: ac0bfbd71044
Sensor: 10521c01cf19
Sensor: 3c6105d4188d
Sensor: 30839882fcc8
Sensor: 8caab57cc813
Sensor: ac0bfbd6547d
Sensor: 308398b54e4c
Sensor: 308398b620a0
Sensor: 8caab57cc961
Sensor: e8db84c5f33d"
Sensor: 10521c0202ab
Sensor: 8caab57c3e19
Sensor: 8caab57cbb52
Sensor: 3c6105d467fd
Sensor: 308398a26607
Sensor: 8caab57cfb10
Sensor: 308398b5b69c
Sensor: 308398b60c74
Sensor: 8caab57cafa8
Sensor: 3c6105cffb3b
Sensor: 3c6105d3abae
Sensor: 308398b6157d
Sensor: 308398a2ddcb
Sensor: 308398a2a1f6
Sensor: 8caab57a6dd9
Sensor: 308398a2000e
Sensor: 8caab57b0e22
Sensor: ac0bfbd857fb
Sensor: 3c6105d34ddc
Sensor: 308398a2a8b6
Sensor: 308398b595c0
Sensor: 3c6105d389cc
Sensor: e8db84c60450
Sensor: 308398b61fd3
Sensor: e8db84c5fc6a
Sensor: 30839882dbce
Sensor: 8caab57a6dd9"
Sensor: ac0bfbd64321
Sensor: d8bfc0147061
Sensor: 308398b651ee
Sensor: 308398a2a0e4
Sensor: 3c6105d49d8d
Sensor: 3083988004ef

In [None]:
#header co2data entfernen
header1 = co2data.first() 
co2data_noheader = co2data.filter(lambda line: line != header1)

#header co2data_pm entfernen
header2 = co2data_pm.first() 
co2data_noheader_pm = co2data_pm.filter(lambda line: line != header2)

# Sensoren in co2data zählen
sensor_counts_co2data = co2data_noheader.map(lambda x: (x.split("\t")[1].split("s_")[1].split("_")[0], 1))
sensor_counts_co2data = sensor_counts_co2data.reduceByKey(lambda a, b: a + b)

# Sensoren in co2adata_pm zählen
sensor_counts_co2data_pm = co2data_noheader_pm.map(lambda x: (x.split("\t")[3].split("s_")[1].split("_")[0], 1))
sensor_counts_co2data_pm = sensor_counts_co2data_pm.reduceByKey(lambda a, b: a + b)

# Mergen der beiden RDDs
combined_sensor_counts = sensor_counts_co2data.union(sensor_counts_co2data_pm)

# Reduzieren der vereinigten RDDs und Filtern der Sensoren, die in beiden Datensätzen vorhanden sind
common_sensors = combined_sensor_counts.reduceByKey(lambda a, b: a + b).filter(lambda x: x[1] > 1)

# Ausgeben der gemeinsamen Sensoren
common_sensors_list = common_sensors.keys().collect()

print("Gemeinsame Sensoren:")
for sensor in common_sensors_list:
    print(f"Sensor: {sensor}")

## Nr.3b 2.) Filtern nach pm und npm Werten


In [None]:
#header co2data_pm entfernen
header = co2data_pm.first() 
co2data_noheader_pm = co2data_pm.filter(lambda line: line != header)

# Zeilen filtern, keine null-werte in der pm1-Spalte haben
non_null_pm1 = co2data_noheader_pm.filter(lambda x: x.split("\t")[7] != "null")

## Nr.3b 3.) Zählung fehlerhafter Co2-Messungen
- Die Ergebnisse zeigen, dass es in dem co2_data Datensatz keinen einzigen co2 'null' Wert gibt
- In dem co2_data_pm Datensatz wiederum 9 fehlerhafte Werte

In [None]:
# Filtern der Daten, in denen co2_ppm 'null' ist
wrong_co2_measurements = co2data.map(lambda x: x.split("\t")).filter(lambda x: x[3]=='null')
wrong_co2_measurements2 = co2data_pm.map(lambda x: x.split("\t")).filter(lambda x: x[4]=='null')

# Zählung der fehlerhaften Messungen
num_wrong_measurements = wrong_co2_measurements.count()
num_wrong_measurements2 = wrong_co2_measurements2.count()

# Ausgabe Anzahl fehlerhafter Messungen
print(num_wrong_measurements)
print(num_wrong_measurements2)

In [None]:
stop_sc1()  # always exit your spark context after work!

## Nützliche Links
* https://spark.apache.org/docs/latest/rdd-programming-guide.html
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis
* https://spark.apache.org/examples.html