## SF Crime Data Analysis with PySpark

In this notebook we use PySpark to explore a dataset from the San Francisco Police Department.
Dataset link: [https://data.sfgov.org/Public-Safety/sf-data/skgt-fej3/data](https://data.sfgov.org/Public-Safety/sf-data/skgt-fej3/data)

In [2]:
import pyspark
print(pyspark.__version__)

# Create a SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SF Crime Analysis").getOrCreate()
sc = spark.sparkContext
print('SparkSession started')

3.5.4


25/02/20 06:55:16 WARN Utils: Your hostname, codespaces-8f4ab0 resolves to a loopback address: 127.0.0.1; using 10.0.1.250 instead (on interface eth0)
25/02/20 06:55:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/20 06:55:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


SparkSession started


In [4]:
# Read the CSV data using an RDD
from csv import reader

# Read all lines from the CSV file
crime_data_lines = sc.textFile('data/sf_crime.csv')

# Convert each line to a list of strings (removing extra quotes)
df_crimes = crime_data_lines.map(lambda line: [x.strip('"') for x in next(reader([line]))])

In [5]:
# Extract header and filter it out
header = df_crimes.first()
print("Header:", header)
crimes = df_crimes.filter(lambda x: x != header)
print("First two data rows:", crimes.take(2))

[Stage 0:>                                                          (0 + 1) / 1]

Header: ['IncidntNum', 'Category', 'Descript', 'DayOfWeek', 'Date', 'Time', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y', 'Location', 'PdId']
First two data rows: [['179005317', 'OTHER OFFENSES', 'FALSE PERSONATION TO RECEIVE MONEY OR PROPERTY', 'Tuesday', '07/11/2017', '14:32', 'SOUTHERN', 'NONE', '800 Block of BRYANT ST', '-122.40340479147905', '37.775420706711', '(37.775420706711, -122.40340479147905)', '17900531709029'], ['179005210', 'FORGERY/COUNTERFEITING', 'MANUFACTURE OR SALE OF COUNTERFEIT GOODS', 'Tuesday', '07/11/2017', '17:27', 'SOUTHERN', 'NONE', '800 Block of BRYANT ST', '-122.40340479147905', '37.775420706711', '(37.775420706711, -122.40340479147905)', '17900521009261']]


                                                                                

In [None]:
# Convert the RDD of lists to an RDD of Rows
from pyspark.sql import Row

def list_to_row(keys, values):
    row_dict = dict(zip(keys, values))
    return Row(**row_dict)

rdd_rows = crimes.map(lambda x: list_to_row(header, x))
df = spark.createDataFrame(rdd_rows)
print("DataFrame created")

DataFrame created


25/02/20 06:55:32 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [7]:
# Show the DataFrame (first 20 rows)
df.show()

+----------+--------------------+--------------------+---------+----------+-----+----------+--------------+--------------------+-------------------+------------------+--------------------+--------------+
|IncidntNum|            Category|            Descript|DayOfWeek|      Date| Time|PdDistrict|    Resolution|             Address|                  X|                 Y|            Location|          PdId|
+----------+--------------------+--------------------+---------+----------+-----+----------+--------------+--------------------+-------------------+------------------+--------------------+--------------+
| 179005317|      OTHER OFFENSES|FALSE PERSONATION...|  Tuesday|07/11/2017|14:32|  SOUTHERN|          NONE|800 Block of BRYA...|-122.40340479147905|   37.775420706711|(37.775420706711,...|17900531709029|
| 179005210|FORGERY/COUNTERFE...|MANUFACTURE OR SA...|  Tuesday|07/11/2017|17:27|  SOUTHERN|          NONE|800 Block of BRYA...|-122.40340479147905|   37.775420706711|(37.775420706711,

In [8]:
# Replace columns 'X' and 'Y' with 'Longitude' and 'Latitude' and cast them as float
df = df.withColumn('X', df['X'].cast('float')).withColumn('Y', df['Y'].cast('float'))
df = df.withColumnRenamed('X', 'Longitude').withColumnRenamed('Y', 'Latitude')
print("Columns renamed and cast")

Columns renamed and cast


In [9]:
# Inspect the schema
df.printSchema()

root
 |-- IncidntNum: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Descript: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- PdDistrict: string (nullable = true)
 |-- Resolution: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Longitude: float (nullable = true)
 |-- Latitude: float (nullable = true)
 |-- Location: string (nullable = true)
 |-- PdId: string (nullable = true)



### Exploring the Data

Let's count the number of incidents for each category.

In [10]:
# Method 1: Using Spark SQL
df.createOrReplaceTempView("crime")
sqlDF = spark.sql("""
    SELECT Category, COUNT(*) AS Count 
    FROM crime 
    GROUP BY Category 
    ORDER BY Count DESC
""")
sqlDF.show(40, False)

+---------------------------+-----+
|Category                   |Count|
+---------------------------+-----+
|LARCENY/THEFT              |2804 |
|OTHER OFFENSES             |1002 |
|NON-CRIMINAL               |988  |
|ASSAULT                    |780  |
|VANDALISM                  |650  |
|VEHICLE THEFT              |353  |
|SUSPICIOUS OCC             |311  |
|WARRANTS                   |309  |
|BURGLARY                   |300  |
|MISSING PERSON             |263  |
|ROBBERY                    |187  |
|DRUG/NARCOTIC              |167  |
|FRAUD                      |155  |
|SECONDARY CODES            |118  |
|TRESPASS                   |109  |
|WEAPON LAWS                |106  |
|STOLEN PROPERTY            |66   |
|RECOVERED VEHICLE          |48   |
|SEX OFFENSES, FORCIBLE     |39   |
|RUNAWAY                    |39   |
|ARSON                      |29   |
|FORGERY/COUNTERFEITING     |29   |
|PROSTITUTION               |20   |
|DRUNKENNESS                |19   |
|DISORDERLY CONDUCT         

In [11]:
# Method 2: Using DataFrame functions
df.groupBy("Category").count().orderBy("count", ascending=False).show(40, False)

+---------------------------+-----+
|Category                   |count|
+---------------------------+-----+
|LARCENY/THEFT              |2804 |
|OTHER OFFENSES             |1002 |
|NON-CRIMINAL               |988  |
|ASSAULT                    |780  |
|VANDALISM                  |650  |
|VEHICLE THEFT              |353  |
|SUSPICIOUS OCC             |311  |
|WARRANTS                   |309  |
|BURGLARY                   |300  |
|MISSING PERSON             |263  |
|ROBBERY                    |187  |
|DRUG/NARCOTIC              |167  |
|FRAUD                      |155  |
|SECONDARY CODES            |118  |
|TRESPASS                   |109  |
|WEAPON LAWS                |106  |
|STOLEN PROPERTY            |66   |
|RECOVERED VEHICLE          |48   |
|SEX OFFENSES, FORCIBLE     |39   |
|RUNAWAY                    |39   |
|ARSON                      |29   |
|FORGERY/COUNTERFEITING     |29   |
|PROSTITUTION               |20   |
|DRUNKENNESS                |19   |
|DISORDERLY CONDUCT         

In [12]:
# Method 3: Using RDD functions
rdd = crimes.map(lambda line: (line[1], 1))
sorted_counts = sorted(rdd.countByKey().items(), key=lambda x: -x[1])
print(sorted_counts)

[('LARCENY/THEFT', 2804), ('OTHER OFFENSES', 1002), ('NON-CRIMINAL', 988), ('ASSAULT', 780), ('VANDALISM', 650), ('VEHICLE THEFT', 353), ('SUSPICIOUS OCC', 311), ('WARRANTS', 309), ('BURGLARY', 300), ('MISSING PERSON', 263), ('ROBBERY', 187), ('DRUG/NARCOTIC', 167), ('FRAUD', 155), ('SECONDARY CODES', 118), ('TRESPASS', 109), ('WEAPON LAWS', 106), ('STOLEN PROPERTY', 66), ('RECOVERED VEHICLE', 48), ('SEX OFFENSES, FORCIBLE', 39), ('RUNAWAY', 39), ('FORGERY/COUNTERFEITING', 29), ('ARSON', 29), ('PROSTITUTION', 20), ('DRUNKENNESS', 19), ('DISORDERLY CONDUCT', 18), ('DRIVING UNDER THE INFLUENCE', 16), ('KIDNAPPING', 10), ('BRIBERY', 7), ('EMBEZZLEMENT', 5), ('LOITERING', 4), ('LIQUOR LAWS', 4), ('SUICIDE', 3), ('EXTORTION', 2), ('PORNOGRAPHY/OBSCENE MAT', 1), ('SEX OFFENSES, NON FORCIBLE', 1), ('FAMILY OFFENSES', 1), ('TREA', 1)]


How about the number of incidents at each district?

In [13]:
# Method 1: SQL
sqlDF = spark.sql("""
    SELECT PdDistrict, COUNT(*) AS Count 
    FROM crime 
    GROUP BY PdDistrict 
    ORDER BY Count DESC
""")
sqlDF.show()

+----------+-----+
|PdDistrict|Count|
+----------+-----+
|  SOUTHERN| 1743|
|   MISSION| 1190|
|  NORTHERN| 1169|
|   CENTRAL| 1137|
|   BAYVIEW|  816|
| INGLESIDE|  670|
|   TARAVAL|  660|
|TENDERLOIN|  535|
|  RICHMOND|  527|
|      PARK|  517|
+----------+-----+



In [14]:
# Method 2: DataFrame
df.groupBy("PdDistrict").count().orderBy("count", ascending=False).show()

+----------+-----+
|PdDistrict|count|
+----------+-----+
|  SOUTHERN| 1743|
|   MISSION| 1190|
|  NORTHERN| 1169|
|   CENTRAL| 1137|
|   BAYVIEW|  816|
| INGLESIDE|  670|
|   TARAVAL|  660|
|TENDERLOIN|  535|
|  RICHMOND|  527|
|      PARK|  517|
+----------+-----+



In [15]:
# Method 3: RDD functions
rdd_district = crimes.map(lambda line: (line[6], 1))
sorted_district_counts = sorted(rdd_district.countByKey().items(), key=lambda x: -x[1])
print(sorted_district_counts)

[('SOUTHERN', 1743), ('MISSION', 1190), ('NORTHERN', 1169), ('CENTRAL', 1137), ('BAYVIEW', 816), ('INGLESIDE', 670), ('TARAVAL', 660), ('TENDERLOIN', 535), ('RICHMOND', 527), ('PARK', 517)]


Define "downtown" as an area within 0.005 degrees from (37.792489, -122.403221). 
Let's count the number of incidents on Sundays within this area.

In [16]:
# Method 1: SQL
sqlDF = spark.sql("""
    SELECT Date, DayOfWeek, COUNT(*) AS Count 
    FROM crime 
    WHERE DayOfWeek = 'Sunday'
      AND POW(Latitude - 37.792489, 2) + POW(Longitude + 122.403221, 2) < POW(0.005, 2)
    GROUP BY Date, DayOfWeek 
    ORDER BY Date
""")
sqlDF.show()

+----------+---------+-----+
|      Date|DayOfWeek|Count|
+----------+---------+-----+
|07/16/2017|   Sunday|   10|
|07/23/2017|   Sunday|   20|
|07/30/2017|   Sunday|   14|
+----------+---------+-----+



In [17]:
# Method 2: DataFrame
df_downtown = df.filter((df['Latitude'] - 37.792489)**2 + (df['Longitude'] + 122.403221)**2 < 0.005**2)
df_downtown.filter(df_downtown['DayOfWeek'] == 'Sunday') \
    .groupBy("Date", "DayOfWeek") \
    .count() \
    .orderBy('Date') \
    .show()

+----------+---------+-----+
|      Date|DayOfWeek|count|
+----------+---------+-----+
|07/16/2017|   Sunday|   10|
|07/23/2017|   Sunday|   20|
|07/30/2017|   Sunday|   14|
+----------+---------+-----+



### Visualizing the Spatial Distribution

Let's make a scatter plot of the crimes.

In [None]:
!pip install --upgrade pandas==2.1.0 seaborn==0.12.2 matplotlib==3.8.0

import matplotlib.pyplot as plt
import seaborn as sns

# Convert the Spark DataFrame to Pandas for plotting
pdf = df.select("Longitude", "Latitude").toPandas()
plt.figure(figsize=(8, 6))
plt.title('SF Crime Distribution')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.scatter(pdf['Longitude'], pdf['Latitude'], s=2, c='r')
plt.show()

Collecting pandas==2.1.0
  Downloading pandas-2.1.0.tar.gz (4.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.3/4.3 MB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Installing backend dependencies ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25l|

### Clustering with Spark ML

Spark ML requires that features be in a single vector column. We'll use the `VectorAssembler` to
combine the `Longitude` and `Latitude` columns into one features column, and then fit a k-means
model (with k=3, chosen arbitrarily).

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

df_coor = df.select("Longitude", "Latitude")
vecAssembler = VectorAssembler(inputCols=["Longitude", "Latitude"], outputCol="features")
new_df = vecAssembler.transform(df_coor).select("features")
new_df.show(10, False)

In [None]:
# Train a k-means model with k=3 and a fixed seed for reproducibility
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(new_df)

# Print cluster centers
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

# Show cluster memberships
transformed = model.transform(new_df)
transformed.show(20, False)

### Visualizing Clustering Results

You can now visualize the clusters (for example, by converting the predictions back to Pandas
and plotting them with different colors). Below is one example:

In [None]:
pdf_clusters = transformed.select("features", "prediction").toPandas()
plt.figure(figsize=(8, 6))
plt.title('KMeans Clustering of SF Crimes')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.scatter(pdf_clusters['features'].apply(lambda x: x[0]),
            pdf_clusters['features'].apply(lambda x: x[1]),
            c=pdf_clusters['prediction'], cmap='viridis', s=2)
plt.show()