## Spark Machine Learning
- The goal is to use PySpark MLlib to train and evaluate a machine learning model on a real-world forest cover type dataset collected by the US Forest Service.

### Setup

In [1]:
%%bash
wget https://github.com/ppkgtmm/big-data/raw/main/setup_spark_colab.sh
chmod +x ./setup_spark_colab.sh
./setup_spark_colab.sh &> /dev/null

--2022-03-06 17:23:27--  https://github.com/ppkgtmm/big-data/raw/main/setup_spark_colab.sh
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/ppkgtmm/big-data/main/setup_spark_colab.sh [following]
--2022-03-06 17:23:27--  https://raw.githubusercontent.com/ppkgtmm/big-data/main/setup_spark_colab.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 507 [text/plain]
Saving to: ‘setup_spark_colab.sh’

     0K                                                       100% 21.5M=0s

2022-03-06 17:23:27 (21.5 MB/s) - ‘setup_spark_colab.sh’ saved [507/507]



In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

### Download data

In [3]:
!gdown -O /content/cover_type.csv --id 1TDRDFYYJN0o9aMLTVlwSlTGzf-XkAygN

Downloading...
From: https://drive.google.com/uc?id=1TDRDFYYJN0o9aMLTVlwSlTGzf-XkAygN
To: /content/cover_type.csv
100% 59.2M/59.2M [00:00<00:00, 101MB/s]


### Import libraries

In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, IndexToString
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### Create session

In [5]:
spark = SparkSession \
    .builder \
    .appName("Lab 7") \
    .getOrCreate()

### Steps

- Read the data into Spark. Verify that the data loaded successfully by showing the first 5 rows.

In [6]:
data = (
    spark
    .read
    .csv('/content/cover_type.csv', header=True, inferSchema=True)
)
data.limit(5).toPandas()

| | Elevation | Aspect | Slope | Horizontal_Distance_To_Hydrology | Vertical_Distance_To_Hydrology | Horizontal_Distance_To_Roadways | Hillshade_9am | Hillshade_Noon | Hillshade_3pm | Horizontal_Distance_To_Fire_Points | Wilderness_Type | Soil_Type | Cover_Type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2596 | 51 | 3 | 258 | 0 | 510 | 221 | 232 | 148 | 6279 | Rawah | Como - Legault families complex | Aspen |
| 1 | 2590 | 56 | 2 | 212 | -6 | 390 | 220 | 235 | 151 | 6225 | Rawah | Como - Legault families complex | Aspen |
| 2 | 2804 | 139 | 9 | 268 | 65 | 3180 | 234 | 238 | 135 | 6121 | Rawah | Legault family - Rock land complex | Lodgepole Pine |
| 3 | 2785 | 155 | 18 | 242 | 118 | 3090 | 238 | 238 | 122 | 6211 | Rawah | Como family - Rock land - Legault family complex | Lodgepole Pine |
| 4 | 2595 | 45 | 2 | 153 | -1 | 391 | 220 | 234 | 150 | 6172 | Rawah | Como - Legault families complex | Aspen |


In [7]:
data.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Type: string (nullable = true)
 |-- Soil_Type: string (nullable = true)
 |-- Cover_Type: string (nullable = true)



- Split the dataset into a training set and a test set using an 80:20 ratio.

In [8]:
(trainset, testset) = data.randomSplit([0.8, 0.2], seed = 11)
trainset.count(), testset.count()

(464558, 116454)
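`randomSplit` normalizes the weights and assigns rows by random sampling, so the split is only approximately 80:20: here 464,558 of 581,012 rows (about 79.96%) went to the training set. The sketch below is a simplified pure-Python illustration of the idea (cumulative weight bounds plus one uniform draw per row). It is an assumption-level model, not Spark's implementation, which samples each partition with its own seeded generator and will therefore produce different row membership.

```python
import random


def random_split(rows, weights, seed):
    """Simplified sketch of weighted random splitting (not Spark's code).

    Weights are normalized to sum to 1, turned into cumulative bounds, and
    each row goes to the split whose [lower, upper) bound contains its draw.
    """
    total = sum(weights)
    bounds = [0.0]
    for w in weights:
        bounds.append(bounds[-1] + w / total)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i in range(len(weights)):
            if bounds[i] <= u < bounds[i + 1]:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding leaving the last bound below 1.0
            splits[-1].append(row)
    return splits


train, test = random_split(range(10_000), [0.8, 0.2], seed=11)
print(len(train), len(test))  # close to, but usually not exactly, 8000 and 2000
```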

- Construct a Pipeline consisting of the following stages:
  - I. Indexer and Encoder for Wilderness_Type
  - II. Indexer and Encoder for Soil_Type
  - III. Indexer for Cover_Type
  - IV. Vector Assembler to pack everything together
  - V. Random Forest Classifier
  - VI. Label Converter
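To make stages I and II concrete, here is a pure-Python sketch of what the indexer and encoder compute. It assumes StringIndexer's default `stringOrderType='frequencyDesc'` (most frequent category gets index 0; the Spark 3.x documentation states that ties are sorted alphabetically) and OneHotEncoder's default `dropLast=True` (the last category becomes the all-zeros vector). Spark produces sparse vectors; dense lists are used here only for readability, and the sample values are hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    """Return labels ordered most-frequent first, ties broken alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))


def one_hot(index, num_categories, drop_last=True):
    """Encode an index as a dense list; with drop_last=True the last
    category maps to the all-zeros vector (one fewer slot)."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


# Hypothetical sample of Wilderness_Type values
sample = ["Rawah", "Comanche Peak", "Rawah", "Cache la Poudre", "Comanche Peak", "Rawah"]
labels = fit_string_indexer(sample)
index = {label: i for i, label in enumerate(labels)}
for value in ["Rawah", "Cache la Poudre"]:
    print(value, index[value], one_hot(index[value], len(labels)))
# Rawah 0 [1.0, 0.0]
# Cache la Poudre 2 [0.0, 0.0]
```

Dropping the last slot avoids a perfectly collinear set of indicator columns; tree models such as random forests do not strictly need this, but it is the encoder's default.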

In [9]:
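cat_cols = ['Wilderness_Type', 'Soil_Type', 'Cover_Type']
# Keep the original column order so the feature vector layout is reproducible
# (a set difference has no guaranteed iteration order across Python sessions)
num_cols = [c for c in trainset.columns if c not in cat_cols]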

In [10]:
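preps = []          # indexer and encoder stages for the categorical features
prep_colnames = []  # names of the one-hot encoded output columns
for col in cat_cols[:-1]:  # every categorical column except the target
    # Map each string category to a numeric index, then one-hot encode the index
    indexer = StringIndexer(inputCol=col, outputCol=col + '_index')
    encoder = OneHotEncoder(inputCol=col + '_index', outputCol=col + '_prep')
    preps.append(indexer)
    preps.append(encoder)
    prep_colnames.append(col + '_prep')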

In [11]:
target_name = cat_cols[-1]+'_prep'
cover_indexer = StringIndexer(
    inputCol=cat_cols[-1],
    outputCol=target_name
)

In [12]:
all_cols = prep_colnames + num_cols
vec_assem = VectorAssembler(inputCols=all_cols, outputCol='features')
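`VectorAssembler` concatenates its `inputCols`, in the order given, into one `features` vector: each one-hot column contributes its whole vector and each numeric column contributes a single slot. The pure-Python sketch below shows that layout with a hypothetical, shortened row; real one-hot widths depend on how many categories each column has, and Spark stores the result as a (possibly sparse) vector rather than a list.

```python
def assemble(row, input_cols):
    """Concatenate the listed columns, in order, into one flat feature list.
    List-valued (vector) columns are expanded; scalar columns add one slot."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, list):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features


# Hypothetical encoded row (widths shortened for illustration)
row = {
    "Wilderness_Type_prep": [1.0, 0.0, 0.0],
    "Soil_Type_prep": [0.0, 1.0],
    "Elevation": 2596,
    "Slope": 3,
}
features = assemble(row, ["Wilderness_Type_prep", "Soil_Type_prep", "Elevation", "Slope"])
print(features)  # [1.0, 0.0, 0.0, 0.0, 1.0, 2596.0, 3.0]
```

Because positions follow `inputCols`, a stable column order is what makes feature indices (for example, in feature importances) comparable between runs.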

In [13]:
rf = RandomForestClassifier(featuresCol='features', labelCol=target_name)

In [14]:
label_inverter = IndexToString(
    inputCol='prediction',
    outputCol='predictionLabel',
    labels = cover_indexer.fit(trainset).labels
)
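`label_inverter` takes its labels from fitting `cover_indexer` on `trainset` separately from the pipeline. This works because StringIndexer's default ordering depends only on category counts (and, per the Spark 3.x documentation, alphabetical order for ties), so the separately fitted labels should match those of the indexer fitted inside the pipeline on the same data. A pure-Python sketch of that ordering and of the inverse mapping that `IndexToString` performs, using hypothetical sample values:

```python
from collections import Counter


def fit_labels(values):
    """Frequency-descending labels with alphabetical tie-breaking
    (an assumed model of StringIndexer's default ordering)."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))


def index_to_string(predictions, labels):
    """Map numeric predictions (doubles such as 2.0) back to label strings."""
    return [labels[int(p)] for p in predictions]


# Hypothetical Cover_Type sample
sample = ["Lodgepole Pine", "Aspen", "Lodgepole Pine", "Ponderosa Pine", "Aspen", "Lodgepole Pine"]
labels = fit_labels(sample)
print(labels)  # ['Lodgepole Pine', 'Aspen', 'Ponderosa Pine']
print(index_to_string([0.0, 2.0], labels))  # ['Lodgepole Pine', 'Ponderosa Pine']

# Fitting again on the same data (in any row order) gives the same labels
assert fit_labels(list(reversed(sample))) == labels
```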

In [15]:
pipeline = Pipeline(
    stages=preps + [cover_indexer, vec_assem, rf, label_inverter]
)
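Conceptually, fitting a Pipeline runs its stages in order: an estimator (such as StringIndexer or RandomForestClassifier) is fitted on the data produced by the earlier stages and replaced by its fitted model, while a plain transformer (such as VectorAssembler) is used as-is. The toy classes below are hypothetical stand-ins used only to illustrate that flow; the simplification is that Spark skips transforming the data after the last estimator during `fit`, whereas this sketch transforms after every stage.

```python
class ToyIndexer:
    """Hypothetical estimator: fit() learns a value -> index mapping."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        values = sorted({row[self.col] for row in rows})
        return ToyIndexerModel(self.col, {v: i for i, v in enumerate(values)})


class ToyIndexerModel:
    """Hypothetical transformer returned by ToyIndexer.fit()."""

    def __init__(self, col, mapping):
        self.col = col
        self.mapping = mapping

    def transform(self, rows):
        return [{**row, self.col + "_index": self.mapping[row[self.col]]} for row in rows]


class AddOne:
    """Hypothetical plain transformer (no fit step) reading an earlier stage's output."""

    def transform(self, rows):
        return [{**row, "w_index_plus_one": row["w_index"] + 1} for row in rows]


def fit_pipeline(stages, rows):
    """Fit stages in order; each stage's output feeds the next stage."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, rows):
    """Apply every fitted stage in order."""
    for model in fitted:
        rows = model.transform(rows)
    return rows


train_rows = [{"w": "Rawah"}, {"w": "Neota"}, {"w": "Rawah"}]
fitted = fit_pipeline([ToyIndexer("w"), AddOne()], train_rows)
print(transform_pipeline(fitted, [{"w": "Rawah"}]))
# [{'w': 'Rawah', 'w_index': 1, 'w_index_plus_one': 2}]
```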

- Make predictions on the test set and evaluate performance with accuracy, plus weighted precision, weighted recall, and F1.

In [16]:
model = pipeline.fit(trainset)

In [17]:
predictions = model.transform(testset)
(
    predictions
    .select(['probability', 'prediction', 'predictionLabel', cat_cols[-1]])
    .limit(5)
    .toPandas()
)

| | probability | prediction | predictionLabel | Cover_Type |
|---|---|---|---|---|
| 0 | [0.21835411914501396, 0.09380611083146631, 0.4... | 2.0 | Ponderosa Pine | Ponderosa Pine |
| 1 | [0.19586503734169203, 0.08700763908296678, 0.4... | 2.0 | Ponderosa Pine | Ponderosa Pine |
| 2 | [0.19057219110095994, 0.08645849944224393, 0.4... | 2.0 | Ponderosa Pine | Douglas-fir |
| 3 | [0.21355438749109537, 0.1036751747327392, 0.42... | 2.0 | Ponderosa Pine | Douglas-fir |
| 4 | [0.19586503734169203, 0.08700763908296678, 0.4... | 2.0 | Ponderosa Pine | Ponderosa Pine |
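With no `thresholds` set, the classifier's `prediction` is the index of the largest class score, and for a random forest the `probability` column is the normalized score vector, so its largest entry identifies the same class. A pure-Python sketch, using a hypothetical probability vector because the values printed above are truncated:

```python
def predict_from_probability(probability):
    """Return the index of the largest probability as a double;
    on ties, Python's max() keeps the first (lowest) index."""
    return float(max(range(len(probability)), key=probability.__getitem__))


# Hypothetical 5-class probability vector
p = [0.218, 0.094, 0.42, 0.1, 0.168]
print(predict_from_probability(p))  # 2.0
```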


In [18]:
acc_evaluator = MulticlassClassificationEvaluator(
    labelCol=target_name, 
    predictionCol='prediction', 
    metricName='accuracy'
)
print('Test Accuracy = {:.4f}'.format(acc_evaluator.evaluate(predictions)))

Test Accuracy = 0.6765
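`MulticlassClassificationEvaluator` compares numeric columns, which is why it is given the indexed label `target_name` and the numeric `prediction` rather than the string columns. Accuracy is simply the fraction of rows where the two indices agree; a pure-Python sketch with hypothetical values:

```python
def accuracy(labels, predictions):
    """Fraction of rows whose predicted index equals the true index."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Hypothetical indexed labels and predictions
y_true = [0.0, 0.0, 1.0, 2.0, 2.0]
y_pred = [0.0, 1.0, 1.0, 2.0, 0.0]
print(accuracy(y_true, y_pred))  # 0.6
```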


In [19]:
prec_evaluator = MulticlassClassificationEvaluator(
    labelCol=target_name, 
    predictionCol='prediction', 
    metricName='weightedPrecision'
)
print('Test Precision = {:.4f}'.format(prec_evaluator.evaluate(predictions)))

Test Precision = 0.6190
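`weightedPrecision` averages per-class precision, tp / (tp + fp), weighted by each class's share of the true labels; a class the model never predicts contributes a precision of 0. One way weighted precision (0.6190) can fall below accuracy (0.6765) is when the model rarely or never predicts some of the smaller cover types. A pure-Python sketch with hypothetical labels in which class 2.0 is never predicted:

```python
from collections import Counter


def weighted_precision(labels, predictions):
    """Per-class precision tp / (tp + fp), weighted by true-class frequency.
    A class that is never predicted gets precision 0."""
    n = len(labels)
    pred_counts = Counter(predictions)
    total = 0.0
    for c, count in Counter(labels).items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == c and p == c)
        precision = tp / pred_counts[c] if pred_counts[c] else 0.0
        total += precision * count / n
    return total


y_true = [0.0, 0.0, 0.0, 1.0, 1.0, 2.0]
y_pred = [0.0, 0.0, 0.0, 1.0, 0.0, 0.0]
acc = sum(1 for y, p in zip(y_true, y_pred) if y == p) / len(y_true)
print(round(weighted_precision(y_true, y_pred), 4), round(acc, 4))  # 0.6333 0.6667
```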


In [20]:
rec_evaluator = MulticlassClassificationEvaluator(
    labelCol=target_name, 
    predictionCol='prediction', 
    metricName='weightedRecall'
)
print('Test Recall = {:.4f}'.format(rec_evaluator.evaluate(predictions)))

Test Recall = 0.6765
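Weighted recall matching accuracy exactly (both 0.6765) is expected rather than a coincidence: each class contributes (tp_c / n_c) * (n_c / n) = tp_c / n, so the weighted sum is total correct predictions divided by n, which is accuracy. A pure-Python sketch with hypothetical values:

```python
from collections import Counter


def weighted_recall(labels, predictions):
    """Per-class recall tp / (tp + fn), weighted by true-class frequency."""
    n = len(labels)
    total = 0.0
    for c, count in Counter(labels).items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == c and p == c)
        total += (tp / count) * (count / n)
    return total


y_true = [0.0, 0.0, 0.0, 1.0, 1.0, 2.0]
y_pred = [0.0, 0.0, 0.0, 1.0, 0.0, 0.0]
acc = sum(1 for y, p in zip(y_true, y_pred) if y == p) / len(y_true)
print(round(weighted_recall(y_true, y_pred), 4), round(acc, 4))  # 0.6667 0.6667
```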


In [21]:
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol=target_name, 
    predictionCol='prediction', 
    metricName='f1'
)
print('Test F1 = {:.4f}'.format(f1_evaluator.evaluate(predictions)))

Test F1 = 0.6422
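The evaluator's `f1` metric is a weighted F1: per-class F1 scores averaged with true-class frequencies as weights. It is not the harmonic mean of weighted precision and weighted recall; for this notebook that harmonic mean would be 2 × 0.6190 × 0.6765 / (0.6190 + 0.6765) ≈ 0.6465, not the reported 0.6422. A pure-Python sketch contrasting the two on hypothetical labels where class 2.0 is never predicted:

```python
from collections import Counter


def weighted_metrics(labels, predictions):
    """Return (weighted precision, weighted recall, weighted F1), each a
    per-class score weighted by true-class frequency; F1 is 0 when P + R == 0."""
    n = len(labels)
    pred_counts = Counter(predictions)
    wp = wr = wf1 = 0.0
    for c, count in Counter(labels).items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == c and p == c)
        precision = tp / pred_counts[c] if pred_counts[c] else 0.0
        recall = tp / count
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        weight = count / n
        wp += precision * weight
        wr += recall * weight
        wf1 += f1 * weight
    return wp, wr, wf1


y_true = [0.0, 0.0, 0.0, 1.0, 1.0, 2.0]
y_pred = [0.0, 0.0, 0.0, 1.0, 0.0, 0.0]
wp, wr, wf1 = weighted_metrics(y_true, y_pred)
harmonic = 2 * wp * wr / (wp + wr)
print(round(wf1, 4), round(harmonic, 4))  # 0.5972 0.6496
```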
