In [2]:
# Example weather station data
#
# 1419408015	0R1,Dn=059D,Dm=066D,Dx=080D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R1,Dn=059D,Dm=065D,Dx=078D,Sn=8.5M,Sm=9.5M,Sx=10.3M
# 1419408016	0R2,Ta=13.9C,Ua=28.5P,Pa=889.9H
# 1419408017	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.7M,Sm=9.6M,Sx=10.3M
# 1419408018	0R1,Dn=059D,Dm=064D,Dx=075D,Sn=8.9M,Sm=9.6M,Sx=10.3M
# 1419408019	0R1,Dn=059D,Dm=065D,Dx=075D,Sn=8.8M,Sm=9.5M,Sx=10.3M

In [3]:
# Key for measurements:
#
# Sn      Wind speed minimum m/s, km/h, mph, knots #,M, K, S, N
# Sm      Wind speed average m/s, km/h, mph, knots #,M, K, S, N
# Sx      Wind speed maximum m/s, km/h, mph, knots #,M, K, S, N
# Dn      Wind direction minimum deg #, D
# Dm      Wind direction average deg #, D
# Dx      Wind direction maximum deg #, D
# Pa      Air pressure hPa, Pa, bar, mmHg, inHg #, H, P, B, M, I
# Ta      Air temperature °C, °F #, C, F
# Tp      Internal temperature °C, °F #, C, F
# Ua      Relative humidity %RH #, P
# Rc      Rain accumulation mm, in #, M, I
# Rd      Rain duration s #, S
# Ri      Rain intensity mm/h, in/h #, M, I
# Rp      Rain peak intensity mm/h, in/h #, M, I
# Hc      Hail accumulation hits/cm2, hits/in2, hits #, M, I, H
# Hd      Hail duration s #, S
# Hi      Hail intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Hp      Hail peak intensity hits/cm2h, hits/in2h, hits/ h #, M, I, H
# Th      Heating temperature °C, °F #, C, F
# Vh      Heating voltage V #, N, V, W, F2
# Vs      Supply voltage V V
# Vr      3.5 V ref. voltage V V

In [1]:
# Parse a line of weather station data, returning the average wind direction measurement 
#
import re
def parse(line):
    match = re.search("Dm=(\d+)", line)
    if match:
        val = match.group(1)
        return [int(val)]
    return []

In [2]:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 1)

In [4]:
# Open connection to the streaming weather data

lines = ssc.socketTextStream("rtd.hpwren.ucsd.edu", 12028)

In [7]:
# discretized stream = DStream - high level abstraction Spark Streaming provides to represent a continuous stream of data
# Internally, DStream is represented as a sequence of RDDs. 

# read average wind speed from each line and store it in a new DStream vals
vals = lines.flatMap(parse)

In [8]:
# The window, sliding, contains 10 seconds worth of data and slides every five seconds

window = vals.window(10,5)

In [9]:
# Define and call analysis function

def stats(rdd):
    print(rdd.collect())
    if rddcount() >0:
            print("max = {}, min = {}".format(rdd.max(), rdd.min()))

In [10]:
# Call stats functino for each RDD in our sliding window

window.foreachRDD(lambda rdd: stats(rdd))

In [11]:
# Start the streaming process

ssc.start()

[]
[67, 76, 74, 64, 53]
[67, 76, 74, 64, 53, 57, 55, 48, 56, 69]
[57, 55, 48, 56, 69, 62, 51, 44, 50, 50]
[62, 51, 44, 50, 50, 53, 55, 67, 62, 58]
[53, 55, 67, 62, 58, 61, 61, 66, 68, 76]
[61, 61, 66, 68, 76, 69, 64, 50, 51, 60]
[69, 64, 50, 51, 60, 65, 66, 64, 55, 49]
[65, 66, 64, 55, 49, 56, 50, 56, 55, 47]
[56, 50, 56, 55, 47, 45, 39, 36, 32, 29]
[45, 39, 36, 32, 29, 26, 22, 24, 27, 30]
[26, 22, 24, 27, 30, 35, 40, 32, 45, 45]
[35, 40, 32, 45, 45, 47, 57, 59, 59, 55, 49]
[47, 57, 59, 59, 55, 49, 42, 39, 48, 43, 39]
[42, 39, 48, 43, 39, 29, 29, 21, 26, 26]
[29, 29, 21, 26, 26, 24, 25, 27, 32, 40]
[24, 25, 27, 32, 40, 43, 37, 35, 32, 32]
[43, 37, 35, 32, 32, 34, 40, 45, 43, 41]
[34, 40, 45, 43, 41, 45, 50, 61, 61, 61]
[45, 50, 61, 61, 61, 68, 68, 63, 68, 60]
[68, 68, 63, 68, 60, 51, 41, 43, 51, 57]
[51, 41, 43, 51, 57, 46, 41, 35, 32, 41]
[46, 41, 35, 32, 41, 48, 62, 52, 40, 29]
[48, 62, 52, 40, 29, 30, 29, 46, 46, 49]
[30, 29, 46, 46, 49, 52, 51, 54, 54, 61, 65]
[52, 51, 54, 54, 61, 

In [12]:
ssc.stop()

[21, 27, 33, 45, 34, 31, 32, 25, 28, 25]
