<a href="https://colab.research.google.com/github/ChristianKitte/SparkProjekt/blob/main/notebook/Wordcount_mit_Spark_DataFrame.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Vorbereitung des Notebooks
Vor der eigentlichen Umsetzung der Aufgabe muss das Notebook vorbereitet werden. ***Python*** und die ***wichtigsten Bibliotheken sind bereits Bestandteil*** der
von Colab bereit gestellten Installation. 

Da Spark mit Java programmiert wurde, wird als erstes ***Java installiert***. In dieser Anwendung wird das Open JDK in der Version 8 verwendet. Die Installation
erfolgt direkt auf der Linux Ebene.

Nachdem Java installiert ist, kann Spark selbst ***heruntergeladen sowie alle Systemvariablen gesetzt werden***. Auch hier wird mit Linuxbefehlen operiert. In dieser Anwendung wird die aktuell neueste Version 3.2 verwendet.

In [None]:
# Installation  von Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

print("Java ist installiert...")

# Download und Entpacken von Spark (Versionsnummer anpassen!)
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

print("Spark ist verfügbar...")

# Setzen der Systemvariablen für Java und Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

print("Umgebungsvariablen sind gesetzt...")

Java ist installiert...
Spark ist verfügbar...
Umgebungsvariablen sind gesetzt...


In [None]:
# Installation von findspark und pyspark

!pip install findspark
print("FindSpark wurde installiert...")

!pip install pyspark
print("PySpark wurde installiert...")

FindSpark wurde installiert...
PySpark wurde installiert...


In [None]:
# Initialisieren von findspark

try: 
  import findspark
    
  findspark.init()
  
  print("FindSpark wurde initialisiert")
except ImportError: 
  raise ImportError("Fehler bei der Initialiserung von FindSpark")

FindSpark wurde initialisiert


# Einlesen und Vorbereiten der Textdatei

Im ersten Abschnitt werden zunächst zwei Methoden definiert.

* Der ersten Methode ***get_file_from_url*** werden als Parameter eine ***URL*** sowie ein ***Speicherort*** angegeben. Bei Ihrem Aufruf lädt die Methode eine Datei von der angegebenen URL herunter und speichert sie in Google Drive ab.

* Die zweite Methode ***cut_file*** nimmt als Parameter einen ***numerischen Start- und Endwert*** sowie die Angabe einer ***Quell- und Zieldatei*** entgegen. Bei Ihrem Aufruf entfernt die Methode alle Zeilen vor bzw. nach den durch Start- und Endwert definierten Zeilenbereich aus der Quelldatei und speichert das Ergebnis in die Zieldatei.

In dem folgenden Block wird dann im Anschluss die Datei mit den gesammelten Werken von Shakespeare von der Seite des MIT ***heruntergeladen*** sowie ***von nicht benötigten Zeilen bereinigt*** und in einer ***neuen Datei*** gespeichert.

In [None]:
# Erstellen einer Methode, um Dateien aus dem Internet zu laden und zu speichern

import requests 

def get_file_from_url(file_url, place_to_save):
  try:
    req = requests.get(file_url, stream = True) 

    with open(place_to_save, "wb") as file: 
	    for block in req.iter_content(chunk_size = 1024): 
		    if block: 
			    file.write(block) 
     
    print("Die Datei wurde herunter geladen und angelegt: {}".format(file_url))
  
  except ValueError:
    print("Fehler {}".format(ValueError))   

print("Die Funktion get_file_from_url wurde angelegt...")


Die Funktion get_file_from_url wurde angelegt...


In [None]:
# Erstellen einer Methode, um eine Textdatei am Anfang und am Ende um die jeweils
# angegebene Zahl an Reihen zu beschneiden.

def cut_file(anfang, ende, quelldatei, zieldatei):
  try:
    with open(quelldatei, "r") as source:
      lines = source.readlines()
    
    source.close()

    print("")
    print("Start: {}".format(anfang))
    print("Ende: {}".format(ende))
    print("")

    current_count = 0
  
    with open(zieldatei, "w") as target:
      for line in lines:
        if current_count >= anfang and current_count <= ende:
          target.write(line)

        current_count = current_count + 1   
    
    target.close()

    print("Datei wurde beschnitten...")

  except ValueError:
    print("Fehler {}".format(ValueError))

print("Die Funktion cut_file wurde angelegt...")

Die Funktion cut_file wurde angelegt...


In [None]:
# Datei von der Quelle nach Colab laden

file_url = "https://ocw.mit.edu/ans7870/6/6.006/s08/lecturenotes/files/t8.shakespeare.txt"
place_to_save = "/content/shakespeare.txt"

get_file_from_url(file_url, place_to_save)

print("")
print("Datei wurde vorbereitet...")

Die Datei wurde herunter geladen und angelegt: https://ocw.mit.edu/ans7870/6/6.006/s08/lecturenotes/files/t8.shakespeare.txt

Datei wurde vorbereitet...


In [None]:
# Unnötige Zeilen am Ende und am Start entfernen

file_source = "/content/shakespeare.txt"
file_target = "/content/shakespeare_neu.txt"

cut_file(244,124438,file_source, file_target)

print("")
print("Die Arbeitsdatei ist vorbereitet...")


Start: 244
Ende: 124438

Datei wurde beschnitten...

Die Arbeitsdatei ist vorbereitet...


# Auszählen der Wörter

Anschließend wird die Textdatei eingelesen. Die Funktion ***Session.read.text*** liest eine Textdatei ein und gibt
ein ***direkt nutzbares typisiertes Spark DataFrame zurück***. Über ein Reflexion-Prozess kann das ***Schema der enthaltenen Daten*** - in diesem Fall ein nullable String – ***erkannt und ausgegeben*** werden. 

Im Weiteren finden zunächst eine Reihe von Ersetzungen (***replace***), dann eine Konvertierung in Kleinbuchstaben (***lower***) 
und am Schluss eine Filterung (***filter***) auf leere Zeilen statt. Hierbei wird jedes Mal ein ***neues DataFrame*** 
zurückgegeben. Da Spark DataFrames auf RDDs basieren, sind auch sie ***immutable***. Die Methode ***withColumn*** bewirkt, dass 
die ***übergebene Funktion*** ähnlich der Map Funktion ***auf alle Datensätze angewendet wird***.

Der letzte Schritt erinnert stark an ein SQL Konstrukt. Zunächst wird jede Zeile durch ihre ***Leerzeichen gesplittet***. 
Die Funktion ***explode*** sorgt dafür, dass das so entstandene ***Array mit n Spalten*** als ein ***Array mit einer Spalte (value2) 
und n Reihen*** zurückgegeben wird.

Die Funktion ***groupBy*** gruppiert die in ***value2*** enthaltenen Werte (***Wörter***) und ***Count*** aggregiert die Anzahl der einzelnen 
Vorkommen. Abgeschlossen wir die Anweisung mit einem ***sort*** und der Ausgabe der sortierten Liste. Die Nutzung der 
Fluent API macht den Code hierbei gut lesbar. Ebenso fällt die Ähnlichkeit zu Panda DataFrames auf.

In [None]:
# Erzeugen einer Spark Session

from pyspark.sql import SparkSession
session = SparkSession.builder.appName("Wordcount").getOrCreate()

print("Die Spark Session wurde angelegt...")

Die Spark Session wurde angelegt...


In [None]:
# Auszählen der Wörter

import pyspark.sql.functions as func

df = session.read.text(file_target)

print("")
print("Schema der eingelesenen Datei:")
df.printSchema()
print("")

top_out = 30
top_length = 30

print("")
print("Ausgabe der ersten {} Zeilen des Textes".format(top_out))
print("")

df.show(n=top_out,truncate=False)

df=df.withColumn('value', func.translate('value', ',', ' '))
df=df.withColumn('value', func.translate('value', '.', ' '))
df=df.withColumn('value', func.translate('value', '-', ' '))
df=df.withColumn('value', func.lower('value'))

print("")
print("Ausgabe der {} größten Vorkommen".format(top_length))
print("")

df=df.withColumn('value2',func.explode(func.split(func.col('value'), ' ')))\
  .groupBy('value2')\
  .count()\
  .sort('count', ascending=False)\
  .show(n=top_length,truncate=False)


Schema der eingelesenen Datei:
root
 |-- value: string (nullable = true)



Ausgabe der ersten 30 Zeilen des Textes

+-------------------------------------------------------+
|value                                                  |
+-------------------------------------------------------+
|1609                                                   |
|                                                       |
|THE SONNETS                                            |
|                                                       |
|by William Shakespeare                                 |
|                                                       |
|                                                       |
|                                                       |
|                     1                                 |
|  From fairest creatures we desire increase,           |
|  That thereby beauty's rose might never die,          |
|  But as the riper should by time decease,             |
|  His tende