A combined approach was chosen to leverage the strengths of both Hadoop (storage) and Apache Spark (processing) for an effective big data solution.

The steps below connect to Apache Spark through the SparkContext (sc) and launch my Spark application. The master URL 'local[*]' confirms that Spark is running locally and uses all available cores on the machine.

In [65]:
sc

In [2]:
sc.master

'local[*]'

In [38]:
from pyspark.sql.functions import col, to_timestamp, concat_ws
from pyspark.sql.types import StructType, StructField, StringType, FloatType

Measurements of electric power consumption in one household with a one-minute sampling rate over a period of almost 4 years. Different electrical quantities and some sub-metering values are available.

https://archive.ics.uci.edu/dataset/235/individual+household+electric+power+consumption

This dataset is licensed under a Creative Commons Attribution 4.0 International (CC BY 4.0) license.


After transferring the dataset to the /user1 directory of the local Hadoop file system (hadoop fs -put ./household_power_consumption.txt /user1) and making sure that all Hadoop processes are running, I can import the data into the Jupyter notebook using PySpark. This approach takes full advantage of Hadoop's storage capabilities and Apache Spark's processing power.

In [107]:
schema = StructType([
    StructField("Date", StringType(), True),
    StructField("Time", StringType(), True),
    StructField("Global_active_power", FloatType(), True),
    StructField("Global_reactive_power", FloatType(), True),
    StructField("Voltage", FloatType(), True),
    StructField("Global_intensity", FloatType(), True),
    StructField("Sub_metering_1", FloatType(), True),
    StructField("Sub_metering_2", FloatType(), True),
    StructField("Sub_metering_3", FloatType(), True)
])

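# Read the semicolon-separated text file from HDFS using the schema above. In the default
# PERMISSIVE mode, fields that cannot be parsed as floats (such as missing-value markers) become null.
df = spark.read.csv("hdfs://localhost:9000/user1/household_power_consumption.txt", header=True, schema=schema, sep=";")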


In [108]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Global_active_power: float (nullable = true)
 |-- Global_reactive_power: float (nullable = true)
 |-- Voltage: float (nullable = true)
 |-- Global_intensity: float (nullable = true)
 |-- Sub_metering_1: float (nullable = true)
 |-- Sub_metering_2: float (nullable = true)
 |-- Sub_metering_3: float (nullable = true)



In [109]:
df.show(5)

+----------+--------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|      Date|    Time|Global_active_power|Global_reactive_power|Voltage|Global_intensity|Sub_metering_1|Sub_metering_2|Sub_metering_3|
+----------+--------+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+
|16/12/2006|17:24:00|              4.216|                0.418| 234.84|            18.4|           0.0|           1.0|          17.0|
|16/12/2006|17:25:00|               5.36|                0.436| 233.63|            23.0|           0.0|           1.0|          16.0|
|16/12/2006|17:26:00|              5.374|                0.498| 233.29|            23.0|           0.0|           2.0|          17.0|
|16/12/2006|17:27:00|              5.388|                0.502| 233.74|            23.0|           0.0|           1.0|          17.0|
|16/12/2006|17:28:00|              3.666|                0.528

In [110]:
print(f'Number of rows: {df.count()}')
print(f'Number of columns: {len(df.columns)}')

Number of rows: 2075259
Number of columns: 9



In [111]:
for column in df.columns:
    print(column, ":", df.filter(df[column].isNull()).count())

Date : 0
Time : 0
Global_active_power : 25979
Global_reactive_power : 25979
Voltage : 25979
Global_intensity : 25979
Sub_metering_1 : 25979
Sub_metering_2 : 25979
Sub_metering_3 : 25979

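Each of the seven measurement columns has 25,979 missing values (about 1.25% of the 2,075,259 rows). They will be interpolated rather than dropped, because in a time-series analysis it is crucial to keep the sequence of the data intact. However, the PySpark DataFrame API has no built-in equivalent of the interpolate method in pandas. This is due to the distributed nature of Spark: the data is spread across multiple nodes in a cluster, while interpolating a value requires its neighbouring rows in time order.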

I will combine the date and time into a single timestamp to facilitate the time-series analysis.

In [112]:
# Combine Date and Time into a single timestamp column, then drop the two string columns
df = df.withColumn("Timestamp", to_timestamp(concat_ws(" ", col("Date"), col("Time")), "dd/MM/yyyy HH:mm:ss")) \
       .drop("Date", "Time")

In [113]:
df.show(5)

+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+-------------------+
|Global_active_power|Global_reactive_power|Voltage|Global_intensity|Sub_metering_1|Sub_metering_2|Sub_metering_3|          Timestamp|
+-------------------+---------------------+-------+----------------+--------------+--------------+--------------+-------------------+
|              4.216|                0.418| 234.84|            18.4|           0.0|           1.0|          17.0|2006-12-16 17:24:00|
|               5.36|                0.436| 233.63|            23.0|           0.0|           1.0|          16.0|2006-12-16 17:25:00|
|              5.374|                0.498| 233.29|            23.0|           0.0|           2.0|          17.0|2006-12-16 17:26:00|
|              5.388|                0.502| 233.74|            23.0|           0.0|           1.0|          17.0|2006-12-16 17:27:00|
|              3.666|                0.528| 235.68|           

In [114]:
df.summary().show()



+-------+-------------------+---------------------+------------------+------------------+------------------+------------------+-----------------+
|summary|Global_active_power|Global_reactive_power|           Voltage|  Global_intensity|    Sub_metering_1|    Sub_metering_2|   Sub_metering_3|
+-------+-------------------+---------------------+------------------+------------------+------------------+------------------+-----------------+
|  count|            2049280|              2049280|           2049280|           2049280|           2049280|           2049280|          2049280|
|   mean| 1.0916150366540094|   0.1237144765251571|240.83985796672414| 4.627759313004169|1.1219233096502186|1.2985199679887571| 6.45844735712055|
| stddev| 1.0572941611180025|  0.11272197958641315|3.2399866612063435|4.4443962589812385|  6.15303108970134| 5.822026473177461|8.437153908665614|
|    min|              0.076|                  0.0|             223.2|               0.2|               0.0|               0


Since my focus is on forecasting the global active power, all other columns are irrelevant for my analysis and will be removed.

In [115]:
df = df.select("Timestamp", "Global_active_power")

In [116]:
df.show(5)

+-------------------+-------------------+
|          Timestamp|Global_active_power|
+-------------------+-------------------+
|2006-12-16 17:24:00|              4.216|
|2006-12-16 17:25:00|               5.36|
|2006-12-16 17:26:00|              5.374|
|2006-12-16 17:27:00|              5.388|
|2006-12-16 17:28:00|              3.666|
+-------------------+-------------------+
only showing top 5 rows



The exploratory data analysis steps were conducted to verify that the data was loaded successfully into the PySpark DataFrame, as well as to confirm the data types and general descriptive statistics.

In [94]:
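# Fall back to the pre-Spark-3.0 (SimpleDateFormat-based) datetime parser. With it, patterns
# such as dd/MM/yyyy also accept day and month values without leading zeros.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")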

Importing TensorFlow to build the neural network caused my Jupyter kernel to die.

The issue is related to the limited free space on my local computer. Because of that, I will continue the modelling outside of the VM, in an environment where TensorFlow can be imported.

In [120]:
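# Write the preprocessed data as CSV with a header row. Spark creates a directory at
# this path containing one part file per partition.
consumption_data_path = '/user1/preprocessed_df.csv'

df.write.csv(path=consumption_data_path, mode='overwrite', header=True)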


The preprocessed dataset has already been exported and will be used in the other Jupyter notebook. There I will also be able to apply the interpolation method, since I will be working with a pandas DataFrame. Because Spark writes one CSV file per partition, the data was exported as two separate CSV files, which will be combined later on.
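A sketch of how the exported part files could be combined and interpolated in the pandas notebook, assuming the output directory has first been copied out of HDFS (for example with hadoop fs -get). Here a temporary directory with two small hypothetical part files stands in for it; real Spark part files have longer generated names that still start with part- and end with .csv, so the glob pattern matches them.

```python
import glob
import os
import tempfile

import pandas as pd

# Hypothetical stand-in for the exported directory: one CSV per partition,
# each with its own header row, as Spark writes them.
export_dir = tempfile.mkdtemp()
pd.DataFrame({"Timestamp": ["2006-12-16 17:24:00", "2006-12-16 17:25:00"],
              "Global_active_power": [4.0, None]}).to_csv(
    os.path.join(export_dir, "part-00000.csv"), index=False)
pd.DataFrame({"Timestamp": ["2006-12-16 17:26:00"],
              "Global_active_power": [6.0]}).to_csv(
    os.path.join(export_dir, "part-00001.csv"), index=False)

# Read every part file and stack them into one DataFrame.
parts = sorted(glob.glob(os.path.join(export_dir, "part-*.csv")))
combined = pd.concat((pd.read_csv(p) for p in parts), ignore_index=True)

# Restore time order, then interpolate linearly, weighting by the actual time gaps.
combined["Timestamp"] = pd.to_datetime(combined["Timestamp"])
combined = combined.set_index("Timestamp").sort_index()
combined["Global_active_power"] = combined["Global_active_power"].interpolate(method="time")
print(combined)
```

Alternatively, calling df.coalesce(1) before writing would make Spark produce a single part file, at the cost of funnelling the whole write through one task.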