In [1]:
from pyspark.sql import SparkSession

In [2]:
# Obtain the SparkSession used by every cell below, registered under the
# application name 'structuredstreaming'.
spark = (
    SparkSession.builder
    .appName('structuredstreaming')
    .getOrCreate()
)

In [3]:
# Batch-read every CSV matching retail-data/*.csv, with the "header" and
# "inferSchema" reader options both switched on.
staticDataFrame = (
    spark.read.format('csv')
    .option("header", "true")
    .option("inferSchema", "true")
    .load("retail-data/*.csv")
)

In [4]:
# Register the static DataFrame as the temp view "retail_data" so it can be
# referenced by name from SQL.
staticDataFrame.createOrReplaceTempView("retail_data")

In [5]:
# Keep the schema of the static DataFrame; the streaming reader further down
# is handed this schema explicitly via .schema(staticSchema).
staticSchema = staticDataFrame.schema

In [9]:
# in python
from pyspark.sql.functions import window, column, desc, col

# Static (batch) version of the aggregation: sum of UnitPrice * Quantity per
# customer per one-day window of InvoiceDate; print the first 5 result rows.
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(
        col("CustomerId"),
        window(col("InvoiceDate"), "1 day"),
    )
    .sum("total_cost")
    .show(5)
)

+----------+--------------------+-----------------+
|CustomerId|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|[2011-12-05 08:00...|            -37.6|
|   14126.0|[2011-11-29 08:00...|643.6300000000001|
|   13500.0|[2011-11-16 08:00...|497.9700000000001|
|   17160.0|[2011-11-08 08:00...|516.8499999999999|
|   15608.0|[2011-11-11 08:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



In [10]:
# Set spark.sql.shuffle.partitions to 5 for the queries that follow
# (presumably sized for a small local run — TODO confirm).
spark.conf.set("spark.sql.shuffle.partitions","5")

In [11]:
# Streaming read of the same CSV files, using the schema captured from the
# static DataFrame and the maxFilesPerTrigger option set to 1.
streamingDataFrame = (
    spark.readStream
    .schema(staticSchema)
    .option("maxFilesPerTrigger", "1")
    .format('csv')
    .option("header", "true")
    .load("retail-data/*.csv")
)

In [12]:
# Sanity check: display whether this DataFrame is backed by a streaming source.
streamingDataFrame.isStreaming

True

In [16]:
# Not working on a streaming DataFrame: show() is a batch action, and Spark
# rejects it with "Queries with streaming sources must be executed with
# writeStream.start()". The failure is the point of this cell, so it is
# caught and printed instead of aborting a top-to-bottom run of the notebook.
from pyspark.sql.utils import AnalysisException

try:
    (
        streamingDataFrame
        .selectExpr(
            "CustomerId",
            "(UnitPrice * Quantity) as total_cost",
            "InvoiceDate",
        )
        .groupBy(
            col("CustomerId"),
            window(col("InvoiceDate"), "1 day"),
        )
        .sum("total_cost")
        .show(5)
    )
except AnalysisException as err:
    print("AnalysisException:", err)

AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nFileSource[retail-data/*.csv]'

In [17]:
# Streaming version of the aggregation: sum of UnitPrice * Quantity per
# customer per window of InvoiceDate. Nothing runs until a writeStream query
# is started on this DataFrame.
# NOTE(review): despite the "PerHour" name, the window is one day wide; the
# name is kept because the writeStream cell refers to it.
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(
        col("CustomerId"),
        window(col("InvoiceDate"), "1 day"),
    )
    .sum("total_cost")
)

In [18]:
# Start the streaming aggregation, writing to the in-memory sink under the
# query name "customer_purchases" with output mode "complete".
# The StreamingQuery handle is bound to a name so the query can be inspected
# or stopped later (e.g. customerPurchasesQuery.stop()).
customerPurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)
# Last expression: still display the handle as the cell output.
customerPurchasesQuery

<pyspark.sql.streaming.StreamingQuery at 0x7f4e4bfac3c8>

In [19]:
# Query the in-memory table populated by the streaming query and print the
# 5 rows with the largest summed total_cost.
topPurchasesSql = """
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
"""
spark.sql(topPurchasesSql).show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|[2011-09-20 08:00...|          71601.44|
|      null|[2011-12-08 08:00...|31975.590000000007|
|   18102.0|[2011-09-15 08:00...|31661.540000000005|
|      null|[2010-12-06 08:00...|23395.099999999904|
|   14646.0|[2011-08-11 08:00...| 19150.65999999999|
+----------+--------------------+------------------+
only showing top 5 rows



In [6]:
# Print the column names and types of the static DataFrame.
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [7]:
from pyspark.sql.functions import date_format,col

In [8]:
# Prepare the static data for the ML pipeline: na.fill(0) on missing values,
# add a day_of_week column (InvoiceDate formatted with the "EEEE" pattern),
# and coalesce to 5 partitions.
preppedDataFrame = (
    staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)

In [9]:
# Training split: rows with InvoiceDate strictly before 2011-07-01.
trainDataFrame = preppedDataFrame.where("InvoiceDate < '2011-07-01'")

In [10]:
# Test split: rows with InvoiceDate on or after 2011-07-01.
testDataFrame = preppedDataFrame.where("InvoiceDate >= '2011-07-01'")

In [11]:
# Number of rows in the training split.
trainDataFrame.count()

245903

In [12]:
# Number of rows in the test split.
testDataFrame.count()

296006

In [13]:
# in python
from pyspark.ml.feature import StringIndexer

# Stage 1: index the day_of_week strings into day_of_week_index.
indexer = StringIndexer(
    inputCol="day_of_week",
    outputCol="day_of_week_index",
)

In [14]:
from pyspark.ml.feature import OneHotEncoder

# Stage 2: one-hot encode day_of_week_index into day_of_week_encoded.
encoder = OneHotEncoder(
    inputCol="day_of_week_index",
    outputCol="day_of_week_encoded",
)

In [15]:
# vector assembler
from pyspark.ml.feature import VectorAssembler

# Stage 3: combine UnitPrice, Quantity and the encoded day of week into a
# single "features" vector column.
vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features",
)

In [16]:
# pipeline
from pyspark.ml import Pipeline

# Chain the three feature stages in order: index -> one-hot encode -> assemble.
transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

In [17]:
# Fit the feature pipeline on the training split only; the fitted pipeline is
# then reused to transform both the training and the test data.
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [18]:
# Apply the fitted pipeline to the training split, adding the indexed,
# encoded and assembled "features" columns.
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [19]:
# Mark the transformed training set for caching; it is read again by the
# k-means fit and by the cost computation in later cells.
transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [20]:
from pyspark.ml.clustering import KMeans

In [22]:
# K-means estimator with 20 clusters and a fixed seed of 1.
kmeans = KMeans(k=20, seed=1)

In [23]:
# Fit the k-means model on the transformed training data.
kmodel = kmeans.fit(transformedTraining)

In [24]:
# Clustering cost of the fitted model on the training data.
# NOTE(review): KMeansModel.computeCost is, per the Spark docs, deprecated in
# 2.4 and removed in 3.0 (ClusteringEvaluator is the suggested replacement) —
# confirm the target Spark version before upgrading.
kmodel.computeCost(transformedTraining)

84553739.96537484

In [25]:
# Transform the test split with the pipeline that was fitted on the training data.
transformedTest = fittedPipeline.transform(testDataFrame)

In [26]:
# Clustering cost of the fitted model on the held-out test data.
# NOTE(review): KMeansModel.computeCost is, per the Spark docs, deprecated in
# 2.4 and removed in 3.0 (ClusteringEvaluator is the suggested replacement) —
# confirm the target Spark version before upgrading.
kmodel.computeCost(transformedTest)

517507094.7222117

In [27]:
from pyspark.sql import Row

In [28]:
# Build a small RDD of single-field Rows (1, 2, 3) and convert it to a
# DataFrame; the DataFrame is the cell's displayed value.
rowsRdd = spark.sparkContext.parallelize([Row(n) for n in (1, 2, 3)])
rowsRdd.toDF()

DataFrame[_1: bigint]