In [0]:
from pyspark.sql.functions import col, count, avg, min, max, year, hour, trim, explode, split, when, mean

In [0]:
from pyspark.sql import SparkSession

In [0]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [0]:
# File location and type
file_location = "/FileStore/police_project.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

stop_date,stop_time,county_name,driver_gender,driver_age_raw,driver_age,driver_race,violation_raw,violation,search_conducted,search_type,stop_outcome,is_arrested,stop_duration,drugs_related_stop
2005-01-02,01:55,,M,1985.0,20.0,White,Speeding,Speeding,False,,Citation,False,0-15 Min,False
2005-01-18,08:15,,M,1965.0,40.0,White,Speeding,Speeding,False,,Citation,False,0-15 Min,False
2005-01-23,23:15,,M,1972.0,33.0,White,Speeding,Speeding,False,,Citation,False,0-15 Min,False
2005-02-20,17:15,,M,1986.0,19.0,White,Call for Service,Other,False,,Arrest Driver,True,16-30 Min,False
2005-03-14,10:00,,F,1984.0,21.0,White,Speeding,Speeding,False,,Citation,False,0-15 Min,False
2005-03-23,09:45,,M,1982.0,23.0,Black,Equipment/Inspection Violation,Equipment,False,,Citation,False,0-15 Min,False
2005-04-01,17:30,,M,1969.0,36.0,White,Speeding,Speeding,False,,Citation,False,0-15 Min,False
2005-06-06,13:20,,F,1986.0,19.0,White,Speeding,Speeding,False,,Citation,False,0-15 Min,False
2005-07-13,10:15,,M,1970.0,35.0,Black,Speeding,Speeding,False,,Citation,False,0-15 Min,False
2005-07-13,15:45,,M,1970.0,35.0,White,Speeding,Speeding,False,,Citation,False,0-15 Min,False


In [0]:
df.printSchema()

root
 |-- stop_date: string (nullable = true)
 |-- stop_time: string (nullable = true)
 |-- county_name: string (nullable = true)
 |-- driver_gender: string (nullable = true)
 |-- driver_age_raw: double (nullable = true)
 |-- driver_age: double (nullable = true)
 |-- driver_race: string (nullable = true)
 |-- violation_raw: string (nullable = true)
 |-- violation: string (nullable = true)
 |-- search_conducted: boolean (nullable = true)
 |-- search_type: string (nullable = true)
 |-- stop_outcome: string (nullable = true)
 |-- is_arrested: boolean (nullable = true)
 |-- stop_duration: string (nullable = true)
 |-- drugs_related_stop: boolean (nullable = true)



In [0]:
df.describe().show()

+-------+----------+---------+-----------+-------------+------------------+------------------+-----------+-------------+---------+--------------------+-------------+------------------+
|summary| stop_date|stop_time|county_name|driver_gender|    driver_age_raw|        driver_age|driver_race|violation_raw|violation|         search_type| stop_outcome|     stop_duration|
+-------+----------+---------+-----------+-------------+------------------+------------------+-----------+-------------+---------+--------------------+-------------+------------------+
|  count|     91741|    91741|          0|        86406|             86414|             86120|      86408|        86408|    86408|                3196|        86408|             86408|
|   mean|      null|     null|       null|         null|1970.4912282731964|34.011333023687875|       null|         null|     null|                null|         null|               1.5|
| stddev|      null|     null|       null|         null|110.91490937586805|

In [0]:
from pyspark.sql.functions import col, sum

# Calculate the sum of null values for each column
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

# Display the result
null_counts.show()


+---------+---------+-----------+-------------+--------------+----------+-----------+-------------+---------+----------------+-----------+------------+-----------+-------------+------------------+
|stop_date|stop_time|county_name|driver_gender|driver_age_raw|driver_age|driver_race|violation_raw|violation|search_conducted|search_type|stop_outcome|is_arrested|stop_duration|drugs_related_stop|
+---------+---------+-----------+-------------+--------------+----------+-----------+-------------+---------+----------------+-----------+------------+-----------+-------------+------------------+
|        0|        0|      91741|         5335|          5327|      5621|       5333|         5333|     5333|               0|      88545|        5333|       5333|         5333|                 0|
+---------+---------+-----------+-------------+--------------+----------+-----------+-------------+---------+----------------+-----------+------------+-----------+-------------+------------------+



county_name has 100% missing values

In [0]:
# Number of rows
num_rows = df.count()

# Number of columns
num_columns = len(df.columns)

# Print shape
print(f"Shape: ({num_rows}, {num_columns})")

Shape: (91741, 15)


In [0]:
# Calculate the total number of rows
total_rows = df.count()

# Loop through each column and calculate the percentage of missing values
for column in df.columns:
    missing_count = df.select(sum(col(column).isNull().cast("int"))).collect()[0][0]
    missing_percentage = (missing_count / total_rows) * 100
    print(f"{column: <20}: ==============> {missing_percentage:.2f}%")



In [0]:
# Filter columns that are not entirely null
non_empty_columns = [column for column in df.columns if df.filter(col(column).isNotNull()).count() > 0]

# Select only the non-empty columns
df_non_empty = df.select(*non_empty_columns)

# Get the shape of the resulting DataFrame
num_rows = df_non_empty.count()
num_columns = len(df_non_empty.columns)

# Print shape
print(f"Shape after dropping all-null columns: ({num_rows}, {num_columns})")

Shape after dropping all-null columns: (91741, 14)


In [0]:
# Drop the 'county_name' column
df = df.drop('county_name')

In [0]:
# Calculate the total number of rows
total_rows = df.count()

# Loop through each column and calculate the percentage of missing values
for column in df.columns:
    missing_count = df.select(sum(col(column).isNull().cast("int"))).collect()[0][0]
    missing_percentage = (missing_count / total_rows) * 100
    print(f"{column: <20}: ==============> {missing_percentage:.2f}%")



In [0]:
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("policing_data")

# Question: **_Do_** men or women speed more often?

### Counting occurrences of each unique value in driver_gender:

## **SQL**

In [0]:
%sql
SELECT driver_gender, COUNT(*) AS count
FROM policing_data
WHERE driver_gender IS NOT NULL
GROUP BY driver_gender
ORDER BY driver_gender;


driver_gender,count
F,23511
M,62895


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT driver_gender, COUNT(*) AS count
FROM policing_data
WHERE driver_gender IS NOT NULL
GROUP BY driver_gender
ORDER BY count DESC;


driver_gender,count
M,62895
F,23511


## **Pyspark**

In [0]:
# Filter null values
df = df.filter(col("driver_gender").isNotNull())
# Count occurrences of each gender
df.groupBy("driver_gender").count().orderBy("driver_gender").show()


+-------------+-----+
|driver_gender|count|
+-------------+-----+
|            F|23511|
|            M|62895|
+-------------+-----+



### Counting occurrences of each unique value in driver_gender as a percentage (normalized count):

In [0]:
%sql
SELECT driver_gender, 
       (COUNT(*) * 100.0 / (SELECT COUNT(*) FROM policing_data)) AS percentage
FROM policing_data
WHERE driver_gender IS NOT NULL
GROUP BY driver_gender
ORDER BY percentage DESC;


driver_gender,percentage
M,68.55713366978777
F,25.62758199714413


### Counting occurrences of each unique value in violation

## **SQL**

In [0]:
%sql
SELECT violation, COUNT(*) AS count
FROM policing_data
WHERE violation IS NOT NULL
GROUP BY violation
ORDER BY count DESC;


violation,count
Speeding,48463
Moving violation,16224
Equipment,11020
Other,4317
Registration/plates,3432
Seat belt,2952


Databricks visualization. Run in Databricks to view.

### Counting occurrences of each unique value in violation as a percentage:

In [0]:
%sql
SELECT violation, 
       (COUNT(*) * 100.0 / (SELECT COUNT(*) FROM policing_data WHERE violation IS NOT NULL)) AS percentage
FROM policing_data
WHERE violation IS NOT NULL
GROUP BY violation
ORDER BY percentage DESC;


violation,percentage
Speeding,56.08624201462828
Moving violation,18.77603925562448
Equipment,12.75344875474493
Other,4.99606517915008
Registration/plates,3.97185445792056
Seat belt,3.41635033793167


## **PySpark**

In [0]:
# Count of violations
df.groupBy("violation").count().orderBy(col("count").desc()).show()

# Percentage distribution of violations
violation_total = df.filter(col("violation").isNotNull()).count()
df.groupBy("violation").agg((count("*") * 100 / violation_total).alias("percentage")).orderBy(col("percentage").desc()).show()


+-------------------+-----+
|          violation|count|
+-------------------+-----+
|           Speeding|48461|
|   Moving violation|16224|
|          Equipment|11020|
|              Other| 4317|
|Registration/plates| 3432|
|          Seat belt| 2952|
+-------------------+-----+

+-------------------+------------------+
|          violation|        percentage|
+-------------------+------------------+
|           Speeding| 56.08522556303961|
|   Moving violation|18.776473855982225|
|          Equipment|12.753743952966229|
|              Other| 4.996180820776335|
|Registration/plates| 3.971946392611624|
|          Seat belt|3.4164294146239844|
+-------------------+------------------+



## **SQL**

### Counting occurrences of each driver_gender for violation = 'Speeding'

In [0]:
%sql
SELECT driver_gender, COUNT(*) AS count
FROM policing_data
WHERE violation = 'Speeding' AND driver_gender IS NOT NULL
GROUP BY driver_gender
ORDER BY count DESC;


driver_gender,count
M,32979
F,15482


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT violation, driver_gender, COUNT(*) AS count
FROM policing_data
WHERE driver_gender IS NOT NULL
GROUP BY violation, driver_gender
ORDER BY violation, count DESC;


violation,driver_gender,count
Equipment,M,8533
Equipment,F,2487
Moving violation,M,13020
Moving violation,F,3204
Other,M,3627
Other,F,690
Registration/plates,M,2419
Registration/plates,F,1013
Seat belt,M,2317
Seat belt,F,635


Databricks visualization. Run in Databricks to view.

### Finding proportaion of males and females held responsible for speeding

In [0]:
%sql
SELECT violation,
       (COUNT(*) * 100.0 / (SELECT COUNT(*) FROM policing_data WHERE driver_gender = 'M')) AS percentage
FROM policing_data
WHERE driver_gender = 'M' AND violation='Speeding'
GROUP BY violation
ORDER BY percentage DESC;


violation,percentage
Speeding,52.43501073217267


In [0]:
%sql
SELECT violation, 
       (COUNT(*) * 100.0 / (SELECT COUNT(*) FROM policing_data WHERE driver_gender = 'F')) AS percentage
FROM policing_data
WHERE driver_gender = 'F' AND violation='Speeding'
GROUP BY violation
ORDER BY percentage DESC;


violation,percentage
Speeding,65.85002764663349


## **PySpark**

In [0]:
# Count of violations
df.groupBy("violation").count().orderBy(col("count").desc()).show()

# Percentage distribution of violations
violation_total = df.filter(col("violation").isNotNull()).count()
df.groupBy("violation").agg((count("*") * 100 / violation_total).alias("percentage")).orderBy(col("percentage").desc()).show()

# Count gender for speeding
df.filter(col("violation") == "Speeding").groupBy("driver_gender").count().orderBy(col("count").desc()).show()

# Violation counts by gender
df.groupBy("violation", "driver_gender").count().orderBy("violation", col("count").desc()).show()

# Male speeding proportion
male_total = df.filter(col("driver_gender") == "M").count()
df.filter((col("violation") == "Speeding") & (col("driver_gender") == "M")) \
    .agg((count("*") * 100 / male_total).alias("percentage")).show()

# Female speeding proportion
female_total = df.filter(col("driver_gender") == "F").count()
df.filter((col("violation") == "Speeding") & (col("driver_gender") == "F")) \
    .agg((count("*") * 100 / female_total).alias("percentage")).show()

+-------------------+-----+
|          violation|count|
+-------------------+-----+
|           Speeding|48461|
|   Moving violation|16224|
|          Equipment|11020|
|              Other| 4317|
|Registration/plates| 3432|
|          Seat belt| 2952|
+-------------------+-----+

+-------------------+------------------+
|          violation|        percentage|
+-------------------+------------------+
|           Speeding| 56.08522556303961|
|   Moving violation|18.776473855982225|
|          Equipment|12.753743952966229|
|              Other| 4.996180820776335|
|Registration/plates| 3.971946392611624|
|          Seat belt|3.4164294146239844|
+-------------------+------------------+

+-------------+-----+
|driver_gender|count|
+-------------+-----+
|            M|32979|
|            F|15482|
+-------------+-----+

+-------------------+-------------+-----+
|          violation|driver_gender|count|
+-------------------+-------------+-----+
|          Equipment|            M| 8533|
|      

_**According to the above analysis it can be seen that when a man is pulled over, 52% of the times the violation is speeding and when a female is pulled over, 65% of the times the violation is speeding.**_

# **_Question: Does gender affects who gets searched during a pullover?_**

## **SQL**

### Counting occurrences of each unique value in search_conducted:

In [0]:
%sql
SELECT search_conducted, COUNT(*) AS count
FROM policing_data
WHERE driver_gender IS NOT NULL
GROUP BY search_conducted
ORDER BY count DESC;


search_conducted,count
False,83210
True,3196


Out of the total cases only 3196 data represents for searches

### Counting occurrences of each driver_gender where search_conducted is True

In [0]:
%sql
SELECT driver_gender, COUNT(*) AS count
FROM policing_data
WHERE search_conducted = True and driver_gender IS NOT NULL
GROUP BY driver_gender
ORDER BY count DESC;


driver_gender,count
M,2725
F,471


Databricks visualization. Run in Databricks to view.

From the searched cases 2725 are men and only 471 is women

In [0]:
%sql
WITH searched AS (
    SELECT driver_gender, COUNT(*) AS count_searched
    FROM policing_data
    WHERE driver_gender IS NOT NULL AND search_conducted = True
    GROUP BY driver_gender
),
not_searched AS (
    SELECT driver_gender, COUNT(*) AS count_not_searched
    FROM policing_data
    WHERE driver_gender IS NOT NULL AND search_conducted = False
    GROUP BY driver_gender
)
SELECT COALESCE(searched.driver_gender, not_searched.driver_gender) AS driver_gender,
       COALESCE(count_searched, 0) AS count_searched,
       COALESCE(count_not_searched, 0) AS count_not_searched
FROM searched
FULL OUTER JOIN not_searched ON searched.driver_gender = not_searched.driver_gender
ORDER BY driver_gender;


driver_gender,count_searched,count_not_searched
F,471,23040
M,2725,60170


Databricks visualization. Run in Databricks to view.

## **PySpark**

In [0]:
# Count searches conducted
df.filter(col("driver_gender").isNotNull()).groupBy("search_conducted").count().orderBy(col("count").desc()).show()

# Gender-wise search count where conducted
df.filter(col("search_conducted") == True).groupBy("driver_gender").count().orderBy(col("count").desc()).show()

# Total counts for each gender
total_by_gender = df.groupBy("driver_gender").agg(count("*").alias("total"))

# Searches conducted
searched = df.filter(col("search_conducted") == True) \
    .groupBy("driver_gender") \
    .agg(count("*").alias("count_searched"))

# Searches not conducted
not_searched = df.filter(col("search_conducted") == False) \
    .groupBy("driver_gender") \
    .agg(count("*").alias("count_not_searched"))

# Combine results and calculate percentages
result = searched.join(not_searched, "driver_gender", "outer") \
    .join(total_by_gender, "driver_gender", "outer") \
    .withColumn("searched_percentage", (col("count_searched") * 100 / col("total"))) \
    .withColumn("not_searched_percentage", (col("count_not_searched") * 100 / col("total"))) \
    .orderBy("driver_gender")

# Display the results
result.show()

+----------------+-----+
|search_conducted|count|
+----------------+-----+
|           false|83210|
|            true| 3196|
+----------------+-----+

+-------------+-----+
|driver_gender|count|
+-------------+-----+
|            M| 2725|
|            F|  471|
+-------------+-----+

+-------------+--------------+------------------+-----+-------------------+-----------------------+
|driver_gender|count_searched|count_not_searched|total|searched_percentage|not_searched_percentage|
+-------------+--------------+------------------+-----+-------------------+-----------------------+
|            F|           471|             23040|23511|  2.003317596018885|      97.99668240398111|
|            M|          2725|             60170|62895| 4.3326178551554175|      95.66738214484458|
+-------------+--------------+------------------+-----+-------------------+-----------------------+



**_According to the above analysis it can be seen that 2.05% of the females are searched during a pullover and 4.5% of males are being searched during the pull over which is double than the number of females_**

In [0]:
%sql
SELECT violation,
       driver_gender,
       AVG(CASE WHEN search_conducted = True THEN 1 ELSE 0 END) AS search_conducted_mean
FROM policing_data
WHERE driver_gender is NOT NULL
GROUP BY violation, driver_gender
ORDER BY search_conducted_mean desc, violation, driver_gender;


violation,driver_gender,search_conducted_mean
Registration/plates,M,0.1103761885076477
Equipment,M,0.0700808625336927
Registration/plates,F,0.0661401776900296
Moving violation,M,0.0598310291858679
Other,F,0.0565217391304347
Other,M,0.0471464019851116
Equipment,F,0.0426216324889425
Seat belt,M,0.037980146741476
Moving violation,F,0.0362047440699126
Speeding,M,0.024924952242336


Databricks visualization. Run in Databricks to view.

**_It can be concluded that most of the search during pullover happens when there is a problem with the registration plate followed by equipment violation and moving violation_**

# **_Question: During a search how often is the driver frisked?_**

## **SQL**

In [0]:
%sql
SELECT search_type, COUNT(*) AS count
FROM policing_data
WHERE search_type IS NOT NULL
GROUP BY search_type
ORDER BY count DESC;

search_type,count
Incident to Arrest,1219
Probable Cause,891
Inventory,220
Reasonable Suspicion,197
Protective Frisk,161
"Incident to Arrest,Inventory",129
"Incident to Arrest,Probable Cause",106
"Probable Cause,Reasonable Suspicion",75
"Incident to Arrest,Inventory,Probable Cause",34
"Incident to Arrest,Protective Frisk",33


In [0]:
%sql
SELECT TRIM(search_individual) AS search_type, COUNT(*) AS count
FROM (
    SELECT EXPLODE(SPLIT(search_type, ',')) AS search_individual
    FROM policing_data
    WHERE search_type IS NOT NULL
) AS exploded_search
GROUP BY search_type
ORDER BY count DESC;


search_type,count
Incident to Arrest,1566
Probable Cause,1182
Inventory,439
Reasonable Suspicion,314
Protective Frisk,274


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT 
    (274 * 100.0 / COUNT(*)) AS protective_frisk_percentage
FROM policing_data
WHERE search_type IS NOT NULL;

protective_frisk_percentage
8.573216520650814


## **PySpark**

In [0]:
# Frisking by search type
df.filter(col("search_type").isNotNull()) \
    .groupBy("search_type").count().orderBy(col("count").desc()).show()

# Frisk type breakdown
df.filter(col("search_type").isNotNull()) \
    .select(explode(split(col("search_type"), ",")).alias("search_type")) \
    .groupBy(trim(col("search_type"))).count().orderBy(col("count").desc()).show()

# Percentage of protective frisks
total_searches = df.filter(col("search_type").isNotNull()).count()
df.filter(col("search_type").contains("Protective Frisk")) \
    .agg((count("*") * 100 / total_searches).alias("protective_frisk_percentage")).show()

+--------------------+-----+
|         search_type|count|
+--------------------+-----+
|  Incident to Arrest| 1219|
|      Probable Cause|  891|
|           Inventory|  220|
|Reasonable Suspicion|  197|
|    Protective Frisk|  161|
|Incident to Arres...|  129|
|Incident to Arres...|  106|
|Probable Cause,Re...|   75|
|Incident to Arres...|   34|
|Incident to Arres...|   33|
|Probable Cause,Pr...|   33|
|Inventory,Probabl...|   22|
|Incident to Arres...|   13|
|Inventory,Protect...|   11|
|Incident to Arres...|   11|
|Protective Frisk,...|   11|
|Incident to Arres...|   10|
|Incident to Arres...|    6|
|Incident to Arres...|    4|
|Inventory,Reasona...|    4|
+--------------------+-----+
only showing top 20 rows

+--------------------+-----+
|   trim(search_type)|count|
+--------------------+-----+
|  Incident to Arrest| 1566|
|      Probable Cause| 1182|
|           Inventory|  439|
|Reasonable Suspicion|  314|
|    Protective Frisk|  274|
+--------------------+-----+

+---------------

###  **_According to the above analysis 8.57% of the time the driver is frisked._**

# **_Question: Which year has the least number of pullover?_**

## **SQL**

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW policing_data_with_year AS
SELECT *, 
       CAST(stop_date AS DATE) AS stop_date_converted, 
       YEAR(CAST(stop_date AS DATE)) AS year
FROM policing_data;


In [0]:
%sql
SELECT year, COUNT(*) AS count
FROM policing_data_with_year
GROUP BY year
ORDER BY count DESC;


year,count
2012,10970
2006,10639
2007,9476
2014,9228
2008,8752
2015,8599
2011,8126
2013,7924
2009,7908
2010,7561


Databricks visualization. Run in Databricks to view.

## **PySpark**

In [0]:
# Add year column
df = df.withColumn("year", year(col("stop_date").cast("date")))

# Count pullovers by year
df.groupBy("year").count().orderBy(col("count")).show()

+----+-----+
|year|count|
+----+-----+
|2005| 2505|
|2010| 6995|
|2009| 7237|
|2013| 7421|
|2011| 7575|
|2008| 8151|
|2015| 8231|
|2014| 8849|
|2007| 8905|
|2006|10141|
|2012|10396|
+----+-----+



###  **_It can be seen that the least pullover happened in 2005 that is 2,558_**

# **_Question: How does drug activity change by time of day? Do most of the stops occur at night?_**

## **SQL**

### Counting occurrences of each unique value in drugs_related_stop

In [0]:
%sql
SELECT drugs_related_stop, COUNT(*) AS count
FROM policing_data
GROUP BY drugs_related_stop
ORDER BY count DESC;

drugs_related_stop,count
False,90926
True,815


In [0]:
%sql
CREATE OR REPLACE TEMP VIEW policing_data_with_stop_hour AS
SELECT *, 
       HOUR(CAST(stop_time AS TIMESTAMP)) AS stop_hour
FROM policing_data;


In [0]:
%sql
SELECT stop_hour, COUNT(*) AS stop_count
FROM policing_data_with_stop_hour
WHERE drugs_related_stop = True
GROUP BY stop_hour
ORDER BY stop_hour;


stop_hour,stop_count
0,103
1,60
2,46
3,20
4,3
5,1
6,10
7,15
8,14
9,43


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

## **PySpark**

In [0]:
df_with_stop_hour = df.withColumn("stop_hour", hour(col("stop_time").cast("timestamp")))

df_with_stop_hour.filter(col("drugs_related_stop") == True) \
    .groupBy("stop_hour") \
    .agg(count("*").alias("stop_count")) \
    .orderBy("stop_hour") \
    .show(24, truncate=False)

+---------+----------+
|stop_hour|stop_count|
+---------+----------+
|0        |103       |
|1        |60        |
|2        |46        |
|3        |20        |
|4        |3         |
|5        |1         |
|6        |10        |
|7        |15        |
|8        |14        |
|9        |43        |
|10       |42        |
|11       |41        |
|12       |16        |
|13       |50        |
|14       |44        |
|15       |31        |
|16       |22        |
|17       |16        |
|18       |22        |
|19       |30        |
|20       |27        |
|21       |20        |
|22       |47        |
|23       |92        |
+---------+----------+



### **_From the above analysis it can be seen that the pullover because of drug usage has increased at the night around 22:00 hours. There is an upward trend which contunies till 00:00 hours and then sees a downward trend._**

# Question: What is the mean stop_duration for each violation_raw?

## **SQL**

### Retrieve unique values for stop_duration:

In [0]:
%sql
SELECT DISTINCT stop_duration
FROM policing_data;


stop_duration
0-15 Min
16-30 Min
""
30+ Min
1
2


### Count the number of unique values in violation_raw

In [0]:
%sql
SELECT COUNT(DISTINCT violation_raw) AS num_unique_violations
FROM policing_data;

num_unique_violations
12


### Retrieve the count and unique values for violation_raw

In [0]:
%sql
SELECT violation_raw, COUNT(*) AS count
FROM policing_data
WHERE violation_raw IS NOT NULL
GROUP BY violation_raw
ORDER BY count DESC;

violation_raw,count
Speeding,48463
Other Traffic Violation,16224
Equipment/Inspection Violation,11020
Registration Violation,3432
Seatbelt Violation,2952
Special Detail/Directed Patrol,2455
Call for Service,1298
Violation of City/Town Ordinance,211
Motorist Assist/Courtesy,203
APB,79


### **Grouping the pullover time based on violations**

In [0]:
%sql
SELECT stop_duration, violation_raw, COUNT(*) AS count
FROM policing_data
WHERE violation_raw IS NOT NULL AND stop_duration IN ('0-15 Min', '16-30 Min', '30+ Min')
GROUP BY stop_duration, violation_raw
ORDER BY stop_duration, count DESC;

stop_duration,violation_raw,count
0-15 Min,Speeding,41014
0-15 Min,Other Traffic Violation,11763
0-15 Min,Equipment/Inspection Violation,9173
0-15 Min,Seatbelt Violation,2665
0-15 Min,Registration Violation,2421
0-15 Min,Special Detail/Directed Patrol,1622
0-15 Min,Call for Service,547
0-15 Min,Violation of City/Town Ordinance,144
0-15 Min,Motorist Assist/Courtesy,119
0-15 Min,APB,37


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
SELECT stop_duration, COUNT(*) AS count
FROM policing_data
WHERE stop_duration IN ('0-15 Min', '16-30 Min', '30+ Min')
GROUP BY stop_duration
ORDER BY count DESC;


stop_duration,count
0-15 Min,69543
16-30 Min,13635
30+ Min,3228


Databricks visualization. Run in Databricks to view.

## **PySpark**

In [0]:
# Unique stop durations
df.select("stop_duration").distinct().show()

# Count unique `violation_raw`
df.select("violation_raw").distinct().count()

# Count and unique values of `violation_raw`
df.groupBy("violation_raw").count().orderBy(col("count").desc()).show()

# Stop duration grouped by violation
df.filter((col("stop_duration").isin("0-15 Min", "16-30 Min", "30+ Min")) & (col("violation_raw").isNotNull())) \
    .groupBy("stop_duration", "violation_raw").count().orderBy("stop_duration", col("count").desc()).show()

df.filter(col("stop_duration").isin("0-15 Min", "16-30 Min", "30+ Min")) \
    .groupBy("stop_duration") \
    .agg(count("*").alias("count")) \
    .orderBy(col("count").desc()) \
    .show()


+-------------+
|stop_duration|
+-------------+
|     0-15 Min|
|    16-30 Min|
|      30+ Min|
|            1|
|            2|
+-------------+

+--------------------+-----+
|       violation_raw|count|
+--------------------+-----+
|            Speeding|48461|
|Other Traffic Vio...|16224|
|Equipment/Inspect...|11020|
|Registration Viol...| 3432|
|  Seatbelt Violation| 2952|
|Special Detail/Di...| 2455|
|    Call for Service| 1298|
|Violation of City...|  211|
|Motorist Assist/C...|  203|
|                 APB|   79|
|   Suspicious Person|   56|
|             Warrant|   15|
+--------------------+-----+

+-------------+--------------------+-----+
|stop_duration|       violation_raw|count|
+-------------+--------------------+-----+
|     0-15 Min|            Speeding|41012|
|     0-15 Min|Other Traffic Vio...|11763|
|     0-15 Min|Equipment/Inspect...| 9173|
|     0-15 Min|  Seatbelt Violation| 2665|
|     0-15 Min|Registration Viol...| 2421|
|     0-15 Min|Special Detail/Di...| 1622|
|  

# **_Question: Compare the age distributions for each violation_**

## **SQL**

In [0]:
%sql
SELECT violation,
       COUNT(driver_age) AS count,
       AVG(driver_age) AS mean,
       MIN(driver_age) AS min,
       MAX(driver_age) AS max
FROM policing_data
WHERE violation IS NOT NULL
GROUP BY violation
ORDER BY violation;


violation,count,mean,min,max
Equipment,11007,31.781502680112656,16.0,89.0
Moving violation,16164,36.120019797079934,15.0,99.0
Other,4204,39.536869647954326,16.0,87.0
Registration/plates,3427,32.80303472424861,16.0,74.0
Seat belt,2952,32.20630081300813,17.0,77.0
Speeding,48361,33.53009656541428,15.0,90.0


In [0]:
%sql
SELECT driver_age, violation
FROM policing_data
WHERE driver_age IS NOT NULL AND violation IS NOT NULL;

driver_age,violation
20.0,Speeding
40.0,Speeding
33.0,Speeding
19.0,Other
21.0,Speeding
23.0,Equipment
36.0,Speeding
19.0,Speeding
35.0,Speeding
35.0,Speeding


Databricks visualization. Run in Databricks to view.

## **PySpark**

In [0]:
df.groupBy("violation") \
    .agg(
        count("driver_age").alias("count"),
        avg("driver_age").alias("mean"),
        min("driver_age").alias("min"),
        max("driver_age").alias("max")
    ).orderBy("violation").show()


+-------------------+-----+------------------+----+----+
|          violation|count|              mean| min| max|
+-------------------+-----+------------------+----+----+
|          Equipment|11007|31.781502680112656|16.0|89.0|
|   Moving violation|16164|36.120019797079934|15.0|99.0|
|              Other| 4204|39.536869647954326|16.0|87.0|
|Registration/plates| 3427| 32.80303472424861|16.0|74.0|
|          Seat belt| 2952| 32.20630081300813|17.0|77.0|
|           Speeding|48359| 33.53044934758783|15.0|90.0|
+-------------------+-----+------------------+----+----+



### **_From the above analysis it can be seen that speeding is the majority reason for pullover at an early age. The offence is peaked at 22 which is considered as teenage._**

# **Modelling**

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt

In [0]:
# Calculate the total number of stops
total_stops = df.count()

In [0]:
# Calculate the total number of searches conducted
total_searches = df.filter(col("search_conducted") == True).count()

# Calculate the count and proportion of stops by race
stops_by_race = df.groupBy("driver_race") \
    .agg(
        count("*").alias("count"),
        (count("*") / total_stops * 100).alias("proportion")
    )

# Calculate the count and proportion of searches conducted by race
searches_by_race = df.filter(col("search_conducted") == True).groupBy("driver_race") \
    .agg(
        count("*").alias("search_count"),
        (count("*") / total_searches * 100).alias("search_proportion")
    )

# Join the two DataFrames on driver_race
result_df = stops_by_race.join(searches_by_race, on="driver_race", how="left")

# Calculate the proportion of stops where a search was conducted
result_df = result_df.withColumn("search_proportion_of_stops", (col("search_count") / col("count")) * 100)

# Show the results
result_df.orderBy(col("proportion").desc()).show()

+-----------+-----+------------------+------------+-------------------+--------------------------+
|driver_race|count|        proportion|search_count|  search_proportion|search_proportion_of_stops|
+-----------+-----+------------------+------------+-------------------+--------------------------+
|      White|62156| 71.93481934124945|        1768| 55.319148936170215|        2.8444558851920974|
|      Black|12244| 14.17031224683471|         790| 24.718397997496872|         6.452139823587063|
|   Hispanic| 9507| 11.00270814526769|         584| 18.272841051314142|         6.142842116335332|
|      Asian| 2259|  2.61440177765433|          51| 1.5957446808510638|         2.257636122177955|
|      Other|  240|0.2777584889938199|           3|0.09386733416770963|                      1.25|
+-----------+-----+------------------+------------+-------------------+--------------------------+



- The White race has the highest number of stops and searches, but a lower proportion of stops resulting in searches compared to Black and Hispanic races.
- The Black and Hispanic races have higher proportions of their stops resulting in searches (6.45% and 6.14%, respectively) compared to White (2.84%).
- The Asian and Other races have the lowest number of stops and searches, with Asian having a slightly higher proportion of stops resulting in searches (2.26%) compared to White but lower than Black and Hispanic.

In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
mean_driver_age = df.select(mean(col("driver_age"))).collect()[0][0]

In [0]:
# Preprocessing: Fill null values for necessary columns
data = df.fillna({'driver_age': mean_driver_age, 'driver_gender': 'Unknown', 'driver_race': 'Unknown'})

In [0]:
# Calculate the total number of rows
total_rows = data.count()

# Loop through each column and calculate the percentage of missing values
for column in data.columns:
    missing_count = data.select(sum(col(column).isNull().cast("int"))).collect()[0][0]
    missing_percentage = (missing_count / total_rows) * 100
    print(f"{column: <20}: ==============> {missing_percentage:.2f}%")



In [0]:
# Convert the Boolean target column to StringType
data = data.withColumn("is_arrested_str", data["is_arrested"].cast("string"))

In [0]:
# Index the target column (StringIndexer now uses the string column)
indexer = StringIndexer(inputCol="is_arrested_str", outputCol="label")
data = indexer.fit(data).transform(data)

In [0]:
# Assemble features
feature_cols = ["driver_age", "drugs_related_stop"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(data)

In [0]:
# Split the dataset
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Train Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label", rawPredictionCol="rawPrediction")
model = lr.fit(train_data)

In [0]:
# Make predictions
predictions = model.transform(test_data)

# Evaluate the model using available columns
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)
print(f"ROC AUC: {roc_auc}")

ROC AUC: 0.5644847059811475


In [0]:
# Show predictions with available columns
predictions.select("label", "prediction", "probability").show(10)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.96807302529883...|
|  0.0|       0.0|[0.96923200911473...|
|  0.0|       0.0|[0.96885028012549...|
|  0.0|       0.0|[0.96300104608998...|
|  0.0|       0.0|[0.97314278317282...|
|  0.0|       0.0|[0.96998192364672...|
|  1.0|       0.0|[0.96389717222147...|
|  0.0|       0.0|[0.97178001691538...|
|  0.0|       0.0|[0.96807302529883...|
|  0.0|       0.0|[0.97347334927757...|
+-----+----------+--------------------+
only showing top 10 rows



- **Target Column:** The is_arrested column is indexed as the label for binary classification.
- **Features:** Selected driver_age and drugs_related_stop as relevant features. Add more if necessary.
- **Data Cleaning:** Filled null values in driver_age, driver_gender, and driver_race.
- **Model:** Logistic Regression is used to classify whether a driver is arrested based on features.
- **Evaluation:** ROC AUC is computed to measure the model's performance.