In [1]:
!pip install pyspark
!pip install pymongo
!pip install pandas
!pip install geopy
!pip install folium
!pip install haversine

Collecting geopy
  Downloading geopy-2.3.0-py3-none-any.whl (119 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m119.8/119.8 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting geographiclib<3,>=1.52
  Downloading geographiclib-2.0-py3-none-any.whl (40 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.3/40.3 kB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: geographiclib, geopy
Successfully installed geographiclib-2.0 geopy-2.3.0
Collecting haversine
  Downloading haversine-2.8.0-py2.py3-none-any.whl (7.7 kB)
Installing collected packages: haversine
Successfully installed haversine-2.8.0


# from google.colab import drive

# # mount your Google Drive to Colab
# drive.mount('/content/drive')


In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# create a SparkSession
conf = SparkConf().set("spark.network.timeout", "600s")

spark = SparkSession.builder.appName("NYPD").config(conf=conf).getOrCreate()
# load the NYPD Complaints dataset into a Spark DataFrame
complaintsDF = spark.read.csv("NYPD_Complaint_Data_Historic.csv", header=True, inferSchema=True)

# download the JSON file using requests library
import requests
url = 'https://cocl.us/new_york_dataset'
response = requests.get(url)
newyork_data = response.json()

# extract neighborhoods data
neighborhoods_data = newyork_data['features']

# create a list of dictionaries to store the data
neighborhoods_list = []

# populate list with neighborhoods data
for data in neighborhoods_data:
    borough = data['properties']['borough']
    neighborhood_name = data['properties']['name']
    neighborhood_lat = data['geometry']['coordinates'][1]
    neighborhood_lon = data['geometry']['coordinates'][0]
    neighborhoods_list.append({'Borough': borough,
                               'Neighborhood': neighborhood_name,
                               'Latitude': neighborhood_lat,
                               'Longitude': neighborhood_lon})

# create a DataFrame from the list of dictionaries
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
schema = StructType([
    StructField('Borough', StringType(), True),
    StructField('Neighborhood', StringType(), True),
    StructField('Latitude', DoubleType(), True),
    StructField('Longitude', DoubleType(), True)
])
neighborhoodsDF = spark.createDataFrame(neighborhoods_list, schema=schema)

# show the dataframes
complaintsDF.show()
neighborhoodsDF.show()


23/05/07 15:21:21 WARN Utils: Your hostname, Utsavs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.3.22 instead (on interface en0)
23/05/07 15:21:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/07 15:21:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/07 15:22:05 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+------------+-------------------+------------+------------+-----------+----------+-----+--------------------+-----+--------------------+----------------+-----------+-------------+-----------------+----------------+----------------+-----------------+--------+----------+-----------+----------+----------+--------------+---------+--------+----------------+------------------+------------------+--------------------+--------------------+------------+-------------+--------------+-------+
|CMPLNT_NUM|CMPLNT_FR_DT|       CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|ADDR_PCT_CD|    RPT_DT|KY_CD|           OFNS_DESC|PD_CD|             PD_DESC|CRM_ATPT_CPTD_CD| LAW_CAT_CD|      BORO_NM|LOC_OF_OCCUR_DESC|   PREM_TYP_DESC|      JURIS_DESC|JURISDICTION_CODE|PARKS_NM|HADEVELOPT|HOUSING_PSA|X_COORD_CD|Y_COORD_CD|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|          Latitude|         Longitude|             Lat_Lon|         PATROL_BORO|STATION_NAME|VIC_AGE_GROUP|      VIC_RACE|VIC_SEX|
+-----

[Stage 3:>                                                          (0 + 1) / 1]

+---------+------------------+------------------+------------------+
|  Borough|      Neighborhood|          Latitude|         Longitude|
+---------+------------------+------------------+------------------+
|    Bronx|         Wakefield|    40.89470517661|-73.84720052054902|
|    Bronx|        Co-op City| 40.87429419303012|-73.82993910812398|
|    Bronx|       Eastchester|40.887555677350775|-73.82780644716412|
|    Bronx|         Fieldston| 40.89543742690383|-73.90564259591682|
|    Bronx|         Riverdale|40.890834493891305| -73.9125854610857|
|    Bronx|       Kingsbridge| 40.88168737120521|-73.90281798724604|
|Manhattan|       Marble Hill| 40.87655077879964|-73.91065965862981|
|    Bronx|          Woodlawn| 40.89827261213805|-73.86731496814176|
|    Bronx|           Norwood| 40.87722415599446| -73.8793907395681|
|    Bronx|    Williamsbridge| 40.88103887819211|-73.85744642974207|
|    Bronx|        Baychester|40.866858107252696|-73.83579759808117|
|    Bronx|    Pelham Parkway| 40.

                                                                                

In [3]:
from pyspark.sql.functions import col, lower

neighborCrimeDF = complaintsDF.join(
    neighborhoodsDF.select([
        "Borough",
        "Neighborhood",
        col("Latitude").alias("NLatitude"),
        col("Longitude").alias("NLongitude")
    ]),
    lower(complaintsDF.BORO_NM) == lower(neighborhoodsDF.Borough),
    "left"
)
neighborCrimeDF.show()


                                                                                

+----------+------------+-------------------+------------+------------+-----------+----------+-----+-------------+-----+--------------------+----------------+-----------+-------+-----------------+-------------+----------------+-----------------+--------+----------+-----------+----------+----------+--------------+---------+--------+----------------+-----------------+------------------+--------------------+-----------------+------------+-------------+--------+-------+-------+-------------------+------------------+------------------+
|CMPLNT_NUM|CMPLNT_FR_DT|       CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|ADDR_PCT_CD|    RPT_DT|KY_CD|    OFNS_DESC|PD_CD|             PD_DESC|CRM_ATPT_CPTD_CD| LAW_CAT_CD|BORO_NM|LOC_OF_OCCUR_DESC|PREM_TYP_DESC|      JURIS_DESC|JURISDICTION_CODE|PARKS_NM|HADEVELOPT|HOUSING_PSA|X_COORD_CD|Y_COORD_CD|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|         Latitude|         Longitude|             Lat_Lon|      PATROL_BORO|STATION_NAME|VIC_AGE_GROUP|VIC_RACE|V

In [9]:
# the haversine function expects an iterable of the form (lat, lon)
from haversine import haversine
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.window import Window
from pyspark.sql.functions import udf

def string2array(lat,long):
    if lat is None or long is None:
        return None
    return [float(lat), float(long)]

def haversine_miles(x, y):
    if x is None or y is None:
        return None
    return haversine(x, y, unit='mi')


udf_haversine = udf(haversine_miles, DoubleType())
udf_string2array = udf(string2array, ArrayType(DoubleType()))

window = Window.partitionBy("CMPLNT_NUM").orderBy(col("Distance").asc())

In [10]:
# import necessary packages
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType
from haversine import haversine
from pyspark.sql.window import Window
from pyspark.sql.functions import udf

# define UDFs to calculate distance using haversine formula
udf_haversine = udf(lambda x, y: haversine(x, y, unit='mi') if x is not None and y is not None else None, DoubleType())
udf_string2array = udf(lambda lat, long: [float(lat), float(long)] if lat is not None and long is not None else None, ArrayType(DoubleType()))
# create new columns with latitude and longitude as arrays, and distance between crime location and neighborhood
neighborCrimeDF2 = neighborCrimeDF \
    .withColumn("LatLong", udf_string2array("Latitude", "Longitude")) \
    .withColumn("NLatLong", udf_string2array("NLatitude", "NLongitude")) \
    .withColumn("Distance", udf_haversine("LatLong", "NLatLong"))
    # use window function to get the nearest neighborhood for each crime
window = Window.partitionBy("CMPLNT_NUM").orderBy(F.col("Distance").asc())
NCDF = neighborCrimeDF2 \
    .withColumn("DistanceRank", F.rank().over(window)) \
    .filter(F.col("DistanceRank") == 1)

In [11]:
NCDF.show(1)

[Stage 42:>                                                         (0 + 1) / 1]

+----------+------------+-------------------+------------+------------+-----------+----------+-----+---------------+-----+--------------------+----------------+-----------+-------+-----------------+-------------+----------------+-----------------+--------+----------+-----------+----------+----------+--------------+---------+--------+----------------+------------+-------------+--------------------+-----------------+------------+-------------+--------+-------+-------+------------+-----------------+------------------+--------------------+--------------------+-------------------+------------+
|CMPLNT_NUM|CMPLNT_FR_DT|       CMPLNT_FR_TM|CMPLNT_TO_DT|CMPLNT_TO_TM|ADDR_PCT_CD|    RPT_DT|KY_CD|      OFNS_DESC|PD_CD|             PD_DESC|CRM_ATPT_CPTD_CD| LAW_CAT_CD|BORO_NM|LOC_OF_OCCUR_DESC|PREM_TYP_DESC|      JURIS_DESC|JURISDICTION_CODE|PARKS_NM|HADEVELOPT|HOUSING_PSA|X_COORD_CD|Y_COORD_CD|SUSP_AGE_GROUP|SUSP_RACE|SUSP_SEX|TRANSIT_DISTRICT|    Latitude|    Longitude|             Lat_Lon|    

                                                                                

In [None]:
# neighborCrimeDF2 = neighborCrimeDF\
#     .withColumn("LatLong",udf_string2array(col("Latitude"),col("Longitude")))\
#     .withColumn("NLatLong",udf_string2array(col("NLatitude"),col("NLongitude")))\
#     .withColumn("Distance",udf_haversine(col("LatLong"),col("NLatLong")))
# neighborCrimeDF2.show()

In [None]:
# from pyspark.sql.functions import rank

# NCDF = neighborCrimeDF2.withColumn("DistanceRank", rank().over(window)).filter(col("DistanceRank") == 1)
# NCDF.show()


In [13]:
# count number of crimes based on neighborhood and borough
NC_CountDF = NCDF.groupBy(["Neighborhood","Borough"]).count().toDF("N","B","Count").drop("B")
NC_CountDF.show()



+-----------------+-----+
|                N|Count|
+-----------------+-----+
|     East Village|44669|
|       Greenridge| 9210|
|       Whitestone|11993|
|           Travis| 1963|
|      Silver Lake| 2770|
|          Chelsea|58731|
|          Hammels|13182|
|     Dongan Hills| 8220|
|   Rockaway Beach|14482|
|Battery Park City| 8159|
|       Mount Hope|71840|
|    South Jamaica|60126|
|         Allerton|28435|
|          Bayside|19695|
|          Woodrow| 4364|
|     West Village|42891|
|    Carnegie Hill|39033|
|     Williamsburg|37336|
|  Manhattan Beach| 8694|
|       Ozone Park|49173|
+-----------------+-----+
only showing top 20 rows



                                                                                

In [None]:
NeighborhoodCrimesDF = neighborhoodsDF.join(NC_CountDF,neighborhoodsDF.Neighborhood == NC_CountDF.N, "left").drop("N")
NeighborhoodCrimesDF.show()

In [None]:
NeighborhoodCrimesDF.coalesce(1)\
      .write\
      .option("header","true")\
      .option("sep",",")\
      .mode("overwrite")\
      .csv("/content/drive/MyDrive/NYPDArrestsByNeighborhood")

In [None]:
NCDF.drop("LatLong","NLatLong")\
    .coalesce(1)\
    .write\
    .option("header","true")\
    .option("sep",",")\
    .mode("overwrite")\
    .csv("/content/drive/MyDrive/NYPDWithNeighborhoods")