In [7]:
import numpy as np
from multiprocessing import process as WORKER
from pyspark.sql import SparkSession 
from pyspark.sql.types import *

In [37]:
# Input files, resolved relative to the notebook's working directory.
gazze_file = "2018_Gaz_counties_national.csv"
threat_loc_file = "location.csv"

# Local-mode Spark session for this lab.
spss = (
    SparkSession.builder
    .master('local[*]')
    .appName("sec_lab_4")
    .getOrCreate()
)

# Explicit schema for the threat-location CSV. Every field keeps the
# StructField default (nullable). Assumes location.csv lists its columns
# in exactly this order — TODO confirm against the file.
# NOTE(review): ZIP is typed as an integer, so leading zeros (and any
# non-numeric postal codes) would not be preserved — confirm this is intended.
threat_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ('ID', IntegerType()),
        ('LONG', DoubleType()),
        ('LAT', DoubleType()),
        ('COUNTRY', StringType()),
        ('COUNTRY_2', StringType()),
        ('COUNTRY_FULL', StringType()),
        ('CITY', StringType()),
        ('ZIP', IntegerType()),
        ('STATE', StringType()),
        ('NA_1', IntegerType()),
        ('NA_2', IntegerType()),
    )
])

# Gazetteer: column types are inferred. Threat locations: the schema above.
ga_sppd = spss.read.csv(gazze_file, header=True, inferSchema=True)
loca_sppd = spss.read.csv(threat_loc_file, header=True, schema=threat_schema)
ga_sppd.printSchema()
loca_sppd.printSchema()

root
 |-- USPS: string (nullable = true)
 |-- GEOID: integer (nullable = true)
 |-- ANSICODE: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- ALAND: long (nullable = true)
 |-- AWATER: long (nullable = true)
 |-- ALAND_SQMI: double (nullable = true)
 |-- AWATER_SQMI: double (nullable = true)
 |-- INTPTLAT: double (nullable = true)
 |-- INTPTLONG: double (nullable = true)

root
 |-- ID: integer (nullable = true)
 |-- LONG: double (nullable = true)
 |-- LAT: double (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- COUNTRY_2: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- ZIP: integer (nullable = true)
 |-- STATE: string (nullable = true)
 |-- NA_1: integer (nullable = true)
 |-- NA_2: integer (nullable = true)



In [42]:
def distance(instance1, instance2):
    """Return the Euclidean (L2) distance between two points.

    Both arguments are converted to NumPy arrays first, so plain lists and
    tuples work as well as arrays. They must be broadcast-compatible. The
    squared differences are summed over *all* elements, so e.g. a (1, 2)
    array compared with a (2,) array still yields one scalar distance.
    """
    delta = np.asarray(instance1) - np.asarray(instance2)
    return np.sqrt((delta ** 2).sum())

In [43]:
def get_neighbors(training_set,
                  test_instance,
                  k,
                  distance=distance):
    """Return the k instances of ``training_set`` closest to ``test_instance``.

    Parameters
    ----------
    training_set : iterable
        Collection of instances, e.g. a 2-D NumPy array whose rows are
        points. Each element is passed to ``distance``.
    test_instance : array-like
        The query point.
    k : int
        Number of neighbours to return. Values <= 0 give an empty list.
        ``None`` returns every instance, sorted.
    distance : callable, optional
        ``distance(test_instance, instance) -> number``. Defaults to the
        ``distance`` function defined before this cell (bound when this
        function is defined).

    Returns
    -------
    list of (instance, dist) tuples
        Sorted by ascending ``dist``. Ties keep their training-set order
        (stable sort). Contains at most ``k`` entries.
    """
    if k is not None and k <= 0:
        # A negative slice bound would silently mean "all but the last |k|".
        return []
    scored = [(instance, distance(test_instance, instance))
              for instance in training_set]
    scored.sort(key=lambda pair: pair[1])
    return scored[:k]

In [44]:
# Pull both Spark frames into pandas so the NumPy-based KNN helpers can use them.
ga_pd = ga_sppd.toPandas()
threat_pd = loca_sppd.toPandas()

# Query point: the first threat location as a (1, 2) array of [LAT, LONG].
test = np.array(threat_pd[['LAT', 'LONG']][:1])
# Candidates: every gazetteer row's internal point, in the same (lat, long) order.
train = np.array(ga_pd[['INTPTLAT', 'INTPTLONG']])
# NOTE(review): distance() is plain Euclidean on raw lat/long degrees, not a
# great-circle distance — confirm this approximation is acceptable.
print(test)
print('neighbors')
for out in get_neighbors(train, test, 3):
    print(out)  # Python 3 print function; `print out` is a SyntaxError there

[[  33.7081 -117.7596]]
neighbors
(array([  33.675687, -117.777207]), 0.03688643406456934)
(array([  34.196398, -118.261862]), 0.7005012872564816)
(array([  33.023604, -116.776117]), 1.1982376998346436)
