# Bike Sharing Demand Project

## Part 1: Data Exploration and Transformation


### Datafields:
- datetime - hourly date + timestamp  
- season -  1 = spring, 2 = summer, 3 = fall, 4 = winter 
- holiday - whether the day is considered a holiday
- workingday - whether the day is neither a weekend nor holiday
- weather - 1: Clear, Few clouds, Partly cloudy, Partly cloudy
2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
4: Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog 
- temp - temperature in Celsius
- atemp - "feels like" temperature in Celsius
- humidity - relative humidity
- windspeed - wind speed
- casual - number of non-registered user rentals initiated
- registered - number of registered user rentals initiated
- count - number of total rentals

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master('yarn').appName('RDD Creation').getOrCreate()

In [10]:
df = spark.read.options(header='True', inferSchema='True', delimiter=',').csv("pyspark-project1/train.csv")

In [11]:
df.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- season: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weather: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- count: integer (nullable = true)



In [12]:
df.show(10)

+-------------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|           datetime|season|holiday|workingday|weather| temp| atemp|humidity|windspeed|casual|registered|count|
+-------------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|2011-01-01 00:00:00|     1|      0|         0|      1| 9.84|14.395|      81|      0.0|     3|        13|   16|
|2011-01-01 01:00:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     8|        32|   40|
|2011-01-01 02:00:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     5|        27|   32|
|2011-01-01 03:00:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     3|        10|   13|
|2011-01-01 04:00:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     0|         1|    1|
|2011-01-01 05:00:00|     1|      0|         0|      2| 9.84| 12.88|      75|   6.0032|     0|         1

In [17]:
display(df.show(5)) #Summarize Table:

+-------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+
|           datetime|season|holiday|workingday|weather|temp| atemp|humidity|windspeed|casual|registered|count|
+-------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+
|2011-01-01 00:00:00|     1|      0|         0|      1|9.84|14.395|      81|      0.0|     3|        13|   16|
|2011-01-01 01:00:00|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     8|        32|   40|
|2011-01-01 02:00:00|     1|      0|         0|      1|9.02|13.635|      80|      0.0|     5|        27|   32|
|2011-01-01 03:00:00|     1|      0|         0|      1|9.84|14.395|      75|      0.0|     3|        10|   13|
|2011-01-01 04:00:00|     1|      0|         0|      1|9.84|14.395|      75|      0.0|     0|         1|    1|
+-------------------+------+-------+----------+-------+----+------+--------+---------+------+----------+-----+
o

None

In [18]:
df.explain()

== Physical Plan ==
*(1) FileScan csv [datetime#225,season#226,holiday#227,workingday#228,weather#229,temp#230,atemp#231,humidity#232,windspeed#233,casual#234,registered#235,count#236] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://nameservice1/user/educlbdhid0017/pyspark-project1/train.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<datetime:timestamp,season:int,holiday:int,workingday:int,weather:int,temp:double,atemp:dou...


### Check for missing values in the dataset

In [19]:
print(df.count()) #Find the number of values
df_no_null = df.na.drop() #Drop all NULL/NAN values
print(df_no_null.count())

10886
10886


### Display the number of seasons found ih the dataset and explode them:

In [26]:
display(df.select('season').distinct())

DataFrame[season: int]

In [27]:
#Creating new columns in Dataset:
def valueToCategory(value, encoding_index):
   if(value == encoding_index):
      return 1
   else:
    return 0

### Explode season column into seperate columns and drop season 

In [29]:
from pyspark.sql.functions import udf
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import col
udfValueToCategory = udf(valueToCategory, IntegerType())
df_encoded = (df.withColumn("season_1", udfValueToCategory(col('season'),lit(1)))
                     .withColumn("season_2", udfValueToCategory(col('season'),lit(2)))
                     .withColumn("season_3", udfValueToCategory(col('season'),lit(3)))
                     .withColumn("season_4", udfValueToCategory(col('season'),lit(4))))
#Drop Season Column
df_encoded = df_encoded.drop('season')

In [32]:
df_encoded.take(5)


[Row(datetime=datetime.datetime(2011, 1, 1, 0, 0), holiday=0, workingday=0, weather=1, temp=9.84, atemp=14.395, humidity=81, windspeed=0.0, casual=3, registered=13, count=16, season_1=1, season_2=0, season_3=0, season_4=0),
 Row(datetime=datetime.datetime(2011, 1, 1, 1, 0), holiday=0, workingday=0, weather=1, temp=9.02, atemp=13.635, humidity=80, windspeed=0.0, casual=8, registered=32, count=40, season_1=1, season_2=0, season_3=0, season_4=0),
 Row(datetime=datetime.datetime(2011, 1, 1, 2, 0), holiday=0, workingday=0, weather=1, temp=9.02, atemp=13.635, humidity=80, windspeed=0.0, casual=5, registered=27, count=32, season_1=1, season_2=0, season_3=0, season_4=0),
 Row(datetime=datetime.datetime(2011, 1, 1, 3, 0), holiday=0, workingday=0, weather=1, temp=9.84, atemp=14.395, humidity=75, windspeed=0.0, casual=3, registered=10, count=13, season_1=1, season_2=0, season_3=0, season_4=0),
 Row(datetime=datetime.datetime(2011, 1, 1, 4, 0), holiday=0, workingday=0, weather=1, temp=9.84, atemp=

In [36]:
print(df_encoded.show(5))

+-------------------+-------+----------+-------+----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+
|           datetime|holiday|workingday|weather|temp| atemp|humidity|windspeed|casual|registered|count|season_1|season_2|season_3|season_4|
+-------------------+-------+----------+-------+----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+
|2011-01-01 00:00:00|      0|         0|      1|9.84|14.395|      81|      0.0|     3|        13|   16|       1|       0|       0|       0|
|2011-01-01 01:00:00|      0|         0|      1|9.02|13.635|      80|      0.0|     8|        32|   40|       1|       0|       0|       0|
|2011-01-01 02:00:00|      0|         0|      1|9.02|13.635|      80|      0.0|     5|        27|   32|       1|       0|       0|       0|
|2011-01-01 03:00:00|      0|         0|      1|9.84|14.395|      75|      0.0|     3|        10|   13|       1|       0|       0|       0|
|2011-01-01 04:00:00

In [42]:
#display weather column:
display(df.select('weather').show())

+-------+
|weather|
+-------+
|      1|
|      1|
|      1|
|      1|
|      1|
|      2|
|      1|
|      1|
|      1|
|      1|
|      1|
|      1|
|      1|
|      2|
|      2|
|      2|
|      2|
|      2|
|      3|
|      3|
+-------+
only showing top 20 rows



None

In [None]:
df_encoded = (df_encoded.withColumn("weather_1", udfValueToCategory(col('weather'),lit(1)))
                     .withColumn("weather_2", udfValueToCategory(col('weather'),lit(2)))
                     .withColumn("weather_3", udfValueToCategory(col('weather'),lit(3)))
                     .withColumn("weather_4", udfValueToCategory(col('weather'),lit(4))))
df_encoded = df_encoded.drop('weather')

In [44]:
df_encoded.take(5)

[Row(datetime=datetime.datetime(2011, 1, 1, 0, 0), holiday=0, workingday=0, temp=9.84, atemp=14.395, humidity=81, windspeed=0.0, casual=3, registered=13, count=16, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=1, weather_2=0, weather_3=0, weather_4=0),
 Row(datetime=datetime.datetime(2011, 1, 1, 1, 0), holiday=0, workingday=0, temp=9.02, atemp=13.635, humidity=80, windspeed=0.0, casual=8, registered=32, count=40, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=1, weather_2=0, weather_3=0, weather_4=0),
 Row(datetime=datetime.datetime(2011, 1, 1, 2, 0), holiday=0, workingday=0, temp=9.02, atemp=13.635, humidity=80, windspeed=0.0, casual=5, registered=27, count=32, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=1, weather_2=0, weather_3=0, weather_4=0),
 Row(datetime=datetime.datetime(2011, 1, 1, 3, 0), holiday=0, workingday=0, temp=9.84, atemp=14.395, humidity=75, windspeed=0.0, casual=3, registered=10, count=13, season_1=1, season_2=0, season_3=0, seaso

### Split datetime into meaningful columns such as hour, day, month, etc.


In [45]:
from pyspark.sql.functions import split
from pyspark.sql.functions import *
from pyspark.sql.types import *
df_encoded = df_encoded.withColumn('hour',  split(split(df_encoded['datetime'], ' ')[1], ':')[0].cast('int'))
df_encoded = df_encoded.withColumn('month', split(split(df_encoded['datetime'], ' ')[0], '-')[0].cast('int'))
df_encoded = df_encoded.withColumn('day', split(split(df_encoded['datetime'], ' ')[0], '-')[1].cast('int'))
df_encoded = df_encoded.withColumn('year', split(split(df_encoded['datetime'], ' ')[0], '-')[2].cast('int'))

In [46]:
display(df_encoded.take(5))

[Row(datetime=datetime.datetime(2011, 1, 1, 0, 0), holiday=0, workingday=0, temp=9.84, atemp=14.395, humidity=81, windspeed=0.0, casual=3, registered=13, count=16, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=1, weather_2=0, weather_3=0, weather_4=0, hour=0, month=2011, day=1, year=1),
 Row(datetime=datetime.datetime(2011, 1, 1, 1, 0), holiday=0, workingday=0, temp=9.02, atemp=13.635, humidity=80, windspeed=0.0, casual=8, registered=32, count=40, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=1, weather_2=0, weather_3=0, weather_4=0, hour=1, month=2011, day=1, year=1),
 Row(datetime=datetime.datetime(2011, 1, 1, 2, 0), holiday=0, workingday=0, temp=9.02, atemp=13.635, humidity=80, windspeed=0.0, casual=5, registered=27, count=32, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=1, weather_2=0, weather_3=0, weather_4=0, hour=2, month=2011, day=1, year=1),
 Row(datetime=datetime.datetime(2011, 1, 1, 3, 0), holiday=0, workingday=0, temp=9.84, atemp=14.395,

In [49]:
print(df_encoded.show(5))

+-------------------+-------+----------+----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+----+-----+---+----+
|           datetime|holiday|workingday|temp| atemp|humidity|windspeed|casual|registered|count|season_1|season_2|season_3|season_4|weather_1|weather_2|weather_3|weather_4|hour|month|day|year|
+-------------------+-------+----------+----+------+--------+---------+------+----------+-----+--------+--------+--------+--------+---------+---------+---------+---------+----+-----+---+----+
|2011-01-01 00:00:00|      0|         0|9.84|14.395|      81|      0.0|     3|        13|   16|       1|       0|       0|       0|        1|        0|        0|        0|   0| 2011|  1|   1|
|2011-01-01 01:00:00|      0|         0|9.02|13.635|      80|      0.0|     8|        32|   40|       1|       0|       0|       0|        1|        0|        0|        0|   1| 2011|  1|   1|
|2011-01-01 02:00:00|      0|         0|

In [51]:
df_encoded.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- count: integer (nullable = true)
 |-- season_1: integer (nullable = true)
 |-- season_2: integer (nullable = true)
 |-- season_3: integer (nullable = true)
 |-- season_4: integer (nullable = true)
 |-- weather_1: integer (nullable = true)
 |-- weather_2: integer (nullable = true)
 |-- weather_3: integer (nullable = true)
 |-- weather_4: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- year: integer (nullable = true)



In [53]:
df_encoded = df_encoded.drop('datetime')

In [55]:
df_encoded =  df_encoded.withColumnRenamed("count", "label")

### Split dataset into train and train_test

In [56]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
train, test = df_encoded.randomSplit([0.9, 0.1], seed=12345)

In [57]:
#Assembling and Sending Features to the Model

In [58]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["holiday","workingday","temp","atemp","humidity","windspeed","casual","registered","label","season_1","season_2","season_3","season_4","weather_1","weather_2","weather_3","weather_4", "hour", "month", "day", "year"],
    outputCol="features")

output = assembler.transform(train)
print("Assembled columns 'hour', 'day' etc  to vector column 'features'")
display(output.take(5))
print(output.count())
train_output = output.na.drop()
print(train_output.count())

Assembled columns 'hour', 'day' etc  to vector column 'features'


[Row(holiday=0, workingday=0, temp=3.28, atemp=2.275, humidity=79, windspeed=31.0009, casual=0, registered=24, label=24, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=0, weather_2=0, weather_3=1, weather_4=0, hour=1, month=2012, day=2, year=12, features=SparseVector(21, {2: 3.28, 3: 2.275, 4: 79.0, 5: 31.0009, 7: 24.0, 8: 24.0, 9: 1.0, 15: 1.0, 17: 1.0, 18: 2012.0, 19: 2.0, 20: 12.0})),
 Row(holiday=0, workingday=0, temp=3.28, atemp=3.79, humidity=53, windspeed=16.9979, casual=0, registered=26, label=26, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=1, weather_2=0, weather_3=0, weather_4=0, hour=8, month=2012, day=2, year=12, features=SparseVector(21, {2: 3.28, 3: 3.79, 4: 53.0, 5: 16.9979, 7: 26.0, 8: 26.0, 9: 1.0, 13: 1.0, 17: 8.0, 18: 2012.0, 19: 2.0, 20: 12.0})),
 Row(holiday=0, workingday=0, temp=3.28, atemp=4.545, humidity=53, windspeed=12.998, casual=0, registered=1, label=1, season_1=1, season_2=0, season_3=0, season_4=0, weather_1=1, weather_2=0, weathe

9797
9797


In [59]:
test_output = assembler.transform(test)
print(test_output.count())
train_output = test_output.na.drop()
print(test_output.count())
print("Assembled columns 'hour', 'day' etc  to vector column 'features'")
#select("features", "clicked")

1089
1089
Assembled columns 'hour', 'day' etc  to vector column 'features'


In [61]:
# Fit the Model
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10)
lrModel = lr.fit(train_output)

In [62]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

Coefficients: [0.2410070188565101,0.024030060989444376,-0.003297725855819951,-0.003820121916425772,-0.0031189855279797744,0.0006257837922436448,0.5632573474293397,0.5631891304217548,0.43676973472610325,0.6470373361880243,0.23954646048626346,-0.17068049666052562,-0.686157845775815,-0.050530037272087616,0.0248775678319237,0.08457201334769567,0.0,0.004769967132704666,-0.08677748541178142,0.15866876140276176,0.005723255520058335]
Intercept: 173.74350476370518


In [65]:
#Disply Predictions:
import pyspark.sql.functions
predictions = lrModel.transform(test_output)\
    .select("features", "label", "prediction")\
    .take(10)
display(predictions)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label",metricName="r2")
# print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predictions))

[Row(features=SparseVector(21, {2: 3.28, 3: 4.545, 4: 53.0, 5: 12.998, 7: 18.0, 8: 18.0, 9: 1.0, 13: 1.0, 17: 7.0, 18: 2012.0, 19: 2.0, 20: 12.0}), label=18, prediction=17.9770260555849),
 Row(features=SparseVector(21, {2: 4.1, 3: 3.03, 4: 39.0, 5: 30.0026, 7: 22.0, 8: 22.0, 9: 1.0, 13: 1.0, 17: 23.0, 18: 2011.0, 19: 1.0, 20: 8.0}), label=22, prediction=22.015787042195342),
 Row(features=SparseVector(21, {2: 5.74, 3: 7.575, 4: 43.0, 5: 11.0014, 7: 28.0, 8: 28.0, 9: 1.0, 13: 1.0, 17: 22.0, 18: 2012.0, 19: 2.0, 20: 12.0}), label=28, prediction=28.058417254402173),
 Row(features=DenseVector([0.0, 0.0, 6.56, 6.06, 40.0, 31.0009, 4.0, 92.0, 96.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 18.0, 2012.0, 2.0, 12.0]), label=96, prediction=96.06176875299458),
 Row(features=DenseVector([0.0, 0.0, 6.56, 6.82, 40.0, 22.0028, 4.0, 44.0, 48.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 18.0, 2011.0, 1.0, 9.0]), label=48, prediction=47.96614802554875),
 Row(features=DenseVector([0.0, 0.0, 6.56, 6.82, 47.0,

In [66]:
# Parameter grid search for best parameters to give good predictions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

# In this case the estimator is simply the linear regression.
# A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train_output)

# Make predictions on test data. model is the model with combination of parameters
# that performed best.
display(model.transform(test_output)\
    .select("features", "label", "prediction")\
    .take(5))

[Row(features=SparseVector(21, {2: 3.28, 3: 4.545, 4: 53.0, 5: 12.998, 7: 18.0, 8: 18.0, 9: 1.0, 13: 1.0, 17: 7.0, 18: 2012.0, 19: 2.0, 20: 12.0}), label=18, prediction=17.997756723800773),
 Row(features=SparseVector(21, {2: 4.1, 3: 3.03, 4: 39.0, 5: 30.0026, 7: 22.0, 8: 22.0, 9: 1.0, 13: 1.0, 17: 23.0, 18: 2011.0, 19: 1.0, 20: 8.0}), label=22, prediction=22.002205520528925),
 Row(features=SparseVector(21, {2: 5.74, 3: 7.575, 4: 43.0, 5: 11.0014, 7: 28.0, 8: 28.0, 9: 1.0, 13: 1.0, 17: 22.0, 18: 2012.0, 19: 2.0, 20: 12.0}), label=28, prediction=28.00228889226001),
 Row(features=DenseVector([0.0, 0.0, 6.56, 6.06, 40.0, 31.0009, 4.0, 92.0, 96.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 18.0, 2012.0, 2.0, 12.0]), label=96, prediction=95.99923333047346),
 Row(features=DenseVector([0.0, 0.0, 6.56, 6.82, 40.0, 22.0028, 4.0, 44.0, 48.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 18.0, 2011.0, 1.0, 9.0]), label=48, prediction=48.000842644425376)]

In [67]:
# Random Forest Classifier model
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
rf = RandomForestRegressor(labelCol="label", featuresCol="features", numTrees=100)
# Train model.  This also runs the indexers.
rf_model = rf.fit(train_output)
# rf_model.persist()
# Make predictions.
predictions = rf_model.transform(test_output)

# Select example rows to display.
display(predictions.select("prediction", "label", "features").take(5))

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

[Row(prediction=34.931556750255645, label=18, features=SparseVector(21, {2: 3.28, 3: 4.545, 4: 53.0, 5: 12.998, 7: 18.0, 8: 18.0, 9: 1.0, 13: 1.0, 17: 7.0, 18: 2012.0, 19: 2.0, 20: 12.0})),
 Row(prediction=36.89780721865108, label=22, features=SparseVector(21, {2: 4.1, 3: 3.03, 4: 39.0, 5: 30.0026, 7: 22.0, 8: 22.0, 9: 1.0, 13: 1.0, 17: 23.0, 18: 2011.0, 19: 1.0, 20: 8.0})),
 Row(prediction=38.49619867538233, label=28, features=SparseVector(21, {2: 5.74, 3: 7.575, 4: 43.0, 5: 11.0014, 7: 28.0, 8: 28.0, 9: 1.0, 13: 1.0, 17: 22.0, 18: 2012.0, 19: 2.0, 20: 12.0})),
 Row(prediction=94.64301404077807, label=96, features=DenseVector([0.0, 0.0, 6.56, 6.06, 40.0, 31.0009, 4.0, 92.0, 96.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 18.0, 2012.0, 2.0, 12.0])),
 Row(prediction=57.55474515432686, label=48, features=DenseVector([0.0, 0.0, 6.56, 6.82, 40.0, 22.0028, 4.0, 44.0, 48.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 18.0, 2011.0, 1.0, 9.0]))]

Root Mean Squared Error (RMSE) on test data = 16.9584


In [68]:
#GBT Regressive Model
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol="features", maxIter=10)

gbt_model = gbt.fit(train_output)
# Make predictions.
predictions = gbt_model.transform(test_output)


gbt_model.write().overwrite().save("bike_sharing_gbt.model")
# Select example rows to display.
display(predictions.select("prediction", "label", "features").take(5))

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
#Gave root mean square error 

[Row(prediction=16.889233655915504, label=18, features=SparseVector(21, {2: 3.28, 3: 4.545, 4: 53.0, 5: 12.998, 7: 18.0, 8: 18.0, 9: 1.0, 13: 1.0, 17: 7.0, 18: 2012.0, 19: 2.0, 20: 12.0})),
 Row(prediction=16.92511614166162, label=22, features=SparseVector(21, {2: 4.1, 3: 3.03, 4: 39.0, 5: 30.0026, 7: 22.0, 8: 22.0, 9: 1.0, 13: 1.0, 17: 23.0, 18: 2011.0, 19: 1.0, 20: 8.0})),
 Row(prediction=30.32523546505164, label=28, features=SparseVector(21, {2: 5.74, 3: 7.575, 4: 43.0, 5: 11.0014, 7: 28.0, 8: 28.0, 9: 1.0, 13: 1.0, 17: 22.0, 18: 2012.0, 19: 2.0, 20: 12.0})),
 Row(prediction=95.83936857912708, label=96, features=DenseVector([0.0, 0.0, 6.56, 6.06, 40.0, 31.0009, 4.0, 92.0, 96.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 18.0, 2012.0, 2.0, 12.0])),
 Row(prediction=46.025330660611566, label=48, features=DenseVector([0.0, 0.0, 6.56, 6.82, 40.0, 22.0028, 4.0, 44.0, 48.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 18.0, 2011.0, 1.0, 9.0]))]

Root Mean Squared Error (RMSE) on test data = 7.24449
