In [1]:
#installing pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=533d52f0665a552faff648226b2bbb82a929cbaa1e0feb8d0dd7c0327caca2a7
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
#importing required libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType,IntegerType,TimestampType
from pyspark.sql.functions import year, month,col,first,to_timestamp,count,desc,isnull
from pyspark.sql import functions as F
spark = SparkSession.builder.appName("Crimedata").getOrCreate()

In [None]:
# (1) schema for crime dataset
schema_dict = {
    "X": StringType(),
    "Y": StringType(),
    "RowID": StringType(),
    "CCNumber": StringType(),
    "CrimeDateTime": StringType(),
    "CrimeCode": StringType(),
    "Description": StringType(),
    "Inside_Outside": StringType(),
    "Weapon": StringType(),
    "Post": StringType(),
    "Gender": StringType(),
    "Age": IntegerType(),
    "Race": StringType(),
    "Ethnicity": StringType(),
    "Location": StringType(),
    "Old_District": StringType(),
    "New_District": StringType(),
    "Neighborhood": StringType(),
    "Latitude": StringType(),
    "Longitude": StringType(),
    "GeoLocation": StringType(),
    "PremiseType": StringType(),
    "Total_Incidents": IntegerType()
}

# Creating StructFields from the dictionary
fields = [StructField(key, value, True) for key, value in schema_dict.items()]

# Creating StructType from the StructFields
schema = StructType(fields)




In [None]:
# (2) reading csv file using schema definition
from pyspark.sql import SparkSession
df = spark.read.csv("Crime_Data.csv", schema=schema)

In [None]:
# (3) cache the dataframe
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Caching DataFrame") \
    .getOrCreate()

df = spark.read.csv("Crime_Data.csv", header=True)  # Replace "your_csv_file.csv" with your CSV file

# (3) cache the dataframe
df.cache()

DataFrame[X: string, Y: string, RowID: string, CCNumber: string, CrimeDateTime: string, CrimeCode: string, Description: string, Inside_Outside: string, Weapon: string, Post: string, Gender: string, Age: string, Race: string, Ethnicity: string, Location: string, Old_District: string, New_District: string, Neighborhood: string, Latitude: string, Longitude: string, GeoLocation: string, PremiseType: string, Total_Incidents: string]

In [None]:
df.show()

+-----------------+----------------+-----+--------+--------------------+---------+-----------------+--------------+-------+----+------+----+--------------------+---------+------------------+------------+------------+--------------------+------------------+-------------------+--------------------+--------------------+---------------+
|                X|               Y|RowID|CCNumber|       CrimeDateTime|CrimeCode|      Description|Inside_Outside| Weapon|Post|Gender| Age|                Race|Ethnicity|          Location|Old_District|New_District|        Neighborhood|          Latitude|          Longitude|         GeoLocation|         PremiseType|Total_Incidents|
+-----------------+----------------+-----+--------+--------------------+---------+-----------------+--------------+-------+----+------+----+--------------------+---------+------------------+------------+------------+--------------------+------------------+-------------------+--------------------+--------------------+------------

In [None]:
# (4) Show the count of the rows
spark = SparkSession.builder \
    .appName("Spark SQL Operations") \
    .getOrCreate()
df.createOrReplaceTempView("temp_table")
count_query = "SELECT COUNT(*) AS count FROM temp_table"
count_df = spark.sql(count_query)
count_df.show()

+------+
| count|
+------+
|587356|
+------+



In [None]:
# (5) Print the schema
df.printSchema()

root
 |-- X: string (nullable = true)
 |-- Y: string (nullable = true)
 |-- RowID: string (nullable = true)
 |-- CCNumber: string (nullable = true)
 |-- CrimeDateTime: string (nullable = true)
 |-- CrimeCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Inside_Outside: string (nullable = true)
 |-- Weapon: string (nullable = true)
 |-- Post: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Race: string (nullable = true)
 |-- Ethnicity: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Old_District: string (nullable = true)
 |-- New_District: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- GeoLocation: string (nullable = true)
 |-- PremiseType: string (nullable = true)
 |-- Total_Incidents: string (nullable = true)



In [None]:
# (6) Display first 5 rows
first_5_rows_query = "SELECT * FROM temp_table LIMIT 5"
first_5_rows_df = spark.sql(first_5_rows_query)
first_5_rows_df.show()

+-----------------+----------------+-----+--------+--------------------+---------+--------------+--------------+-------+----+------+----+--------------------+---------+------------------+------------+------------+-------------------+---------+----------+--------------------+--------------------+---------------+
|                X|               Y|RowID|CCNumber|       CrimeDateTime|CrimeCode|   Description|Inside_Outside| Weapon|Post|Gender| Age|                Race|Ethnicity|          Location|Old_District|New_District|       Neighborhood| Latitude| Longitude|         GeoLocation|         PremiseType|Total_Incidents|
+-----------------+----------------+-----+--------+--------------------+---------+--------------+--------------+-------+----+------+----+--------------------+---------+------------------+------------+------------+-------------------+---------+----------+--------------------+--------------------+---------------+
|          -76.665|39.2945260000001|    1|13L06334|2013/12/14

In [None]:
# (7.1) Distinct crime codes
unique_crime_codes = df.select("CrimeCode").distinct()
unique_crime_codes.show()

+---------+
|CrimeCode|
+---------+
|       3P|
|       3K|
|      3BJ|
|       1A|
|       3M|
|       5F|
|       4B|
|       3B|
|       7A|
|      3NF|
|      3EF|
|       5D|
|       3N|
|       6K|
|      3LO|
|      3AF|
|       7B|
|      3GO|
|     3AJF|
|      8GV|
+---------+
only showing top 20 rows



In [None]:
# (7.2) Count the number of crimes by the crime codes and order by the resulting counts in descending order
count_of_crimes_desc = df.groupBy("CrimeCode").count().sort(col("count").desc())
count_of_crimes_desc.show()

+---------+------+
|CrimeCode| count|
+---------+------+
|       4E|107496|
|       6D| 73509|
|       5A| 46654|
|       7A| 42082|
|       6J| 32324|
|       6G| 28606|
|       6C| 26705|
|       4C| 24285|
|       6E| 23973|
|       4B| 19742|
|       4A| 16307|
|      3AF| 16096|
|       5D| 14970|
|       7C| 12577|
|       6F| 11884|
|       3B| 10645|
|       5B|  7944|
|       4D|  7345|
|       9S|  6543|
|       5C|  4912|
+---------+------+
only showing top 20 rows



In [None]:
# (7.3) neighbourhood having most crimes
crime_count_neighborhood = df.groupBy("neighborhood").count().sort(col("count").desc())
crime_count_neighborhood.first()

Row(neighborhood='DOWNTOWN', count=20267)

In [None]:
# (7.4) which month of the year had most crimes?
df = df.withColumn("CrimeDateTime", to_timestamp(df["CrimeDateTime"], "yyyy/MM/dd HH:mm:ssX"))
df = df.withColumn("Year", year("CrimeDateTime"))#creating a column for year and month
df = df.withColumn("Month", month("CrimeDateTime"))
crime_count = df.groupBy(["Year", "Month"]).agg(F.count("*").alias("Count"))

#filtering the null values in the year column
crime_high_in_month = crime_count.filter(~isnull("Year")).groupBy("Year").agg(
                        first("Month").alias("month"),
                        first("Count").alias("count")
                        ).sort(col("count").desc())

crime_high_in_month.show(20)

+----+-----+-----+
|Year|month|count|
+----+-----+-----+
|2018|   10| 4577|
|2023|    9| 4540|
|2016|    7| 4107|
|2012|   10| 4106|
|2011|    3| 4104|
|2019|   10| 3924|
|2017|    3| 3784|
|2014|    4| 3691|
|2022|   10| 3684|
|2021|    8| 3488|
|2020|    6| 3140|
|2013|    2| 3116|
|2015|    2| 2606|
|2008|    1|    4|
|1978|    1|    3|
|2001|    1|    3|
|2010|    7|    3|
|2006|    7|    2|
|2004|    4|    2|
|1982|    6|    2|
+----+-----+-----+
only showing top 20 rows



In [None]:
# (7.5) Weapons used
unique_weapons_used = df.select("weapon").distinct()
unique_weapons_used.show()

+--------------------+
|              weapon|
+--------------------+
|KNIFE_CUTTING_INS...|
|     AUTOMATIC_RIFLE|
|        BLUNT_OBJECT|
|       OTHER_FIREARM|
|        ASPHYXIATION|
|                  NA|
|FIRE_INCENDIARY_D...|
|   AUTOMATIC_HANDGUN|
|               HANDS|
|   AUTOMATIC_SHOTGUN|
|               KNIFE|
|             UNKNOWN|
|    PERSONAL_WEAPONS|
|          EXPLOSIVES|
|MOTOR_VEHICLE_VESSEL|
|               OTHER|
|   AUTOMATIC_FIREARM|
|DRUGS_NARCOTICS_S...|
|               RIFLE|
|                FIRE|
+--------------------+
only showing top 20 rows



In [None]:
# (7.6) Which weapon was used the most?
df = df.filter(~df["Weapon"].isin("NA", "None", "Unknown")) #filtering the null
weapon_crime_counts = df.groupBy("Weapon").count().orderBy(col("count").desc())
weapon_crime_counts.first()

Row(Weapon='FIREARM', count=47672)