In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Calculate Distances") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [None]:
import string
dfMetroAreasRaw = spark.read.load("../rawdata/Gaz_ua_national.txt", format="csv", delimiter="\t", header=True, inferSchema=True)
dfZCTAsRaw = spark.read.load("../rawdata/Gaz_zcta_national.txt", format="csv", delimiter="\t", header=True, inferSchema=True)
dfMetroAreasRaw = dfMetroAreasRaw.withColumnRenamed(dfMetroAreasRaw.columns[-1],dfMetroAreasRaw.columns[-1].strip(string.whitespace))
dfZCTAsRaw = dfZCTAsRaw.withColumnRenamed(dfZCTAsRaw.columns[-1],dfZCTAsRaw.columns[-1].strip(string.whitespace))
dfMetroAreasRaw.printSchema()
dfZCTAsRaw.printSchema()

In [None]:
dfMetroAreas = dfMetroAreasRaw.select('GEOID','NAME','UATYPE','POP10','HU10','ALAND_SQMI',\
                                      'AWATER_SQMI','INTPTLAT','INTPTLONG')\
                            .filter(dfMetroAreasRaw.POP10>MIN_POP)\
                            .withColumnRenamed('GEOID','m_id')\
                            .withColumnRenamed('NAME', 'name')\
                            .withColumnRenamed('POP10','m_pop')\
                            .withColumnRenamed('HU10','m_house_unit')\
                            .withColumnRenamed('ALAND_SQMI','m_land')\
                            .withColumnRenamed('AWATER_SQMI','m_water')\
                            .withColumnRenamed('INTPTLAT', 'm_lat_d')\
                            .withColumnRenamed('INTPTLONG', 'm_long_d')

In [None]:
import numpy as np
def deg2rad(deg):
    return deg/360*(2*np.pi)
MIN_POP = 50000
dfMetroAreas = dfMetroAreasRaw.select('GEOID','NAME','UATYPE','POP10','HU10','ALAND_SQMI',\
                                      'AWATER_SQMI','INTPTLAT','INTPTLONG')\
                            .filter(dfMetroAreasRaw.POP10>MIN_POP)\
                            .withColumnRenamed('GEOID','m_id')\
                            .withColumnRenamed('NAME', 'name')\
                            .withColumnRenamed('POP10','m_pop')\
                            .withColumnRenamed('HU10','m_house_unit')\
                            .withColumnRenamed('ALAND_SQMI','m_land')\
                            .withColumnRenamed('AWATER_SQMI','m_water')\
                            .withColumnRenamed('INTPTLAT', 'm_lat_d')\
                            .withColumnRenamed('INTPTLONG', 'm_long_d')

dfMetroAreas = dfMetroAreas.withColumn('m_lat_r', deg2rad(dfMetroAreas.m_lat_d)).withColumn('m_long_r', deg2rad(dfMetroAreas.m_long_d))
print(dfMetroAreas.count())
dfMetroAreas.show()

In [None]:
dfZCTAs = dfZCTAsRaw.select('GEOID','POP10','HU10','ALAND_SQMI','AWATER_SQMI','INTPTLAT','INTPTLONG')\
                .withColumnRenamed('GEOID','z_id')\
                .withColumnRenamed('POP10','z_pop')\
                .withColumnRenamed('HU10','z_house_unit')\
                .withColumnRenamed('ALAND_SQMI','z_land')\
                .withColumnRenamed('AWATER_SQMI','z_water')\
                .withColumnRenamed('INTPTLAT', 'z_lat_d')\
                .withColumnRenamed('INTPTLONG', 'z_long_d')


dfZCTAs = dfZCTAs.withColumn('z_lat_r', deg2rad(dfZCTAs.z_lat_d)).withColumn('z_long_r', deg2rad(dfZCTAs.z_long_d))
print(dfZCTAs.count())
dfZCTAs.show()

In [None]:
dfDist = dfZCTAs.join(dfMetroAreas)
print(dfZCTAs.count(),dfMetroAreas.count(),dfZCTAs.count()*dfMetroAreas.count(),dfDist.count())
dfDist.printSchema()
dfDist.show()

In [None]:
# Wikipedia, "Great-circle distance," https://en.wikipedia.org/wiki/Great-circle_distance retrieved 12/9/2016
from pyspark.sql.functions import acos, cos, sin, abs

RMETERS = 6371000 #meters
RMILES = RMETERS*0.000621371

dfDist = dfDist.withColumn('dist',acos(
                            sin(dfDist.z_lat_r)*sin(dfDist.m_lat_r)+
                            cos(dfDist.z_lat_r)*cos(dfDist.m_lat_r)*cos(abs(dfDist.z_long_r-dfDist.m_long_r))
                            )*RMILES)


In [None]:
dfDist.write.csv("../processeddata/distances.parquet",header=True)

In [None]:
dfDist = spark.read.parquet("../processeddata/driv_dist.parquet")
dfDist.printSchema()