In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window,   col

# Spark UI for this session: http://localhost:4040 (Spark's default UI port).
# Reuse the already-running SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# Use 5 shuffle partitions for this session (Spark's default is 200).
spark.conf.set('spark.sql.shuffle.partitions', '5')
# Last expression of the cell: display the session object.
spark


In [2]:
# Batch-read every CSV file of the retail data set; the first line of each
# file is a header and column types are inferred from the data.
df = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load('../data/retail-data/*.csv')
)

# Keep the inferred schema; the streaming reader further down is given it
# explicitly via .schema(df_schema).
df_schema = df.schema

# Register the batch data under a name that Spark SQL can query.
df.createOrReplaceTempView('retail_data')

# Preview the rows without truncating long values, then print the column types.
df.show(truncate=False)
df.printSchema()


+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+-------------------+---------+----------+--------------+
|537226   |22811    |SET OF 6 T-LIGHTS CACTI            |6       |2010-12-06 08:34:00|2.95     |15987.0   |United Kingdom|
|537226   |21713    |CITRONELLA CANDLE FLOWERPOT        |8       |2010-12-06 08:34:00|2.1      |15987.0   |United Kingdom|
|537226   |22927    |GREEN GIANT GARDEN THERMOMETER     |2       |2010-12-06 08:34:00|5.95     |15987.0   |United Kingdom|
|537226   |20802    |SMALL GLASS SUNDAE DISH CLEAR      |6       |2010-12-06 08:34:00|1.65     |15987.0   |United Kingdom|
|537226   |22052    |VINTAGE CARAVAN GIFT WRAP          |25      |2010-12-06 08:34:00|0.42     |15987.0   |United Kingdom|
|537226   |22705

In [3]:
# Batch aggregation: sum of UnitPrice * Quantity per customer, grouped into
# one-day windows over InvoiceDate.
df2 = (
    df
    .selectExpr(
        'CustomerID',
        '(UnitPrice * Quantity) AS total_cost',
        'InvoiceDate',
    )
    .groupBy(
        col('CustomerID'),
        window(col('InvoiceDate'), '1 day'),
    )
    .sum('total_cost')
)

df2.show(truncate=False)


+----------+------------------------------------------+------------------+
|CustomerID|window                                    |sum(total_cost)   |
+----------+------------------------------------------+------------------+
|null      |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|23395.099999999904|
|13652.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|308.79999999999995|
|12586.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|-17.0             |
|17428.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|353.00000000000006|
|15899.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|56.25             |
|16210.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|2263.7999999999993|
|17227.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|142.25            |
|12474.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|-34.0             |
|14825.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|184.10000000000002|
|15078.0   |[2010-12-06 01:00:00, 2010-12-07 01:00:00]|475.1499999999999 |
|16710.0   |[2010-12-06 0

## Streaming


In [4]:
# Streaming read of the same CSV files, using the schema captured from the
# batch read. maxFilesPerTrigger=1 limits each trigger to a single new file.
streaming_df = (
    spark.readStream
    .format('csv')
    .schema(df_schema)
    .option('header', 'true')
    .option('maxFilesPerTrigger', 1)
    .load('../data/retail-data/*.csv')
)

# Last expression of the cell: shows whether this is a streaming DataFrame.
streaming_df.isStreaming


True

In [5]:
# Streaming aggregation: sum of UnitPrice * Quantity per customer, grouped
# into one-hour windows over InvoiceDate. This only defines the
# transformation; nothing runs until a writeStream query is started.
purchase_by_customer_per_hour = (
    streaming_df
    .selectExpr(
        'CustomerID',
        '(UnitPrice * Quantity) AS total_cost',
        'InvoiceDate',
    )
    .groupBy(
        col('CustomerID'),
        window(col('InvoiceDate'), '1 hour'),
    )
    .sum('total_cost')
)


The query below writes its results to an in-memory table, which is updated after each trigger.


In [6]:
# Start the streaming query with the in-memory sink. The query name
# 'customer_purchases' is the table name the later SQL cells select from.
# 'complete' output mode is used for the aggregated result.
sq = (
    purchase_by_customer_per_hour.writeStream
    .queryName('customer_purchases')
    .format('memory')
    .outputMode('complete')
    .start()
)


In [13]:
# Most recent progress report of the in-memory query `sq` (batch id, input
# row counts, state-store metrics, source offsets, sink).
# NOTE(review): per the PySpark docs this is None until the first progress
# update has been produced - confirm if the cell is run right after start().
sq.lastProgress


{'id': '08ac292a-4221-4e20-a125-c70ae8064887',
 'runId': '94d6a831-35ea-49ca-a17d-c46091ab99cb',
 'name': 'customer_purchases',
 'timestamp': '2019-07-21T13:28:09.232Z',
 'batchId': 12,
 'numInputRows': 0,
 'inputRowsPerSecond': 0.0,
 'processedRowsPerSecond': 0.0,
 'durationMs': {'getOffset': 4, 'triggerExecution': 4},
 'stateOperators': [{'numRowsTotal': 1127,
   'numRowsUpdated': 0,
   'memoryUsedBytes': 288842,
   'customMetrics': {'loadedMapCacheHitCount': 110,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 242810}}],
 'sources': [{'description': 'FileStreamSource[file:/Users/sg0218817/Private/IT/others/spark/src/main/data/retail-data/*.csv]',
   'startOffset': {'logOffset': 11},
   'endOffset': {'logOffset': 11},
   'numInputRows': 0,
   'inputRowsPerSecond': 0.0,
   'processedRowsPerSecond': 0.0}],
 'sink': {'description': 'MemorySink'}}

In [None]:
# Query the in-memory sink table, largest totals first. The backticks are
# required because the column name `sum(total_cost)` contains parentheses.
top_purchases = spark.sql('SELECT * FROM customer_purchases ORDER BY `sum(total_cost)` DESC')
top_purchases.show(truncate=False)


In [11]:
# Same query as the preceding cell: read the in-memory sink table ordered by
# the summed cost, descending (backticks quote the parenthesised column name).
top_purchases = spark.sql('SELECT * FROM customer_purchases ORDER BY `sum(total_cost)` DESC')
top_purchases.show(truncate=False)


+----------+------------------------------------------+------------------+
|CustomerID|window                                    |sum(total_cost)   |
+----------+------------------------------------------+------------------+
|18102.0   |[2010-12-07 16:00:00, 2010-12-07 17:00:00]|25920.37          |
|null      |[2010-12-03 11:00:00, 2010-12-03 12:00:00]|12187.780000000002|
|null      |[2010-12-03 14:00:00, 2010-12-03 15:00:00]|10661.690000000004|
|15061.0   |[2010-12-02 15:00:00, 2010-12-02 16:00:00]|9407.339999999998 |
|null      |[2010-12-06 16:00:00, 2010-12-06 17:00:00]|8223.399999999996 |
|null      |[2010-12-06 10:00:00, 2010-12-06 11:00:00]|7818.5800000000045|
|null      |[2010-12-06 09:00:00, 2010-12-06 10:00:00]|7338.400000000006 |
|null      |[2010-12-01 17:00:00, 2010-12-01 18:00:00]|6953.740000000007 |
|13777.0   |[2010-12-01 16:00:00, 2010-12-01 17:00:00]|6585.16           |
|null      |[2010-12-01 14:00:00, 2010-12-01 15:00:00]|5623.860000000006 |
|null      |[2010-12-07 1

The query below writes its results to the console.


In [8]:
# Start a second streaming query over the same hourly aggregation, this time
# with the console sink in 'complete' output mode.
# The StreamingQuery handle returned by start() is kept in `console_sq`
# (previously it was discarded), so the query can later be inspected or shut
# down with console_sq.stop() instead of running until the session ends.
console_sq = (
    purchase_by_customer_per_hour.writeStream
    .format('console')
    .queryName('customer_purchases_console')
    .outputMode('complete')
    .start()
)

# Last expression of the cell: still display the query handle, as before.
console_sq


<pyspark.sql.streaming.StreamingQuery at 0x11b877438>