In [None]:
# install pyspark
# !pip install pyspark



In [66]:
###### set up libraries ######
# set up spark session
from pyspark.sql import SparkSession
# set up spark schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType, DateType, FloatType, DoubleType, ArrayType
# this is to convert the string to datetime
from pyspark.sql import functions as F
# set up date filtering step
from pyspark.sql.functions import current_date, date_sub, col, to_date

# these are for the APIs
import requests
import json
import pandas as pd

# Step 1: Create Spark Session

In [None]:
# create session
spark = SparkSession.builder.appName("Chicago").getOrCreate()
# https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-Present/ijzp-q8t2/about_data

# Step 2: Define the Schema for loading the Chicago crime dataset

In [None]:
### OLD BUT I WANT TO KEEP THIS -JU

# create schema
# manually setting up the spark schema for the chicago database
# https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-Present/ijzp-q8t2/about_data
# schema = StructType([
#     StructField("id", IntegerType(), True),
#     StructField("case_number", StringType(), True),
#     StructField("date", TimestampType(), True),
#     StructField("block", StringType(), True),
#     StructField("iucr", StringType(), True),
#     StructField("primary_type", StringType(), True),
#     StructField("description", StringType(), True),
#     StructField("location_description", StringType(), True),
#     StructField("arrest", BooleanType(), True),
#     StructField("domestic", BooleanType(), True),
#     StructField("beat", StringType(), True),
#     StructField("district", StringType(), True),
#     StructField("ward", IntegerType(), True),
#     StructField("community_area", StringType(), True),
#     StructField("fbi_code", StringType(), True),
#     StructField("x_coordinate", FloatType(), True),
#     StructField("y_coordinate", FloatType(), True),
#     StructField("year", IntegerType(), True),
#     StructField("updated_on", TimestampType(), True),
#     StructField("latitude", DoubleType(), True),
#     StructField("longitude", DoubleType(), True),
#     StructField("updated_on", TimestampType(), True),
#     StructField("location", StringType(), True)
# ])


In [None]:
# NEW SCHEMA
## I had to change the types so that it works with what the API gives me
## The
schema = StructType([
    StructField("arrest", BooleanType(), True),
    StructField("beat", StringType(), True),
    StructField("block", StringType(), True),
    StructField("case_number", StringType(), True),
    StructField("community_area", StringType(), True),
    StructField("date", StringType(), True),
    StructField("description", StringType(), True),
    StructField("district", StringType(), True),
    StructField("domestic", BooleanType(), True),
    StructField("fbi_code", StringType(), True),
    StructField("id", StringType(), True),
    StructField("iucr", StringType(), True),
    StructField("location_description", StringType(), True),
    StructField("primary_type", StringType(), True),
    StructField("updated_on", StringType(), True),
    StructField("ward", StringType(), True),
    StructField("year", StringType(), True)
])

# Step 3: Load the chicago crime data (you should have more than a million rows)

In [None]:
# this is SODA3 -- doesn't work, even when I create the API token
# api_url = 'https://data.cityofchicago.org/api/v3/views/ijzp-q8t2/query.json'
# api_url = 'https://data.cityofchicago.org/api/v3/views/crimes/query.json'

# this is SODA2
api_url = 'https://data.cityofchicago.org/resource/ijzp-q8t2.json'

# this is where we use the GET command request
response = requests.get(api_url)

#### IDK why it's there, but the documentation says it's to check for successful reponse
if response.status_code == 200:
  api_data = response.json()
else:
  print(f"Error fetching data from API: {response.status_code}")
  api_data = []


In [None]:
if api_data:
  df = spark.createDataFrame(api_data, schema=schema)
  df.printSchema()
  df.show()
else:
  print("No data to create DataFrame.")

root
 |-- arrest: boolean (nullable = true)
 |-- beat: string (nullable = true)
 |-- block: string (nullable = true)
 |-- case_number: string (nullable = true)
 |-- community_area: string (nullable = true)
 |-- date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- district: string (nullable = true)
 |-- domestic: boolean (nullable = true)
 |-- fbi_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- iucr: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- updated_on: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- year: string (nullable = true)

+------+----+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+--------+--------+----+--------------------+--------------------+--------------------+----+----+
|arrest|beat|               block|case_number|community_area|                date|         desc

# Step 4: Clean the data
a. remove all null values
b. change 'Date' column data type

In [None]:
# removes all rows with null/NaN/NA values
df_cleaned = df.dropna()

In [None]:
df_cleaned.show()

+------+----+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+--------+--------+----+--------------------+--------------------+--------------------+----+----+
|arrest|beat|               block|case_number|community_area|                date|         description|district|domestic|fbi_code|      id|iucr|location_description|        primary_type|          updated_on|ward|year|
+------+----+--------------------+-----------+--------------+--------------------+--------------------+--------+--------+--------+--------+----+--------------------+--------------------+--------------------+----+----+
| false|0613|     011XX W 87TH ST|   JJ451188|            71|2025-10-10T00:00:...|              SIMPLE|     006|    true|     08A|13997562|0560|           APARTMENT|             ASSAULT|2025-10-17T15:42:...|  17|2025|
| false|0434|   108XX S HOXIE AVE|   JJ450186|            51|2025-10-10T00:00:...|VIOLATE ORDER OF ...|     004|    true|      2

In [None]:
# this is conver the string 'date' column to a datetime 'date' column datatype
# df_cleaned = df_cleaned.withColumn("date", F.to_date(F.col("date"), "yyyy-MM-dd)) ### needed to add in the time as well
df_cleaned = df_cleaned.withColumn("date", F.to_date(F.col("date"), "yyyy-MM-dd'T'HH:mm:ss.SSS"))

In [None]:
df_cleaned.show()

+------+----+--------------------+-----------+--------------+----------+--------------------+--------+--------+--------+--------+----+--------------------+--------------------+--------------------+----+----+
|arrest|beat|               block|case_number|community_area|      date|         description|district|domestic|fbi_code|      id|iucr|location_description|        primary_type|          updated_on|ward|year|
+------+----+--------------------+-----------+--------------+----------+--------------------+--------+--------+--------+--------+----+--------------------+--------------------+--------------------+----+----+
| false|0613|     011XX W 87TH ST|   JJ451188|            71|2025-10-10|              SIMPLE|     006|    true|     08A|13997562|0560|           APARTMENT|             ASSAULT|2025-10-17T15:42:...|  17|2025|
| false|0434|   108XX S HOXIE AVE|   JJ450186|            51|2025-10-10|VIOLATE ORDER OF ...|     004|    true|      26|13997522|4387|           RESIDENCE|       OTHER 

In [65]:
df_cleaned.printSchema() # this verifies that date column was converted successfully

root
 |-- arrest: boolean (nullable = true)
 |-- beat: string (nullable = true)
 |-- block: string (nullable = true)
 |-- case_number: string (nullable = true)
 |-- community_area: string (nullable = true)
 |-- date: date (nullable = true)
 |-- description: string (nullable = true)
 |-- district: string (nullable = true)
 |-- domestic: boolean (nullable = true)
 |-- fbi_code: string (nullable = true)
 |-- id: string (nullable = true)
 |-- iucr: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- updated_on: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- year: string (nullable = true)



# Step 5: Filter the data for last ten years

In [67]:
# set up ten years ago based on today's date (actual today, the script will update based on the current world's time)
ten_years_ago = date_sub(current_date(), 365 * 10)
ten_years_ago

Column<'date_sub(current_date(), 3650)'>

In [68]:
filtered_df = df_cleaned.filter(col("date") >= ten_years_ago)
filtered_df.show()

+------+----+--------------------+-----------+--------------+----------+--------------------+--------+--------+--------+--------+----+--------------------+--------------------+--------------------+----+----+
|arrest|beat|               block|case_number|community_area|      date|         description|district|domestic|fbi_code|      id|iucr|location_description|        primary_type|          updated_on|ward|year|
+------+----+--------------------+-----------+--------------+----------+--------------------+--------+--------+--------+--------+----+--------------------+--------------------+--------------------+----+----+
| false|0613|     011XX W 87TH ST|   JJ451188|            71|2025-10-10|              SIMPLE|     006|    true|     08A|13997562|0560|           APARTMENT|             ASSAULT|2025-10-17T15:42:...|  17|2025|
| false|0434|   108XX S HOXIE AVE|   JJ450186|            51|2025-10-10|VIOLATE ORDER OF ...|     004|    true|      26|13997522|4387|           RESIDENCE|       OTHER 

In [71]:
filtered_df.tail(5)

[Row(arrest=False, beat='0921', block='042XX S ROCKWELL ST', case_number='JJ445846', community_area='58', date=datetime.date(2025, 10, 8), description='SIMPLE', district='009', domestic=False, fbi_code='08B', id='13992776', iucr='0460', location_description='SCHOOL - PUBLIC BUILDING', primary_type='BATTERY', updated_on='2025-10-16T15:42:34.000', ward='12', year='2025'),
 Row(arrest=False, beat='1431', block='020XX N CAMPBELL AVE', case_number='JJ444157', community_area='22', date=datetime.date(2025, 10, 8), description='SIMPLE', district='014', domestic=False, fbi_code='08B', id='13991535', iucr='0460', location_description='APARTMENT', primary_type='BATTERY', updated_on='2025-10-16T15:42:34.000', ward='1', year='2025'),
 Row(arrest=True, beat='0522', block='003XX W 115TH ST', case_number='JJ444154', community_area='49', date=datetime.date(2025, 10, 8), description='POSSESS - CRACK', district='005', domestic=False, fbi_code='18', id='13991544', iucr='2027', location_description='CTA BU

# TEST AREA

In [None]:
# dummy text code
## this works, but we get extra columns.
## will try and make this work via the API for the schema
### I used the output to make the NEW schema --10/17/2025
### the new schema works!

# if api_data:
#   df = spark.createDataFrame(api_data)
#   df.printSchema()
#   df.show()
# else:
#     print("No data to create DataFrame.")