# Big Data Analytics
Praktikum Sommersemester 2023. <small>Version 1.0</small>

**Aufgabe 3: Abfragen mit Apache Spark** 

Machen Sie sich mit Apache Spark vertraut. Bearbeiten Sie die Aufgaben indem Sie **Spark RDD** entsprechend transformieren. 

## Arbeitsanweisung
Nutzen Sie die markierten Zellen im vorliegenden Notebook `BDA1_A3_Spark.ipynb` für Ihre Lösungen und laden Sie es in Ilias hoch. In den Zellen muss ausführbarer python code vorliegen. Die Ausgabe soll unterhalb der jeweiligen Zellen produziert werden.
Liefern Sie auch aussagekräftiges Markdown zu Ihrem Code (Vorgehen, Quellen, etc) ab.

**Hinweis**: Verwenden Sie für diese Aufgaben *nicht* Spark SQL und *keine* Dataframes. 

----

## Vorbereitung
* Verwenden Sie immer den vorgegeben Spark Master um Inkonsistenzen der python3 Versionen zwischen Worker und Client zu verhindern.
* Ändern Sie nicht die SparkContext Konfiguration und beenden Sie bitte den SparkContext nachdem Sie die Bearbeitung beenden, um die Resourcen wieder frei zu geben! (`stop_sc1()`)
* Stellen Sie sicher, dass `pyspark` installiert ist (pip install)

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[K     |████████████████████████████▌   | 277.1 MB 91.5 MB/s eta 0:00:011

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



[K     |████████████████████████████████| 310.8 MB 22 kB/s 
[?25hCollecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 89.5 MB/s eta 0:00:01
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l|

Für diese Aufgabe steht ein HDFS mit dem Namenode `namenode` unter Port `19000` bereit. 

Sie finden die folgenden Dateien darin:

* **`/data/bda1/co2data.tsv`**<br>Datensatz von Messungen verschiedener CO<sub>2</sub>-Sensoren
* **`/data/bda1/co2data_pm.tsv`**<br>Datensatz von Messungen verschiedener CO<sub>2</sub>-Sensoren mit Partikelmessung, pm ist die Partikeldichte, npm die Partikelanzahl bestimmter Größen

In [2]:
import pyspark
from pprint import pprint
import os
import sys

print(sys.version)  # python3 version

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'ipython3'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'
def stop_sc1():
    """stop spark context if exists"""
    try:
        sc1.stop()
        print('Spark Context stopped')
    except Exception as ex1:
        print(f'No context stopped: {ex1}')

3.9.6 | packaged by conda-forge | (default, Jul 11 2021, 03:39:48) 
[GCC 9.3.0]


### Spark Context erzeugen
Die Session mit dem vorgegebenen Spark Master öffnen.

In [3]:
stop_sc1()

# never ever change these lines!
config = pyspark.SparkConf().setAll([('spark.executor.memory', '1g'), ('spark.executor.cores', '1'), ('spark.cores.max', '2'), ('spark.driver.memory','1g'), ("spark.app.name", os.environ['JUPYTERHUB_CLIENT_ID'])])
sc1 = pyspark.SparkContext(master='spark://jupiter.bigdata.fh-aachen.de:17077', conf=config)

No context stopped: name 'sc1' is not defined


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/12 20:16:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Einlesen der Daten aus HDFS und Erzeugen von RDDs

In [4]:
rdd_co2data = sc1.textFile("hdfs://namenode:19000/data/bda1/co2data.tsv")
rdd_co2data_pm = sc1.textFile("hdfs://namenode:19000/data/bda1/co2data_pm.tsv")

## Aufgabe 3 a

Untersuchen Sie die Sensordaten unter ``hdfs://namenode/data/bda1/*``. Beantworten Sie die folgenden Fragen, indem Sie jeweils geeignete RDD Operationen in Spark ausführen:

1. Wieviele Messungen sind der Datenmenge `co2data.tsv` zu finden? Wieviele sind es in `co2data_pm.tsv` ?
2. Wie lauten die Attributnamen der Datenmenge `co2data_pm.tsv` ? Geben Sie sie zeilenweise aus.
3. Wieviele verschiedene Sensoren (angegeben im Feld _serial_number_) enhält die Datenmenge `co2data.tsv` ?  Beachten Sie den Hinweis unter 3a)
4. Wieviele Datenpunkte je Sensor liegen vor in `co2data.tsv` ? Geben Sie sowohl Sensor als auch Anzahl aus und beachten Sie den Hinweis unter 3a)
5. Was ist der höchste, und was der niedrigste Temperaturwert in der Datenmenge `co2data_pm.tsv` ? 
6. Was ist der durchschnittliche CO<sub>2</sub>-Wert je Sensor in der Datenmenge `co2data_pm.tsv` ? Runden Sie gerne auf drei Nachkommastellen und beachten Sie den Hinweis unter 3a)

**Hinweis:**<br>Die Serial Number besteht aus drei Komponenten: `s_` als Präfix, die eindeutige MAC-Adresse des Sensors und einer Zahl. Die Eindeutigkeit wird nur über die MAC-Adresse bestimmt. Nutzen Sie die MAC-Adresse.

### Aufgabe 3a: 1. Anzahl der Messungen je Datenmenge
----------------
1. Einlesen der Daten aus HDFS und Erzeugen von RDDs
2. die Anzahl der Messungen je Datenmenge zählen mit ``count()`` und ausgeben

In [1]:
# Aufgabe 3a 1

print("Anzahl der Messungen aus co2data.tsv: ", rdd_co2data.count())
print("Anzahl der Messungen aus co2data_pm.tsv: ", rdd_co2data_pm.count())

NameError: name 'rdd_co2data' is not defined

### Aufgabe 3a: 2. Attributnamen der Datenmenge
----------------
1. ``.first()`` um die erste Zeile in diesem RDD zu holen
2. Trennt die erste Zeile durch das Tabulatortrennzeichen ``.split("\t")``, um jede einzelne Spalten zu erhalten.

In [6]:
# Aufgabe 3a 2

attribute_names = rdd_co2data_pm.first().split("\t")

# Ausgabe der Attributnamen zeilenweise
for attribute_name in attribute_names:
    print(attribute_name)

"timestamp"
"measurement_count"
"version"
"serial_number"
"co2_ppm"
"temperature_celsius"
"relative_humidity_percent"
"pm1"
"pm2"
"pm4"
"pm10"
"npm0"
"npm1"
"npm2"
"npm4"
"npm10"
"ps"


### Aufgabe 3a: 3. Anzahl der verschiedenen Sensoren
----------------
1. Das Feld für die Serial Nummer aus den RDD-Einträgen extrahieren.
2. Die erste Zeile wird herausgefiltert und die Serial Nummer wird durch ``.split("_")[1]`` gesplittert, um die Mac-Addresse des Sensors zu bekommen.
3. Die eindeutigen Mac-Adresse der Sensoren zählen ``.countApproxDistinct()``

In [7]:
# Aufgabe 3a 3

data = rdd_co2data.map(lambda line: line.replace('"', '').split("\t"))
header = data.first()
data = data.filter(lambda line: line != header).map(lambda line: line[1].split("_")[1])

sensors_count = data.countApproxDistinct()

print("Anzahl der verschiedenen Sensoren:", sensors_count)



Anzahl der verschiedenen Sensoren: 10


                                                                                

### Aufgabe 3a: 4. Anzahl der Datenpunkten je Sensor
----------------
1. Die eindeutige Mac-Adresse der Sensoren wie ber Aufgabe 3a_3 ermitteln.
2. Die Werte werden nach MAC-Adresse gruppiert ``.groupBy()`` und für jede Gruppe wird die Anzahl der Datenpunkte ermittelt.

In [8]:
# Aufgabe 3a 4

header = rdd_co2data.first()
data = rdd_co2data.map(lambda line: line.split("\t")).filter(lambda line: line[1] != '"serial_number"').groupBy(lambda line: line[1].split("_")[1].strip('"')).mapValues(lambda group: len(list(group)))

print("{:<10} {:<10}".format('Sensoren', 'Anzahl der Datenpunkten'))
for sensor, count in data.collect():
    print("{:<10} {:<10}".format(sensor, count))

Sensoren   Anzahl der Datenpunkten




10521c0202ab 2064      
d8bfc0147061 578457    
d8bfc014724e 2103522   
e8db84c5f33d 2270696   
8caab57a6dd9 2781677   
8caab57c3e19 1561046   
10521c01cf19 385105    
8caab57cc961 1131868   
e8db84c5f771 1665530   
3c6105d3abae 1533919   


                                                                                

### Aufgabe 3a: 5. Höchste und niedrigste Temperaturwerte
----------------
1. Das Feld für die Temperatur aus den RDD-Einträgen extrahieren.
2. Die erste Zeile und die Werte "null" werden herausgefiltert, um nur die gültige Temperatur-Werte zu erhalten.
3. Die Temperaturwerte in ``float`` konvertieren für die Berechnung.
4. Finden den maximalen und minimalen Temperaturwert durch Vergleich

In [9]:
# Aufgabe 3a 5

data = rdd_co2data_pm.map(lambda line: line.replace('"', '').split("\t"))

header = data.first()
data = data.filter(lambda line: line != header)

rdd_temperature = data.map(lambda line: line[5]).filter(lambda temp: temp != "null")

valid_temperature = rdd_temperature.filter(lambda temp: temp.replace(".", "", 1).isdigit())

rdd_temperature_float = valid_temperature.map(lambda temp: float(temp))

max_temperature = rdd_temperature_float.reduce(lambda a, b: a if a > b else b)
min_temperature = rdd_temperature_float.reduce(lambda a, b: a if a < b else b)

print("Maximum Temperature:", max_temperature)
print("Minimum Temperature:", min_temperature)



Maximum Temperature: 41.0
Minimum Temperature: 6.0


                                                                                

### Aufgabe 3a: 6. Durchschnittlische CO2-Wert je Sensor
----------------
1. Die erste Zeile und die Werte "null" werden herausgefiltert, um nur die gültige CO2-Werte zu erhalten.
2. Die Werte nach MAC-Adresse gruppieren ``.groupByKey()`` und den durchschnittlichen CO2-Wert berechnen und auf 3 Dezimalstellen runden.

In [10]:
# Aufgabe 3a 6

header = rdd_co2data_pm.first()
rdd_co2data_pm = rdd_co2data_pm.filter(lambda line: line != header)
data = rdd_co2data_pm.map(lambda line: line.split("\t")).filter(lambda line: (line[4] != '"null"')).map(lambda line: (line[3].split("_")[1], float(line[4].replace('"', ''))))

# Step 2: Group the data by MAC address and calculate the average CO2 value
average_co2 = data.groupByKey().mapValues(lambda values: round(sum(values) / len(values),3))

print("{:<10} {:<10}".format('Sensoren', 'Durchschnittlische CO2-Wert'))
for sensor, average in average_co2.collect():
    print("{:<10} {:<10}".format(sensor, average))

Sensoren   Durchschnittlische CO2-Wert




e8db84c5fc6a 554.979   
308398a2a1f6 657.518   
8caab57c9751 519.263   
8caab57cc813 547.52    
8caab57d01da 574.967   
308398a2ddcb 481.186   
3c6105d3908f 730.933   
8caab57cbb52 496.846   
a848fac03782 549.538   
ac0bfbd6547d 486.608   
308398b5b69c 1058.714  
308398a2fb52 541.715   
308398a2f790 688.314   
ac0bfbd64321 580.367   
308398b595c0 531.905   
e8db84c62ab4 560.562   
3c6105d381e8 546.007   
e8db84c5f33d 613.406   
e8db84c5f771 960.3     
ac0bfbd85271 573.868   
3c6105d467fd 640.568   
8caab57d0593 671.539   
308398b61fd3 860.563   
308398b651ee 676.252   
e8db84c60450 603.941   
4091514f0284 896.765   
30839882dbce 596.058   
308398a2000e 653.749   
308398b54e4c 704.048   
308398824a5e 532.384   
3c6105d4188d 594.736   
308398a26607 573.242   
308398b552c4 556.71    
308398b60c74 501.695   
8caab57b0e22 767.415   
ac0bfbd71044 524.589   
308398a2a0e4 586.763   
3c6105d389cc 736.777   
10521c0202ab 521.268   
3c6105d49d8d 513.605   
30839882fcc8 843.63    
ac0bfbd857fb 446

                                                                                

## Aufgabe 3 b

Machen Sie Aussagen in Markdown zu den beiden Datenmengen, nachdem Sie sich die Information aus Spark gezogen haben.

* Kommen Sensoren (MAC-Adresse in Serial Number) in beiden Datenmengen vor? Wenn ja, welche?
* Anhand welchen Attributs können Sie auf vorhandene Werte in den Attributen `pm*` und `npm*` filtern? Geben Sie ein Beispiel.
* Enthält eine Datenmenge "fehlerhafte" CO<sub>2</sub>-Messungen? Wenn ja, wieviele Messungen sind betroffen? Fehlerhaft ist ein Wert `null`


### Aufgabe 3b: 1. Kommen Sensoren in beiden Datenmengen
----------------
1. Zunächst werden die MAC-Adressen aus den beiden Datenmengen extrahiert.
2. Die erste Zeile mit dem "serial_number" wird herausgefiltert, um nur die eindeutige Mac-Addresse zu erhalten.
3. Dann werden die gemeinsamen MAC-Adressen in beiden Datenmengen durch die Funktion ``intersection()`` auf die beiden RDDs gefunden.

In [11]:
mac_addresses_co2data = rdd_co2data.map(lambda line: line.split("\t")[1]).filter(lambda line: line != '"serial_number"').map(lambda line: line.split("_")[1].strip('"')).distinct()
mac_addresses_co2data_pm = rdd_co2data_pm.map(lambda line: line.split("\t")[3]).filter(lambda line: line != '"serial_number"').map(lambda line: line.split("_")[1].strip('"')).distinct()

# Step 2: Find the common MAC addresses in both data sets
common_mac_addresses = mac_addresses_co2data.intersection(mac_addresses_co2data_pm)

# Step 3: Print the common MAC addresses
for mac_address in common_mac_addresses.collect():
    print("Common MAC-Adresse:", mac_address)



Common MAC-Adresse: e8db84c5f33d
Common MAC-Adresse: e8db84c5f771
Common MAC-Adresse: 10521c0202ab
Common MAC-Adresse: 8caab57c3e19
Common MAC-Adresse: 3c6105d3abae


                                                                                

### Aufgabe 3b: 2. Attribut um auf vorhandene Werte in den Attributen pm* und npm* zu filtern
----------------
**Antwort**: Attribut ``ps`` kann auf die Werte in pm* und npm* filtern.
- **Beispiel 1** : ``ps`` gleich 0.47 und keine null Werte in ``pm*`` und ``npm*``
- **Beispiel 2** : ``ps`` ungleich 0.47 und keine null Werte in ``pm*`` und ``npm*``
- **Beispiel 3** : ``ps`` gleich 0.47 und null Werte in ``pm*`` und ``npm*`` enthalten
> Es werden keine Zeilen gefunden, die diesen Kriterien entsprechen, daher ist die Ausgabe **leer**.
- **Beispiel 4** : ``ps`` ungleich 0.47 und null Werte in ``pm*`` und ``npm*`` enthalten

**Beispiel 1** : ``ps`` gleich 0.47 und keine null Werte in ``pm*`` und ``npm*``

In [12]:
# Define the filter condition
data_pm_rdd = rdd_co2data_pm.filter(lambda line: line.split("\t")[16] == '"0.47"' and '"null"' not in line)

for line in data_pm_rdd.take(10):
    data = line.split("\t")
    print(data)

['"1682899199"', '"26715"', '"1.1.0"', '"s_4091514f0284_309574"', '"1738"', '"21.0"', '"55.0"', '"1.58"', '"1.73"', '"1.77"', '"1.79"', '"10.74"', '"12.52"', '"12.61"', '"12.63"', '"12.63"', '"0.47"']
['"1682899194"', '"26714"', '"1.1.0"', '"s_4091514f0284_309574"', '"1739"', '"21.0"', '"55.0"', '"1.59"', '"1.74"', '"1.78"', '"1.8"', '"10.8"', '"12.59"', '"12.68"', '"12.7"', '"12.7"', '"0.47"']
['"1682899189"', '"26713"', '"1.1.0"', '"s_4091514f0284_309574"', '"1736"', '"21.0"', '"55.0"', '"1.69"', '"1.84"', '"1.88"', '"1.91"', '"11.45"', '"13.34"', '"13.44"', '"13.45"', '"13.46"', '"0.47"']
['"1682899184"', '"26712"', '"1.1.0"', '"s_4091514f0284_309574"', '"1735"', '"21.0"', '"55.0"', '"1.77"', '"1.93"', '"1.98"', '"2.0"', '"12.04"', '"14.03"', '"14.13"', '"14.15"', '"14.15"', '"0.47"']
['"1682899178"', '"26711"', '"1.1.0"', '"s_4091514f0284_309574"', '"1735"', '"21.0"', '"55.0"', '"1.78"', '"1.94"', '"1.98"', '"2.01"', '"12.07"', '"14.07"', '"14.17"', '"14.19"', '"14.19"', '"0.47"']


**Beispiel 2** : ``ps`` ungleich 0.47 und keine null Werte in ``pm*`` und ``npm*``

In [13]:
# Define the filter condition
data_pm_rdd = rdd_co2data_pm.filter(lambda line: line.split("\t")[16] != '"0.47"' and '"null"' not in line)

for line in data_pm_rdd.take(10):
    data = line.split("\t")
    print(data)

['"1682899199"', '"35305"', '"1.1.0"', '"s_ac0bfbd6547d_597369"', '"474"', '"23.0"', '"34.0"', '"2.12"', '"2.24"', '"2.24"', '"2.24"', '"14.56"', '"16.86"', '"16.92"', '"16.93"', '"16.93"', '"0.41"']
['"1682899193"', '"35304"', '"1.1.0"', '"s_ac0bfbd6547d_597369"', '"474"', '"23.0"', '"34.0"', '"2.15"', '"2.28"', '"2.28"', '"2.28"', '"14.78"', '"17.11"', '"17.18"', '"17.18"', '"17.19"', '"0.41"']
['"1682899188"', '"35303"', '"1.1.0"', '"s_ac0bfbd6547d_597369"', '"475"', '"23.0"', '"34.0"', '"2.1"', '"2.22"', '"2.22"', '"2.22"', '"14.45"', '"16.73"', '"16.79"', '"16.79"', '"16.8"', '"0.41"']
['"1682899182"', '"35302"', '"1.1.0"', '"s_ac0bfbd6547d_597369"', '"474"', '"23.0"', '"34.0"', '"2.01"', '"2.12"', '"2.12"', '"2.12"', '"13.78"', '"15.95"', '"16.01"', '"16.01"', '"16.02"', '"0.41"']
['"1682899177"', '"35301"', '"1.1.0"', '"s_ac0bfbd6547d_597369"', '"474"', '"23.0"', '"34.0"', '"1.93"', '"2.04"', '"2.04"', '"2.04"', '"13.26"', '"15.35"', '"15.41"', '"15.41"', '"15.42"', '"0.41"']
['

**Beispiel 3** : ``ps`` gleich 0.47 und null Werte in ``pm*`` und ``npm*`` enthalten

In [14]:
# Define the filter condition
data_pm_rdd = rdd_co2data_pm.filter(lambda line: line.split("\t")[16] == '"0.47"' and '"null"' in line)

for line in data_pm_rdd.take(10):
    data = line.split("\t")
    print(data)

                                                                                

**Beispiel 4** : ``ps`` ungleich 0.47 und null Werte in ``pm*`` und ``npm*`` enthalten

In [15]:
# Define the filter condition
data_pm_rdd = rdd_co2data_pm.filter(lambda line: line.split("\t")[16] != '"0.47"' and '"null"' in line)

for line in data_pm_rdd.take(10):
    data = line.split("\t")
    print(data)

['"null"', '"411"', '"0.9.8"', '"s_e8db84c5f33d_281913"', '"731"', '"18.0"', '"51.0"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"']
['"null"', '"1869"', '"0.9.8"', '"s_e8db84c5f33d_281913"', '"819"', '"17.0"', '"46.0"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"']
['"null"', '"315"', '"0.9.8"', '"s_e8db84c5f33d_281913"', '"1912"', '"18.0"', '"48.0"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"']
['"null"', '"6"', '"0.9.8"', '"s_e8db84c5f771_300390"', '"2860"', '"19.0"', '"52.0"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"']
['"null"', '"913"', '"0.9.8"', '"s_8caab57c3e19_282028"', '"788"', '"20.0"', '"40.0"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"', '"null"']
['"null"', '"1397"', '"0.9.8"', '"s_e8db84c5f771_300390"', '"431

### Aufgabe 3b: 3. "fehlerhafte" CO2-Messungen
------
1. Filtern der Daten, um nur Zeilen mit "null" in der Spalte "co2_ppm" zu erhalten.
2. Zählen der Anzahl von fehlerhaften Messungen mit ``.count()``
3. Die Anzahl von fehlerhaften Messungen der beiden Datenmengen wird ausgegeben.

In [16]:
faulty_data1 = rdd_co2data.filter(lambda line: line.split("\t")[3] == '"null"')
count_faulty_data1 = faulty_data1.count()
print("Anzahl der fehlerhaften CO2-Messungen aus co2data.tsv:", count_faulty_data1)

faulty_data2 = rdd_co2data_pm.filter(lambda line: line.split("\t")[4] == '"null"')
count_faulty_data2 = faulty_data2.count()
print("Anzahl der fehlerhaften CO2-Messungen aus co2data_pm.tsv:", count_faulty_data2)

                                                                                

Anzahl der fehlerhaften CO2-Messungen aus co2data.tsv: 19




Anzahl der fehlerhaften CO2-Messungen aus co2data_pm.tsv: 9


                                                                                

In [17]:
stop_sc1()  # always exit your spark context after work!

Spark Context stopped


## Nützliche Links
* https://spark.apache.org/docs/latest/rdd-programming-guide.html
* https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis
* https://spark.apache.org/examples.html