# Module 5

## Structured Streaming using the Python DataFrames API

In [0]:
%fs ls /databricks-datasets/structured-streaming/events/

path,name,size
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008


In [0]:
%fs head /databricks-datasets/structured-streaming/events/file-0.json

In [0]:
from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

# Since we know the data format already, let's define the schema to speed up processing (no need for Spark to infer schema)
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

# Static DataFrame representing data in the JSON files
staticInputDF = (
  spark
    .read
    .schema(jsonSchema)
    .json(inputPath)
)

display(staticInputDF)

time,action
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:28.000+0000,Close
2016-07-28T04:19:29.000+0000,Open
2016-07-28T04:19:31.000+0000,Close
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:31.000+0000,Open
2016-07-28T04:19:32.000+0000,Close
2016-07-28T04:19:33.000+0000,Close
2016-07-28T04:19:35.000+0000,Close
2016-07-28T04:19:36.000+0000,Open


In [0]:
from pyspark.sql.functions import *      # for window() function

staticCountsDF = (
  staticInputDF
    .groupBy(
       staticInputDF.action, 
       window(staticInputDF.time, "1 hour"))    
    .count()
)
staticCountsDF.cache()

# Register the DataFrame as table 'static_counts'
staticCountsDF.createOrReplaceTempView("static_counts")
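
`window(staticInputDF.time, "1 hour")` assigns every event to a tumbling (fixed-size, non-overlapping) one-hour window, and `groupBy(action, window)` then counts events per pair. The following plain-Python sketch models that bucketing. It assumes windows aligned to the Unix epoch in UTC (Spark's behaviour when no start offset is given) and half-open intervals `[start, end)`. The three events are illustrative only:

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(hours=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts, size=WINDOW):
    """Return the (start, end) of the tumbling window that contains ts."""
    offset = (ts - EPOCH) % size          # time elapsed since the window started
    start = ts - offset
    return start, start + size

def windowed_counts(events, size=WINDOW):
    """Count events per (action, window), like groupBy(action, window(time, ...)).count()."""
    return Counter((action, tumbling_window(ts, size)) for ts, action in events)

# Illustrative events: the third one falls exactly on a boundary, so it
# belongs to the next window because windows are half-open [start, end).
events = [
    (datetime(2016, 7, 28, 4, 19, 28, tzinfo=timezone.utc), "Close"),
    (datetime(2016, 7, 28, 4, 59, 59, tzinfo=timezone.utc), "Open"),
    (datetime(2016, 7, 28, 5, 0, 0, tzinfo=timezone.utc), "Open"),
]
for (action, (start, end)), n in sorted(windowed_counts(events).items()):
    print(action, start.strftime("%H:%M"), end.strftime("%H:%M"), n)
```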

In [0]:
%sql select action, sum(count) as total_count from static_counts group by action

action,total_count
Open,50000
Close,50000


### Stream Processing
Now that we have analyzed the data interactively, let's convert this into a streaming query that continuously updates as data arrives. Since we only have a static set of files, we emulate a stream by reading them one file at a time, in the chronological order in which they were created. The query itself is almost identical to the interactive query above.
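
With `maxFilesPerTrigger` set to 1, each micro-batch picks up at most one new file. The plain-Python model below shows how a directory listing turns into micro-batches. It assumes the file source's default ordering of oldest files first by modification time (`latestFirst` = false). `plan_batches` is a hypothetical helper standing in for Spark's internal planning, and the file listing is made up:

```python
from itertools import islice

def plan_batches(files, max_files_per_trigger):
    """Split (name, mtime) pairs into micro-batches, oldest files first."""
    ordered = iter(sorted(files, key=lambda f: f[1]))   # chronological order
    while True:
        batch = [name for name, _ in islice(ordered, max_files_per_trigger)]
        if not batch:
            return
        yield batch

# Hypothetical listing: (file name, modification time in epoch seconds)
files = [("file-1.json", 200), ("file-0.json", 100), ("file-10.json", 300)]
print(list(plan_batches(files, 1)))
# [['file-0.json'], ['file-1.json'], ['file-10.json']]
print(list(plan_batches(files, 2)))
# [['file-0.json', 'file-1.json'], ['file-10.json']]
```

Sorting by modification time rather than by name matters: lexically, `file-10.json` sorts before `file-2.json`.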

In [0]:
from pyspark.sql.functions import *

# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = (
  spark
    .readStream                       
    .schema(jsonSchema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

# Same aggregation as staticCountsDF above
streamingCountsDF = (                 
  streamingInputDF
    .groupBy(
      streamingInputDF.action, 
      window(streamingInputDF.time, "1 hour"))
    .count()
)

# Is this DF actually a streaming DF?
streamingCountsDF.isStreaming

As you can see, `streamingCountsDF` is a streaming DataFrame (`streamingCountsDF.isStreaming` returned `True`). To start the streaming computation, define a sink and start it. Here we want to query the counts interactively (with the same queries as above), so we write the complete set of 1-hour counts to an in-memory table.
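
In `complete` output mode, the engine keeps the aggregation state across micro-batches and, after every trigger, writes the entire result table to the sink (here, the in-memory `counts` table), not only the rows that changed. A minimal plain-Python model of that behaviour, keyed only by `action` for brevity (the real query also groups by window) and using hypothetical batch contents, shows that the final snapshot matches a one-shot static count over all the data:

```python
from collections import Counter

def run_complete_mode(batches):
    """Yield the full result table after each micro-batch (complete output mode)."""
    state = Counter()                      # aggregation state kept between triggers
    for batch in batches:
        state.update(batch)                # fold the new rows into the state
        yield dict(state)                  # emit every row, not only the changed ones

# Hypothetical micro-batches of action values (one file per trigger)
batches = [["Open", "Open", "Close"], ["Close"], ["Open", "Close"]]
snapshots = list(run_complete_mode(batches))
for i, table in enumerate(snapshots):
    print(f"after batch {i}: {table}")

# The last snapshot equals a static count over all rows at once
all_rows = [row for batch in batches for row in batch]
print(snapshots[-1] == dict(Counter(all_rows)))  # True
```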

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "2")  # keep the size of shuffles small

query = (
  streamingCountsDF
    .writeStream
    .format("memory")        # memory = store in-memory table 
    .queryName("counts")     # counts = name of the in-memory table
    .outputMode("complete")  # complete = all the counts should be in the table
    .start()
)

`query` is a handle to the streaming query that is running in the background. The query continuously picks up files and updates the windowed counts.

Note the status of the query in the cell above: the progress bar shows that it is active. If you expand the `counts` entry above, you can also see how many files have already been processed.

Let's wait a bit for a few files to be processed and then interactively query the in-memory counts table.

In [0]:
from time import sleep
sleep(5)  # wait a bit for computation to start
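
A fixed `sleep(5)` may be too short on a busy cluster, or longer than necessary. A small polling helper is a more robust alternative. `wait_until` below is a hypothetical helper, not part of PySpark, and the condition you pass is your choice:

```python
import time

def wait_until(condition, timeout=60.0, interval=0.5):
    """Poll condition() until it returns a truthy value or timeout seconds pass.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, `wait_until(lambda: query.lastProgress is not None, timeout=30)` would wait until the streaming query has reported at least one progress update. In tests, PySpark's `query.processAllAvailable()` blocks until all currently available input has been processed.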

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993


In [0]:
%sql select action, sum(count) as total_count from counts group by action order by action

action,total_count
Close,50000
Open,50000


In [0]:
# Stop the streaming query
query.stop()

## Structured Streaming with Delta Lake using Scala

In [0]:
%scala

// Define the input and output formats and paths and the table name.
val read_format = "delta"
val write_format = "delta"
val load_path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"
val save_path = "/tmp/delta/people-10m"
val table_name = "default.people10m"

// Load the data from its source.
val people = spark
  .read
  .format(read_format)
  .load(load_path)

// Write the data to its target.
people.write
  .format(write_format)
  .save(save_path)

// Create the table.
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

In [0]:
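%scala

// Alternative to the previous cell: create a managed table with saveAsTable().
// Run only one of the two cells (or drop the table first): saveAsTable() uses
// SaveMode.ErrorIfExists by default and fails if people10m already exists.
val tableName = "people10m"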
val sourceType = "delta"
val loadPath = "/databricks-datasets/learning-spark-v2/people/people-10m.delta"

val people = spark
  .read
  .format(sourceType)
  .load(loadPath)

people.write
  .format(sourceType)
  .saveAsTable(tableName)

display(spark.sql("SELECT * FROM " + tableName))

In [0]:
%scala
display(spark.sql("DESCRIBE DETAIL people10m"))

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion
delta,3aadc7bf-6cc2-48c1-80b6-bf90e93ca238,default.people10m,,dbfs:/tmp/delta/people-10m,2022-03-06T15:13:26.708+0000,2022-03-06T15:13:47.000+0000,List(),8,221246204,Map(),1,2


In [0]:

%scala
// Start ingesting into a Delta table with COPY INTO
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

loan_id,funded_amnt,paid_amnt,addr_state
0,1000,182.22,CA
1,1000,361.19,WA
2,1000,176.26,TX
3,1000,1000.0,OK
4,1000,249.98,PA
5,1000,408.6,CA
6,1000,1000.0,MD
7,1000,168.81,OH
8,1000,193.64,TX
9,1000,218.83,CT


In [0]:
%scala

spark.sql("DROP TABLE " + table_name)

## Using Auto Loader

Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage.
Auto Loader can load data files from:

1. Azure Data Lake Storage Gen2 (ADLS Gen2, `abfss://`)
2. Azure Blob Storage (`wasbs://`)
3. ADLS Gen1 (`adl://`)
4. Databricks File System (DBFS, `dbfs:/`)

Auto Loader can ingest _JSON_, _CSV_, _PARQUET_, _AVRO_, _ORC_, _TEXT_, and _BINARYFILE_ file formats.

Auto Loader provides a Structured Streaming source called **cloudFiles**. Given an input directory path on the cloud file storage, the cloudFiles source automatically processes new files as they arrive, with the option of also processing existing files in that directory.


Auto Loader scales from backfilling storage accounts that contain billions of files to near-real-time pipelines that load millions of files per hour.
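
Conceptually, the checkpoint is what lets the source pick up only new files. The following plain-Python model is deliberately simplified and is not how Auto Loader is implemented. Auto Loader persists its file-tracking state in the checkpoint location and can also use file notifications instead of directory listing. The `IncrementalFileSource` class is hypothetical. Its `include_existing` flag loosely mirrors the real `cloudFiles.includeExistingFiles` option, and its in-memory `seen` set stands in for the checkpoint:

```python
import os
import tempfile

class IncrementalFileSource:
    """Toy model: each poll returns only files that were not seen before."""

    def __init__(self, directory, include_existing=True):
        self.directory = directory
        self.seen = set()                      # stands in for the checkpoint
        if not include_existing:
            # Treat files already present at start-up as processed
            self.seen.update(self._list())

    def _list(self):
        return {
            name for name in os.listdir(self.directory)
            if os.path.isfile(os.path.join(self.directory, name))
        }

    def poll(self):
        """Return the sorted names of files that arrived since the last poll."""
        new_files = sorted(self._list() - self.seen)
        self.seen.update(new_files)
        return new_files

# Demo with a temporary directory and file names borrowed from this example
with tempfile.TemporaryDirectory() as upload_dir:
    open(os.path.join(upload_dir, "ID.csv"), "w").close()
    source = IncrementalFileSource(upload_dir)
    first = source.poll()        # ['ID.csv']  existing file picked up
    second = source.poll()       # []          nothing new
    open(os.path.join(upload_dir, "MT.csv"), "w").close()
    third = source.poll()        # ['MT.csv']  only the new arrival
    late = IncrementalFileSource(upload_dir, include_existing=False)
    skipped = late.poll()        # []          existing files ignored
print(first, second, third, skipped)
```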

In [0]:
%scala

// Replace with your own user folder under /FileStore/shared-uploads/
val user_dir = "tomaz.kastrun@gmail.com"
val upload_path = "/FileStore/shared-uploads/" + user_dir + "/population_data_upload"

dbutils.fs.mkdirs(upload_path)

In [0]:
%scala
// Run Auto Loader

val checkpoint_path = "/tmp/delta/population_data/_checkpoints"
val write_path = "/tmp/delta/population_data"

// Set up the stream to begin reading incoming files from the
// upload_path location.
val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .schema("city string, year int, population long")
  .load(upload_path)

// Start the stream.
// Auto Loader uses the checkpoint_path location to keep a record of all files
// that it has already processed from the upload_path location.
// Files uploaded since the last check are picked up, and their data is
// written to the write_path location.
df.writeStream.format("delta")
  .option("checkpointLocation", checkpoint_path)
  .start(write_path)

In [0]:
%scala

// While the stream from the previous cell is running, query the results:

val df_population = spark.read.format("delta").load(write_path)

display(df_population)

With the Auto Loader stream (started two cells above) still running, create the following files and upload them (for example, by drag and drop) to the
upload directory using the _DBFS file browser_:

**ID.csv**
```
city,year,population
Boise,2019,438000
Boise,2020,447000
```

**MT.csv**
```
city,year,population
Helena,2019,81653
Helena,2020,82590
```

**Misc.csv**
```
city,year,population
Seattle metro,2021,3461000
Portland metro,2021,2174000
Boise,2021,455000
Helena,2021,81653
```
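
The stream applies the DDL schema `city string, year int, population long` and skips each file's header row (`header` = `true`). A plain-Python sketch of the same parsing for the three files above follows. The type mapping is written by hand, and Python's `int` covers both `int` and `long`. Columns are matched by header name here, which gives the same result as positional matching because the header order matches the schema. Unlike Spark's default `PERMISSIVE` mode, which turns malformed values into nulls, this sketch raises on a malformed number:

```python
import csv
import io

# Contents of the three example files above
FILES = {
    "ID.csv": "city,year,population\nBoise,2019,438000\nBoise,2020,447000\n",
    "MT.csv": "city,year,population\nHelena,2019,81653\nHelena,2020,82590\n",
    "Misc.csv": (
        "city,year,population\n"
        "Seattle metro,2021,3461000\n"
        "Portland metro,2021,2174000\n"
        "Boise,2021,455000\n"
        "Helena,2021,81653\n"
    ),
}

# Column name -> converter, mirroring "city string, year int, population long"
SCHEMA = [("city", str), ("year", int), ("population", int)]

def parse_csv(text, schema=SCHEMA):
    """Parse CSV text that starts with a header row into typed tuples."""
    rows = csv.DictReader(io.StringIO(text))   # first line is used as the header
    return [tuple(cast(row[name]) for name, cast in schema) for row in rows]

rows = [row for text in FILES.values() for row in parse_csv(text)]
print(len(rows))   # 8
print(rows[0])     # ('Boise', 2019, 438000)
```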

In [0]:
%scala
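// While Auto Loader is still running, read the Delta table again.
// Note: the Seattle metro and Portland metro rows for 2019 and 2020 in the
// result below come from additional CSV files that are not listed above.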
val df_population = spark.read.format("delta").load(write_path)
display(df_population)

/* Result
+----------------+------+------------+
| city           | year | population |
+================+======+============+
| Seattle metro  | 2019 | 3406000    |
+----------------+------+------------+
| Seattle metro  | 2020 | 3433000    |
+----------------+------+------------+
| Helena         | 2019 | 81653      |
+----------------+------+------------+
| Helena         | 2020 | 82590      |
+----------------+------+------------+
| Boise          | 2019 | 438000     |
+----------------+------+------------+
| Boise          | 2020 | 447000     |
+----------------+------+------------+
| Portland metro | 2019 | 2127000    |
+----------------+------+------------+
| Portland metro | 2020 | 2151000    |
+----------------+------+------------+
| Seattle metro  | 2021 | 3461000    |
+----------------+------+------------+
| Portland metro | 2021 | 2174000    |
+----------------+------+------------+
| Boise          | 2021 | 455000     |
+----------------+------+------------+
| Helena         | 2021 | 81653      |
+----------------+------+------------+
*/

In [0]:
%scala

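// Clean up: stop any active streams first, because deleting the output and
// checkpoint directories under a running query can make it fail.
spark.streams.active.foreach(_.stop())

dbutils.fs.rm(write_path, true)
dbutils.fs.rm(upload_path, true)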

## Streaming using Spark

#### Quick setup using Scala

This assumes that the Spark installation is complete and that you start by launching the master.

Before starting, we need to run a Netcat (`nc`) server on localhost. Netcat is a command-line utility that reads and writes data across network connections using the TCP or UDP protocols. Here it mimics a streaming data source by sending the lines you type.

To run the Netcat server, run the following CLI command (server: localhost; port: 9999):

`nc -lk 9999`
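
If you would rather script the input than type it into Netcat, a small Python server can play the same role, since Spark's socket source connects to it as a TCP client and reads newline-delimited text. The helpers below are hypothetical. Unlike `nc -lk`, `send_lines` serves a single client, sends a fixed list of lines, and then closes the connection. `loopback_demo` only checks the round trip locally on a free port:

```python
import socket
import threading

def make_line_server(host="localhost", port=9999):
    """Open a listening TCP socket, like the listening side of `nc -l`."""
    return socket.create_server((host, port))

def send_lines(server, lines):
    """Accept one client, send each line followed by a newline, then close."""
    conn, _addr = server.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))

def loopback_demo(lines):
    """Send lines through a server on a free local port and read them back."""
    server = make_line_server("127.0.0.1", 0)          # port 0: pick any free port
    port = server.getsockname()[1]
    sender = threading.Thread(target=send_lines, args=(server, lines))
    sender.start()
    received = b""
    with socket.create_connection(("127.0.0.1", port)) as client:
        while chunk := client.recv(4096):               # b"" means the server closed
            received += chunk
    sender.join()
    server.close()
    return received.decode("utf-8")
```

To feed a Spark job instead, call `send_lines(make_line_server(), ["hello world", "hello spark"])`. It blocks until the job connects to `localhost:9999`.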

Using Scala, we will connect to the master and create a streaming context.

Copy and paste this script into a Scala file (name it ***Stream-word-count.scala***):

```Scala
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

 
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Reduce log noise. Spark's bundled example calls its own
    // StreamingExamples.setStreamingLogLevels() helper here instead, which
    // is not available outside Spark's examples module.
    ssc.sparkContext.setLogLevel("WARN")

    // Create a socket stream on target ip:port and count the
    // words in the input stream of \n-delimited text (e.g. generated by 'nc').
    // The storage level has no replication, which is acceptable only when
    // running locally; in a distributed setup, replication is needed for
    // fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
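
This example uses the older DStream API (`StreamingContext`). Here `reduceByKey` counts words within each 1-second batch only, so every printed result covers just that batch. The Structured Streaming versions below use `complete` output mode and print running totals over everything received so far instead. A plain-Python sketch contrasting the two, with hypothetical input batches:

```python
from collections import Counter

def per_batch_counts(batches):
    """DStream-style: count words separately inside each batch."""
    return [Counter(word for line in batch for word in line.split(" ")) for batch in batches]

def running_counts(batches):
    """Structured Streaming complete mode: totals over all batches so far."""
    total = Counter()
    results = []
    for batch in batches:
        total.update(word for line in batch for word in line.split(" "))
        results.append(Counter(total))          # snapshot of the current totals
    return results

# Hypothetical lines typed into nc, grouped by the batch they arrived in
batches = [["hello spark", "hello"], ["spark streaming"]]
print(per_batch_counts(batches)[1])   # Counter({'spark': 1, 'streaming': 1})
print(running_counts(batches)[1])     # Counter({'hello': 2, 'spark': 2, 'streaming': 1})
```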

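Spark ships a precompiled copy of this example, so you can run it directly from the Spark home directory:

`$ ./bin/run-example streaming.NetworkWordCount localhost 9999`

`run-example` takes the name of a bundled example class, not a `.scala` file. To run your own copy, compile it into a JAR and submit it with `spark-submit --class org.apache.spark.examples.streaming.NetworkWordCount <path-to-your-jar> localhost 9999`.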

#### Quick setup using R

As with Scala, this assumes that the Spark installation is complete and that you start by launching the master.

Before starting, run a Netcat (`nc`) server on localhost to generate the streaming data, as in the Scala setup (server: localhost; port: 9999):

`nc -lk 9999`

Using R, we will connect to the master and create a session.

In [0]:

library(SparkR)
sparkR.session(appName = "StructuredStreamApp")

Next, we define a DataFrame that represents the streaming data:

In [0]:

# Create a DataFrame representing the stream of input lines from the connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

Copy and paste this script into an R file (name it ***Stream-word-count.R***):

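```R
library(SparkR)
sparkR.session(appName = "StructuredStreamApp")

# Host and port of the Netcat server, passed on the command line
args <- commandArgs(trailingOnly = TRUE)
hostname <- args[[1]]
port <- as.integer(args[[2]])
lines <- read.stream("socket", host = hostname, port = port)

# Split each line into words, one row per word
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Running word count over all data received so far
wordCounts <- count(groupBy(words, "word"))

# Print the complete result table to the console after every trigger
query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
sparkR.session.stop()
```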

Then run the script from the command line with `spark-submit`, passing the host and port of the Netcat server you already started with `nc`:

`./bin/spark-submit /Rsample/Stream-word-count.R localhost 9999`

#### Quick setup using Python
Similar to R, you can do this with Python (or Scala) as well. This assumes that Spark is already installed and that `nc` is running on localhost, port 9999.

Create a Python file (***Stream-word-count.py***) with the following content:

```Python
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: Stream-word-count.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)

    host = sys.argv[1]
    port = int(sys.argv[2])

    spark = SparkSession\
        .builder\
        .appName("StructuredStreamApp")\
        .getOrCreate()

    lines = spark\
        .readStream\
        .format('socket')\
        .option('host', host)\
        .option('port', port)\
        .load()

    # Split the lines into words
    words = lines.select(
        # explode turns each item in an array into a separate row
        explode(
            split(lines.value, ' ')
        ).alias('word')
    )

    wordCounts = words.groupBy('word').count()

    query = wordCounts\
        .writeStream\
        .outputMode('complete')\
        .format('console')\
        .start()

    query.awaitTermination()
```

Then run the file from the command line:

`./bin/spark-submit /Pysample/Stream-word-count.py localhost 9999`

In both cases, the running word counts are printed to the console after each micro-batch (the `console` sink in `complete` output mode) as you type lines into the Netcat session.
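
Both the R and Python queries split on a single space, so consecutive spaces produce empty-string "words" that are counted like any other. For example, splitting `"a  b"` on `" "` yields `a`, an empty string, and `b`. A plain-Python model of `explode(split(value, ' '))` followed by `groupBy('word').count()` is shown below, together with an optional filter that drops empty tokens. In the PySpark script, the equivalent filter would be something like `words.filter(words.word != '')`:

```python
from collections import Counter

def explode_split(lines, sep=" "):
    """Model of explode(split(value, sep)): one output row per token."""
    return [token for line in lines for token in line.split(sep)]

def word_counts(lines, drop_empty=False):
    """Model of groupBy('word').count(), optionally dropping empty tokens."""
    words = explode_split(lines)
    if drop_empty:
        words = [w for w in words if w != ""]
    return Counter(words)

lines = ["hello  spark", "hello"]              # note the double space
print(word_counts(lines))                      # Counter({'hello': 2, '': 1, 'spark': 1})
print(word_counts(lines, drop_empty=True))     # Counter({'hello': 2, 'spark': 1})
```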