# A Tour of Spark's Toolset

### Datasets: Type-Safe Structured APIs
- The Dataset API is not available in Python and R, because those languages are dynamically typed.
- The Dataset API gives users the ability to assign a Java/Scala class to the records within a DataFrame and manipulate it as a collection of typed objects.
- The APIs available on Datasets are type-safe, meaning that you cannot accidentally view the objects in a Dataset as being of another class than the class you put in initially.
- One final advantage is that when you call collect or take on a Dataset, it will collect objects of the proper type in your Dataset, not DataFrame Rows.
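
Because the Dataset API is not exposed in Python, the following is only a plain-Python analogy, not Spark code. It contrasts generic, dictionary-like rows with records bound to a class, which is roughly the difference between DataFrame Rows and typed Dataset objects. The `Flight` class and the sample values are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Flight:
    # Hypothetical record type, standing in for a Scala case class.
    dest: str
    origin: str
    count: int


# Generic rows: a field name is only checked when the key is looked up.
rows = [
    {"dest": "United States", "origin": "Romania", "count": 1},
    {"dest": "United States", "origin": "Ireland", "count": 264},
]

# Typed records: every element is a Flight, and a misspelled or missing
# field raises a TypeError as soon as the object is constructed.
flights = [Flight(**row) for row in rows]

# Filtering works on attributes of the class rather than on string keys.
busy = [f for f in flights if f.count > 100]
print(busy)
```

Collecting `busy` yields `Flight` objects rather than generic rows, which mirrors the last bullet above.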

### Structured Streaming
- A high-level API for stream processing that became production-ready in Spark 2.2.
- You can take the same operations that you perform in batch mode using Spark’s structured APIs and run them in a streaming fashion.
- This lets you quickly extract value from streaming systems with virtually no code changes.


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .appName("Structured Streaming")\
        .getOrCreate()

spark.conf.set("spark.sql.shuffle.partitions", "5")

In [6]:

staticDataFrame = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("../data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

                                                                                

In [7]:
from pyspark.sql.functions import window, column, desc, col

In [8]:
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
        .sum("total_cost")\
          .show(5)



+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   14075.0|{2011-12-05 07:00...|316.78000000000003|
|   18180.0|{2011-12-05 07:00...|            310.73|
|   15358.0|{2011-12-05 07:00...| 830.0600000000003|
|   15392.0|{2011-12-05 07:00...|304.40999999999997|
|   15290.0|{2011-12-05 07:00...|263.02000000000004|
+----------+--------------------+------------------+
only showing top 5 rows
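
The aggregation in the cell above assigns each invoice to a one-day window per customer and sums the cost inside each window. A minimal plain-Python sketch of that idea follows. The sample rows are made up, and the sketch assumes windows aligned to calendar days in the timestamps' own time zone, which may differ from how Spark aligns and displays its windows.

```python
from collections import defaultdict
from datetime import datetime

# Made-up sample rows: (customer_id, unit_price, quantity, invoice_timestamp).
rows = [
    (1, 2.50, 4, datetime(2011, 12, 5, 8, 30)),
    (1, 1.00, 3, datetime(2011, 12, 5, 17, 0)),
    (2, 10.00, 1, datetime(2011, 12, 5, 9, 15)),
    (1, 5.00, 2, datetime(2011, 12, 6, 10, 0)),
]

totals = defaultdict(float)
for customer_id, unit_price, quantity, ts in rows:
    # Assumption: a window starts at midnight of the invoice's calendar day.
    window_start = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    # Group key is (customer, window); the value is the running total cost.
    totals[(customer_id, window_start)] += unit_price * quantity

for (customer_id, window_start), total in sorted(totals.items()):
    print(customer_id, window_start.date(), total)
```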



                                                                                

In [11]:
streamingDataFrame = spark.readStream\
  .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
      .format("csv")\
        .option("header", "true")\
          .load("../data/retail-data/by-day/*.csv")

streamingDataFrame.isStreaming

                                                                                

True

In [12]:
purchaseByCustomerPerHour = streamingDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
      .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
        .sum("total_cost")

In [14]:
purchaseByCustomerPerHour.writeStream\
  .format("memory")\
    .queryName("customer_purchases")\
      .outputMode("complete")\
        .start()

24/08/05 09:33:10 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/2y/q6ddzlk97yv4jfwcmb5byk8h0000gn/T/temporary-77dfe0eb-1407-42f1-b438-f4fd4a6ceaba. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/08/05 09:33:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.query.StreamingQuery at 0x107f1b260>
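
Conceptually, the running query keeps aggregation state between triggers and, in `complete` output mode, republishes the whole result table each time new input is processed. The sketch below imitates that in plain Python; it is not Spark's implementation. The batches are made-up stand-ins for one input file per trigger, and `run_stream` is a hypothetical helper.

```python
from collections import defaultdict

# Made-up micro-batches, standing in for one input file per trigger.
# Each record is (customer_id, day, total_cost).
batches = [
    [(1, "2011-12-05", 10.0), (2, "2011-12-05", 4.0)],
    [(1, "2011-12-05", 5.0)],
    [(2, "2011-12-06", 7.5)],
]


def run_stream(batches):
    """Yield the full result table after every batch ("complete" mode)."""
    state = defaultdict(float)  # running sum per (customer_id, day)
    for batch in batches:
        for customer_id, day, cost in batch:
            state[(customer_id, day)] += cost
        # Emit a snapshot of the whole table, not just the changed rows.
        yield dict(state)


snapshots = list(run_stream(batches))
for i, table in enumerate(snapshots, start=1):
    print(f"after batch {i}: {table}")
```

This is why querying the in-memory table at different moments can return different results: each query sees whichever snapshot was written most recently.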


In [15]:
spark.sql("""
  SELECT * FROM customer_purchases
  ORDER BY `sum(total_cost)` DESC
  """)\
    .show(10)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   17450.0|{2011-09-20 07:00...|          71601.44|
|      NULL|{2011-11-14 07:00...|          55316.08|
|      NULL|{2011-11-07 07:00...|          42939.17|
|      NULL|{2011-03-29 07:00...| 33521.39999999998|
|      NULL|{2011-12-08 07:00...|31975.590000000007|
|   18102.0|{2011-09-15 07:00...|31661.540000000005|
|      NULL|{2010-12-21 07:00...|31347.479999999938|
|   18102.0|{2011-10-21 07:00...|          29693.82|
|   18102.0|{2010-12-07 07:00...|          25920.37|
|   14646.0|{2011-10-20 07:00...|25833.559999999994|
+----------+--------------------+------------------+
only showing top 10 rows



                                                                                


### Machine Learning and Advanced Analytics
- MLlib supports preprocessing, data munging, model training, and making predictions at scale.
- You can even use models trained in MLlib to make predictions in Structured Streaming.

In [16]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)





                                                                                

In [18]:
from pyspark.sql.functions import date_format, col

preppedDataFrame = staticDataFrame\
  .na.fill(0)\
    .withColumn("day_of_week", date_format(col("InvoiceDate"), "EEE"))\
      .coalesce(5)

                                                                                

                                                                                

Split the data into training and test sets by invoice date:

In [19]:

trainDataFrame = preppedDataFrame\
    .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
    .where("InvoiceDate >= '2011-07-01'")

                                                                                

                                                                                

In [20]:
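trainDataFrame.count()  # result is discarded: a notebook cell echoes only its last expression
testDataFrame.count()   # this is the count displayed below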


296006

                                                                                

In [21]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
    .setInputCol("day_of_week")\
    .setOutputCol("day_of_week_index")


In [None]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")



In [23]:
from pyspark.ml.feature import VectorAssembler

vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
    .setOutputCol("features")



                                                                                

In [24]:
from pyspark.ml import Pipeline

transformationPipeline = Pipeline()\
  .setStages([
    indexer, encoder, vectorAssembler
  ])
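
The three stages above turn a day-of-week string into numbers and then pack everything into a single feature vector. The plain-Python sketch below walks through the same steps on made-up rows. It assumes the default settings as I understand them (labels indexed by descending frequency, last category dropped by the one-hot encoder) and uses dense lists where Spark would use its own vector types.

```python
from collections import Counter

# Made-up rows: (unit_price, quantity, day_of_week).
rows = [
    (2.5, 6, "Mon"),
    (1.0, 2, "Tue"),
    (4.0, 1, "Mon"),
    (3.0, 8, "Wed"),
    (0.5, 3, "Mon"),
    (2.0, 5, "Tue"),
]

# Step 1 (indexing): the most frequent label gets index 0.
# Ties, if any, are broken alphabetically in this sketch.
counts = Counter(day for _, _, day in rows)
labels = sorted(counts, key=lambda label: (-counts[label], label))
index_of = {label: i for i, label in enumerate(labels)}


def one_hot(index, size):
    # Step 2 (encoding): the last category is dropped, so it is
    # represented by an all-zero vector of length size - 1.
    vec = [0.0] * (size - 1)
    if index < size - 1:
        vec[index] = 1.0
    return vec


# Step 3 (assembling): concatenate the numeric columns and the encoded vector.
features = [
    [price, float(qty)] + one_hot(index_of[day], len(labels))
    for price, qty, day in rows
]

for row in features:
    print(row)
```

Fitting corresponds to Step 1 here: the label-to-index mapping is learned from the data, which is why the pipeline must be fitted before it can transform anything.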



In [25]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)
transformedTraining = fittedPipeline.transform(trainDataFrame)
transformedTraining.cache()


DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

                                                                                

In [27]:
from pyspark.ml.clustering import KMeans 

kmeans = KMeans()\
  .setK(20)\
    .setSeed(1)

                                                                                

                                                                                

In [28]:
kmModel = kmeans.fit(transformedTraining)


                                                                                

In [31]:
transformedTest = fittedPipeline.transform(testDataFrame)
training_cost = kmModel.summary.trainingCost
print(training_cost)

                                                                                

75301118.84698008
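
The training cost of a k-means model is the sum of squared distances from each training point to its nearest cluster center, so its magnitude depends heavily on the scale of the features. A minimal plain-Python sketch of that computation follows; the points and centroids are made up, and `kmeans_cost` is a hypothetical helper rather than part of MLlib.

```python
def squared_distance(p, q):
    # Squared Euclidean distance between two equal-length points.
    return sum((a - b) ** 2 for a, b in zip(p, q))


def kmeans_cost(points, centroids):
    """Sum of squared distances from each point to its nearest centroid."""
    return sum(
        min(squared_distance(p, c) for c in centroids)
        for p in points
    )


# Made-up data: two tight clusters, each point one unit from its centroid.
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (12.0, 10.0)]
centroids = [(0.0, 1.0), (11.0, 10.0)]

cost = kmeans_cost(points, centroids)
print(cost)
```

Since `UnitPrice` and `Quantity` are unscaled in the feature vector used here, a large cost is not by itself evidence of a poor clustering.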



### Lower-Level APIs

In [32]:
from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()

                                                                                

DataFrame[_1: bigint]
