In [1]:
# Importing modules
from __future__ import division
from math import radians, cos, sin, asin, sqrt, exp
from datetime import datetime
from pyspark import SparkContext

In [2]:
# Crreating Context
sc = SparkContext(appName="lab_kernel")

In [3]:
# Getting data from external sources and splited each line to words
tempreture_readings = sc.textFile("short_temp.csv")
temp = tempreture_readings.map(lambda line: line.split(';'))

stations = sc.textFile('stations.csv')
stat = stations.map(lambda x: x.split(';'))

In [4]:
# Distance function (In km)
def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    km = 6367 * c
    return km

In [5]:
# datediff function (In days)
def datediff(date1,date2):
    date_format = "%Y-%m-%d"
    a = datetime.strptime(date1, date_format)
    b = datetime.strptime(date2, date_format)
    delta = b - a
    return delta.days 

In [92]:
# timediff function (In hours); time2 in a tuple
def timediff(time1,time2):
    time_format = "%H:%M:%S"
    a = datetime.strptime(time1, time_format)
    b = [datetime.strptime(t, time_format) for t in time2]
    delta = [elem-a for elem in b]
    return [elem.seconds/3600 for elem in delta]

In [97]:
time2 = ['04:00:00','06:00:00','08:00:00']
time1 = '07:00:00'
timediff(time1, time2)
#print(list(map(timediff,time2,time1)))

[21.0, 23.0, 1.0]

In [7]:
# Smoothing factors
h_distance = 1000
h_date = 4
h_time = 3

In [8]:
# Input
a = 58.4274 
b = 14.826 
date = "2013-07-04" 
times = ['04:00:00','06:00:00','08:00:00']
for i in range(10,26,2):
    times.append('{}:00:00'.format(i))
        
# Create RDD for the input
input_map = map(lambda x: (x,(date,a,b)) ,times)
input_RDD = sc.parallelize(input_map)

In [33]:
# Extracting Key-Value pairs
station_KeyValue = stat.map(lambda x: (x[0],(x[3],x[4])))

temp_KeyValue = temp.map(lambda x: ((x[0],x[1],x[2]),x[3]))
# Filtering Dataset based on date
temp_KeyValue_filtered = temp_KeyValue.filter(lambda x: x[0][1] < date)

In [36]:
# We will not join but broadcast the station dataset (small) on the temperature (big)
# We will create a copy on each node 
stat_map = station_KeyValue.collectAsMap()
stat_broad = sc.broadcast(stat_map)

In [51]:
# Make a join with broadcasting
join_broad = temp_KeyValue_filtered.map(lambda t: (t[0], stat_broad.value.get(t[0][0], '-'), t[1]))

In [52]:
join_broad.take(5)

[(('102190', '1955-09-01', '06:00:00'), ('60.3097', '12.6959'), '14.5'),
 (('102190', '1955-09-01', '12:00:00'), ('60.3097', '12.6959'), '19.8'),
 (('102190', '1955-09-01', '18:00:00'), ('60.3097', '12.6959'), '15.2'),
 (('102190', '1955-09-02', '06:00:00'), ('60.3097', '12.6959'), '11.6'),
 (('102190', '1955-09-02', '12:00:00'), ('60.3097', '12.6959'), '15.8')]

In [55]:
input_RDD.take(2)

[('04:00:00', ('2013-07-04', 58.4274, 14.826)),
 ('06:00:00', ('2013-07-04', 58.4274, 14.826))]

In [60]:
input_RDD[1][1]

TypeError: 'RDD' object does not support indexing

In [62]:
# Distance Kernel
def kernel_dist(data, a, b):
    return data.map(lambda x: (x[0], exp(-1*(haversine(float(x[1][0]),float(x[1][1]),a,b)/h_distance)**2)))

# Date kernel
def kernel_date(data, date):
    return data.map(lambda x: (x[0], exp(-1*(datediff(x[0][1],date)/h_date)**2)))

# Time Kernel
# -- Function
def kernel_time(data, input_RDD):
    temp_KeyValue_filtered = data.filter(lambda x: x[0][2] < input_RDD[0])
    return temp_KeyValue_filtered.map(lambda x: (x[0], exp(-1*(timediff(x[0][2],input_RDD[0])/h_time)**2)))

In [57]:
# Combine the kernels: sum
def kernel_sum(kernel1, kernel2, kernel3):
    return kernel1+kernel2+kernel3

# Combine the kernels: product
def kernel_prod(kernel1, kernel2, kernel3):
    return kernel1*kernel2*kernel3

In [63]:
# Compute the kernels:
kern_dist = kernel_dist(join_broad, a, b)
kern_date = kernel_dist(join_broad, date)

In [64]:
kern_dist.take(10)

[(('102190', '1955-09-01', '06:00:00'), 0.9072824587256415),
 (('102190', '1955-09-01', '12:00:00'), 0.9072824587256415),
 (('102190', '1955-09-01', '18:00:00'), 0.9072824587256415),
 (('102190', '1955-09-02', '06:00:00'), 0.9072824587256415),
 (('102190', '1955-09-02', '12:00:00'), 0.9072824587256415),
 (('102190', '1955-09-02', '18:00:00'), 0.9072824587256415),
 (('102190', '1955-09-03', '06:00:00'), 0.9072824587256415),
 (('102190', '1955-09-03', '12:00:00'), 0.9072824587256415),
 (('102190', '1955-09-03', '18:00:00'), 0.9072824587256415),
 (('102190', '1955-09-04', '06:00:00'), 0.9072824587256415)]