In [1]:
import numpy as np
from scipy import spatial
import time
import pandas as pd
import pickle
from pyspark.sql.functions import udf,col,cos,sin
from pyspark.sql.types import *
from pyspark.sql.dataframe import *
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import monotonically_increasing_id
from multiprocessing.dummy import Pool
# Notebook-wide constants.
# Sphere radius used when projecting lat/lon to Cartesian coordinates; the
# search radius below is divided by 1000 before use, so this is presumably
# kilometres -- TODO confirm.
earth_rad=6371
# Degrees -> radians factor, derived from pi instead of a truncated literal
# (0.0174533) so the projection does not carry a systematic rounding error.
deg_to_rad=np.pi/180.0
# Neighbourhood radius. Kept as an int on purpose: it is also stringified into
# the output column names ("<obj_type>_cnt500").
nneib_dist=500
#multiprocessing.cpu_count()

In [None]:
# Display every configuration entry of the active SparkContext.
# NOTE(review): `spark` is not created anywhere in this notebook; presumably
# the kernel pre-defines it -- confirm before running elsewhere.
spark.sparkContext.getConf().getAll()

In [9]:
# Load object locations (lat, lon, obj_type), keeping only rows whose latitude
# lies strictly inside (-90, 90) and is not exactly 0.
# NOTE(review): longitude is not range-checked by this query -- confirm the
# source table is clean or add a matching filter.
obj_sql = "select latitude as lat, longitude as lon, object_type as obj_type from advanlwork.geo_objects where latitude < 90 and latitude > -90 and latitude <> 0" 
obj_ds = spark.sql(obj_sql)

In [3]:
# Preview the first five object rows.
obj_ds.show(5)

+--------------------+--------------------+-----------+
|                 lat|                 lon|   obj_type|
+--------------------+--------------------+-----------+
|39.02684000000000...|-94.4558870000000...| prox_never|
|39.02666500000000...|-94.4494320000000...| prox_never|
|39.02402600000000...|-94.4600600000000...|prox_active|
|39.01933300000000...|-94.4792940000000...|prox_active|
|39.01406060000000...|-94.4646107000000...|prox_fmr0t1|
+--------------------+--------------------+-----------+
only showing top 5 rows



In [10]:
# Add double-precision copies ("lat2"/"lon2") of the coordinate columns so the
# trigonometric projection below operates on doubles.
obj_ds = (
    obj_ds
    .withColumn("lat2", col("lat").cast(DoubleType()))
    .withColumn("lon2", col("lon").cast(DoubleType()))
)

In [11]:
# Project (lat2, lon2), given in degrees, onto 3-D Cartesian coordinates on a
# sphere of radius earth_rad:
#   Cord1 = R*cos(lat)*cos(lon), Cord2 = R*cos(lat)*sin(lon), Cord3 = R*sin(lat)
lat_rad = deg_to_rad * col("lat2")
lon_rad = deg_to_rad * col("lon2")
obj_ds = (
    obj_ds
    .withColumn("Cord1", earth_rad * cos(lat_rad) * cos(lon_rad))
    .withColumn("Cord2", earth_rad * cos(lat_rad) * sin(lon_rad))
    .withColumn("Cord3", earth_rad * sin(lat_rad))
)



In [12]:
# Expose the projected objects as temp view "data", then keep only obj_type and
# the three Cartesian coordinates, each cast to DECIMAL(8,4) (four integer
# digits, four fractional digits).
obj_ds.createOrReplaceTempView("data")
obj=spark.sql("select obj_type,CAST (Cord1 as DECIMAL(8,4)),CAST (Cord2 as DECIMAL(8,4)),CAST (Cord3 as DECIMAL(8,4)) from data")

In [13]:
# Materialise the projected objects as table mbansa001c.obj_mod (overwriting
# any previous version) so later cells read the stored result instead of
# recomputing the projection.
obj.write.mode("overwrite").saveAsTable("mbansa001c.obj_mod")

In [25]:
# Drop any cached copies of the intermediate DataFrames.
# NOTE(review): no cache()/persist() call is visible in this notebook, so these
# calls are probably no-ops -- confirm and remove if so.
obj_ds.unpersist()
obj.unpersist()

DataFrame[obj_type: string, Cord1: decimal(8,4), Cord2: decimal(8,4), Cord3: decimal(8,4)]

In [2]:
# Read the stored projected objects back from mbansa001c.obj_mod.
obj_sql_mod = "select * from mbansa001c.obj_mod" 
obj_mod = spark.sql(obj_sql_mod)

In [22]:
# Preview the first three stored rows.
obj_mod.show(3)

+-----------+--------+----------+---------+
|   obj_type|   Cord1|     Cord2|    Cord3|
+-----------+--------+----------+---------+
| prox_never|189.1617|-4737.3109|4255.7894|
|prox_active|189.1116|-4737.3510|4255.7471|
|prox_active|189.8875|-4737.3765|4255.6842|
+-----------+--------+----------+---------+
only showing top 3 rows



In [3]:
# Bring the distinct object types to the driver as a plain Python list; this
# list drives the per-type loops further down.
distinct_type_rows = obj_mod.select("obj_type").distinct().collect()
obj_list_cur = [type_row[0] for type_row in distinct_type_rows]

In [4]:
# Number of distinct object types found.
len(obj_list_cur)

47

In [17]:
# Build one KD-tree per object type from its Cartesian coordinates and pickle
# each tree to /home/mbansa001c/tree_<obj_type>, printing per-type and overall
# wall-clock time in whole minutes.
start=time.time()

for i, obj_type in enumerate(obj_list_cur):
    print (obj_type)
    start1=time.time()
    # Cord1..3 are stored as DECIMAL(8,4); collected as-is they arrive as
    # decimal.Decimal objects and numpy builds an object-dtype array from them.
    # Cast to double inside Spark so plain floats are transferred instead.
    obj_filter=(
        obj_mod
        .where(col("obj_type")==obj_type)
        .select(
            col("Cord1").cast(DoubleType()),
            col("Cord2").cast(DoubleType()),
            col("Cord3").cast(DoubleType()),
        )
    )
    # All points of this type are pulled onto the driver here.
    np_obj_filter=np.array(
        obj_filter.rdd.map(lambda l: (l[0],l[1],l[2])).collect(),
        dtype=np.float64,
    )
    tree = spatial.cKDTree(np_obj_filter,leafsize=16)
    with open('/home/mbansa001c/tree_'+str(obj_type), 'wb') as f:
        # Binary protocol: the default protocol on older Python versions is a
        # slow, bulky ASCII format; pickle.load detects the protocol itself.
        pickle.dump(tree, f, protocol=pickle.HIGHEST_PROTOCOL)
    print("Took " + str(round(((time.time())-start1)/60)) + " minutes")
print("Done! Overall Took " + str(round(((time.time())-start)/60)) + " minutes")

prox_coinstar
Took 0.0 minutes
prox_fmr0t1
Took 2.0 minutes
prox_hotel
Took 0.0 minutes
prox_upsdrop
Took 0.0 minutes
prox_active
Took 16.0 minutes
prox_greendot
Took 0.0 minutes
prox_invest
Took 0.0 minutes
prox_resi_cdv
Took 5.0 minutes
prox_bank
Took 0.0 minutes
prox_shoes
Took 0.0 minutes
prox_grocery
Took 0.0 minutes
prox_calc
Took 0.0 minutes
prox_beauty
Took 0.0 minutes
prox_mobilekiosk
Took 0.0 minutes
prox_starbucks
Took 0.0 minutes
prox_bankofmurica
Took 0.0 minutes
prox_bus_cdv
Took 1.0 minutes
prox_gas
Took 0.0 minutes
prox_hnrblock
Took 0.0 minutes
prox_comb_banks
Took 0.0 minutes
prox_subway
Took 0.0 minutes
prox_fmr4pl
Took 0.0 minutes
prox_msft_pinpt
Took 0.0 minutes
prox_vzw
Took 0.0 minutes
prox_24hrs
Took 0.0 minutes
prox_farmequip
Took 0.0 minutes
prox_insurance
Took 0.0 minutes
prox_westunion
Took 0.0 minutes
prox_never
Took 23.0 minutes
prox_unsvc_resi
Took 5.0 minutes
prox_cricket
Took 0.0 minutes
prox_church
Took 0.0 minutes
prox_bizsvc
Took 0.0 minutes
prox_bak

In [18]:
# Load customer service locations for year=2018, month=1 as (lon, lat, housekey).
# The inner join against rsUnique keeps only housekeys that occur exactly once
# in that month; rows with empty latitude/longitude or latitude = 0 are dropped.
# NOTE(review): unlike the object query, there is no latitude range check here.
cust_sql = "select cast(serloc_longitude as Decimal(24,20)) as lon, cast(serloc_latitude as Decimal(24,20)) as lat, klondike_monthly.housekey from advanl.klondike_monthly inner join (select housekey from advanl.klondike_monthly where year=2018 and month=1 and coalesce(housekey,'')<>'' group by housekey having count(1)=1) as rsUnique on klondike_monthly.housekey = rsUnique.housekey where year=2018 and month=1 and coalesce(serloc_latitude,'')<>'' and coalesce(serloc_longitude,'')<>'' and serloc_latitude <> 0"
cust_ds = spark.sql(cust_sql)

In [19]:
# Number of customer rows selected (this triggers a full evaluation of the query).
cust_ds.count()

59263206

In [20]:
# Same projection as for the objects: cast lat/lon to double, then map the
# angles (degrees) onto 3-D Cartesian coordinates on a sphere of radius
# earth_rad.
cust_ds = (
    cust_ds
    .withColumn("lat2", col("lat").cast(DoubleType()))
    .withColumn("lon2", col("lon").cast(DoubleType()))
)

cust_lat_rad = deg_to_rad * col("lat2")
cust_lon_rad = deg_to_rad * col("lon2")
cust_ds = (
    cust_ds
    .withColumn("Cord1", earth_rad * cos(cust_lat_rad) * cos(cust_lon_rad))
    .withColumn("Cord2", earth_rad * cos(cust_lat_rad) * sin(cust_lon_rad))
    .withColumn("Cord3", earth_rad * sin(cust_lat_rad))
)

In [21]:
# Expose the projected customers as temp view "data1", then keep housekey plus
# the three Cartesian coordinates, each cast to DECIMAL(8,4).
cust_ds.createOrReplaceTempView("data1")
obj_cust=spark.sql("select housekey,CAST (Cord1 as DECIMAL(8,4)),CAST (Cord2 as DECIMAL(8,4)),CAST (Cord3 as DECIMAL(8,4)) from data1")

In [22]:
# Materialise the projected customers as table mbansa001c.cust_mod (overwriting
# any previous version) so later cells read the stored result instead of
# recomputing the projection.
obj_cust.write.mode("overwrite").saveAsTable("mbansa001c.cust_mod")

In [28]:
# Drop any cached copies of the intermediate DataFrames.
# NOTE(review): no cache()/persist() call is visible in this notebook, so these
# calls are probably no-ops -- confirm and remove if so.
obj_mod.unpersist()
cust_ds.unpersist()
obj_cust.unpersist()
# Request a garbage collection in the driver JVM through the py4j gateway.
# NOTE(review): `sc` is not created in this notebook; presumably provided by
# the kernel.
sc._jvm.System.gc()

In [5]:
# Drop any cached copy of obj_mod (the same call already appears in the cell above).
obj_mod.unpersist()

DataFrame[obj_type: string, Cord1: decimal(8,4), Cord2: decimal(8,4), Cord3: decimal(8,4)]

In [5]:
# Read the stored projected customers back from mbansa001c.cust_mod.
cust_sql_mod = "select * from mbansa001c.cust_mod" 
cust_mod = spark.sql(cust_sql_mod)

In [6]:
# For every object type, load its pickled KD-tree, broadcast it, and define a
# (lazy) one-column DataFrame holding, per customer point, the number of
# objects of that type within the search radius.
#   o[k]         -> DataFrame with a single int column "value" for type k
#   obj_names[k] -> output column name "<obj_type>_cnt<nneib_dist>"
# Nothing is computed here: the RDD pipeline is lazy, so the time printed at
# the end covers only unpickling and broadcasting the trees.
o={}
start=time.time()
k=0
obj_names=[]

# Search radius = nneib_dist / 1000. The divisor must be a float: with two ints
# Python 2 performs floor division, turning 500/1000 into 0 and silently
# reducing the query to "points at distance 0". (The recorded outputs print
# "0.0 minutes" from str(round(...)), i.e. this notebook ran on Python 2.)
radius = nneib_dist / 1000.0


def _ball_count(tree_bc, r):
    """Return a function mapping a point to the number of tree points within r.

    The broadcast handle and radius are bound as closure arguments rather than
    read from notebook globals, so each per-type RDD keeps its own tree even
    though the loop below reassigns `kdt_b` on every iteration.
    """
    def _count(point):
        return len(tree_bc.value.query_ball_point(point, r=r))
    return _count


# Loop-invariant: the customer points are the same for every object type.
points_parsed= cust_mod.select('Cord1','Cord2','Cord3').rdd.map(lambda l: (l[0],l[1],l[2]))

for i in range(len(obj_list_cur)):
    with open('/home/mbansa001c/tree_'+str(obj_list_cur[i]), "rb") as f:
        # NOTE: pickle.load can execute arbitrary code; only load tree files
        # written by this notebook.
        tree = pickle.load(f)
    kdt_b = sc.broadcast(tree)
    t_length = points_parsed.map(_ball_count(kdt_b, radius))
    o[k] = t_length.toDF('int')
    k=k+1
    obj_names.append(str(obj_list_cur[i])+"_cnt"+str(nneib_dist))


print("Done! Took " + str(round(((time.time())-start)/60)) + " minutes")

Done! Took 3.0 minutes


In [7]:
# Stitch the housekeys and the per-type count columns together by row position.
#   df[0]   -> housekey only
#   df[1]   -> housekey as "_1" plus the row index "_2"
#   df[j+1] -> df[j] joined with the counts of object type j-1 on "_2"
# After the loop, df[i+1] (using the leftover loop variable i) is the complete
# frame; later cells rely on exactly that expression.
#
# NOTE(review): rows are matched purely by zipWithIndex position, i.e. this
# assumes every separate scan of cust_mod yields rows in the same order.
# Nothing in this notebook enforces that -- confirm, or carry housekey through
# the count RDDs and join on it instead.
df={}
start=time.time()
for i in range(len(obj_list_cur)+1):
    if i==0:
        df[i] = cust_mod.select('housekey')
        # zipWithIndex yields (Row, index) pairs, so toDF() makes "_1" a struct
        # wrapping the Row. Unwrap the field so housekey is a plain column.
        df[i+1] = (
            df[i].rdd.zipWithIndex().toDF()
            .select(col("_1.housekey").alias("_1"), col("_2"))
        )

    else:
        df_tmp = o[i-1].select("value").rdd.zipWithIndex().toDF()
        # Same unwrapping for the count: take the "value" field out of the
        # struct instead of storing the struct itself.
        df_tmp1 = df_tmp.select(col("_1.value").alias(obj_names[i-1]),col("_2"))

        df[i+1] = df[i].join(df_tmp1, "_2", "inner").drop(df_tmp1._2)
        # The original df[i].unpersist() was dropped: df[i] is never cached in
        # this cell, so the call had no effect.

print("done")
print("Done! Took " + str(round(((time.time())-start)/60)) + " minutes")

done
Done! Took 34.0 minutes


In [41]:
# Row count of the fully joined frame. `i` is the loop variable left over from
# the join loop in the cell above, so this only works after that cell has run.
df[i+1].count()

59263206

In [49]:
# Final frame: rename "_1" to "housekey" and drop the positional index "_2".
# Depends on `i` left over from the join loop.
# NOTE(review): the recorded execution counts show this cell (In [49]) ran
# after the write cell below (In [48]) -- confirm a fresh top-to-bottom run works.
df_final = df[i+1].withColumnRenamed("_1", "housekey").drop(df[i+1]._2)

In [48]:
# Write the final per-housekey counts to table mbansa001c.final_count
# (overwriting any previous version) and report the elapsed time in whole
# minutes. The write is the action that actually evaluates the lazy joins.
start=time.time()
df_final.write.mode("overwrite").saveAsTable("mbansa001c.final_count")
print("Done! Took " + str(round(((time.time())-start)/60)) + " minutes")

Done! Took 6.0 minutes
