# Week 5 - Big Data analytics using Spark

# Hands-on: Analyzing Sensor Data with Spark Streaming

In [1]:
# Example weather station data
#
# 1419408015	0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H
# 1419408017	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M
# 1419408018	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M
# 1419408019	0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M

In [2]:
# Key for measurements:
#
# Sn      Wind speed minimum m/s, km/h, mph, knots #,M, K, S, N
# Sm      Wind speed average m/s, km/h, mph, knots #,M, K, S, N
# Sx      Wind speed maximum m/s, km/h, mph, knots #,M, K, S, N
# Dn      Wind direction minimum deg #, D
# Dm      Wind direction average deg #, D
# Dx      Wind direction maximum deg #, D
# Pa      Air pressure hPa, Pa, bar, mmHg, inHg #, H, P, B, M, I
# Ta      Air temperature °C, °F #, C, F
# Tp      Internal temperature °C, °F #, C, F
# Ua      Relative humidity %RH #, P
# Rc      Rain accumulation mm, in #, M, I
# Rd      Rain duration s #, S
# Ri      Rain intensity mm/h, in/h #, M, I
# Rp      Rain peak intensity mm/h, in/h #, M, I
# Hc      Hail accumulation hits/cm2, hits/in2, hits #, M, I, H
# Hd      Hail duration s #, S
# Hi      Hail intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Hp      Hail peak intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Th      Heating temperature °C, °F #, C, F
# Vh      Heating voltage V #, N, V, W, F2
# Vs      Supply voltage V V
# Vr      3.5 V ref. voltage V V

Parse a line of weather station data, returning the average wind direction measurement.

In [3]:
import re
def parse(line):
    """Extract the average wind direction (``Dm``) from one weather-station line.

    Args:
        line: A raw record such as ``'0R1,Dn=059D,Dm=066D,Dx=080D'``.

    Returns:
        A one-element list holding the ``Dm`` reading as an ``int`` (degrees),
        or an empty list when the line has no ``Dm`` field. Returning a list
        (rather than a scalar or ``None``) lets the function be used with
        ``flatMap`` so that non-matching lines produce no output element.
    """
    # Raw string literal: a backslash-d inside a plain string is an invalid
    # escape sequence and raises a warning on modern Python versions.
    match = re.search(r'Dm=(\d+)', line)
    if match:
        # int() also strips the zero padding used by the station ("066" -> 66).
        return [int(match.group(1))]
    return []

In [4]:
from pyspark.streaming import StreamingContext
# Streaming context with a one-second batch interval.
# NOTE: `sc` is not created in this notebook; it is assumed to be the
# SparkContext provided by the PySpark notebook kernel.
ssc = StreamingContext(sc, batchDuration=1)

Create DStream of weather data.

In [5]:
# DStream of text lines read from the weather station's TCP feed.
lines = ssc.socketTextStream(hostname='rtd.hpwren.ucsd.edu', port=12028)

## Read measurement
Use `flatMap()` to iterate over the lines DStream,
calling the `parse()` function defined above on each line to extract the average wind direction.

In [6]:
# parse() returns a list with zero or one element, so flatMap flattens the
# results into a DStream of ints and lines without a Dm field contribute nothing.
vals = lines.flatMap(parse)

## Create sliding window of data
Create a new DStream called _window_ that combines ten seconds' worth of data and slides forward every five seconds.

In [7]:
# Each windowed RDD covers the last 10 seconds of readings and a new one is
# produced every 5 seconds, so consecutive windows overlap.
window = vals.window(windowDuration=10, slideDuration=5)

## Define and call analysis function
First, print the entire contents of the RDD by calling the `collect()` method. This is done to demonstrate the sliding window and would not be practical if the RDD contained a large amount of data. Then, check that the RDD is not empty before printing the maximum and minimum values.

In [8]:
def stats(rdd):
    """Print the contents of one windowed RDD, then its maximum and minimum.

    The RDD is collected to the driver exactly once and the statistics are
    computed from that local list. This costs one Spark job per window instead
    of four (``collect``, ``count``, ``max``, ``min``) and guarantees that the
    printed extremes describe exactly the values printed above them.

    Collecting is only reasonable because a window holds a small number of
    readings; do not use this pattern on large RDDs.

    Args:
        rdd: RDD of comparable values (here, integer wind directions) for the
            current window.
    """
    values = rdd.collect()
    print(values)
    # An empty window has no max/min, so only the empty list is printed.
    if values:
        print('max = {} min = {}'.format(max(values), min(values)))

Call the `stats()` function defined above for each RDD in the DStream window.

In [9]:
# Register stats() as the output operation run on every RDD produced by the
# windowed DStream. The function is passed directly; wrapping it in a lambda
# that only forwards its single argument adds nothing.
window.foreachRDD(stats)

## Start the stream processing

In [10]:
# Begin receiving and processing data. start() returns immediately; the
# output of stats() appears below as each window is processed.
ssc.start()

[203, 205, 203]
max = 205 min = 203
[203, 205, 203, 202, 196, 194, 197, 194]
max = 205 min = 194
[202, 196, 194, 197, 194, 194, 194, 193, 193, 191, 193]
max = 202 min = 191
[194, 194, 193, 193, 191, 193, 195, 197, 201, 198, 199]


In [11]:
# Stop stream processing.
# NOTE: called without arguments, PySpark's StreamingContext.stop() defaults to
# stopSparkContext=True, so the underlying SparkContext is stopped as well.
ssc.stop()

max = 201 min = 191
