# Spark Machine Learning

### Learning goals:
- explain the relationships between Hadoop, Spark, and Databricks
- differentiate between Spark RDDs and Spark DataFrames and when each is appropriate
- locate and explore the Spark ML (`spark.ml`) documentation
- code along with a text classification problem using four different ML algorithms, a data prep pipeline, and grid search to fine-tune a model

# Intro to Big Data

## What is Big Data?

The most widely used definition in the industry comes from Gartner:


>***Big data is high-volume, high-velocity and/or high-variety information assets that demand cost-effective, innovative forms of information processing that enable enhanced insight, decision making, and process automation***.

<img src="https://raw.githubusercontent.com/learn-co-students/dsc-big-data-introduction-online-ds-ft-100719/master/images/3_components.png" width=50%>

### Velocity
<img src="https://raw.githubusercontent.com/learn-co-students/dsc-big-data-introduction-online-ds-ft-100719/master/images/internet_minute.jpg" width=50%>

### Variety
<img src="https://raw.githubusercontent.com/learn-co-students/dsc-big-data-introduction-online-ds-ft-100719/master/images/unstructured_data.png" width=50%>

## Big Data Analytics

The key activities associated with big data analytics are reflected in four main areas: 

- Big data warehousing and distribution
- Big data storage
- Big data computational platforms
- Big data analyses, visualization, and evaluation

Such a framework can be applied for knowledge discovery and informed decision-making in big data-driven organizations.

<img src="https://raw.githubusercontent.com/learn-co-students/dsc-big-data-introduction-online-ds-ft-100719/master/images/tech_stack.png">

## Parallel & Distributed Computing

- MapReduce is a programming paradigm that makes it possible to scale big data analytics across hundreds or thousands of servers. 


> *In a nutshell, the term "MapReduce" refers to two distinct tasks. The first is the __Map__ job, which takes one set of data and transforms it into another set of data, where individual elements are broken down into tuples __(key/value pairs)__, while the __Reduce__ job takes the output from a map as input and combines those data tuples into a smaller set of tuples.*


- The MapReduce programming paradigm is designed to allow __parallel and distributed processing__ of large sets of data (also known as big data). MapReduce lets us convert such big datasets into sets of __tuples__ as __key:value__ pairs.


- In principle, any data can be represented as **key:value** pairs 
- Keys and values themselves can be of ANY data type 


>So in simpler terms, _MapReduce uses parallel distributed computing to turn big data into regular data._


### Distributed Processing

> A distributed processing system is a group of computers in a network working in tandem to accomplish a task

<img src="https://raw.githubusercontent.com/learn-co-students/dsc-parallel-and-distributed-computing-with-mapreduce-online-ds-ft-100719/master/images/types_of_network.png">

### Parallel Processing Systems


With parallel computing:


* a larger problem is broken up into smaller pieces
* every part of the problem follows a series of instructions
* each one of the instructions is executed simultaneously on different processors
* all of the answers are collected from the small problems and combined into one final answer
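The four steps above can be sketched in plain Python. This is a minimal single-machine illustration, not how Spark or Hadoop work internally. The helper names (`split_into_chunks`, `sum_of_squares`, `parallel_sum_of_squares`) are hypothetical. It uses threads from `concurrent.futures`, and in CPython threads do not run CPU-bound Python code truly simultaneously because of the GIL. For real speedups you would swap in `ProcessPoolExecutor`; the split → compute → combine structure stays the same.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(items, n_chunks):
    """Break a larger problem into roughly equal, smaller pieces."""
    size = max(1, -(-len(items) // n_chunks))  # ceiling division, at least 1
    return [items[i:i + size] for i in range(0, len(items), size)]


def sum_of_squares(chunk):
    """The same series of instructions, applied to every piece."""
    return sum(x * x for x in chunk)


def parallel_sum_of_squares(items, workers=4):
    chunks = split_into_chunks(items, workers)
    # Each chunk is handed to a worker; pool.map returns results in chunk order
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partial_results = list(pool.map(sum_of_squares, chunks))
    # Collect the answers to the small problems and combine them into one
    return sum(partial_results)


numbers = list(range(1000))
print(parallel_sum_of_squares(numbers))  # → 332833500
```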


In the image below, you can see a simple example of a process being broken up and completed both sequentially and in parallel.

<img src = "https://raw.githubusercontent.com/learn-co-students/dsc-parallel-and-distributed-computing-with-mapreduce-online-ds-ft-100719/master/images/parallel.png">


### MapReduce Example
Suppose a data scientist receives a document listing the animals at each zoo. Here are the first five zoos:

| Animals              |
|----------------------|
| lion tiger bear      |
| lion giraffe         |
| giraffe penguin      |
| penguin lion giraffe |
| koala giraffe        |


Let's now look at how you would use the MapReduce framework in this simple word count example that could be generalized to much more data.

<img src="https://raw.githubusercontent.com/learn-co-students/dsc-parallel-and-distributed-computing-with-mapreduce-online-ds-ft-100719/master/images/word_count.png">

#### 1. MAP Task (Splitting & Mapping)
- The input data is split into fragments, which are then assigned to map tasks. 
    - Each computing cluster is assigned a number of map tasks, which are subsequently distributed among its nodes.

- Each map task then uses the map function to create key:value pairs of the form:   
*{animal}* : *{# of that animal in the zoo}* 

- Processing the original records generates __intermediate__ key:value pairs. 
    - The intermediate key:value pairs are __sorted by their key values__ to create a new list of key:value pairs.
    
#### 2. Shuffling
- The sorted list from the map tasks is divided into a new set of fragments, shuffling the mapped pairs so that all pairs with the same key end up together, which makes them easy to reduce.

- __The number of these new fragments will be the same as the number of reduce tasks__. 

#### 3. REDUCE Task (Reducing)

- Now, every properly shuffled segment will have a reduce task applied to it. 

    - After the task is completed, the final output is written onto a file system. 
    - The underlying file system is usually HDFS (Hadoop Distributed File System). 


- It's important to note that MapReduce will generally only be powerful when dealing with large amounts of data. 
    - When working with a small dataset, it will be faster not to perform operations in the MapReduce framework.
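To make the three phases concrete, here is a single-machine simulation of the zoo count in plain Python. It is a sketch for intuition only:

- the function names are hypothetical;
- the "cluster" is just Python lists;
- the CRC32-based partitioner is an assumption chosen for determinism (Hadoop's default partitioner also hashes keys modulo the number of reduce tasks).

As in the shuffle step, there is one fragment per reduce task.

```python
import zlib
from collections import defaultdict

zoos = [
    "lion tiger bear",
    "lion giraffe",
    "giraffe penguin",
    "penguin lion giraffe",
    "koala giraffe",
]


def map_task(zoo):
    """MAP: turn one zoo record into intermediate (animal, 1) pairs."""
    return [(animal, 1) for animal in zoo.split()]


def shuffle(pairs, n_reduce_tasks=2):
    """SHUFFLE: sort pairs by key, then route every key to exactly one reduce task.

    Returns one {key: [values]} fragment per reduce task.
    """
    fragments = [defaultdict(list) for _ in range(n_reduce_tasks)]
    for key, value in sorted(pairs):
        # Deterministic hash partitioner: the same key always lands in the same fragment
        fragments[zlib.crc32(key.encode()) % n_reduce_tasks][key].append(value)
    return fragments


def reduce_task(key, values):
    """REDUCE: combine all values for one key into a single count."""
    return key, sum(values)


intermediate = [pair for zoo in zoos for pair in map_task(zoo)]
fragments = shuffle(intermediate)
counts = dict(reduce_task(k, v) for fragment in fragments for k, v in fragment.items())
print(sorted(counts.items()))
# → [('bear', 1), ('giraffe', 4), ('koala', 1), ('lion', 3), ('penguin', 2), ('tiger', 1)]
```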



- Two groups of entities ensure that the MapReduce job gets done properly:

    1. __Job Tracker__: a "master" node that informs the other nodes which map and reduce jobs to complete

    2. __Task Tracker__: the "worker" nodes that complete the map and reduce operations

There are different names for these components depending on the technology used, but there will always be a master node that informs worker nodes what tasks to perform.



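In Python, the map and reduce functions for a word count might look like this (`yield` plays the role of MapReduce's `emit`):

```python
# Count word frequency
def mapper(doc):
    # Map: emit a (word, 1) pair for every word in the document
    for word in doc.split():
        yield (word, 1)

def reducer(key, values):
    # Reduce: sum all the 1s emitted for the same word
    yield (key, sum(values))
```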

# Spark context and concepts review
![sparkler](https://images.pexels.com/photos/285173/pexels-photo-285173.jpeg?auto=compress&cs=tinysrgb&dpr=2&h=750&w=1260)

# Installing PySpark and Docker

### Updated Docker installation directions below:

1. Install Docker Desktop
2. Pull the pyspark-notebook image (this takes 10-20 minutes!):

`docker pull jupyter/pyspark-notebook`

3. Start the container with port forwarding, **leaving the container port `8888` intact:**

`docker run -it --rm -p 8888:8888 jupyter/pyspark-notebook`

4. Access the notebook via the URL in the log output. If you mapped a different host port (e.g. `-p 12345:8888`), replace `:8888` in that URL with your port (e.g. `:12345`).

    

> #### [Running Shell Commands](https://janakiev.com/blog/python-shell-commands/)

In [15]:
cmd = "docker run -it --rm -p 8888:8888 jupyter/pyspark-notebook"

# Option 1: os.system runs the command and returns only its exit status
# import os
# os.system('ls -l')
# os.system(cmd)


# Option 2: os.popen runs the command and lets you read its output
# import os
# stream = os.popen(cmd)
# output = stream.read()
# output


# Option 3: subprocess.Popen gives you separate stdout and stderr streams
# import subprocess
# process = subprocess.Popen(['echo', 'More output'],
#                      stdout=subprocess.PIPE, 
#                      stderr=subprocess.PIPE)
# stdout, stderr = process.communicate()
# stdout, stderr
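In Python 3.7+, `subprocess.run` wraps the `Popen` + `communicate()` pattern above in a single call. Below is a minimal sketch. It assumes the current Python interpreter (`sys.executable`) as a stand-in command, so it runs even without Docker installed; replace the argument list with a real command such as `["docker", "ps"]`.

```python
import subprocess
import sys

# Run a command without a shell and capture what it prints.
result = subprocess.run(
    [sys.executable, "-c", "print('More output')"],
    capture_output=True,  # collect stdout and stderr
    text=True,            # decode bytes to str
    check=True,           # raise CalledProcessError on a non-zero exit code
)
print(result.returncode, result.stdout.strip())  # → 0 More output
```

Passing the command as a list (rather than a single string with `shell=True`) avoids shell quoting issues.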

**[Learn Lesson Directions](https://learn.co/tracks/data-science-career-v2/module-5-machine-learning-and-big-data/section-40-big-data-in-pyspark/installing-and-configuring-pyspark-with-docker)**
> The best way to use Docker to work with the labs in this section is to mount the folders containing the labs to a docker container. In order to do this, run the command:

```bash
docker run -it -p 8888:8888 -v {absolute file path}:/home/jovyan/work --rm jupyter/pyspark-notebook
```
> Once this command has been executed, you can go through the same process as above to input the token into your browser after going to http://localhost:8888. After doing so, navigate to the folder "work" and execute the cell below.

- Test your setup with the following code:
```python
import pyspark
sc = pyspark.SparkContext('local[*]')
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)
```

- If everything went fine, you should see output like this (your numbers will differ, since `takeSample` draws a random sample):
```python
[941, 60, 987, 542, 718]
```

**[Resilient Distributed Datasets (RDDs) Lab](https://learn.co/tracks/data-science-career-v2/module-5-machine-learning-and-big-data/section-40-big-data-in-pyspark/resilient-distributed-datasets-rdds-lab)**

# The story of Spark (in diagrams)

<img src="https://raw.githubusercontent.com/learn-co-students/dsc-big-data-analytics-apache-spark-online-ds-ft-100719/master/images/spark.gif" width=60%>

> Salloum, S., Dautov, R., Chen, X. et al. Big data analytics on Apache Spark. Int J Data Sci Anal 1, 145–164 (2016). https://doi.org/10.1007/s41060-016-0027-9





## Start with Hadoop
![diagram of hadoop v1 compared to hadoop v2](img/yarn.png)
[diagram source](https://sites.google.com/site/codingbughunter/hadoop/yarn-general-discribe)

## YARN manages cluster resources for Spark running on top of HDFS
### YARN = Yet Another Resource Negotiator
#### YARN is a component of Hadoop (introduced in Hadoop 2)
![yarn diagram with spark](http://hortonworks.com/wp-content/uploads/2013/06/YARN.png)

[diagram source](https://sites.google.com/site/codingbughunter/hadoop/yarn-general-discribe)

## Then visualize the Spark ecosystem built on top of that

![diagram of spark eco system components](img/spark_eco.png)

[image source here](https://databricks.com/spark/about)

## Databricks provides wrap-around services on top of _that_
![databricks architecture diagram](img/Databricks_product.png)
[diagram source](https://verify.wiki/wiki/Databricks)

# The story of Spark (a timeline)

|<p align="left justify">Date</p>|<p align="left justify">Product</p>|<p align="left justify">Update</p>|
|:----|:-----|:-----|
| 2002 | Hadoop | <p align="left justify">Doug Cutting starts `Apache Nutch` researching sort/merge processing</p> |
| 2006 | Hadoop |  <p align="left justify">Cutting leaves `Nutch` and joins `Yahoo`, renaming the project `Hadoop` </p>|
| 2008 | Hadoop |  <p align="left justify">`Hadoop` becomes a top-level `Apache` project </p> |
| Jan 2008 | Hadoop |  <p align="left justify">v 0.10.1 released </p>|
| 2009 | Spark | <p align="left justify">started as a research project at the UC Berkeley AMPLab  </p>|
| 2010 | Spark |  <p align="left justify">open sourced </p>|
| Sept 2012 | Spark |  <p align="left justify">0.6.0 released </p>|
| 2013 | Spark |  <p align="left justify">moved to the `Apache` Software Foundation </p>|
| Feb 2013| Spark |  <p align="left justify">Spark 0.7 adds a Python API called `PySpark` </p>|
| Sept 2013 | Spark | <p align="left justify">0.8.0 introduces `MLlib` </p>|
| 2013 | Databricks |  <p align="left justify">Original Spark research team at UC Berkeley founds Databricks</p> |
| May 2014 |Spark |  <p align="left justify">v 1.0 introduces Spark SQL, for loading and manipulating structured data in Spark</p>|
| Sept 2014 | Spark|  <p align="left justify">v 1.1.0 adds support for registering Python lambda functions as UDFs</p>|
|Mar 2015 | Spark | <p align="left justify"> v 1.3.0 brings a new DataFrame API</p> |
| Jun 2015 | Spark | <p align="left justify"> v 1.4.0 brings an R API to Spark</p> |
| 2015 | Databricks | <p align="left justify"> The Databricks Apache Spark cloud platform goes public</p> |
| Jan 2016|  Spark | <p align="left justify"> v 1.6.0 brings a new Dataset API <br> - A new Spark API, similar to RDDs, that allows users to work with custom objects and lambda functions while still gaining the benefits of the Spark SQL execution engine.</p> |
| Jul 2016 | Spark | <p align="left justify"> v 2.0.0 **big update**! <Br> - Unifying DataFrame and Dataset: In Scala and Java, DataFrame and Dataset have been unified, i.e. DataFrame is just a type alias for Dataset of Row. In Python and R, given the lack of type safety, DataFrame is the main programming interface. <br> - SparkSession: new entry point that replaces the old SQLContext<br>- Native CSV data source, based on Databricks’ spark-csv module<br>- MLlib - The DataFrame-based API is now the primary API. The RDD-based API is entering maintenance mode </p> |
| 2016 | Databricks | <p align="left justify"> Databricks Launches Free Community Edition As Companion To Free Online Spark Courses </p>|
| Jul 2017| Spark | <p align="left justify"> v 2.2.0 drops support for Python 2.6 </p>|
| Nov 2018 | Spark | <p align="left justify"> v 2.4.0<br> - This release adds Barrier Execution Mode for better integration with deep learning frameworks<br> - more integration between pandas UDF and spark DataFrames </p>|


# Spark data objects

![diagram of definitions of Spark objects from databricks](https://databricks.com/wp-content/uploads/2018/05/rdd-1024x595.png)

## In PySpark there are only RDDs and DataFrames

In statically typed, compiled languages (Scala and Java), there is an additional distinction between DataFrames and Datasets. 

![dataframe image](https://databricks.com/wp-content/uploads/2018/05/DataFrames.png)

## Differences between objects:
![memory usage](https://databricks.com/wp-content/uploads/2016/07/memory-usage-when-caching-datasets-vs-rdds.png)

### Use an RDD when:
[quoted from databricks](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)

> - you want low-level transformation and actions and control on your dataset;
> - your data is unstructured, such as media streams or streams of text;
> - you want to manipulate your data with functional programming constructs than domain specific expressions;
> - you don’t care about imposing a schema, such as columnar format, while processing or accessing data attributes by name or column

### Use a dataframe when:
[also quoted from databricks](https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)


> - you want rich semantics, high-level abstractions, and domain specific APIs, use DataFrame
> - your processing demands high-level expressions, filters, maps, aggregation, averages, sum, SQL queries, columnar access and use of lambda functions on semi-structured data, use DataFrame
> - you want higher degree of type-safety at compile time, want typed JVM objects, take advantage of Catalyst optimization, and benefit from Tungsten’s efficient code generation, use Dataset.
> - you want unification and simplification of APIs across Spark Libraries, use DataFrame or Dataset.
> - If you are a R user, use DataFrames.
> - If you are a Python user, use DataFrames and resort back to RDDs if you need more control.

**Note**: Machine learning algorithms in `spark.ml` run on _DataFrames_ (the older RDD-based `spark.mllib` API has been in maintenance mode since Spark 2.0).

## Review:

- You are grabbing live tweets about the CW show 'Jane the Virgin' for later analysis. In the Spark ecosystem, should you store them in an RDD or a DataFrame?

- You have an RDD of data that you wish to use to build a predictive model. Should you leave it as an RDD or transform it to a DataFrame?


# Machine learning in Spark
![bbc logo](https://www.nwcu.police.uk/wp-content/uploads/2013/05/BBC-News.png)

Section influenced by [this analysis of twitter data](https://wesslen.github.io/twitter/predicting_twitter_profile_location_with_pyspark/)

## The return of Greg

![greg](img/thinking.jpeg)

## Greg's life is full of pain

Greg has become really tired of his boss asking him to do all these random things.<br>
**First** she had him learn Object Oriented Programming, and it's been downhill ever since.<br>
**Now** she wants him to send her a summary of political news from the BBC each day.<br>
The problem is that it takes him hours just to sort through the BBC website to find *just* the political articles that interest her.

## But wait!
What if rather than sorting through them himself he could build a classification model that will sort only the ones he needs?

### Create a Spark session and context

In [None]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [None]:
spark

In [None]:
sc

### Read in our dataset of articles

In [None]:
bbc = spark.read.csv(path='bbc-text.csv',sep=',',encoding='UTF-8', header=True,inferSchema=True)

In [None]:
def show(df, n=5):
    return df.limit(n).toPandas()

### Do some basic data exploration

In [None]:
bbc.columns

In [None]:
bbc.dtypes

In [None]:
bbc.printSchema()

In [None]:
bbc.limit(10).toPandas()

In [None]:
bbc.count()

In [None]:
bbc.groupBy('category').count().show()

In [None]:
# Create a binary target column "label": 1 for politics articles, 0 otherwise
from pyspark.sql.functions import when, col
bbc = bbc.withColumn("label",
                     when(col("category").like("%politics%"), 1).otherwise(0))

In [None]:
# drop original target column
bbc = bbc.drop(bbc.category)

In [None]:
show(bbc,10)

## Machine Learning in Spark

Spark's [documentation](https://spark.apache.org/docs/2.2.0/ml-guide.html#mllib-main-guide) is fairly straightforward! Let's take a look. It shouldn't look *too* different from `sklearn`.

### Data prep pipeline

In [None]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer


# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

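# stop words: Spark's default English list plus a few extra tokens
add_stopwords = ["http","https","amp","rt","t","c","can"] # extra tokens to drop
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered") \
    .setStopWords(StopWordsRemover.loadDefaultStopWords("english") + add_stopwords)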

# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
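
To see what each stage does to a single article, here is a plain-Python approximation of the three stages. It follows the documented defaults:

- `RegexTokenizer` lowercases, splits on the pattern when `gaps=True`, and drops tokens shorter than `minTokenLength=1`.
- `StopWordsRemover` compares case-insensitively.
- `CountVectorizer` keeps terms found in at least `minDF` documents, then the top `vocabSize` terms by total count.

It is a sketch under assumptions, not Spark's implementation:

- The helper names are hypothetical.
- Python's `\W` is Unicode-aware, while Java's is ASCII-only by default.
- Vocabulary ties are broken alphabetically here.
- Spark stores counts as a sparse vector rather than a list.

```python
import re
from collections import Counter


def regex_tokenize(text, pattern=r"\W"):
    # Like RegexTokenizer with gaps=True: lowercase, split on the pattern,
    # then drop tokens shorter than minTokenLength (default 1), i.e. empty strings
    return [tok for tok in re.split(pattern, text.lower()) if len(tok) >= 1]


def remove_stop_words(tokens, stop_words):
    # StopWordsRemover compares case-insensitively by default
    stops = {w.lower() for w in stop_words}
    return [tok for tok in tokens if tok.lower() not in stops]


def fit_vocabulary(docs, vocab_size=10000, min_df=5):
    # Document frequency: how many documents contain each term at least once
    doc_freq = Counter(term for doc in docs for term in set(doc))
    # Total count of each term across the whole corpus, used for ranking
    term_freq = Counter(term for doc in docs for term in doc)
    kept = [term for term in term_freq if doc_freq[term] >= min_df]
    kept.sort(key=lambda term: (-term_freq[term], term))  # ties: alphabetical (assumption)
    return kept[:vocab_size]


def count_vector(doc, vocab):
    # Dense stand-in for the sparse "features" vector: one count per vocabulary term
    counts = Counter(doc)
    return [counts[term] for term in vocab]


articles = ["The minister can vote.", "Minister resigns; vote today", "Match report: goal!"]
tokens = [remove_stop_words(regex_tokenize(a), ["the", "can"]) for a in articles]
vocab = fit_vocabulary(tokens, min_df=2)
print(tokens[0], vocab, count_vector(tokens[1], vocab))
# → ['minister', 'vote'] ['minister', 'vote'] [1, 1]
```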


In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])

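# Split the raw articles first, so the vocabulary is learned from training documents only
(trainingRaw, testRaw) = bbc.randomSplit([0.7, 0.3], seed = 100)

# Fit the pipeline to training documents, then apply it to both splits.
pipelineFit = pipeline.fit(trainingRaw)
trainingData = pipelineFit.transform(trainingRaw)
testData = pipelineFit.transform(testRaw)

In [None]:
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))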
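
Under the hood, `Pipeline.fit()` walks through its stages in order:

1. An Estimator stage (such as `CountVectorizer`) is fit on the data that reaches it, producing a fitted Model.
2. Each stage's output is passed on to the next stage.
3. The result is a `PipelineModel`, whose `transform()` replays the fitted stages.

The toy classes below (`Lowercase`, `WordCounter`, `SimplePipeline` and friends) are hypothetical stand-ins that mimic that contract in plain Python; they are not Spark classes.

```python
class Lowercase:
    """A Transformer: no fit step, it just maps rows to new rows."""

    def transform(self, rows):
        return [row.lower() for row in rows]


class WordCounter:
    """An Estimator: fit() learns a vocabulary and returns a fitted model."""

    def fit(self, rows):
        return WordCounterModel(sorted({w for row in rows for w in row.split()}))


class WordCounterModel:
    """The fitted model is itself a Transformer."""

    def __init__(self, vocab):
        self.vocab = vocab

    def transform(self, rows):
        return [[row.split().count(w) for w in self.vocab] for row in rows]


class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # Estimator: learn from the data seen so far
                stage = stage.fit(rows)
            fitted.append(stage)
            rows = stage.transform(rows)   # feed transformed rows to the next stage
        return SimplePipelineModel(fitted)


class SimplePipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


model = SimplePipeline([Lowercase(), WordCounter()]).fit(["Vote today", "vote now"])
print(model.stages[1].vocab, model.transform(["VOTE vote again"]))
# → ['now', 'today', 'vote'] [[0, 0, 2]]
```

The word `again` is silently dropped because it was not in the vocabulary learned during `fit()`. This is also why, in general, the vocabulary should be learned from training data only.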

### Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
# Build the model
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0, family = "binomial")

# Train model with Training Data
lrModel = lr.fit(trainingData)
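
For reference, the Spark ML docs describe the regularized objective that `regParam` ($\lambda$) and `elasticNetParam` ($\alpha$) control roughly as follows. The form is simplified: labels are written as $y_i \in \{-1, +1\}$, the intercept is omitted, and by default (`standardization=True`) features are standardized before fitting.

$$
\min_{\mathbf{w}} \; \frac{1}{n}\sum_{i=1}^{n} \log\left(1 + e^{-y_i \mathbf{w}^\top \mathbf{x}_i}\right) \;+\; \lambda\left(\alpha\,\lVert \mathbf{w} \rVert_1 + \frac{1-\alpha}{2}\,\lVert \mathbf{w} \rVert_2^2\right)
$$

With `regParam=0.3` and `elasticNetParam=0`, the penalty reduces to pure L2 (ridge): $0.3 \cdot \frac{1}{2}\lVert \mathbf{w} \rVert_2^2 = 0.15\,\lVert \mathbf{w} \rVert_2^2$. Setting `elasticNetParam=1` would instead give an L1 (lasso) penalty, which tends to push many word coefficients exactly to zero.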

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np

beta = np.sort(lrModel.coefficients)

plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

### The training summary has many components you can call

In [None]:
# Extract the summary from the returned LogisticRegressionModel instance trained
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
plt.plot(objectiveHistory)
plt.ylabel('Objective Function')
plt.xlabel('Iteration')
plt.show()

In [None]:
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

#trainingSummary.roc.show(n=10, truncate=15)
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curve')
plt.show()
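
`areaUnderROC` has a handy interpretation: it is the probability that a randomly chosen positive example (a politics article) gets a higher score than a randomly chosen negative one, with ties counting half. The plain-Python helper below computes it directly from that definition. `roc_auc` is a hypothetical name, and its O(n²) pair loop is only meant for small examples.

```python
def roc_auc(labels, scores):
    """Area under the ROC curve, as P(score of a random positive > score of a random negative)."""
    positives = [s for s, y in zip(scores, labels) if y == 1]
    negatives = [s for s, y in zip(scores, labels) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for q in negatives:
            if p > q:
                wins += 1.0      # positive ranked above negative
            elif p == q:
                wins += 0.5      # ties count half
    return wins / (len(positives) * len(negatives))


labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]   # e.g. predicted P(label=1) for four articles
print(roc_auc(labels, scores))   # → 0.75
```

An AUC of 0.5 means the scores rank articles no better than chance, and 1.0 means every politics article outscores every other article.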

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

In [None]:
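# Plot F-Measure by threshold and find the threshold that maximizes it
#trainingSummary.fMeasureByThreshold.show(n=10, truncate = 15)
f = trainingSummary.fMeasureByThreshold.toPandas()
best_threshold = f.loc[f['F-Measure'].idxmax(), 'threshold']
print("Best threshold: " + str(best_threshold))
plt.plot(f['threshold'],f['F-Measure'])
plt.axvline(best_threshold, linestyle='--')
plt.ylabel('F-Measure')
plt.xlabel('Threshold')
plt.show()
# To use it, set it on the estimator before refitting: lr.setThreshold(float(best_threshold))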
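
`F-Measure` here is F1, the harmonic mean of precision and recall, with each candidate threshold used as the cutoff for predicting 1. This plain-Python sketch (`f1_by_threshold` is a hypothetical helper) tries every distinct score as a threshold. Spark may down-sample the candidate thresholds on large data.

```python
def f1_by_threshold(labels, scores):
    """Return {threshold: F1}, predicting 1 whenever score >= threshold."""
    results = {}
    for t in sorted(set(scores), reverse=True):
        tp = sum(1 for s, y in zip(scores, labels) if s >= t and y == 1)
        fp = sum(1 for s, y in zip(scores, labels) if s >= t and y == 0)
        fn = sum(1 for s, y in zip(scores, labels) if s < t and y == 1)
        # F1 = 2PR / (P + R), rewritten as 2TP / (2TP + FP + FN) to avoid 0/0
        results[t] = 2 * tp / (2 * tp + fp + fn) if tp else 0.0
    return results


labels = [1, 0, 1, 1, 0]
scores = [0.9, 0.8, 0.7, 0.4, 0.2]
f1 = f1_by_threshold(labels, scores)
best_threshold = max(f1, key=f1.get)
print(best_threshold, round(f1[best_threshold], 3))  # → 0.4 0.857
```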

### Evaluate on test data

In [None]:
# Make predictions on test data using the transform() method.
# lrModel.transform() only needs the 'features' column; the other columns are passed through unchanged.
predictions = lrModel.transform(testData)

predictions.select("text","probability").show(n=10, truncate=40)

#### The predictions object is a DataFrame
with the model's new `rawPrediction`, `probability`, and `prediction` columns added to the input columns

In [None]:
predictions.printSchema()

In [None]:
predictions.filter(predictions['prediction'] == 1) \
    .select("text","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 20, truncate = 30)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
print("Training: Area Under ROC: " + str(trainingSummary.areaUnderROC))

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

### Naive Bayes
#### Specify and fit the model

In [None]:
from pyspark.ml.classification import NaiveBayes

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1, modelType="multinomial")

# train the model
model = nb.fit(trainingData)
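
Multinomial Naive Bayes with `smoothing=1` is essentially Laplace-smoothed word counting. The plain-Python sketch below (`fit_multinomial_nb` and `predict_nb` are hypothetical helpers working on token lists rather than Spark vectors) follows what is assumed to be Spark's multinomial approach:

- each term's log-probability per class is $\log\frac{\text{count} + \lambda}{\text{total} + \lambda \cdot |V|}$;
- class priors are smoothed the same way;
- a document goes to the class with the highest log prior plus summed term log-probabilities.

```python
import math
from collections import Counter, defaultdict


def fit_multinomial_nb(docs, labels, smoothing=1.0):
    """docs: list of token lists; labels: list of class labels."""
    vocab = {tok for doc in docs for tok in doc}
    classes = sorted(set(labels))
    class_counts = Counter(labels)
    token_counts = defaultdict(Counter)
    for doc, label in zip(docs, labels):
        token_counts[label].update(doc)

    # Smoothed log prior for each class
    prior_denom = len(labels) + smoothing * len(classes)
    log_prior = {c: math.log((class_counts[c] + smoothing) / prior_denom) for c in classes}

    # Smoothed log P(term | class): terms unseen in a class still get a small probability
    log_likelihood = {}
    for c in classes:
        denom = sum(token_counts[c].values()) + smoothing * len(vocab)
        log_likelihood[c] = {t: math.log((token_counts[c][t] + smoothing) / denom) for t in vocab}
    return log_prior, log_likelihood


def predict_nb(log_prior, log_likelihood, doc):
    # Score = log prior + log-probability of every known token occurrence
    scores = {
        c: log_prior[c] + sum(log_likelihood[c][t] for t in doc if t in log_likelihood[c])
        for c in log_prior
    }
    return max(scores, key=scores.get)


docs = [["vote", "minister"], ["minister", "election"], ["goal", "match"], ["match", "score"]]
labels = [1, 1, 0, 0]
log_prior, log_likelihood = fit_multinomial_nb(docs, labels)
print(predict_nb(log_prior, log_likelihood, ["minister", "vote"]),
      predict_nb(log_prior, log_likelihood, ["goal"]))  # → 1 0
```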

#### Evaluate Naive Bayes

As with the logistic regression model above, now evaluate the classifier.

In [None]:
# select example rows to display.
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 1) \
    .select("text","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 20, truncate = 30)

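# compute area under ROC on the test set; NaiveBayes' rawPrediction holds unnormalized
# log-scale scores, so rank the articles by the probability column instead
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability")
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))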

### Decision Tree


Using the `DecisionTreeClassifier` imported below, instantiate and fit a classifier with a depth of 3 to the training data.

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

Great! With a fitted decision tree model, you can also check the number of nodes and the depth of the classifier:

In [None]:
print("numNodes = ", dtModel.numNodes)
print( "depth = ", dtModel.depth)
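
By default, Spark's `DecisionTreeClassifier` scores candidate splits with Gini impurity (`impurity="gini"`). At each node it keeps the split with the largest impurity decrease (information gain), until `maxDepth` or another stopping rule is reached. A minimal plain-Python sketch of those two quantities (`gini` and `split_gain` are hypothetical helper names):

```python
from collections import Counter


def gini(labels):
    """Gini impurity: 1 - sum over classes of p_k squared."""
    n = len(labels)
    if n == 0:
        return 0.0
    return 1.0 - sum((count / n) ** 2 for count in Counter(labels).values())


def split_gain(parent, left, right):
    """Impurity decrease from splitting parent into left and right (children weighted by size)."""
    n = len(parent)
    children = len(left) / n * gini(left) + len(right) / n * gini(right)
    return gini(parent) - children


print(gini([1, 1, 0, 0]), split_gain([1, 1, 0, 0], [1, 1], [0, 0]))  # → 0.5 0.5
```

A perfect split of `[1, 1, 0, 0]` removes all 0.5 of the parent's impurity, while splitting it into `[1, 0]` and `[1, 0]` removes none. Since Spark counts a single leaf as depth 0, a tree with `maxDepth=3` has at most 1 + 2 + 4 + 8 = 15 nodes.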

#### Evaluate Decision Tree

Now, evaluate the decision tree classifier you just fit.

In [None]:
predictions = dtModel.transform(testData)

predictions.filter(predictions['prediction'] == 0) \
    .select("text","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:
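# Evaluate model; a tree's rawPrediction holds raw class counts at each leaf,
# so rank by the normalized probability column instead
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability")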
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

### Random Forest


Let's try one more example. Fit a `RandomForestClassifier` with 100 trees. Each tree should have a maxDepth of 4.

In [None]:
from pyspark.ml.classification import RandomForestClassifier

In [None]:

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# Train model with Training Data
rfModel = rf.fit(trainingData)
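
When a fitted random forest scores an article, each tree contributes its leaf's class-probability vector. These vectors are summed into `rawPrediction`, normalized into `probability`, and the largest entry becomes `prediction`. The sketch below mimics that aggregation in plain Python. `forest_predict` is a hypothetical helper, and the per-tree probabilities are made-up illustrative numbers. It leaves out the training side (bootstrap sampling of rows, random feature subsets at each split).

```python
def forest_predict(tree_probabilities):
    """Soft voting over trees.

    tree_probabilities: one [P(label=0), P(label=1)] list per tree.
    Returns (raw_prediction, probability, prediction), loosely mirroring the
    rawPrediction, probability and prediction output columns.
    """
    n_classes = len(tree_probabilities[0])
    raw = [sum(p[k] for p in tree_probabilities) for k in range(n_classes)]
    total = sum(raw)
    probability = [r / total for r in raw]
    prediction = max(range(n_classes), key=lambda k: raw[k])
    return raw, probability, prediction


# Illustrative per-tree probabilities for one article from a 3-tree forest
trees = [[0.9, 0.1], [0.4, 0.6], [0.3, 0.7]]
raw, probability, prediction = forest_predict(trees)

hard_votes = [p.index(max(p)) for p in trees]           # each tree's own prediction
majority_vote = max(set(hard_votes), key=hard_votes.count)
print(prediction, majority_vote)  # → 0 1
```

Two of the three trees lean toward label 1, yet the summed probabilities favour label 0 because the first tree is very confident. That is the difference between this soft voting and a simple majority vote.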

#### Score and evaluate Random Forest

Evaluate the model, as you have with the other models.

In [None]:
# Score test Data
predictions = rfModel.transform(testData)

predictions.filter(predictions['prediction'] == 1) \
    .select("text","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

### Implementing grid search with `CrossValidator` in PySpark

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 200]) # number of trees
             .addGrid(rf.maxDepth, [3, 4, 5]) # maximum depth
#            .addGrid(rf.maxBins, [24, 32, 40]) #Number of bins
             .build())

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=rf, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)

# Score the test set to measure how well the best model generalizes to new data
predictions = cvModel.transform(testData)

# cvModel uses the best model found from the Cross Validation
# Evaluate best model
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))
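
`ParamGridBuilder` expands into the Cartesian product of the listed values: 3 × 3 = 9 settings above. For each setting, `CrossValidator` does the following:

1. It fits the setting on k − 1 folds and scores it on the held-out fold.
2. It averages the k scores (exposed as `cvModel.avgMetrics`).
3. Finally, it refits the best setting on all of the training data.

So the cell above trains 9 × 5 = 45 forests, plus one final refit. The plain-Python sketch below mirrors that loop with a toy cutoff "model". It is illustrative only: `param_grid`, `k_folds`, `cross_validate` and `fit_cutoff` are hypothetical names. Folds are assigned round-robin here, whereas Spark assigns rows to folds randomly.

```python
from itertools import product
from statistics import mean


def param_grid(**options):
    """Every combination of the listed values, like ParamGridBuilder().addGrid(...).build()."""
    names = list(options)
    return [dict(zip(names, values)) for values in product(*options.values())]


def k_folds(n_rows, k):
    """Yield (train_idx, val_idx); row i goes to fold i % k (round-robin, for simplicity)."""
    for fold in range(k):
        train = [i for i in range(n_rows) if i % k != fold]
        val = [i for i in range(n_rows) if i % k == fold]
        yield train, val


def cross_validate(fit, evaluate, data, grid, k=5):
    """Return (best_params, final_model, avg_metrics); a higher metric is better."""
    avg_metrics = []
    for params in grid:
        scores = []
        for train_idx, val_idx in k_folds(len(data), k):
            model = fit([data[i] for i in train_idx], **params)
            scores.append(evaluate(model, [data[i] for i in val_idx]))
        avg_metrics.append(mean(scores))
    best = max(range(len(grid)), key=lambda j: avg_metrics[j])
    final_model = fit(data, **grid[best])  # refit the winner on all the training data
    return grid[best], final_model, avg_metrics


def fit_cutoff(rows, threshold):
    # "Training" ignores the rows here: the model is fully set by the hyperparameter
    return lambda x: int(x >= threshold)


def accuracy(model, rows):
    return sum(model(x) == y for x, y in rows) / len(rows)


data = [(x, int(x >= 5)) for x in range(10)]  # toy data: label is 1 when x >= 5
best, final_model, avg_metrics = cross_validate(fit_cutoff, accuracy, data, param_grid(threshold=[3, 5, 7]))
print(best, avg_metrics)  # → {'threshold': 5} [0.8, 1.0, 0.8]
```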

## Which model had the best AUC?

### Learning goals in review. How did we do?
- explain the relationships between Hadoop, Spark, and Databricks
- differentiate between Spark RDDs and Spark DataFrames and when each is appropriate
- locate and explore the Spark ML (`spark.ml`) documentation
- code along with a text classification problem using four different ML algorithms, a data prep pipeline, and grid search to fine-tune a model