
# Introduction:

# Motivation:

# Design:




## Step 1: Importing the libraries and creating a Spark session.


In this step we load the required libraries. The inline comments in the code cell describe what each import is used for.

In [1]:

from pyspark.sql import SQLContext  # for loading the CSV file as a DataFrame
sqlContext = SQLContext(sc)  # assumes the notebook kernel already defines `sc` (the SparkContext)
from pyspark.ml.tuning import TrainValidationSplit  # train/validation split for model tuning (not used below)
from pyspark.ml.regression import LinearRegression  # model builder
from pyspark.sql import SparkSession  # creating the Spark session
from pyspark.ml.feature import VectorAssembler  # assembling the feature columns into one vector
from pyspark.sql.types import DoubleType, DateType  # column types used when casting
from pyspark.sql.functions import udf, col, dayofmonth, dayofyear, month, year  # column functions used below
from datetime import datetime  # for parsing the date column
from pyspark.ml.evaluation import RegressionEvaluator  # model evaluator



In [2]:
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("LinearRegression") \
        .getOrCreate()

## Step 2: Loading and cleaning the dataset.

In [25]:

# Read the semicolon-delimited file; the first line holds the column names
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', delimiter=';')\
    .load('/home/Downloads/household_power_consumption.txt')


In [26]:

# Cast the measurement columns to double
numeric_cols = ["Global_active_power", "Global_reactive_power", "Voltage", "Global_intensity",
                "Sub_metering_1", "Sub_metering_2", "Sub_metering_3"]
for c in numeric_cols:
    df = df.withColumn(c, df[c].cast(DoubleType()))

# Parse the day/month/year strings in the Date column into dates
func = udf(lambda x: datetime.strptime(x, '%d/%m/%Y'), DateType())
df = df.withColumn('Date', func(col('Date')))
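
The format string matters because `'%d/%m/%Y'` reads the day first. The plain-Python sketch below shows what the UDF does to a single value. `parse_day_first` is a hypothetical helper name used only for illustration, and the second sample string is made up to show the day-first reading.

```python
from datetime import date, datetime


def parse_day_first(text):
    """Parse a 'dd/mm/YYYY' string into a date, using the same format string as the UDF."""
    return datetime.strptime(text, "%d/%m/%Y").date()


first_day = parse_day_first("16/12/2006")
ambiguous = parse_day_first("01/02/2007")  # day-first: 1 February, not 2 January

print(first_day, ambiguous)
```

A month-first format such as `'%m/%d/%Y'` would fail on the first string (there is no month 16) and would silently swap day and month on the second.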


## Step 3: Exploratory analysis and feature extraction

In [5]:
df.show(5)

+----------+--------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|      Date|    Time|Global_active_power|Global_reactive_power|Voltage|Global_intensity|Sub_metering_1|Sub_metering_2|Sub_metering_3|
+----------+--------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|2006-12-16|17:24:00|              4.216|                0.418| 234.84|            18.4|           0.0|           1.0|          17.0|
|2006-12-16|17:25:00|               5.36|                0.436| 233.63|            23.0|           0.0|           1.0|          16.0|
|2006-12-16|17:26:00|              5.374|                0.498| 233.29|            23.0|           0.0|           2.0|          17.0|
|2006-12-16|17:27:00|              5.388|                0.502| 233.74|            23.0|           0.0|           1.0|          17.0|
|2006-12-16|17:28:00|              3.666|                0.528

In [6]:
df.count()#total observations in our data

2075259

In [7]:
df.printSchema()#getting the schema of the data

root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- Global_active_power: double (nullable = true)
 |-- Global_reactive_power: double (nullable = true)
 |-- Voltage: double (nullable = true)
 |-- Global_intensity: double (nullable = true)
 |-- Sub_metering_1: double (nullable = true)
 |-- Sub_metering_2: double (nullable = true)
 |-- Sub_metering_3: double (nullable = true)



In [8]:
df=df.na.drop()#removing NAs
df.count()#Count after NA removal

2049280
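
To put the NA removal in perspective, the two counts printed above can be compared directly. This is a small arithmetic sketch using only those two numbers; the variable names are illustrative.

```python
total_rows = 2075259      # df.count() before df.na.drop()
remaining_rows = 2049280  # df.count() after df.na.drop()

dropped_rows = total_rows - remaining_rows
dropped_share = dropped_rows / total_rows

print(f"dropped {dropped_rows} rows ({dropped_share:.2%} of the data)")
```

Roughly 1.25% of the observations are removed, so dropping incomplete rows costs little data here.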

In [27]:
df.select('Global_intensity').describe().show()

+-------+-----------------+
|summary| Global_intensity|
+-------+-----------------+
|  count|          2049280|
|   mean|4.627759310588417|
| stddev|4.444396259786192|
|    min|              0.2|
|    max|             48.4|
+-------+-----------------+




In the steps below we extract four features from the Date column to use in the model: the day of the month, the day of the year, the month, and the year. We then drop the original Date and Time columns: the date information is now carried by the new features, and the time of day is not used in the model.

In [12]:
df=df.withColumn('Day', dayofmonth('Date'))#extracting day of month
df=df.withColumn('Day_y', dayofyear('Date'))#extracting day of year
df=df.withColumn('month', month('Date'))#extracting month
df=df.withColumn('year', year('Date'))#extracting year

In [13]:
df=df.drop('Date')#dropping date column
df=df.drop('Time')#dropping time column
df.show(2)#Viewing the new records

+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+---+-----+-----+----+
|Global_active_power|Global_reactive_power|Voltage|Global_intensity|Sub_metering_1|Sub_metering_2|Sub_metering_3|Day|Day_y|month|year|
+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+---+-----+-----+----+
|              4.216|                0.418| 234.84|            18.4|           0.0|           1.0|          17.0| 16|  350|   12|2006|
|               5.36|                0.436| 233.63|            23.0|           0.0|           1.0|          16.0| 16|  350|   12|2006|
+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+---+-----+-----+----+
only showing top 2 rows
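
As a quick sanity check on the derived columns, the same four values can be computed with Python's standard library for the date of the two rows shown above. This is a sketch for verification only, not part of the Spark pipeline.

```python
from datetime import date

d = date(2006, 12, 16)  # the date of the two rows shown above

features = {
    "Day": d.day,                    # day of month
    "Day_y": d.timetuple().tm_yday,  # day of year
    "month": d.month,
    "year": d.year,
}
print(features)
```

The result matches the `Day`, `Day_y`, `month`, and `year` values in the table (16, 350, 12, 2006).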



## Step 4: Building the model and evaluating it.

In [14]:
cols=['Global_active_power','Global_reactive_power','Voltage','Sub_metering_1',
'Sub_metering_2','Sub_metering_3','Day_y','Day','month','year']

assembler=VectorAssembler(inputCols=cols,outputCol="features")#assembler for all input features
df=assembler.transform(df)#transforming the data


In [15]:
df=df.select("Global_intensity","features")#keeping the output (label) and input (features) columns
df=df.toDF("label","features")#renaming the columns

In [16]:
df.show(2)#viewing the top 2 rows to understand the structure of data

+-----+--------------------+
|label|            features|
+-----+--------------------+
| 18.4|[4.216,0.418,234....|
| 23.0|[5.36,0.436,233.6...|
+-----+--------------------+
only showing top 2 rows



In [17]:
train, test = df.randomSplit([0.75, 0.25], seed=121)#Splitting into train and test
# Define LinearRegression algorithm
lr = LinearRegression()
#Fit the model on the train data
lrModel = lr.fit(train)

In [18]:
# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))


Coefficients: [4.17758642536,0.758770641078,-0.0190950101879,0.00289293352559,0.00332667441527,-0.00749759938443,-0.00307734296416,0.00291367372114,0.0934963236669,-0.00332910370784]
Intercept: 11.2073177952
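
The coefficient vector follows the order of `cols` passed to the `VectorAssembler`, so each weight can be paired with its feature name. The sketch below copies the printed coefficients and intercept, then applies them by hand to the first row shown in Step 3. Whether that row ended up in the training or the test split is not known, so the comparison is only illustrative.

```python
cols = ['Global_active_power', 'Global_reactive_power', 'Voltage', 'Sub_metering_1',
        'Sub_metering_2', 'Sub_metering_3', 'Day_y', 'Day', 'month', 'year']

# Values copied from the printed model output above
coefficients = [4.17758642536, 0.758770641078, -0.0190950101879, 0.00289293352559,
                0.00332667441527, -0.00749759938443, -0.00307734296416,
                0.00291367372114, 0.0934963236669, -0.00332910370784]
intercept = 11.2073177952

named = dict(zip(cols, coefficients))

# List the weights from largest to smallest absolute value
ranked = sorted(named.items(), key=lambda kv: abs(kv[1]), reverse=True)
for name, weight in ranked:
    print(f"{name:>22}: {weight: .6f}")

# First row shown in Step 3 (observed Global_intensity: 18.4)
row = {'Global_active_power': 4.216, 'Global_reactive_power': 0.418, 'Voltage': 234.84,
       'Sub_metering_1': 0.0, 'Sub_metering_2': 1.0, 'Sub_metering_3': 17.0,
       'Day_y': 350, 'Day': 16, 'month': 12, 'year': 2006}

# Linear model: intercept plus the weighted sum of the features
prediction = intercept + sum(named[c] * row[c] for c in cols)
print(f"prediction: {prediction:.2f} (observed: 18.4)")
```

The features are on very different scales (for example `year` is around 2006 while `Global_reactive_power` is below 1), so the raw weights should not be read as a ranking of feature importance.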


In [19]:
# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

numIterations: 1
objectiveHistory: [0.0]
+--------------------+
|           residuals|
+--------------------+
| -0.2611831341588086|
|-0.25158508365371474|
|-0.25101223334807726|
|-0.23535432499400882|
|-0.23712342132207026|
|  -0.225424919696308|
|-0.22084211725121533|
|-0.20703911637794653|
|-0.21599445221342534|
|-0.21536704019006975|
|-0.22005494976403311|
|-0.21255735037960583|
|-0.21201178093282919|
|-0.21961848784255728|
| -0.2116025998702124|
|-0.21721797909470836|
|-0.21721797909470836|
| -0.2166178479302136|
|-0.20879291005974493|
|-0.21607227848343696|
+--------------------+
only showing top 20 rows

RMSE: 0.170039
r2: 0.998535


In [20]:
predictions=lrModel.transform(test)
evaluator = RegressionEvaluator(metricName="rmse")
RMSE = evaluator.evaluate(predictions)
print("Model: Root Mean Squared Error = " + str(RMSE))

Model: Root Mean Squared Error = 0.170344677796
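
To make the two reported metrics concrete, here is a minimal plain-Python sketch of how RMSE and R² are defined. The toy labels and predictions are made up for illustration. The last lines compare the training and test RMSE values printed above.

```python
from math import sqrt


def rmse(labels, predictions):
    """Root mean squared error: square root of the average squared residual."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return sqrt(sum(squared_errors) / len(squared_errors))


def r2(labels, predictions):
    """Coefficient of determination: 1 - residual sum of squares / total sum of squares."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Toy data, for illustration only
labels = [1.0, 2.0, 3.0]
predictions = [1.0, 2.0, 4.0]
print(rmse(labels, predictions), r2(labels, predictions))

# Values copied from the notebook output above
train_rmse = 0.170039
test_rmse = 0.170344677796
gap = test_rmse - train_rmse
print(f"test RMSE exceeds training RMSE by {gap:.6f}")
```

The test error is almost identical to the training error, which suggests the model is not overfitting the training split.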


# Challenges faced:

# Conclusion: