# A tour of Sparks's Toolset
&rightarrow; Spark is composed of primitives - the lower-level APIs and the Structured APIs - and then a series of standard libraries for additional functionality
![image](Images/3_1.png)
&rightarrow; Spark's libraries support a variety of different tasks, from graph analysisi and machine learning to streaming and integrations with a host of computing and storage systems

## Overview
1. Running production applications with __spark-submit__
2. Datasets: type-safe APIs for structured data
3. Structured Straming
4. Machine learning and advanced analytics
5. Resilient Distributed Datasets(RDD): Spark's low level APIs
6. Spark R
7. The third-party package ecosystem

## Running Production Applications
+ Spark makes it easy to develop and create big data programs
+ It also makes it easy to turn interactive exploration into production applications with __spark-submit__, a built-in command-line tool.
+ __Spark-submit__ lets us send application code to a cluster and launch it to execute there.
+ Upon submission, the application will run until it exists with completion or encounters an error.
+ This can be used with all of Spark's support cluster managers:
    + Standalone
    + Mesos
    + YARN
+ __Spark-submit__ offers several controls with which we can specify the resources our application needs as well as how it should be run and its command-line arguments
+ __Spark-submit__ can be used with any applications written in any of Spark's supported language.

In [1]:
! spark-submit --master local ../../spark-2.1.1-bin-hadoop2.7/examples/src/main/python/pi.py 10

20/06/25 00:33:21 WARN Utils: Your hostname, raj-Predator-G3-572 resolves to a loopback address: 127.0.1.1; using 192.168.1.118 instead (on interface enp3s0f1)
20/06/25 00:33:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
20/06/25 00:33:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/06/25 00:33:22 INFO SparkContext: Running Spark version 2.4.5
20/06/25 00:33:22 INFO SparkContext: Submitted application: PythonPi
20/06/25 00:33:22 INFO SecurityManager: Changing view acls to: raj
20/06/25 00:33:22 INFO SecurityManager: Changing modify acls to: raj
20/06/25 00:33:22 INFO SecurityManager: Changing view acls groups to: 
20/06/25 00:33:22 INFO SecurityManager: Changing modify acls groups to: 
20/06/25 00:33:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with vie

## Datasets: Type-Safe Structured APIs
+ The Dataset API is used for writing statically typed code (that does type checking at compile time as opposed to runtime) in Java and Scala.
+ __The Dataset API is not available in Python and R, because those languages are dynamically typed.__
+ DataFrames are a distributed collection of objects of type Row that can hold various types of tabular data. The Dataset API gives users the ability to assign a Java/Scala class to the records within a DataFrame and manipulate it as a collection of typed objects, similar to a Java _ArrayList_ or Scala _Seq_.
+ The APIs available on Datasets are __type-safe__, meaning that we cannot accidentally view the objects in a Dataset as being of another class than the class you put in initially.
+ With it, we can define our own data type and manipulate it via arbitrary functions. After we've performed our manipulations, Spark can automatically turn it back into a DataFrame, and we can manipulate it further by using the hundreds of functions that Spark includes. This makes it easy to drop down to lower level, perform type-safe coding when necessary, and move higher up to SQL for more rapid analysis.

## Structured Streaming
+ Structured Steaming is a high-level API for steam processing.
+ With Structured Steaming, we can take the same operations that we perform in batch mode using Spark's structured APIs and run them in streaming fachion.
+ This can reduce latency and allow for incremental processing.
+ It allows us to rapidly and quickly extract value out of straming systems with virtually no code changes.
+ It also makes it easy to conceptualize because we can write our batch job as a way to prototype it and then convert it to a streaming job.
+ The way all of this works is by incrementally processing that data.



In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("chapter3").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions","5")

In [3]:
retail_dataset_path = "../data/retail-data/by-day/*.csv"

In [4]:
staticDataFrame = spark.read.format("csv").option("header", "true").option("inferSchema","true").load(retail_dataset_path)

In [5]:
# staticDataFrame = spark.read.option("header","true").option("inferSchema","true").csv(retail_dataset_path)

In [6]:
staticDataFrame.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [7]:
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [8]:
from pyspark.sql.functions import window, column, desc, col, dayofmonth, month
staticDataFrame\
.selectExpr(
    "CustomerId",
    "(UnitPrice*Quantity) as total_cost",
    "InvoiceDate")\
.groupBy(
    col("CustomerId"),
    window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show()

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   14075.0|[2011-12-05 05:45...|316.78000000000003|
|   18180.0|[2011-12-05 05:45...|            310.73|
|   15358.0|[2011-12-05 05:45...| 830.0600000000003|
|   15392.0|[2011-12-05 05:45...|304.40999999999997|
|   15290.0|[2011-12-05 05:45...|263.02000000000004|
|   16811.0|[2011-12-05 05:45...|             232.3|
|   12748.0|[2011-12-05 05:45...| 363.7899999999999|
|   16500.0|[2011-12-05 05:45...| 52.74000000000001|
|   16873.0|[2011-12-05 05:45...|1854.8300000000002|
|   14060.0|[2011-12-05 05:45...|297.47999999999996|
|   14649.0|[2011-12-05 05:45...| 513.9899999999998|
|   16904.0|[2011-12-05 05:45...| 349.0200000000001|
|   17857.0|[2011-12-05 05:45...|            2979.6|
|   14083.0|[2011-12-05 05:45...| 446.5700000000001|
|   14777.0|[2011-12-05 05:45...|             -2.95|
|   16684.0|[2011-12-05 05:45...| 5401.9799999

In [9]:
# spark.sql("with tmp as (select CustomerId, (UnitPrice*Quantity) as total_cost, InvoiceDate from retail_data) select CustomerId, sum(total_cost) as total_cost from tmp group by CustomerId").show()

### Streaming code
+ Changes
    + Use __readStream__ instead of read
    + __maxFilesPerTrigger__ option which simply specifies the number of files we should read in at once.(This is to make demonstration more "streaming" and in a production scenarios this woul probably be omitted)

In [10]:
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load(retail_dataset_path)

We can see whether our DataFrame is streaming using __.isStreaming__ param

In [11]:
streamingDataFrame.isStreaming

True

In [12]:
purchaseByCustomerPerHour = streamingDataFrame.\
selectExpr(
    "CustomerId",
    "(UnitPrice*Quantity) as total_cost",
    "InvoiceDate")\
.groupBy(
    col("CustomerId"),
    window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")

+ This is still a lazy operation, so we will need to call a streaming action to start the execution of this data flow.
+ Sreaming actions are a bit different form our conventional static action because we're going to be populating data somewhere instead of just calling something like count(which doesn't make any sense on a stream anyways.
+ __The action we will use will output to an in-memory table that we will update after each trigger. In this case, each trigger is based on an individual file ( the read option that we set).__
+ __Spark will mutate the data in the in-memory table such that we will always have the highest value as specified in our previous aggregation.__

In [13]:
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f0894d2be10>

When we start the steam, we can run queries against it to debug what our result will look lik if we were to write this out to a production sink:

In [14]:
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY 'sum(total_cost)' DESC
""")\
.show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15671.0|[2011-04-11 05:45...|375.96000000000004|
|   17576.0|[2010-12-13 05:45...|177.35000000000002|
|   13240.0|[2011-03-27 05:45...|218.33999999999997|
|   14911.0|[2011-03-11 05:45...|               0.0|
|   13668.0|[2011-04-11 05:45...|            132.44|
+----------+--------------------+------------------+
only showing top 5 rows



We'll notice that the composition of our table changes as we read in more data With each file, the results might or might not be changing based on the data.  
Another option is to write the results out to the console

In [15]:
purchaseByCustomerPerHour.writeStream\
.format("console")\
.queryName("customer_purchases_2")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f0894d3f0b8>

__Notice how this window is built on event time, as well, not th time at which Spark process the data. This was one of the shortcomings of Spark Sreaming that Structured Sreaming has resolved__

## Machine Learning and Advanced Analytics
&rightarrow; Another popular aspect of Spark is its ability to perform large-scale machine learning with a built-in library of machine algorithms called __MLlib__.
+ MLlib allows for preprocessing , munging, training of models, and make predictions at scale on data.
+ Models trained in MLlib can be used to make predictions in Structured Streaming
+ Spark provides a sophisticated machine learning API for performaing a variety of machine learning tasks, from classification to regression, and clustering to deep learning.

### K-Means
&rightarrow; Machine learning algorithms in MLlib require that data is represented as numberical values. We need to transform data of other types into some numerical representation for which we will use several DataFrame transformations.

In [16]:
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn("day_of_week", date_format(col("InvoiceDate"),"EEEE"))\
.coalesce(5)

In [17]:
# Splitting Data set based on InvoiceDate
trainDataFrame = preppedDataFrame\
.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
.where("InvoiceDate >= '2011-07-01'")

In [18]:
trainDataFrame.count()

245903

In [19]:
testDataFrame.count()

296006

#### StringIndexer
&rightarrow; Spark's MLlib provides a number of transformations, with which we can automate some of our general transformations. One of such transformer is a __StringIndexer__.

In [20]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_week_index")

This will trun our days of weeks into corresponding numerical values.

#### OneHotEncoder

In [21]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
.setInputCol("day_of_week_index")\
.setOutputCol("day_of_week_encoded")

Each of these will result in a set of columns that we will "assemble" into a vector.  
__All machine learning algorithms in Spark take as input a Vector type, which must be a set of numerical values:__

In [22]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
.setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
.setOutputCol("features")

Here, we have three key features: the price, the quantity, and the day of week. Next we will set this up into a __pipeline so that any future data we need to transform can go through the same process__

In [23]:
from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
.setStages([indexer, encoder, vectorAssembler])

Preparing for training is a two-step process:  
+ First, we need to fit our transformers to this dataset; they need to know the uniques values present to be indexed.
+ After that, Spark can encode the values

In [24]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

After we fit the training data, we are ready to take that fitted pipeline and use it to transform all of our data in a consistent and repeatable way

In [25]:
transformedTraining = fittedPipeline.transform(trainDataFrame)

We could have included our model training in our pipeline.  
But for the hyperparameter tunning on the model we don't do because we don't want to repeat the exact same transformations over and over again.  
We'll use __caching__, an optimization which will put a copy of the intermediately transformed dataset into memory, allowing us to repeatedly access it at much lower cost than running the entire pipeline again. 

##### Without caching

In [26]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
.setK(20)\
.setSeed(1)

MLLib's DataFrame API follow the naming pattern of
+ __Algorithm__ for the unntrained version like __KMeans__
+ __AlgorithmModel__ for the trained version like __KMeansModel__

In [27]:
%timeit -n 1 -r 1 kmeans.fit(transformedTraining)

14.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [28]:
kmModel = kmeans.fit(transformedTraining)

##### With caching

In [29]:
transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [30]:
from pyspark.ml.clustering import KMeans
cachedkmeans = KMeans()\
.setK(20)\
.setSeed(1)

In [31]:
%timeit -n 1 -r 1 cachedkmeans.fit(transformedTraining)

9.87 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [32]:
cachedkmModel = cachedkmeans.fit(transformedTraining)

#### Evaluation

In [33]:
from pyspark.ml.evaluation import ClusteringEvaluator

In [34]:
evaluator = ClusteringEvaluator()
predictions = kmModel.transform(transformedTraining)
evaluator.evaluate(predictions)

0.6842576726028763

In [35]:
transformedTest = fittedPipeline.transform(testDataFrame)
predictions = kmModel.transform(transformedTest)
evaluator.evaluate(predictions)

0.5427938390491535

## Lower-Level APIs 
+ Spark includes a number of lower-level primitives to allow for arbitrary Java and Python object manipulation via Resilient Distributed Datasets(RDDs). 
+ Virtually everything in Spark is built on top of RDDs.
+ DataFrame operations are built on top of RDDs and compile down to these lower-level tools for convenient and extremely efficient distributed execution.
+ There are some things for which we might use RDDs, __especially when we're reading or manipulating raw data__, but for most part we should stick to the __Structured APIs__.
+ RDDs are lower level than DataFrames because they reveal physical execution characteristics (like partitions) to end users.
+ __One thing that we might use RDDs for is to parallelize raw data that we have stored in memory on the driver machine.__

In [36]:
from pyspark.sql import Row

In [37]:
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF().show()

+---+
| _1|
+---+
|  1|
|  2|
|  3|
+---+

