In [1]:
import pyspark
import warnings
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

!sed -i 's/hadoop.root.logger=INFO,console/hadoop.root.logger=WARN,console/' /usr/hadoop-3.3.2/etc/hadoop/log4j.properties
warnings.filterwarnings('ignore')

conf = pyspark.SparkConf().setAll(
    [('spark.master', 'local[2]'),
     ('spark.app.name', 'Cleaning')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

2023-06-03 06:39:16,781 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### 1. Load Dataset

The dataset was obtained as a CSV from the [City of Chicago](https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present-Dashboard/5cd6-ry5g). 2023 is an incomplete year, so its records are removed immediately to keep the data evenly distributed across seasons.

In [2]:
# csv was renamed to `crime.csv`
path = 'home/work/Project/mas-dse-230/crime.csv'

df = spark.read.csv(f'file:///{path}', header=True, inferSchema=True)
# 2023 is an incomplete year, so drop it *before* registering the SQL view.
# Registering first would leave the `df` view with 2023 rows that the Python
# DataFrame no longer has, and SQL queries would disagree with DataFrame code.
df = df.filter("Year < 2023")
df.createOrReplaceTempView('df')

print(f'Rows: {df.count()}, Columns: {len(df.schema)}')

2023-06-03 06:39:46,416 WARN util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

Rows: 7711864, Columns: 30


                                                                                

### 2. Null Handling

The distribution of nulls may be time-dependent, since data-collection practices changed over the years. With 7.7M records available, years with an unusually high share of nulls can likely be dropped with minimal impact, as long as those years lie at either end of the date range.

In [3]:
# Per-year row counts before and after dropping every row that contains a null.
# Both aggregates are built from the same (2023-filtered) DataFrame, so this
# report does not depend on when the `df` SQL view was last registered.
totals = df.groupBy('Year').count()
totals.createOrReplaceTempView('totals')

denulled = df.dropna().groupBy('Year').count()
denulled.createOrReplaceTempView('denulled')

# LEFT JOIN keeps years in which *every* record has a null (such years have no
# row in `denulled`). An INNER JOIN would silently hide them from the report.
# `count` is backtick-quoted because it is also the name of a SQL function.
query = '''
    SELECT totals.Year
        , totals.`count` AS total_records
        , COALESCE(denulled.`count`, 0) AS records_without_nulls
        , ROUND(1 - COALESCE(denulled.`count`, 0) / totals.`count`, 4) AS proportion_removed
    FROM totals
    LEFT JOIN denulled
        ON denulled.Year = totals.Year
    ORDER BY totals.Year DESC
'''
spark.sql(query).show(n=100)



+----+-------------+---------------------+------------------+
|Year|total_records|records_without_nulls|proportion_removed|
+----+-------------+---------------------+------------------+
|2022|       238418|               231291|            0.0299|
|2021|       208616|               201259|            0.0353|
|2020|       212115|               206405|            0.0269|
|2019|       261260|               256918|            0.0166|
|2018|       268786|               261666|            0.0265|
|2017|       269082|               263038|            0.0225|
|2016|       269793|               265316|            0.0166|
|2015|       264759|               256729|            0.0303|
|2014|       275735|               272528|            0.0116|
|2013|       307470|               305056|            0.0079|
|2012|       336264|               333751|            0.0075|
|2011|       351965|               349473|            0.0071|
|2010|       370496|               368307|            0.0059|
|2009|  

                                                                                

Nearly all 2001 records contain at least one null. 2002 also has a significant proportion of null-containing records (29%).

These two years will be removed. All remaining records that contain a null will be removed as well, since they make up a relatively small proportion of the total dataset. This null removal may still affect the accuracy of downstream analyses.

In [4]:
# Drop every row containing a null, and keep only years after the null-heavy 2001-2002.
# Both steps are row filters, so their order does not affect the result.
df = df.dropna().filter("Year > 2002")
df.createOrReplaceTempView('df')

df.count()

                                                                                

6634666

### 3. Category Consolidation

The features `Primary Type` and `Description` (explored further in the next notebook) contain categorical values that describe a crime. Many of these labels overlap or are textual variants of each other, for example `NON-CRIMINAL` and `NON - CRIMINAL`.

The next cell consolidates these categories.

In [5]:
# Exact-value remapping of `Primary Type` labels that are spelling/spacing variants.
ptype_consolidation = {
    'CRIM SEXUAL ASSAULT': 'CRIMINAL SEXUAL ASSAULT',
    'NON-CRIMINAL (SUBJECT SPECIFIED)': 'NON-CRIMINAL',
    'NON - CRIMINAL': 'NON-CRIMINAL',
    'OTHER NARCOTIC VIOLATION': 'NARCOTICS',
    'PUBLIC INDECENCY': 'PUBLIC INDECENCY/OBSCENITY',
    'OBSCENITY': 'PUBLIC INDECENCY/OBSCENITY'
}
# Substring rewrites for `Description`, applied in insertion order
# (each rewrite sees the output of the previous ones).
desc_consolidation = {
    'AGGRAVATED':'AGG',
    'ATTEMPT':'ATT',
    'CRIMINAL':'CRIM',
    'POSSESSION':'POSS',
    'POS ':'POSS',
    'POSESS:':'POSS ',
    'POSESS ':'POSS ',
    'REGISTRATION':'REG',
    'PRO. ':'PROFESSIONAL ',
    'PO ': 'POLICE OFFICER ',
    'P.O.':'POLICE OFFICER',
    'RITUAL': 'RIT',
    'MUTILATION':'MUT',
    'INSTRUMENT':'INSTR',
    'MANU/DEL:':'MANUFACTURE DELIVER',
    'MANU/DELIVER:':'MANUFACTURE DELIVER',
    'MANUFACTURE / DELIVER':'MANUFACTURE DELIVER'
}

# One dict-based replace is equivalent to replacing key by key here,
# because no replacement value is itself a key.
df = df.replace(ptype_consolidation, subset=['Primary Type'])

# regexp_replace treats its pattern as a Java regex. Unquoted, the '.' in keys like
# 'P.O.' and 'PRO. ' matches any character, e.g. 'PROP' in 'TO PROPERTY' becomes
# 'POLICE OFFICERERTY'. \Q...\E makes each key match literally.
replacement_expr = col('Description')
for k, v in desc_consolidation.items():
    replacement_expr = regexp_replace(replacement_expr, f'\\Q{k}\\E', v)
df = df.withColumn('Description', replacement_expr)
# Strip every non-alphanumeric character (spaces included) so punctuation and
# spacing variants of the same label collapse to one value.
df = df.withColumn('Description', regexp_replace(df['Description'], '[^A-Za-z0-9]', ''))

Cleaning is now complete. Specify the output path for the cleaned CSV, which is the input for the following notebooks. Spark writes a directory at this path, containing a single part file.

In [6]:
# Spark writes a *directory* at this path. coalesce(1) makes it contain a single part-file CSV.
write_path = 'home/work/Project/mas-dse-230/cleaned_crime.csv'
output_uri = f'file:///{write_path}'
single_partition = df.coalesce(1)
single_partition.write.mode('overwrite').csv(output_uri, header=True)

                                                                                

In [7]:
# Shut down the Spark session now that the cleaned CSV has been written.
spark.stop()