In [0]:


# (1) Import the required Python dependencies
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd


In [3]:
# Load the Drive helper and mount
# The dataset is later read from a path underneath this mount point.
from google.colab import drive

# This will prompt for authorization.
# Mounts Google Drive at /content/drive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [0]:
# Install the headless OpenJDK 8 JDK package via apt.
# -qq keeps apt quiet and stdout is discarded, so a successful run prints nothing.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [0]:
!wget -q http://www-eu.apache.org/dist/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
!tar xf spark-2.3.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os

# Export the locations of the JDK and of the unpacked Spark distribution
# through the process environment in a single call.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.3.3-bin-hadoop2.7",
})

In [8]:
!pip install pyspark
import findspark
findspark.init()

import pyspark
# get a spark context
sc = pyspark.SparkContext.getOrCreate()
print(sc)
# get the context
sqlContext = pyspark.sql.SparkSession.builder.getOrCreate()
print(sqlContext) 



<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x7f69acb63a90>


In [0]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
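# (2) Instantiate a Spark Context (disabled: the context and session are already obtained via getOrCreate() in an earlier cell; the lines below are kept only as a reference for connecting to a standalone cluster)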
#conf = SparkConf().setMaster("spark://192.168.56.10:7077").setAppName("Multivariate Linear Regression - Bike Sharing")
#sc = SparkContext(conf=conf)
#sqlContext = SQLContext(sc)

In [10]:
# (3) Load the Bike Sharing dataset into a Spark DataFrame
# Location of day.csv on the mounted Google Drive.
bike_sharing_csv_path = '/content/drive/My Drive/testdata/day.csv'

# Use the built-in CSV reader: the first row is the header and column types are inferred.
bike_sharing_df = sqlContext.read.csv(bike_sharing_csv_path, header=True, inferSchema=True)
bike_sharing_df.show(5)

+-------+-------------------+------+---+----+-------+-------+----------+----------+--------+--------+--------+---------+------+----------+----+
|instant|             dteday|season| yr|mnth|holiday|weekday|workingday|weathersit|    temp|   atemp|     hum|windspeed|casual|registered| cnt|
+-------+-------------------+------+---+----+-------+-------+----------+----------+--------+--------+--------+---------+------+----------+----+
|      1|2011-01-01 00:00:00|     1|  0|   1|      0|      6|         0|         2|0.344167|0.363625|0.805833| 0.160446|   331|       654| 985|
|      2|2011-01-02 00:00:00|     1|  0|   1|      0|      0|         0|         2|0.363478|0.353739|0.696087| 0.248539|   131|       670| 801|
|      3|2011-01-03 00:00:00|     1|  0|   1|      0|      1|         1|         1|0.196364|0.189405|0.437273| 0.248309|   120|      1229|1349|
|      4|2011-01-04 00:00:00|     1|  0|   1|      0|      2|         1|         1|     0.2|0.212122|0.590435| 0.160296|   108|      145

In [11]:
# (4) Measure how strongly each selected column correlates with the label column 'cnt'
independent_variables = [
    'season', 'yr', 'mnth', 'holiday', 'weekday', 'workingday',
    'weathersit', 'temp', 'atemp', 'hum', 'windspeed',
]
dependent_variable = ['cnt']

# Narrow the DataFrame down to the candidate features plus the label.
bike_sharing_df = bike_sharing_df.select(independent_variables + dependent_variable)

# The label is part of the selection, so the final line printed is 'cnt' against itself.
for column_name in bike_sharing_df.columns:
    column_correlation = bike_sharing_df.stat.corr('cnt', column_name)
    print("Correlation to CNT for ", column_name, column_correlation)

Correlation to CNT for  season 0.40610037079863526
Correlation to CNT for  yr 0.5667097078680867
Correlation to CNT for  mnth 0.2799771122192702
Correlation to CNT for  holiday -0.06834771589248398
Correlation to CNT for  weekday 0.06744341241063072
Correlation to CNT for  workingday 0.06115606306052115
Correlation to CNT for  weathersit -0.29739123883466345
Correlation to CNT for  temp 0.6274940090334915
Correlation to CNT for  atemp 0.6310656998491827
Correlation to CNT for  hum -0.1006585621371548
Correlation to CNT for  windspeed -0.2345449974216706
Correlation to CNT for  cnt 1.0


In [12]:
# (5) Assemble the chosen feature columns into a single 'features' vector column
multivariate_feature_columns = ['season', 'yr', 'mnth', 'temp', 'atemp']
multivariate_label_column = 'cnt'
vector_assembler = VectorAssembler(
    inputCols=multivariate_feature_columns,
    outputCol='features',
)
# Keep only the assembled vector and the label column.
bike_sharing_features_df = (
    vector_assembler
    .transform(bike_sharing_df)
    .select(['features', multivariate_label_column])
)
bike_sharing_features_df.head(10)

[Row(features=DenseVector([1.0, 0.0, 1.0, 0.3442, 0.3636]), cnt=985),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.3635, 0.3537]), cnt=801),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.1964, 0.1894]), cnt=1349),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.2, 0.2121]), cnt=1562),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.227, 0.2293]), cnt=1600),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.2043, 0.2332]), cnt=1606),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.1965, 0.2088]), cnt=1510),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.165, 0.1623]), cnt=959),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.1383, 0.1162]), cnt=822),
 Row(features=DenseVector([1.0, 0.0, 1.0, 0.1508, 0.1509]), cnt=1321)]

In [13]:
# Print the version of the `java` executable found on PATH.
# NOTE(review): the recorded output reports OpenJDK 11 although JAVA_HOME was set to the
# Java 8 install path earlier — presumably this is not the JVM Spark launches; confirm.
!java --version

openjdk 11.0.3 2019-04-16
OpenJDK Runtime Environment (build 11.0.3+7-Ubuntu-1ubuntu218.04.1)
OpenJDK 64-Bit Server VM (build 11.0.3+7-Ubuntu-1ubuntu218.04.1, mixed mode, sharing)


In [14]:
# (6) Randomly partition the feature DataFrame into training and test DataFrames
# Weights are 0.75 / 0.25; the fixed seed is passed so the split can be repeated.
train_df, test_df = bike_sharing_features_df.randomSplit(weights=[0.75, 0.25], seed=12345)
# Display the row counts of both partitions.
(train_df.count(), test_df.count())

(534, 197)

In [0]:
# (7) Fit a Multivariate Linear Regression Model to the Training DataFrame
linear_regression = LinearRegression(
    featuresCol='features',
    labelCol=multivariate_label_column,
)
# Fitting the estimator on the training data yields the trained model.
linear_regression_model = linear_regression.fit(train_df)

In [16]:
# (8) Report the fitted parameters and the training summary statistics of the model
training_summary = linear_regression_model.summary

# Fitted parameters.
print("Model Coefficients: " + str(linear_regression_model.coefficients))
print("Intercept: " + str(linear_regression_model.intercept))

# Goodness of fit on the training data.
print("RMSE: %f" % (training_summary.rootMeanSquaredError,))
print("R-SQUARED: %f" % (training_summary.r2,))

# Descriptive statistics of the training data, for context when reading the RMSE.
print("TRAINING DATASET DESCRIPTIVE SUMMARY: ")
train_df.describe().show()

# Residuals from the training summary.
print("TRAINING DATASET RESIDUALS: ")
training_summary.residuals.show()

Model Coefficients: [526.0540180009755,2058.847493710504,-51.89760751842719,2408.656105373764,3502.937182145296]
Intercept: -389.93775612437224
RMSE: 1008.497716
R-SQUARED: 0.731432
TRAINING DATASET DESCRIPTIVE SUMMARY: 
+-------+------------------+
|summary|               cnt|
+-------+------------------+
|  count|               534|
|   mean| 4420.209737827716|
| stddev|1947.8454814788004|
|    min|                22|
|    max|              8714|
+-------+------------------+

TRAINING DATASET RESIDUALS: 
+-------------------+
|          residuals|
+-------------------+
|  477.3807048512753|
|   323.066605195992|
|  684.0978143961413|
| -2.369006518574679|
|  289.3019346708047|
| 395.81941740152433|
|  99.61456409852781|
|-126.95759195936716|
|  478.3658026684425|
|  328.7947859642086|
| -892.5018938233816|
|  128.3341811819805|
|-189.59260407255988|
| 212.66081050998378|
| -386.6079329148888|
| -736.9428510805565|
|  -815.299367561752|
|  540.9698509452983|
| 166.00157418405865|
| -2

In [17]:
# (9) Score the Test DataFrame with the trained model
# transform() returns a DataFrame that includes a 'prediction' column.
test_linear_regression_predictions_df = linear_regression_model.transform(test_df)
print("TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: ")
# Show the first 10 predictions next to the actual label and the input features.
(
    test_linear_regression_predictions_df
    .select("prediction", multivariate_label_column, "features")
    .show(10)
)

TEST DATASET PREDICTIONS AGAINST ACTUAL LABEL: 
+------------------+----+--------------------+
|        prediction| cnt|            features|
+------------------+----+--------------------+
| 976.0746662395563|1321|[1.0,0.0,1.0,0.15...|
|  1050.01248129665| 959|[1.0,0.0,1.0,0.16...|
| 1162.187088514198|1263|[1.0,0.0,1.0,0.16...|
|1289.1224676804804|1510|[1.0,0.0,1.0,0.19...|
| 1300.636622198192|1098|[1.0,0.0,1.0,0.19...|
|1308.9999163839534|1562|[1.0,0.0,1.0,0.2,...|
|1235.6848532836393|1746|[1.0,0.0,2.0,0.18...|
|1384.2702586148866|1472|[1.0,0.0,2.0,0.22...|
|1549.7784033067871|1526|[1.0,0.0,2.0,0.26...|
| 1906.531084586451|2115|[1.0,0.0,2.0,0.31...|
+------------------+----+--------------------+
only showing top 10 rows



In [18]:
# (10) Measure how well the trained model performs on the held-out Test DataFrame
test_summary = linear_regression_model.evaluate(test_df)
# Same metrics as reported for training, so the two can be compared directly.
print("RMSE on Test Data = %g" % (test_summary.rootMeanSquaredError,))
print("R-SQUARED on Test Data = %g" % (test_summary.r2,))

RMSE on Test Data = 964.597
R-SQUARED on Test Data = 0.739356


In [0]:
# (11) Stop the Spark Context
# Cells that use Spark must not be re-run after this without obtaining a context again.
sc.stop()