In this notebook, you'll learn the basics of working with Spark in batch mode to build a random forest classifier. Note that this notebook is intended to be run on Google Colaboratory.

## Spark and Colaboratory setup

First, there's some configuration specific to running Spark on Colaboratory that we'll need to attend to. Run these cells to set everything up.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Older Spark releases such as 2.4.5 are served from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz

In [0]:
!pip install -q findspark
# Pin PySpark to the same version as the Spark download above
!pip install pyspark==2.4.5

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=03816f96cecdb3671411e13473c9ea149e8fcf15c3b3b1bb3e4de2c79941b308
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


## Import dependencies

Next, we need to import the tools we'll need from PySpark. The imports below allow us to connect to the Spark server, load our data, clean it, and prepare, execute, and evaluate a model.

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler, PCA
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col

## Set our constants

Next, we create a set of constants that we can refer to throughout the notebook. These are values that the rest of our code needs to run, but that we might need to change at some point (for instance, if the location of our data changes). 

If you saved the relevant datasets in the folders suggested in the previous checkpoint (link), you can use the below code chunk as is. If you saved the datasets elsewhere on your Google Drive, modify the file path after the `My Drive` folder. 

Regardless of the exact file path, these datasets **must** be stored on Google Drive!

In [0]:
CSV_PATH = "/content/gdrive/My Drive/thinkful_big_data/Colab Datasets/allData.csv" 
CSV_ACTIVITY_LABEL_PATH = "/content/gdrive/My Drive/thinkful_big_data/Colab Datasets/activity_labels.csv"
APP_NAME = "UCI HAR Random Forest Example"
SPARK_URL = "local[*]"
RANDOM_SEED = 141107
TRAINING_DATA_RATIO = 0.8
RF_NUM_TREES = 500
RF_MAX_DEPTH = 7
RF_NUM_BINS = 32

## Connect to the server and load data

Now we're ready to connect to the Spark server. We do that (relying on the constants set above) and then load our labels (loaded into `activity_labels`) and activity data (loaded into `df`). 

In [0]:
spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()
activity_labels = spark.read.options(inferSchema="true").csv(CSV_ACTIVITY_LABEL_PATH)
df = spark.read.options(inferSchema="true").csv(CSV_PATH)

In [0]:
spark

In [0]:
spark.sparkContext.defaultParallelism

2

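`defaultParallelism` is the default number of tasks Spark runs in parallel. With the `local[*]` master, Spark uses one worker thread per core, so this value (2 here) is the number of cores available on the Colab machine.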

In [0]:
spark.sparkContext.defaultMinPartitions

2
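With the `local[*]` master, Spark derives `defaultMinPartitions` from `defaultParallelism` as `min(defaultParallelism, 2)`, which is why both values above are 2. The plain-Python sketch below mirrors that rule; it is not Spark code, and `os.cpu_count()` is only an approximate stand-in for the core count the JVM reports (containers can report different values).

```python
import os

def default_parallelism_local_star():
    # Approximation: with master "local[*]", Spark uses one task slot per core
    # the JVM reports; os.cpu_count() is a rough Python-side stand-in.
    return os.cpu_count() or 1

def default_min_partitions(default_parallelism):
    # Mirrors SparkContext.defaultMinPartitions = min(defaultParallelism, 2)
    return min(default_parallelism, 2)

cores = default_parallelism_local_star()
print(f"cores ~ {cores}, defaultMinPartitions ~ {default_min_partitions(cores)}")
```

On a machine with many cores, `defaultMinPartitions` therefore stays at 2 while `defaultParallelism` grows with the core count.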

In [0]:
activity_labels.show()

+---+------------------+
|_c0|               _c1|
+---+------------------+
|  1|           WALKING|
|  2|  WALKING_UPSTAIRS|
|  3|WALKING_DOWNSTAIRS|
|  4|           SITTING|
|  5|          STANDING|
|  6|            LAYING|
+---+------------------+



In [0]:
df.show()


In [0]:
type(df)

pyspark.sql.dataframe.DataFrame

In [0]:
df.printSchema()

In [0]:
df.head(5)

[Row(_c0=5, _c1=0.289, _c2=-0.0203, _c3=-0.133, _c4=-0.995, _c5=-0.983, _c6=-0.914, _c7=-0.995, _c8=-0.983, _c9=-0.924, _c10=-0.935, _c11=-0.567, _c12=-0.744, _c13=0.853, _c14=0.686, _c15=0.814, _c16=-0.966, _c17=-1.0, _c18=-1.0, _c19=-0.995, _c20=-0.994, _c21=-0.988, _c22=-0.943, _c23=-0.408, _c24=-0.679, _c25=-0.602, _c26=0.929, _c27=-0.853, _c28=0.36, _c29=-0.0585, _c30=0.257, _c31=-0.225, _c32=0.264, _c33=-0.0952, _c34=0.279, _c35=-0.465, _c36=0.492, _c37=-0.191, _c38=0.376, _c39=0.435, _c40=0.661, _c41=0.963, _c42=-0.141, _c43=0.115, _c44=-0.985, _c45=-0.982, _c46=-0.878, _c47=-0.985, _c48=-0.984, _c49=-0.895, _c50=0.892, _c51=-0.161, _c52=0.125, _c53=0.977, _c54=-0.123, _c55=0.0565, _c56=-0.375, _c57=0.899, _c58=-0.971, _c59=-0.976, _c60=-0.984, _c61=-0.989, _c62=-0.918, _c63=-1.0, _c64=-1.0, _c65=0.114, _c66=-0.59, _c67=0.591, _c68=-0.592, _c69=0.592, _c70=-0.745, _c71=0.721, _c72=-0.712, _c73=0.711, _c74=-0.995, _c75=0.996, _c76=-0.996, _c77=0.992, _c78=0.57, _c79=0.439, _c80=0

In [0]:
df.show(5, truncate=True)


In [0]:
df.count()

10299

In [0]:
len(df.columns)

564

In [0]:
df.describe().show()


In [0]:
df.describe('_c1').show()

+-------+-------------------+
|summary|                _c1|
+-------+-------------------+
|  count|              10299|
|   mean|0.27434761433148813|
| stddev|0.06762868745047078|
|    min|               -1.0|
|    max|                1.0|
+-------+-------------------+



In [0]:
df.select('_c0', '_c1').show(5)

+---+-----+
|_c0|  _c1|
+---+-----+
|  5|0.289|
|  5|0.278|
|  5| 0.28|
|  5|0.279|
|  5|0.277|
+---+-----+
only showing top 5 rows



In [0]:
print(df.select('_c0').distinct().count())
df.select('_c0').distinct().show()

6
+---+
|_c0|
+---+
|  1|
|  6|
|  3|
|  5|
|  4|
|  2|
+---+



In [0]:
df.crosstab('_c0', '_c1').show()


In [0]:
df.groupby('_c0').agg({'_c1': 'mean', '_c2': 'mean'}).show()

+---+-------------------+--------------------+
|_c0|           avg(_c1)|            avg(_c2)|
+---+-------------------+--------------------+
|  1| 0.2763350754936121|-0.01790675597560...|
|  6|0.26864574074074027|-0.01831791203703...|
|  3|0.28814317211948803|-0.01631077815433...|
|  5| 0.2791552990556135|-0.01615265215110...|
|  4| 0.2730567473269552|-0.01268911536297...|
|  2| 0.2622983290155441|-0.02592334590673...|
+---+-------------------+--------------------+



In [0]:
df.groupby('_c0').count().show()

+---+-----+
|_c0|count|
+---+-----+
|  1| 1722|
|  6| 1944|
|  3| 1406|
|  5| 1906|
|  4| 1777|
|  2| 1544|
+---+-----+



In [0]:
df.sample(withReplacement=False, fraction=.2).show(5)


In [0]:
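# Collapse the three walking activities (labels 1-3) into label 1; keep 4-6 as-is.
# Each element of the RDD is a Row, so read the label with row[0].
rdd_c0 = df.select("_c0").rdd.map(lambda row: 1 if row[0] < 4 else row[0])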

In [0]:
rdd_c0

PythonRDD[713] at RDD at PythonRDD.scala:53
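The RDD mapping above collapses the three walking activities (labels 1 to 3) into a single class while keeping labels 4 to 6 unchanged, the same idea as the `replace({2:1, 3:1})` call further down. Because RDD transformations are lazy, nothing has been computed yet. The plain-Python sketch below applies that mapping to the per-class counts reported by `df.groupby('_c0').count()` to show what the merged class sizes would be; the helper name `merge_walking` is hypothetical.

```python
from collections import Counter

# Per-class row counts as reported by df.groupby('_c0').count() above
class_counts = {1: 1722, 2: 1544, 3: 1406, 4: 1777, 5: 1906, 6: 1944}

def merge_walking(label):
    # Hypothetical helper: labels 1-3 (the walking activities) become 1,
    # labels 4-6 are kept as-is
    return 1 if label < 4 else label

merged = Counter()
for label, n in class_counts.items():
    merged[merge_walking(label)] += n

print(dict(sorted(merged.items())))
# {1: 4672, 4: 1777, 5: 1906, 6: 1944}
```

Merging makes the walking class more than twice as large as any other, which is worth keeping in mind before training on the merged labels.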

## Validate the data

If our data has been properly cleaned and prepared, it will meet the following criteria, which we'll verify in just a moment:

* The dataframe shape should be 10,299 rows by 562 columns
* All feature columns should be doubles. The one exception is the label column, `_c0`, which holds integers.
* There should be no nulls. This point is crucial because, by default, Spark's `VectorAssembler` will fail to build our feature vectors if any input column contains null values.

Let's confirm these points.

In [0]:
# Confirm the dataframe shape is 10,299 rows by 562 columns
print(f"Dataset shape is {df.count():d} rows by {len(df.columns):d} columns.")

Dataset shape is 10299 rows by 562 columns.


In [0]:
# Confirm that all feature columns are doubles via a list comprehension
# We're expecting 561 of 562 here, accounting for the labels column
double_cols = [name for name, dtype in df.dtypes if dtype == 'double']
print(f"{len(double_cols):d} columns out of {len(df.columns):d} total are type double.")

561 columns out of 562 total are type double.
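The third criterion, no missing values, is not checked by the cells above. In Spark this check is commonly written with the already-imported `count`, `when`, `isnan`, and `col` functions, for example `df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])`, which counts nulls and NaNs per column. Spark treats `null` and `NaN` as different things, so both tests are needed. The plain-Python sketch below illustrates the same per-column counting logic on a few hypothetical rows; it is not Spark code.

```python
import math

# Hypothetical sample rows standing in for a few DataFrame records;
# None plays the role of a Spark null, float("nan") of a NaN
rows = [
    {"_c0": 5, "_c1": 0.289, "_c2": -0.0203},
    {"_c0": 5, "_c1": None, "_c2": -0.0164},
    {"_c0": 4, "_c1": 0.280, "_c2": float("nan")},
]

def is_missing(value):
    # Same idea as isnan(c) | col(c).isNull(): count both nulls and NaNs
    return value is None or (isinstance(value, float) and math.isnan(value))

def missing_counts(records):
    columns = records[0].keys()
    return {c: sum(is_missing(r[c]) for r in records) for c in columns}

counts = missing_counts(rows)
print(counts)  # {'_c0': 0, '_c1': 1, '_c2': 1}
```

On the real data, every per-column count should be 0 before moving on to the feature vectors.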


## Set up and run our classifier in Spark

After confirming our data is clean, we're ready to reshape the data and run the random forest model.

In Spark, we manipulate the data to work in a Spark pipeline, define each of the steps in the pipeline, chain them together, and finally run the pipeline.

Apache Spark classifiers expect 2 columns of input:

1. __labels__: a numeric column holding the class of each row (here, the activity being performed).
2. __features__: a single vector column that holds all of the feature values for each row.

To get there, we need to create these 2 columns from our dataset: the data is there, but not yet in a format the classifier can use.

To create the indexed labels column, we'll use `StringIndexer` to add a column called `indexedLabel`. The source column is `_c0`, since that contains our labels. `StringIndexer` maps each distinct label to exactly one numeric index.
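By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most common label gets index 0.0, and numeric input such as `_c0` is cast to strings first. As a result, `indexedLabel` is generally not equal to `_c0`. The plain-Python sketch below reproduces that ordering from the class counts shown by `df.groupby('_c0').count()`, then maps indices back to activity names using the `activity_labels` table, which is the kind of reverse lookup `IndexToString` (imported above but not used) performs on predictions. It illustrates the ordering rule only, not Spark's implementation; tie-breaking between equal counts is not modeled because no counts tie here.

```python
# Per-class row counts from df.groupby('_c0').count() above
class_counts = {1: 1722, 2: 1544, 3: 1406, 4: 1777, 5: 1906, 6: 1944}

# Activity names from the activity_labels table above
activity_names = {1: "WALKING", 2: "WALKING_UPSTAIRS", 3: "WALKING_DOWNSTAIRS",
                  4: "SITTING", 5: "STANDING", 6: "LAYING"}

# frequencyDesc: most frequent label first; StringIndexer works on string labels
ordered = sorted(class_counts, key=lambda label: class_counts[label], reverse=True)
label_to_index = {str(label): float(i) for i, label in enumerate(ordered)}

# Inverse mapping, analogous to what IndexToString does with the indexer's labels
index_to_label = {i: label for label, i in label_to_index.items()}

for label, idx in label_to_index.items():
    print(f"_c0={label} ({activity_names[int(label)]}) -> indexedLabel={idx}")
```

Here LAYING (label 6) becomes index 0.0 and WALKING_DOWNSTAIRS (label 3) becomes index 5.0.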
    
To create the indexed features column, we'll need to do two things. First, we'll create the vector of features using the `VectorAssembler` transformer. To create this vector, we'll use all 561 numeric (double) columns from our data frame. The vector assembler will create a new column called `features`, and each row of this column will contain a 561-element vector built from the 561 features in the dataset.
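Conceptually, `VectorAssembler` concatenates the values of its `inputCols`, in the order given, into one vector per row. The plain-Python sketch below shows that row-wise operation on the first three feature values of the first row returned by `df.head(5)`; in the notebook the input list is `double_cols` rather than the three-column list used here.

```python
# Label plus the first three feature values from the first row of df.head(5)
row = {"_c0": 5, "_c1": 0.289, "_c2": -0.0203, "_c3": -0.133}
input_cols = ["_c1", "_c2", "_c3"]  # stands in for double_cols

def assemble(record, cols):
    # Concatenate the selected columns, in order, into one feature vector
    return [float(record[c]) for c in cols]

features = assemble(row, input_cols)
print(features)  # [0.289, -0.0203, -0.133]
```

The label column is left out of `inputCols`, so it never leaks into the feature vector.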

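Second, we'll complete the data preparation by running the `features` column through `VectorIndexer`, which produces an indexed vector column we'll call `indexedFeatures`. `VectorIndexer` treats any feature with at most `maxCategories` distinct values as categorical and replaces its values with category indices; all other features pass through unchanged as continuous values.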
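The per-feature decision `VectorIndexer` makes can be sketched in plain Python as below, using `maxCategories=4` as in this notebook. The `binary_flag` column is hypothetical; `sensor_mean` holds the five `_c1` values shown earlier. Indices are assigned in sorted order of the values, which is a simplification; Spark's exact index assignment may differ. Because the sensor features here take many distinct values, most or all of them will likely stay continuous.

```python
def index_features(columns, max_categories=4):
    """Return, per feature, a value->index map if categorical, else None."""
    category_maps = {}
    for name, values in columns.items():
        distinct = sorted(set(values))
        if len(distinct) <= max_categories:
            # Few distinct values: treat as categorical (simplified index order)
            category_maps[name] = {v: i for i, v in enumerate(distinct)}
        else:
            # Many distinct values: keep as continuous
            category_maps[name] = None
    return category_maps

columns = {
    "binary_flag": [0.0, 1.0, 1.0, 0.0, 1.0],        # hypothetical
    "sensor_mean": [0.289, 0.278, 0.28, 0.279, 0.277],  # first five _c1 values
}

maps = index_features(columns)
print(maps)  # {'binary_flag': {0.0: 0, 1.0: 1}, 'sensor_mean': None}
```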
    
Since the classifier expects indexed labels and an indexed vector column of data, we'll use the `indexedLabel` and `indexedFeatures` as inputs to our random forest classifier.

In [0]:
# Generate our feature vector.
# Note that transform() does not modify `df` in place: each call returns a new
# DataFrame with the extra column, which we assign back to `df`.
df = VectorAssembler(inputCols=double_cols, outputCol="features").transform(df)

# Also project the features onto their first 100 principal components.
# Note: the pipeline below trains on `features`, not on `pcaFeatures`.
df = PCA(k=100, inputCol="features", outputCol="pcaFeatures").fit(df).transform(df)

Let's confirm that the features are there. It's easy to do this in Apache Spark using the `select` and `show` methods on the dataframe.  

In [0]:
df.select("_c0", "features", "pcaFeatures").show(5)

+---+--------------------+--------------------+
|_c0|            features|         pcaFeatures|
+---+--------------------+--------------------+
|  5|[0.289,-0.0203,-0...|[-15.053556278777...|
|  5|[0.278,-0.0164,-0...|[-15.080169284189...|
|  5|[0.28,-0.0195,-0....|[-15.015516422989...|
|  5|[0.279,-0.0262,-0...|[-15.216240677509...|
|  5|[0.277,-0.0166,-0...|[-15.284639715420...|
+---+--------------------+--------------------+
only showing top 5 rows



In [0]:
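# Exploration only: replace() returns a new DataFrame (here, just the relabeled
# _c0 column), so `df` itself is unchanged and the model below still uses 6 classes.
df.select("_c0").replace({2:1, 3:1})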

DataFrame[_c0: int]

Now we're ready to build the indexers, split our data for training and testing, define our model, and finally chain everything together into a pipeline.

__It's important to note that executing this cell does not train the model yet. Apart from fitting the two indexers to the data, we're only defining the pipeline's stages and their parameters__.

In [0]:
# Build the training indexers / split data / classifier
# first we'll generate a labelIndexer
labelIndexer = StringIndexer(inputCol="_c0", outputCol="indexedLabel").fit(df)

# now generate the indexed feature vector
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df)
    
# Split the data into training and test sets (20% held out for testing);
# passing the seed makes the split reproducible across runs
(trainingData, testData) = df.randomSplit([TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Define the random forest classifier (training happens later, in pipeline.fit)
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=RF_NUM_TREES,
                            maxDepth=RF_MAX_DEPTH, maxBins=RF_NUM_BINS, seed=RANDOM_SEED)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])
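`randomSplit` does not cut the data at an exact row count: Spark decides each row's side of the split with a random draw, so the 80/20 split is only approximate, and without a `seed` argument it changes on every run. The plain-Python sketch below imitates that behavior by drawing one uniform random number per row; it is a simplified illustration, not Spark's sampler.

```python
import random

def random_split(rows, train_ratio, seed=None):
    # Assign each row independently: draw u ~ U[0, 1) and send the row to the
    # training set when u < train_ratio (simplified; not Spark's sampler)
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_ratio else test).append(row)
    return train, test

TRAINING_DATA_RATIO = 0.8
RANDOM_SEED = 141107
rows = list(range(10299))  # one entry per record in the dataset

train, test = random_split(rows, TRAINING_DATA_RATIO, seed=RANDOM_SEED)
print(len(train), len(test))  # roughly 80% / 20% of the 10,299 rows
```

Calling it twice with the same seed gives the same split, which is the property the `seed` argument provides in Spark.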

This next cell runs the pipeline, delivering a trained model at the end of the process.

In [0]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

It is now easy to test our model and make predictions simply by using the model's `transform` method on the `testData` dataset.

In [0]:
# Make predictions.
predictions = model.transform(testData)

## Evaluate the model

Now we can use `MulticlassClassificationEvaluator` to measure the model's accuracy on the test data.

In [0]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Test Error = {(1.0 - accuracy):g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.0488759
Accuracy = 0.951124
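The accuracy metric is the fraction of test rows whose predicted index equals the indexed label, and the printed test error is simply `1 - accuracy`. A useful sanity check is the accuracy of always predicting the most frequent class, which can be computed from the class counts shown by `df.groupby('_c0').count()`. The plain-Python sketch below does that arithmetic; the baseline uses the full-dataset counts, so it only approximates the baseline on the random test split.

```python
# Per-class row counts from df.groupby('_c0').count() above
class_counts = {1: 1722, 2: 1544, 3: 1406, 4: 1777, 5: 1906, 6: 1944}
total = sum(class_counts.values())  # 10299 rows

# Accuracy of always predicting the most frequent class (LAYING, label 6)
majority_baseline = max(class_counts.values()) / total

def accuracy(labels, predictions):
    # Fraction of rows where the prediction matches the label
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

reported_accuracy = 0.951124  # value printed by the evaluator above
print(f"Majority-class baseline: {majority_baseline:.3f}")
print(f"Test error from reported accuracy: {1.0 - reported_accuracy:.6f}")
```

A baseline of roughly 19% puts the model's accuracy of about 95% in context.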


## Next Steps

We've seen how to prepare data and build a classifier in Spark. You might want to play around with this notebook and learn more about how Spark works. Here are some ideas:

- Look at the set of labels, and see if there are any features that would make sense to combine. Spark allows you to map values into a new column.
- Identify the most important features among the 561 source features, or reduce the feature set with PCA (the `pcaFeatures` column computed above is a starting point), then see if the model performs better.
- Modify the settings of the random forest to see if the performance improves.
- Use Spark's tools to find other techniques to evaluate the performance of your model. See if you can figure out how to generate an ROC plot, find the AUC value, or plot a confusion matrix.
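For the confusion-matrix idea, one possible starting point is to pull the `(indexedLabel, prediction)` pairs out of the `predictions` DataFrame, for example with `predictions.select("indexedLabel", "prediction").collect()` (fine for a test set of this size), and tabulate them. The plain-Python sketch below shows only the tabulation step, on a few hypothetical pairs; rows are true labels and columns are predictions.

```python
def confusion_matrix(pairs, num_classes):
    # matrix[true][predicted] counts how often each true class got each prediction
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for label, prediction in pairs:
        matrix[int(label)][int(prediction)] += 1
    return matrix

# Hypothetical (indexedLabel, prediction) pairs
pairs = [(0.0, 0.0), (0.0, 0.0), (1.0, 1.0), (1.0, 2.0), (2.0, 2.0), (2.0, 1.0)]
cm = confusion_matrix(pairs, num_classes=3)
for row in cm:
    print(row)
```

The diagonal holds the correct predictions, so dividing its sum by the total number of pairs gives the same accuracy the evaluator reports.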