In [None]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("word count") \
      .getOrCreate() 


In [None]:
spark

In [None]:
frank=spark.read.text("frankenstein.txt")

In [None]:
# und siehe da, es ist ein Dataframe, er hat 1 Spalte, und sie hat den Datentyp "String"
frank

In [None]:
# das Schema explizit anzeigen
frank.printSchema()

In [None]:
# und wenn wir auf das Schema direkt zugreifen wollen, dann gibt es alternativ die dtypes
print(frank.dtypes)
print(f"Ist das Element == 'string': { frank.dtypes[0][1] == 'string'}")
# Achtung, was passiert hier ... potentieller Denkfehler!
print(f'der Spaltenname ist {frank.dtypes[0][0]} und ist der Typ String?: { isinstance(frank.dtypes[0][1], str)}')
print(f'der Spaltenname ist {frank.dtypes[0][0]} und ist der Typ Int?: { isinstance(frank.dtypes[0][1], int)}')

In [None]:
# zeige die ersten 30 Zeilen komplett (nicht abgeschnitten (truncated))
frank.show(30,truncate=False)

In [None]:
# jetzt die Zeilen in einzelne Worte splitten
from pyspark.sql.functions import split
# select selektiert eine oder mehrere Spalten, hier eben nur die 1 Spalte "value"
# alias gibt dem Ergebnis (der selektierten Spalte) einen Namen (sonst waere es eher unhandlich)
# die Funktion split nimmt uebrigens eine REGEXP ... hier ist sie halt super-einfach> ein space
lines = frank.select(split(frank.value, " ").alias("Zeile"))
lines.show(10, truncate=100)


In [None]:
# das Schema: das Split hat ein Array generiert
lines.printSchema()

In [None]:
# eine Alternative zu ALIAS ist "withColumnRenamed" - die nimmt einen existierenden Dataframe und 
# erstellt einen neuen, der eben nun eine umbenannte Spalte hat
linesWRC = lines.withColumnRenamed("Zeile", "ZeileRenamed")
linesWRC.printSchema()

das gefaellt mir noch nicht so richtig, z.B. das Komma nach Frankenstein, oder die leere Zeile, 
oder die Gross- und Kleinschreibung

In [None]:
frank.show(100, truncate=False) # nur zum ueberpruefen ob die Regex unten tut, was sie soll

In [None]:
# Schritt 1: regexp verbessern, so dass das Zeichensetzung, Zahlen etc wegfällt
lines = frank.select(split(frank.value, "[^a-zA-Z]").alias("Zeile"))
lines.show(100, truncate=False)

In [None]:
# Spalte selektieren geht auf viele Arten:
lines.select(lines.Zeile).show()
lines.select("Zeile").show()
from pyspark.sql.functions import col
lines.select(col("Zeile")).show()
# wohingegen das hier nicht geht: das ist naemlich eine Column, kein Dataframe!
# lines["Zeile"].show()
# aber diese Spalte nutzen um dann eine Spalte im Dataframe auszuwaehlen, das geht:
lines.select(lines["Zeile"]).show()

# Exploding list of words into ROWS (nicht COLS)

In [None]:
from pyspark.sql.functions import explode, col
words = lines.select(explode(col("Zeile")).alias("word"))

In [None]:
words.show(15)

In [None]:
# huebsch ist das immer noch nicht. 
# Schritt 1: alles lower-case bitte
from pyspark.sql.functions import lower
words_lower = words.select(lower(col("word")).alias("word_lower"))
words_lower.show(truncate = False)

In [None]:
# weg mit den kurzen Zeilen bitte
from pyspark.sql.functions import regexp_extract
# minimum laenge 2 Zeichen ausser dem Wort "a" und "I"
words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]{2,}|a|i", 0).alias("echtesWort"))
words_clean.show()

In [None]:
# weg mit den leeren Zeilen bitte
proper_words = words_clean.filter(col("echtesWort") != "")
proper_words.show()

In [None]:
# nur zur Illustration, filter kann Negation: ~
proper_words_negation = words_clean.filter(col("echtesWort") != "")
proper_words_negation.show()

In [None]:
# vergleiche mit WHERE / gibt es da einen Unterschied?
proper_wordsWHERE = words_clean.where(col("echtesWort") != "")
proper_wordsWHERE.show()

In [None]:
# Follow-up Aufgabe zu 1b: aendere den Code von vorhin so dass
#-nur Woerter mit der Mindestlaenge von 3 Zeichen beibehalten werden diesmal mit der Funktion "length"

In [None]:
# Antwort
from pyspark.sql.functions import length
min3Zeichen = words_nonull.where(length(col("word")) > 3)

In [None]:
# Aufgabe: 
# was passiert hier - und was passiert, wenn man es negiert?
proper_words_any = words_clean.filter(col("echtesWort") != "any*")
proper_words_any.show()

# Die große Frage: in welcher Reihenfolge filtern? Wie crazy muss die Regex sein?
Antwort: Lesbarkeit geht vor. Spark evaluiert lazy, und kann "hintendran" sehr viel optimieren. Ist wahrscheinlich 
schlauer als wir;-)

# Aufgabe 1: erstelle einen Block "sauberen" Code, der Schritt fuer Schritt alle Einzelschritt ausfuehrt

In [101]:
# Antwort / die einen Fehler enthaelt, der aber gar nicht wie ein Fehler aussieht
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, lower, regexp_extract

spark = SparkSession.builder.getOrCreate()

frank = spark.read.text("frankenstein.txt")

lines = frank.select(split(frank.value, " ").alias("line"))

words = lines.select(explode(col("line")).alias("word"))

words_lower = words.select(lower(col("word")).alias("word_lower"))

words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]{2+}|a|i", 0).alias("word"))

words_nonull = words_clean.where(col("word") != "")

In [90]:
# Frage: was passiert, wenn man das hier ausfuehrt?
words_clean.show()

+-----+
| word|
+-----+
|hello|
|world|
| this|
| boat|
|  has|
|     |
|   no|
| oars|
|world|
|  has|
+-----+



In [None]:
# Antwort: es gibt einen Fehler. Ja he, das ging doch oben einwandfrei durch!
# des Raetsels Loesung: im Regex ist ein Fehler: das {2+} geht nicht, sonder man muss {2,} benutzen.
# da es aber lazy evaluation ist, finden wir das erst heraus, wenn das show aufgerufen wird.

In [None]:
# Aufgabe 1b: aendere den Code von eben so dass
- das Wort "is" aus dem gesamten Text entfernt wird
- nur Woerter mit der Mindestlaenge von 3 Zeichen beibehalten werden 

In [None]:
# Antwort:
# letzte Zeile: words_nonull = words_clean.where(col("word") != "is")
#words_clean = words_lower.select(
#    regexp_extract(col("word_lower"), "[a-z]{3+}", 0).alias("word")
#)

# Aufgabe 2: finde programmatisch heraus, wie viele Spalten KEINE Strings sind:
datenA2 = spark.createDataFrame([["test", "noch ein test", 10_000_000_000]], ["1", "2", "3"])

In [None]:
datenA2 = spark.createDataFrame([["test", "noch ein test", 10_000_000_000]], ["1", "2", "3"])
datenA2.printSchema()

In [None]:
print(len([x for x, y in datenA2.dtypes if y != "string"]))

In [None]:
# oder fuer die Nicht-Python Programmierer
cnt = 0
for x, y in datenA2.dtypes:
    if y != 'string':
        cnt += 1
print(f'cnt = {cnt}')

# Aufgabe 3: mache den Code lesbarer
datenA3 = spark.read.text("frankenstein.txt").select(length(col("value"))).withColumnRenamed("length(value)", "numChar")

***BEVOR*** Du den Code ausfuehrst: was ist denn nach dem Kommando im Dataframe datenA3?

In [None]:
from pyspark.sql.functions import length
datenA3 = spark.read.text("frankenstein.txt").select(length(col("value"))).withColumnRenamed("length(value)", "numChar")
datenA3.show(2)
datenA3b = spark.read.text("frankenstein.txt").select(length(col("value")).alias("numChar"))
datenA3b.show(2)

# Aufgabe 4: im folgenden Code gibt es ein Problem / was ist es, und wie kann man es reparieren?
finde erst einmal heraus, was der Code wohl als Ziel hat...

In [None]:
datenA4 = spark.createDataFrame([["key", 20_000_000, 10_000_000_000]], ["key", "value1", "value2"])
datenA4.printSchema()

In [None]:
from pyspark.sql.functions import greatest
from pyspark.sql.utils import AnalysisException
try:
    datenA4M = datenA4.select(greatest(col("value1"), col("value2")).alias("maxVal")).select("key","maxVal")
except AnalysisException as err:
        print(f"das war nicht gut: {err}")
datenA4M.show()

# Aufgabe 5 filtere einen ganzen Haufen Woerter raus
mit Hilfe der Funktion isin filtere die Woerter "is", "not", "if", "the" aus dem Text

In [36]:
exclDict = ["is", "not", "if", "the", "for", "of", "no", "at", "and"]
words_no_dict = words_nonull.where(~col("word").isin(exclDict))
words_no_dict.show()

+--------------+
|          word|
+--------------+
|       project|
|     gutenberg|
|  frankenstein|
|            by|
|          mary|
|wollstonecraft|
|        godwin|
|       shelley|
|          this|
|         ebook|
|           use|
|        anyone|
|      anywhere|
|          cost|
|          with|
|        almost|
|  restrictions|
|    whatsoever|
|           you|
|           may|
+--------------+
only showing top 20 rows



In [39]:
# Aufgabe 5b: versuche die exclusionWords in einem Dataframe darzustellen, und dann diesen im "isin" zu nutzen

In [40]:
# Antwort> geht leider nicht
exclusionWords = spark.createDataFrame(["is", "not", "if", "the", "for", "of", "no", "at", "and"], "string")
exclusionWords.show()
words_no_dict2 = words_nonull.where(~col("word").isin(exclusionWords.select("value")))

+-----+
|value|
+-----+
|   is|
|  not|
|   if|
|  the|
|  for|
|   of|
|   no|
|   at|
|  and|
+-----+



AttributeError: 'DataFrame' object has no attribute '_get_object_id'

# Aufgabe 6: Debugging
Finde den/die Fehler im folgenden Code und repariere in so, dass der Code so wie wohl erwartet funktioniert

In [None]:
from pyspark.sql.functions import col, split
try:
    book = spark.read.text("frankenstein.txt")
    book = book.printSchema()
    lines= book.select(spolit(book.value, " ").alias("lime"))
    words = lines.select(explode(col("line")).alias("word"))
except AnalysisException as err:
    print(err)
words.show()

In [75]:
# Antwort
from pyspark.sql.functions import col, split
from pyspark.sql.utils import AnalysisException # import wird unten benoetigt

try:
    book = spark.read.text("frankenstein.txt")
    book.printSchema()   # bitte nicht das Ergebnis von printSchema an den Dataframe zuweisen
    lines= book.select(split(book.value, " ").alias("line"))  # Tippfehler: spolit und lime anstatt line
    words = lines.select(explode(col("line")).alias("word"))
except AnalysisException as err:   # import
    print(err)
words.show()

root
 |-- value: string (nullable = true)

+-----+
| word|
+-----+
|hello|
|world|
| this|
| boat|
|  has|
|     |
|   no|
| oars|
+-----+



# Gruppieren
Ziel: jetzt wollen wir zaehlen, wie oft jedes Wort vorkommt
.count() alleine?

In [44]:
# ne, bringt nix, das sind ja nur "alle Worte" (Anzahl Reihen)
words_nonull.count()

77907

In [83]:
# also: erstmal Gruppieren
groups = words_nonull.groupby(col("word"))
print(groups

DataFrame[word: string, count: bigint]


In [95]:
wordCounts = groups.count()
wordCounts.printSchema()
wordCounts.show()
# Frage: kommen bei allen die Ergebnisse in der gleichen Reihenfolge heraus?

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = false)

+-----+-----+
| word|count|
+-----+-----+
|hello|    1|
| oars|    1|
| boat|    1|
|   no|    1|
|world|    2|
| this|    1|
|  has|    2|
+-----+-----+



In [None]:
# Antwort: sollte eigentlich nicht, da das Processing ueber mehrere Nodes verteilt wird...

# Aufgabe: finde die Anzahl der Worte per Anzahl Buchstaben (also: wie viele Worte mit 1, 2, 3, ... Buchstaben,

In [94]:
# Antwort
from pyspark.sql.functions import length
# words_nonull.groupBy(col("word")).count().select("count").alias("len").groupBy(col("len")).count().show()
words_nonull.select(length(col("word")).alias("length")).groupBy("length").count().orderBy(col("length").asc()).show()

+------+-----+
|length|count|
+------+-----+
|     2|    1|
|     3|    2|
|     4|    3|
|     5|    3|
+------+-----+



In [96]:
wordCount = words_nonull.select(length(col("word")).alias("length")).groupBy("length").count()
# hier 2 Alternativen des orderBy

wordCount.orderBy("count", ascending = False).show()
wordCount.orderBy(col("count").desc()).show()

+------+-----+
|length|count|
+------+-----+
|     5|    3|
|     4|    3|
|     3|    2|
|     2|    1|
+------+-----+

+------+-----+
|length|count|
+------+-----+
|     5|    3|
|     4|    3|
|     3|    2|
|     2|    1|
+------+-----+



# Ach uebrigens
was faellt Dir auf:

regexp_extract, groupBy, groupby, orderBy (probier mal orderby)

Antwort: die Cases sind all over the place. Ist leider so...

In [97]:
# was macht das hier?
wordCount.write.csv("dingdong.csv")

In [105]:
# Antwort: es legt ein Directory an! 
# was passiert wenn wir das fuer ein grosses File machen?
words_nonull.show()
words_nonull.write.csv("frankenout2.csv")
# Antwort: wenn ich mehrere Worker habe, dann gibt es (oft) mehrere Files. Diese
# kann man mit .coalesce zusammenfuegen
words_nonull.coalesce(1).write.csv("frankenCoalesce1.csv")

+--------------+
|          word|
+--------------+
|       project|
|     gutenberg|
|  frankenstein|
|            by|
|          mary|
|wollstonecraft|
|        godwin|
|       shelley|
|          this|
|         ebook|
|            is|
|           for|
|           the|
|           use|
|            of|
|        anyone|
|      anywhere|
|            at|
|            no|
|          cost|
+--------------+
only showing top 20 rows



# Aufraeumen
import as F
brackets

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


spark = SparkSession.builder.appName(
    "Counting word occurences from a book."
).getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# If you need to read multiple text files, replace `1342-0` by `*`.
results = (
    spark.read.text("frankenstein.txt")
    .select(F.split(F.col("value"), " ").alias("line"))
    .select(F.explode(F.col("line")).alias("word"))
    .select(F.lower(F.col("word")).alias("word"))
    .select(F.regexp_extract(F.col("word"), "[a-z']*", 0).alias("word"))
    .where(F.col("word") != "")
    .groupby(F.col("word"))
    .count()
)

results.orderBy("count", ascending=False).show(10)
results.coalesce(1).write.csv("./results_single_partition.csv")

# Abschluss-Aufgabe
lade einen Citybike Datensatz, erstelle ein Ranking nach Anzahl der von den jeweiligen Stationen ausgehenden Fahrten
und speichere das Ergebnis in einer Datei "abgehendeFahrten.csv"