## **PySpark Machine Learning**
### Pipeline Example ([Docs](https://spark.apache.org/docs/latest/ml-pipeline.html))

In machine learning, it's common to run a series of steps in order: data preparation, cleansing, feature engineering, and finally model training (among other potential steps).

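A Spark ML Pipeline arranges these steps into an ordered sequence of stages (or, for non-linear workflows, a DAG). Each stage is either a **Transformer** or an **Estimator**. A Transformer converts one DataFrame into another with `transform()`. An Estimator is fit on a DataFrame with `fit()` and returns a fitted model, which is itself a Transformer.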
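
To make the Transformer/Estimator distinction concrete, here is a minimal plain-Python sketch of the contract. It does not use Spark at all: `LowercaseSplit`, `MeanLengthEstimator`, `MeanLengthModel`, `fit_pipeline`, and `transform_pipeline` are hypothetical names invented for illustration, and rows are plain dictionaries rather than DataFrames. It only mirrors the idea that fitting a pipeline turns every Estimator into a fitted Transformer.

```python
# Plain-Python sketch of the Transformer / Estimator contract.
# Every class and function here is a hypothetical stand-in, not a PySpark API.

class LowercaseSplit:
    """Transformer-like stage: stateless, exposes only transform()."""
    def transform(self, rows):
        return [{**r, "words": r["text"].lower().split()} for r in rows]


class MeanLengthModel:
    """Fitted model returned by the estimator; it is itself a transformer."""
    def __init__(self, mean_len):
        self.mean_len = mean_len

    def transform(self, rows):
        return [{**r, "long": len(r["words"]) > self.mean_len} for r in rows]


class MeanLengthEstimator:
    """Estimator-like stage: fit() learns a value from the data."""
    def fit(self, rows):
        mean_len = sum(len(r["words"]) for r in rows) / len(rows)
        return MeanLengthModel(mean_len)


def fit_pipeline(stages, rows):
    """Walk the stages in order, replacing each estimator by its fitted model."""
    fitted = []
    for stage in stages:
        if hasattr(stage, "fit"):
            stage = stage.fit(rows)      # estimator -> fitted transformer
        rows = stage.transform(rows)     # feed the output to the next stage
        fitted.append(stage)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order to new data."""
    for stage in fitted:
        rows = stage.transform(rows)
    return rows


train = [{"text": "a b c d e spark"}, {"text": "b d"}]
fitted = fit_pipeline([LowercaseSplit(), MeanLengthEstimator()], train)
result = transform_pipeline(fitted, [{"text": "spark f g h"}])
print(result)
```

The fitted pipeline contains only transformers, which is why it can be applied to new data without access to the training set.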

It's often a best practice to save a model or a pipeline to disk for later use.

Below is an example Spark ML Pipeline that shows two Transformers (Tokenizer and HashingTF) and one Estimator (Logistic Regression).

<img src="https://spark.apache.org/docs/latest/img/ml-Pipeline.png">

## **Install Spark Dependencies**



In [None]:
# Install Spark dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Remove any previously downloaded archive; -f avoids an error when the file is absent
!rm -f spark-3.5.1-bin-hadoop3.tgz
!wget --no-cookies --no-check-certificate https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar zxvf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark==3.5.1

In [None]:
!ls -al | grep spark

drwxr-xr-x 13 1000 1000      4096 Feb 15 11:36 spark-3.5.1-bin-hadoop3
-rw-r--r--  1 root root 400446614 Feb 15 11:39 spark-3.5.1-bin-hadoop3.tgz


## **Import Python and PySpark Libraries**

In [None]:
# Set up required environment variables
import os
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

## **Initialize Spark Session**

In [None]:
spark = SparkSession.builder.appName("Spark ML Pipeline Example").master("local[*]").getOrCreate()

## **Load Sample Data**

In [None]:
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

training.show(10, truncate=False)

+---+----------------+-----+
|id |text            |label|
+---+----------------+-----+
|0  |a b c d e spark |1.0  |
|1  |b d             |0.0  |
|2  |spark f g h     |1.0  |
|3  |hadoop mapreduce|0.0  |
+---+----------------+-----+



## **Configure Pipeline Objects**
Two Transformers (`tokenizer` and `hashingTF`) and one Estimator (`lr`, a logistic regression)

In [None]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

In [None]:
type(tokenizer)

In [None]:
tokenizer.transform(training).show(5, False)

+---+----------------+-----+----------------------+
|id |text            |label|words                 |
+---+----------------+-----+----------------------+
|0  |a b c d e spark |1.0  |[a, b, c, d, e, spark]|
|1  |b d             |0.0  |[b, d]                |
|2  |spark f g h     |1.0  |[spark, f, g, h]      |
|3  |hadoop mapreduce|0.0  |[hadoop, mapreduce]   |
+---+----------------+-----+----------------------+
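
HashingTF, the second stage configured above, turns each list of words into a fixed-length term-frequency vector: every word is hashed, the hash is reduced modulo the vector size, and the count at that index is incremented. The plain-Python sketch below illustrates the idea only. It rests on assumptions: `hashing_tf` and `NUM_FEATURES` are hypothetical names, and it uses `zlib.crc32` in place of the MurmurHash3 function Spark uses, so the indices it produces will not match Spark's. The vector size 262144 (2^18) is HashingTF's default `numFeatures`.

```python
import zlib
from collections import Counter

NUM_FEATURES = 262144  # 2**18, the default vector size of Spark's HashingTF


def hashing_tf(words, num_features=NUM_FEATURES):
    """Map words to {index: count} using the hashing trick.

    Assumption: crc32 stands in for Spark's MurmurHash3, so indices differ
    from what Spark would produce for the same words.
    """
    counts = Counter()
    for word in words:
        index = zlib.crc32(word.encode("utf-8")) % num_features
        counts[index] += 1
    # Sort by index to resemble the layout of a sparse vector
    return dict(sorted(counts.items()))


vec = hashing_tf(["spark", "hadoop", "spark"])
print(vec)
```

Because no vocabulary is stored, two different words can land on the same index; a large vector size keeps such collisions rare.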



## **Create Pipeline Object**

In [None]:
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

## **Run Pipeline to transform data and train model**

In [None]:
model = pipeline.fit(training)

## **Test Model**

In [None]:
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on the test documents and show every output column.
prediction = model.transform(test)
prediction.show(10, truncate=False)
# Alternative: print only the columns of interest.
#selected = prediction.select("id", "text", "probability", "prediction")
#for row in selected.collect():
#    rid, text, prob, prediction = row
#    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

+---+------------------+----------------------+------------------------------------------------------+----------------------------------------+----------------------------------------+----------+
|id |text              |words                 |features                                              |rawPrediction                           |probability                             |prediction|
+---+------------------+----------------------+------------------------------------------------------+----------------------------------------+----------------------------------------+----------+
|4  |spark i j k       |[spark, i, j, k]      |(262144,[19036,68693,173558,213660],[1.0,1.0,1.0,1.0])|[0.5288285522796787,-0.5288285522796787]|[0.6292098489668484,0.3707901510331516] |0.0       |
|5  |l m n             |[l, m, n]             |(262144,[1303,52644,248090],[1.0,1.0,1.0])            |[4.169141395340047,-4.169141395340047]  |[0.984770006762304,0.015229993237696027]|0.0       |
|6  |spark hadoop sp