In [1]:
# Example weather station data
#
# 1419408015	0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H
# 1419408017	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M
# 1419408018	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M
# 1419408019	0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M

In [2]:
# Key for measurements:
#
# Sn      Wind speed minimum m/s, km/h, mph, knots #,M, K, S, N
# Sm      Wind speed average m/s, km/h, mph, knots #,M, K, S, N
# Sx      Wind speed maximum m/s, km/h, mph, knots #,M, K, S, N
# Dn      Wind direction minimum deg #, D
# Dm      Wind direction average deg #, D
# Dx      Wind direction maximum deg #, D
# Pa      Air pressure hPa, Pa, bar, mmHg, inHg #, H, P, B, M, I
# Ta      Air temperature °C, °F #, C, F
# Tp      Internal temperature °C, °F #, C, F
# Ua      Relative humidity %RH #, P
# Rc      Rain accumulation mm, in #, M, I
# Rd      Rain duration s #, S
# Ri      Rain intensity mm/h, in/h #, M, I
# Rp      Rain peak intensity mm/h, in/h #, M, I
# Hc      Hail accumulation hits/cm2, hits/in2, hits #, M, I, H
# Hd      Hail duration s #, S
# Hi      Hail intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Hp      Hail peak intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Th      Heating temperature °C, °F #, C, F
# Vh      Heating voltage V #, N, V, W, F
# Vs      Supply voltage V V
# Vr      3.5 V ref. voltage V V

In [3]:
# Parse a line of weather station data, returning the average wind direction (Dm) as a one-element list, or an empty list when the line has no Dm field
#
import re

def parse(line):
    """Extract the average wind direction (``Dm``) reading from one line.

    Args:
        line: A raw line of weather station text, for example
            ``"0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M"``.

    Returns:
        A one-element list holding the digits after ``Dm=`` converted to
        ``int`` when the line contains such a field, otherwise an empty
        list. Returning a list in both cases lets the function be used
        with ``flatMap`` so that lines without a reading simply vanish.
    """
    # Raw string literal: "\d" inside a plain string is an invalid escape
    # sequence and triggers a warning on current Python versions.
    match = re.search(r"Dm=(\d+)", line)
    return [int(match.group(1))] if match else []

In [4]:
from pyspark.streaming import StreamingContext

In [5]:
# Streaming context on top of the existing SparkContext, batching every 1 second
ssc = StreamingContext(sc, batchDuration=1)

In [8]:
# DStream of text lines read from a TCP socket on the given host and port
lines = ssc.socketTextStream(hostname='rtd.hpwren.ucsd.edu', port=12028)

In [11]:
# Apply parse to every incoming line. parse returns a list (one average wind
# direction value, or nothing), and flatMap flattens those lists, so vals
# contains only the Dm integers.
vals = lines.flatMap(parse)

In [12]:
# Sliding window over vals: each window spans 10 seconds and advances every 5 seconds
window = vals.window(windowDuration=10, slideDuration=5)

In [14]:
def stats(rdd):
    """Print the elements of ``rdd`` followed by their maximum and minimum.

    Args:
        rdd: An object whose ``collect()`` returns its elements as a list
            (here, the RDD passed in by ``foreachRDD`` for each window).

    Returns:
        None. The collected list is always printed; a second line of the
        form ``max = <max> min = <min>`` is printed only when the list is
        non-empty.
    """
    # Collect once and reuse the result. The data is already brought to the
    # driver for printing, so emptiness, max and min can be derived locally
    # instead of launching separate count(), max() and min() actions.
    values = rdd.collect()
    print(values)

    if values:
        print('max = {} min = {}'.format(max(values), min(values)))

In [15]:
# Register stats as the action for the windowed stream: it is called with the RDD of each window
window.foreachRDD(stats)

### Start Streaming

In [17]:
# Begin receiving and processing data; the output of stats for each window appears below
ssc.start()

[200, 181]
max = 200 min = 181
[200, 181, 168, 158, 176, 176, 157]
max = 200 min = 157
[168, 158, 176, 176, 157, 140, 141, 123, 135, 155]
max = 176 min = 123
[140, 141, 123, 135, 155, 173, 183, 172, 185, 185]
max = 185 min = 123
[173, 183, 172, 185, 185, 186, 203, 213, 199, 179]
max = 213 min = 172
[186, 203, 213, 199, 179, 161, 152, 163, 162, 180]
max = 213 min = 152
[161, 152, 163, 162, 180, 183, 179, 182, 186, 201]
max = 201 min = 152
[183, 179, 182, 186, 201, 207, 211, 201, 191, 175]
max = 211 min = 175
[207, 211, 201, 191, 175, 188, 179, 173, 163, 154]
max = 211 min = 154
[188, 179, 173, 163, 154, 154, 143, 152, 157, 156]


### Stop Streaming

In [18]:
# Stop the streaming computation.
# NOTE(review): called with no arguments, so PySpark's documented defaults apply
# (stopSparkContext=True, stopGraceFully=False) — the underlying `sc` is stopped too.
# Confirm this is intended before re-running earlier cells in the same kernel.
ssc.stop()

max = 188 min = 143
