# Spark Structured Streaming
<!-- We give an overview of our deep dive into Spark Structured Streaming. -->

Recall that we can think of Spark Structured Streaming as generating an infinite dataset against which we write queries.  We'll explore the API fully.

- We'll demonstrate how to pull data from different streaming sources like websockets or files.
- We'll delve into parsing and structuring data.
- We'll demonstrate how to run queries against this data using the structured streaming API.

In [None]:
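// Alias the session to a val so its implicits can be imported
// (an import needs a stable identifier, which `spark` may not be in every notebook kernel)
val sparkDummy = spark
import sparkDummy.implicits._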

# Netcat Socket Structured Streaming Example
<!-- We will demonstrate how to write a small Scala script to broadcast a file to that port and how to invoke bash commands from Scala repls and notebooks for testing the broadcast server -->

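We're going to stream one of Shakespeare's most famous poems on a fixed port and listen for it using Spark.  A small server, [Broadcast.scala](/edit/Broadcast.scala), broadcasts the file one line at a time.  First, though, we define a helper that listens on a port and prints a running word count, and we try it out by typing into `netcat`.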

In [None]:
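// Create a stream, listen on a port, and print a running word count

def createStream(port: Int, duration: Int): Unit = {
    // Each line received on the socket becomes one row with a single string column
    val lines = (spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", port)
        .load())

    // Split each line into words on whitespace
    val words = (lines
        .as[String]
        .flatMap(_.split("\\s+")))

    // Count occurrences case-insensitively, most frequent first
    val wordCounts = (words
        .groupByKey(_.toLowerCase)
        .count()
        .orderBy($"count(1)".desc))

    // Print the full table of counts to the console after each trigger
    val query = (wordCounts.writeStream
        .outputMode("complete")
        .format("console")
        .start())

    // Wait up to `duration` milliseconds, then stop the query
    query.awaitTermination(duration)
    query.stop()
}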
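
To see what the query computes without a running stream, the same split, lowercase, count, and sort steps can be sketched on an ordinary Scala collection. This is only an analogy: the input lines are made up, and the alphabetical tie-breaking is an addition of this sketch (the streaming query only orders by count).

```scala
// Plain-Scala analogue of the streaming word count (no Spark needed).
// The input lines are made up for illustration.
val typedLines = Seq("To be or", "not to be")

val localCounts: Seq[(String, Int)] = typedLines
  .flatMap(_.split("\\s+"))                        // split each line into words
  .groupBy(_.toLowerCase)                          // group case-insensitively
  .map { case (word, hits) => (word, hits.size) }  // count each group
  .toSeq
  .sortBy { case (word, n) => (-n, word) }         // highest count first, ties alphabetical

localCounts.foreach(println)
```

In `complete` output mode, the console sink reprints a table like this one after every batch, updated with the words seen so far.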

In [None]:
// run `nc -lk 12341` in bash and start typing!

createStream(12341, 10000)

**Exercise:** Type "foo", "foo bar", and "bar", and watch the results.

# Socket Structured Streaming Example
<!-- We demonstrate how to use sockets to listen on a fixed port and how to set up a simple netcat server to broadcast to that port. -->

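Instead of typing into `netcat` by hand, we can run a small server that broadcasts a file on a fixed port.  Below, we preview the file and then launch `Broadcast.scala` in a background thread to serve it one line at a time.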
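
The source of `Broadcast.scala` is not shown in this section. As a rough sketch of what a line-at-a-time broadcast server could look like, the hypothetical `broadcastLines` function below waits for one listener and then sends lines with a short pause between them. The function name, its parameters, and the default delay are all assumptions made for illustration, not the actual script.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical stand-in for Broadcast.scala (the real script is not shown here).
// Waits for one client on `port`, then sends `lines` one at a time.
def broadcastLines(port: Int, lines: Iterator[String], delayMs: Long = 100L): Unit = {
  val server = new ServerSocket(port)
  try {
    val client = server.accept()  // blocks until a listener (e.g. Spark) connects
    val out = new PrintWriter(client.getOutputStream, true)  // auto-flush on println
    try {
      lines.foreach { line =>
        out.println(line)
        Thread.sleep(delayMs)     // pause so the lines arrive as a stream
      }
    } finally {
      out.close()
      client.close()
    }
  } finally {
    server.close()
  }
}
```

Calling `broadcastLines(12342, scala.io.Source.fromFile("data/summer.txt").getLines())` from a background thread would then play a role similar to the `Broadcast.scala` command.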

In [None]:
import sys.process._

"more data/summer.txt".! // run a shell command by calling ! on a string

In [None]:
val port = 12342

// Broadcast the file on the port, one line at a time, from a background thread
// so that this cell returns immediately
(new Thread {
    override def run(): Unit = {
        s"scala Broadcast.scala $port data/summer.txt".!
    }
}).start()

In [None]:
createStream(port, 12000)

# Spark Structured Streaming Parsing Data
<!-- Most manipulations we do will involve structuring data.  We demonstrate how to use case classes and Scala Reflection to easily structure our data and account for missing or incomplete fields. -->

Much as with datasets, we can use a `case class` to represent rows of data.  The case class's attributes correspond to the JSON field names or (as in this case) the CSV column names.

However, unlike with static datasets, a streaming file source will not infer the schema by default, so we must supply one.  We will use `ScalaReflection` to generate a schema from our case class.
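
`ScalaReflection` sits in Spark's internal `catalyst` package. As an alternative sketch, `Encoders.product` should derive an equivalent schema through the public API. `PersonSketch` below is a hypothetical, trimmed-down case class used only for illustration.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Hypothetical case class used only for this sketch.
case class PersonSketch(name: String, city: String, age: Option[Int])

// Encoders.product derives a StructType from the case class fields.
val sketchSchema: StructType = Encoders.product[PersonSketch].schema

// Print the field names, types, and nullability.
sketchSchema.printTreeString()
```

The resulting `StructType` can be passed to `.schema(...)` on a stream reader in the same way as one built with `ScalaReflection`.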

In [None]:
import sys.process._

"cat data/people/1.csv".! // run a shell command by calling ! on a string

In [None]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection

case class Person(
    name: String,
    city: String,
    country: String,
    age: Option[Int]
)

// create schema for parsing data
val caseSchema = (ScalaReflection
    .schemaFor[Person]
    .dataType
    .asInstanceOf[StructType])

val peopleStream = (spark.readStream
    .schema(caseSchema)
    .option("header", true)  // the first line of each file holds the column names
    .option("maxFilesPerTrigger", 1)  // each file is read in a separate batch
    .csv("data/people/")  // load every CSV file in the directory
    .as[Person])

(peopleStream.writeStream
    .outputMode("append")  // emit only the rows added since the last batch
    .format("console")  // write results to screen
    .start)
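
Console output is awkward to inspect programmatically. As a sketch, the same kind of file stream can be written to the `memory` sink and then queried like a table. Everything named here (`PersonRow`, the `people_sketch` query name, the temporary directory, and the two sample rows) is made up for illustration, and `processAllAvailable` is intended for testing rather than production use.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{Encoders, SparkSession}

// Hypothetical row type and sample data, for illustration only.
case class PersonRow(name: String, city: String, country: String, age: Option[Int])

val sparkMem = SparkSession.builder().master("local[2]").appName("memory-sink-sketch").getOrCreate()

// Write one small CSV file into a fresh temporary directory.
val inputDir = Files.createTempDirectory("people-sketch")
Files.write(
  inputDir.resolve("1.csv"),
  "name,city,country,age\nAnn,London,UK,29\nBob,Austin,USA,41\n".getBytes(StandardCharsets.UTF_8))

// Stream the directory with an explicit schema.
val sketchStream = sparkMem.readStream
  .schema(Encoders.product[PersonRow].schema)
  .option("header", true)
  .csv(inputDir.toString)

// Collect the output in an in-memory table named after the query.
val memoryQuery = sketchStream.writeStream
  .outputMode("append")
  .format("memory")
  .queryName("people_sketch")
  .start()

memoryQuery.processAllAvailable()  // block until the files present so far are processed
val seenRows = sparkMem.sql("SELECT name, age FROM people_sketch ORDER BY name").collect()
memoryQuery.stop()                 // release the query once we are done
```

Stopping each query when finished, as on the last line, keeps old streams from piling up in a long notebook session.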

**Exercises:**
- What would happen if age were not optional?
- What if the age were "five" instead of "5"?
- What if one of the records were missing its "age" field?
- There's also a `.json` method that we could use in lieu of the `.csv` method. Can you guess the JSON schema that this code would read?

# Constructing Columns in Structured Streaming
<!-- Structured Streaming makes heavy use of Column objects for manipulating data.  In this section, we explain various ways in which the Column objects can be constructed from columns in our structured stream or by combining other columns. -->

Datasets use a dataframe syntax to refer to columns (which are themselves `Column` objects).  There are a number of ways to do this:
- `peopleStream("country")`
- `peopleStream.col("country")`
- `$"country"`
- `'country`

The first two are more explicit, as they tell Spark which data stream to use.  This is useful in joins, where we need to say which table a column comes from.  The last two are more implicit, as they do not specify the data stream.  These are more convenient for operations on a single data stream.  The `$"..."` and symbol forms require importing `spark.implicits._`.

There are actually multiple ways to construct columns:
- The above allows us to reference `Column`s already in a dataframe.
- We can also construct a `Column` from other `Column`s using binary operators like `===` (equality), `>`, `<=`, `.plus`, `-`, `.startsWith`, or `&&`, depending on the underlying value of the column.
- Finally, we can rename a column (keeping its values) with the `as` method.
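
The three approaches above can be tried on a small static dataframe, since `Column` expressions are built the same way for static and streaming data. The rows, the session name `sparkCols`, and the output column names below are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

val sparkCols = SparkSession.builder().master("local[1]").appName("column-sketch").getOrCreate()
import sparkCols.implicits._

// Made-up rows standing in for the people stream.
val peopleCols = Seq(("Ann", "UK", 29), ("Bob", "USA", 41)).toDF("name", "country", "age")

// Referencing an existing column, explicitly and implicitly.
val explicitRef = peopleCols("country")
val implicitRef = $"country"

// Building new columns from existing ones, then renaming them with `as`.
val derivedCols = peopleCols
  .select(
    $"name",
    ($"age" - 18) as "years_as_adult",
    $"name".startsWith("A") as "a_name",
    (($"age" > 40) && ($"country" === "USA")) as "over_40_in_usa")
  .orderBy($"name")

val derivedRows = derivedCols.collect()
derivedCols.show()
```

The extra parentheses around each comparison are not strictly required, but they make the grouping explicit rather than relying on Scala's operator precedence.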

# Selecting and Filtering Columns Using Structured Streaming
<!-- We'll demonstrate how to select and filter columns using Structured Streaming. -->

We'll demonstrate these using the `select` method, which takes `Column` arguments and returns a dataframe containing those columns.

In [None]:
(peopleStream.select(
        $"country" === "UK" as "in_UK",
        $"age" <= 30 as "at_most_30",
        'country startsWith "U" as "U_Country")
    .writeStream
    .outputMode("append")
    .format("console")  // write results to screen
    .start)

In [None]:
(peopleStream.filter($"age" === 22)
    .writeStream
    .outputMode("append")
    .format("console")  // write results to screen
    .start)

**Exercises:**

1. Select the column "age + 1".
1. Select a column that returns true if the user is a Londoner who is under 30 as "Young_Londoner".
1. Filter for when the age is no less than 22.
1. Filter for the city being "London".
1. Filter for Americans under the age of 30.

# GroupBy and Aggregation in Structured Streaming
<!-- We'll demonstrate how to perform groupBy and data aggregation in Structured Streaming.  We will also demonstrate how to use groupBy on multiple columns. -->

We can use groupBy and aggregation as we would in SQL.

- `groupBy` takes one or more `Column`s by which to group.
- The resulting object supports various built-in aggregation functions (`avg`, `mean`, `min`, `max`, `sum`), each of which takes one or more column names (as strings) to aggregate.

In [None]:
(peopleStream.groupBy('country)
    .mean("age")
    .writeStream
    .outputMode("complete")
    .format("console")
    .start)

For more complex aggregations, we can use `.agg`, which takes one or more aggregate `Column` expressions.  Notice that we can reuse the `as` method, as well as the other binary column operators from before.

In [None]:
(peopleStream.groupBy('city)
    .agg(first("country") as "country", count("age"))
    .writeStream
    .outputMode("complete")
    .format("console")
    .start)
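
Grouping on several columns works the same way: pass more than one `Column` to `groupBy`. The sketch below uses a small static dataframe with made-up rows so the result is easy to check by eye. The session name `sparkAgg` and the output column names are assumptions of this example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, max}

val sparkAgg = SparkSession.builder().master("local[1]").appName("groupby-sketch").getOrCreate()
import sparkAgg.implicits._

// Made-up rows standing in for the people stream.
val peopleAgg = Seq(
  ("Ann", "London", "UK", 29),
  ("Cat", "London", "UK", 35),
  ("Dev", "Leeds", "UK", 22),
  ("Bob", "Austin", "USA", 41)
).toDF("name", "city", "country", "age")

// One output row per (country, city) pair.
val byCountryAndCity = peopleAgg
  .groupBy($"country", $"city")
  .agg(count($"name") as "people", max($"age") as "oldest")
  .orderBy($"country", $"city")

val aggRows = byCountryAndCity.collect()
byCountryAndCity.show()
```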

**Exercise:** Add the average age of each city to the above query.

# Joining Structured Stream with Datasets
<!-- One of the best features of Structured Stream is the ability to natively join batch data with a Structured Stream. -->

We can join datastreams with datasets.  Remember: both of these are distributed datasets and one is being streamed -- that's a lot of semantics for a simple `.join` operator!

Below, we take a fixed user table and join it with a stream of transactions from a fictitious poultry ecommerce website.
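
Before wiring this up to a stream, it can help to see the join and aggregation on two small static datasets. The case classes, rows, and the session name `sparkJoin` below are made up for this sketch; only the shape of the query mirrors the streaming version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Hypothetical, trimmed-down row types for illustration.
case class UserSketch(id: Int, name: String, country: String)
case class TransactionSketch(userid: Int, product: String, cost: Double)

val sparkJoin = SparkSession.builder().master("local[1]").appName("join-sketch").getOrCreate()
import sparkJoin.implicits._

val usersSketch = Seq(UserSketch(1, "Ann", "UK"), UserSketch(2, "Bob", "USA")).toDS()
val transactionsSketch = Seq(
  TransactionSketch(1, "eggs", 2.5),
  TransactionSketch(1, "feed", 10.0),
  TransactionSketch(2, "eggs", 3.0)
).toDS()

// Match each transaction to its user, then total the cost per country.
val spendingSketch = transactionsSketch
  .join(usersSketch, usersSketch("id") === transactionsSketch("userid"))
  .groupBy($"country")
  .agg(sum($"cost") as "spending")
  .orderBy($"country")

val spendingRows = spendingSketch.collect()
spendingSketch.show()
```

Note that `as "spending"` sits inside `agg(...)`, so it renames the aggregated column rather than aliasing the whole dataset.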

In [None]:
case class User(id: Int, name: String, email: String, country: String)
case class Transaction(userid: Int, product: String, cost: Double)

// A user dataset
// Notice that we do not have to provide a schema
// We can simply infer it
val users = (spark.read
    .option("inferSchema", "true")
    .option("header", true)
    .csv("data/users.csv")
    .as[User]
)

val transactionSchema = (ScalaReflection
    .schemaFor[Transaction]
    .dataType
    .asInstanceOf[StructType]
)
  
// A stream of transactions
val transactionStream = (spark.readStream
    .schema(transactionSchema)
    .option("header", true)
    .option("maxFilesPerTrigger", 1)
    .csv("data/transactions/*.csv")
    .as[Transaction]
)

// Join transaction stream with user dataset
val spendingByCountry = (transactionStream
    .join(users, users("id") === transactionStream("userid"))
    .groupBy($"country")
    .agg(sum($"cost") as "spending"))

// Print result
(spendingByCountry.writeStream
    .outputMode("complete")
    .format("console")
    .start)

**Exercises:**
- Show sales by product rather than country.
- Show sales by both product and country.

# SQL Queries in Spark Structured Streaming
<!-- Spark also has an escape hatch into SQL queries that allows users to write familiar SQL queries against Structured Streams. -->

Finally, we can use the method `createOrReplaceTempView` to publish streams (and static datasets) as SQL tables.  We can then query the resulting table using SQL and stream the output as we would with any other datastream.
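
The static case is the easiest to experiment with. As a minimal sketch, the snippet below publishes a small dataframe as a table and queries it with SQL; the rows, the table name `people_sketch_table`, and the session name `sparkSql` are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

val sparkSql = SparkSession.builder().master("local[1]").appName("sql-sketch").getOrCreate()
import sparkSql.implicits._

// Made-up rows standing in for the people data.
val peopleSql = Seq(("Ann", "UK", 29), ("Cat", "UK", 35), ("Bob", "USA", 41))
  .toDF("name", "country", "age")

// Publish the dataframe as a SQL table, then query it.
peopleSql.createOrReplaceTempView("people_sketch_table")

val oldestByCountry = sparkSql.sql(
  "SELECT country, max(age) AS oldest FROM people_sketch_table GROUP BY country ORDER BY country")

val sqlRows = oldestByCountry.collect()
oldestByCountry.show()
```

With a streaming source, the result of `sql` is itself a stream, so it is written out with `writeStream` instead of being collected.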

In [None]:
// Publish SQL table
peopleStream.createOrReplaceTempView("peopleTable")

// SQL query
val query = spark.sql("SELECT country, avg(age) FROM peopleTable GROUP BY country")

// Output
(query.writeStream
    .outputMode("complete")
    .format("console")
    .start)

**Exercises:**
- Use the SQL syntax to filter for Londoners under 40 years old.
- Use the SQL syntax to join the user table and transaction stream to get transactions by country and product.
