<a href="https://colab.research.google.com/github/kmtsui/Colab_Example/blob/main/PySpark_for_Machine_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark for Machine Learning

## 1. Introduction to Spark and MLlib 

### Introduction to Spark

1. Distributed - Runs on a cluster of servers
2. Processing - Performs computations, such as ETL and modeling
3. Big Data - Handles terabytes or more of data

Supports Multiple Languages
1. Scala
2. Java
3. Python
4. R

Modular Architecture

Apache Spark Core, with MLlib, Spark SQL, Spark Streaming, and GraphX as libraries built on top of it

Spark Use Cases
1. Real-time monitoring
2. Text analysis
3. E-commerce pattern analysis
4. Healthcare and genomic analysis

### Steps in Machine Learning Process

1. Preprocessing - Collect, reformat, and transform data
2. Model Building - Apply machine learning algorithms to training data
3. Validation - Assess the quality of models built in step 2

#### Preprocessing
1. Extract, transform and load data to staging area
2. Review data for missing data and invalid values
3. Normalize and scale numeric data
4. Standardize categorical values


#### Model Building
1. Selecting algorithms
2. Executing algorithms to fit data to models
3. Tuning hyperparameters

#### Validating Models
Applying models to additional test sets

Measuring quality of models
1. Accuracy
2. Precision (positive predictive value)
3. Sensitivity (recall)
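
These measures come from comparing true labels with predicted labels. Accuracy is the share of all predictions that are correct; precision is the share of positive predictions that are truly positive; sensitivity (recall) is the share of truly positive cases that the model found. A minimal plain-Python sketch for one chosen positive class (`classification_metrics` is a hypothetical helper, not a Spark function; in Spark, `MulticlassClassificationEvaluator` computes these kinds of metrics):

```python
def classification_metrics(y_true, y_pred, positive):
    """Return (accuracy, precision, recall), treating `positive` as the positive class."""
    pairs = list(zip(y_true, y_pred))
    correct = sum(1 for t, p in pairs if t == p)
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    accuracy = correct / len(pairs)
    # precision: of everything predicted positive, how much really was positive
    precision = tp / (tp + fp) if tp + fp else 0.0
    # recall (sensitivity): of everything really positive, how much was found
    recall = tp / (tp + fn) if tp + fn else 0.0
    return accuracy, precision, recall


y_true = [1, 1, 1, 0, 0, 0, 0, 1]
y_pred = [1, 0, 1, 0, 1, 1, 0, 1]
print(classification_metrics(y_true, y_pred, positive=1))  # (0.625, 0.6, 0.75)
```

Here 5 of 8 predictions are correct, 3 of the 5 positive predictions are right, and 3 of the 4 real positives are found.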

### Set Up PySpark in a Google Colab Notebook

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# older Spark releases are removed from downloads.apache.org and kept in the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.1-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext  # the SparkContext behind the session

In [None]:
from google.colab import drive
drive.mount('/content/drive')  # the data sets below are read from Google Drive
employeesTxtPath = '/content/drive/My Drive/Colab Notebooks/MMM_DEVELOPMENT/DATASETS/Ex_Files_Spark_ML_AI/Exercise Files/Ch01/01_04/employee.txt'
clusteringDatasetPath = '/content/drive/My Drive/Colab Notebooks/MMM_DEVELOPMENT/DATASETS/Ex_Files_Spark_ML_AI/Exercise Files/Ch03/03_02/clustering_dataset.csv'
irisTxtPath = '/content/drive/My Drive/Colab Notebooks/MMM_DEVELOPMENT/DATASETS/Ex_Files_Spark_ML_AI/Exercise Files/iris.txt'

### Organizing data in DataFrames

In [None]:
empDF = spark.read.csv(employeesTxtPath, header=True)

In [None]:
empDF

DataFrame[id: string, last_name: string, email: string, gender: string, department: string, start_date: string, salary: string, job_title: string, region_id: string]

In [None]:
empDF.schema

StructType(List(StructField(id,StringType,true),StructField(last_name,StringType,true),StructField(email,StringType,true),StructField(gender,StringType,true),StructField(department,StringType,true),StructField(start_date,StringType,true),StructField(salary,StringType,true),StructField(job_title,StringType,true),StructField(region_id,StringType,true)))

In [None]:
empDF.printSchema()

root
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- region_id: string (nullable = true)



In [None]:
empDF.columns

['id',
 'last_name',
 'email',
 'gender',
 'department',
 'start_date',
 'salary',
 'job_title',
 'region_id']

In [None]:
empDF.take(5)

[Row(id='1', last_name="'Kelley'", email="'rkelley0@soundcloud.com'", gender="'Female'", department="'Computers'", start_date="'10/2/2009'", salary='67470', job_title="'Structural Engineer'", region_id='2'),
 Row(id='2', last_name="'Armstrong'", email="'sarmstrong1@infoseek.co.jp'", gender="'Male'", department="'Sports'", start_date="'3/31/2008'", salary='71869', job_title="'Financial Advisor'", region_id='2'),
 Row(id='3', last_name="'Carr'", email="'fcarr2@woothemes.com'", gender="'Male'", department="'Automotive'", start_date="'7/12/2009'", salary='101768', job_title="'Recruiting Manager'", region_id='3'),
 Row(id='4', last_name="'Murray'", email="'jmurray3@gov.uk'", gender="'Female'", department="'Jewelery'", start_date="'12/25/2014'", salary='96897', job_title="'Desktop Support Technician'", region_id='3'),
 Row(id='5', last_name="'Ellis'", email="'jellis4@sciencedirect.com'", gender="'Female'", department="'Grocery'", start_date="'9/19/2002'", salary='63702', job_title="'Software

In [None]:
empDF.count()

1000

In [None]:
sampleDF = empDF.sample(False, 0.1)
sampleDF.count()

96
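
`sample(False, 0.1)` does not return exactly 10% of the rows: without replacement, each row is kept independently with probability `fraction`, and the Spark documentation notes that the result is not guaranteed to be exactly that fraction, which is why 96 rows came back instead of 100. The plain-Python sketch below (a hypothetical `bernoulli_sample` helper, not Spark's implementation) shows the same effect; in Spark, passing a seed, as in `empDF.sample(False, 0.1, seed=42)`, makes the sample repeatable.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction` (sampling without replacement)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(1000))  # stand-in for the 1,000 employee rows
for seed in (1, 2, 3):
    # the kept count scatters around 100 rather than hitting it exactly
    print(seed, len(bernoulli_sample(rows, 0.1, seed=seed)))
```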

In [None]:
empMgrsDF = empDF.filter('salary >= 100000')
empMgrsDF.count()

478

In [None]:
empMgrsDF.show()

+---+-----------+--------------------+--------+-------------+------------+------+--------------------+---------+
| id|  last_name|               email|  gender|   department|  start_date|salary|           job_title|region_id|
+---+-----------+--------------------+--------+-------------+------------+------+--------------------+---------+
|  3|     'Carr'|'fcarr2@woothemes...|  'Male'| 'Automotive'| '7/12/2009'|101768|'Recruiting Manager'|        3|
|  6| 'Phillips'|'bphillips5@time....|  'Male'|      'Tools'| '8/21/2013'|118497|'Executive Secret...|        1|
|  9|    'James'|'rjames8@prnewswi...|  'Male'|   'Jewelery'|  '9/7/2005'|108657|   'Sales Associate'|        2|
| 10|  'Sanchez'|'rsanchez9@cloudf...|  'Male'|     'Movies'| '3/13/2013'|108093|'Sales Representa...|        1|
| 11|   'Jacobs'|'jjacobsa@sbwire....|'Female'|   'Jewelery'|'11/27/2003'|121966|'Community Outrea...|        7|
| 15|   'Jacobs'|'ajacobse@google.it'|'Female'|      'Games'|  '3/4/2007'|141139|'Community Outr

### Components of Spark MLlib

SparkML Algorithms
1. Classification
2. Regression
3. Clustering
4. Topic Modeling 

SparkML Workflows
1. Feature transformations
2. Pipelines
3. Evaluations
4. Hyperparameter tuning

#### Utilities
1. Distributed math libraries
2. Distributed statistical functions

## 2. Spark Data Preparation and Transformation

### Introduction to Preprocessing

Numeric: Normalize

1. Maps data values from their original range to the range of 0 to 1

2. Avoids problems when some attributes have large ranges and others have small ranges

Numeric: Standardize

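1. Maps each value x to (x - mean) / standard deviation (a z-score)
2. Resulting mean value of 0
3. Resulting standard deviation of 1; values are not confined to a fixed range such as -1 to 1, and the shape of the distribution does not change
4. Used when attributes have different scales and ML algorithms assume centered, roughly normally distributed features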

Numeric: Partitioning

1. Maps data values from a continuous range to discrete buckets
2. Deciles and percentiles are examples of buckets
3. Useful when you want to work with groups of values instead of a continuous range of values

Text: Tokenizing

Maps text from a single string to a sequence of tokens (words)

Text: TF-IDF

1. Maps text from a single, typically long, string to a vector indicating the frequency of each word in that text, weighted by how common the word is across a group of texts

2. Widely used in text classification

3. Words that occur in few texts get higher weights, because infrequently used words are more useful for distinguishing categories of text

### Normalizing Numeric Data

In [None]:
from pyspark.ml.feature import MinMaxScaler 
from pyspark.ml.linalg import Vectors

In [None]:
featuresDF = spark.createDataFrame([
                                    (1, Vectors.dense([10.0, 10000.0, 1.0]),),
                                    (1, Vectors.dense([20.0, 30000.0, 2.0]),),
                                    (1, Vectors.dense([30.0, 40000.0, 3.0]),)
                                    ], ['id', 'features'])

In [None]:
featuresDF.show()

+---+------------------+
| id|          features|
+---+------------------+
|  1|[10.0,10000.0,1.0]|
|  1|[20.0,30000.0,2.0]|
|  1|[30.0,40000.0,3.0]|
+---+------------------+



In [None]:
featuresDF.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [None]:
featureScaler = MinMaxScaler(inputCol='features', outputCol='sfeatures')

In [None]:
smodel = featureScaler.fit(featuresDF)

In [None]:
sfeaturesDF = smodel.transform(featuresDF)

In [None]:
sfeaturesDF.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=SparseVector(3, {}))]

In [None]:
sfeaturesDF.select('features', 'sfeatures').show()

+------------------+--------------------+
|          features|           sfeatures|
+------------------+--------------------+
|[10.0,10000.0,1.0]|           (3,[],[])|
|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+
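
The first row's scaled value prints as `(3,[],[])`, i.e. `SparseVector(3, {})`: a length-3 vector whose entries are all zero, which Spark stores in sparse form. As a cross-check, the plain-Python sketch below applies the rescaling formula from the MinMaxScaler documentation, (x - column min) / (column max - column min), stretched into a target range that defaults to [0, 1]; `min_max_scale` is a hypothetical helper, not a Spark function. It reproduces the output above, including 0.5 and 0.666... for the second row.

```python
def min_max_scale(rows, new_min=0.0, new_max=1.0):
    """Rescale each column of `rows` to [new_min, new_max] using that column's min and max."""
    cols = list(zip(*rows))
    lows = [min(c) for c in cols]
    highs = [max(c) for c in cols]
    scaled = []
    for row in rows:
        out = []
        for x, lo, hi in zip(row, lows, highs):
            if hi == lo:
                # constant column: MinMaxScaler maps it to the middle of the target range
                out.append(0.5 * (new_max + new_min))
            else:
                out.append((x - lo) / (hi - lo) * (new_max - new_min) + new_min)
        scaled.append(out)
    return scaled


features = [[10.0, 10000.0, 1.0], [20.0, 30000.0, 2.0], [30.0, 40000.0, 3.0]]
for row in min_max_scale(features):
    print(row)
```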



### Standardize Spark Data

In [None]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

In [None]:
featuresDF = spark.createDataFrame([
                                    (1, Vectors.dense([10.0, 10000.0, 1.0]),),
                                    (1, Vectors.dense([20.0, 30000.0, 2.0]),),
                                    (1, Vectors.dense([30.0, 40000.0, 3.0]),)
                                    ], ['id', 'features'])

In [None]:
featuresDF.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [None]:
featureStandardScaler = StandardScaler(inputCol='features', outputCol='sfeatures', withStd=True, withMean=True)

In [None]:
standardSModel = featureStandardScaler.fit(featuresDF)

In [None]:
standardSFeaturesDF = standardSModel.transform(featuresDF)

In [None]:
standardSFeaturesDF.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0]))]
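
With `withMean=True` and `withStd=True`, each feature is centered on its mean and divided by its standard deviation. The Spark documentation states that StandardScaler uses the corrected sample standard deviation (dividing by n - 1), and that choice reproduces the -1.0911 shown above for the middle feature. A minimal plain-Python sketch (the `standardize` helper is hypothetical, not a Spark function):

```python
import math


def standardize(column):
    """Z-score a column: subtract the mean, divide by the sample standard deviation (n - 1)."""
    n = len(column)
    mean = sum(column) / n
    std = math.sqrt(sum((x - mean) ** 2 for x in column) / (n - 1))
    return [(x - mean) / std for x in column]


print([round(z, 4) for z in standardize([10.0, 20.0, 30.0])])           # [-1.0, 0.0, 1.0]
print([round(z, 4) for z in standardize([10000.0, 30000.0, 40000.0])])  # [-1.0911, 0.2182, 0.8729]
```

The z-scores are not limited to the range -1 to 1; with more rows, values beyond that range are common.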

### Bucketize Numeric Data

In [None]:
from pyspark.ml.feature import Bucketizer
splits = [-float('inf'), -10.0, 0.0, 10.0, float('inf')]

In [None]:
bData = [(-800.0,), (-10.5,),(-1.7,),(0.0,),(8.2,),(90.1,)]

In [None]:
bDF = spark.createDataFrame(bData, ['features'])

In [None]:
bDF.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [None]:
bucketizer = Bucketizer(splits=splits, inputCol='features', outputCol='bfeatures')

In [None]:
bucketedDF = bucketizer.transform(bDF)

In [None]:
bucketedDF.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+
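
Bucketizer assigns each value the index of the interval it falls in: per the Spark documentation, the splits x, y define the bucket [x, y), except that the last bucket also includes its upper bound, and values outside the splits cause an error (hence the infinite outer splits). The plain-Python sketch below (a hypothetical `bucketize` helper built on `bisect`) mimics that rule and reproduces the bfeatures column; 0.0 lands in bucket 2 because it is the lower, inclusive edge of [0.0, 10.0).

```python
import bisect


def bucketize(value, splits):
    """Return the bucket index of `value`; bucket i covers [splits[i], splits[i+1])."""
    if not splits[0] <= value <= splits[-1]:
        raise ValueError(f"{value} is outside the splits")
    idx = bisect.bisect_right(splits, value) - 1
    # the last bucket also includes its upper bound
    return min(idx, len(splits) - 2)


splits = [-float('inf'), -10.0, 0.0, 10.0, float('inf')]
for v in [-800.0, -10.5, -1.7, 0.0, 8.2, 90.1]:
    print(v, float(bucketize(v, splits)))
```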



### Tokenize Text Data

In [None]:
from pyspark.ml.feature import Tokenizer

In [None]:
sentencesDF = spark.createDataFrame([
                                    (1, 'this is an introduction to spark mllib'),
                                    (2, 'mllib contains libraries for classification and regression'),
                                    (1, 'it also contains supporting tools for pipelines')
                                    ], ['id', 'sentence'])

In [None]:
sentencesDF.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|this is an introd...|
|  2|mllib contains li...|
|  1|it also contains ...|
+---+--------------------+



In [None]:
sentToken = Tokenizer(inputCol='sentence', outputCol='words')

In [None]:
sentTokenizedDF = sentToken.transform(sentencesDF)

In [None]:
sentTokenizedDF.show(truncate=False)

+---+----------------------------------------------------------+------------------------------------------------------------------+
|id |sentence                                                  |words                                                             |
+---+----------------------------------------------------------+------------------------------------------------------------------+
|1  |this is an introduction to spark mllib                    |[this, is, an, introduction, to, spark, mllib]                    |
|2  |mllib contains libraries for classification and regression|[mllib, contains, libraries, for, classification, and, regression]|
|1  |it also contains supporting tools for pipelines           |[it, also, contains, supporting, tools, for, pipelines]           |
+---+----------------------------------------------------------+------------------------------------------------------------------+
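
According to the Spark documentation, Tokenizer converts the input string to lowercase and then splits it by white space. For single-spaced text like these sentences, that is roughly what the one-line plain-Python sketch below does (`simple_tokenize` is a hypothetical helper). Python's `str.split()` collapses runs of whitespace, which may not match Spark's behaviour exactly; `RegexTokenizer` in `pyspark.ml.feature` gives finer control over the split pattern.

```python
def simple_tokenize(sentence):
    """Lowercase the sentence and split it on whitespace."""
    return sentence.lower().split()


print(simple_tokenize('this is an introduction to spark mllib'))
```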



### TF-IDF (Term Frequency Inverse Document Frequency)

In [None]:
from pyspark.ml.feature import HashingTF, IDF

In [None]:
sentencesDF

DataFrame[id: bigint, sentence: string]

In [None]:
sentencesDF.take(1)

[Row(id=1, sentence='this is an introduction to spark mllib')]

In [None]:
sentTokenizedDF.take(1)

[Row(id=1, sentence='this is an introduction to spark mllib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'])]

In [None]:
# numFeatures=20 is tiny, so different words can hash to the same index
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=20)

In [None]:
sentHFTFDF = hashingTF.transform(sentTokenizedDF)

In [None]:
sentHFTFDF.take(1)

[Row(id=1, sentence='this is an introduction to spark mllib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}))]

In [None]:
# use a lowercase name so the IDF class itself is not overwritten
idf = IDF(inputCol='rawFeatures', outputCol='idfFeatures')

In [None]:
idfModel = idf.fit(sentHFTFDF)

In [None]:
TFIDFDF = idfModel.transform(sentHFTFDF)

In [None]:
TFIDFDF.take(1)

[Row(id=1, sentence='this is an introduction to spark mllib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}), idfFeatures=SparseVector(20, {6: 0.5754, 8: 0.0, 9: 0.6931, 10: 0.6931, 13: 0.6931, 15: 0.2877}))]
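
The Spark documentation defines the inverse document frequency as IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)), where |D| is the number of documents and DF(t, D) the number of documents containing term t (here, a hash bucket). With three sentences there are only three possible IDF values, all visible in the output above: 0.6931 (bucket found in one sentence), 0.2877 (two sentences), and 0.0 (all three, like bucket 8). The idfFeatures value is the raw count times the IDF, so bucket 6, with a count of 2.0 and an IDF of 0.2877, becomes 0.5754. A plain-Python check (the `spark_style_idf` helper is hypothetical):

```python
import math


def spark_style_idf(num_docs, doc_freq):
    """IDF as documented for Spark ML: log((|D| + 1) / (DF + 1))."""
    return math.log((num_docs + 1) / (doc_freq + 1))


num_docs = 3  # sentencesDF has three sentences
for doc_freq in (1, 2, 3):
    print(doc_freq, round(spark_style_idf(num_docs, doc_freq), 4))

# TF-IDF = term frequency * IDF, e.g. bucket 6: count 2.0 in a bucket seen in 2 sentences
print(round(2.0 * spark_style_idf(num_docs, 2), 4))
```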

## 3. Clustering with Spark MLlib

### Introduction to Clustering

Clustering algorithms group data into clusters, which shows how a large data set breaks down into distinct subgroups.

K-means is widely used and works well for finding clusters in small and mid-sized data sets.

For large data sets, the bisecting k-means algorithm can be faster.
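
K-means alternates two steps until the centers stop moving: assign every point to its nearest center, then move each center to the mean of its assigned points. The plain-Python sketch below (a hypothetical `kmeans` helper) runs that loop on seven rows taken from clustering_dataset.csv, which is shown below. For simplicity it starts from three hand-picked points as initial centers; Spark's KMeans instead picks initial centers with the k-means|| method by default (`initMode`), so this illustrates the idea rather than Spark's exact procedure. Bisecting k-means is a divisive variant: it starts with one cluster and repeatedly splits a cluster in two until there are k clusters.

```python
import math


def kmeans(points, centers, iterations=10):
    """Lloyd's algorithm: assign points to the nearest center, then move centers to the mean."""
    centers = [list(c) for c in centers]
    for _ in range(iterations):
        clusters = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)), key=lambda i: math.dist(p, centers[i]))
            clusters[nearest].append(p)
        for i, members in enumerate(clusters):
            if members:  # keep an empty cluster's center where it was
                centers[i] = [sum(coord) / len(members) for coord in zip(*members)]
    return centers


points = [(7, 4, 1), (7, 7, 9), (8, 3, 8), (41, 47, 21), (37, 20, 27), (85, 100, 69), (68, 76, 67)]
print(kmeans(points, centers=[points[0], points[3], points[5]]))
```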

### K-means Clustering in Spark

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

In [None]:
clusterDF = spark.read.csv(clusteringDatasetPath, header=True, inferSchema=True)

In [None]:
clusterDF

DataFrame[col1: int, col2: int, col3: int]

In [None]:
clusterDF.show(100, truncate=False)

+----+----+----+
|col1|col2|col3|
+----+----+----+
|7   |4   |1   |
|7   |7   |9   |
|7   |9   |6   |
|1   |6   |5   |
|6   |7   |7   |
|7   |9   |4   |
|7   |10  |6   |
|7   |8   |2   |
|8   |3   |8   |
|4   |10  |5   |
|7   |4   |5   |
|7   |8   |4   |
|2   |5   |1   |
|2   |6   |2   |
|2   |3   |8   |
|3   |9   |1   |
|4   |2   |9   |
|1   |7   |1   |
|6   |2   |3   |
|4   |1   |9   |
|4   |8   |5   |
|6   |6   |7   |
|4   |6   |2   |
|8   |1   |1   |
|7   |5   |10  |
|17  |25  |21  |
|15  |23  |32  |
|42  |25  |45  |
|41  |47  |21  |
|37  |20  |27  |
|40  |18  |26  |
|41  |28  |50  |
|32  |25  |40  |
|24  |29  |35  |
|47  |18  |47  |
|36  |42  |45  |
|49  |29  |15  |
|47  |39  |22  |
|38  |27  |25  |
|45  |23  |40  |
|23  |36  |19  |
|47  |40  |50  |
|37  |30  |40  |
|42  |48  |41  |
|29  |31  |21  |
|36  |39  |48  |
|50  |24  |31  |
|42  |44  |37  |
|37  |39  |46  |
|22  |40  |30  |
|17  |29  |41  |
|85  |100 |69  |
|68  |76  |67  |
|76  |70  |93  |
|62  |66  |91  |
|83  |93  |76 

In [None]:
vectorAssembler = VectorAssembler(inputCols=['col1', 'col2', 'col3'], outputCol='features')

In [None]:
vclusterDF = vectorAssembler.transform(clusterDF)

In [None]:
vclusterDF.show()

+----+----+----+--------------+
|col1|col2|col3|      features|
+----+----+----+--------------+
|   7|   4|   1| [7.0,4.0,1.0]|
|   7|   7|   9| [7.0,7.0,9.0]|
|   7|   9|   6| [7.0,9.0,6.0]|
|   1|   6|   5| [1.0,6.0,5.0]|
|   6|   7|   7| [6.0,7.0,7.0]|
|   7|   9|   4| [7.0,9.0,4.0]|
|   7|  10|   6|[7.0,10.0,6.0]|
|   7|   8|   2| [7.0,8.0,2.0]|
|   8|   3|   8| [8.0,3.0,8.0]|
|   4|  10|   5|[4.0,10.0,5.0]|
|   7|   4|   5| [7.0,4.0,5.0]|
|   7|   8|   4| [7.0,8.0,4.0]|
|   2|   5|   1| [2.0,5.0,1.0]|
|   2|   6|   2| [2.0,6.0,2.0]|
|   2|   3|   8| [2.0,3.0,8.0]|
|   3|   9|   1| [3.0,9.0,1.0]|
|   4|   2|   9| [4.0,2.0,9.0]|
|   1|   7|   1| [1.0,7.0,1.0]|
|   6|   2|   3| [6.0,2.0,3.0]|
|   4|   1|   9| [4.0,1.0,9.0]|
+----+----+----+--------------+
only showing top 20 rows



In [None]:
kmeans = KMeans().setK(3)

In [None]:
kmeans = kmeans.setSeed(1)

In [None]:
kmodel = kmeans.fit(vclusterDF)

In [None]:
centers = kmodel.clusterCenters()

In [None]:
# this will print out the 3 cluster centers (one per cluster) found in vclusterDF
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

### Hierarchical Clustering in Spark

In [None]:
from pyspark.ml.clustering import BisectingKMeans

In [None]:
clusterDF.show()

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   7|   4|   1|
|   7|   7|   9|
|   7|   9|   6|
|   1|   6|   5|
|   6|   7|   7|
|   7|   9|   4|
|   7|  10|   6|
|   7|   8|   2|
|   8|   3|   8|
|   4|  10|   5|
|   7|   4|   5|
|   7|   8|   4|
|   2|   5|   1|
|   2|   6|   2|
|   2|   3|   8|
|   3|   9|   1|
|   4|   2|   9|
|   1|   7|   1|
|   6|   2|   3|
|   4|   1|   9|
+----+----+----+
only showing top 20 rows



In [None]:
BKMeans = BisectingKMeans().setK(3)
BKMeans = BKMeans.setSeed(1)

In [None]:
BKModel = BKMeans.fit(vclusterDF)

In [None]:
BKCenters = BKModel.clusterCenters()

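Comparing k-means and hierarchical (bisecting k-means) results: on this data both find the same three centers (to the printed precision); only the order in which the centers are listed differs.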

In [None]:
# hierarchical clustering results
BKCenters

[array([5.12, 5.84, 4.84]),
 array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667])]

In [None]:
# k-means clustering results
centers

[array([35.88461538, 31.46153846, 34.42307692]),
 array([80.        , 79.20833333, 78.29166667]),
 array([5.12, 5.84, 4.84])]

## 4. Classification with Spark MLlib

### Preprocessing Iris.txt Example

In [None]:
from pyspark.sql.functions import col  # avoid a wildcard import shadowing built-ins such as sum and max
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

In [None]:
irisDF = spark.read.csv(irisTxtPath, inferSchema=True)

In [None]:
# iris.txt has no header row, so Spark assigns default column names _c0 to _c4
irisDF.columns

['_c0', '_c1', '_c2', '_c3', '_c4']

In [None]:
irisDF = irisDF.select(col('_c0').alias('sepal_length'),
                       col('_c1').alias('sepal_width'),
                       col('_c2').alias('petal_length'),
                       col('_c3').alias('petal_width'),
                       col('_c4').alias('species'))

In [None]:
irisDF.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa')]

In [None]:
irisDF.printSchema()

root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [None]:
vectorAssembler = VectorAssembler(inputCols=['sepal_length',
                                             'sepal_width', 
                                             'petal_length',
                                             'petal_width'],
                                  outputCol='features')

In [None]:
virisDF = vectorAssembler.transform(irisDF)
virisDF.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

In [None]:
indexer = StringIndexer(inputCol='species', outputCol='label')
iVirisDF = indexer.fit(virisDF).transform(virisDF)
iVirisDF

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

### Naive Bayes Classification

In [None]:
iVirisDF.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [None]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
splits = iVirisDF.randomSplit([0.6,0.4],1)
trainDF = splits[0]
testDF = splits[1]

In [None]:
trainDF.count()

98

In [None]:
testDF.count()

52

In [None]:
iVirisDF.count()

150

In [None]:
nb = NaiveBayes(modelType='multinomial')

In [None]:
nbModel = nb.fit(trainDF)

In [None]:
predictionsDF = nbModel.transform(testDF)

In [None]:
predictionsDF.take(1)

[Row(sepal_length=4.3, sepal_width=3.0, petal_length=1.1, petal_width=0.1, species='Iris-setosa', features=DenseVector([4.3, 3.0, 1.1, 0.1]), label=0.0, rawPrediction=DenseVector([-9.9894, -11.3476, -11.902]), probability=DenseVector([0.7118, 0.183, 0.1051]), prediction=0.0)]

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

In [None]:
nbAccuracy = evaluator.evaluate(predictionsDF)
nbAccuracy

0.9807692307692307

### Multilayer Perceptron Classification

In [None]:
iVirisDF.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [None]:
trainDF.count()

98

In [None]:
testDF.count()

52

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

1. The first (input) layer has one node per input feature. There are four measurements, so the first element of the layers list is 4.

2. The last (output) layer has one neuron per class. There are three iris species, so the last element is 3.

3. Hidden layers in between let the multilayer perceptron learn how to classify correctly. Here there are two hidden layers of five neurons each, giving a four-layer network.

4. In total: four input neurons, two hidden layers of five neurons each, and an output layer of three neurons, one for each iris species.
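
Because every neuron in one layer connects to every neuron in the next, the layer sizes determine how many parameters the network has to learn. The sketch below counts them, assuming one bias per hidden and output neuron (the `+ 1` term); `count_parameters` is a hypothetical helper. For layers [4, 5, 5, 3] that is 25 + 30 + 18 = 73 parameters, fit here from fewer than a hundred training rows. To check the bias assumption against Spark, compare the result with `mlpModel.weights.size` once the model is trained.

```python
def count_parameters(layers):
    """Weights plus biases in a fully connected network with the given layer sizes."""
    # each (n_in -> n_out) connection has n_in * n_out weights and n_out biases
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))


print(count_parameters([4, 5, 5, 3]))  # 73
```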

In [None]:
layers = [4,5,5,3]

In [None]:
mlp = MultilayerPerceptronClassifier(layers=layers, seed=1)

In [None]:
mlpModel = mlp.fit(trainDF)

In [None]:
mlpPredictions = mlpModel.transform(testDF)

In [None]:
mlpEvaluator = MulticlassClassificationEvaluator(metricName='accuracy')

In [None]:
mlpAccuracy = mlpEvaluator.evaluate(mlpPredictions)

In [None]:
mlpAccuracy

0.6923076923076923

### Decision Tree Classification

In [None]:
iVirisDF

DataFrame[sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string, features: vector, label: double]

In [None]:
iVirisDF.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, species='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]), label=0.0)]

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

In [None]:
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')

In [None]:
dtModel = dt.fit(trainDF)

In [None]:
dtPredictions = dtModel.transform(testDF)

In [None]:
dtEvaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')

In [None]:
dtAccuracy = dtEvaluator.evaluate(dtPredictions)
dtAccuracy

0.9423076923076923

### Classification Algorithms Summary

1. Naive Bayes - Works well when the attributes in your data set are independent of each other given the class (they don't strongly correlate with each other)

2. Multilayer perceptron - Good choice when you have non-linear relationships between data elements

3. Decision Trees - Good choice for many classification problems, and a good algorithm to start with

## 5. Regression

### Preprocessing Regression Data

In [None]:
powerPlantPath = '/content/drive/My Drive/Colab Notebooks/MMM_DEVELOPMENT/DATASETS/Ex_Files_Spark_ML_AI/Exercise Files/CCPP/power_plant.csv'

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
ppDF = spark.read.csv(powerPlantPath)

In [None]:
ppDF

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string]

In [None]:
ppDF = spark.read.csv(powerPlantPath, header=True, inferSchema=True)

In [None]:
ppDF

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
vectorAssembler = VectorAssembler(inputCols = ['AT', 'V', 'AP', 'RH'], outputCol='features')

In [None]:
vppDF = vectorAssembler.transform(ppDF)
vppDF.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [None]:
lr = LinearRegression(featuresCol='features', labelCol='PE')

In [None]:
lrModel = lr.fit(vppDF)

In [None]:
# a vector of four numbers: one coefficient per input feature,
# in the order of inputCols (AT, V, AP, RH)
lrModel.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [None]:
# the intercept is the predicted PE when every feature is 0
# (where the fitted line, or hyperplane, crosses the PE axis)
lrModel.intercept

454.6092744523414
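
The fitted model predicts PE as the intercept plus the dot product of the coefficients and the features. A minimal sketch using the rounded coefficients printed above and the features of the first row of vppDF (the `predict` helper is hypothetical, not part of Spark):

```python
def predict(coefficients, intercept, features):
    """Linear model prediction: intercept + dot(coefficients, features)."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))


coefficients = [-1.9775, -0.2339, 0.0621, -0.1581]  # AT, V, AP, RH (rounded as displayed)
intercept = 454.6092744523414
first_row = [14.96, 41.76, 1024.07, 73.17]  # first row's features; its actual PE is 463.26
print(round(predict(coefficients, intercept, first_row), 2))  # 467.28
```

Because the displayed coefficients are rounded to four decimals, the result can differ slightly from what `lrModel.transform` returns. The gap of about 4 between this prediction and the actual 463.26 is of the same order as the model's RMSE.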

In [None]:
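# one of the important measures of the quality of a linear model is its error;
# there are different ways of measuring it, for example the root mean squared error (RMSE).
# note: the training summary reports RMSE on the training data (here the full data set),
# so it is not directly comparable to the test-set RMSE of the tree models below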
lrModel.summary.rootMeanSquaredError

4.557126016749488
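
RMSE squares each prediction error, averages the squares, and takes the square root, so it is in the same units as the label (PE) and penalizes large errors more than small ones. This is the quantity `summary.rootMeanSquaredError` reports above and `RegressionEvaluator(metricName='rmse')` computes in the tree sections. A plain-Python sketch on toy numbers (the `rmse` helper is hypothetical):

```python
import math


def rmse(actual, predicted):
    """Root mean squared error: square the errors, average them, take the square root."""
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))


# errors of 1, 0 and 2 give sqrt((1 + 0 + 4) / 3)
print(rmse([3.0, 5.0, 2.0], [2.0, 5.0, 4.0]))
```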

In [None]:
# saving the linear model for later use (overwrite() avoids an error if the path already exists)
lrModel.write().overwrite().save('lr1.model')

In [None]:
lrModel

LinearRegressionModel: uid=LinearRegression_c8dfa610f5b7, numFeatures=4

### Decision Tree Regression

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [None]:
ppDF = spark.read.csv(powerPlantPath, header=True, inferSchema=True)

In [None]:
ppDF.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26)]

In [None]:
vectorAssembler = VectorAssembler(inputCols=['AT', 'V', 'AP', 'RH'], outputCol='features')

In [None]:
vppDF = vectorAssembler.transform(ppDF)
vppDF.take(1)

[Row(AT=14.96, V=41.76, AP=1024.07, RH=73.17, PE=463.26, features=DenseVector([14.96, 41.76, 1024.07, 73.17]))]

In [None]:
splits = vppDF.randomSplit([0.7, 0.3])  # no seed, so the split and the counts below change on every run

In [None]:
trainDF = splits[0]
testDF = splits[1]

In [None]:
trainDF.count()

6591

In [None]:
testDF.count()

2977

In [None]:
vppDF.count()

9568

In [None]:
dt = DecisionTreeRegressor(featuresCol='features', labelCol='PE')

In [None]:
dtModel = dt.fit(trainDF)

In [None]:
dtPredictions = dtModel.transform(testDF)

In [None]:
dtEvaluator = RegressionEvaluator(labelCol='PE', predictionCol='prediction', metricName='rmse')

In [None]:
rmse = dtEvaluator.evaluate(dtPredictions)

In [None]:
rmse

4.617704761570376

### Gradient-boosted Tree Regression

In [None]:
from pyspark.ml.regression import GBTRegressor

In [None]:
gbt = GBTRegressor(featuresCol='features', labelCol='PE')

In [None]:
gbtModel = gbt.fit(trainDF)

In [None]:
gbtPredictions = gbtModel.transform(testDF)

In [None]:
gbtEvaluator = RegressionEvaluator(labelCol='PE', predictionCol='prediction', metricName='rmse')

In [None]:
gbtRMSE = gbtEvaluator.evaluate(gbtPredictions)

In [None]:
gbtRMSE

4.176800795104159

### Regression Algorithms Summary

1. Linear Regression - In general, it's best to start with linear regression. In real-world examples, linear regression frequently gives usable, high-quality results

2. Decision Tree Regression - If you have a data set that doesn't work well with linear regression, then try decision tree regression

3. Gradient-boosted Tree Regression - Good choice if you need to get the best performing model possible, and you're willing to spend extra time building the model

## 6. Recommendation Systems with Spark

### Collaborative Filtering 

*Collaborative Filtering Preprocessing* 

1. Use alternating least squares (ALS), the algorithm Spark MLlib provides for collaborative filtering

2. Import ALS from pyspark.ml.recommendation

3. Build a DataFrame of user-item ratings

*Collaborative Filtering Modeling*

4. Create an ALS object (userCol, itemCol, ratingCol)

5. Train the model by calling the fit method of the ALS object

*Collaborative Filtering Validation*

6. Create predictions by calling transform on the trained ALS model with test data

7. Create a RegressionEvaluator object

8. Evaluate the predictions using the evaluate method of the RegressionEvaluator
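
These steps run Spark's distributed ALS. To show what "alternating" means, here is a toy rank-1 version in plain Python; the function `als_rank1` is hypothetical and is not Spark's implementation, which learns `rank` latent factors per user and item and adds regularization through `regParam`. Each pass fixes the item factors and solves every user factor in closed form, then does the reverse. The product of a user factor and an item factor is the predicted rating, which fills in ratings a user never gave.

```python
def als_rank1(ratings, n_users, n_items, iterations=100, reg=0.0):
    """Rank-1 alternating least squares on a dict {(user, item): rating}.

    With reg=0.0, every user and item needs at least one rating.
    """
    user_f = [1.0] * n_users
    item_f = [1.0] * n_items
    for _ in range(iterations):
        # fix the item factors and solve each user factor by least squares
        for u in range(n_users):
            obs = [(i, r) for (uu, i), r in ratings.items() if uu == u]
            num = sum(r * item_f[i] for i, r in obs)
            user_f[u] = num / (sum(item_f[i] ** 2 for i, _ in obs) + reg)
        # fix the user factors and solve each item factor by least squares
        for i in range(n_items):
            obs = [(u, r) for (u, ii), r in ratings.items() if ii == i]
            num = sum(r * user_f[u] for u, r in obs)
            item_f[i] = num / (sum(user_f[u] ** 2 for u, _ in obs) + reg)
    return user_f, item_f


# user 0 has not rated item 2; the observed ratings follow a rank-1 pattern
ratings = {(0, 0): 1.0, (0, 1): 2.0, (1, 0): 2.0, (1, 1): 4.0, (1, 2): 6.0}
users, items = als_rank1(ratings, n_users=2, n_items=3)
print(round(users[0] * items[2], 3))  # predicted rating of item 2 by user 0
```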
