# Chapter 3
## A Tour of Spark's Toolset

### Imports

In [1]:
import time

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import window, column, desc, col, date_format

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import (StringIndexer,
                                OneHotEncoder,
                                VectorAssembler)

### Instantiate a Spark Session

In [2]:
# Build (or reuse) the session for this chapter.
# spark.sql.shuffle.partitions defaults to 200. This notebook aggregates a
# modest retail dataset (presumably on a single machine), so 200 shuffle tasks
# per groupBy -- including every streaming micro-batch later on -- would mostly
# be scheduling overhead. A small value keeps the cells fast; results are the same.
spark = SparkSession.builder\
            .appName("Chapter3")\
            .config("spark.sql.shuffle.partitions", "5")\
            .getOrCreate()

### Input Data Files

In [3]:
# Glob matching one CSV file per day of retail transactions.
# The path is relative, so it is resolved against the current working
# directory (presumably the notebook's folder) -- adjust if the book's
# repository is cloned elsewhere.
retailData = (
    "Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv"
)

### Load Files Into DataFrames

In [4]:
# Batch-read all daily CSV files into a single DataFrame.
# The first row of each file supplies the column names; inferSchema makes
# Spark scan the data to pick column types (see the schema printed below).
staticDataFrame = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(retailData)
)

In [5]:
# Keep the inferred schema: the streaming reader further down is given this
# schema explicitly instead of inferring one itself.
staticSchema = staticDataFrame.schema

# Expose the batch data to SQL queries under the name `retail_data`.
staticDataFrame.createOrReplaceTempView("retail_data")

staticSchema

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,TimestampType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

In [6]:
# Total spend (UnitPrice * Quantity) per customer per 1-day window, largest first.
# Rows without a CustomerId are grouped together under null.
# Windows are aligned to fixed epoch-based day boundaries, so in the session
# time zone they may start at an odd hour (e.g. 20:00/21:00, as in the output).
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate",
    )
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    .sort(desc("sum(total_cost)"))
    .show(5)
)


+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|[2011-09-19 21:00...|          71601.44|
|      null|[2011-11-13 20:00...|          55316.08|
|      null|[2011-11-06 20:00...|          42939.17|
|      null|[2011-03-28 21:00...| 33521.39999999998|
|      null|[2011-12-07 20:00...|31975.590000000007|
+----------+--------------------+------------------+
only showing top 5 rows



In [7]:
# Read the same directory as a stream instead of a batch.
# - The schema comes from the static read above rather than being inferred.
# - maxFilesPerTrigger=1 feeds one file (one day) per micro-batch, simulating
#   data that arrives incrementally.
streamingDataFrame = (
    spark.readStream
    .format("csv")
    .schema(staticSchema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .load(retailData)
)

In [8]:
# Streaming version of the static aggregation above: total spend per customer
# per 1-day window. Nothing executes until a streaming query is started.
# The window is one DAY, so the accurately named variable is
# purchaseByCustomerPerDay. The historical name purchaseByCustomerPerHour is
# kept as an alias because the next cell refers to it.
purchaseByCustomerPerDay = streamingDataFrame\
                                  .selectExpr(
                                    "CustomerId",
                                    "(UnitPrice * Quantity) as total_cost",
                                    "InvoiceDate")\
                                  .groupBy(
                                    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
                                  .sum("total_cost")
purchaseByCustomerPerHour = purchaseByCustomerPerDay  # backward-compatible alias

In [9]:
# Start the streaming aggregation. Each trigger rewrites the complete result
# table into an in-memory sink that can be queried as `customer_purchases`.
# Keep the StreamingQuery handle: the query runs in the background until it is
# stopped, and the handle is how it is inspected (.status, .lastProgress) and
# shut down (customerPurchasesQuery.stop()) when you are done.
customerPurchasesQuery = purchaseByCustomerPerHour.writeStream\
                        .format("memory")\
                        .queryName("customer_purchases")\
                        .outputMode("complete")\
                        .start()
customerPurchasesQuery

<pyspark.sql.streaming.StreamingQuery at 0x11c24c790>

In [10]:
# Give the stream time to complete a few micro-batches before reading the sink.
# 15 seconds is a heuristic: if no batch has finished yet, the table below
# may be empty or hold only part of the data.
time.sleep(15)

# Highest per-customer daily totals seen by the stream so far.
topCustomerDays = spark.sql("""
          SELECT *
          FROM customer_purchases
          ORDER BY `sum(total_cost)` DESC
          """)
topCustomerDays.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   12415.0|[2011-03-02 20:00...|          16558.14|
|      null|[2011-03-02 20:00...| 3538.750000000001|
|   17416.0|[2011-03-02 20:00...|           2114.71|
|   18102.0|[2011-03-02 20:00...|            1396.0|
|   16709.0|[2011-03-02 20:00...|1120.5300000000002|
+----------+--------------------+------------------+
only showing top 5 rows



In [11]:
# A DataFrame built with spark.readStream reports itself as streaming (True).
isStreamingSource = streamingDataFrame.isStreaming
isStreamingSource

True

In [12]:
# Peek at the first 20 rows of the in-memory sink table.
# No ORDER BY is applied, so the row order is not meaningful.
spark.table("customer_purchases").show()

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17567.0|[2011-03-02 20:00...|            535.38|
|   18218.0|[2011-03-02 20:00...|            309.38|
|   12565.0|[2011-03-02 20:00...|            -14.75|
|   15932.0|[2011-03-02 20:00...|             -7.65|
|   18102.0|[2011-03-02 20:00...|            1396.0|
|   12500.0|[2011-03-02 20:00...|            249.84|
|   15257.0|[2011-03-02 20:00...|             -85.0|
|   15719.0|[2011-03-02 20:00...|            183.65|
|   17856.0|[2011-03-02 20:00...|482.81000000000006|
|   17017.0|[2011-03-02 20:00...|            -10.15|
|   13630.0|[2011-03-02 20:00...|             -14.4|
|   13476.0|[2011-03-02 20:00...| 727.5999999999999|
|   14239.0|[2011-03-02 20:00...|             -56.1|
|   13715.0|[2011-03-02 20:00...|            137.49|
|   17873.0|[2011-03-02 20:00...|485.78000000000003|
|   15005.0|[2011-03-02 20:00...|            2

In [13]:
# Prepare the batch data for ML:
# - na.fill(0) replaces nulls in numeric columns with 0. This includes
#   CustomerID, which is a nullable double in the inferred schema.
# - day_of_week holds the full weekday name of the invoice ("EEEE" -> "Monday").
# - coalesce(5) reduces the data to at most 5 partitions without a shuffle.
preppedDataFrame = (
    staticDataFrame
    .na.fill(0)
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))
    .coalesce(5)
)

In [14]:
# Chronological split: invoices before 2011-07-01 are used for training,
# and the rest are held out for testing.
trainDataFrame, testDataFrame = (
    preppedDataFrame.where("InvoiceDate < '2011-07-01'"),
    preppedDataFrame.where("InvoiceDate >= '2011-07-01'"),
)

In [15]:
# Feature engineering pipeline:
#   day_of_week (string) -> day_of_week_index (numeric category index)
#                        -> day_of_week_encoded (one-hot vector)
# The assembler then concatenates UnitPrice, Quantity and the one-hot vector
# into a single "features" vector column.
indexer = StringIndexer(inputCol="day_of_week",
                        outputCol="day_of_week_index")

encoder = OneHotEncoder(inputCol="day_of_week_index",
                        outputCol="day_of_week_encoded")

vectorAssembler = VectorAssembler(
    inputCols=["UnitPrice", "Quantity", "day_of_week_encoded"],
    outputCol="features")

transformationPipeline = Pipeline(stages=[indexer, encoder, vectorAssembler])

In [16]:
# Fit the pipeline on training data only, so the learned weekday -> index
# mapping is reused unchanged when transforming the test set.
fittedPipeline = transformationPipeline.fit(dataset=trainDataFrame)

In [17]:
# Add the index, one-hot and "features" columns to the training data.
transformedTraining = fittedPipeline.transform(dataset=trainDataFrame)

In [18]:
# Show one full (untruncated) row, printed one column per line.
transformedTraining.show(n=1, truncate=False, vertical=True)

-RECORD 0-----------------------------------------
 InvoiceNo           | 537226                     
 StockCode           | 22811                      
 Description         | SET OF 6 T-LIGHTS CACTI    
 Quantity            | 6                          
 InvoiceDate         | 2010-12-06 08:34:00        
 UnitPrice           | 2.95                       
 CustomerID          | 15987.0                    
 Country             | United Kingdom             
 day_of_week         | Monday                     
 day_of_week_index   | 2.0                        
 day_of_week_encoded | (5,[2],[1.0])              
 features            | (7,[0,1,4],[2.95,6.0,1.0]) 
only showing top 1 row



In [19]:
# k-means with 20 clusters; the fixed seed makes centroid initialization
# reproducible across runs.
kmeans = KMeans(k=20, seed=1)

In [20]:
# Train on the assembled vectors. KMeans reads the "features" column by
# default, which is the assembler's output column above.
kmModel = kmeans.fit(dataset=transformedTraining)

In [21]:
# Apply the training-fitted pipeline (no refitting) to the held-out data.
transformedTest = fittedPipeline.transform(dataset=testDataFrame)

In [22]:
# Show one full (untruncated) test row, printed one column per line.
transformedTest.show(n=1, truncate=False, vertical=True)

-RECORD 0------------------------------------------
 InvoiceNo           | 580538                      
 StockCode           | 23084                       
 Description         | RABBIT NIGHT LIGHT          
 Quantity            | 48                          
 InvoiceDate         | 2011-12-05 08:38:00         
 UnitPrice           | 1.79                        
 CustomerID          | 14075.0                     
 Country             | United Kingdom              
 day_of_week         | Monday                      
 day_of_week_index   | 2.0                         
 day_of_week_encoded | (5,[2],[1.0])               
 features            | (7,[0,1,4],[1.79,48.0,1.0]) 
only showing top 1 row



In [23]:
# Convert a local list of positional Rows into an RDD, then into a DataFrame.
# The fields have no names, so the column gets a default name (`_1`) and an
# inferred type (bigint), as the output shows.
rowsRdd = spark.sparkContext.parallelize([Row(value) for value in (1, 2, 3)])
rowsRdd.toDF()

DataFrame[_1: bigint]