## Analyzing Crowdedness at a GYM

This Python notebook demonstrates creating an ML Pipeline to preprocess a dataset, train a Machine Learning model, and make predictions.

**Data**: The data consist of the number of people present at different times in gym and also includes additional factors like temperature. Using this dataset interesting insights into attendace of the gym shall be derived. This dataset is from Kaggle : https://www.kaggle.com/nsrose7224/crowdedness-at-the-campus-gym

**Goal**: We want to predict the number of people attending the gym given the values of other factors from information such as day of the week, weather, is during semester, etc. Using this we will be able to predict how crowded the gym will be in the future.

#### 1. Read Data
Reading and inspecting the data using the SQL module available in SPARK.

In [4]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
gym_df = sqlContext.sql("Select * from gym_data")

#### Data description

The dataset consists of 26,000 people counts (about every 10 minutes) over the last year. The label is the number of people, which we would like to predict given some subset of the features.

**Feature columns**:

* date (string; datetime of data)
* timestamp (int; number of seconds since beginning of day)
* dayofweek (int; 0 [monday] - 6 [sunday])
* is_weekend (int; 0 or 1) [boolean, if 1, it's either saturday or sunday, otherwise 0]
* is_holiday (int; 0 or 1) [boolean, if 1 it's a federal holiday, 0 otherwise]
* temperature (float; degrees fahrenheit)
* isstartof_semester (int; 0 or 1) [boolean, if 1 it's the beginning of a school semester, 0 otherwise]
* month (int; 1 [jan] - 12 [dec])
* hour (int; 0 - 23)

**Label columns**:
* number_people: Number of people at gym at a given particular time

We can call `display()` on a DataFrame in Databricks to see a sample of the data.

In [7]:
display(gym_df)

number_people,date,timestamp,day_of_week,is_weekend,is_holiday,temperature,is_start_of_semester,is_during_semester,month,hour
37,2015-08-14 17:00:11-07:00,61211,,0.0,0.0,71.76,0.0,0.0,8.0,17.0
45,2015-08-14 17:20:14-07:00,62414,4.0,0.0,0.0,71.76,0.0,0.0,8.0,17.0
40,2015-08-14 17:30:15-07:00,63015,4.0,0.0,0.0,71.76,0.0,0.0,8.0,17.0
44,2015-08-14 17:40:16-07:00,63616,4.0,0.0,0.0,71.76,0.0,0.0,8.0,17.0
45,2015-08-14 17:50:17-07:00,64217,,0.0,,71.76,,0.0,8.0,17.0
46,2015-08-14 18:00:18-07:00,64818,4.0,0.0,,72.15,0.0,0.0,8.0,18.0
43,2015-08-14 18:20:08-07:00,66008,,0.0,0.0,72.15,0.0,0.0,8.0,18.0
53,2015-08-14 18:30:09-07:00,66609,4.0,0.0,0.0,72.15,0.0,0.0,8.0,18.0
54,2015-08-14 18:40:14-07:00,67214,4.0,0.0,0.0,72.15,0.0,0.0,8.0,18.0
43,2015-08-14 18:50:15-07:00,67815,4.0,0.0,0.0,72.15,0.0,0.0,8.0,18.0


In [8]:
print("Our dataset has %d rows." % gym_df.count())

#### 2. Define Target

In [10]:
target = 'number_people' #dependent variable in the dataset

#### 3. Checking the Dataset - Descriptive

In [12]:
# Printing the schema in a tree format
gym_df.printSchema()

In [13]:
gym_df.count()

In [14]:
gym_df.limit(5).toPandas()

Unnamed: 0,number_people,date,timestamp,day_of_week,is_weekend,is_holiday,temperature,is_start_of_semester,is_during_semester,month,hour
0,37,2015-08-14 17:00:11-07:00,61211,,0,0.0,71.760002,0.0,0,8,17
1,45,2015-08-14 17:20:14-07:00,62414,4.0,0,0.0,71.760002,0.0,0,8,17
2,40,2015-08-14 17:30:15-07:00,63015,4.0,0,0.0,71.760002,0.0,0,8,17
3,44,2015-08-14 17:40:16-07:00,63616,4.0,0,0.0,71.760002,0.0,0,8,17
4,45,2015-08-14 17:50:17-07:00,64217,,0,,71.760002,,0,8,17


In [15]:
summary = gym_df.describe().toPandas()
summary

Unnamed: 0,summary,number_people,date,timestamp,day_of_week,is_weekend,is_holiday,temperature,is_start_of_semester,is_during_semester,month,hour
0,count,62184.0,62184,62184.0,61709.0,61750.0,61723.0,61744.0,61726.0,61691.0,61684.0,61727.0
1,mean,29.07254277627685,,45799.43795831725,2.983308755611013,0.2829635627530364,0.0025922265606013,58.50328932607937,0.0772931989761202,0.6595613622732651,7.43003047791972,12.228943574124775
2,stddev,22.68902564217386,,24211.275890511104,1.9967839249525097,0.4504425276462453,0.0508482921208942,6.289692868508321,0.2670582629293928,0.4738605401215063,3.456973721006568,6.721469432848815
3,min,0.0,2015-08-14 17:00:11-07:00,0.0,0.0,0.0,0.0,38.14,0.0,0.0,1.0,0.0
4,max,145.0,2017-03-18 19:22:51-07:00,86399.0,6.0,1.0,1.0,87.17,1.0,1.0,12.0,23.0


In [16]:
# have a summary dataframe that gives key descriptive statistics
import pandas as pd
null_list = [gym_df.where(gym_df[x].isNull()).count() for x in gym_df.columns]
null_list.insert(0, 'null_count')
null_list = pd.DataFrame(null_list).T
null_list.columns = summary.columns
summary = summary.append(null_list)
summary.index = summary.summary
summary = summary[gym_df.columns]
summary
# data has missing values

Unnamed: 0_level_0,number_people,date,timestamp,day_of_week,is_weekend,is_holiday,temperature,is_start_of_semester,is_during_semester,month,hour
summary,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
count,62184.0,62184,62184.0,61709.0,61750.0,61723.0,61744.0,61726.0,61691.0,61684.0,61727.0
mean,29.07254277627685,,45799.43795831725,2.983308755611013,0.2829635627530364,0.0025922265606013,58.50328932607937,0.0772931989761202,0.6595613622732651,7.43003047791972,12.228943574124775
stddev,22.68902564217386,,24211.275890511104,1.9967839249525097,0.4504425276462453,0.0508482921208942,6.289692868508321,0.2670582629293928,0.4738605401215063,3.456973721006568,6.721469432848815
min,0.0,2015-08-14 17:00:11-07:00,0.0,0.0,0.0,0.0,38.14,0.0,0.0,1.0,0.0
max,145.0,2017-03-18 19:22:51-07:00,86399.0,6.0,1.0,1.0,87.17,1.0,1.0,12.0,23.0
null_count,0.0,0,0.0,475.0,434.0,461.0,440.0,458.0,493.0,500.0,457.0


## EXPLORATORY DATA ANALYSIS
We are converting spark data frame to pandas data frame for analysis. <br>
We are performing this since number of records are around 62K, <br>
If there were more than 100k records, we would skip this step since <br>
it will not perform parallel processing.

In [18]:
import smtplib
from matplotlib import style
import numpy as np
import seaborn as sns
sns.set(style='ticks', palette='RdBu')
import matplotlib.pyplot as plt
import datetime
import time
%matplotlib inline

** Pairplot to get a pictorial view of all data **
* Diagonal matrix always shows the distribution of data (Histogram).
* Number of people coming to Gym and Temperature are showing Normal distribution as both are continuous numerical variables and it don’t need any further data transformation Pre-processing work

In [20]:
pandas_eda = gym_df.toPandas()
sns.pairplot(pandas_eda)

**Temperature Histogram**

In [22]:
A=pandas_eda['temperature']
Anan=A[~np.isnan(A)]
sns.distplot(Anan,kde=True, rug=True)

**Correlation matrix between different features to understand the relationship that exist between features.**
* High positive correlation - Number of people and Timestamp.

In [24]:
correlation = pandas_eda.corr()
plt.figure(figsize=(10,10))
sns.heatmap(correlation, vmax=1, square=True,annot=True,cmap='viridis')

* Number of people coming to Gym are higher at 17th and 18th hour. 
* more number of people are coming at above 17th hour (17-23rd hour) timestamp when compared to 0-16th hour

In [26]:
Bins = []
for i in range(0,24):
    NumberofPeople = 0
    for index, row in pandas_eda.iterrows():
        if(row['timestamp']/3600 > i and row['timestamp']/3600 < i+1):
            NumberofPeople = NumberofPeople + row['number_people']
    Bins.append((NumberofPeople))
                  
sns.barplot(list(range(24)),Bins)

** Heat Map to show Busiest hour of the day **
* Weekend – Less number of people coming to Gym compared to weekdays
* Peak Hours – 17th and 18th hour.
* Opposite effect on Tuesday at 17th Hour. It is completely vacant at 5pm compared to the rest of the week at the same time.

In [28]:
pandas_eda['hour'] = pandas_eda.timestamp.apply( lambda x: int(np.floor(x/3600))) 

eda_group = pandas_eda.groupby(['hour','day_of_week'], as_index = False).number_people.mean().pivot('day_of_week','hour', 'number_people').fillna(0)


grid_spec = {"height_ratios": (.9, .05), "hspace": .3}

days= 'Sunday Saturday Friday Thursday Wednesday Tuesday Monday'.split()
days.reverse()

ax = sns.heatmap(eda_group, cmap='RdBu_r',cbar_kws={"orientation": "horizontal"})
ax.set_yticklabels(days, rotation = 0)
ax.set_ylabel('')
ax.set_xlabel('Hour')

cbar = ax.collections[0].colorbar
cbar.set_label('Average Number of People')


** Variation of people coming to Gym across Months and influence of crowdedness by semester.**
* At Start of semester, people coming to Gym are higher.
* Steady crowdedness during the semester.
* Crowd declined steadily in semester end.

In [30]:
sns.factorplot(x='month',y='number_people',data=pandas_eda,hue='is_during_semester',aspect=2)

##4.Preprocessing the data

We need to get our data ready for Machine Learning!!

* Goal: We want to learn to predict the number of people (the `number_people` column).  We refer to the 'number_people' as our target "label".

*Feature Selection*
* Some of the columns contain duplicate information. Variable 'is_weekend' is derived from 'day_of_week' (saturday = 5 or sunday = 6), hence removing 'is_weekend'.
* Since 'timestamp' variable gives exact time in seconds number of seconds since beginning of day, dropping 'hour' variable because it adds redundancy.
* Except for 'number_people', 'date', and 'timestamp' all the varibles has null values.
* Variable 'is_holiday' has less than 1% of data for holidays. Dropping this variable, since we do not enough data for holidays.
* We only have 3 years of data. Performing analysis using 'date' variable would not be helpful since we already have month and timestamp variable, we will only fetch day_of_month from date variable, which will help us to identify what time of the month people count is more.

In [32]:
gym_df.select('is_holiday').groupBy('is_holiday').count().show()

In [33]:
import dateutil.parser as dateparser
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

get_day = udf(lambda x: dateparser.parse(x).day, IntegerType())

gym_df = gym_df.withColumn('day_of_month', get_day(gym_df["date"]))
gym_df.limit(5).toPandas()

Unnamed: 0,number_people,date,timestamp,day_of_week,is_weekend,is_holiday,temperature,is_start_of_semester,is_during_semester,month,hour,day_of_month
0,37,2015-08-14 17:00:11-07:00,61211,,0,0.0,71.760002,0.0,0,8,17,14
1,45,2015-08-14 17:20:14-07:00,62414,4.0,0,0.0,71.760002,0.0,0,8,17,14
2,40,2015-08-14 17:30:15-07:00,63015,4.0,0,0.0,71.760002,0.0,0,8,17,14
3,44,2015-08-14 17:40:16-07:00,63616,4.0,0,0.0,71.760002,0.0,0,8,17,14
4,45,2015-08-14 17:50:17-07:00,64217,,0,,71.760002,,0,8,17,14


In [34]:
gym_df = gym_df.drop('is_weekend', 'hour','is_holiday','date')

In [35]:
display(gym_df)

number_people,timestamp,day_of_week,temperature,is_start_of_semester,is_during_semester,month,day_of_month
37,61211,,71.76,0.0,0.0,8.0,14
45,62414,4.0,71.76,0.0,0.0,8.0,14
40,63015,4.0,71.76,0.0,0.0,8.0,14
44,63616,4.0,71.76,0.0,0.0,8.0,14
45,64217,,71.76,,0.0,8.0,14
46,64818,4.0,72.15,0.0,0.0,8.0,14
43,66008,,72.15,0.0,0.0,8.0,14
53,66609,4.0,72.15,0.0,0.0,8.0,14
54,67214,4.0,72.15,0.0,0.0,8.0,14
43,67815,4.0,72.15,0.0,0.0,8.0,14


#### 5. Determining Categorical and Numerical Variables 
Categorical variables: { ordinal_variables = ['day_of_week', 'month', 'day_of_month'] , <br> binary_variables = ['is_start_of_semester', 'is_during_semester'] } <br>

Now that we have the columns we care about, let's print the schema of our dataset to see the type of each column.

In [38]:
gym_df.printSchema()

In [39]:
# numerical, ordinal, & binary variables
num_input = list(set(gym_df.columns) - set([target]))
#num_input = num_input[0:6]
num_input

#### 6. Creating Custom Transformer for Imputation 
Imputing numerical, ordinal, & binary variables with median

In [41]:
# imputing missing values
from pyspark import keyword_only 
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col

class NumericImputer(Transformer, HasInputCol, HasOutputCol):

    def __init__(self, inputCol=None, outputCol=None):
        super(NumericImputer, self).__init__()
        self.setParams(inputCol = inputCol , outputCol = outputCol)

        
    def setParams(self, inputCol=None, outputCol=None):
      return self._set(inputCol = inputCol, outputCol = outputCol)
        

    def _transform(self, dataset):

      out_col = self.getOutputCol()
      in_col = self.getInputCol()
      from pyspark.sql.functions import when  
      median_v = dataset.approxQuantile(in_col, [0.5], 0)[0]
      return dataset.withColumn(out_col, when(col(in_col).isNull(), median_v).otherwise(col(in_col)))

#### 7. Handling outliers
Capping outliers <br>
* Replacing the outliers which are less than (Q1 - 1.5*IQR) with the 5th percentile and <br>
* Replacing the outliers which are greater than (Q3 + 1.5*IQR) with the 95th percentile of the data.

In [43]:
# handling outliers
from pyspark import keyword_only 
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col

class OutlierHandler(Transformer, HasInputCol, HasOutputCol):

    def __init__(self, inputCol=None, outputCol=None):
        super(OutlierHandler, self).__init__()
        self.setParams(inputCol = inputCol , outputCol = outputCol)

        
    def setParams(self, inputCol=None, outputCol=None):
      return self._set(inputCol = inputCol, outputCol = outputCol)
        

    def _transform(self, dataset):

      out_col = self.getOutputCol()
      in_col = self.getInputCol()
      from pyspark.sql.functions import when  
      five_percentile = dataset.approxQuantile(in_col, [0.05], 0)[0]
      q1 = dataset.approxQuantile(in_col, [0.25], 0)[0]
      q3 = dataset.approxQuantile(in_col, [0.75], 0)[0]
      ninety_five_percentile = dataset.approxQuantile(in_col, [0.95], 0)[0]
      IQR = q3 - q1
      replace_5 = (q1 - 1.5 * IQR)
      replace_95 = (q3 + 1.5 * IQR)
      return dataset.withColumn(out_col, when(col(in_col) < five_percentile, replace_5).when(col(in_col) > ninety_five_percentile, replace_95).otherwise(col(in_col)))

#### 8. Creating Custom Standardization
* Since variables have different units, also 'timestamp' and 'temperature' have much higher values than other varaibles, standardizing the data.

In [45]:
# normalizing data

from pyspark import keyword_only  
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType


class Standardizer(Transformer, HasInputCol, HasOutputCol):

    def __init__(self, inputCol=None, outputCol=None):
        super(Standardizer, self).__init__()
        self.setParams(inputCol = inputCol , outputCol = outputCol)

        
        
    def setParams(self, inputCol=None, outputCol=None):
      return self._set(inputCol = inputCol, outputCol = outputCol)
        

    def _transform(self, dataset):
      from pyspark.sql.functions import stddev, mean, col
      out_col = self.getOutputCol()
      in_col = dataset[self.getInputCol()]
      mean, sttdev = dataset.select(mean(in_col), stddev(in_col)).first()
      return dataset.withColumn(out_col, (in_col - mean)/sttdev) 

In [46]:
outlier_input = ['timestamp', 'temperature']
standardizer_input = ['timestamp', 'temperature', 'day_of_week', 'month', 'day_of_month']

#### 9.Split data into training and test sets

Our final data preparation step will split our dataset into separate training and test sets.  We can train and tune our model as much as we like on the training set, as long as we do not look at the test set.  After we have a good model (based on the training set), we can validate it on the held-out test set in order to know with high confidence our well our model will make predictions on future (unseen) data.

In [48]:
# Split the dataset randomly into 70% for training and 30% for testing. Passing a seed for deterministic behavior
train, test = gym_df.randomSplit([0.7, 0.3], seed = 0)
print("We have %d training examples and %d test examples." % (train.count(), test.count()))

#### 10. Building Stages for the Pipeline
* Train a Machine Learning Pipeline

* Now that we have understood our data and prepared it as a DataFrame with numeric values, let's learn an ML model to predict number of people attending gym in the future.  Most ML algorithms expect to predict a single "label" column (`number_people` for our dataset) using a single "features" column of feature vectors.  For each row in our data, the feature vector should describe what we know: temperature, day of the week, etc., and the label should be what we want to predict (`number_people`).

In [50]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

numericimputers = [NumericImputer(inputCol = column, outputCol = column) for column in num_input]
outlierhandlers = [OutlierHandler(inputCol = column, outputCol = column) for column in outlier_input]
standardizers = [Standardizer(inputCol = column, outputCol = column) for column in standardizer_input]

#### 11. Determining Input Variables (Features)

In [52]:
input_cols = num_input[:]

#### 12. Adding more stages to the Pipeline

In [54]:
assembler = VectorAssembler(inputCols= input_cols, outputCol="features")

#### LINEAR REGRESSION

In [56]:
lr = LinearRegression(featuresCol = 'features', labelCol = target ,maxIter=10, regParam=0.3, elasticNetParam=0.8)

#### 13. Combining the stages as a list

In [58]:
import functools 
import operator
stages = []
stages = functools.reduce(operator.concat, [numericimputers, outlierhandlers, standardizers])
stages.append(assembler)
stages.append(lr)
stages

#### 14. Running The Pipeline
Train the Pipeline!
Now that we have set up our workflow, we can train the Pipeline in a single call. Calling fit() will run feature processing, model tuning, and training in a single call.

In [60]:
# RUNING THE PIPELINE
pipeline = Pipeline(stages=stages)
df_r = pipeline.fit(train)

## Making predictions

Calling transform() on a new dataset passes that data through feature processing and uses the fitted model to make predictions. We get back a DataFrame with a new column predictions (as well as intermediate results such as our rawFeatures column from feature processing).

In [63]:
predictions = df_r.transform(test)

In [64]:
display(predictions.select("number_people", "prediction", 'features'))

number_people,prediction,features
0,-5.258689429584102,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.24096983705154498, 1.012991445393226, 0.0, -0.12864039718598663, 0.01351235492498539))"
0,1.445034932222157,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.7139790374585075, 1.012991445393226, 1.0, 1.3272121978944642, -1.2347614277037124))"
0,-18.961131295375814,"List(1, 7, List(), List(0.0, -2.4420845112604304, -1.5748556378482272, 1.5148972230623456, 0.0, 1.3272121978944642, 0.4674300940626937))"
0,-2.2114126073946565,"List(1, 7, List(), List(0.0, -2.4420845112604304, 0.7050485637623801, 1.5148972230623456, 0.0, 0.1625301218301035, 0.8078683984159749))"
0,-1.4477359944105466,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.03915277067917732, -0.994631665283253, 0.0, 1.3272121978944642, 0.6943889636315478))"
0,7.784908897843648,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.436480571196502, 1.5148972230623456, 1.0, -1.5844929922664375, 1.3752655723381102))"
0,19.15982402603066,"List(1, 7, List(), List(1.0, -2.4420845112604304, 1.146524298645386, 1.5148972230623456, 1.0, 0.1625301218301035, 1.6022244419069644))"
0,-1.87396635958822,"List(1, 7, List(), List(0.0, -2.4420845112604304, -2.2544115034325403, -0.994631665283253, 1.0, 1.3272121978944642, -0.09996707985944168))"
0,1.57125496663625,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.7423596375999094, 1.012991445393226, 1.0, 1.036041678878374, -0.21344651464386877))"
0,8.55069581033894,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.5878433709757778, 1.012991445393226, 1.0, -1.2933224732503472, -0.4404053842127229))"


#### Evaluating Performance
* Our final step will be to use our fitted model to make predictions on new data.  We will use our held-out test set, but you could also use this model to make predictions on completely new data.
* We will also evaluate our predictions.  Computing evaluation metrics is important for understanding the quality of predictions, as well as for comparing models.

In [66]:
summary = pipeline.fit(train).stages[-1].summary
print("numIterations: %d" % summary.totalIterations)
print("objectiveHistory: %s" % str(summary.objectiveHistory))
summary.residuals.show()
print("RMSE: %f" % summary.rootMeanSquaredError)
print("r2: %f" % summary.r2)

In [67]:
# Residual vs. Fitted Plot
import numpy as np
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt
sns.set(style="whitegrid")
fig, ax = plt.subplots()
# Make an example dataset with y ~ x
rs = np.random.RandomState(7)
x = np.asarray(df_r.transform(train).select('prediction').toPandas())
x = x.reshape(x.shape[0])
y = np.asarray(summary.residuals.toPandas())
y = y.reshape(y.shape[0])
# Plot the residuals after fitting a linear model
sns.residplot(x, y, lowess=True, color="g")
ax.set_title('residual vs. fitted')
ax.set(xlabel='fitted', ylabel='residuals')
display(fig)

In [68]:
import functools 
import operator
stages = []
stages = functools.reduce(operator.concat, [numericimputers, outlierhandlers, standardizers])
stages.append(assembler)

#### RANDOM FOREST REGRESSION

In [70]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor(featuresCol="features", labelCol = target)

stages.append(rf)

pipeline = Pipeline(stages=stages)

model = pipeline.fit(train)

##Making predictions

In [72]:
predictions_RF = model.transform(test)

In [73]:
display(predictions_RF.select("number_people", "prediction", 'features'))

number_people,prediction,features
0,1.8589958336909775,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.24096983705154498, 1.012991445393226, 0.0, -0.12864039718598663, 0.01351235492498539))"
0,5.618992827108995,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.7139790374585075, 1.012991445393226, 1.0, 1.3272121978944642, -1.2347614277037124))"
0,0.6122270686163079,"List(1, 7, List(), List(0.0, -2.4420845112604304, -1.5748556378482272, 1.5148972230623456, 0.0, 1.3272121978944642, 0.4674300940626937))"
0,2.5372487432898767,"List(1, 7, List(), List(0.0, -2.4420845112604304, 0.7050485637623801, 1.5148972230623456, 0.0, 0.1625301218301035, 0.8078683984159749))"
0,7.751654644647147,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.03915277067917732, -0.994631665283253, 0.0, 1.3272121978944642, 0.6943889636315478))"
0,3.565670633088022,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.436480571196502, 1.5148972230623456, 1.0, -1.5844929922664375, 1.3752655723381102))"
0,2.152587391767381,"List(1, 7, List(), List(1.0, -2.4420845112604304, 1.146524298645386, 1.5148972230623456, 1.0, 0.1625301218301035, 1.6022244419069644))"
0,12.494522350188351,"List(1, 7, List(), List(0.0, -2.4420845112604304, -2.2544115034325403, -0.994631665283253, 1.0, 1.3272121978944642, -0.09996707985944168))"
0,5.618992827108995,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.7423596375999094, 1.012991445393226, 1.0, 1.036041678878374, -0.21344651464386877))"
0,5.454778746737705,"List(1, 7, List(), List(0.0, -2.4420845112604304, -0.5878433709757778, 1.012991445393226, 1.0, -1.2933224732503472, -0.4404053842127229))"


#### Evaluating Performance

In [75]:
# Compute error
evaluator = RegressionEvaluator(
    labelCol='number_people', predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_RF)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = pipeline.fit(test).stages[-1]
print(rfModel)  # summary only

In [76]:
# ref: https://www.silect.is/blog/2019/4/2/random-forest-in-spark-ml
rfResult = predictions_RF.toPandas()

plt.plot(rfResult.number_people, rfResult.prediction, 'bo')
plt.xlabel('number_people')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()

In [77]:
# ['is_start_of_semester',
#  'timestamp',
#  'temperature',
#  'day_of_week',
#  'is_during_semester',
#  'month',
#  'day_of_month']

importances = rfModel.featureImportances

x_values = list(range(len(importances)))

plt.bar(x_values, importances, orientation = 'vertical')
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')

In [78]:
import functools 
import operator
stages = []
stages = functools.reduce(operator.concat, [numericimputers, outlierhandlers, standardizers])
stages.append(assembler)

## DECISION TREE REGRESSION

In [80]:
from pyspark.ml.regression import DecisionTreeRegressor

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="features", labelCol = target)

stages.append(dt)

pipeline = Pipeline(stages=stages)

model = pipeline.fit(train)



##Making predictions

In [82]:
predictions_DT = model.transform(test)

In [83]:
# Select example rows to display.
predictions_DT.select('number_people', 'features', 'prediction').show(5)



#### Evaluating Performance

In [85]:
#compute error
evaluator = RegressionEvaluator(
    labelCol="number_people", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_DT)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = pipeline.fit(test).stages[-1]
# summary only
print(treeModel)

In [86]:
dtResult = predictions_DT.toPandas()

plt.plot(dtResult.number_people, dtResult.prediction, 'bo')
plt.xlabel('number_people')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()

In [87]:
# ['is_start_of_semester',
#  'timestamp',
#  'temperature',
#  'day_of_week',
#  'is_during_semester',
#  'month',
#  'day_of_month']

importances = treeModel.featureImportances

x_values = list(range(len(importances)))

plt.bar(x_values, importances, orientation = 'vertical')
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')

In [88]:
import functools 
import operator
stages = []
stages = functools.reduce(operator.concat, [numericimputers, outlierhandlers, standardizers])
stages.append(assembler)

## Gradient Boosted Trees Regression

In [90]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol="features", labelCol = target, maxIter=10)

stages.append(gbt)

pipeline = Pipeline(stages=stages)

model = pipeline.fit(train)



##Making predictions

In [92]:
predictions_GBT = model.transform(test)

In [93]:

# Select example rows to display.
predictions_GBT.select('number_people', 'features', 'prediction').show(5)



#### Evaluating Performance

In [95]:
# Compute error
evaluator = RegressionEvaluator(
    labelCol="number_people", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_GBT)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = pipeline.fit(test).stages[-1]
print(gbtModel)  # summary only

In [96]:
gbtResult = predictions_GBT.toPandas()

plt.plot(gbtResult.number_people, gbtResult.prediction, 'bo')
plt.xlabel('number_people')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()

## CONCLUSION

** Metrics**: Manually viewing the predictions gives intuition about accuracy, but it can be useful to have a more concrete metric. Here, we used RMSE metric to compare all the models to tell us how well our model makes predictions on all of our data.  In this case (for [RMSE](https://en.wikipedia.org/wiki/Root-mean-square_deviation)), lower is better.  This metric does not mean much on its own, but it can be used to compare different models.  (This is what `CrossValidator` does internally.)
<br>

* Linear Regression      RMSE - 17.35
* RANDOM FOREST          RMSE - 13.73
* DECISION TREE          RMSE - 13.78
* GRADIENT BOOSTING TREE RMSE - 12.14
* GRADIENT BOOSTING TREE Regression model achieved the best results as it has the lowest RMSE out of all.

* To sum it up, we have learned how to build rgression models using PySpark and MLlib Pipelines.We tried four algorithms and gradient boosting performed best on our data set