# Übung Spark mit Databricks
### Ziel der Übung
Das Ziel der Übung besteht darin, die Wikimedia-Daten aus dem Kafka Cluster zu extrahieren und auszuwerten. Es sollen folgende Fragestellungen beantwortet werden:
- Was ist das Verhältnis zwischen Bot- und Nicht-Bot Einträgen?
- Was sind die TOP-5 Domains mit den meisten Änderungen?
- Was sind die TOP-5 Accounts mit den meisten Änderungen

Zur Auswertung der Daten soll die Dataframe API verwendet werden ([API-Dokumentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html)).

In [0]:
# Für die Übung benötigte Imports
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, LongType, IntegerType
from pyspark.sql.functions import from_json, col, desc

### Lade die Wikimedia-Daten aus dem Kafka Cluster
Eine hilfreiche Dokumentation findet ihr [hier](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html). Da die Verbindung zum Upstash-Cluster Probleme verursachen kann, geben wir euch einen Teil der Optionen vor:

```python
option("kafka.bootstrap.servers", "your_value")
option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"your_value\" password=\"your_value\";")
option("kafka.sasl.mechanism", "SCRAM-SHA-256")
option("kafka.security.protocol", "SASL_SSL")
```

In [0]:
# inputDF = spark \
#           ...

display(inputDF)

### Umwandlung der Daten in das Ursprungsformat
Bei diesem Abschnitt gibt es für euch nichts zu tun. Kafka übermittelt die Daten in binärer Form, weshalb wir sie hier in das ursprüngliche JSON-Format umwandeln.

In [0]:
schema = StructType([
  StructField("schema", StringType()),
  StructField("meta", StructType([
    StructField("uri", StringType()),
    StructField("request_id", StringType()),
    StructField("id", StringType()),
    StructField("dt", StringType()),
    StructField("domain", StringType()),
    StructField("stream", StringType()),
    StructField("topic", StringType()),
    StructField("partition", IntegerType()),
    StructField("offset", LongType())
  ])),
  StructField("id", LongType()),
  StructField("type", StringType()),
  StructField("namespace", IntegerType()),
  StructField("title", StringType()),
  StructField("comment", StringType()),
  StructField("timestamp", LongType()),
  StructField("user", StringType()),
  StructField("bot", BooleanType()),
  StructField("minor", BooleanType()),
  StructField("patrolled", BooleanType()),
  StructField("length", StructType([
    StructField("old", IntegerType()),
    StructField("new", IntegerType())
  ])),
  StructField("revision", StructType([
    StructField("old", LongType()),
    StructField("new", LongType())
  ])),
  StructField("server_url", StringType()),
  StructField("server_name", StringType()),
  StructField("server_script_path", StringType()),
  StructField("wiki", StringType()),
  StructField("parsedcomment", StringType())
])

parsedValueDF = inputDF \
  .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

display(parsedValueDF)

### Was ist das Verhältnis zwischen Bot und Nicht-Bot Einträgen?

Ermittelt das Verhältnis zwischen Bot und Nicht-Bot Einträgen. Stellt das Ergebnis anschließend in einem Pie-Chart dar. <br> Die relevanten Spark-Befehle findet ihr [hier](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.groupby.GroupBy.count.html).


In [0]:
# botCountDF = parsedValueDF \
#              ...

display(botCountDF)

### Was sind die TOP-5 Domains mit den meisten Änderungen?
Ermittelt die TOP-5 Domains mit den meisten Änderungen. Stellt das Ergebnis anschließend in einem Bar-Chart dar.

In [0]:
# changesPerDomainDF = parsedValueDF \
#                      ...

display(changesPerDomainDF)

### Was sind die TOP-5 Accounts mit den meisten Änderungen?
Ermittelt die TOP-5 Accounts mit den meisten Änderungen. Stellt das Ergebnis anschließend in einem Bar-Chart dar.

In [0]:
# changesPerAccountDF = parsedValueDF \
#                       ...

display(changesPerAccountDF)