## Kernel Model


from __future__ import division
from math import radians, cos, sin, asin, sqrt, exp
from datetime import datetime
from pyspark import SparkContext
from pyspark.broadcast import Broadcast
import numpy as np

def haversine(lon1, lat1, lon2, lat2):
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    km = 6367 * c
    return km

def day_difference(day1, day2):
    diff = abs(datetime.strptime(str(day1), "%Y-%m-%d") - datetime.strptime(str(day2), "%Y-%m-%d"))
    no_days = diff.days
    return no_days

def hour_diff(time1, time2):
    diff = abs(datetime.strptime(time1, "%H:%M:%S") - datetime.strptime(time2, "%H:%M:%S"))
    diff = (diff.total_seconds()) / 3600
    return diff

def gaussian_kernel(u, h):
    return np.exp(-u**2 / (2 * h**2))

def sum_kernel(distance_kernel, day_kernel, time_kernel):
    res = distance_kernel + day_kernel + time_kernel
    return res

def product_kernel(distance_kernel, day_kernel, time_kernel):
    res = distance_kernel * day_kernel * time_kernel
    return res

# Value to predict
target_latitude = 58.68681
target_longitude = 15.92183
target_date = '2014-05-17'
hour_list= ["00:00:00", "22:00:00", "20:00:00", "18:00:00", "16:00:00", "14:00:00", "12:00:00", "10:00:00", "08:00:00", "06:00:00", "04:00:00"]

#target_date = '1980-05-17'
#hour_list= ["00:00:00", "22:00:00"]

# Hyperparameters: widths for day, distance, and time
h_dist = 100
h_day = 30
h_time = 12

# Create SparkContext
sc = SparkContext(appName="Lab 3 ML")

temperature_file = sc.textFile("BDA/input/temperature-readings.csv")
station_file = sc.textFile("BDA/input/stations.csv")

temperature_data = temperature_file.map(lambda x: x.split(';'))
stations_data = station_file.map(lambda x: x.split(';'))

target_date_strip = datetime.strptime(target_date, '%Y-%m-%d')
prev_temp = temperature_data.filter(lambda x: datetime.strptime(x[1], '%Y-%m-%d') < target_date_strip)

# Calculate station_distkernel and broadcast it
station_distkernel = stations_data.map(lambda x: (x[0], gaussian_kernel(haversine(target_longitude, target_latitude, float(x[4]), float(x[3])), h_dist))).collectAsMap()
broadcast_station_distkernel = sc.broadcast(station_distkernel)

temperature_data_datekernel = prev_temp.map(lambda x: (x[0], x[1], x[2], x[3], gaussian_kernel(day_difference(target_date, x[1]), h_day))).cache()
#print("You are here 0")
#print(temperature_data_datekernel.take(1))

predictions = {}
for hour in hour_list:
    temp_kernels = temperature_data_datekernel.map(lambda x: (float(x[3]),
                                                broadcast_station_distkernel.value[x[0]],
                                                x[4],
                                                gaussian_kernel(hour_diff(hour, x[2]), h_time)))
    #temp,sumkernel, prodkernel                                
    #temp_both_kernels = temp_kernels.map(lambda x: (x[0], sum_kernel(x[1], x[2], x[3]), product_kernel(x[1], x[2], x[3])))
    #print("You are here 1")
    #print(temp_both_kernels.take(1))
    sum_kernel, sum_kernel*temp, prod_kernel, prod_kernel*temp 
    cal_kerneltemp = temp_both_kernels.map(lambda x :(x[1],x[0]*x[1],x[2],x[0]*x[2]))

	
    #print("You are here 2")
    #print(cal_kerneltemp.take(1))
    sum_kerneltemp = cal_kerneltemp.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1], x[2] + y[2], x[3] + y[3]))
    #print("You are here 3")
    #print(sum_kerneltemp)
    predictions[hour]=(sum_kerneltemp[1]/sum_kerneltemp[0],sum_kerneltemp[3]/sum_kerneltemp[2])

# Convert predictions dictionary to RDD
predictions_rdd = sc.parallelize(list(predictions.items()))
# Coalesce and sort the RDD
predictions_rdd = predictions_rdd.coalesce(1)
predictions_rdd = predictions_rdd.sortByKey()
# Save the RDD as text files
predictions_rdd.saveAsTextFile("BDA/output/predictions")                                                   


## Time(Hour),Prediction using summation kernel, Prediction using Prodoct kernel
('00:00:00', (4.6183467454689477, 6.2916698934306972))  
('04:00:00', (4.7065925124492409, 6.5930895159243779))  
('06:00:00', (4.7595837225876654, 6.7317721621128745))  
('08:00:00', (4.8147176456957137, 6.8593316278114189))  
('10:00:00', (4.8698045237472147, 6.9736545639289158))  
('12:00:00', (4.9231523044830698, 7.0729286567357832))  
('14:00:00', (4.973566225831247, 7.1557335681263039))  
('16:00:00', (5.0203553843682771, 7.2211071751239935))  
('18:00:00', (5.0633437042742591, 7.2685807021593085))  
('20:00:00', (5.1028828756959461, 7.2981802141488545))  
('22:00:00', (5.1398646881935637, 7.3103960959870617))  

## MLlib

In [None]:
from __future__ import division
from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
from datetime import datetime
from math import radians, cos, sin, asin, sqrt, exp
from pyspark.broadcast import Broadcast

# Function to calculate haversine distance
def haversine(lon1, lat1, lon2=16.321998712, lat2=62.38583179): # GEOGRAPHICAL CENTER OF SWEDEN
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    km = 6367 * c
    return km

# Function to calculate the difference in days between two dates
def day_diff(day1, day2="1950-01-01"):
    diff = abs(datetime.strptime(str(day1), "%Y-%m-%d") - datetime.strptime(str(day2), "%Y-%m-%d"))
    no_days = diff.days
    return no_days

# Function to calculate the difference in hours between two times
def hour_diff(time1, time2="00:00:00"):
    diff = abs(datetime.strptime(time1, "%H:%M:%S") - datetime.strptime(time2, "%H:%M:%S"))
    diff = (diff.total_seconds()) / 3600
    return diff


sc = SparkContext(appName="Lab 3 ML")
target_date = '2014-12-31'
target_latitude = 58.68681
target_longitude = 15.92183

target_distance = haversine(lon1=target_longitude, lat1=target_latitude) 
target_date_diff = day_diff(day1=target_date)

temperature_file = sc.textFile("BDA/input/temperature-readings.csv")
#temperature_file = temperature_file.sample(withReplacement=False, fraction=0.1)
temperature_data = temperature_file.map(lambda x: x.split(';'))
print(temperature_data.take(1))
print("You are here0")

target_date_strip = datetime.strptime(target_date, '%Y-%m-%d')
prev_temp = temperature_data.filter(lambda x: datetime.strptime(x[1], '%Y-%m-%d') < target_date_strip)
print(prev_temp.take(1))
print("You are here1")

station_file = sc.textFile("BDA/input/stations.csv")
stations_data = station_file.map(lambda x: x.split(';'))

stations_distance = stations_data.map(lambda x: (x[0], haversine(lat1=float(x[3]), lon1=float(x[4]))))

print("You are here2")
stations_distance = stations_distance.collectAsMap()
broadcast_stations_distance = sc.broadcast(stations_distance)

# for training, the plan is to use the distance from lon=lat=0 as the original point
# how many dates have passed since 1950/01/01
# and the hour difference from 00:00:00
# training features are daydiff, hourdiff, distance
training_temp = prev_temp.map(lambda x: (
    float(x[3]), day_diff(x[1]), hour_diff(x[2]), broadcast_stations_distance.value[x[0]]))


print("You are here3")
####standardized####
# Note, since features are standardized, I think we will have to standardize the target as well,
# or the prediction will be super bad
# Or we can try not standardized it, but I did not get it to work at this point
features = training_temp.map(lambda x: x[1:])
standardizer = StandardScaler()
model = standardizer.fit(features)
features_transform = model.transform(features)
label = training_temp.map(lambda x: x[0])
standardized_data = label.zip(features_transform)
labeledpoint = standardized_data.map(lambda x: LabeledPoint(x[0], [x[1]]))
print(labeledpoint.take(10))
print("above is the labeledpoint")
#####################

linearModel = LinearRegressionWithSGD.train(labeledpoint, regType='l2', step=0.25, iterations = 50)
dt_model = DecisionTree.trainRegressor(labeledpoint, categoricalFeaturesInfo={})

######################
# Create a RDD for your target features
hour_list = ["00:00:00", "22:00:00", "20:00:00", "18:00:00", "16:00:00", "14:00:00",
             "12:00:00", "10:00:00", "08:00:00", "06:00:00", "04:00:00"]

target_features = []
for hour in hour_list:
    target_hour = hour_diff(time1=hour)
    target_feature = Vectors.dense([float(target_date_diff), target_hour, target_distance])
    target_features.append(target_feature)
target_features_rdd = sc.parallelize(target_features)

print("----target_features_rdd-----")
print(target_features_rdd.take(10))
print("----target_features_rdd-----")

target_features_transform = model.transform(target_features_rdd)
standardized_target_features = target_features_transform.collect()
#print("----standardized_target_features-----")
#print(standardized_target_features)
#print("----standardized_target_features-----")


print("------Linear-------")
lr_predictions = [linearModel.predict(feature) for feature in standardized_target_features]
print(lr_predictions)
print("------Linear-------")

print("------DT-------")
dt_predictions = [dt_model.predict(feature) for feature in standardized_target_features]
print(dt_predictions)
print("------DT-------")


# save the predictions to a text file
lr_predictions = sc.parallelize(lr_predictions)
lr_predictions = lr_predictions.coalesce(1)
lr_predictions.saveAsTextFile("BDA/output/lr_prediction")
# save the predictions to a text file
dt_predictions = sc.parallelize(dt_predictions)
dt_predictions = dt_predictions.coalesce(1)
dt_predictions.saveAsTextFile("BDA/output/dt_predictions")

Output of linear regression    
3.34001769622  
6.26347240094  
5.99770379142  
5.7319351819  
5.46616657238  
5.20039796286  
4.93462935334  
4.66886074382  
4.4030921343  
4.13732352478  
3.87155491526  

Output of Decision Tree  
3.22156055458  
3.87637925605  
3.87637925605  
5.22146646785  
6.63698189402  
6.63698189402  
6.63698189402  
6.63698189402  
4.98940174481  
3.22156055458  
3.22156055458  