# Example to show word count using spark SQL

Import necessary things!

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import explode, split

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/08 16:31:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Read in the data from a local source into a Spark SQL style data frame. We'll use the Oliver Twist chapter we created a while back.

In [2]:
chap1 = spark.read.text("data/chap1.txt")
chap1.show()
chap1.schema

+--------------------+
|               value|
+--------------------+
|chapter i  treats...|
+--------------------+



StructType([StructField('value', StringType(), True)])

Now we'll use `split(str, regex, limit)`: Splits str around occurrences that match regex and returns an array with a length of at most limit.
Remember we have lazy eval though!

In [3]:
split(chap1.value, " ")

Column<'split(value,  , -1)'>

With the result of that, we'll use `explode()`: Separates the elements of array expr into multiple rows, or the elements of map expr into multiple rows and columns

In [4]:
explode(split(chap1.value, " "))

Column<'explode(split(value,  , -1))'>

As the column name isn't great, let's create an alias for the column name so it is easier to use.

In [5]:
explode(split(chap1.value, " ")).alias("word")

Column<'explode(split(value,  , -1)) AS word'>

Ok, so now we have a column object that we can select from our original `chap1` data frame.  We need to use the select method.
([syntax and more info](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.select.html))

In [6]:
col = explode(split(chap1.value, " ")).alias("word")
chap1.select(col)

DataFrame[word: string]

Use the `.show()` method to actually get the data back!

In [7]:
words = chap1.select(col)
words.show()

+-------------+
|         word|
+-------------+
|      chapter|
|            i|
|             |
|       treats|
|           of|
|          the|
|        place|
|        where|
|       oliver|
|        twist|
|          was|
|         born|
|          and|
|           of|
|          the|
|circumstances|
|    attending|
|          his|
|        birth|
|             |
+-------------+
only showing top 20 rows



Ok, that was a good check to make sure we had what we wanted.  Now we want to count the number of times each word occurs.  We'll use `groupBy()` and `count()` to do so.

In [8]:
words.groupBy("word").count()

DataFrame[word: string, count: bigint]

Let's use `.show()` to execute all the steps above and get something back!

In [9]:
counts = words.groupBy("word").count()
counts.show(30)

+-------------+-----+
|         word|count|
+-------------+-----+
|         some|    2|
|          few|    1|
|         hope|    1|
|    overseers|    2|
|   surrounded|    1|
|    biography|    1|
|  perspective|    1|
|circumstances|    1|
|  articulated|    1|
|        among|    1|
|          day|    1|
|         lips|    1|
|    appendage|    1|
|       raised|    2|
|      whether|    1|
|          did|    2|
|        space|    1|
|    existence|    1|
|          two|    1|
|     instance|    1|
|    buildings|    1|
|    strangers|    1|
|     occurred|    1|
|      inmates|    1|
|      backand|    1|
|       within|    1|
|       favour|    1|
|        could|    3|
|          him|    2|
|       badged|    1|
+-------------+-----+
only showing top 30 rows



Lastly, let's sort it and show some of the results.

In [10]:
counts.sort('count', ascending = False).show()

+-----+-----+
| word|count|
+-----+-----+
|  the|   75|
|     |   40|
|  and|   35|
|   of|   35|
|    a|   33|
|   to|   27|
|   in|   22|
|  was|   17|
|   it|   13|
|  her|   13|
|  had|   12|
| have|   12|
| that|   12|
|   by|   11|
|  she|   11|
| been|   11|
|  his|   11|
|   he|   11|
|   on|   10|
|which|   10|
+-----+-----+
only showing top 20 rows



# Example Using Spark Structured Streaming

First read in functions we need.

In [11]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import StructType
from pyspark.sql.functions import explode, split

## Step 1: Read in the stream

In this case, set up that we are *listening* to a folder call `csv_files` for the addition of new `.csv` files. We'll set up the schema for the resulting SQL style data frame.

In [12]:
myschema = StructType().add("value", "string")
chaps = spark.readStream.schema(myschema).csv("csv_files")

## Step 2: Set up transformations/aggregations to do

We'll basically pull the exact code from above to act on our `chaps` object.

In [13]:
words = chaps.select(explode(split(chaps.value, " ")).alias("word"))
#now count the words and sort it.
wordCounts = words \
                .groupBy("word") \
                .count() \
                .sort('count', ascending = False)

In [14]:
wordCounts

DataFrame[word: string, count: bigint]

## Step 3 & 4: Set up Writing of the Query to an Output Source and `.start()` the Query

Now we'll use the `.writeStream` method to output the result of this to the console (we'll talk more about what this all means in a bit). The `.start()` method instructs the query to being looking for data.

In [15]:
query = wordCounts.writeStream.outputMode("complete").format("console").start()

24/04/08 16:31:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9841acd7-73e9-4ff9-b550-127028185a7e. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 16:31:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-----+
| word|count|
+-----+-----+
|  the|   75|
|     |   40|
|  and|   35|
|   of|   35|
|    a|   33|
|   to|   27|
|   in|   22|
|  was|   17|
|   it|   13|
|  her|   13|
|  had|   12|
| have|   12|
| that|   12|
|   by|   11|
|  she|   11|
| been|   11|
|  his|   11|
|   he|   11|
|   on|   10|
|which|   10|
+-----+-----+
only showing top 20 rows



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|   the|  332|
|      |  214|
|     a|  165|
|   and|  153|
|    of|  148|
|    to|  120|
|   was|   95|
|    in|   93|
|    he|   62|
|   had|   60|
|    it|   55|
|   his|   51|
|  that|   50|
|oliver|   47|
|   for|   46|
|  with|   44|
|   him|   42|
|     i|   39|
|  said|   36|
|    at|   35|
+------+-----+
only showing top 20 rows



Great! Now that it is *listening* we can start adding files to the `csv_files` folder and see Spark update with new data!

In [16]:
#stop query
query.stop()

# Example Creating Data Using the `rate` Format

Let's just see how to generate data using the `rate` format. This is just data that can be used to play around with methods and get things working.

In [17]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
rateDF = spark.readStream.format("rate").load()

In [18]:
rateDF

DataFrame[timestamp: timestamp, value: bigint]

In [19]:
writeDF = rateDF.writeStream.outputMode("append").format("console").start()

24/04/08 16:33:11 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-51ab7639-d0ec-4dbd-b782-aa3dd79a8992. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 16:33:11 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2024-04-08 16:33:...|    0|
+--------------------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2024-04-08 16:33:...|    1|
+--------------------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+
|           timestamp|value|
+--------------------+-----+
|2024-04-08 16:33:...|    2|
+--------------------+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+---

Now *usually* we can stop the query.

In [20]:
writeDF.stop()

Great, we can easily add some transformations in that would allow us to check functionality.

In [21]:
from pyspark.sql.functions import col, pow
manipDF = rateDF.withColumn("squared_value", pow(col("value"), 2))
writeDF = manipDF.writeStream.outputMode("append").format("console").start()

24/04/08 16:33:23 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a48c538d-fb71-46c0-a22c-ba52cbfc11d7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 16:33:23 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+-------------+
|timestamp|value|squared_value|
+---------+-----+-------------+
+---------+-----+-------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+-----+-------------+
|           timestamp|value|squared_value|
+--------------------+-----+-------------+
|2024-04-08 16:33:...|    0|          0.0|
+--------------------+-----+-------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+-------------+
|           timestamp|value|squared_value|
+--------------------+-----+-------------+
|2024-04-08 16:33:...|    1|          1.0|
+--------------------+-----+-------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+-------------+
|       

In [22]:
writeDF.stop()

# Writing Stream Example `memory` Format

We can write a table to memory and then access it via SQL style commands.

In [27]:
writeTable = manipDF.writeStream \
                    .format("memory") \
                    .queryName("my_table") \
                    .start()

24/04/08 16:33:57 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2c5ba013-e1a4-4d20-8014-f5372ee2628d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 16:33:57 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Now there is a table that exists for us called `my_table`. We can use `spark.sql()` to access it.

In [34]:
spark.sql("select * from my_table").show()  

+--------------------+-----+-------------+
|           timestamp|value|squared_value|
+--------------------+-----+-------------+
|2024-04-08 16:33:...|    0|          0.0|
|2024-04-08 16:33:...|    1|          1.0|
|2024-04-08 16:33:...|    2|          4.0|
|2024-04-08 16:34:...|    3|          9.0|
|2024-04-08 16:34:...|    4|         16.0|
|2024-04-08 16:34:...|    5|         25.0|
+--------------------+-----+-------------+



In [35]:
writeTable.stop()

The table still exists after we stop the query.

In [36]:
spark.sql("select * from my_table").show()  

+--------------------+-----+-------------+
|           timestamp|value|squared_value|
+--------------------+-----+-------------+
|2024-04-08 16:33:...|    0|          0.0|
|2024-04-08 16:33:...|    1|          1.0|
|2024-04-08 16:33:...|    2|          4.0|
|2024-04-08 16:34:...|    3|          9.0|
|2024-04-08 16:34:...|    4|         16.0|
|2024-04-08 16:34:...|    5|         25.0|
+--------------------+-----+-------------+



# Aggregating Over Windows

Let's do a more complicated example where we were are reading in csv files from a folder and doing aggregations over time windows. This implies that our data will have some timestamps associated with it!

First we'll need some data to stream. We'll read in the `neuralgia.csv` file we've seen previously in the course. Then we'll add a time stamp and output parts of the data to `.csv` files that will be read in via a stream.

**We don't want to submit this code right now! We'll submit this in a python console in another window to simulate the idea of getting new data in.**

In [None]:
#Read in some data to sample from
import pandas as pd
neuralgia = pd.read_csv("data/neuralgia.csv")

#Now a for loop to sample a few rows and output them to a data set
#Put a pause in as well
import numpy as np
import time

for i in range(0,10):
    #randomly sample a few rows
    temp = neuralgia.loc[np.random.randint(neuralgia.shape[0], size = 5)]
    temp["timestamp"] = [time.strftime("%H:%M:%S", time.localtime())]*5
    temp.to_csv("csv_neuralgia/neuralgia" + str(i) + ".csv", index = False, header = False)
    time.sleep(20)

Ok, when we get files in the `csv_neuralgia` directory we want to read those in. Let's set up the schema for that and read in the files as as stream.

In [37]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import StructType

#set up schema
myschema = StructType().add("Treatment", "string").add("Sex", "string").add("Age", "integer").add("Duration", "integer").add("Pain", "string").add("timestamp", "timestamp")
#set up the stream
df = spark.readStream.schema(myschema).csv("csv_neuralgia")

We now want to do some aggregations on the data. Specifically, we'll group the data by the "Sex" column and find the average "Duration" of pain. To start with, let's just output this to the console to see if it is working **(no windowing yet)**!

In [38]:
#do our aggregations
agg = df.groupBy("Sex").avg("Duration")
#write out the results to the console to start with
myquery = agg.writeStream.outputMode("complete").format("console").start()

24/04/08 16:34:58 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-af09457d-adcb-4797-9051-7d0c609dfe33. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 16:34:58 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-------------+
|Sex|avg(Duration)|
+---+-------------+
|  F|         27.0|
|  M|         10.0|
+---+-------------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+---+-------------+
|Sex|avg(Duration)|
+---+-------------+
|  F|         19.5|
|  M|       10.375|
+---+-------------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+---+-------------+
|Sex|avg(Duration)|
+---+-------------+
|  F|         14.0|
|  M|       10.375|
+---+-------------+



Now that this is running we can submit the first chunk of code to a console and see it update in the console!

In [39]:
myquery.stop()

We want to do the same thing as above but find aggregations over windows of time rather than overall. We'll use the `window()` function with the `timestamp` column to accomplish this.

In [40]:
#Now including windowing
from pyspark.sql.functions import window
#'listen' on the same folder
df = spark.readStream.schema(myschema).csv("csv_neuralgia")
#do our aggregations with a window of 30 seconds, no overlap.
agg = df.groupBy(
            window("timestamp", "30 seconds", "30 seconds"),
            "Sex") \
        .avg("Duration")
#write it!
windowQuery = agg.writeStream.outputMode("complete").format("console").start()

24/04/08 16:37:01 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-baf75c09-c3c2-417a-b982-1e171b694723. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 16:37:01 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+------------------+
|              window|Sex|     avg(Duration)|
+--------------------+---+------------------+
|{2024-04-08 16:37...|  M|              20.0|
|{2024-04-08 16:37...|  F|27.666666666666668|
+--------------------+---+------------------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+---+------------------+
|              window|Sex|     avg(Duration)|
+--------------------+---+------------------+
|{2024-04-08 16:37...|  F|               9.0|
|{2024-04-08 16:37...|  M|              20.0|
|{2024-04-08 16:37...|  F|27.666666666666668|
+--------------------+---+------------------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+---+------------------+
|              window|Sex|     avg(Duration)|
+--------------------+---+------------------+
|{2024-04-08 16:37...|  F|               9.0|
|{2024-04-08 16:38...|  M|              12.0|
|{2024-04-08 16:37...|  M|              20.0|
|{2024-04-08 16:38...|  F| 4.666666666666667|
|{2024-04-08 16:37...|  F|27.666666666666668|
+--------------------+---+------------------+



Now that this is running we can submit the first chunk of code to a console and see it update in the console!

In [41]:
windowQuery.stop()

We are ready to include a watermark as well. We do so with the `.withWatermark()` method! This will allow us to output to a file (like .csv) when doing aggregations.

In [42]:
#'listen' on the same folder
df = spark.readStream.schema(myschema).csv("csv_neuralgia")
#do our aggregations with a window of 30 seconds, no overlap. Provide a 15 second watermark
agg = df \
        .withWatermark("timestamp", "2 seconds") \
        .groupBy(
            window("timestamp", "30 seconds", "30 seconds"),
            "Sex") \
        .avg("Duration")
#now the window column isn't a type that can be sent to a csv...
#instead cast it to a string and we'll be all set
agg2 = agg \
        .select(col("Sex"), 
                col("avg(Duration)").alias("Avg_Duration"), 
                col("Window").cast("string")
               )
#write it!
windowWaterQuery = agg2 \
                    .writeStream \
                    .outputMode("append") \
                    .option("checkpointlocation", "check") \
                    .format("csv") \
                    .option("path", "output_csv/") \
                    .start()

24/04/08 16:38:48 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

We're ready to submit the code that creates the csv files now. After that runs for a minute, let's stop the query!

In [43]:
windowWaterQuery.stop()

The output csv is actually stored in pieces over my local cluster. We can `coalesce` that into a single file with the code below.

In [44]:
allsafiles = spark.read.option('header', 'false').csv("output_csv/part-*.csv")
allsafiles.coalesce(1).write.format("csv").option("header","false").save("output_csv/final")

# Streaming Joins Example

First let's do our set up and read in a static spark SQL data frame representing ad *impressions* data.

In [51]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.types import StructType

adImpressionSchema = StructType().add('adId', "integer").add('impressionTime', 'timestamp')
#First read in a static dataframe
staticDF = spark.read.csv('data/impressions.csv', schema = adImpressionSchema, header = True)
staticDF.show(10)

+----+--------------------+
|adId|      impressionTime|
+----+--------------------+
|   0|2022-01-01 00:02:...|
|   1|2022-01-01 00:05:...|
|   2|2022-01-01 00:12:...|
|   3|2022-01-01 00:13:...|
|   4|2022-01-01 00:14:...|
|   5|2022-01-01 00:19:...|
|   6|2022-01-01 00:21:...|
|   7|2022-01-01 00:21:...|
|   8|2022-01-01 00:27:...|
|   9|2022-01-01 00:28:...|
+----+--------------------+
only showing top 10 rows



Great, we will now place some *click* data into a folder that is being monitored. This will be joined with the ad data on the `adId` column. (We'll just drop a couple files in the folder being monitored and see that it is working.)

In [52]:
#Now set up the stream to take in data from the .csv
adClickSchema = StructType().add('adId', "integer").add('clickTime', 'timestamp')
df = spark.readStream.schema(adClickSchema).csv("click_csv_files")

#Set up the join and start the query!
joinquery = staticDF \
                .join(df, "adId", "inner") \
                .writeStream.outputMode("append") \
                .format("console") \
                .start()

24/04/08 16:51:14 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a547b519-af9a-44e3-a451-83db8dfdb0a9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/04/08 16:51:14 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+----+--------------------+--------------------+
|adId|      impressionTime|           clickTime|
+----+--------------------+--------------------+
|  35|2022-01-01 01:24:...|2022-01-01 01:30:...|
|  38|2022-01-01 01:33:...|2022-01-01 01:37:...|
|  58|2022-01-01 02:30:...|2022-01-01 02:42:...|
|  90|2022-01-01 04:02:...|2022-01-01 04:10:...|
|  94|2022-01-01 04:11:...|2022-01-01 04:13:...|
| 117|2022-01-01 05:26:...|2022-01-01 05:29:...|
| 123|2022-01-01 05:51:...|2022-01-01 06:03:...|
| 127|2022-01-01 06:04:...|2022-01-01 06:16:...|
| 140|2022-01-01 06:41:...|2022-01-01 06:45:...|
| 170|2022-01-01 07:56:...|2022-01-01 08:07:...|
| 190|2022-01-01 08:57:...|2022-01-01 09:05:...|
| 197|2022-01-01 09:25:...|2022-01-01 09:34:...|
| 246|2022-01-01 12:13:...|2022-01-01 12:20:...|
| 251|2022-01-01 12:17:...|2022-01-01 12:26:...|
| 252|2022-01-01 12:18:...|2022-01-01 12:18:...|
| 260|2022-01-01 12:3

In [53]:
joinquery.stop()