In [5]:
!pip install googlemaps
!pip install flask

Collecting flask
  Downloading flask-3.0.0-py3-none-any.whl.metadata (3.6 kB)
Collecting Werkzeug>=3.0.0 (from flask)
  Downloading werkzeug-3.0.1-py3-none-any.whl.metadata (4.1 kB)
Collecting itsdangerous>=2.1.2 (from flask)
  Downloading itsdangerous-2.1.2-py3-none-any.whl (15 kB)
Downloading flask-3.0.0-py3-none-any.whl (99 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.7/99.7 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading werkzeug-3.0.1-py3-none-any.whl (226 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m226.7/226.7 kB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Werkzeug, itsdangerous, flask
Successfully installed Werkzeug-3.0.1 flask-3.0.0 itsdangerous-2.1.2


In [1]:
import os
import pyspark
conf = pyspark.SparkConf()

# conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')
conf.set('spark.sql.repl.eagerEval.enabled', True) # enabled for debuggig 
conf.set('spark.driver.memory','12g')
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.SQLContext.getOrCreate(sc)



In [2]:
from pyspark.sql.functions import col, to_date, when, concat, lit, count, avg
from pyspark.sql.types import DateType, StructType, StructField, DoubleType, StringType
from itertools import product
from pyspark.sql import functions as F
import requests
import googlemaps
import json
from flask import Flask, request, jsonify

In [3]:
arrest_data_df = spark.read.option("header", "true").csv("data")
arrest_data_df = arrest_data_df.withColumn("ARREST_DATE", to_date(arrest_data_df["ARREST_DATE"], "MM/dd/yyyy").cast(DateType()))
arrest_data_df = arrest_data_df \
    .select(col("ARREST_DATE"), col("ARREST_BORO"), col("AGE_GROUP"), col("PERP_SEX"), col("PERP_RACE"), col("Latitude"), col("Longitude")) 

arrest_data_df = arrest_data_df.dropna()
print(arrest_data_df.count())
arrest_data_df

5668719


ARREST_DATE,ARREST_BORO,AGE_GROUP,PERP_SEX,PERP_RACE,Latitude,Longitude
2021-11-22,M,45-64,M,BLACK,40.799008797000056,-73.95240854099995
2021-12-04,B,25-44,M,WHITE HISPANIC,40.816391847000034,-73.89529641399997
2021-11-09,Q,25-44,M,BLACK,40.67970040800003,-73.77604736799998
2019-01-26,M,45-64,M,BLACK,40.800694331000045,-73.94110928599997
2019-02-06,M,25-44,M,UNKNOWN,40.75783900300007,-73.99121211099998
2021-12-03,Q,25-44,M,BLACK,40.77205649600006,-73.87622400099998
2021-11-10,B,25-44,M,WHITE HISPANIC,40.804012949000025,-73.87833183299993
2021-12-28,Q,18-24,M,BLACK,40.69166001700007,-73.77919852099996
2016-01-06,K,25-44,M,BLACK,40.64865008500004,-73.95033556299995
2021-12-04,K,25-44,M,BLACK,40.68858351600005,-73.91652634699994


In [109]:
# Define the boundaries for NYC
# min_latitude, max_latitude = 40.7, 40.901
# min_longitude, max_longitude = -74.05, -73.899

# Define the new boundaries according to data
min_latitude, max_latitude = 40.49, 62.08
min_longitude, max_longitude = -74.26, -73.68

# Number of divisions along each axis to create 25,000,000 zones (5000x5000)
num_divisions = 5000

lat_step = (max_latitude - min_latitude) / num_divisions
lon_step = (max_longitude - min_longitude) / num_divisions

In [110]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def find_zone_id(latitude, longitude):
    # Input validation
    if not (min_latitude <= latitude <= max_latitude) or not (min_longitude <= longitude <= max_longitude):
        return "Invalid latitude or longitude"

    # Calculate indexes
    lat_index = int((latitude - min_latitude) / lat_step)
    lon_index = int((longitude - min_longitude) / lon_step)

    # Handle edge cases
    if lat_index == num_divisions:
        lat_index -= 1
    if lon_index == num_divisions:
        lon_index -= 1

    # Calculate zone_id
    zone_id = lat_index * num_divisions + lon_index
    return zone_id

find_zone_id_udf = udf(find_zone_id, IntegerType())

In [111]:
from pyspark.sql.functions import col, round, floor

# Round the latitude and longitude in new_df to 6 decimal places
# new_df = arrest_data_df
new_df = arrest_data_df.withColumn("Latitude", round(col("Latitude"), 6)) \
                      .withColumn("Longitude", round(col("Longitude"), 6))


# Update new_df to include the zone_id
new_df_with_zone_id = new_df.withColumn('zone_id', find_zone_id_udf(col("Latitude"), col("Longitude")))

# Show the updated DataFrame or process further
new_df_with_zone_id.show(10)

+-----------+-----------+---------+--------+--------------+---------+----------+-------+
|ARREST_DATE|ARREST_BORO|AGE_GROUP|PERP_SEX|     PERP_RACE| Latitude| Longitude|zone_id|
+-----------+-----------+---------+--------+--------------+---------+----------+-------+
| 2021-11-22|          M|    45-64|       M|         BLACK|40.799009|-73.952409| 357651|
| 2021-12-04|          B|    25-44|       M|WHITE HISPANIC|40.816392|-73.895296| 378144|
| 2021-11-09|          Q|    25-44|       M|         BLACK|  40.6797|-73.776047| 219172|
| 2019-01-26|          M|    45-64|       M|         BLACK|40.800694|-73.941109| 357749|
| 2019-02-06|          M|    25-44|       M|       UNKNOWN|40.757839|-73.991212| 312317|
| 2021-12-03|          Q|    25-44|       M|         BLACK|40.772056|-73.876224| 328308|
| 2021-11-10|          B|    25-44|       M|WHITE HISPANIC|40.804013|-73.878332| 363290|
| 2021-12-28|          Q|    18-24|       M|         BLACK| 40.69166|-73.779199| 234144|
| 2016-01-06|        

In [112]:
# Calculate the total number of crimes per zone on each arrest date
crime_count_per_day_zone = new_df_with_zone_id.groupBy("zone_id", "ARREST_DATE").agg(count("*").alias("daily_crimes"))

# Calculate the average crime rate per day for each zone
crime_rate_per_zone = crime_count_per_day_zone.groupBy("zone_id").agg(avg("daily_crimes").alias("risk_score"))
crime_rate_per_zone.dropna()

zone_id,risk_score
213516,1.0
279131,1.0
448531,1.2837837837837838
417823,1.2075471698113207
197258,1.2133620689655171
423444,1.1428571428571428
393144,1.2833333333333334
438279,1.4070996978851964
313148,1.625
198173,1.1132075471698113


In [113]:
crime_rate_per_zone.write.mode("overwrite").csv("zone_risk.csv")

In [114]:
zone_risk_df = spark.read.option("header", "false").csv("zone_risk.csv")
zone_risk_df = zone_risk_df.dropna()
rows = zone_risk_df.collect()

# Converting rows to dictionary
zone_risk_dict = {int(row[0]): float(row[1]) for row in rows}
zone_risk_dict

{218796: 1.3111111111111111,
 274353: 1.1304347826086956,
 147250: 1.0597014925373134,
 218047: 1.2092307692307693,
 418080: 1.3874015748031496,
 413100: 2.8260493292946776,
 273151: 1.4324324324324325,
 282801: 1.0,
 248014: 1.3095238095238095,
 208104: 1.2117117117117118,
 192785: 1.3181818181818181,
 227694: 1.2412280701754386,
 232359: 1.1688311688311688,
 338668: 1.295774647887324,
 116470: 1.0,
 362650: 1.3127413127413128,
 403220: 1.477491961414791,
 232740: 1.2727272727272727,
 443732: 1.5185185185185186,
 367991: 1.4129213483146068,
 197809: 1.1666666666666667,
 402851: 1.1643835616438356,
 307378: 1.4653284671532847,
 233034: 1.5344827586206897,
 129318: 1.2,
 378029: 1.3536723163841808,
 118998: 1.1382113821138211,
 388247: 1.1486486486486487,
 317382: 1.2,
 223543: 1.8181818181818181,
 25088: 1.2538860103626943,
 408247: 1.1913875598086126,
 152028: 1.1521739130434783,
 292350: 1.1931818181818181,
 208846: 1.3636363636363635,
 237745: 1.11864406779661,
 212881: 1.3163265306

In [96]:
zone_risk_dict[7]

2.2442206454566263

In [115]:
def find_zone_id(latitude, longitude):
    # Input validation
    if not (min_latitude <= latitude <= max_latitude) or not (min_longitude <= longitude <= max_longitude):
        return "Invalid latitude or longitude"

    # Calculate indexes
    lat_index = int((latitude - min_latitude) / lat_step)
    lon_index = int((longitude - min_longitude) / lon_step)

    # Handle edge cases
    if lat_index == num_divisions:
        lat_index -= 1
    if lon_index == num_divisions:
        lon_index -= 1

    # Calculate zone_id
    zone_id = lat_index * num_divisions + lon_index
    return zone_id

In [116]:
def identify_routes_risk_score(all_routes_data):
    route_object = all_routes_data

    # Function to identify zones for a route
    def identify_zones_for_route(route_coordinates):
        route_zones = []
        for coord in route_coordinates:
            zone = find_zone_id(coord['lat'], coord['long'])
            route_zones.append(zone)
        return route_zones

    route_zones_data = {}
    # Iterate through each route in the object
    for route_id, route_data in route_object.items():
        route_coordinates = route_data.get("route_coordinates", [])
        
        # Identify zones for the route coordinates
        route_zones = identify_zones_for_route(route_coordinates)

        risk_count = 0
        for zone in route_zones:
            if zone in zone_risk_dict:
                risk_count += zone_risk_dict[zone]

        t = {
            "Coordinate": route_coordinates,
            "distance": route_data.get("distance"),
            "time": route_data.get("time"),
            "risk_score": risk_count/len(route_coordinates)
        }
        route_zones_data[route_id] = t
    return route_zones_data

In [117]:
def get_all_routes_with_coordinates(api_key, origin, destination):
    # Initialize the Google Maps API client
    gmaps = googlemaps.Client(key=api_key)

    # Make the directions API request
    directions_result = gmaps.directions(origin, destination, mode="walking", alternatives=True)

    # Extract and format information about each route
    all_routes_data = {}
    for i, route in enumerate(directions_result):
        route_data = {
            "distance": route['legs'][0]['distance']['text'],
            "time": route['legs'][0]['duration']['text'],
            "route_coordinates": []
        }
        count = 0
        for step in route['legs'][0]['steps']:
            if count == 0:
                start_location = step['start_location']
                route_data["route_coordinates"].append({
                    "lat": start_location['lat'],
                    "long": start_location['lng']
                })
                count += 1
            end_location = step['end_location']
            route_data["route_coordinates"].append({
                "lat": end_location['lat'],
                "long": end_location['lng']
            })

        all_routes_data[str(i)] = route_data

    return all_routes_data

In [119]:
with open("api_key.txt", 'r') as file:
    api_key = file.readline()
source_address = "NYU Bobst Library"
destination_address = "Central Park"

gmaps = googlemaps.Client(key=api_key)

# Make the directions API request
gmaps.directions(source_address, destination_address, mode="walking", alternatives=True)
all_routes_data = get_all_routes_with_coordinates(api_key, source_address, destination_address)
identify_routes_risk_score(all_routes_data)

{'0': {'Coordinate': [{'lat': 40.7296912, 'long': -73.997006},
   {'lat': 40.7298572, 'long': -73.9972931},
   {'lat': 40.730585, 'long': -73.9972475},
   {'lat': 40.7310562, 'long': -73.9972506},
   {'lat': 40.7644559, 'long': -73.9730483}],
  'distance': '2.7 mi',
  'time': '1 hour 2 mins',
  'risk_score': 0.8656209042315645},
 '1': {'Coordinate': [{'lat': 40.7296912, 'long': -73.997006},
   {'lat': 40.7295523, 'long': -73.996668},
   {'lat': 40.7308074, 'long': -73.9955907},
   {'lat': 40.7308074, 'long': -73.9955907},
   {'lat': 40.7351746, 'long': -73.99185299999999},
   {'lat': 40.7371418, 'long': -73.99026669999999},
   {'lat': 40.7429175, 'long': -73.9884613},
   {'lat': 40.7644559, 'long': -73.9730483}],
  'distance': '2.8 mi',
  'time': '1 hour 2 mins',
  'risk_score': 1.2864479112733904},
 '2': {'Coordinate': [{'lat': 40.7296912, 'long': -73.997006},
   {'lat': 40.7298572, 'long': -73.9972931},
   {'lat': 40.730585, 'long': -73.9972475},
   {'lat': 40.7310562, 'long': -73.99