In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import functions as F

In [2]:
#create a Spark application
spark = SparkSession.builder.appName('SparkBasics').getOrCreate()
spark

23/11/05 22:18:24 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### 1) PySpark environment and Data Ingestion

a. Print the configuration settings of the PySpark environmen

In [3]:
spark.sparkContext.getConf().getAll()

http://qdxd6ahq2fdqbehoghgsusv574-dot-us-central1.dataproc.googleusercontent.com:80/gateway/default/yarn/proxy/application_1699123760114_0002'),
 ('spark.app.submitTime', '1699128252276'),
 ('spark.dataproc.sql.joinConditionReorder.enabled', 'true'),
 ('spark.dataproc.sql.local.rank.pushdown.enabled', 'true'),
 ('spark.yarn.unmanagedAM.enabled', 'true'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.metrics.namespace',
  'app_name:${spark.app.name}.app_id:${spark.app.id}'),
 ('spark.driver.maxResultSize', '1024m'),
 ('spark.dataproc.sql.optimizer.leftsemijoin.conversion.enabled', 'true'),
 ('spark.hadoop.hive.execution.engine', 'mr'),
 ('spark.eventLog.dir',
  'gs:/dataproc-temp-us-central1-798309803866-frmiwt84/69975949-d88e-46d5-9c93-a7f23d7f22f5/spark-job-history'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version', '2'),
 ('spark.dynamicAllocation.

b. Read the Chicago crimes dataset into a PySpark datafram

In [3]:
df = spark.read.csv("gs://gcs-de-2-annabella/Crimes_-_2001_to_Present.csv", header=True)
print(df.show(5))

[Stage 1:>                                                          (0 + 1) / 1]

+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|                Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+--------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|11037294|   JA371270|03/18/2015 12:00:...|   0000X W WACKER DR|1153|DECEPTIVE PRACTICE|FINANCIAL IDENTIT...|                BANK| false|   false|0111|     001|  42|            32|      11| 

                                                                                

c. Print summary statistics of the data set

In [6]:
df.summary().show()

                                                                                

+-------+------------------+------------------+--------------------+--------------+-----------------+-----------------+---------------+--------------------+-------+--------+-----------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+-------------------+--------------------+--------------------+
|summary|                ID|       Case Number|                Date|         Block|             IUCR|     Primary Type|    Description|Location Description| Arrest|Domestic|             Beat|         District|              Ward|    Community Area|          FBI Code|      X Coordinate|     Y Coordinate|              Year|          Updated On|           Latitude|           Longitude|            Location|
+-------+------------------+------------------+--------------------+--------------+-----------------+-----------------+---------------+--------------------+-------+--------+---------------

d. Inspect the data partitions and repartition if needed

In [11]:
#display number of records by partition
def displaypartitions(df):
    #number of records by partition
    num = df.rdd.getNumPartitions()
    print("Partitions:", num)
    df.withColumn("partitionId", F.spark_partition_id())\
        .groupBy("partitionId")\
        .count()\
        .orderBy(F.asc("count"))\
        .show(num)
displaypartitions(df)

Partitions: 14




+-----------+------+
|partitionId| count|
+-----------+------+
|         13|522333|
|          0|563918|
|         12|565002|
|         11|565438|
|         10|566124|
|          1|566449|
|          9|566755|
|          2|567841|
|          7|567875|
|          6|567927|
|          8|568058|
|          5|571033|
|          3|572629|
|          4|585896|
+-----------+------+



                                                                                

### 2) Data Transformation

a. Drop the columns beat, ward, latitude and longitude column

In [4]:
df = df.drop('Beat','Ward','Latitude','Longitude')

b. Convert remaining columns to appropriate data types. Make your best assumptions by sampling the
data. View schema again to ensure that data types have been converted

In [13]:
df.dtypes

[('ID', 'string'),
 ('Case Number', 'string'),
 ('Date', 'string'),
 ('Block', 'string'),
 ('IUCR', 'string'),
 ('Primary Type', 'string'),
 ('Description', 'string'),
 ('Location Description', 'string'),
 ('Arrest', 'string'),
 ('Domestic', 'string'),
 ('District', 'string'),
 ('Community Area', 'string'),
 ('FBI Code', 'string'),
 ('X Coordinate', 'string'),
 ('Y Coordinate', 'string'),
 ('Year', 'string'),
 ('Updated On', 'string'),
 ('Location', 'string')]

In [6]:
from pyspark.sql.functions import unix_timestamp, from_unixtime
df = df.select('Date', from_unixtime(unix_timestamp('Date', 'MM-dd-yyyy HH:mm:ss')).alias('Date_n'))

In [8]:
df.select('Date_n').distinct().show()



+------+
|Date_n|
+------+
|  null|
+------+



                                                                                

In [112]:
from pyspark.sql.types import BooleanType
df = df.withColumn('Arrest', F.col('Arrest').cast(BooleanType()))
df = df.withColumn('Domestic', F.col('Domestic').cast(BooleanType()))

In [28]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- District: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Location: string (nullable = true)



c. Add a month column and community name (from metadata) to the dataset

In [148]:
# Add a month column
from pyspark.sql.functions import month
df = df.withColumn('Month', month(df.Date))

In [45]:
# Add community name column
from pyspark.sql.functions import lit
name = ['ROGERS PARK','WEST RIDGE','UPTOWN','LINCOLN SQUARE','NORTH CENTER','LAKE VIEW','LINCOLN PARK','NEAR NORTH SIDE','EDISON PARK','NORWOOD PARK','JEFFERSON PARK','FOREST GLEN','NORTH PARK','ALBANY PARK','PORTAGE PARK','IRVING PARK','DUNNING','MONTCLARE','BELMONT CRAGIN','HERMOSA','AVONDALE',
        'LOGAN SQUARE','HUMBOLDT PARK','WEST TOWN','AUSTIN','WEST GARFIELD PARK','EAST GARFIELD PARK','NEAR WEST SIDE','NORTH LAWNDALE','SOUTH LAWNDALE','LOWER WEST SIDE','LOOP','NEAR SOUTH SIDE','ARMOUR SQUARE','DOUGLAS','OAKLAND','FULLER PARK','GRAND BOULEVARD','KENWOOD','WASHINGTON PARK','HYDE PARK',
        'WOODLAWN','SOUTH SHORE','CHATHAM','AVALON PARK','SOUTH CHICAGO','BURNSIDE','CALUMET HEIGHTS','ROSELAND','PULLMAN','SOUTH DEERING','EAST SIDE','WEST PULLMAN','RIVERDALE','HEGEWISCH','GARFIELD RIDGE','ARCHER HEIGHTS','BRIGHTON PARK','MCKINLEY PARK','BRIDGEPORT','NEW CITY','WEST ELSDON','GAGE PARK',
        'CLEARING','WEST LAWN','CHICAGO LAWN','WEST ENGLEWOOD','ENGLEWOOD','GREATER GRAND CROSSING','ASHBURN','AUBURN GRESHAM','BEVERLY','WASHINGTON HEIGHTS','MOUNT GREENWOOD','MORGAN PARK','OHARE','EDGEWATER']

In [63]:
from pyspark.sql.functions import when
Conditional_values = [(df['Community Area'] == i, name[i]) for i in range(len(name))]
for c, n in Conditional_values:
    df = df.withColumn('Community Name', when(c,n).otherwise(F.col('Community Name')))

In [65]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- District: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Community Name: string (nullable = true)



### 3) Explore data by crime attributes

a. Group and count crimes where description begins with the word “aggravated”

In [40]:
Agg = df.filter("Description like 'AGGRAVATED%'").groupby('Primary Type').count()
Agg.show(5)



+--------------------+------+
|        Primary Type| count|
+--------------------+------+
|OFFENSE INVOLVING...|  2729|
|CRIMINAL SEXUAL A...|  1585|
|            STALKING|   151|
|               ARSON|  2076|
|             ASSAULT|139016|
+--------------------+------+
only showing top 5 rows



                                                                                

b. Which crime type is the most prevalent in apartments and which community has it occurred the mos

In [43]:
df.filter(df['Location Description']=='APARTMENT').groupby('Primary Type').count().orderBy(['count'], ascending=[0]).show(1)



+------------+------+
|Primary Type| count|
+------------+------+
|     BATTERY|306264|
+------------+------+
only showing top 1 row



                                                                                

In [84]:
df.filter(df['Primary Type'] == 'BATTERY').groupby('Community Name').count().orderBy(['count'],ascending=[0]).show(2)



+------------------+------+
|    Community Name| count|
+------------------+------+
|              null|130482|
|WEST GARFIELD PARK| 91900|
+------------------+------+
only showing top 2 rows



                                                                                

c. What is the maximum number of weapons violations per month that occurred in 2020

In [89]:
from pyspark.sql.functions import max
df.filter(df['Year'] == 2020).filter(df['Primary Type'] == 'WEAPONS VIOLATION').groupby('Month').count().agg(F.max('count')).show()



+----------+
|max(count)|
+----------+
|      8432|
+----------+



                                                                                

d. What percentage of the domestic crimes led to an arrest ?

In [92]:
dom_arrest = df.filter(df['Arrest']==True).groupby('Domestic').count()
dom_arrest.show()



+--------+-------+
|Domestic|  count|
+--------+-------+
|    true| 265354|
|   false|1785151|
+--------+-------+



                                                                                

In [95]:
per = 265354 * 100 / 1785151
print(f'percentage of the domestic crimes led to an arrest: {per} %')

percentage of the domestic crimes led to an arrest: 14.86451286193717 %


### 4) Explore data by date and time

a. Which day of the week and which month have the most and the least crimes on averag

In [97]:
#Month
df.groupby('Month').count().orderBy(['count'], ascending=[0]).show()



+-----+-------+
|Month|  count|
+-----+-------+
| null|7917278|
+-----+-------+



                                                                                

In [None]:
#Day of week
from pyspark.sql.functions import date_format
df = df.withColumn("DayOfWeek", date_format(F.col("Date"), "E"))
df.groupby('DayOfWeek').count().orderBy(['count'], ascending=[0]).show()

b. Which date had the most number of homicides in the dataset. How many days passed between this date
and the next highest number of homicides

In [None]:
df.filter(df['Primary Type'=='HOMICIDES']).groupby('DayOfWeek').count().orderBy(['count'], ascending=[0]).show()

c. Plot a monthly time series line chart of all crimes for the last 3 years

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
mon_crime = df.filter(df['Year']==2023 & df['Year']==2022 & df['Year']==2021).groupby('Year','Month').count().orderBy(["Year", "Month"], ascending=[0, 0])
pl = mon_crime.toPandas()
pl.plot(y="count", figsize=(15,4), style="-")

d. Plot a year over year comparison for 3 years (2020, 2021, 2022) by top 5 crime types

In [None]:
crime_type = df.filter(df['Year']==2023 & df['Year']==2022 & df['Year']==2021).groupby('Primary Type').count().order


### 5) Explore by location

a. Use a window function to calculate the community rank based on total crime figures (highest to lowest),
where the community with the highest crime will have rank 1. Your results set should have 1 row for
each community, with a column for the community name and the rank. You can also add a column with
the total crime count if it helps you

b. Use a window function to calculate a rolling 7 day sum of crimes over time within each community Your
results set should have 3 columns: community, date, and the rolling/lagging 7 day sum

c. Use window functions to calculate a 7 day moving average and cumulative sum of crimes over
time within each community. Your results set should have 4 columns: community, date, the 7 day
moving average, and the cumulative sum

d. Cross-tabulate Crime Types vs Location description and visualize it through a heatmap

### 6) Impact of Covid-19

a. Bring in daily Covid cases data from the City of Chicago data portal and load into your data lake or Hive
table

b. Create summarized daily total counts of the daily crime data by crime type

c. Join daily total covid cases and death data with daily chicago crimes data starting Jan 2020

d. Perform a thorough analysis in PySpark on how Covid-19 has impacted various types of crimes
compared to previous years.