# Analyzing Internet of Things Data with IBM DSX: Trucking Data Analysis

In this notebook you will see how to build a predictive model with the Spark machine learning API (SparkML) and deploy it for scoring with Machine Learning (ML) on the IBM DSX platform.
This notebook walks you through the following steps:
- Fetching data from HDFS
- Feature engineering
- Data visualization
- Building a binary classifier model with the SparkML API
- Running basic model metrics

# Use Case

Imagine a trucking company that dispatches trucks across the country. The trucks are outfitted with sensors that collect data such as the driver's location, the weather conditions, and the most recent driving event (speeding, the truck weaving out of its lane, following too closely, etc.). Data like this is generated very often, say once per second, and is streamed back to the company’s servers.

The company needs a way to process this stream of data and analyze it, to make sure trucks are traveling safely and to predict whether a driver is likely to commit a violation soon. Oh, and this also needs to be done in real time!


![Trucks](https://raw.githubusercontent.com/dhananjaymehta/IoTtrucking/master/trucks2.jpg)





To predict violations, we simulate trucking events described by location, miles driven, and weather conditions. The next step is to visually explore the data and the correlations between features. We also need some feature engineering to prepare the data.

Once the data is ready, we can build a predictive model. In our example we use the SparkML Random Forest classification model. Classification is a statistical technique that assigns a "class" to each driving event: **"Violations"** or **"Normal"**. We train the classification model on historical data. (A typical analytics project would use large training datasets, but this demo model is built with a small dataset.)

If the model meets accuracy expectations, it is ready to be deployed for scoring.

Scoring is the process of applying the model to a new set of data.

In [None]:
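import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession

# DSX notebooks usually provide a SparkContext as `sc`; getOrCreate() reuses the
# running session (or starts a local one), so the cells below also run elsewhere
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext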

## Step 1: Import HDFS Data from remote HDP cluster.

In [None]:
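# download the dataset next to the notebook, then view its last 5 lines
!wget -q -O IoT-Trucking-Demo.csv "https://raw.githubusercontent.com/roberthryniewicz/datasets/master/IoT-Trucking-Demo.csv"
!tail -n 5 IoT-Trucking-Demo.csv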

#### Load Events Data

In [None]:
!pwd

In [None]:
!ls

In [None]:
# Training data: the CSV downloaded above (on an HDP cluster this could be an HDFS path instead)
#DSX_PROJECT_DIR='/user-home/1002/DSX_Projects/Final DSX Test - Shared/datasets/'
eventsFile = SQLContext(sc).read.csv('./IoT-Trucking-Demo.csv', header='true', inferSchema='false')  # load as a Spark DataFrame (all columns as strings)
# see the data
eventsFile.show(5)
# total number of records
tot_row = eventsFile.count()

# events with violations: the first column is the event type, and any value other than 'Normal' is a violation
event_col = eventsFile.columns[0]
tot_violations = eventsFile.filter(eventsFile[event_col] != 'Normal').count()
tot_no_violations = tot_row - tot_violations
print(type(eventsFile), tot_row)
print("Violations: " + str(tot_violations) + "; No violations: " + str(tot_no_violations))


## Step 2: Data Wrangling

In [None]:
# new names, assigned positionally to the CSV's columns
new_col_names = ['eventtyp', 'iscertified', 'paymentscheme', 'hoursdriven', 'milesdriven', 'latitude', 'longitude', 'isfoggy', 'israiny', 'iswindy']

# Rename all columns at once (toDF needs exactly one new name per existing column;
# this also avoids the Python 3 NameError from calling reduce without importing it)
eventsdata = eventsFile.toDF(*new_col_names)
eventsdata.printSchema()

### Type conversion for Columns

In [None]:
casts = {"latitude": "float", "longitude": "float", "hoursdriven": "int", "milesdriven": "int",
         "isfoggy": "int", "israiny": "int", "iswindy": "int"}  # target type per string column
data = eventsdata
for col_name, col_type in casts.items():
    data = data.withColumn(col_name, data[col_name].cast(col_type))

# view final schema
data.printSchema()

### Feature Engineering

**Transforming truck events:** convert `eventtyp` into a binary (Y/N) violation flag:

**N** - driving is 'Normal' and there are no violations

**Y** - any violation event: 'Lane Departure', 'Overspeed', 'Unsafe following distance', 'Unsafe tail distance'
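The Y/N rule above can be sketched in plain Python. This is only an illustration of the mapping, not the notebook's Spark code: the event names are copied from the list above, and, like the notebook's UDF, any value other than "Normal" is treated as a violation.

```python
# Plain-Python sketch of the eventtyp -> Y/N rule (illustrative only).
VIOLATION_EVENTS = [
    "Lane Departure",
    "Overspeed",
    "Unsafe following distance",
    "Unsafe tail distance",
]

def to_violation_flag(event_type):
    """Return 'N' for normal driving and 'Y' for any other event."""
    return "N" if event_type == "Normal" else "Y"

for event in ["Normal"] + VIOLATION_EVENTS:
    print(event, "->", to_violation_flag(event))
```

Because the rule is "anything that is not Normal", a new violation type added to the stream is flagged as `Y` without changing the code.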

In [None]:
# creating a pandas dataframe 
data_pandas=data.toPandas()
type(data_pandas)

In [None]:
# unique trucking events
truck_events= list(data_pandas['eventtyp'].unique())
truck_events

In [None]:
# transform column eventType
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'eventtyp'
udf = UserDefinedFunction(lambda x: 'N' if x=="Normal" else 'Y', StringType())
data_tran=data.select(*[udf(column).alias(name) if column == name else column for column in data.columns])

In [None]:
data_pandas=data_tran.toPandas()  # use updated dataframe

#### Register table for Enriched Events

In [None]:
data_tran.createOrReplaceTempView("enrichedEvents")  # register the Y/N-labelled events for SQL queries

## Step 3: Exploratory analysis

In [None]:
import seaborn as sns
import warnings
import matplotlib.pyplot as plt
%matplotlib inline

# set plot size
plt.rcParams["figure.figsize"] = [12, 9]

# working copy for plotting, so the encodings below do not modify data_pandas
df = data_pandas.copy()

# setting style
sns.set_style("whitegrid")
warnings.filterwarnings('ignore')

#### Feature transformation in pandas

**Note:** This is for visualization purpose only

Encode these columns as integers:

- eventtyp:
    - 1 for a violation
    - 0 for normal driving

- iscertified:
    - 1 if certified
    - 0 if not certified

- paymentscheme:
    - 1 for "hours"
    - 0 for "miles"
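An alternative to the lambdas in the next cells is an explicit value-to-int dictionary per column. The sketch below runs on a few hypothetical toy rows (not from the dataset) and assumes certified drivers are coded `'Y'`, which the notebook implies but does not show.

```python
import pandas as pd

# Hypothetical toy rows using the notebook's column names (illustrative only).
toy = pd.DataFrame({
    "eventtyp": ["N", "Y", "N"],
    "iscertified": ["Y", "N", "Y"],  # assumption: 'Y' = certified
    "paymentscheme": ["miles", "hours", "hours"],
})

# Explicit value -> int mapping for each encoding listed above.
encodings = {
    "eventtyp": {"N": 0, "Y": 1},
    "iscertified": {"N": 0, "Y": 1},
    "paymentscheme": {"miles": 0, "hours": 1},
}

encoded = toy.copy()
for col, mapping in encodings.items():
    # Series.map turns unmapped values into NaN, so an unexpected category
    # shows up as missing instead of silently becoming 1.
    encoded[col] = encoded[col].map(mapping)

print(encoded)
```

The design choice is mainly defensive: `lambda x: 0 if x == 'N' else 1` codes typos or new categories as 1, while the dictionary makes them visible.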

In [None]:
df["eventtyp"].unique()

In [None]:
df['eventtyp'] = df['eventtyp'].apply(lambda x: 0 if x=='N' else 1)

In [None]:
df['iscertified'] = df['iscertified'].apply(lambda x: 0 if x=='N' else 1)
df['paymentscheme'] = df['paymentscheme'].apply(lambda x: 0 if x=='miles' else 1)

In [None]:
df.head(5)

**Columns in `df`:**

['eventtyp',
 'iscertified',
 'paymentscheme',
 'hoursdriven',
 'milesdriven',
 'latitude',
 'longitude',
 'isfoggy',
 'israiny',
 'iswindy']

### Correlation Matrix for features

In [None]:
# Compute the correlation matrix
corr = df.corr()

# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(12, 10))

# Generate a custom diverging colormap
cmap = sns.diverging_palette(220, 10, as_cmap=True)

# Draw the heatmap with square cells; colors are centered at 0 and capped at vmax=.3
sns.heatmap(corr,cmap=cmap, vmax=.3, center=0,
            square=True, linewidths=.5, cbar_kws={"shrink": .7})

For example, if more miles driven means a more seasoned driver, we would expect `milesdriven` to be negatively correlated with violations (i.e. with `eventtyp` = 1).
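Reading one column of the correlation matrix is often easier than reading the heatmap. The sketch below ranks features by their correlation with the target; the toy values are hypothetical and were constructed only to show the sign convention, not to demonstrate anything about the real data.

```python
import pandas as pd

# Hypothetical toy data (constructed for illustration, not from the dataset).
toy = pd.DataFrame({
    "eventtyp":    [1, 1, 0, 0, 0, 1],          # 1 = violation
    "milesdriven": [1200, 1500, 3900, 4100, 3600, 1300],
    "hoursdriven": [40, 45, 38, 52, 41, 50],
})

# Correlation of each feature with the target, most negative first.
target_corr = toy.corr()["eventtyp"].drop("eventtyp").sort_values()
print(target_corr)
```

A negative value means higher feature values go with fewer violations (`eventtyp` = 1 less often). On the notebook's data, the same expression is `df.corr()["eventtyp"].drop("eventtyp").sort_values()`.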

### Visualizing multidimensional relationships

*A pair plot explores correlations in multidimensional data by plotting every pair of features against each other.*

In [None]:
sns.pairplot(df, hue='eventtyp', height=2.5)  # 'height' replaced 'size' in seaborn 0.9
plt.show()

In [None]:
df.hist()
plt.show()

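### Do certified drivers have fewer violations?

In [None]:
# Regression plot: both variables are binary (0/1)
sns.regplot(x="eventtyp", y="iscertified", data=df)
plt.show()

In [None]:
# Bar height = mean of eventtyp, i.e. the violation rate in each certification group
sns.barplot(y="eventtyp", x="iscertified", data=df)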
plt.show()
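The bar plot above can be backed by a number. Since `eventtyp` is 0/1, its mean per group is the violation rate, which is what `sns.barplot` draws with its default mean estimator. The rows below are hypothetical toy values, not results from the dataset.

```python
import pandas as pd

# Hypothetical toy rows in the encoded form used above (illustrative only):
# iscertified 1 = certified, eventtyp 1 = violation.
toy = pd.DataFrame({
    "iscertified": [1, 1, 1, 1, 0, 0, 0, 0],
    "eventtyp":    [0, 0, 1, 0, 1, 1, 0, 1],
})

# Mean of a 0/1 column per group = share of violations in that group.
violation_rate = toy.groupby("iscertified")["eventtyp"].mean()
print(violation_rate)
```

On the notebook's data, `df.groupby("iscertified")["eventtyp"].mean()` gives the rates behind the bars.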

### Correlation between hours driven and violations?

In [None]:
ax = sns.regplot(x="hoursdriven", y="eventtyp", data=df)

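### What is the median number of hours driven by a driver?

In [None]:
sns.histplot(df["hoursdriven"], kde=True)  # distplot is deprecated in recent seaborn
print("Median hours driven:", df["hoursdriven"].median())

### What is the median number of miles driven by a driver?

In [None]:
sns.histplot(df["milesdriven"], kde=True)
print("Median miles driven:", df["milesdriven"].median())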

## Step 4: Building a classifier to predict truck events

In [None]:
# verify data schema
data_tran.printSchema()

In [None]:
data_tran.show(5)

### Algorithm Used: RandomForest Classifier

Random forests or random decision forests are an ensemble learning method for classification, regression and other tasks, that operate by constructing a multitude of decision trees at training time and outputting the class that is the mode of the classes (classification) or mean prediction (regression) of the individual trees. Random decision forests correct for decision trees' habit of overfitting to their training set.
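The aggregation step described above (mode of the trees' classes for classification, mean for regression) can be sketched in a few lines. This is a simplified hard-vote illustration, not Spark's code; Spark's `RandomForestClassifier` may combine the trees' class-probability estimates (soft voting) instead of counting hard votes.

```python
from collections import Counter

def forest_vote(tree_predictions):
    """Classification: return the most common class among the trees."""
    # most_common(1) returns [(class, count)]; on a tie, the class seen first wins.
    return Counter(tree_predictions).most_common(1)[0][0]

def forest_mean(tree_predictions):
    """Regression: average the trees' numeric predictions."""
    return sum(tree_predictions) / len(tree_predictions)

print(forest_vote(["Y", "N", "Y", "Y", "N"]))  # Y
print(forest_mean([2.0, 3.0, 4.0]))            # 3.0
```

Averaging many trees trained on different bootstrap samples is what reduces the variance (overfitting) of a single tree.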

In [None]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

We use **ML Pipelines**, which provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.

A **Pipeline** is specified as a sequence of stages, and each stage is either a **Transformer** or an **Estimator**. These stages are run in order, and the input DataFrame is transformed as it passes through each stage. 

For Estimator stages, the fit() method is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline), and that Transformer’s transform() method is called on the DataFrame.

![Pipeline fit](https://raw.githubusercontent.com/dhananjaymehta/IoTtrucking/master/fit.png)

For Transformer stages, the transform() method is called on the DataFrame. 

![PipelineModel transform](https://raw.githubusercontent.com/dhananjaymehta/IoTtrucking/master/transform.png)



For more details, see the Spark ML Pipelines guide: https://spark.apache.org/docs/2.1.1/ml-pipeline.html
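To make the Estimator/Transformer contract concrete, here is a toy re-implementation in plain Python. The class names are hypothetical and deliberately prefixed with `Toy` so they do not shadow `pyspark.ml.Pipeline`. This is not the pyspark.ml API, and the "DataFrames" are lists of dicts.

```python
# Toy sketch of the Pipeline contract (NOT pyspark.ml; names are hypothetical).
class ToyTransformer:
    def transform(self, rows):
        raise NotImplementedError

class ToyEstimator:
    def fit(self, rows):
        """Learn from rows and return a fitted ToyTransformer."""
        raise NotImplementedError

class AddMilesPerHour(ToyTransformer):
    # Stateless feature transformer.
    def transform(self, rows):
        return [{**r, "mph": r["miles"] / r["hours"]} for r in rows]

class MeanCenter(ToyEstimator):
    # Estimator: learns a column mean during fit().
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return MeanCenterModel(self.col, mean)

class MeanCenterModel(ToyTransformer):
    def __init__(self, col, mean):
        self.col, self.mean = col, mean

    def transform(self, rows):
        return [{**r, self.col + "_centered": r[self.col] - self.mean} for r in rows]

class ToyPipelineModel(ToyTransformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:  # every stage is already a transformer
            rows = stage.transform(rows)
        return rows

class ToyPipeline(ToyEstimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted into transformers; transformers pass through.
            model = stage.fit(rows) if isinstance(stage, ToyEstimator) else stage
            rows = model.transform(rows)  # the next stage sees transformed rows
            fitted.append(model)
        return ToyPipelineModel(fitted)

train_rows = [{"miles": 100, "hours": 2}, {"miles": 300, "hours": 5}]
toy_model = ToyPipeline([AddMilesPerHour(), MeanCenter("mph")]).fit(train_rows)
print(toy_model.transform([{"miles": 120, "hours": 3}]))
```

The key point mirrored from Spark is that `fit()` on the pipeline returns a model made only of transformers, so scoring new data is a single `transform()` call that reuses statistics learned on the training data (here, the training mean of 55 mph).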

In [None]:
# Prepare string variables so that they can be used by the decision tree algorithm
# StringIndexer encodes a string column of labels to a column of label indices

SI1 = StringIndexer(inputCol='iscertified',outputCol='iscertifiedEncoded')
SI2 = StringIndexer(inputCol='paymentscheme',outputCol='paymentschemeEncoded')

#encode the Label column
labelIndexer = StringIndexer(inputCol='eventtyp', outputCol='label').fit(data_tran)

# Spark ML estimators expect all input features combined in a single vector column
assembler = VectorAssembler(inputCols=["iscertifiedEncoded", "paymentschemeEncoded", "hoursdriven", "milesdriven", "latitude", \
                                       "longitude", "isfoggy", "israiny", "iswindy"], outputCol="features")
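Which label ends up as `1.0` matters for reading the metrics later. StringIndexer's default ordering gives index 0.0 to the most frequent label. The plain-Python sketch below mimics that ordering; breaking ties alphabetically is our reading of the Spark docs for newer versions and should be treated as an assumption.

```python
from collections import Counter

def frequency_index(values):
    """Sketch of StringIndexer's default ordering: most frequent label -> 0.0.

    Ties are broken alphabetically (assumption based on newer Spark docs).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

print(frequency_index(["N", "N", "N", "Y"]))  # {'N': 0.0, 'Y': 1.0}
```

So `label == 1.0` means "violation" only if violations are the minority class; `labelIndexer.labels` shows the actual order.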

In [None]:
# instantiate the algorithm, take the default settings
rf=RandomForestClassifier(labelCol="label", featuresCol="features")

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

pipeline = Pipeline(stages=[SI1,SI2,labelIndexer, assembler, rf, labelConverter])

In [None]:
# Split data into train and test datasets
train, test = data_tran.randomSplit([0.8,0.2], seed=6)
train.cache()

In [None]:
test.cache()

In [None]:
train.printSchema()

In [None]:
train.show(5)

In [None]:
# Build model. 
# The fitted model from a Pipeline is a PipelineModel, which consists of fitted models and transformers, corresponding to the pipeline stages.
model = pipeline.fit(train)

### Score test dataset

In [None]:
results = model.transform(test)

In [None]:
results.show(5)

### Showing the prediction results of binary classifier: 

In [None]:
results = results.select("eventtyp", "label", "predictedLabel", "prediction", "probability")
results.toPandas().head(5)

### Model evaluation

In [None]:
print ('Model accuracy = {:.2f}'.format(results.filter(results.label == results.prediction).count() / float(results.count())))

In [None]:
# StringIndexer orders labels by frequency, so look up which index means 'Y' (violation)
violation_idx = float(labelIndexer.labels.index('Y'))
TP = results.filter(results.label == results.prediction).filter(results.prediction == violation_idx).count()  # True positive => predicted violation and it did occur
TP

In [None]:
FP = results.filter(results.label != results.prediction).filter(results.prediction == violation_idx).count()  # False positive => predicted violation that did not occur
FP

In [None]:
precision = float(TP) / (TP + FP) if (TP + FP) > 0 else 0.0  # guard against no predicted violations
print("Model precision = {:.2f}".format(precision))
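The accuracy and precision computed above with Spark filters follow directly from confusion-matrix counts. The plain-Python sketch below computes them from parallel lists of labels and predictions; recall (the share of actual violations that were predicted), which the notebook does not compute, is added for comparison. The function name is hypothetical.

```python
def binary_metrics(labels, predictions, positive=1.0):
    """Accuracy, precision and recall from parallel lists of labels/predictions."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if p == positive and y == positive)
    fp = sum(1 for y, p in pairs if p == positive and y != positive)
    fn = sum(1 for y, p in pairs if p != positive and y == positive)
    correct = sum(1 for y, p in pairs if y == p)
    return {
        "accuracy": correct / len(pairs),
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }

print(binary_metrics([1.0, 0.0, 1.0, 0.0], [1.0, 1.0, 0.0, 0.0]))
```

For a small test set, the Spark results can be pulled to the driver with `rows = results.select("label", "prediction").collect()` and passed as `binary_metrics([r.label for r in rows], [r.prediction for r in rows])`.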

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
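# Score with class probabilities rather than hard 0/1 predictions; for a vector
# column the evaluator uses element 1, i.e. the probability of label 1.0
evaluator = BinaryClassificationEvaluator(rawPredictionCol="probability", labelCol="label", metricName="areaUnderROC")
print('Area under ROC curve = {:.2f}'.format(evaluator.evaluate(results)))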

### Area Under ROC Curve

The area under the ROC curve (AUC) measures how well the model distinguishes between the two classes: violation vs. no violation.
In a ROC curve, the true positive rate (sensitivity) is plotted against the false positive rate (1 - specificity). See https://www.medcalc.org/manual/roc-curves.php
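An equivalent way to read AUC: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one (ties count as half). The pairwise sketch below is for small illustrative inputs only; Spark computes the metric differently at scale.

```python
def roc_auc(labels, scores):
    """AUC as the probability that a random positive outranks a random negative.

    Ties count as half a win. O(P * N): fine for small illustrative inputs.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

print(roc_auc([1, 1, 0, 0], [0.9, 0.4, 0.6, 0.1]))  # 0.75
print(roc_auc([1, 1, 0, 0], [1, 0, 1, 0]))          # 0.5
```

The second call shows why hard 0/1 predictions make a coarse score: with only two distinct values, many pairs tie and the curve collapses to a single threshold.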

#### Evaluation Criteria

- .90-1 = excellent (A)
- .80-.90 = good (B)
- .70-.80 = fair (C)
- .60-.70 = poor (D)
- .50-.60 = fail (F)
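The grading scale above can be applied with a small helper. The function name is hypothetical, and two choices are assumptions since the list leaves them open: each band includes its lower bound, and anything below .60 (including values under .50) is graded F.

```python
def auc_grade(auc):
    """Map an AUC value to the letter grades listed above.

    Assumes each band includes its lower bound; values below .60 are graded F.
    """
    bands = [(0.90, "A"), (0.80, "B"), (0.70, "C"), (0.60, "D")]
    for lower, grade in bands:
        if auc >= lower:
            return grade
    return "F"

print(auc_grade(0.85))  # B
```

In the notebook this could be applied to `evaluator.evaluate(results)`.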


