# Seoul Bike Sharing Demand Prediction

# Importing required Libraries

In [0]:
pip install findspark

Python interpreter will be restarted.
Python interpreter will be restarted.


In [0]:
# to make pyspark importable as a regular library
import findspark
findspark.init()

import pyspark

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

#initializasing SparkSession for creating Spark DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [0]:
# Data Frame spark profiling 
from pyspark.sql.types import IntegerType, StringType, DoubleType, ShortType, DecimalType
import pyspark.sql.functions as func
from pyspark.sql.functions import isnull
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import mean
from pyspark.sql.functions import round
from pyspark.sql.types import Row
import matplotlib.pyplot as plt
from pyspark.sql.functions import udf


# Pandas DF operation
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from numpy import array

# Modeling + Evaluation
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer
from pyspark.sql.functions import when
from pyspark.sql import functions as F
from pyspark.sql.functions import avg
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor, GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder 
from sklearn.metrics import log_loss
from pyspark.sql.functions import corr
import pyspark.sql.functions as fn 
from pyspark.sql.functions import rank,sum,col
from pyspark.sql import Window

window = Window.rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)

In [0]:
df=spark.read.csv("/FileStore/tables/SeoulBikeData-2.csv",inferSchema=True,header=True)

In [0]:
type(df)

Out[4]: pyspark.sql.dataframe.DataFrame

In [0]:
df.show(5)

+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+
|      Date|Rented Bike Count|Hour|Temperature(�C)|Humidity(%)|Wind speed (m/s)|Visibility (10m)|Dew point temperature(�C)|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|Functioning Day|
+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+
|01/12/2017|              254|   0|           -5.2|         37|             2.2|            2000|                    -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|
|01/12/2017|              204|   1|           -5.5|         38|             0.8|            2000|                    -17.6|                    0.0|         0.0|

In [0]:
len(df.columns), df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Rented Bike Count: integer (nullable = true)
 |-- Hour: integer (nullable = true)
 |-- Temperature(�C): double (nullable = true)
 |-- Humidity(%): integer (nullable = true)
 |-- Wind speed (m/s): double (nullable = true)
 |-- Visibility (10m): integer (nullable = true)
 |-- Dew point temperature(�C): double (nullable = true)
 |-- Solar Radiation (MJ/m2): double (nullable = true)
 |-- Rainfall(mm): double (nullable = true)
 |-- Snowfall (cm): double (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- Holiday: string (nullable = true)
 |-- Functioning Day: string (nullable = true)

Out[6]: (14, None)

In [0]:
from pyspark.ml.feature import StringIndexer

In [0]:
indexer=StringIndexer(inputCols=["Seasons","Holiday"],outputCols=["Seasons_indexed","Holiday_indexed"])
df_r=indexer.fit(df).transform(df)
df_r.show()

+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+---------------+---------------+
|      Date|Rented Bike Count|Hour|Temperature(�C)|Humidity(%)|Wind speed (m/s)|Visibility (10m)|Dew point temperature(�C)|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|Functioning Day|Seasons_indexed|Holiday_indexed|
+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+---------------+---------------+
|01/12/2017|              254|   0|           -5.2|         37|             2.2|            2000|                    -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|            3.0|            0.0|
|01/12/2017|              204|  

In [0]:
df_r.columns

Out[9]: ['Date',
 'Rented Bike Count',
 'Hour',
 'Temperature(�C)',
 'Humidity(%)',
 'Wind speed (m/s)',
 'Visibility (10m)',
 'Dew point temperature(�C)',
 'Solar Radiation (MJ/m2)',
 'Rainfall(mm)',
 'Snowfall (cm)',
 'Seasons',
 'Holiday',
 'Functioning Day',
 'Seasons_indexed',
 'Holiday_indexed']

# Independent Features

In [0]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=['Hour', 'Wind speed (m/s)', 'Dew point temperature(�C)',  
                        'Visibility (10m)', 'Solar Radiation (MJ/m2)', 'Rainfall(mm)', 'Snowfall (cm)', 'Holiday_indexed'],outputCol='Independent Features')
output=featureassembler.transform(df_r)

In [0]:
output.show()

+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+---------------+---------------+--------------------+
|      Date|Rented Bike Count|Hour|Temperature(�C)|Humidity(%)|Wind speed (m/s)|Visibility (10m)|Dew point temperature(�C)|Solar Radiation (MJ/m2)|Rainfall(mm)|Snowfall (cm)|Seasons|   Holiday|Functioning Day|Seasons_indexed|Holiday_indexed|Independent Features|
+----------+-----------------+----+---------------+-----------+----------------+----------------+-------------------------+-----------------------+------------+-------------+-------+----------+---------------+---------------+---------------+--------------------+
|01/12/2017|              254|   0|           -5.2|         37|             2.2|            2000|                    -17.6|                    0.0|         0.0|          0.0| Winter|No Holiday|            Yes|  

In [0]:
output.select('Independent Features').show()

+--------------------+
|Independent Features|
+--------------------+
|(8,[1,2,3],[2.2,-...|
|(8,[0,1,2,3],[1.0...|
|(8,[0,1,2,3],[2.0...|
|(8,[0,1,2,3],[3.0...|
|(8,[0,1,2,3],[4.0...|
|(8,[0,1,2,3],[5.0...|
|(8,[0,1,2,3],[6.0...|
|(8,[0,1,2,3],[7.0...|
|[8.0,1.1,-19.8,20...|
|[9.0,0.5,-22.4,19...|
|[10.0,1.2,-21.2,1...|
|[11.0,1.3,-20.2,1...|
|[12.0,1.4,-17.2,2...|
|[13.0,1.6,-15.6,2...|
|[14.0,2.0,-14.6,2...|
|[15.0,3.2,-11.4,2...|
|[16.0,4.2,-7.0,79...|
|[17.0,1.6,-6.5,20...|
|(8,[0,1,2,3],[18....|
|(8,[0,1,2,3],[19....|
+--------------------+
only showing top 20 rows



In [0]:
finalized_data=output.select("Independent Features","Rented Bike Count")

In [0]:
train_data,test_data=finalized_data.randomSplit([0.75,0.25])

# Linear Regression Using Pyspark

In [0]:
regressor=LinearRegression(featuresCol='Independent Features', labelCol='Rented Bike Count')
regressor=regressor.fit(train_data)

In [0]:
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(regressor.coefficients))
print("Intercept: %s" % str(regressor.intercept))

Coefficients: [34.088965781868914,10.802403081946473,20.47550750688791,0.20263385567952164,95.08493733913832,-85.34006592336964,-63.151868096369064,-152.4790539337075]
Intercept: -109.54791581248539


In [0]:
#Summarize model over the data train and see the residual
trainingSummary = regressor.summary
trainingSummary.residuals.show()

+-------------------+
|          residuals|
+-------------------+
| 212.15854911406734|
| 137.93503518120565|
| -6.045039376729861|
| 315.73632930000974|
|  180.1993447822603|
|-211.27258862475816|
| 207.12729379209685|
|  368.6408585003049|
|  403.4422915041141|
|  237.2834583714971|
| 137.45498877385376|
|-198.09898607706396|
| 202.76460119136698|
|   76.9393191236336|
| 252.55433515058968|
| 309.76133831838354|
|  621.4342822585876|
|  669.3048943344777|
|   11.8253986193223|
|  353.8861230700451|
+-------------------+
only showing top 20 rows



In [0]:
#calculate RMSE and R2 in training data
trainingSummary = regressor.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 498.257930
r2: 0.406397


In [0]:
lr_prediction=regressor.transform(test_data)

In [0]:
#view id, label, prediction and probability from result of modelling
lr_prediction.select("prediction","Rented Bike Count","Independent Features").show(5)

+-------------------+-----------------+--------------------+
|         prediction|Rented Bike Count|Independent Features|
+-------------------+-----------------+--------------------+
|  720.0509174308531|              975|(8,[0,1,2,3],[1.0...|
|-18.180368615747923|              109|(8,[0,1,2,3],[1.0...|
|  558.1717678319288|              609|(8,[0,1,2,3],[1.0...|
|  -32.8976625476774|               97|(8,[0,1,2,3],[1.0...|
| 60.514308893201004|              229|(8,[0,1,2,3],[1.0...|
+-------------------+-----------------+--------------------+
only showing top 5 rows



In [0]:
#Calculate R squared
lr_evaluator=RegressionEvaluator(predictionCol='prediction',labelCol="Rented Bike Count" ,metricName='r2')
print("R squared (R2) on test data=%g" % lr_evaluator.evaluate(lr_prediction))

R squared (R2) on test data=0.399676


In [0]:
#Calculate RMSE
lr_evaluator=RegressionEvaluator(predictionCol='prediction',labelCol="Rented Bike Count", metricName='rmse')
print("Root Mean Squared Error (RMSE) on linear regression model=%g" % lr_evaluator.evaluate(lr_prediction))

Root Mean Squared Error (RMSE) on linear regression model=495.504


# Random Forest

In [0]:
#Create Random forest model regression
rf = RandomForestRegressor(featuresCol='Independent Features', labelCol='Rented Bike Count')

#Fit model to data train
rf_model = rf.fit(train_data)

#Make prediction on data test
rf_prediction = rf_model.transform(test_data)

In [0]:
#View result with column selection
rf_prediction.select("prediction","Rented Bike Count","Independent Features").show(5)

+------------------+-----------------+--------------------+
|        prediction|Rented Bike Count|Independent Features|
+------------------+-----------------+--------------------+
|  556.941209327438|              975|(8,[0,1,2,3],[1.0...|
|246.95370371953013|              109|(8,[0,1,2,3],[1.0...|
|  556.941209327438|              609|(8,[0,1,2,3],[1.0...|
| 232.2934877405834|               97|(8,[0,1,2,3],[1.0...|
|235.54082107391673|              229|(8,[0,1,2,3],[1.0...|
+------------------+-----------------+--------------------+
only showing top 5 rows



In [0]:
#Calculate R squared
rf_evaluator=RegressionEvaluator(predictionCol='prediction', labelCol='Rented Bike Count', metricName='r2')
print("R squared (R2) on Random Forest Model=%g" % rf_evaluator.evaluate(rf_prediction))

R squared (R2) on Random Forest Model=0.604315


In [0]:
#Calculate RMSE
rf_evaluator=RegressionEvaluator(predictionCol='prediction', labelCol="Rented Bike Count", metricName='rmse')
print("Root Mean Squared Error (RMSE) on Random Forest Model=%g" % rf_evaluator.evaluate(rf_prediction))

Root Mean Squared Error (RMSE) on Random Forest Model=402.28


# Random Forest (Hyperparameter Tunning)

In [0]:
rf_hyper= RandomForestRegressor(featuresCol="Independent Features", labelCol='Rented Bike Count')

In [0]:
# Hyper-Parameter Tuning
paramGrid = ParamGridBuilder() \
 .addGrid(rf_hyper.maxDepth, [10, 20, 30]) \
 .addGrid(rf_hyper.maxBins, [300, 400, 500]) \
 .build()

In [0]:
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

crossval = CrossValidator(estimator=rf_hyper,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator().setLabelCol("Rented Bike Count"),
                          numFolds=5)

In [0]:
cvModel = crossval.fit(train_data)

In [0]:
#Transform model to data test
rf_prediction_hyper= cvModel.transform(test_data)

In [0]:
#View result of prediction, label, features
rf_prediction_hyper.select("prediction","Rented Bike Count","Independent Features").show(5)

+------------------+-----------------+--------------------+
|        prediction|Rented Bike Count|Independent Features|
+------------------+-----------------+--------------------+
| 667.4450178715036|              975|(8,[0,1,2,3],[1.0...|
|209.62143527667985|              109|(8,[0,1,2,3],[1.0...|
| 573.6676996139115|              609|(8,[0,1,2,3],[1.0...|
| 159.0408729536747|               97|(8,[0,1,2,3],[1.0...|
| 202.4632552000515|              229|(8,[0,1,2,3],[1.0...|
+------------------+-----------------+--------------------+
only showing top 5 rows



In [0]:
#Calculate R squared
rf_evaluator=RegressionEvaluator(predictionCol='prediction', labelCol="Rented Bike Count", metricName='r2')
print("R squared (R2) on Random Forest Model=%g" % rf_evaluator.evaluate(rf_prediction_hyper))

R squared (R2) on Random Forest Model=0.727255


In [0]:
#Calculate RMSE
rf_evaluator=RegressionEvaluator(predictionCol='prediction', labelCol="Rented Bike Count", metricName='rmse')
print("Root Mean Squared Error (RMSE) on Random Forest Model=%g" % rf_evaluator.evaluate(rf_prediction_hyper))

Root Mean Squared Error (RMSE) on Random Forest Model=333.99
