# Spark Structured Streaming

Recall that Spark Structured Streaming lets us treat a stream as an infinite (unbounded) dataset that grows as new data arrives, and we write queries against that dataset. Below we explore the API:

- We'll demonstrate how to pull data from different streaming sources, such as TCP sockets and files.
- We'll delve into parsing and structuring data.
- We'll demonstrate how to run queries against this data using the structured streaming API.

In [2]:
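// bind `spark` to a val so that it is a stable identifier we can import from
val sparkDummy = spark
import sparkDummy.implicits._  // enables $"col" syntax and Dataset encoders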

# Socket Structured Streaming Example

We're going to stream one of Shakespeare's most famous poems (Sonnet 18) on a fixed port and listen for it using Spark. We set up a small streaming server, `Broadcast.scala`, that broadcasts the file one line at a time.

In [3]:
import sys.process._

"cat data/summer.txt".!  // run an external command; `.!` comes from sys.process

Shall I compare thee to a summer’s day?
Thou art more lovely and more temperate.
Rough winds do shake the darling buds of May,
And summer’s lease hath all too short a date.
Sometime too hot the eye of heaven shines,
And often is his gold complexion dimmed;
And every fair from fair sometime declines,
By chance, or nature’s changing course, untrimmed;
But thy eternal summer shall not fade,
Nor lose possession of that fair thou ow’st,
Nor shall death brag thou wand’rest in his shade,
When in eternal lines to Time thou grow’st.
So long as men can breathe, or eyes can see,
So long lives this, and this gives life to thee.

In [4]:
// Listen on a socket, count words, and print the running counts.
// `durationMs` is how long (in milliseconds) to run before stopping the query.

def createStream(port: Int, durationMs: Long): Unit = {
    val lines = (spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", port)
        .load())

    val words = (lines
        .as[String]
        .flatMap(_.split("\\s+")))

    val wordCounts = (words
        .groupByKey(_.toLowerCase)
        .count()
        .orderBy($"count(1)".desc))

    val query = (wordCounts.writeStream
        .outputMode("complete")
        .format("console")
        .start())

    query.awaitTermination(durationMs)  // block for at most durationMs
    query.stop()                        // then shut the query down so it stops printing
}

In [5]:
val port = 9001

// Broadcast the file on the port one line at a time, from a background thread
(new Thread {
    override def run(): Unit = {
        s"scala Broadcast.scala ${port} data/summer.txt".!
    }
}).start()
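The notebook does not show the contents of `Broadcast.scala`, so the following is only a hypothetical sketch of a line-at-a-time TCP broadcaster. It assumes a single client and a one-second pause between lines; the names `BroadcastSketch` and `serveLines` are made up for illustration. Spark's `socket` source connects as a client and reads newline-delimited text, so the server only needs to write one line per `println`.

```scala
import java.io.PrintWriter
import java.net.ServerSocket
import scala.io.Source

// Hypothetical stand-in for Broadcast.scala, NOT the notebook's actual script.
object BroadcastSketch {
  // Wait for one client on `server`, then send `lines` one at a time,
  // sleeping `delayMs` milliseconds between lines so they arrive over time.
  def serveLines(server: ServerSocket, lines: Seq[String], delayMs: Long): Unit = {
    val client = server.accept() // blocks until a client (e.g. Spark) connects
    try {
      val out = new PrintWriter(client.getOutputStream, true) // autoflush on println
      lines.foreach { line =>
        out.println(line)
        Thread.sleep(delayMs)
      }
    } finally client.close()
  }

  // Assumed usage: scala Broadcast.scala <port> <file>
  def main(args: Array[String]): Unit = {
    val port = args(0).toInt
    val source = Source.fromFile(args(1), "UTF-8")
    val lines = try source.getLines().toList finally source.close()
    val server = new ServerSocket(port)
    try serveLines(server, lines, delayMs = 1000L) finally server.close()
  }
}
```

Because `accept` blocks until a client connects, the server must be listening before the Spark query starts, which is why the notebook launches it in a background thread first.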

In [6]:
createStream(port, 12000)

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+--------+
|   value|count(1)|
+--------+--------+
|    thee|       1|
|summer’s|       1|
|       i|       1|
|    day?|       1|
|   shall|       1|
|       a|       1|
|      to|       1|
| compare|       1|
+--------+--------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------+--------+
|   value|count(1)|
+--------+--------+
|     and|       4|
|      of|       3|
|    fair|       3|
|    thou|       2|
|    more|       2|
|     too|       2|
|summer’s|       2|
|     the|       2|
|   shall|       2|
|       a|       2|
|sometime|       2|
|     art|       1|
|   often|       1|
|   fade,|       1|
|     not|       1|
|    lose|       1|
|      by|       1|
|  lovely|       1|
|     hot|       1|
|     thy|       1|
+--------+--------+
only showing top 20 rows
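In complete output mode every batch reprints the whole result table, so counts from batch 0 carry into batch 1 (for example, `shall` and `summer’s` reach 2). A plain-Scala sketch of that bookkeeping, with no Spark involved: each micro-batch is folded into the running counts and the full table is kept after every batch. Unlike the Spark query, this sketch also drops empty tokens produced by leading whitespace; `CompleteModeSketch` is a name made up here.

```scala
object CompleteModeSketch {
  // Lowercase words of a line, as in flatMap(_.split("\\s+")) + groupByKey(_.toLowerCase);
  // empty tokens are filtered out here (the notebook's query does not do this).
  def words(line: String): Seq[String] =
    line.split("\\s+").toSeq.filter(_.nonEmpty).map(_.toLowerCase)

  // Fold each micro-batch into the running counts and return the full table
  // after every batch, which is what "complete" mode prints.
  def completeOutputs(batches: Seq[Seq[String]]): Seq[Map[String, Long]] =
    batches.scanLeft(Map.empty[String, Long]) { (counts, batch) =>
      batch.flatMap(words).foldLeft(counts) { (acc, w) =>
        acc.updated(w, acc.getOrElse(w, 0L) + 1L)
      }
    }.tail // drop the empty initial state
}
```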


# Netcat Socket Structured Streaming Example

We can also use the Unix utility `netcat` (`nc`) to broadcast a stream on a fixed port.

In [7]:
// first run `nc -lk 9002` in a terminal, then run this cell and start typing!

createStream(9002, 10000)

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+--------+
|value|count(1)|
+-----+--------+
|   hi|       1|
+-----+--------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+--------+
|value|count(1)|
+-----+--------+
|   hi|       3|
|  bye|       1|
+-----+--------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+--------+
|value|count(1)|
+-----+--------+
|   hi|       4|
|  bye|       2|
+-----+--------+



# Spark Structured Streaming Parsing Data

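Much as with static datasets, we can use a `case class` to represent rows of data. The case class's fields correspond to the JSON field names or (as in this case) the CSV column names.

However, unlike with static datasets, a streaming file source does not infer the schema by default, so we must supply one up front (inference can be enabled with `spark.sql.streaming.schemaInference`, but it is off by default). Rather than writing the schema by hand, we use `ScalaReflection` to generate it from our case class.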

In [17]:
import sys.process._

"cat data/people/1.csv".!  // run an external command; `.!` comes from sys.process

name,city,country,age
Amy,Paris,FR,30
Bob,New York,US,22
Charlie,London,UK,35
Denise,San Francisco,US,22


In [21]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection

case class Person(
    name: String,
    city: String,
    country: String,
    age: Option[Int]
)

// create schema for parsing data
val caseSchema = (ScalaReflection
    .schemaFor[Person]
    .dataType
    .asInstanceOf[StructType])

val peopleStream = (spark.readStream
    .schema(caseSchema)
    .option("header", true)  // skip the header row; columns map to the schema by position
    .option("maxFilesPerTrigger", 1)  // each file is read in a separate batch
    .csv("data/people/")
    .as[Person])
  
(peopleStream.writeStream
    .outputMode("append")  // emit only the rows added in each batch
    .format("console")     // print results to the console
    .start())


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-------------+-------+---+
|   name|         city|country|age|
+-------+-------------+-------+---+
|    Amy|        Paris|     FR| 30|
|    Bob|     New York|     US| 22|
|Charlie|       London|     UK| 35|
| Denise|San Francisco|     US| 22|
+-------+-------------+-------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+------+-------+----+
|   name|  city|country| age|
+-------+------+-------+----+
| Edward|London|     UK|  53|
|Francis|      |     FR|  22|
| George|London|     UK|null|
+-------+------+-------+----+
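`ScalaReflection` lives in Spark's internal `catalyst` package, so it is not a public API and may change between releases. A sketch using the public `Encoders.product` instead, which should yield an equivalent schema for a case class like `Person`. It assumes `spark-sql` is on the classpath; the `PersonSchema` object is a name made up here.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Same shape as the notebook's Person, redeclared so this block stands alone.
case class Person(name: String, city: String, country: String, age: Option[Int])

object PersonSchema {
  // Derive the schema from the case class fields; Option[Int] becomes
  // a nullable integer column. No SparkSession is needed for this.
  val schema: StructType = Encoders.product[Person].schema
}
```

It can be passed to the reader as `.schema(PersonSchema.schema)` wherever `caseSchema` is used above.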



**Exercise:** What would happen if age were not optional?

# Constructing Columns in Structured Streaming

Datasets refer to columns through `Column` objects, using the same syntax as DataFrames. There are several ways to reference a column:
- `peopleStream("country")`
- `peopleStream.col("country")`
- `$"country"`
- `'country`

The first two are explicit: they bind the column to a particular Dataset, which is useful in joins where both inputs have a column with the same name. The last two are unresolved: Spark resolves them against whichever Dataset they are used with, which is convenient when working with a single stream. The `$"country"` and `'country` forms require `import spark.implicits._` (symbol literals such as `'country` are deprecated as of Scala 2.13).

There are several ways to construct columns:
- The syntax above references `Column`s already present in a Dataset.
- We can build a new `Column` from existing ones with operators and methods such as `===` (equality), `>`, `<=`, `.plus`, `-`, `.startsWith`, or `&&`, depending on the column's data type.
- Finally, we can rename a column (keeping its values) with `as`.
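The three ways of obtaining a `Column` listed above can be mixed in one `select`. A sketch on a small static Dataset (static and streaming Datasets share this API), assuming `spark-sql` on the classpath and a local session; the rows are copied from the CSV output above and `ColumnSketch` is a made-up name. Note that a `null` age makes the combined predicate `null` rather than `false`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ColumnSketch {
  case class Person(name: String, city: String, country: String, age: Option[Int])

  // Returns the output column names and the collected rows.
  def run(): (Seq[String], Seq[(String, Boolean, Any)]) = {
    val spark = SparkSession.builder().master("local[1]").appName("column-sketch").getOrCreate()
    try {
      import spark.implicits._
      val people = Seq(
        Person("Amy", "Paris", "FR", Some(30)),
        Person("Charlie", "London", "UK", Some(35)),
        Person("George", "London", "UK", None)
      ).toDS()

      val df = people.select(
        people("name"),                                                   // explicit: bound to `people`
        (col("country") === "UK").as("in_UK"),                            // unresolved, via functions.col
        (($"city" === "London") && ($"age" <= 40)).as("young_Londoner")   // built from other Columns
      )
      val rows = df.collect().toSeq.map(r => (r.getString(0), r.getBoolean(1), r.get(2)))
      (df.columns.toSeq, rows)
    } finally spark.stop()
  }
}
```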

# Selecting Columns Using Structured Streaming

We'll demonstrate these using the `select` method, which takes one or more `Column` arguments and returns a DataFrame with one column per argument.

In [73]:
(peopleStream.select(
    $"country" === "UK" as "in_UK",
    $"age" <= 30 as "under_30",
    $"age" + 1,
    ($"city" === "London") && ($"age" <= 40) as "young_Londoner",
    'country startsWith "U" as "U_Country")
        .writeStream
        .outputMode("append")  // emit only the rows added in each batch
        .format("console")
        .start)


-------------------------------------------
Batch: 0
-------------------------------------------
+-----+--------+---------+--------------+---------+
|in_UK|under_30|(age + 1)|young_Londoner|U_Country|
+-----+--------+---------+--------------+---------+
|false|    true|       31|         false|    false|
|false|    true|       23|         false|     true|
| true|   false|       36|          true|     true|
|false|    true|       23|         false|     true|
+-----+--------+---------+--------------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+--------+---------+--------------+---------+
|in_UK|under_30|(age + 1)|young_Londoner|U_Country|
+-----+--------+---------+--------------+---------+
| true|   false|       54|         false|     true|
|false|    true|       23|         false|    false|
| true|    null|     null|          null|     true|
+-----+--------+---------+--------------+---------+



# Filtering Rows in Structured Streaming

`filter` takes a boolean-valued `Column` and returns a Dataset containing only the rows for which that column is `true`.

In [49]:
(peopleStream.filter($"age" === 22)
    .writeStream
    .outputMode("append")  // emit only the rows added in each batch
    .format("console")
    .start)


-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------------+-------+---+
|  name|         city|country|age|
+------+-------------+-------+---+
|   Bob|     New York|     US| 22|
|Denise|San Francisco|     US| 22|
+------+-------------+-------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+----+-------+---+
|   name|city|country|age|
+-------+----+-------+---+
|Francis|    |     FR| 22|
+-------+----+-------+---+
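`filter` keeps a row only when its predicate is `true`. When the predicate evaluates to `null`, as it does for George's missing age, the row is dropped, and it is dropped by the negated predicate too. A sketch on a static Dataset, assuming `spark-sql` on the classpath and a local session; rows are copied from the output above and `FilterSketch` is a made-up name.

```scala
import org.apache.spark.sql.SparkSession

object FilterSketch {
  case class Person(name: String, city: String, country: String, age: Option[Int])

  // Returns (names kept by age > 30, names kept by NOT(age > 30)).
  def run(): (Seq[String], Seq[String]) = {
    val spark = SparkSession.builder().master("local[1]").appName("filter-sketch").getOrCreate()
    try {
      import spark.implicits._
      val people = Seq(
        Person("Charlie", "London", "UK", Some(35)),
        Person("Edward", "London", "UK", Some(53)),
        Person("Francis", "", "FR", Some(22)),
        Person("George", "London", "UK", None) // age is null
      ).toDS()

      val over30 = people.filter($"age" > 30).map(_.name).collect().toSeq.sorted
      // For George both predicates evaluate to null, so he appears in neither result
      val notOver30 = people.filter(!($"age" > 30)).map(_.name).collect().toSeq.sorted
      (over30, notOver30)
    } finally spark.stop()
  }
}
```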



**Exercises:**

1. Filter for when the age is no less than 22.
1. Filter for the city being "London".
1. Filter for Americans under the age of 30.

# Groupby in Structured Streaming

We can group and aggregate much as we would in SQL.

- `groupBy` takes one or more `Column`s to group by.
- The result supports built-in aggregation functions (`avg`, `mean`, `min`, `max`, `sum`), each of which takes the names of one or more numeric columns to aggregate.

In [62]:
(peopleStream.groupBy('country)
    .mean("age")
    .writeStream
    .outputMode("complete")
    .format("console")
    .start)


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+--------+
|country|avg(age)|
+-------+--------+
|     US|    22.0|
|     FR|    30.0|
|     UK|    35.0|
+-------+--------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+--------+
|country|avg(age)|
+-------+--------+
|     US|    22.0|
|     FR|    26.0|
|     UK|    44.0|
+-------+--------+



# Groupby Aggregations in Structured Streaming

For more complex aggregations, we can use `.agg`, which takes aggregate `Column` expressions built from functions in `org.apache.spark.sql.functions` (here `first` and `count`). Notice that we can reuse the `as` method to name the results, along with the column operators from before.

In [75]:
(peopleStream.groupBy('city)
    .agg(first("country") as "country", count("age"))
    .writeStream
    .outputMode("complete")
    .format("console")
    .start)


-------------------------------------------
Batch: 0
-------------------------------------------
+-------------+-------+----------+
|         city|country|count(age)|
+-------------+-------+----------+
|San Francisco|     US|         1|
|       London|     UK|         1|
|        Paris|     FR|         1|
|     New York|     US|         1|
+-------------+-------+----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+-------+----------+
|         city|country|count(age)|
+-------------+-------+----------+
|San Francisco|     US|         1|
|       London|     UK|         2|
|        Paris|     FR|         1|
|             |     FR|         1|
|     New York|     US|         1|
+-------------+-------+----------+
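`count` on a column skips nulls, which is why London's `count(age)` is 2 even though three Londoners appear. A sketch contrasting a row count with a non-null count inside one `.agg`, naming each result with `as`. It uses a static Dataset (same API as a stream), assumes `spark-sql` on the classpath and a local session, copies the rows from the CSV output above, and uses the made-up name `AggSketch`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, lit}

object AggSketch {
  case class Person(name: String, city: String, country: String, age: Option[Int])

  // Maps country -> (rows, rows with a non-null age, average age).
  def run(): Map[String, (Long, Long, Double)] = {
    val spark = SparkSession.builder().master("local[1]").appName("agg-sketch").getOrCreate()
    try {
      import spark.implicits._
      val people = Seq(
        Person("Amy", "Paris", "FR", Some(30)),
        Person("Bob", "New York", "US", Some(22)),
        Person("Charlie", "London", "UK", Some(35)),
        Person("Denise", "San Francisco", "US", Some(22)),
        Person("Edward", "London", "UK", Some(53)),
        Person("Francis", "", "FR", Some(22)),
        Person("George", "London", "UK", None)
      ).toDS()

      val df = people.groupBy($"country").agg(
        count(lit(1)).as("people"),   // every row in the group
        count($"age").as("with_age"), // only rows whose age is not null
        avg($"age").as("avg_age"))    // nulls are ignored by avg
      df.collect()
        .map(r => (r.getString(0), (r.getLong(1), r.getLong(2), r.getDouble(3))))
        .toMap
    } finally spark.stop()
  }
}
```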



**Exercise:** Add the average age of each city to the above query.

# Joining Structured Streams with Datasets

We can join a streaming Dataset with a static Dataset. Remember: both are distributed datasets and one of them is being streamed, which is a lot of semantics for a simple `.join` operator!

Below, we take a fixed user table and join it with a stream of transactions from a fictitious poultry e-commerce website.

In [86]:
case class User(id: Int, name: String, email: String, country: String)
case class Transaction(userid: Int, product: String, cost: Double)

// A static user dataset
// Unlike the stream below, a batch read can infer the schema,
// so we do not have to provide one
val users = (spark.read
    .option("inferSchema", "true")
    .option("header", true)
    .csv("data/users.csv")
    .as[User]
)

val transactionSchema = (ScalaReflection
    .schemaFor[Transaction]
    .dataType
    .asInstanceOf[StructType]
)
  
// A stream of transactions
val transactionStream = (spark.readStream
    .schema(transactionSchema)
    .option("header", true)
    .option("maxFilesPerTrigger", 1)
    .csv("data/transactions/*.csv")
    .as[Transaction]
)

// Join the transaction stream with the user dataset, then total spending per country
val spendingByCountry = (transactionStream
    .join(users, users("id") === transactionStream("userid"))
    .groupBy($"country")
    .agg(sum($"cost")))  // to rename the result, alias the Column: sum($"cost") as "spending"
    
// Print result
(spendingByCountry.writeStream
    .outputMode("complete")
    .format("console")
    .start)


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---------+
|country|sum(cost)|
+-------+---------+
|     EN|     90.0|
|     FR|     50.0|
+-------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+---------+
|country|sum(cost)|
+-------+---------+
|     EN|    180.0|
|     FR|    100.0|
+-------+---------+
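`Dataset.as(alias)` names a whole Dataset (useful for disambiguating joins), whereas `Column.as(alias)` renames a column, so an aggregate is renamed by applying `as` to the `Column` inside `agg`. A sketch of the same stream-static join shape on two small static Datasets. The users and transactions below are made up, since the notebook's CSV files are not shown; it assumes `spark-sql` on the classpath and a local session, and `JoinAliasSketch` is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

object JoinAliasSketch {
  case class User(id: Int, name: String, email: String, country: String)
  case class Transaction(userid: Int, product: String, cost: Double)

  // Returns the output column names and total spending per country.
  def run(): (Seq[String], Map[String, Double]) = {
    val spark = SparkSession.builder().master("local[1]").appName("join-sketch").getOrCreate()
    try {
      import spark.implicits._
      // Hypothetical data for illustration only
      val users = Seq(
        User(1, "Ann", "ann@example.com", "FR"),
        User(2, "Ben", "ben@example.com", "UK")).toDS()
      val transactions = Seq(
        Transaction(1, "eggs", 10.0),
        Transaction(2, "hen", 20.0),
        Transaction(2, "eggs", 5.0)).toDS()

      val spending = transactions
        .join(users, users("id") === transactions("userid"))
        .groupBy($"country")
        .agg(sum($"cost").as("spending")) // alias the Column, not the Dataset
      val byCountry = spending.collect().map(r => (r.getString(0), r.getDouble(1))).toMap
      (spending.columns.toSeq, byCountry)
    } finally spark.stop()
  }
}
```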



**Exercises:**
- Show sales by product rather than country.
- Show sales by both product and country.

# SQL Queries in Spark Structured Streaming

Finally, we can use the method `createOrReplaceTempView` to publish streams (and static datasets) as SQL tables. We can then query the resulting table with SQL and stream the output as we would with any other stream.

In [88]:
// Publish SQL table
peopleStream.createOrReplaceTempView("peopleTable")

// SQL query
val query = spark.sql("SELECT country, avg(age) FROM peopleTable GROUP BY country")

// Output
(query.writeStream
    .outputMode("complete")
    .format("console")
    .start)


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+--------+
|country|avg(age)|
+-------+--------+
|     US|    22.0|
|     FR|    30.0|
|     UK|    35.0|
+-------+--------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+--------+
|country|avg(age)|
+-------+--------+
|     US|    22.0|
|     FR|    26.0|
|     UK|    44.0|
+-------+--------+
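SQL can name its outputs with `AS` instead of relying on generated names like `avg(age)`, and a query can be tried on a static temp view before it is pointed at a stream. A sketch under those assumptions (`spark-sql` on the classpath, a local session, rows copied from the CSV output above, made-up name `SqlViewSketch`). The `ORDER BY` here runs on a static table; on a stream, sorting is only allowed after an aggregation in complete output mode.

```scala
import org.apache.spark.sql.SparkSession

object SqlViewSketch {
  case class Person(name: String, city: String, country: String, age: Option[Int])

  // Average age per country, sorted by country, as (country, avg_age) pairs.
  def run(): Seq[(String, Double)] = {
    val spark = SparkSession.builder().master("local[1]").appName("sql-sketch").getOrCreate()
    try {
      import spark.implicits._
      Seq(
        Person("Amy", "Paris", "FR", Some(30)),
        Person("Bob", "New York", "US", Some(22)),
        Person("Charlie", "London", "UK", Some(35)),
        Person("Denise", "San Francisco", "US", Some(22)),
        Person("Edward", "London", "UK", Some(53)),
        Person("Francis", "", "FR", Some(22)),
        Person("George", "London", "UK", None)
      ).toDS().createOrReplaceTempView("peopleTable")

      spark.sql(
        """SELECT country, AVG(age) AS avg_age
          |FROM peopleTable
          |GROUP BY country
          |ORDER BY country""".stripMargin)
        .collect().toSeq
        .map(r => (r.getString(0), r.getDouble(1)))
    } finally spark.stop()
  }
}
```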



**Exercises:**
- Use the SQL syntax to filter for Londoners under 40.
- Use the SQL syntax to join the user table and transaction stream to get transactions by country and product.