In [1]:
from __future__ import division
from math import radians, cos, sin, asin, sqrt, exp
from datetime import datetime
from pyspark import SparkContext

sc = SparkContext(appName="lab_kernel_2")

def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a))
    km = 6367 * c
    return km

h_distance = 80
h_date = 10
h_time = 6
a = 58.40 # latitude
b = 15.62 # longitude
date = "2013-11-04"


## date
def d_date(date1, date2):
    date1 = datetime.strptime(date1, "%Y-%m-%d")
    date2 = datetime.strptime(date2, "%Y-%m-%d")
    output = abs((date2 - date1).days)
    return output

## time
def d_time(time1,time2):
    time1 = datetime.strptime(time1, '%H:%M:%S')
    time2 = datetime.strptime(time2, '%H:%M:%S')
    output = abs((time1-time2).seconds/(60**2))
    return output

def add_kernels(d1, d2, d3):
    k1 = exp(- d1**2 / (2*(h_distance**2)))
    k2 = exp(- d2**2 / (2*(h_date**2)))
    k3 = exp(- d3**2 / (2*(h_time**2)))
    output = k1 + k2 + k3
    return output
    
def mul_kernels(d1, d2, d3):
    k1 = exp(- d1**2 / (2*(h_distance**2)))
    k2 = exp(- d2**2 / (2*(h_date**2)))
    k3 = exp(- d3**2 / (2*(h_time**2)))
    output = k1 * k2 * k3
    return output


stations = sc.textFile("/Users/darin/Desktop/stations.csv")
temp = sc.textFile("/Users/darin/Desktop/temperature-readings-small.csv")

###########
stations = stations.map(lambda x: x.split(";"))
stations = stations.map(lambda x: (x[0],haversine(b,a,float(x[4]),float(x[3]))))

m=sc.parallelize(stations.collect()).collectAsMap()
stations=sc.broadcast(m)


###########
temp = temp.map(lambda x: x.split(";"))     
temp = temp.map(lambda x: (stations.value[str(x[0])],x[1],x[2],float(x[3])))

filter_temp = temp.filter(lambda x: x[1] <= date)

filter_temp = filter_temp.map(lambda x: (x[0], d_date(date,x[1]), d_time(time,x[2]), x[3]))

##########
predicted_add=[]
predicted_mul=[]


for time in ["00:00:00", "22:00:00", "20:00:00", "18:00:00", "16:00:00", "14:00:00",
"12:00:00", "10:00:00", "08:00:00", "06:00:00", "04:00:00"]:
    add_allkernels = filter_temp.map(lambda x:(add_kernels(x[0],x[1],x[2]),x[3])).map(lambda x:(x[0],x[0]*x[1])).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    predicted_add.append(add_allkernels[1] / add_allkernels[0])

    mul_allkernels = filter_temp.map(lambda x:(mul_kernels(x[0],x[1],x[2]),x[3])).map(lambda x:(x[0],x[0]*x[1])).reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    predicted_mul.append(mul_allkernels[1] / mul_allkernels[0])
    
print(predicted_add)
print('\n')
print(predicted_mul)
sc.parallelize(predicted_add).saveAsTextFile("add_kernel_predictions")
sc.parallelize(predicted_mul).saveAsTextFile("mul_kernel_predictions")   


sc.stop()

[3.876649015227585, 3.845461804411638, 3.832316480947907, 3.8439126628440006, 4.110669123798261, 4.151395306904242, 4.19238583014905, 4.224023221607601, 4.237505282130029, 4.2261149305438375, 3.9573819861789064]


[3.3885900519940253, 3.410613572232551, 3.45145724020793, 3.5240266060201395, 4.696178708661572, 4.690142331419868, 4.678540799990786, 4.6565172797522605, 4.615673611776882, 4.543104245964671, 3.37095214332324]


Py4JJavaError: An error occurred while calling o250.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/Users/darin/add_kernel_predictions already exists
	at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.assertConf(SparkHadoopWriter.scala:298)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:71)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$1384/801185334.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$1382/1408888566.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$1381/261548291.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
	at org.apache.spark.rdd.PairRDDFunctions$$Lambda$1380/2013855721.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1578)
	at org.apache.spark.rdd.RDD$$Lambda$1376/1075405481.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1578)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1564)
	at org.apache.spark.rdd.RDD$$Lambda$1375/1105259101.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:551)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
