# Spark ML - AAS 1904

A great book: An Introduction to Statistical Learning by Gareth James, Daniela Witten, Trevor Hastie and Robert Tibshirani (free to download)

Machine learning is a method of data analysis that automates analytical model building. Using algorithms that iteratively learn from data, machine learning allows computers to find hidden insights without being explicitly programmed where to look.

What can ML be used for:
1. Recommendation engines
2. Customer segmentation
3. Text sentiment analysis
4. Predicting customer churn
5. Pattern and image recognition
6. Email spam filtering
7. Financial modeling
8. Fraud detection
9. Web search results
10. Real-time ads on web pages
11. Credit scoring and next-best offers
12. Prediction of equipment failures
13. New pricing models
14. Network intrusion detection

## Spark’s MLlib is mainly designed for Supervised and Unsupervised Learning tasks, with most of its algorithms falling under those two categories.
* spark.mllib: the original API, based on RDDs
* spark.ml: the newer API, based on DataFrames (the one used in this notebook)



### Supervised learning algorithms are trained using labeled examples, such as an input where the desired output is known. The learning algorithm receives a set of inputs along with the corresponding correct outputs, and the algorithm learns by comparing its actual output with correct outputs to find errors. It then modifies the model accordingly. 
#### Through methods like classification, regression, prediction and gradient boosting, supervised learning uses patterns to predict the values of the label on additional unlabeled data. Supervised learning is commonly used in applications where historical data predicts likely future events.


### Unsupervised learning is used against data that has no historical labels. The system is not told the "right answer." The algorithm must figure out what is being shown. The goal is to explore the data and find some structure within.
#### Popular techniques include self-organizing maps, nearest-neighbor mapping, k-means clustering and singular value decomposition. One issue is that it can be difficult to evaluate results of an unsupervised model!




### One of the biggest constraints of MLlib is that we need to format our data as one or two columns:
### 1. Features, Labels (Supervised)
### 2. Features (Unsupervised)

This requires a little more data processing work than some other machine learning libraries, but the big upside is that this exact same syntax works with distributed data, which is no small feat for what is going on “under the hood”!

# A huge part of learning MLlib is getting comfortable with the documentation! 

# Being able to master the skill of finding information (not memorization) is the key to becoming a great Spark and Python developer!









## Data Transformations

You won't always get data in a convenient format. Often you will have to deal with non-numerical data such as customer names, zip codes, or country names.

A big part of working with data is using your own domain knowledge to build an intuition for how to deal with it. Sometimes the best course of action is to drop the data, other times feature engineering is a good way to go, or you could try to transform the data into something the machine learning algorithms will understand.

Spark has several built-in methods for dealing with these transformations. Check them all out here: http://spark.apache.org/docs/latest/ml-features.html



In [1]:
from pyspark import SparkConf, HiveContext, SparkContext
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark.sql import functions as F

# Set Spark Context

conf = SparkConf()
conf.set('spark.executor.memory', '10g') \
    .set('spark.yarn.queue', 'root.odfgsbx2_q1') \
    .set('spark.executor.cores', '5') \
    .set('spark.executor.instances', '3')

spark = SparkSession.builder.appName('DSU') \
    .config(conf=conf) \
    .getOrCreate()
sqlContext = SQLContext(spark.sparkContext)
# Default Spark context
#sc, sqlContext = setupSparkContext()



In [2]:

df = spark.read.csv("maprfs:///datalake/optum/optuminsight/sandbox3/dsu/nilay_bhatt/fake_customers.csv",inferSchema=True,header=True)

In [3]:
df.show()

+-------+----------+-----+
|   Name|     Phone|Group|
+-------+----------+-----+
|   John|4085552424|    A|
|   Mike|3105552738|    B|
| Cassie|4085552424|    B|
|  Laura|3105552438|    B|
|  Sarah|4085551234|    A|
|  David|3105557463|    C|
|   Zach|4085553987|    C|
|  Kiera|3105552938|    A|
|  Alexa|4085559467|    C|
|Karissa|3105553475|    A|
+-------+----------+-----+



## StringIndexer

We often have to convert string values into numeric indices so they can be used as categorical features. The StringIndexer estimator does this: fit it on a DataFrame to learn the mapping, then transform the DataFrame to add the index column.

In [4]:
from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["user_id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

+-------+--------+-------------+
|user_id|category|categoryIndex|
+-------+--------+-------------+
|      0|       a|          0.0|
|      1|       b|          2.0|
|      2|       c|          1.0|
|      3|       a|          0.0|
|      4|       a|          0.0|
|      5|       c|          1.0|
+-------+--------+-------------+
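
The indices in the output above follow StringIndexer's default ordering, where the most frequent label gets index 0.0: `a` appears three times, `c` twice and `b` once. A plain-Python sketch of that rule is shown below. The helper name `fit_string_index` is hypothetical, the alphabetical tie-break is an assumption made here for determinism, and this is only an illustration of the idea, not Spark's implementation.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (an assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Same category column as the DataFrame above
categories = ["a", "b", "c", "a", "a", "c"]
mapping = fit_string_index(categories)
indexed = [mapping[c] for c in categories]
print(mapping)
print(indexed)
```

Splitting the work into a "fit" step that learns the mapping and a lookup step that applies it mirrors the `fit(...)` / `transform(...)` pair used in the cell above.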



### VectorAssembler

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. In each row, the values of the input columns will be concatenated into a vector in the specified order. 

Assume that we have a DataFrame with the columns id, hour, mobile, userFeatures, and clicked:

     id | hour | mobile | userFeatures     | clicked
    ----|------|--------|------------------|---------
     0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0
     
userFeatures is a vector column that contains three user features. We want to combine hour, mobile, and userFeatures into a single feature vector called features and use it to predict clicked or not. If we set VectorAssembler’s input columns to hour, mobile, and userFeatures and output column to features, after transformation we should get the following DataFrame:

     id | hour | mobile | userFeatures     | clicked | features
    ----|------|--------|------------------|---------|-----------------------------
     0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

In [5]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])
dataset.show()

+---+----+------+--------------+-------+
| id|hour|mobile|  userFeatures|clicked|
+---+----+------+--------------+-------+
|  0|  18|   1.0|[0.0,10.0,0.5]|    1.0|
+---+----+------+--------------+-------+



In [6]:
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show()

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+--------------------+-------+
|            features|clicked|
+--------------------+-------+
|[18.0,1.0,0.0,10....|    1.0|
+--------------------+-------+
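
To make the concatenation concrete, here is a plain-Python sketch of what the assembled `features` value contains for the single row above. The function `assemble` and the dictionary-based row are hypothetical stand-ins used only for illustration; Spark operates on DataFrame columns and vector types rather than Python lists.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector-like column: append every element in order
            features.extend(float(v) for v in value)
        else:
            # Numeric or boolean column: append a single value
            features.append(float(value))
    return features

row = {"id": 0, "hour": 18, "mobile": 1.0,
       "userFeatures": [0.0, 10.0, 0.5], "clicked": 1.0}
features = assemble(row, ["hour", "mobile", "userFeatures"])
print(features)
```

The order of `input_cols` determines the order of the values in the result, which is why `hour` comes first and the three user features come last.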



## Linear Regression

In [7]:
from pyspark.ml.regression import LinearRegression

In [8]:
# Use Spark to read in the Ecommerce Customers csv file.
data = spark.read.csv("maprfs:///datalake/optum/optuminsight/sandbox3/dsu/nilay_bhatt/Ecommerce_Customers.csv",inferSchema=True,header=True)

In [9]:
# Print the Schema of the DataFrame
data.printSchema()

root
 |-- Email: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Avatar: string (nullable = true)
 |-- Avg Session Length: double (nullable = true)
 |-- Time on App: double (nullable = true)
 |-- Time on Website: double (nullable = true)
 |-- Length of Membership: double (nullable = true)
 |-- Yearly Amount Spent: double (nullable = true)



In [10]:
data.show(2)

+--------------------+--------------------+---------+------------------+------------------+------------------+--------------------+-------------------+
|               Email|             Address|   Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|
+--------------------+--------------------+---------+------------------+------------------+------------------+--------------------+-------------------+
|mstephenson@ferna...|835 Frank TunnelW...|   Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|
|   hduke@hotmail.com|4547 Archer Commo...|DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|
+--------------------+--------------------+---------+------------------+------------------+------------------+--------------------+-------------------+
only showing top 2 rows



In [11]:
for item in data.head():
    print(item)

mstephenson@fernandez.com
835 Frank TunnelWrightmouth, MI 82180-9605
Violet
34.49726772511229
12.65565114916675
39.57766801952616
4.0826206329529615
587.9510539684005


## Set Up DataFrame for Machine Learning

In [12]:
# A few things we need to do before Spark can accept the data!
# It needs to be in the form of two columns
# ("label","features")

# Import VectorAssembler and Vectors
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [13]:
data.columns

['Email',
 'Address',
 'Avatar',
 'Avg Session Length',
 'Time on App',
 'Time on Website',
 'Length of Membership',
 'Yearly Amount Spent']

In [14]:
assembler = VectorAssembler(
    inputCols=["Avg Session Length", "Time on App", 
               "Time on Website",'Length of Membership'],
    outputCol="features")

In [15]:
output = assembler.transform(data)

In [16]:
output.select("features").show()

+--------------------+
|            features|
+--------------------+
|[34.4972677251122...|
|[31.9262720263601...|
|[33.0009147556426...|
|[34.3055566297555...|
|[33.3306725236463...|
|[33.8710378793419...|
|[32.0215955013870...|
|[32.7391429383803...|
|[33.9877728956856...|
|[31.9365486184489...|
|[33.9925727749537...|
|[33.8793608248049...|
|[29.5324289670579...|
|[33.1903340437226...|
|[32.3879758531538...|
|[30.7377203726281...|
|[32.1253868972878...|
|[32.3388993230671...|
|[32.1878120459321...|
|[32.6178560628234...|
+--------------------+
only showing top 20 rows



In [17]:
output.show()

+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+--------------------+
|               Email|             Address|          Avatar|Avg Session Length|       Time on App|   Time on Website|Length of Membership|Yearly Amount Spent|            features|
+--------------------+--------------------+----------------+------------------+------------------+------------------+--------------------+-------------------+--------------------+
|mstephenson@ferna...|835 Frank TunnelW...|          Violet| 34.49726772511229| 12.65565114916675| 39.57766801952616|  4.0826206329529615|  587.9510539684005|[34.4972677251122...|
|   hduke@hotmail.com|4547 Archer Commo...|       DarkGreen| 31.92627202636016|11.109460728682564|37.268958868297744|    2.66403418213262|  392.2049334443264|[31.9262720263601...|
|    pallen@yahoo.com|24645 Valerie Uni...|          Bisque|33.000914755642675|11.330278057777512|37

In [18]:
final_data = output.select("features",'Yearly Amount Spent')

In [19]:
final_data.show()

+--------------------+-------------------+
|            features|Yearly Amount Spent|
+--------------------+-------------------+
|[34.4972677251122...|  587.9510539684005|
|[31.9262720263601...|  392.2049334443264|
|[33.0009147556426...| 487.54750486747207|
|[34.3055566297555...|  581.8523440352177|
|[33.3306725236463...|  599.4060920457634|
|[33.8710378793419...|   637.102447915074|
|[32.0215955013870...|  521.5721747578274|
|[32.7391429383803...|  549.9041461052942|
|[33.9877728956856...|  570.2004089636196|
|[31.9365486184489...|  427.1993848953282|
|[33.9925727749537...|  492.6060127179966|
|[33.8793608248049...|  522.3374046069357|
|[29.5324289670579...|  408.6403510726275|
|[33.1903340437226...|  573.4158673313865|
|[32.3879758531538...|  470.4527333009554|
|[30.7377203726281...|  461.7807421962299|
|[32.1253868972878...| 457.84769594494855|
|[32.3388993230671...| 407.70454754954415|
|[32.1878120459321...|  452.3156754800354|
|[32.6178560628234...|   605.061038804892|
+----------

In [20]:
train_data,test_data = final_data.randomSplit([0.7,0.3])
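
The weights passed to `randomSplit` are target proportions, not exact sizes, which is why the counts shown below are 343 and 157 rather than exactly 350 and 150. Conceptually, each row is assigned to a split by its own random draw. The following plain-Python sketch illustrates that idea under stated assumptions: the function `random_split` and the seed value are hypothetical, and Spark's actual sampling works per partition.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by an independent weighted random draw."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries in [0, 1], e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top end
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(500))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))
```

In Spark itself, passing a seed, for example `final_data.randomSplit([0.7, 0.3], seed=42)`, should make the split repeatable for the same DataFrame and partitioning, so that the coefficients and error metrics further down do not change from run to run.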

In [21]:
train_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                343|
|   mean| 503.22316239354274|
| stddev|  77.37342981354372|
|    min|  304.1355915788555|
|    max|  765.5184619388373|
+-------+-------------------+



In [22]:
test_data.describe().show()

+-------+-------------------+
|summary|Yearly Amount Spent|
+-------+-------------------+
|  count|                157|
|   mean|  490.7737224733133|
| stddev|  83.01221647152195|
|    min| 256.67058229005585|
|    max|  689.7876041747194|
+-------+-------------------+



In [23]:
# Create a Linear Regression Model object
lr = LinearRegression(labelCol='Yearly Amount Spent')

In [24]:
# Fit the model to the data and call this model lrModel
lrModel = lr.fit(train_data)

In [25]:
# Print the coefficients and intercept for linear regression
print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

Coefficients: [26.02776052470331,38.83893957874019,0.5770541918307003,61.47293911330216] Intercept: -1067.4262288892114
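
The fitted model is a weighted sum: each coefficient multiplies the feature in the same position of the `features` vector, and the intercept is added at the end. As a hedged illustration, the sketch below applies the coefficients printed above to the first customer shown by `data.show(2)`. It uses plain Python rather than the Spark API, and the numbers only hold for this particular run, since the random split changes the fit.

```python
# Coefficients and intercept as printed above (specific to this run)
coefficients = [26.02776052470331, 38.83893957874019,
                0.5770541918307003, 61.47293911330216]
intercept = -1067.4262288892114

# First customer: Avg Session Length, Time on App, Time on Website, Length of Membership
features = [34.49726772511229, 12.65565114916675,
            39.57766801952616, 4.0826206329529615]
label = 587.9510539684005  # Yearly Amount Spent for that customer

# prediction = coefficients . features + intercept
prediction = sum(c * x for c, x in zip(coefficients, features)) + intercept
residual = label - prediction
print("prediction: {:.2f}  residual: {:.2f}".format(prediction, residual))
```

Because the coefficients follow the order of `inputCols` in the assembler, the small third coefficient belongs to Time on Website, which suggests that feature contributes little to the prediction compared with Time on App or Length of Membership.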


In [26]:
test_results = lrModel.evaluate(test_data)

In [27]:
# Residuals: label minus prediction for each row of the test set
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| 11.339367763896234|
| -4.686906872263364|
| 11.066548932104183|
| -16.84027780323015|
|-5.6855573375895005|
| 22.583629831963208|
| -7.307975684606959|
| -4.525825948280556|
| -9.140520015767095|
|  3.268138630271835|
|-17.486397891467732|
|  18.23296392711177|
|  6.465290026302171|
| -6.963048429780315|
| -1.772174671651726|
| -6.091992423440104|
| -4.757711478366048|
|-10.814961493745159|
|-3.0721414733104666|
|  7.698032427957685|
+-------------------+
only showing top 20 rows



In [28]:
unlabeled_data = test_data.select('features')

In [29]:
predictions = lrModel.transform(unlabeled_data)

In [30]:
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[29.5324289670579...|397.30098330873125|
|[30.4925366965402...| 287.1581525921779|
|[30.7377203726281...| 450.7141932641257|
|[30.8162006488763...|282.92661875169915|
|[31.0613251567161...| 493.2410153954911|
|[31.2834474760581...| 569.1974595937043|
|[31.4474464941278...|  425.910717779831|
|[31.5171218025062...|280.44424659866627|
|[31.5261978982398...| 418.2350462081049|
|[31.5316044825729...| 433.2474670990907|
|[31.5702008293202...| 563.4318900328726|
|[31.6005122003032...|460.93988756398517|
|[31.6548096756927...|468.79813370124634|
|[31.7207699002873...| 545.7379819078033|
|[31.7216523605090...|349.54910130352437|
|[31.7242025238451...| 509.4798797114006|
|[31.7656188210424...| 501.3117931139732|
|[31.8093003166791...| 547.5868608565863|
|[31.8124825597242...| 395.8824864571077|
|[31.8209982016720...|416.97724858525567|
+--------------------+------------

In [31]:
print("RMSE: {}".format(test_results.rootMeanSquaredError))
print("MSE: {}".format(test_results.meanSquaredError))

RMSE: 10.038612985625235
MSE: 100.77375067516358
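
Both metrics are computed from the residuals: MSE is the mean of the squared residuals and RMSE is its square root, which puts the error back in the units of the label (dollars spent per year). The plain-Python sketch below shows the arithmetic on the first five residuals printed earlier. That is only a subset of the test set, so its values differ from the reported ones; the helper names `mse` and `rmse` are hypothetical.

```python
import math

def mse(residuals):
    """Mean of the squared residuals."""
    return sum(r * r for r in residuals) / len(residuals)

def rmse(residuals):
    """Square root of the mean squared error."""
    return math.sqrt(mse(residuals))

# First five residuals from test_results.residuals.show() (a subset of the test set)
sample = [11.339367763896234, -4.686906872263364, 11.066548932104183,
          -16.84027780323015, -5.6855573375895005]
print("sample MSE: {:.3f}".format(mse(sample)))
print("sample RMSE: {:.3f}".format(rmse(sample)))

# Consistency check on the reported values: RMSE should be sqrt(MSE)
reported_mse = 100.77375067516358
reported_rmse = 10.038612985625235
print(math.isclose(math.sqrt(reported_mse), reported_rmse, rel_tol=1e-6))
```

An RMSE of about 10 is small next to the label's standard deviation of roughly 83 in `test_data.describe()`, which is one quick way to judge whether the fit is useful.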


# Intro to Spark Streaming

Streaming is advancing and changing fast: multiple new libraries appear every year, new and different services are always popping up, and what is in this notebook may or may not apply to you. Maybe you're looking for something specific on Kafka, or maybe you are looking for streaming from Twitter, in which case Spark might be overkill for what you really want. Realistically, each situation is going to require a customized solution, and this course is never going to be able to supply a one-size-fits-all solution. Because of this, I wanted to point out some great resources for Python and Spark Streaming:

* [The Official Documentation is great. This should be your first go to.](http://spark.apache.org/docs/latest/streaming-programming-guide.html#spark-streaming-programming-guide)

* [Fantastic Guide to Spark Streaming with Kafka](https://www.rittmanmead.com/blog/2017/01/getting-started-with-spark-streaming-with-python-and-kafka/)

* [Another Spark Streaming Example with Geo Plotting](http://nbviewer.jupyter.org/github/ibm-cds-labs/spark.samples/blob/master/notebook/DashDB%20Twitter%20Car%202015%20Python%20Notebook.ipynb)

Spark has well-known streaming capabilities. If streaming is something you have found yourself needing at work, you are probably familiar with some of these concepts already, in which case you may find it more useful to jump straight to the official documentation here:

http://spark.apache.org/docs/latest/streaming-programming-guide.html#spark-streaming-programming-guide

Let's discuss Spark Streaming!

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.

<img src='http://spark.apache.org/docs/latest/img/streaming-arch.png'/>


Spark SQL also has its own streaming modules (Structured Streaming):

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=streaming#module-pyspark.sql.streaming

Internally, Spark Streaming works as follows. It receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

<img src='http://spark.apache.org/docs/latest/img/streaming-flow.png'/>
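
The batching idea described above can be sketched without Spark at all. In the plain-Python example below, incoming events are grouped into fixed-length time intervals and each batch is then processed as an ordinary collection, here with a word count. The event data, the `micro_batches` helper and the one-second interval are all hypothetical; a real Spark Streaming job would receive data from a source such as Kafka or a TCP socket instead of a list.

```python
from collections import Counter, defaultdict

def micro_batches(events, batch_interval):
    """Group (timestamp, value) events into consecutive fixed-length time batches."""
    batches = defaultdict(list)
    for timestamp, value in events:
        # Events whose timestamps fall in the same interval share a batch
        batches[int(timestamp // batch_interval)].append(value)
    # Return the non-empty batches in time order
    return [batches[key] for key in sorted(batches)]

# Hypothetical stream: (arrival time in seconds, word)
events = [(0.1, "spark"), (0.4, "kafka"), (0.9, "spark"),
          (1.2, "spark"), (1.8, "flume"), (2.5, "kafka")]

# Process each batch independently, as the engine would
results = []
for batch in micro_batches(events, batch_interval=1.0):
    results.append(dict(Counter(batch)))
print(results)
```

Each entry in `results` corresponds to one batch of the output stream, so the result of a streaming computation is itself a sequence of small batch results.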