# 2  Calculating the aggregate demand with Spark

In this section we will explore our full dataset with a Spark mindset. We have already explored it with pandas in the introduction to learn what it contains and what it looks like; now we need to prepare it, with the appropriate data types, for the calculations and aggregations we need. We will start with the data sample and then apply the same scheme to the full dataset. As a result, we will build a program that can be run on the cluster over data stored in HDFS.

#### Downloading the data

In [None]:
#Sample
!wget 'https://files.datapress.com/london/dataset/smartmeter-energy-use-data-in-london-households/UKPN-LCL-smartmeter-sample.csv'

In [None]:
#Full dataset
!wget 'https://files.datapress.com/london/dataset/smartmeter-energy-use-data-in-london-households/Power-Networks-LCL-June2015(withAcornGps).zip'

#### File transformation to bz2

The file is compressed in .zip format. The problem with this format is that it is not splittable: it cannot be broken into pieces without being corrupted. Therefore, if we attempt a MapReduce operation on it we will get useless binary output. To be able to use the data without fully decompressing it (we do not have enough disk space), we will stream the archive contents through a pipe into bzip2, which can be split.

In [None]:
!unzip -c 'Power-Networks-LCL-June2015(withAcornGps).zip'  | bzip2 > 'Power-Networks-LCL-June2015.bz2'
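
The same streaming conversion can be sketched with the Python standard library. This is an illustrative alternative, not the command used in this notebook: `zip_member_to_bz2` and its parameters are hypothetical names, the code assumes Python 3, and the archive is assumed to hold a single member. Reading the member directly also avoids the banner lines that `unzip -c` writes to its output.

```python
import bz2
import os
import shutil
import tempfile
import zipfile


def zip_member_to_bz2(zip_path, out_path, member=None, chunk_size=1024 * 1024):
    """Stream one member of a zip archive into a bzip2 file, chunk by chunk."""
    with zipfile.ZipFile(zip_path) as archive:
        # Default to the first member (assumption: single-file archive).
        name = member if member is not None else archive.namelist()[0]
        with archive.open(name) as src, bz2.open(out_path, 'wb') as dst:
            # copyfileobj copies in chunks, so the file is never fully in memory.
            shutil.copyfileobj(src, dst, chunk_size)
    return name


# Self-check on a throwaway archive (toy data, not the real dataset).
workdir = tempfile.mkdtemp()
zip_path = os.path.join(workdir, 'toy.zip')
bz2_path = os.path.join(workdir, 'toy.csv.bz2')
toy_csv = 'LCLid,stdorToU\nMAC000002,Std\n'
with zipfile.ZipFile(zip_path, 'w') as archive:
    archive.writestr('toy.csv', toy_csv)

converted_member = zip_member_to_bz2(zip_path, bz2_path)
with bz2.open(bz2_path, 'rt') as fh:
    round_trip = fh.read()
shutil.rmtree(workdir)
```

With this approach the resulting file would start at the CSV header, so only one line would need to be skipped later instead of three.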

#### Library importing

Now we are ready to start. The commented lines would be needed if our virtual machine had not already been configured to import SparkContext and assign it to sc on startup.

In [1]:
#from pyspark import SparkContext
#sc = SparkContext()
from pyspark.sql.types import DateType
from datetime import datetime
import numpy as np
from pyspark import Row
sc

<pyspark.context.SparkContext at 0x7f0868237d90>

In [2]:
sc.version

u'2.0.1'

#### Sample dataset

We will discover the steps needed to process our dataset using the small sample dataset.

In [3]:
path_sample = 'UKPN-LCL-smartmeter-sample.csv'

In [4]:
data_sample = sc.textFile(path_sample)

In [5]:
data_sample.take(3)

[u'LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped',
 u'MAC003718,Std,17/10/2012 13:00:00,0.09,ACORN-A,Affluent',
 u'MAC003718,Std,17/10/2012 13:30:00,0.16,ACORN-A,Affluent']

The header is useful in pandas, but sc.textFile treats every line as a record, so we need to remove it when working with Spark.

In [6]:
header = data_sample.first()

In [7]:
data_sample2 = data_sample.filter(lambda l: l != header).persist()

In [8]:
data_sample2.take(3)

[u'MAC003718,Std,17/10/2012 13:00:00,0.09,ACORN-A,Affluent',
 u'MAC003718,Std,17/10/2012 13:30:00,0.16,ACORN-A,Affluent',
 u'MAC003718,Std,17/10/2012 14:00:00,0.212,ACORN-A,Affluent']

Now we will map the file, extracting the relevant fields and converting them to the appropriate types. We will ignore the last field, which is a higher-level grouping of the ACORN category in the second-to-last field. ACORN is a UK consumer socioeconomic segmentation system that may be useful in this work (see http://acorn.caci.co.uk/downloads/Acorn-User-guide.pdf).

In [9]:
def line2tuple(l):
    fields = l.split(',')
    if len(fields) == 6:
        ID = fields[0]
        tariff = fields[1]
        DateTime = datetime.strptime(fields[2], '%d/%m/%Y %H:%M:%S')
        ACORN = fields[4]
        try:
            consumption = float(fields[3])
        except ValueError:
            consumption = np.nan
        return (ID, tariff, DateTime, consumption, ACORN)
    else:
        return (np.nan,np.nan,np.nan,np.nan,np.nan)

In [10]:
data_sample2.map(line2tuple).take(3)

[(u'MAC003718',
  u'Std',
  datetime.datetime(2012, 10, 17, 13, 0),
  0.09,
  u'ACORN-A'),
 (u'MAC003718',
  u'Std',
  datetime.datetime(2012, 10, 17, 13, 30),
  0.16,
  u'ACORN-A'),
 (u'MAC003718',
  u'Std',
  datetime.datetime(2012, 10, 17, 14, 0),
  0.212,
  u'ACORN-A')]
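
To see how this mapping treats malformed input, the following standalone sketch mirrors `line2tuple` without Spark or NumPy. `parse_sample_line` is a hypothetical name, `math.nan` stands in for `np.nan`, and the `Null` value and the truncated line are made-up examples of bad records, not lines taken from the dataset.

```python
import math
from datetime import datetime


def parse_sample_line(line):
    """Mirror of line2tuple for the sample file, using math.nan for np.nan."""
    fields = line.split(',')
    if len(fields) != 6:
        # Wrong number of fields: return an all-NaN tuple to be dropped later.
        return (math.nan,) * 5
    try:
        consumption = float(fields[3])
    except ValueError:
        # Non-numeric reading: keep the row but mark the value as missing.
        consumption = math.nan
    timestamp = datetime.strptime(fields[2], '%d/%m/%Y %H:%M:%S')
    return (fields[0], fields[1], timestamp, consumption, fields[4])


good = parse_sample_line('MAC003718,Std,17/10/2012 13:00:00,0.09,ACORN-A,Affluent')
bad_value = parse_sample_line('MAC003718,Std,17/10/2012 13:30:00,Null,ACORN-A,Affluent')
short_line = parse_sample_line('MAC003718,Std')
```

A non-numeric reading keeps its ID and timestamp and only loses the consumption value, whereas a line with the wrong number of fields is replaced entirely by NaN.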

In [11]:
rows_sample = data_sample2.map(line2tuple)\
.map(lambda x: Row(ID = x[0], Tariff = x[1], DateTime = x[2], kWh_30min = x[3], ACORN = x[4]))

In [13]:
df_sample = rows_sample.toDF()

In [14]:
df_sample.show(5)

+-------+--------------------+---------+------+---------+
|  ACORN|            DateTime|       ID|Tariff|kWh_30min|
+-------+--------------------+---------+------+---------+
|ACORN-A|2012-10-17 13:00:...|MAC003718|   Std|     0.09|
|ACORN-A|2012-10-17 13:30:...|MAC003718|   Std|     0.16|
|ACORN-A|2012-10-17 14:00:...|MAC003718|   Std|    0.212|
|ACORN-A|2012-10-17 14:30:...|MAC003718|   Std|    0.145|
|ACORN-A|2012-10-17 15:00:...|MAC003718|   Std|    0.104|
+-------+--------------------+---------+------+---------+
only showing top 5 rows



We can try calculating the mean grouping by tariff. In this case, there is only one consumer, which is subject to the standard tariff.

In [None]:
df_sample.dropna().groupBy('Tariff').mean().collect()

In [15]:
#Checking the types, everything looks correct.
df_sample.printSchema

<bound method DataFrame.printSchema of DataFrame[ACORN: string, DateTime: timestamp, ID: string, Tariff: string, kWh_30min: double]>

##### Saving the data to a text file

At the end of our full dataset processing, we will aggregate the data and save it to a text file. Let us explore alternatives with the smaller sample dataset.

There are different options to save the data as a text file. The problem with the first two options below is that they write several text files (one per partition), because they do not coalesce the Spark DataFrame into one partition before writing. This is the only option when the resulting DataFrame is too large to fit in a single partition, but it is not so convenient for future processing:

In [19]:
%%time
df_sample.write.csv('sample.csv') # Needs Spark >= 2.0. In the cluster, we have Spark 1.5.0

CPU times: user 2.18 ms, sys: 155 µs, total: 2.33 ms
Wall time: 1.62 s


In [20]:
%%time
df_sample.write.format('com.databricks.spark.csv').save('sample2.csv') # For Spark 1.5.0

CPU times: user 1.32 ms, sys: 176 µs, total: 1.49 ms
Wall time: 1.12 s
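
If the multi-file output has already been written to a local directory, the pieces can be stitched together afterwards. The sketch below is one way to do it with the standard library; `merge_part_files` is a hypothetical helper, and the directory contents are toy stand-ins for the `part-*` files that Spark writes. For output stored in HDFS, `hadoop fs -getmerge` serves a similar purpose.

```python
import glob
import os
import shutil
import tempfile


def merge_part_files(spark_output_dir, dest_path, header=None):
    """Concatenate Spark part-* files, in name order, into one text file."""
    parts = sorted(glob.glob(os.path.join(spark_output_dir, 'part-*')))
    with open(dest_path, 'w') as dest:
        if header is not None:
            dest.write(header + '\n')
        for part in parts:
            with open(part) as src:
                shutil.copyfileobj(src, dest)
    return len(parts)


# Toy output directory imitating what a Spark write leaves behind.
workdir = tempfile.mkdtemp()
out_dir = os.path.join(workdir, 'sample.csv')  # Spark writes a directory
os.mkdir(out_dir)
toy_files = [('part-00000', 'a,1\nb,2\n'), ('part-00001', 'c,3\n'), ('_SUCCESS', '')]
for name, text in toy_files:
    with open(os.path.join(out_dir, name), 'w') as fh:
        fh.write(text)

merged_path = os.path.join(workdir, 'merged.csv')
n_parts = merge_part_files(out_dir, merged_path, header='key,value')
with open(merged_path) as fh:
    merged = fh.read()
shutil.rmtree(workdir)
```

The `part-*` pattern skips the `_SUCCESS` marker, so only data files are concatenated.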


In order to save the data in one file, we can use these handier alternatives. Our first aggregations of the full dataset will be small enough (~ 80k lines, ~500k lines) to be able to do this:

In [16]:
%%time
df_sample.rdd.map(lambda x: ','.join(map(str, x))).coalesce(1).saveAsTextFile('file_rdd.csv')

CPU times: user 5.57 ms, sys: 1.22 ms, total: 6.79 ms
Wall time: 2.35 s


In [17]:
%%time
df_sample.toPandas().to_csv('file_pd.csv')

CPU times: user 446 ms, sys: 59.1 ms, total: 505 ms
Wall time: 1.53 s


Transforming it to a Pandas DataFrame was quicker and simpler, and the output file includes the header. We will follow this path when the data is small enough.

#### Processing the full dataset

Now let us start tackling the real thing:

In [2]:
path = 'Power-Networks-LCL-June2015.bz2'

In [3]:
data = sc.textFile(path)

In [38]:
data.take(5)

[u'Archive:  Power-Networks-LCL-June2015(withAcornGps).zip',
 u'  inflating: Power-Networks-LCL-June2015(withAcornGps)v2.csv  ',
 u'LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped',
 u'MAC000002,Std,2012-10-12 00:30:00.0000000, 0 ,ACORN-A,Affluent',
 u'MAC000002,Std,2012-10-12 01:00:00.0000000, 0 ,ACORN-A,Affluent']

There are two new problems.

First of all, there are two extra lines above the header, so now we need to remove 3 lines before mapping.

In [4]:
data_no_header = data.zipWithIndex().filter(lambda x: x[1] > 2).keys()
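
The logic of this index-based filter can be mimicked on a plain Python list. The sketch below reuses the five lines printed by `data.take(5)` and reproduces the three steps (pair with index, filter, take the keys) without Spark; the variable names are illustrative.

```python
lines = [
    'Archive:  Power-Networks-LCL-June2015(withAcornGps).zip',
    '  inflating: Power-Networks-LCL-June2015(withAcornGps)v2.csv  ',
    'LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped',
    'MAC000002,Std,2012-10-12 00:30:00.0000000, 0 ,ACORN-A,Affluent',
    'MAC000002,Std,2012-10-12 01:00:00.0000000, 0 ,ACORN-A,Affluent',
]

# zipWithIndex pairs each element with its position: (element, index).
indexed = [(line, idx) for idx, line in enumerate(lines)]
# Keep indices 3 and above: drops the two unzip banner lines and the header.
kept = [pair for pair in indexed if pair[1] > 2]
# keys() returns the first item of each pair, here the original line.
data_lines = [pair[0] for pair in kept]
```

Only the two `MAC000002` records survive, which is what the filter `x[1] > 2` is meant to achieve on the full file.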

In [25]:
data_no_header.map(line2tuple).take(3)

[(u'MAC000002', u'Std', u'2012-10-12 00:30:00.0000000', 0.0, u'ACORN-A'),
 (u'MAC000002', u'Std', u'2012-10-12 01:00:00.0000000', 0.0, u'ACORN-A'),
 (u'MAC000002', u'Std', u'2012-10-12 01:30:00.0000000', 0.0, u'ACORN-A')]

Moreover, the DateTime format has changed compared to the sample provided. Not very nice; let's hope that it is at least consistent throughout the full dataset.
We must change our mapping function accordingly (the output above was produced with the revised function defined further below). This datetime format can later be cast automatically to pyspark.sql TimestampType for the full dataset, so we will leave it as a string for now.
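
For comparison, here is a sketch of how both timestamp layouts could be parsed in plain Python instead of relying on the Spark cast. `parse_timestamp` is a hypothetical helper and is not used elsewhere in this notebook. Note that `strptime` with `%f` accepts at most six fractional digits, so the seven-digit fraction in the full dataset has to be cut off first.

```python
from datetime import datetime

SAMPLE_FORMAT = '%d/%m/%Y %H:%M:%S'   # e.g. 17/10/2012 13:00:00
FULL_FORMAT = '%Y-%m-%d %H:%M:%S'     # e.g. 2012-10-12 00:30:00.0000000


def parse_timestamp(text):
    """Parse either layout; return None if neither matches."""
    # Both layouts are 19 characters long once the fraction is removed.
    head = text.strip()[:19]
    for fmt in (SAMPLE_FORMAT, FULL_FORMAT):
        try:
            return datetime.strptime(head, fmt)
        except ValueError:
            continue
    return None


sample_stamp = '17/10/2012 13:00:00'
full_stamp = '2012-10-12 00:30:00.0000000'

# Check that %f alone cannot digest the seven-digit fraction.
try:
    datetime.strptime(full_stamp, FULL_FORMAT + '.%f')
    seven_digits_ok = True
except ValueError:
    seven_digits_ok = False

parsed_sample = parse_timestamp(sample_stamp)
parsed_full = parse_timestamp(full_stamp)
```

Truncating to 19 characters discards the fractional seconds, which is harmless here as long as they are all zeros, as in the lines shown above.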

In [28]:
data_no_header.map(line2tuple).filter(lambda x: x[1] == 'ToU').take(3)  # x[1] is the tariff

[(u'MAC000005', u'ToU', u'2012-06-01 10:30:00.0000000', 0.095, u'ACORN-C'),
 (u'MAC000005', u'ToU', u'2012-06-01 11:00:00.0000000', 0.051, u'ACORN-C'),
 (u'MAC000005', u'ToU', u'2012-06-01 11:30:00.0000000', 0.098, u'ACORN-C')]

In [30]:
data_no_header.map(line2tuple)\
.filter(lambda x: x[0] == 'MAC000005' and x[1] == 'ToU').take(3)  # x[0] is the ID, x[1] the tariff

[(u'MAC000005', u'ToU', u'2012-06-01 10:30:00.0000000', 0.095, u'ACORN-C'),
 (u'MAC000005', u'ToU', u'2012-06-01 11:00:00.0000000', 0.051, u'ACORN-C'),
 (u'MAC000005', u'ToU', u'2012-06-01 11:30:00.0000000', 0.098, u'ACORN-C')]

So "stdorToU" actually tells us whether a user was subjected to the dToU or the standard tariff in 2013, not which tariff applied at the timestamp of a given measurement: MAC000005 is labelled ToU on readings from June 2012. We therefore do not need to separate users at this point, but we should bear in mind later that the dToU tariff was only applied in 2013.

In [23]:
def line2tuple(l):
    fields = l.split(',')
    # We have seen that at least one line in the file does not follow the format.
    # This condition avoids an "index out of range" error.
    if len(fields) == 6:
        ID = fields[0]
        tariff = fields[1]
        DateTime = fields[2]  # kept as a string; cast to timestamp later
        ACORN = fields[4]
        try:
            consumption = float(fields[3])
        except ValueError:
            consumption = np.nan
        return (ID, tariff, DateTime, consumption, ACORN)
    else:
        # We treat any line that does not follow the general format as NaN.
        # We will remove these rows afterwards.
        return (np.nan, np.nan, np.nan, np.nan, np.nan)

We create our dataframe and check that DateTime is a string variable.

In [58]:
rows = data_no_header.map(line2tuple)\
.map(lambda x: Row(ID = x[0], Tariff = x[1], DateTime = x[2], kWh_30min = x[3]))

In [59]:
df = rows.toDF()

In [60]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[DateTime: string, ID: string, Tariff: string, kWh_30min: double]>

We cast DateTime to pyspark.sql TimestampType, which corresponds to Python's datetime.datetime.

In [65]:
from pyspark.sql.types import TimestampType
df2 = df.withColumn('DateTime', df['DateTime'].cast(TimestampType()))

In [80]:
df2.printSchema

<bound method DataFrame.printSchema of DataFrame[DateTime: timestamp, ID: string, Tariff: string, kWh_30min: double]>

In [76]:
df2.show(5)

+--------------------+---------+------+---------+
|            DateTime|       ID|Tariff|kWh_30min|
+--------------------+---------+------+---------+
|2012-10-12 00:30:...|MAC000002|   Std|      0.0|
|2012-10-12 01:00:...|MAC000002|   Std|      0.0|
|2012-10-12 01:30:...|MAC000002|   Std|      0.0|
|2012-10-12 02:00:...|MAC000002|   Std|      0.0|
|2012-10-12 02:30:...|MAC000002|   Std|      0.0|
+--------------------+---------+------+---------+
only showing top 5 rows



Let's check whether calculating the mean grouped by tariff works. If it does, we have cleaned the dataset successfully.

In [79]:
df2.dropna().groupBy('Tariff').mean().collect()

[Row(Tariff=u'Std', avg(kWh_30min)=0.21507225601128732),
 Row(Tariff=u'ToU', avg(kWh_30min)=0.1986226410448441)]

That's great! We have also learnt the keys used to name the two tariff groups we expected to find ('Std' and 'ToU').

It will be useful in our analysis to know which consumers were subjected to the Dynamic Time of Use (ToU) tariff during 2013. These consumers had a standard tariff during the rest of the period covered by the dataset, so we can evaluate behavioural changes due to the tariff scheme.

In [94]:
df2.filter(df2['Tariff'] == 'ToU').select('ID').drop_duplicates()

DataFrame[ID: string]

In the first step of our analysis we will aggregate all users under the same tariff plan (distinguishing consumers subjected to ToU during 2013 and consumers with the standard flat rate tariff throughout the whole period of time). We will analyse this data in R later (no need for Spark), including explanatory plots. R provides useful packages for this purpose: xts and zoo for time series plotting and forecast for prediction models.

Therefore, we will calculate basic statistics for each timestamp per tariff plan. We will then refine the analysis aggregating by ACORN group and tariff plan.

In [24]:
def ACORN(l):
    fields = l.split(',')
    if len(fields) == 6:
        acorn = fields[4]
        return acorn
    else:
        return 'NA'

In [25]:
#ACORN groups
ACORN_groups = data_no_header.map(ACORN).filter(lambda a: a != 'NA').distinct()

In [26]:
ACORN_groups.collect()

[u'ACORN-K',
 u'ACORN-J',
 u'ACORN-I',
 u'ACORN-',
 u'ACORN-H',
 u'ACORN-O',
 u'ACORN-N',
 u'ACORN-M',
 u'ACORN-L',
 u'ACORN-C',
 u'ACORN-B',
 u'ACORN-Q',
 u'ACORN-A',
 u'ACORN-P',
 u'ACORN-G',
 u'ACORN-F',
 u'ACORN-E',
 u'ACORN-U',
 u'ACORN-D']

Code for the script to be executed in the cluster:

In [1]:
#from pyspark import SparkContext
#sc = SparkContext()
from pyspark.sql.types import DateType
from datetime import datetime
import numpy as np
from pyspark import Row

In [2]:
path = 'Power-Networks-LCL-June2015.bz2'

In [3]:
data = sc.textFile(path)
data_no_header = data.zipWithIndex().filter(lambda x: x[1] > 2).keys()

In [4]:
def ToU_IDs(l):
    fields = l.split(',')
    if len(fields) == 6:
        ID = fields[0]
        tariff = fields[1]
        return (tariff, ID)
    else:
        return ('NA','NA')

In [5]:
# Each record is a (tariff, ID) pair: keep the ToU ones and collect their IDs.
ID_ToU_rdd = data_no_header.map(ToU_IDs).filter(lambda x: x[0] == 'ToU').distinct()
ID_ToU = ID_ToU_rdd.map(lambda x: x[1]).collect()

In [14]:
def line2tuple(l):
    fields = l.split(',')
    if len(fields) == 6:
        ID = fields[0]
        tariff = fields[1]
        DateTime = fields[2]
        ACORN = fields[4]
        try:
            consumption = float(fields[3])
        except ValueError:
            consumption = np.nan
        return (ID, tariff, DateTime, consumption, ACORN)
    else:
        return (np.nan,np.nan,np.nan,np.nan,np.nan)

In [7]:
rows = data_no_header.map(line2tuple)\
.map(lambda x: Row(ID = x[0], Tariff = x[1], DateTime = x[2], 
                   kWh_30min = x[3], ACORN = x[4]))

In [8]:
df = rows.toDF()

In [9]:
df.show(5)

+-------+--------------------+---------+------+--------+---------+
|  ACORN|            DateTime|       ID|Tariff|ToU_User|kWh_30min|
+-------+--------------------+---------+------+--------+---------+
|ACORN-A|2012-10-12 00:30:...|MAC000002|   Std|       0|      0.0|
|ACORN-A|2012-10-12 01:00:...|MAC000002|   Std|       0|      0.0|
|ACORN-A|2012-10-12 01:30:...|MAC000002|   Std|       0|      0.0|
|ACORN-A|2012-10-12 02:00:...|MAC000002|   Std|       0|      0.0|
|ACORN-A|2012-10-12 02:30:...|MAC000002|   Std|       0|      0.0|
+-------+--------------------+---------+------+--------+---------+
only showing top 5 rows
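
The table above contains a `ToU_User` column that the `Row` construction shown in this section does not create, so the step that added it does not appear in the code listed here. One plausible reconstruction, offered as an assumption rather than the original code, flags every household whose ID appears in `ID_ToU`. The sketch runs on plain tuples; `add_tou_flag` and the one-element `ID_ToU` list are hypothetical stand-ins.

```python
# Hypothetical stand-in for the ID_ToU list collected from the cluster.
ID_ToU = ['MAC000005']
# A set makes each membership test constant-time.
tou_ids = set(ID_ToU)


def add_tou_flag(record):
    """Append 1 if the household ID (first field) is a ToU household, else 0."""
    return record + (int(record[0] in tou_ids),)


records = [
    ('MAC000002', 'Std', '2012-10-12 00:30:00.0000000', 0.0, 'ACORN-A'),
    ('MAC000005', 'ToU', '2012-06-01 10:30:00.0000000', 0.095, 'ACORN-C'),
]
flagged = [add_tou_flag(r) for r in records]
```

In Spark, such a function could be mapped over the output of `line2tuple` before building the rows, with the extra field passed to `Row` as `ToU_User = x[5]`.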



In [10]:
from pyspark.sql.types import TimestampType
df = df.withColumn('DateTime', df['DateTime'].cast(TimestampType()))

In [11]:
df.printSchema

<bound method DataFrame.printSchema of DataFrame[ACORN: string, DateTime: timestamp, ID: string, Tariff: string, ToU_User: bigint, kWh_30min: double]>

In [12]:
import pyspark.sql.functions as F

In [13]:
agg_stats = df.dropna().groupBy('DateTime','Tariff')\
    .agg(F.count('kWh_30min').alias('count'),
         F.sum('kWh_30min').alias('sum'),
         F.min('kWh_30min').alias('min'),
         F.mean('kWh_30min').alias('mean'),
         F.max('kWh_30min').alias('max'),
         F.stddev('kWh_30min').alias('std_dev')
        )
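
To make explicit what this aggregation computes for each (DateTime, Tariff) group, here is a small pure-Python sketch on made-up readings. It is an illustration only, not part of the cluster script. `F.stddev` is the sample standard deviation, which `statistics.stdev` also computes; for a group with a single reading it is undefined, and this sketch stores `None` in that case.

```python
import math
import statistics
from collections import defaultdict

# Toy readings: (DateTime, Tariff, kWh_30min). Values are illustrative only.
readings = [
    ('2013-01-01 00:00:00', 'Std', 0.10),
    ('2013-01-01 00:00:00', 'Std', 0.30),
    ('2013-01-01 00:00:00', 'ToU', 0.20),
    ('2013-01-01 00:00:00', 'ToU', float('nan')),  # removed, as dropna() would
    ('2013-01-01 00:30:00', 'Std', 0.50),
]

# Group the valid readings by (DateTime, Tariff), like groupBy.
groups = defaultdict(list)
for stamp, tariff, kwh in readings:
    if not math.isnan(kwh):
        groups[(stamp, tariff)].append(kwh)

# Compute the same six statistics as the agg() call.
agg = {}
for key, values in groups.items():
    agg[key] = {
        'count': len(values),
        'sum': sum(values),
        'min': min(values),
        'mean': statistics.mean(values),
        'max': max(values),
        'std_dev': statistics.stdev(values) if len(values) > 1 else None,
    }
```

Each key of `agg` corresponds to one row of `agg_stats`, so the result has at most one row per timestamp and tariff plan.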

As the grouped Dataframe is small enough to be saved in one partition we can transform it to a Pandas dataframe and then save it to csv. Saving it in only one text file is much handier for future processing.

We could have also transformed it back to a functional API rdd and used coalesce(1).saveAsTextFile('agg_stats.csv').

Otherwise, we could have written it to multiple text files with:

agg_stats.write.csv('agg_stats.csv') (Spark >= 2.0)

agg_stats.write.format('com.databricks.spark.csv').save('agg_stats.csv') (in the cluster, where we have Spark 1.5.0)

In [15]:
agg_stats.toPandas().to_csv('outputs/agg_stats.csv')

In [None]:
acorn_stats = df.dropna().groupBy('ACORN','DateTime','Tariff')\
    .agg(F.count('kWh_30min').alias('count'),
         F.sum('kWh_30min').alias('sum'),
         F.min('kWh_30min').alias('min'),
         F.mean('kWh_30min').alias('mean'),
         F.max('kWh_30min').alias('max'),
         F.stddev('kWh_30min').alias('std_dev')
        )

In [None]:
acorn_stats.toPandas().to_csv('outputs/acorn_stats.csv')

We will copy this last part to SmartMeter_agg1.py to run it in the cluster.