# Spark Streaming

- Use Cases
  - Notifications and alerting
  - Real-time reporting
  - Incremental ETL
  - Update data to serve in real time
  - Real-time decision making
  - Online machine learning
- Challenges of Stream Processing
  - Processing out-of-order data based on event time
  - Maintaining large amounts of state
  - Supporting high-data throughput
  - Processing each event exactly once despite machine failures
  - Handling load imbalance and stragglers
  - Responding to events at low latency
  - Joining with external data in other storage systems
  - Determining how to update output sinks as new events arrive
  - Writing data transactionally to output systems
  - Updating your application’s business logic at runtime
- Stream Processing Design Points
  - Record-at-a-Time Versus Declarative APIs
  - Event Time Versus Processing Time
  - Continuous Versus Micro-Batch Execution
- Spark’s Streaming APIs
  - DStream API (processing time)
  - Structured Streaming API (event time)
    - Micro-Batch Execution (since Spark 2.2, default)
    - Continuous Execution (since Spark 2.3, optional)


## Import modules

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import apache_access_log

## Session

In [2]:
spark = SparkSession.builder \
                    .master("local[4]") \
                    .appName("spark streaming tutorial") \
                    .config("spark.executor.memory", "1g") \
                    .getOrCreate()

In [3]:
spark.conf.set("spark.sql.shuffle.partitions", "4")

## Batch Example

In [4]:
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("retail-data/by-day/*.csv")

In [5]:
df\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    F.col("CustomerId"), F.window(F.col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .sort(F.desc("sum(total_cost)"))\
  .limit(10).toPandas()

| | CustomerId | window | sum(total_cost) |
|---|---|---|---|
| 0 | 17450.0 | (2011-09-20 00:00:00, 2011-09-21 00:00:00) | 71601.44 |
| 1 | | (2011-11-14 00:00:00, 2011-11-15 00:00:00) | 55316.08 |
| 2 | | (2011-11-07 00:00:00, 2011-11-08 00:00:00) | 42939.17 |
| 3 | | (2011-03-29 00:00:00, 2011-03-30 00:00:00) | 33521.4 |
| 4 | | (2011-12-08 00:00:00, 2011-12-09 00:00:00) | 31975.59 |
| 5 | 18102.0 | (2011-09-15 00:00:00, 2011-09-16 00:00:00) | 31661.54 |
| 6 | | (2010-12-21 00:00:00, 2010-12-22 00:00:00) | 31347.48 |
| 7 | 18102.0 | (2011-10-21 00:00:00, 2011-10-22 00:00:00) | 29693.82 |
| 8 | 18102.0 | (2010-12-07 00:00:00, 2010-12-08 00:00:00) | 25920.37 |
| 9 | 14646.0 | (2011-10-20 00:00:00, 2011-10-21 00:00:00) | 25833.56 |
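The `F.window(F.col("InvoiceDate"), "1 day")` grouping above assigns each row to a fixed-size (tumbling) window based on its event time. The following is a minimal pure-Python sketch of that bucketing, not Spark's implementation. It assumes windows are aligned to the Unix epoch and ignores time zones; the names `tumbling_window` and `sum_by_window` are hypothetical helpers introduced only for illustration.

```python
from datetime import datetime, timedelta

# Assumption: windows are aligned to the Unix epoch; time zones are ignored.
EPOCH = datetime(1970, 1, 1)


def tumbling_window(event_time, size=timedelta(days=1)):
    """Return the (start, end) of the fixed-size window containing event_time."""
    elapsed = (event_time - EPOCH).total_seconds()
    index = int(elapsed // size.total_seconds())  # how many whole windows precede it
    start = EPOCH + index * size
    return start, start + size


def sum_by_window(rows):
    """Sum total_cost per (customer_id, window).

    rows: iterable of (customer_id, event_time, total_cost) tuples.
    """
    totals = {}
    for customer_id, event_time, total_cost in rows:
        key = (customer_id, tumbling_window(event_time))
        totals[key] = totals.get(key, 0.0) + total_cost
    return totals
```

A row stamped at any time on 2011-09-20 falls into the window that starts at 2011-09-20 00:00:00 and ends at 2011-09-21 00:00:00. The end bound is exclusive, so a row at exactly midnight belongs to the next window.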


## Structured Streaming Example

In [6]:
sdf = spark.readStream.format("csv")\
    .option("header", "true")\
    .schema(df.schema)\
    .option("maxFilesPerTrigger", 1)\
    .load("retail-data/by-day/*.csv")

In [7]:
sdf.isStreaming

True
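Setting `maxFilesPerTrigger` to 1 caps how many new files each trigger reads, so the directory is consumed one file per micro-batch instead of all at once. As a rough mental model only (the order in which Spark picks files is not modelled here), the batching can be sketched in plain Python. `plan_micro_batches` and the file names in the usage note are hypothetical.

```python
def plan_micro_batches(files, max_files_per_trigger=1):
    """Split an ordered list of file paths into per-trigger batches.

    Simplified model: each batch holds at most max_files_per_trigger files.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    return [
        files[i:i + max_files_per_trigger]
        for i in range(0, len(files), max_files_per_trigger)
    ]
```

With three hypothetical files `["a.csv", "b.csv", "c.csv"]` and a limit of 1, the sketch yields three single-file batches, which is why the query results below grow gradually between the two `time.sleep(10)` calls.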

In [8]:
purchaseByCustomerPerDay = sdf\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    F.col("CustomerId"), F.window(F.col("InvoiceDate"), "1 day"))\
  .sum("total_cost")

In [9]:
streamingQuery = purchaseByCustomerPerDay.writeStream \
    .format("memory") \
    .queryName("customer_purchases") \
    .outputMode("complete")

# memory = write the results to an in-memory table (intended for testing and debugging)
# customer_purchases = name of the in-memory table
# complete = rewrite the whole result table on every trigger
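To make the `complete` output mode more concrete, here is a small pure-Python sketch of a running aggregation that is fed one micro-batch at a time. It is an illustration under simplifying assumptions, not how Spark stores state. The class `RunningSums` is hypothetical, and the `update` branch is included only for contrast with `complete`.

```python
class RunningSums(object):
    """Keeps per-key running sums across micro-batches."""

    def __init__(self):
        self.state = {}

    def add_batch(self, rows, mode="complete"):
        """Fold one batch of (key, value) pairs into the state and return the output."""
        changed = set()
        for key, value in rows:
            self.state[key] = self.state.get(key, 0.0) + value
            changed.add(key)
        if mode == "complete":
            # complete: emit every key seen so far, changed or not
            return dict(self.state)
        if mode == "update":
            # update: emit only the keys touched by this batch
            return {key: self.state[key] for key in changed}
        raise ValueError("unsupported mode: %r" % (mode,))
```

In `complete` mode the output after each batch is the full result table, which is why the in-memory table `customer_purchases` can be re-queried at any time and always contains all aggregates computed so far.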

In [10]:
query = streamingQuery.start()          # start the query

In [11]:
print(query.id)                         # unique identifier of the query; persists across restarts from checkpoint data

e1cb64fd-3aae-441e-a54b-ba550c09d817


In [12]:
print(query.runId)                      # unique id of this run of the query; regenerated at every start/restart

7edfaa6d-10b6-415e-b247-96a0a911dee9


In [13]:
print(query.name)                       # auto-generated or user-specified name of the query

customer_purchases


In [14]:
query.explain()                         # print detailed explanations of the query

No physical plan. Waiting for data.


In [15]:
import time
time.sleep(10)
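A fixed `time.sleep(10)` may be too short or too long depending on the machine. One alternative, sketched below, is to poll `query.lastProgress` (which is `None` until the first progress update) with a timeout. The helper name `wait_for_progress` and its parameters are hypothetical; it only relies on the object exposing a `lastProgress` attribute.

```python
import time


def wait_for_progress(query, timeout_s=30.0, poll_s=0.5):
    """Poll query.lastProgress until it is not None or the timeout expires.

    Returns the progress value, or None if nothing was reported in time.
    """
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        progress = query.lastProgress
        if progress is not None:
            return progress
        time.sleep(poll_s)
    return None
```

In this notebook it could stand in for the sleep as `wait_for_progress(query)`. For tests, `query.processAllAvailable()` is another option, since it blocks until the data available at the source has been processed.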

In [16]:
import pprint
pprint.pprint(query.lastProgress)       # the most recent progress update of this streaming query
pprint.pprint(query.recentProgress) # an array of the most recent progress updates for this query

{u'batchId': 1,
 u'durationMs': {u'addBatch': 809,
                 u'getBatch': 62,
                 u'getOffset': 2788,
                 u'queryPlanning': 49,
                 u'triggerExecution': 3770,
                 u'walCommit': 34},
 u'id': u'e1cb64fd-3aae-441e-a54b-ba550c09d817',
 u'inputRowsPerSecond': 390.8782510365624,
 u'name': u'customer_purchases',
 u'numInputRows': 2074,
 u'processedRowsPerSecond': 550.132625994695,
 u'runId': u'7edfaa6d-10b6-415e-b247-96a0a911dee9',
 u'sink': {u'description': u'MemorySink'},
 u'sources': [{u'description': u'FileStreamSource[file:/opt/work/retail-data/by-day/*.csv]',
               u'endOffset': {u'logOffset': 1},
               u'inputRowsPerSecond': 390.8782510365624,
               u'numInputRows': 2074,
               u'processedRowsPerSecond': 550.132625994695,
               u'startOffset': {u'logOffset': 0}}],
 u'stateOperators': [{u'customMetrics': {u'loadedMapCacheHitCount': 8,
                                         u'loadedM
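The progress dictionary is verbose, so it can help to pull out a few headline numbers. The sketch below uses only keys that appear in the output above; `summarize_progress` is a hypothetical helper. It also derives a rows-per-second figure from `numInputRows` and `triggerExecution`, which for this sample matches the reported `processedRowsPerSecond` (2074 rows in 3770 ms). Whether that relationship holds in general is an assumption.

```python
def summarize_progress(progress):
    """Pick a few headline numbers out of one progress dict."""
    duration_ms = progress.get("durationMs", {})
    trigger_ms = duration_ms.get("triggerExecution")
    rows = progress.get("numInputRows")
    derived_rate = None
    if rows is not None and trigger_ms:
        # rows per second, derived from the trigger duration in milliseconds
        derived_rate = rows / (trigger_ms / 1000.0)
    return {
        "batchId": progress.get("batchId"),
        "numInputRows": rows,
        "processedRowsPerSecond": progress.get("processedRowsPerSecond"),
        "triggerExecutionMs": trigger_ms,
        "derivedRowsPerSecond": derived_rate,
    }
```

A typical call would be `summarize_progress(query.lastProgress)` once the query has reported at least one batch.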

In [17]:
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .limit(10).toPandas()

| | CustomerId | window | sum(total_cost) |
|---|---|---|---|
| 0 | 12415.0 | (2011-03-03 00:00:00, 2011-03-04 00:00:00) | 16558.14 |
| 1 | 15769.0 | (2011-03-17 00:00:00, 2011-03-18 00:00:00) | 10065.0 |
| 2 | | (2011-03-17 00:00:00, 2011-03-18 00:00:00) | 7876.0 |
| 3 | 12435.0 | (2011-03-17 00:00:00, 2011-03-18 00:00:00) | 3978.99 |
| 4 | | (2011-03-03 00:00:00, 2011-03-04 00:00:00) | 3538.75 |
| 5 | 17416.0 | (2011-03-03 00:00:00, 2011-03-04 00:00:00) | 2114.71 |
| 6 | 18102.0 | (2011-03-03 00:00:00, 2011-03-04 00:00:00) | 1396.0 |
| 7 | 16709.0 | (2011-03-03 00:00:00, 2011-03-04 00:00:00) | 1120.53 |
| 8 | 12514.0 | (2011-03-17 00:00:00, 2011-03-18 00:00:00) | 1017.68 |
| 9 | 14298.0 | (2011-03-17 00:00:00, 2011-03-18 00:00:00) | 1004.6 |


In [18]:
time.sleep(10)

In [19]:
spark.sql("""
  SELECT *
  FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
  .limit(10).toPandas()

| | CustomerId | window | sum(total_cost) |
|---|---|---|---|
| 0 | 12415.0 | (2011-03-03 00:00:00, 2011-03-04 00:00:00) | 16558.14 |
| 1 | 15769.0 | (2011-03-17 00:00:00, 2011-03-18 00:00:00) | 10065.0 |
| 2 | 17450.0 | (2011-11-03 00:00:00, 2011-11-04 00:00:00) | 9069.82 |
| 3 | | (2011-03-17 00:00:00, 2011-03-18 00:00:00) | 7876.0 |
| 4 | 17389.0 | (2011-11-03 00:00:00, 2011-11-04 00:00:00) | 7427.32 |
| 5 | 16333.0 | (2011-11-03 00:00:00, 2011-11-04 00:00:00) | 5727.0 |
| 6 | 16684.0 | (2011-11-03 00:00:00, 2011-11-04 00:00:00) | 5500.38 |
| 7 | | (2011-11-17 00:00:00, 2011-11-18 00:00:00) | 5217.83 |
| 8 | 12753.0 | (2011-11-17 00:00:00, 2011-11-18 00:00:00) | 4808.48 |
| 9 | 12435.0 | (2011-03-17 00:00:00, 2011-03-18 00:00:00) | 3978.99 |


In [20]:
query.stop()                            # stop the query

In [21]:
query.awaitTermination()                # block until query is terminated, with stop() or with error

In [22]:
query.exception()                       # the exception if the query has been terminated with error

## Reference

- https://spark.apache.org/
- https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data/by-day
- Spark: The Definitive Guide