In [None]:
# pip
# !pip install tqdm
# !pip install dask
# !pip install apache-sedona
# !pip install shapely

In [None]:
# conda
# !conda install tqdm
# !conda install dask
#!conda install findspark

In [1]:
from sedona.register import SedonaRegistrator  
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
from shapely.geometry import Point
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from haversine import haversine, Unit
import pyspark.sql.types as types

In [2]:
# configuration, worked on using python@3.10.9 
import os
import urllib
import json
from threading import Thread, Lock
from tqdm import tqdm
from kafka import KafkaConsumer, KafkaProducer
import pyspark
import pandas

In [3]:
conf = pyspark.SparkConf()\
    .setMaster("local[7]")\
    .set("spark.eventLog.enabled", "true")\
    .set("spark.eventLog.dir", "./logs")\
    .set("spark.eventLog.gcMetrics.youngGenerationGarbageCollectors", "true")\
    .set("spark.executor.heartbeatInterval","3600s")\
    .set("spark.network.timeout","3601s")
    # .set("spark.kryo.registrator", SedonaKryoRegistrator.getName)\
    # .set("spark.kryo.registrator", SedonaKryoRegistrator.getName)\
    # .set('spark.jars.packages',
           # 'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.0,'
           # 'org.datasyslab:geotools-wrapper:1.4.0-28.2')

'''
sparkSession = SparkSession. \
    builder. \
    appName('appName'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-spark-shaded-3.0_2.12:1.4.0,'
           'org.datasyslab:geotools-wrapper:1.4.0-28.2'). \
    getOrCreate()
'''

sc = pyspark.SparkContext(conf=conf)
sc.setLogLevel('ERROR')
spark = pyspark.sql.SparkSession(sc)
spark

In [None]:
# SedonaRegistrator.registerAll(spark)

In [4]:
# data folder
data_dir = 'data'

# data urls
historic_arrest_loc = { 'url': 'https://data.cityofnewyork.us/resource/8h9b-rp9u.json?$limit=10', 'filename': 'lil_arrest.json' }
historic_complaint_loc = { 'url': 'https://data.cityofnewyork.us/resource/qgea-i56i.json?$limit=15000000', 'filename': 'complaint.json' }
historic_court_summons_loc = { 'url': 'https://data.cityofnewyork.us/resource/sv2w-rv3k.json?$limit=15000000', 'filename': 'summons.json' }
traffic_speed_loc = { 'url': 'https://data.cityofnewyork.us/resource/i4gi-tjb9.json?$limit=15000000', 'filename': 'speed.json' }
turnstile_loc = { 'url': 'https://data.ny.gov/resource/i55r-43gk.json?$limit=15000000', 'filename': 'turnstile.json' }
#subway_loc = { 'url': 'https://data.ny.gov/resource/i9wp-a4ja.json?$limit=15000000', 'filename': 'subway.json' }
subway_loc = { 'url': 'http://web.mta.info/developers/data/nyct/subway/Stations.csv?$limit=15000000', 'filename': 'subway.csv' }

In [5]:
# download flags
downloadflag = True
redownload = False

thread_lock = Lock()

# download utils
def download_dataset_thread(loc, folder):
    with thread_lock:
         if ((not os.path.exists(os.path.join(folder, loc['filename']))) or redownload) and downloadflag:
            if os.path.isfile(os.path.join(folder, loc['filename'])):
                os.remove(os.path.join(folder, loc['filename']))
            if not os.path.exists(folder):
                os.makedirs(folder) 
            with tqdm(unit="B", unit_scale=True, desc=loc['filename'], miniters=1) as progress_bar:
                urllib.request.urlretrieve(loc['url'], os.path.join(folder, loc['filename']), lambda block_num, block_size, total_size: progress_bar.update(block_size))
            progress_bar.display()
        
def download_dataset(loc, folder):
    thread = Thread(target=download_dataset_thread, args=(loc, folder))
    thread.start()
    thread.join()
        

In [6]:
# download datasets
for dataset in [historic_arrest_loc,
                historic_complaint_loc,
                historic_court_summons_loc,
                turnstile_loc,
                subway_loc]:
    download_dataset(dataset, data_dir)

In [None]:
# dataframes
arrest_rdd = spark.read.json(os.path.join(data_dir, historic_arrest_loc['filename']), multiLine=True)
complaint_rdd = spark.read.json(os.path.join(data_dir, historic_complaint_loc['filename']), multiLine=True)
summons_rdd = spark.read.json(os.path.join(data_dir, historic_court_summons_loc['filename']), multiLine=True)
turnstile_rdd = spark.read.json(os.path.join(data_dir, turnstile_loc['filename']), multiLine=True)
subway_rdd = spark.read.csv(os.path.join(data_dir, subway_loc['filename']), header=True, inferSchema=True)

In [7]:
#subway_rdd = spark.read.json(os.path.join(data_dir, subway_loc['filename']), multiLine=True)
subway_rdd = spark.read.csv(os.path.join(data_dir, subway_loc['filename']), header=True, inferSchema=True)

In [None]:
arrest_rdd = spark.read.json(os.path.join(data_dir, historic_arrest_loc['filename']), multiLine=True)

In [8]:
lil_arrest_rdd = spark.read.json(os.path.join(data_dir, historic_arrest_loc['filename']), multiLine=True)

In [9]:
# subway_DF = subway_rdd.toDF("ada", "ada_notes", "corner", "division", "east_west_street", "entrance_georeference", "entrance_latitude", "entrance_location", "entrance_longitude", "entrance_type", "entry", "exit_only", "free_crossover", "line", "north_south_street", "route1", "route10", "route11", "route2", "route3", "route4", "route5", "route6", "route7", "route8", "route9", "staff_hours", "staffing", "station_georeference", "station_latitude", "station_location", "station_longitude", "station_name", "vending")
subway_DF = subway_rdd.toDF("Station ID", "Complex ID", "GTFS Stop ID", "Division", "Line", "Stop Name", "Borough", "Daytime Routes", "Structure", "GTFS Latitude", "GTFS Longitude", "North Direction Label", "South Direction Label", "ADA", "ADA Direction Notes", "ADA NB", "ADA SB", "Capital Outage NB", "Capital Outage SB")

In [13]:
#subway_DF = subway_DF.select("station_name","station_latitude", "station_longitude", "station_location", "station_georeference")
subway_DF = subway_DF.select('Station ID', 'Complex ID', 'GTFS Stop ID', 'Stop Name', 'Borough', 'GTFS Latitude','GTFS Longitude')\
                     .filter(F.col('GTFS Latitude').isNotNull() & F.col('GTFS Longitude').isNotNull())

In [14]:
subway_DF.show()

+----------+----------+------------+--------------------+-------+-------------+--------------+
|Station ID|Complex ID|GTFS Stop ID|           Stop Name|Borough|GTFS Latitude|GTFS Longitude|
+----------+----------+------------+--------------------+-------+-------------+--------------+
|         1|         1|         R01|Astoria-Ditmars Blvd|      Q|    40.775036|    -73.912034|
|         2|         2|         R03|        Astoria Blvd|      Q|    40.770258|    -73.917843|
|         3|         3|         R04|               30 Av|      Q|    40.766779|    -73.921479|
|         4|         4|         R05|            Broadway|      Q|     40.76182|    -73.925508|
|         5|         5|         R06|               36 Av|      Q|    40.756804|    -73.929575|
|         6|         6|         R08|   39 Av-Dutch Kills|      Q|    40.752882|    -73.932755|
|         7|       613|         R11|  Lexington Av/59 St|      M|     40.76266|    -73.967258|
|         8|         8|         R13|          5 Av

In [15]:
subway_DF.printSchema()

root
 |-- Station ID: integer (nullable = true)
 |-- Complex ID: integer (nullable = true)
 |-- GTFS Stop ID: string (nullable = true)
 |-- Stop Name: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- GTFS Latitude: double (nullable = true)
 |-- GTFS Longitude: double (nullable = true)



In [16]:
subway_DF.count()

496

In [None]:
# CONSIDER DROPPING SUBWAY STOPS WITH DUPLICATE NAMES?
# subway_DF.dropDuplicates(subset=["Stop Name"]).count()

Adapting the Haversine function from Homework 2. Setting it to check distance in meters between lat/lon pairs

In [17]:
def withinMeters(slat, slong, dlat, dlong):
    srs = (slat, slong)
    dst = (dlat, dlong)
    print(type(srs[0]),type(srs[1]))
    print(srs[0],srs[1])
    print(type(dst[0]),type(dst[1]))
    print(dst[0],dst[1])
    distance = float(haversine(srs, dst,unit=Unit.METERS))
    print(distance)
    return bool(distance < 402)
    
withinMetersUdf = F.udf(withinMeters, types.BooleanType())

Tried to get the join to work on the full arrest data set but failed and have commented it out

In [None]:
# arrest_DF = arrest_rdd.toDF(":@computed_region_92fq_4b7q", ":@computed_region_efsh_h5xi", ":@computed_region_f5dn_yrer", ":@computed_region_sbqj_enih", ":@computed_region_yeji_bk3q", "age_group", "arrest_boro", "arrest_date", "arrest_key", "arrest_precinct", "jurisdiction_code", "ky_cd", "latitude", "law_cat_cd", "law_code", "lon_lat", "longitude", "ofns_desc", "pd_cd", "pd_desc", "perp_race", "perp_sex", "x_coord_cd", "y_coord_cd")

In [None]:
# arrest_DF = arrest_DF.select("arrest_boro","arrest_date", "arrest_key", "latitude", "longitude", )

In [None]:
# subway_DF = subway_DF.join(arrest_DF, withinMetersUdf('GTFS Latitude', 'GTFS Longitude', 'latitude', 'longitude'), 'cross')\
    # .drop(F.col('latitude'))\
    # .drop(F.col('longitude'))

In [None]:
# subway_DF.printSchema()

In [None]:
# arrest_count = subway_DF.groupBy(F.col('Stop Name')).count()
# arrest_count = arrest_count.dropDuplicates(subset=["Stop Name"])

In [None]:
# arrest_count.printSchema()

In [None]:
# arrest_count.count()

Building the smaller arrest dataset (10 rows) and trying to join with the subway data using the haversine function.

In [18]:
lil_arrest_DF = lil_arrest_rdd.toDF(":@computed_region_92fq_4b7q", ":@computed_region_efsh_h5xi", ":@computed_region_f5dn_yrer", ":@computed_region_sbqj_enih", ":@computed_region_yeji_bk3q", "age_group", "arrest_boro", "arrest_date", "arrest_key", "arrest_precinct", "jurisdiction_code", "ky_cd", "latitude", "law_cat_cd", "law_code", "lon_lat", "longitude", "ofns_desc", "pd_cd", "pd_desc", "perp_race", "perp_sex", "x_coord_cd", "y_coord_cd")

In [19]:
lil_arrest_DF = lil_arrest_DF.select("arrest_boro","arrest_date", "arrest_key", "latitude", "longitude", ).filter(F.col('latitude').isNotNull() & F.col('longitude').isNotNull()).withColumn("longitude", F.col("longitude").cast("double")).withColumn("latitude", F.col("latitude").cast("double"))

In [20]:
lil_arrest_DF.show()

+-----------+--------------------+----------+------------------+------------------+
|arrest_boro|         arrest_date|arrest_key|          latitude|         longitude|
+-----------+--------------------+----------+------------------+------------------+
|          M|2021-11-22T00:00:...| 236791704|40.799008797000056|-73.95240854099995|
|          B|2021-12-04T00:00:...| 237354740|40.816391847000034|-73.89529641399997|
|          Q|2021-11-09T00:00:...| 236081433| 40.67970040800003|-73.77604736799998|
|          M|2019-01-26T00:00:...| 192799737|40.800694331000045|-73.94110928599997|
|          M|2019-02-06T00:00:...| 193260691| 40.75783900300007|-73.99121211099998|
|          Q|2021-12-03T00:00:...| 237291769| 40.77205649600006|-73.87622400099998|
|          B|2021-11-10T00:00:...| 236106641|40.804012949000025|-73.87833183299993|
|          Q|2021-12-28T00:00:...| 238383628| 40.69166001700007|-73.77919852099996|
|          K|2016-01-06T00:00:...| 149117452|40.648650085000035|-73.95033556

In [21]:
lil_arrest_DF.printSchema()

root
 |-- arrest_boro: string (nullable = true)
 |-- arrest_date: string (nullable = true)
 |-- arrest_key: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [22]:
lil_arrest_DF.show()

+-----------+--------------------+----------+------------------+------------------+
|arrest_boro|         arrest_date|arrest_key|          latitude|         longitude|
+-----------+--------------------+----------+------------------+------------------+
|          M|2021-11-22T00:00:...| 236791704|40.799008797000056|-73.95240854099995|
|          B|2021-12-04T00:00:...| 237354740|40.816391847000034|-73.89529641399997|
|          Q|2021-11-09T00:00:...| 236081433| 40.67970040800003|-73.77604736799998|
|          M|2019-01-26T00:00:...| 192799737|40.800694331000045|-73.94110928599997|
|          M|2019-02-06T00:00:...| 193260691| 40.75783900300007|-73.99121211099998|
|          Q|2021-12-03T00:00:...| 237291769| 40.77205649600006|-73.87622400099998|
|          B|2021-11-10T00:00:...| 236106641|40.804012949000025|-73.87833183299993|
|          Q|2021-12-28T00:00:...| 238383628| 40.69166001700007|-73.77919852099996|
|          K|2016-01-06T00:00:...| 149117452|40.648650085000035|-73.95033556

Joining the subway dataframe to the smaller arrest data set unsuccessfully. I tried this with just the strings for the column names and with the F.col() structure but didn't have any success either way

In [23]:
subway_arrest_DF = subway_DF.join(lil_arrest_DF, withinMetersUdf(F.col('GTFS Latitude'), F.col('GTFS Longitude'), F.col('latitude'), F.col('longitude')), 'cross')\
    .drop(F.col('latitude'))\
    .drop(F.col('longitude'))

The error occurs when trying to execute the pipeline graph with the show() below

In [None]:
subway_arrest_DF = subway_arrest_DF.na.drop(subset=["Stop Name"])
lil_arrest_count = subway_arrest_DF.groupBy(F.col('Stop Name')).count()
lil_arrest_count.printSchema()
lil_arrest_count.show()

Testing the haversine function locally and it works so it seems like there's an issue passing the columns? I checked the hw2 submission and it seems fine

In [25]:
distance = withinMeters(40.799008797000056, -73.95240854099995, 40.816391847000034, -73.89529641399997)

<class 'float'> <class 'float'>
40.799008797000056 -73.95240854099995
<class 'float'> <class 'float'>
40.816391847000034 -73.89529641399997
5180.8801108369735
