In [12]:
import numpy as np
from math import sin, cos, sqrt, radians, atan2, degrees, asin
import os
from pyspark.sql.functions import regexp_extract
from pyspark.sql.types import *
import pandas as pd
from pyspark.sql.functions import split
from pyspark import SparkContext
from pyspark.sql import SQLContext
from shapely.geometry import Point
import geopandas as gpd
from geopandas import GeoDataFrame
import pandas as pd
import geoplot
from pyspark.sql import functions as F
from math import sin, cos, sqrt, radians, atan2, degrees, asin
from pyspark.sql.window import Window
import boto3
import re
from io import StringIO

In [14]:
#Reuse a SparkContext that is already running, or start one, so this cell
#does not depend on `sc` having been injected by the environment.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [15]:
#straight-line distance between two latitude/longitude points
def calc_euclidian_dist(p1_lat,p1_long,p2_lat,p2_long):
    """Return the straight-line (chord) distance in km between two points.

    Each latitude/longitude pair is given in degrees (anything ``float()``
    accepts, including numeric strings). Both points are projected onto a
    sphere of radius 6371 km and the Euclidean distance between the two
    resulting 3-D positions is returned, i.e. the length of the segment
    through the sphere rather than the path along its surface.
    """
    earth_radius_km = 6371

    def to_cartesian(lat_deg, long_deg):
        #degrees -> radians -> (x, y, z) on the sphere
        lat_rad = radians(float(lat_deg))
        long_rad = radians(float(long_deg))
        return (
            earth_radius_km*cos(lat_rad)*cos(long_rad),
            earth_radius_km*cos(lat_rad)*sin(long_rad),
            earth_radius_km*sin(lat_rad),
        )

    x1, y1, z1 = to_cartesian(p1_lat, p1_long)
    x2, y2, z2 = to_cartesian(p2_lat, p2_long)

    return sqrt((x2-x1)**2 + (y2-y1)**2 + (z2-z1)**2)

In [16]:
#averages the latitudes and the longitudes of one cluster
def add_points(centroid):
    """Return ``[mean_latitude, mean_longitude]`` for a cluster.

    ``centroid[0]`` is an iterable of latitudes and ``centroid[1]`` an
    iterable of longitudes; every entry is converted with ``float()`` before
    averaging. An empty iterable raises ``ZeroDivisionError``.
    """
    latitudes = [float(value) for value in centroid[0]]
    longitudes = [float(value) for value in centroid[1]]
    mean_latitude = sum(latitudes)/len(latitudes)
    mean_longitude = sum(longitudes)/len(longitudes)
    return [mean_latitude, mean_longitude]

In [17]:
#finds closest centroid to each point
def closest_centroid(point):
    """Return the index of the centroid nearest to one point.

    ``point[0]`` is a whitespace-separated string of centroid coordinates laid
    out as "lat0 long0 lat1 long1 ..."; ``point[1]`` and ``point[2]`` are the
    latitude and longitude of the point being classified.

    The number of centroids is taken from the string itself instead of a
    global, so the function keeps working when fewer centroids than initially
    requested are passed in. Ties resolve to the lowest index. Raises
    ``ValueError`` when the string holds no complete lat/long pair.
    """
    coordinates = [float(value) for value in point[0].split()]
    point_lat = float(point[1])
    point_long = float(point[2])
    #even positions are latitudes, odd positions the matching longitudes
    distances = [
        calc_euclidian_dist(centroid_lat, centroid_long, point_lat, point_long)
        for centroid_lat, centroid_long in zip(coordinates[0::2], coordinates[1::2])
    ]
    return distances.index(min(distances))

In [18]:
#user defined pyspark functions
#The helpers return Python floats (64-bit); DoubleType keeps that precision,
#whereas FloatType would round every distance and centroid to 32 bits.
cc = F.udf(closest_centroid, IntegerType())
distance_old_new = F.udf(lambda point: calc_euclidian_dist(point[0],point[1],point[2],point[3]), DoubleType())
add_dd = F.udf(add_points, ArrayType(DoubleType()))

In [19]:
#initializations
k=4
lat=[]
long=[]
ws=Window.orderBy(F.lit(1))

#getting data from S3
s3 = boto3.resource('s3')
bucket = s3.Bucket('geo-clustering')
obj = bucket.Object(key='sample_geo_processed.csv')
response = obj.get()
lines = response[u'Body'].read().split(b'\n')

#decoding every record after the header row; blank lines (such as the empty
#piece left after a trailing newline) are skipped, so the last real record is
#kept whether or not the file ends with a newline
raw_data=[]
for line in lines[1:]:
    record = str(line, 'utf-8')
    if record.strip():
        raw_data.append(record)

In [20]:
#splitting the data to get latitudes and longitudes only
raw_rdd = sc.parallelize(raw_data)
#records are pipe-delimited; only the first two fields are kept
split_rdd = raw_rdd.map(lambda line: line.split('|')).map(lambda fields: [fields[0], fields[1]])
#no column names are given, so the two columns are named _1 and _2
data_df = split_rdd.toDF()

In [21]:
#taking initial centroids of size k
#NOTE(review): takeSample is unseeded, so every run starts from different
#centroids; pass a seed as third argument if reproducible runs are wanted.
centroids=data_df.rdd.takeSample(False, k)

#rebuilding the coordinate lists from the sample itself, so re-running this
#cell does not accumulate duplicates and the length always matches the sample
lat=[centroid[0] for centroid in centroids]
long=[centroid[1] for centroid in centroids]
init_lat=pd.DataFrame(lat,columns=["latitude"])
init_long=pd.DataFrame(long,columns=["longitude"])
init = pd.concat([init_lat, init_long], axis=1)

In [22]:
#Iterating until the centroids move less than this many km in total between
#two consecutive iterations (calc_euclidian_dist works in km).
CONVERGENCE_THRESHOLD_KM=5
iternationDistanceFloat=99999999
while iternationDistanceFloat>CONVERGENCE_THRESHOLD_KM:

    #serialising the current centroids as "lat0 long0 lat1 long1 ..." so they
    #can be attached to every row as a single literal column
    listToStr = ' '.join(str(coordinate) for centroid in centroids for coordinate in centroid)
    data_df3 = data_df.withColumn('centroid',F.lit(listToStr))\
    .withColumnRenamed("_1","latitude").withColumnRenamed("_2","longitude")

    #calling closest centroid UDF
    data_df_final = data_df3.withColumn("closest_centroid",cc(F.array('centroid','latitude', 'longitude')))

    #finding clusters: one row per centroid index with the member coordinates
    data_df_final_list = data_df_final.groupby('closest_centroid')\
    .agg(F.collect_list('latitude').alias("lat_list"),F.collect_list('longitude').alias("long_list"))\
    .sort('closest_centroid')

    #Finding mean of every cluster; a single collect brings the per-cluster
    #rows to the driver instead of re-running the job for every consumer
    with_mean = data_df_final_list.withColumn("mean",add_dd(F.array('lat_list', 'long_list')))
    cluster_means = with_mean.select(
        "closest_centroid",
        with_mean["mean"][0].alias("latitude"),
        with_mean["mean"][1].alias("longitude"),
    ).collect()

    #replacing initial clusters with new centroids
    new_centroids = [(row["latitude"], row["longitude"]) for row in cluster_means]
    new_centroids_pd = pd.DataFrame(new_centroids, columns=["latitude","longitude"])
    init = new_centroids_pd

    #calculating overall distance between old and new clusters to find stop
    #condition; each new centroid is paired with the old centroid whose index
    #it was grouped under, so the pairing does not rely on row order
    iternationDistanceFloat = 0.0
    for row in cluster_means:
        old_lat, old_long = centroids[row["closest_centroid"]]
        iternationDistanceFloat += calc_euclidian_dist(old_lat, old_long, row["latitude"], row["longitude"])

    #print to check
    data_df_final_list.show()
    print("Iteration distance:", iternationDistanceFloat)

    #Assigning for next iteration
    centroids=new_centroids

+----------------+--------------------+--------------------+
|closest_centroid|            lat_list|           long_list|
+----------------+--------------------+--------------------+
|               0|[34.27036086, 33....|[-118.3162918, -1...|
|               1|[37.77253945, 39....|[-77.49954987, -7...|
|               2|[36.24009843, 33....|[-115.1586914, -1...|
|               3|[42.09013298, 39....|[-87.68915558, -8...|
+----------------+--------------------+--------------------+

Iteration distance: 1207.6920928955078
+----------------+--------------------+--------------------+
|closest_centroid|            lat_list|           long_list|
+----------------+--------------------+--------------------+
|               0|[36.24009843, 34....|[-115.1586914, -1...|
|               1|[37.77253945, 39....|[-77.49954987, -7...|
|               2|[38.81664153, 29....|[-97.62573242, -9...|
|               3|[42.09013298, 39....|[-87.68915558, -8...|
+----------------+--------------------+------

In [58]:
#writing the last iteration's cluster membership to S3 as a pipe-delimited CSV
final_clusteroids = data_df_final_list.toPandas()
csv_buffer = StringIO()
final_clusteroids.to_csv(path_or_buf=csv_buffer, sep="|", index=False)
target_object = s3.Bucket("geo-clustering").Object(key="final_clusters.csv")
target_object.put(Body=csv_buffer.getvalue())

{'ResponseMetadata': {'RequestId': '6BD7547A8328C138',
  'HostId': 'dyfvdDRzeDSNVC74kuwMQO9MQ4pMnk6shTq4g+7GU6oH87ANKrahDtdmZYlmWpDm4xIHtPaUeig=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'dyfvdDRzeDSNVC74kuwMQO9MQ4pMnk6shTq4g+7GU6oH87ANKrahDtdmZYlmWpDm4xIHtPaUeig=',
   'x-amz-request-id': '6BD7547A8328C138',
   'date': 'Sat, 28 Nov 2020 19:55:42 GMT',
   'etag': '"4c67b32bb086cce758d3fb7d0567b064"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"4c67b32bb086cce758d3fb7d0567b064"'}