In [1]:
from pyspark.sql import SparkSession
# Build a single-core local Spark session for this notebook.
# getOrCreate() returns the existing session if there already is one.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("Datenbanken mit Spark")
    .getOrCreate()
)

In [2]:
# Load the book as a DataFrame with one string column ("value"), one row per line.
textFile = spark.read.format("text").load("frankenstein.txt")

In [5]:
# Printing a DataFrame shows only its column names and types, not its rows.
print(textFile)

DataFrame[value: string]


In [7]:
# Print the schema of the text DataFrame as a tree (column name, type, nullability).
textFile.printSchema()

root
 |-- value: string (nullable = true)



In [8]:
# dtypes is a list of (column name, type name) tuples.
print(textFile.dtypes)

[('value', 'string')]


In [12]:
# Unpack the (name, type) pair of the first column and print only its type name.
first_col_name, first_col_type = textFile.dtypes[0]
print(first_col_type)

string


In [15]:
# Show the first 30 lines of the book without cutting off long values.
textFile.show(30, truncate=False)

+-------------------------------------------------------------------------+
|value                                                                    |
+-------------------------------------------------------------------------+
|                                                                         |
|Project Gutenberg's Frankenstein, by Mary Wollstonecraft (Godwin) Shelley|
|                                                                         |
|This eBook is for the use of anyone anywhere at no cost and with         |
|almost no restrictions whatsoever.  You may copy it, give it away or     |
|re-use it under the terms of the Project Gutenberg License included      |
|with this eBook or online at www.gutenberg.net                           |
|                                                                         |
|                                                                         |
|Title: Frankenstein                                                      |
|       or T

In [18]:
from pyspark.sql.functions import split

In [30]:
# Split every line at each non-letter character; the pieces land in an
# array column named "Zeile". Consecutive separators produce empty strings.
lines = textFile.select(
    split(textFile["value"], "[^a-zA-Z]").alias("Zeile")
)
lines.show(n=10, truncate=100)

+--------------------------------------------------------------------------------------+
|                                                                                 Zeile|
+--------------------------------------------------------------------------------------+
|                                                                                    []|
|[Project, Gutenberg, s, Frankenstein, , by, Mary, Wollstonecraft, , Godwin, , Shelley]|
|                                                                                    []|
|       [This, eBook, is, for, the, use, of, anyone, anywhere, at, no, cost, and, with]|
|  [almost, no, restrictions, whatsoever, , , You, may, copy, it, , give, it, away, or]|
|      [re, use, it, under, the, terms, of, the, Project, Gutenberg, License, included]|
|                              [with, this, eBook, or, online, at, www, gutenberg, net]|
|                                                                                    []|
|                    

In [23]:
# After the split, "Zeile" is an array column whose elements are strings.
lines.printSchema()

root
 |-- Zeile: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [28]:
# Two ways to select the same column: via attribute access on the DataFrame
# and via the column name as a string. Both print the same table.
lines.select(lines.Zeile).show()
lines.select("Zeile").show()

+--------------------+
|               Zeile|
+--------------------+
|                  []|
|[Project, Gutenbe...|
|                  []|
|[This, eBook, is,...|
|[almost, no, rest...|
|[re, use, it, und...|
|[with, this, eBoo...|
|                  []|
|                  []|
|[Title, , Franken...|
|[, , , , , , , or...|
|                  []|
|[Author, , Mary, ...|
|                  []|
|[Release, Date, ,...|
|[Last, updated, ,...|
|                  []|
|[Language, , Engl...|
|                  []|
|[Character, set, ...|
+--------------------+
only showing top 20 rows

+--------------------+
|               Zeile|
+--------------------+
|                  []|
|[Project, Gutenbe...|
|                  []|
|[This, eBook, is,...|
|[almost, no, rest...|
|[re, use, it, und...|
|[with, this, eBoo...|
|                  []|
|                  []|
|[Title, , Franken...|
|[, , , , , , , or...|
|                  []|
|[Author, , Mary, ...|
|                  []|
|[Release, Date, ,...|
|[Last, 

In [35]:
from pyspark.sql.functions import explode, col

# Turn each array element into its own row, so the result has one token per row.
words = lines.select(explode(lines["Zeile"]).alias("word"))
words.show(n=15)

+--------------+
|          word|
+--------------+
|              |
|       Project|
|     Gutenberg|
|             s|
|  Frankenstein|
|              |
|            by|
|          Mary|
|Wollstonecraft|
|              |
|        Godwin|
|              |
|       Shelley|
|              |
|          This|
+--------------+
only showing top 15 rows



In [36]:
from pyspark.sql.functions import lower
# Normalise every token to lower case.
words_lower = words.select(lower(words["word"]).alias("word_lower"))

In [37]:
# Show the first 20 lower-cased tokens without truncation.
words_lower.show(20, truncate=False)

+--------------+
|word_lower    |
+--------------+
|              |
|project       |
|gutenberg     |
|s             |
|frankenstein  |
|              |
|by            |
|mary          |
|wollstonecraft|
|              |
|godwin        |
|              |
|shelley       |
|              |
|this          |
|ebook         |
|is            |
|for           |
|the           |
|use           |
+--------------+
only showing top 20 rows



In [38]:
from pyspark.sql.functions import regexp_extract
# Keep tokens of two or more letters plus the single-letter words "a" and "i";
# any other token (e.g. a stray "s") becomes the empty string.
words_clean = words_lower.select(
    regexp_extract(words_lower["word_lower"], "[a-z]{2,}|a|i", 0).alias("echtesWort")
)
words_clean.show(n=20, truncate=False)

+--------------+
|echtesWort    |
+--------------+
|              |
|project       |
|gutenberg     |
|              |
|frankenstein  |
|              |
|by            |
|mary          |
|wollstonecraft|
|              |
|godwin        |
|              |
|shelley       |
|              |
|this          |
|ebook         |
|is            |
|for           |
|the           |
|use           |
+--------------+
only showing top 20 rows



In [39]:
# Drop the rows that ended up as empty strings.
proper_words = words_clean.filter(words_clean["echtesWort"] != "")
proper_words.show()

+--------------+
|    echtesWort|
+--------------+
|       project|
|     gutenberg|
|  frankenstein|
|            by|
|          mary|
|wollstonecraft|
|        godwin|
|       shelley|
|          this|
|         ebook|
|            is|
|           for|
|           the|
|           use|
|            of|
|        anyone|
|      anywhere|
|            at|
|            no|
|          cost|
+--------------+
only showing top 20 rows



In [40]:
# Show only the rows whose "echtesWort" is not the empty string, using where().
words_clean.where(col("echtesWort") != "").show()

+--------------+
|    echtesWort|
+--------------+
|       project|
|     gutenberg|
|  frankenstein|
|            by|
|          mary|
|wollstonecraft|
|        godwin|
|       shelley|
|          this|
|         ebook|
|            is|
|           for|
|           the|
|           use|
|            of|
|        anyone|
|      anywhere|
|            at|
|            no|
|          cost|
+--------------+
only showing top 20 rows



In [50]:
# Write the cleaned words as one semicolon-separated CSV part file with a header.
# coalesce() is a DataFrame method, not a DataFrameWriter method, so it has to
# be applied before .write (calling it on the writer raises AttributeError).
# NOTE: with the default save mode, re-running this cell fails if the output
# directory "new" already exists.
(
    proper_words
    .coalesce(1)
    .write
    .option("header", True)
    .option("delimiter", ";")
    .csv("new")
)

In [54]:
# Remove every occurrence of the word "is".
noIs = proper_words.where(col("echtesWort") != "is").alias("noIs")
noIs.show()

# Extract the first run of three or more letters from each word. Words that
# are shorter than three letters are not dropped; they become empty strings.
minThree = proper_words.select(
    regexp_extract(col("echtesWort"), "[a-z]{3,}", 0).alias("minThree")
)
minThree.show()

+--------------+
|    echtesWort|
+--------------+
|       project|
|     gutenberg|
|  frankenstein|
|            by|
|          mary|
|wollstonecraft|
|        godwin|
|       shelley|
|          this|
|         ebook|
|           for|
|           the|
|           use|
|            of|
|        anyone|
|      anywhere|
|            at|
|            no|
|          cost|
|           and|
+--------------+
only showing top 20 rows

+--------------+
|      minThree|
+--------------+
|       project|
|     gutenberg|
|  frankenstein|
|              |
|          mary|
|wollstonecraft|
|        godwin|
|       shelley|
|          this|
|         ebook|
|              |
|           for|
|           the|
|           use|
|              |
|        anyone|
|      anywhere|
|              |
|              |
|          cost|
+--------------+
only showing top 20 rows



In [56]:
# One-row example frame with two string columns and one integer column that
# is too large for a 32-bit int (the inferred type is long).
datenA2 = spark.createDataFrame(
    data=[["test", "noch ein Test", 10_000_000_000]],
    schema=["1", "2", "3"],
)
datenA2.printSchema()

root
 |-- 1: string (nullable = true)
 |-- 2: string (nullable = true)
 |-- 3: long (nullable = true)



In [57]:
# Count the columns of datenA2 whose type is not "string".
cnt = sum(1 for _name, type_name in datenA2.dtypes if type_name != "string")
print(f'cnt = {cnt}')

cnt = 1


In [61]:
from pyspark.sql.functions import length
# Number of characters in every line of the book.
datenA3 = (
    spark.read.text("frankenstein.txt")
    .select(length(col("value")).alias("numChar"))
)

In [62]:
# Show the per-line character counts (first 20 rows by default).
datenA3.show()

+-------+
|numChar|
+-------+
|      0|
|     73|
|      0|
|     64|
|     68|
|     67|
|     46|
|      0|
|      0|
|     19|
|     31|
|      0|
|     44|
|      0|
|     39|
|     30|
|      0|
|     17|
|      0|
|     29|
+-------+
only showing top 20 rows



In [66]:
from pyspark.sql.functions import col, greatest
from pyspark.sql.utils import AnalysisException

# One-row example frame with a key and two numeric value columns.
datenA4 = spark.createDataFrame(
    [["key", 20_000_000, 10_000_000_000]],
    ["key", "value1", "value2"],
)
try:
    # greatest() yields the row-wise maximum of the given columns.
    datenA4M = datenA4.select(
        col("key"),
        greatest(col("value1"), col("value2")).alias("maxVal"),
    )
except AnalysisException as err:
    print(f'das war nicht gut: {err}')
else:
    # Only show the result when the select succeeded; otherwise datenA4M is
    # undefined (or stale from an earlier run of this cell).
    datenA4M.show()

+---+-----------+
|key|     maxVal|
+---+-----------+
|key|10000000000|
+---+-----------+



In [84]:
# Words to exclude from the result.
wordFilter = ["is", "not", "if", "the"]
# Keep only the rows whose word is NOT in the exclusion list.
wordIsIn = proper_words.where(
    ~col("echtesWort").isin(wordFilter)
)
wordIsIn.show()

+--------------+
|    echtesWort|
+--------------+
|       project|
|     gutenberg|
|  frankenstein|
|            by|
|          mary|
|wollstonecraft|
|        godwin|
|       shelley|
|          this|
|         ebook|
|           for|
|           use|
|            of|
|        anyone|
|      anywhere|
|            at|
|            no|
|          cost|
|           and|
|          with|
+--------------+
only showing top 20 rows



In [97]:
from pyspark.sql.functions import col, explode, split
from pyspark.sql.utils import AnalysisException

# Read the book again and tokenise it by single spaces only, so punctuation
# stays attached to the words.
try:
    book = spark.read.text("frankenstein.txt")
    book.printSchema()
    lines = book.select(split(book.value, " ").alias("line"))
    words = lines.select(explode(col("line")).alias("word"))
except AnalysisException as err:
    print(err)
else:
    # Show the result only on success; otherwise a stale `words` from an
    # earlier cell would be displayed as if it came from this one.
    words.show()

root
 |-- value: string (nullable = true)

+--------------+
|          word|
+--------------+
|              |
|       Project|
|   Gutenberg's|
| Frankenstein,|
|            by|
|          Mary|
|Wollstonecraft|
|      (Godwin)|
|       Shelley|
|              |
|          This|
|         eBook|
|            is|
|           for|
|           the|
|           use|
|            of|
|        anyone|
|      anywhere|
|            at|
+--------------+
only showing top 20 rows



In [98]:
# Total number of rows (words) in proper_words.
proper_words.count()

78236

In [99]:
# Group the words by their text. The result is a GroupedData object, not a
# DataFrame, so printing it shows only the object representation.
groups = proper_words.groupBy("echtesWort")
print(groups)

<pyspark.sql.group.GroupedData object at 0x7f13636b9690>


In [107]:
from pyspark.sql.functions import desc,asc
# Occurrences per word, most frequent first.
wordCount = (
    groups
    .count()
    .orderBy(desc("count"))
)
wordCount.show()

+----------+-----+
|echtesWort|count|
+----------+-----+
|       the| 4371|
|       and| 3046|
|         i| 2850|
|        of| 2760|
|        to| 2174|
|        my| 1776|
|         a| 1449|
|        in| 1186|
|      that| 1033|
|       was| 1022|
|        me|  868|
|      with|  714|
|       but|  692|
|       had|  686|
|       you|  644|
|        he|  611|
|     which|  565|
|        it|  562|
|        as|  539|
|       his|  535|
+----------+-----+
only showing top 20 rows



In [123]:
from pyspark.sql.functions import length, asc
# Number of words per word length, ordered from shortest to longest.
wordLen = (
    proper_words
    .select(length(col("echtesWort")).alias("length"))
    .groupBy("length")
    .count()
    .orderBy(asc("length"))
)

In [125]:
# Reduce to a single partition so that only one CSV part file is written
# into the "test" output directory.
(
    wordLen
    .coalesce(1)
    .write
    .csv(path="test")
)

In [156]:
import pyspark.sql.functions as F
# Number of trips per start station, busiest station first.
# show() returns None, so it must not be the last call of the assigned chain;
# otherwise `result` would be None instead of the DataFrame.
result = (
    spark.read.option("header", True).csv("202209-citibike-tripdata.csv")
    .groupby(F.col("start_station_name"))
    .count()
    .withColumnRenamed("count", "Rating")
    .sort(F.desc("Rating"))
)
result.show(truncate=False)

+-----------------------+------+
|start_station_name     |Rating|
+-----------------------+------+
|West St & Chambers St  |15284 |
|W 21 St & 6 Ave        |14939 |
|Broadway & W 58 St     |14262 |
|Central Park S & 6 Ave |12674 |
|6 Ave & W 33 St        |12176 |
|Broadway & W 25 St     |11865 |
|West St & Liberty St   |11355 |
|1 Ave & E 68 St        |11344 |
|University Pl & E 14 St|11245 |
|Lafayette St & E 8 St  |11196 |
|12 Ave & W 40 St       |10914 |
|Broadway & W 29 St     |10448 |
|8 Ave & W 16 St        |10378 |
|W 31 St & 7 Ave        |10274 |
|E 33 St & 1 Ave        |10179 |
|6 Ave & W 34 St        |10165 |
|E 17 St & Broadway     |9979  |
|Broadway & E 21 St     |9965  |
|W 22 St & 10 Ave       |9928  |
|Broadway & E 14 St     |9873  |
+-----------------------+------+
only showing top 20 rows

