# Machine Learning using PySpark

This notebook illustrates the classification and regression libraries in Spark ML using two open source datasets from the UCI Machine Learning Repository: the Iris dataset (https://archive.ics.uci.edu/ml/machine-learning-databases/iris/) and the Combined Cycle Power Plant dataset (https://archive.ics.uci.edu/ml/machine-learning-databases/00294/).

- A. Classification: (we build ML classifier models for species of the iris plant)
    - Naive Bayes
    - Multilayer Perceptron
    - Decision Trees
- B. Regression: (we build ML regression models to predict power generated at a plant using defining features)
    - Linear Regression
    - Decision Tree Regression
    - Gradient-boosted Tree Regression

### Initiate Spark Context and Import Spark ML libraries

In [22]:
sc

<pyspark.context.SparkContext at 0x1fc63c09a58>

In [23]:
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

### Import data

In [24]:
iris_df = spark.read.csv("iris.csv", inferSchema=True, header=True)
iris_df.take(1) #to see the first row

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, class='Iris-setosa')]

## Preprocessing

### *1. Create Vector Assembler*

In [25]:
vectorAssembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"], outputCol="features")
viris_df = vectorAssembler.transform(iris_df)
viris_df.take(1)

[Row(sepal_length=5.1, sepal_width=3.5, petal_length=1.4, petal_width=0.2, class='Iris-setosa', features=DenseVector([5.1, 3.5, 1.4, 0.2]))]

### *2. Use String Indexer*
- Convert the label names into numeric values for quantitative analysis

In [26]:
indexer = StringIndexer(inputCol="class", outputCol="label")
iviris_df = indexer.fit(viris_df).transform(viris_df)
iviris_df.show(1)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|      class|         features|label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 1 row
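
As a rough illustration of what the indexer does, the sketch below mimics in plain Python the default ordering of `StringIndexer`, where the most frequent label receives index 0.0. The helper `index_labels` and the toy label list are hypothetical, and the alphabetical tie-break is an assumption made so the sketch is deterministic; this is not Spark's implementation.

```python
from collections import Counter

def index_labels(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; break ties alphabetically (assumption for this sketch).
    ordered = sorted(counts, key=lambda name: (-counts[name], name))
    return {name: float(i) for i, name in enumerate(ordered)}

# Hypothetical toy labels with distinct frequencies (not the real iris counts).
toy_labels = ["Iris-setosa", "Iris-setosa", "Iris-setosa",
              "Iris-virginica", "Iris-virginica", "Iris-versicolor"]
mapping = index_labels(toy_labels)
indexed = [mapping[name] for name in toy_labels]
print(mapping)
```

In the real iris data every class has the same number of records, so the index a class receives depends on how the indexer breaks ties.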



## Classification Algorithms

### *1. Naive Bayes Classification*

In [27]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

We need to split our dataframe into training and test data, using a 60:40 split. The second argument to `randomSplit` is the seed, which makes the split reproducible.

In [28]:
splits = iviris_df.randomSplit([0.6,0.4],1)
train_df = splits[0] #corresponding to the first dataframe in splits
test_df = splits[1] #corresponding to the second dataframe in splits

In [29]:
train_df.count() #counts the training instances (92 records)
test_df.count() #counts the test instances (58 records)
iviris_df.count() #total (150 records)

150
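
The split above gives 92 and 58 records rather than exactly 90 and 60 because the weights act as probabilities, not exact quotas. The sketch below shows the idea in plain Python with a simplified model in which each row is assigned independently; `random_split` is a hypothetical helper and does not reproduce Spark's actual sampling code or its seeded results.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split at random, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for i, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding at the upper boundary.
            splits[-1].append(row)
    return splits

rows = list(range(150))  # stand-in for the 150 iris records
train, test = random_split(rows, [0.6, 0.4], seed=1)
print(len(train), len(test))  # sizes are near 90 and 60, but usually not exact
```

Every row lands in exactly one split, so the two sizes always add up to the total, and fixing the seed makes the assignment repeatable.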

In [30]:
#Now let's create our Naive Bayes classifier
nb = NaiveBayes(modelType="multinomial") #multinomial model (the default); it requires non-negative feature values
nbmodel = nb.fit(train_df) #fit the NB model to the training data

In [31]:
#Make predictions using the model we just created
predictions_df = nbmodel.transform(test_df)
predictions_df.take(1)

[Row(sepal_length=4.5, sepal_width=2.3, petal_length=1.3, petal_width=0.3, class='Iris-setosa', features=DenseVector([4.5, 2.3, 1.3, 0.3]), label=0.0, rawPrediction=DenseVector([-10.3605, -11.0141, -11.7112]), probability=DenseVector([0.562, 0.2924, 0.1456]), prediction=0.0)]

The predicted class for this instance is 0, the index that the string indexer assigned to Iris-setosa, so this particular prediction matches the label. Next we conduct a more thorough evaluation of our model.
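
The numbers in the row above suggest that `probability` is obtained by exponentiating the log-scale `rawPrediction` values and normalizing them to sum to 1. The sketch below checks that relationship in plain Python; `normalize_log_scores` is a hypothetical helper written for this illustration, not a Spark function.

```python
import math

def normalize_log_scores(log_scores):
    """Convert unnormalized log scores to probabilities that sum to 1."""
    peak = max(log_scores)
    # Subtract the maximum before exponentiating for numerical stability.
    exps = [math.exp(score - peak) for score in log_scores]
    total = sum(exps)
    return [value / total for value in exps]

raw_prediction = [-10.3605, -11.0141, -11.7112]  # rawPrediction from the row above
probabilities = normalize_log_scores(raw_prediction)
# The predicted class is the index with the highest probability.
predicted_index = probabilities.index(max(probabilities))
print([round(p, 4) for p in probabilities], predicted_index)
```

The result agrees with the `probability` vector shown in the row to the displayed precision, and the largest entry is at index 0, matching `prediction=0.0`.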

In [32]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
nbaccuracy = evaluator.evaluate(predictions_df)
nbaccuracy

0.5862068965517241

Accuracy is about 59%, which isn't great.
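
For reference, the accuracy metric is simply the fraction of test records whose predicted index equals the label. The plain-Python sketch below uses a hypothetical helper `accuracy` and made-up toy labels, then shows that the reported value is consistent with 34 correct predictions out of the 58 test records.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that equal the true label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Toy example (hypothetical values): 3 of 4 predictions are correct.
toy_accuracy = accuracy([0.0, 1.0, 2.0, 1.0], [0.0, 1.0, 1.0, 1.0])

# The reported accuracy, converted back to a count over the 58 test records.
reported = 0.5862068965517241
implied_correct = round(reported * 58)
print(toy_accuracy, implied_correct)
```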

### *2. Multilayer Perceptron Classification*

In [33]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [4,5,5,3]
#the first number refers to inputs (features), and the last refers to the 3 classes of iris species in our labels
mlp = MultilayerPerceptronClassifier(layers=layers,seed=1)
mlp_model = mlp.fit(train_df)
mlp_predictions = mlp_model.transform(test_df)
mlp_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
mlp_accuracy = mlp_evaluator.evaluate(mlp_predictions)
mlp_accuracy

0.9482758620689655

Accuracy is about 95%, much higher than with Naive Bayes.
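
To get a feel for the size of the network defined by `layers = [4,5,5,3]`, the sketch below counts its trainable parameters. It assumes fully connected layers with one bias per receiving unit, and `count_mlp_parameters` is a hypothetical helper; it describes the architecture only and does not inspect the fitted Spark model.

```python
def count_mlp_parameters(layers):
    """Count weights plus biases for fully connected layers of the given sizes."""
    total = 0
    for n_in, n_out in zip(layers, layers[1:]):
        total += n_in * n_out  # weight matrix between consecutive layers
        total += n_out         # one bias per unit in the receiving layer
    return total

layers = [4, 5, 5, 3]  # 4 input features, two hidden layers of 5 units, 3 output classes
n_params = count_mlp_parameters(layers)
print(n_params)
```

Under these assumptions the network is small, which suits a dataset with only 150 records.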

### *3. Decision Tree Classification*

In [34]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="label",featuresCol="features" )
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy

0.9310344827586207

Accuracy is similarly high, at about 93%.

## Regression Models

As with classification, we'll use a dataset from the UCI Machine Learning Repository. See https://archive.ics.uci.edu/ml/datasets/combined+cycle+power+plant
The dataset contains 9568 data points collected from a Combined Cycle Power Plant over 6 years (2006-2011), when the power plant was set to work with full load. Features consist of the hourly average ambient variables Temperature (T), Ambient Pressure (AP), Relative Humidity (RH) and Exhaust Vacuum (V), which are used to predict the net hourly electrical energy output (EP) of the plant. In the CSV file used in this notebook, temperature and energy output appear as the columns AT and PE.
- We want to use this data to predict how much power a plant can generate based on these features.

### *1. Linear Regression*

In [35]:
from pyspark.ml.regression import LinearRegression
pp_df = spark.read.csv("power_plant.csv", header=True, inferSchema=True)
pp_df

DataFrame[AT: double, V: double, AP: double, RH: double, PE: double]

In [36]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=["AT", "V", "AP", "RH"], outputCol="features")
#create a new, vectorized, dataframe and call it vpp_df
vpp_df=vectorAssembler.transform(pp_df)
#look at one row
vpp_df.take(1)

[Row(AT=23.25, V=71.29, AP=1008.05, RH=71.36, PE=442.21, features=DenseVector([23.25, 71.29, 1008.05, 71.36]))]

We are ready to create a Linear Regression Model

In [37]:
lr=LinearRegression(featuresCol="features", labelCol="PE")
lr_model = lr.fit(vpp_df)
lr_model.coefficients

DenseVector([-1.9775, -0.2339, 0.0621, -0.1581])

In [38]:
lr_model.intercept

454.60927430355474
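
To make the coefficients and intercept concrete, the sketch below applies them by hand to the first row of the dataset. It uses the rounded coefficients printed above, so the result is approximate; `predict_pe` is a hypothetical helper, not part of Spark.

```python
coefficients = [-1.9775, -0.2339, 0.0621, -0.1581]  # AT, V, AP, RH (rounded values from the output above)
intercept = 454.60927430355474

def predict_pe(features):
    """Linear model: intercept plus the dot product of coefficients and features."""
    return intercept + sum(c * x for c, x in zip(coefficients, features))

first_row = [23.25, 71.29, 1008.05, 71.36]  # AT, V, AP, RH of the first row
predicted = predict_pe(first_row)
residual = 442.21 - predicted  # observed PE minus predicted PE
print(round(predicted, 2), round(residual, 2))
```

The prediction comes out at roughly 443 against an observed PE of 442.21, so the residual for this row is about one unit.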

In [39]:
lr_model.summary.rootMeanSquaredError

4.557126016749484

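Relative to PE values such as the 442.21 in the first row, an RMSE of about 4.56 corresponds to an error of roughly 1%, which is pretty good. Note that `summary` reports the error on the training data, since the model was fit on the full dataset.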

In [41]:
lr_model.save("lr_model") #save the model (fails if the path already exists; lr_model.write().overwrite().save("lr_model") replaces it)

### *2. Decision Tree Regression*

In [42]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler

In [43]:
pp_df = spark.read.csv("power_plant.csv", header=True, inferSchema=True)
vectorAssembler = VectorAssembler(inputCols=["AT", "V", "AP", "RH"], outputCol="features")
vpp_df=vectorAssembler.transform(pp_df)
vpp_df.take(1)

[Row(AT=23.25, V=71.29, AP=1008.05, RH=71.36, PE=442.21, features=DenseVector([23.25, 71.29, 1008.05, 71.36]))]

In [44]:
#Split data into training and test sets
splits = vpp_df.randomSplit([0.75,0.25])
train_df = splits[0]
test_df = splits[1]
train_df.count() #roughly 75% of the rows; the exact count varies between runs because no seed is set
test_df.count() #roughly 25% of the rows
vpp_df.count() #9568 total instances

9568

#### *Create a Decision Tree Object*

In [45]:
dt = DecisionTreeRegressor(featuresCol="features", labelCol="PE")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
rmse

4.7736989914438706

- Pretty small error, about 1% of typical PE values
- Fairly accurate prediction on the held-out test data
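
For reference, the `rmse` metric is the square root of the mean squared difference between observed and predicted values. The plain-Python sketch below uses a hypothetical helper `root_mean_squared_error` and made-up toy values, then relates the reported tree RMSE to the first-row PE value of 442.21, which is used here as a typical output level (an assumption).

```python
import math

def root_mean_squared_error(labels, predictions):
    """Root mean squared error between observed and predicted values."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Hypothetical PE observations and predictions, for illustration only.
observed = [440.0, 450.0, 460.0, 470.0]
predicted = [443.0, 446.0, 460.0, 470.0]
toy_rmse = root_mean_squared_error(observed, predicted)  # sqrt((9 + 16 + 0 + 0) / 4) = 2.5

# Relative size of the reported decision tree RMSE against the first-row PE value.
relative_error = 4.7736989914438706 / 442.21
print(toy_rmse, f"{relative_error:.2%}")
```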

### *3. Gradient-Boosted Tree Regression*

In [47]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol="features", labelCol="PE")
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_evaluator = RegressionEvaluator(labelCol="PE", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
rmse

4.315434615837233

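GBT has the lowest RMSE of the three models (about 4.32, versus 4.77 for the decision tree and 4.56 for linear regression). Note that the linear regression RMSE was computed on the training data, so it is not directly comparable with the two test-set values.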