# Spark Structured Streaming Parsing Data
<!-- Most manipulations we do will involve structuring data.  We demonstrate how to use case classes and Scala Reflection to easily structure our data and account for missing or incomplete fields. -->

Much as with datasets, we can use a `case class` to represent rows of data.  The case class's attributes correspond to the JSON field names or (as in this case) the CSV column names.

However, unlike with static datasets, a streaming reader does not infer the schema by default, so we have to supply one.  We will use `ScalaReflection` to generate a schema from our case class.

In [1]:
import sys.process._

"cat data/people/1.csv" ! // run bash command using bang after a string

name,city,country,age
Amy,Paris,FR,30
Bob,New York,US,22
Charlie,London,UK,35
Denise,San Francisco,US,22




0

In [2]:
case class Person(
    name: String,
    city: String,
    country: String,
    age: Option[Int]
)  // case classes are Serializable by default

defined class Person


In [3]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.catalyst.ScalaReflection

// create schema for parsing data
val caseSchema = (ScalaReflection
    .schemaFor[Person]
    .dataType
    .asInstanceOf[StructType])

val peopleStream = (spark.readStream
    .schema(caseSchema)
    .option("header", true)  // skip the header row; with an explicit schema, columns map to Person fields by position
    .option("maxFilesPerTrigger", 1)  // each file is read in a separate batch
    .csv("data/people/")  // read every CSV file in this directory
    .as[Person])
  
(peopleStream.writeStream
    .outputMode("append")  // emit only the rows added in each batch
    .format("console")  // write results to screen
    .start)

caseSchema = StructType(StructField(name,StringType,true), StructField(city,StringType,true), StructField(country,StringType,true), StructField(age,IntegerType,true))
peopleStream = [name: string, city: string ... 2 more fields]


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@407054a6

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+------+-------+----+
|   name|  city|country| age|
+-------+------+-------+----+
| Edward|London|     UK|  53|
|Francis|  null|     FR|  22|
| George|London|     UK|null|
+-------+------+-------+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------+-------+---+
|   name|         city|country|age|
+-------+-------------+-------+---+
|    Amy|        Paris|     FR| 30|
|    Bob|     New York|     US| 22|
|Charlie|       London|     UK| 35|
| Denise|San Francisco|     US| 22|
+-------+-------------+-------+---+
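
`ScalaReflection` sits in Spark's `catalyst` package, which is generally treated as internal.  As a minimal sketch of an alternative, assuming the same `Person` fields as above, the public `Encoders.product` call also derives a `StructType` from a case class:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Same fields as the Person case class defined earlier
case class Person(name: String, city: String, country: String, age: Option[Int])

// Derive the schema from the case class through the public Encoders API
val encoderSchema: StructType = Encoders.product[Person].schema

// Print the fields; Option[Int] shows up as a nullable integer column
encoderSchema.printTreeString()
```

The resulting schema can be passed to `.schema(...)` on the stream reader in the same way as `caseSchema`.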



# Constructing Columns in Structured Streaming
<!-- Structured Streaming makes heavy use of Column objects for manipulating data.  In this section, we explain various ways in which the Column objects can be constructed from columns in our structured stream or by combining other columns. -->

Datasets use a dataframe syntax to refer to columns (which are themselves `Column` objects).  There are a number of ways to do this:
- `peopleStream("country")`
- `peopleStream.col("country")`
- `$"country"`
- `'country`

The first two are more explicit, as they tell Spark which data stream to use.  This is useful in joins, where we need to say which table a column comes from.  The last two are more implicit, as they do not name the data stream, and are convenient for single-stream operations.  Both implicit forms require `import spark.implicits._`.  Note that the symbol-literal form `'country` is deprecated in Scala 2.13, so `$"country"` is the safer choice in newer code.

There are actually multiple ways to construct columns:
- The above allows us to reference `Column`s already in a dataframe.
- We can also construct a `Column` from other `Column`s using binary operators like `===` (equality), `>`, `<=`, `.plus`, `-`, `.startsWith`, or `&&`, depending on the underlying value of the column.
- Finally, we can rename the columns (keeping the values) with the operator `as`.
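
The three ways of building a `Column` listed above can be sketched without any data at all.  This is only an illustration: it uses `functions.col`, which (like `$"..."`) refers to a column by name without naming a dataset, and the value names are made up for the example.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// 1. Reference existing columns by name
val country: Column = col("country")
val age: Column = col("age")

// 2. Combine columns with operators; each result is itself a Column
val inUK: Column = country === "UK"
val ageNextYear: Column = age + 1                      // same as age.plus(1)
val youngInUS: Column = (country === "US") && (age <= 30)

// 3. Rename the result with `as`, keeping the values
val flagged: Column = youngInUS as "young_in_US"
```

None of these expressions is evaluated until the `Column` is used in something like `select` or `filter`.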

# Selecting and Filtering Columns Using Structured Streaming
<!-- We'll demonstrate how to select and filter columns using Structured Streaming. -->

We'll demonstrate these using the `select` method, which takes one or more `Column` arguments and returns a dataframe made up of those columns.

In [4]:
(peopleStream.select(
    $"country" === "UK" as "in_UK",
    $"age" <= 30 as "under_30",
    'country startsWith "U" as "U_Country")
        .writeStream
        .outputMode("append")  // emit only the rows added in each batch
        .format("console")  // write results to screen
        .start)

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@50a14e

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+--------+---------+
|in_UK|under_30|U_Country|
+-----+--------+---------+
| true|   false|     true|
|false|    true|    false|
| true|    null|     true|
+-----+--------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+--------+---------+
|in_UK|under_30|U_Country|
+-----+--------+---------+
|false|    true|    false|
|false|    true|     true|
| true|   false|     true|
|false|    true|     true|
+-----+--------+---------+
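
In Batch 0 above, `under_30` is `null` for the row with a missing age, because a comparison against `null` yields `null`.  The sketch below shows two ways one might handle that, using a small static dataframe (column expressions behave the same on a stream).  The `sample` and `flags` names are made up for the example, and `getOrCreate` is assumed to pick up the notebook's existing session.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, lit}

// Reuses the notebook's session if one is already running
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Two rows taken from the stream output above; George has no age
val sample = Seq[(String, Option[Int])](("Edward", Some(53)), ("George", None))
  .toDF("name", "age")

val flags = sample.select(
  $"name",
  ($"age" <= 30) as "under_30",                               // null when age is null
  coalesce($"age" <= 30, lit(false)) as "under_30_or_false",  // null replaced by false
  $"age".isNull as "age_missing"                              // explicit missing-value flag
)
flags.show()
```

Whether a missing age should count as `false` or stay `null` depends on the application; `coalesce` just makes that choice explicit.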



In [5]:
(peopleStream.filter($"age" === 22)
    .writeStream
    .outputMode("append")  // emit only the rows added in each batch
    .format("console")  // write results to screen
    .start)

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@176c1d45

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----+-------+---+
|   name|city|country|age|
+-------+----+-------+---+
|Francis|null|     FR| 22|
+-------+----+-------+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-------------+-------+---+
|  name|         city|country|age|
+------+-------------+-------+---+
|   Bob|     New York|     US| 22|
|Denise|San Francisco|     US| 22|
+------+-------------+-------+---+



# Joining Structured Stream with Datasets
<!-- One of the best features of Structured Stream is the ability to natively join batch data with a Structured Stream. -->

We can join datastreams with datasets.  Remember: both of these are distributed datasets and one is being streamed -- that's a lot of semantics for a simple `.join` operator!

Below, we take a fixed user table and join it with a stream of transactions from a fictitious poultry ecommerce website.

In [6]:
case class User(id: Int, name: String, email: String, country: String)
case class Transaction(userid: Int, product: String, cost: Double)


defined class User
defined class Transaction


In [None]:

// A user dataset
// Notice that we do not have to provide a schema
// We can simply infer it
val users = (spark.read
    .option("inferSchema", "true")
    .option("header", true)
    .csv("data/users.csv")
    .as[User]
)

val transactionSchema = (ScalaReflection
    .schemaFor[Transaction]
    .dataType
    .asInstanceOf[StructType]
)
  
// A stream of transactions
val transactionStream = (spark.readStream
    .schema(transactionSchema)
    .option("header", true)
    .option("maxFilesPerTrigger", 1)
    .csv("data/transactions/*.csv")
    .as[Transaction]
)

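// Join transaction stream with user dataset
// Note: an `as "spending"` placed outside `agg` would alias the whole dataset, not the column,
// so the aggregate column is named sum(cost). To rename it, alias inside agg: sum($"cost") as "spending"
val spendingByCountry = (transactionStream
    .join(users, users("id") === transactionStream("userid"))
    .groupBy($"country")
    .agg(sum($"cost")))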
    
// Print result
(spendingByCountry.writeStream
    .outputMode("complete")
    .format("console")
    .start)

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+---------+
|country|sum(cost)|
+-------+---------+
|     EN|     90.0|
|     FR|     50.0|
+-------+---------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+---------+
|country|sum(cost)|
+-------+---------+
|     EN|    180.0|
|     FR|    100.0|
+-------+---------+
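
The join and aggregation logic can be tried on small in-memory data before it is attached to a stream.  The following is a sketch only: the rows are invented for illustration, the `users`/`transactions` values here are static stand-ins rather than the files used above, and `getOrCreate` is assumed to reuse the notebook's session.  It also shows the alias placed inside `agg`, which names the aggregate column.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Reuses the notebook's session if one is already running
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical rows, invented for illustration only
val users = Seq((1, "FR"), (2, "US")).toDF("id", "country")
val transactions = Seq(
  (1, "eggs", 5.0),
  (2, "feed", 20.0),
  (1, "coop", 45.0)
).toDF("userid", "product", "cost")

// Same shape as the streaming query: join on user id, then total cost per country
val spending = transactions
  .join(users, users("id") === transactions("userid"))
  .groupBy($"country")
  .agg(sum($"cost") as "spending")  // alias inside agg renames the column

spending.show()
```

Once the static version gives the expected totals, swapping `transactions` for the stream and `show()` for `writeStream` should leave the rest of the query unchanged.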



# SQL Queries in Spark Structured Streaming
<!-- Spark also has an escape hatch into SQL queries that allows users to write familiar SQL queries against Structured Streams. -->

Finally, we can use the method `createOrReplaceTempView` to publish streams (and static datasets) as SQL tables.  We can then query the resulting table using SQL and stream the output as we would with any other datastream.

In [None]:
// Publish SQL table
peopleStream.createOrReplaceTempView("peopleTable")

// SQL query
val query = spark.sql("SELECT country, avg(age) FROM peopleTable GROUP BY country")

// Output
(query.writeStream
    .outputMode("complete")
    .format("console")
    .start)
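
Since `createOrReplaceTempView` works on static datasets too, the query can be checked against a handful of rows first.  A minimal sketch, assuming the notebook's session is reused by `getOrCreate`; the view name `peopleSample` and the `avg_age` alias are made up for the example, and the rows are copied from `data/people/1.csv` above.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the notebook's session if one is already running
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Rows copied from data/people/1.csv (city omitted for brevity)
val sample = Seq(
  ("Amy", "FR", 30),
  ("Bob", "US", 22),
  ("Charlie", "UK", 35),
  ("Denise", "US", 22)
).toDF("name", "country", "age")

// Publish the static dataframe under a separate (hypothetical) view name
sample.createOrReplaceTempView("peopleSample")

// Same query as above, with an alias for the aggregate column
val avgAge = spark.sql(
  "SELECT country, avg(age) AS avg_age FROM peopleSample GROUP BY country")

avgAge.show()
```
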

# GroupBy and Aggregation in Structured Streaming
<!-- We'll demonstrate how to perform groupBy and data aggregation in Structured Streaming.  We will also demonstrate how to use groupBy on multiple columns. -->

We can use groupBy and aggregation as we would in SQL.

- `groupBy` takes one or more `Column`s by which to group.
- The resulting object supports various built-in aggregation functions (`avg`, `mean`, `min`, `max`, `sum`) which take one or more string column names along which to aggregate.

In [None]:
(peopleStream.groupBy('country)
    .mean("age")
    .writeStream
    .outputMode("complete")
    .format("console")
    .start)

In [None]:
(peopleStream.groupBy('city)
    .agg(first("country") as "country", count("age"))
    .writeStream
    .outputMode("complete")
    .format("console")
    .start)

For more complex aggregations, as in the last example, we can use `.agg`, which takes one or more aggregate `Column`s.  Notice that we can reuse the `as` method, as well as the other binary column operators from before.
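
Grouping on multiple columns works the same way: pass several `Column`s to `groupBy`.  The sketch below uses a static dataframe with the rows from `data/people/1.csv`.  The `avg_age` and `people` aliases are made up for the example, and `getOrCreate` is assumed to reuse the notebook's session.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count}

// Reuses the notebook's session if one is already running
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Rows copied from data/people/1.csv
val sample = Seq(
  ("Amy", "Paris", "FR", 30),
  ("Bob", "New York", "US", 22),
  ("Charlie", "London", "UK", 35),
  ("Denise", "San Francisco", "US", 22)
).toDF("name", "city", "country", "age")

// Group on two columns, then compute two aggregates per (country, city) pair
val byCountryAndCity = sample
  .groupBy($"country", $"city")
  .agg(avg($"age") as "avg_age", count($"name") as "people")

byCountryAndCity.show()
```

On `peopleStream`, the same `groupBy(...).agg(...)` chain would be followed by `.writeStream.outputMode("complete")`, as in the cells above.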