# Pattern Analysis

In [None]:
# Export JAVA_HOME for this process (hard-coded to a local Temurin 11 install)
# and echo the value that ended up in the environment.
import os

java_home_path = '/Library/Java/JavaVirtualMachines/temurin-11.jdk/Contents/Home'
os.environ['JAVA_HOME'] = java_home_path
print(f"JAVA_HOME set to: {os.environ['JAVA_HOME']}")

JAVA_HOME set to: /Library/Java/JavaVirtualMachines/temurin-11.jdk/Contents/Home


In [None]:
'''
1. What is the distribution of Victim Age Groups across all incidents?
'''
#import findspark
#findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import when, col
from pyspark.sql.types import IntegerType

# Create (or attach to) a local Spark session using all available cores.
spark = (
    SparkSession.builder
    .appName("Victim Age Group Analysis")
    .master("local[*]")
    .getOrCreate()
)

# Load the combined incidents file; the header row supplies the column names
# and Spark infers the column types.
input_file = "final_combined_2020_2024_std>10.csv"
print("Reading:", input_file)

df = spark.read.csv(input_file, header=True, inferSchema=True)

# Integer-typed copy of the victim age column, stored as "age".
df = df.withColumn("age", col("Victim Age Group").cast(IntegerType()))

# Inclusive (low, high, label) brackets, checked in the order listed.
age = col("age")
age_brackets = [
    (0, 17, "0–17 (Minor)"),
    (18, 24, "18–24 (Young Adult)"),
    (25, 34, "25–34"),
    (35, 44, "35–44"),
    (45, 54, "45–54"),
    (55, 64, "55–64"),
]

# -1 is labelled "Unknown"; anything that matches no rule ends up as
# "Other/Invalid".
age_range_expr = when(age == -1, "Unknown")
for low, high, label in age_brackets:
    age_range_expr = age_range_expr.when(age.between(low, high), label)
age_range_expr = (
    age_range_expr
    .when(age >= 65, "65+ (Senior)")
    .otherwise("Other/Invalid")
)

df = df.withColumn("Age Range", age_range_expr)

# Row count used as the denominator for the percentage column.
total_rows = df.count()

# Count per age range, its share of all rows (2 decimals), largest first.
percent_of_total = F.round(col("count") / total_rows * 100, 2)
dist = (
    df.groupBy("Age Range")
      .count()
      .withColumn("Percent", percent_of_total)
      .orderBy(col("count").desc())
)

dist.show(100, truncate=False)

# Persist the distribution as CSV (with header), replacing any earlier output.
(
    dist.write
        .mode("overwrite")
        .option("header", True)
        .csv("victim_age_distribution")
)

print("\nDone.")

25/11/16 21:31:00 WARN Utils: Your hostname, Ishmeets-MacBook-Air-10.local resolves to a loopback address: 127.0.0.1; using 10.0.0.161 instead (on interface en0)
25/11/16 21:31:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/16 21:31:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Reading: final_combined_2020_2024_std>10.csv


                                                                                

+-------------------+-------+-------+
|Age Range          |count  |Percent|
+-------------------+-------+-------+
|25–34              |1753193|25.62  |
|35–44              |1424950|20.82  |
|18–24 (Young Adult)|985317 |14.4   |
|45–54              |923092 |13.49  |
|55–64              |654714 |9.57   |
|0–17 (Minor)       |540225 |7.89   |
|65+ (Senior)       |450926 |6.59   |
|Unknown            |110886 |1.62   |
+-------------------+-------+-------+



                                                                                


Done.


In [None]:
'''
2. How does Victim Sex vary by Offense Category?
'''
from pyspark.sql import SparkSession
# Use the F namespace instead of importing `sum` by name: a bare
# `from pyspark.sql.functions import sum` shadows Python's builtin sum()
# for every cell that runs afterwards.
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

offense_col = "Offense Category"
sex_col = "Victim Sex"

# Reuse the running Spark session when there is one; otherwise start one.
try:
    spark
except NameError:
    spark = SparkSession.builder \
        .appName("Victim Sex by Offense Analysis") \
        .master("local[*]") \
        .getOrCreate()

input_file = "final_combined_2020_2024_std>10.csv"

# Reuse the DataFrame loaded by an earlier cell when it exists. Re-reading the
# CSV into `df` here would throw away the derived "age" / "Age Range" columns
# that the age-based cells further down group by.
try:
    df
except NameError:
    print("Reading:", input_file)
    df = spark.read.csv(input_file, header=True, inferSchema=True)

# Check if required columns exist
if offense_col not in df.columns or sex_col not in df.columns:
    print(f"Error: Required columns ('{offense_col}' or '{sex_col}') not found.")
    print(f"Available columns: {df.columns}")
else:

    #  Analysis 1: Raw Counts (Crosstab)

    print(f"\n--- Distribution of {sex_col} by {offense_col} (Raw Counts) ---")

    # One row per offense category, one column per distinct victim sex value;
    # combinations that never occur are filled with 0.
    crosstab_df = df.groupBy(offense_col) \
                    .pivot(sex_col) \
                    .count() \
                    .fillna(0)

    crosstab_df.orderBy(offense_col).show(100, truncate=False)

    # Analysis 2: Percentage Distribution

    print(f"\n--- Percentage Distribution of {sex_col} by {offense_col} ---")

    # Get counts in "long" format (one row per offense/sex pair)
    long_counts_df = df.groupBy(offense_col, sex_col).count()

    # Window over each offense category, used to total the counts per category
    category_window = Window.partitionBy(offense_col)

    percentage_df = long_counts_df \
        .withColumn("Total_Per_Category", F.sum("count").over(category_window)) \
        .withColumn("Percentage", (col("count") / col("Total_Per_Category")) * 100) \
        .select(
            offense_col,
            sex_col,
            col("count").alias("Incident_Count"),
            "Total_Per_Category",
            col("Percentage").cast("decimal(5,2)").alias("Percentage")
        )

    percentage_df.orderBy(offense_col, col("Percentage").desc()).show(100, truncate=False)

    print("\nSaving crosstab results...")
    crosstab_df.write.mode("overwrite").option("header", True).csv("victim_sex_by_offense_crosstab")

    print("Saving percentage results...")
    percentage_df.write.mode("overwrite").option("header", True).csv("victim_sex_by_offense_percentages")

print("\nDone.")

Reading: final_combined_2020_2024_std>10.csv


                                                                                


--- Distribution of Victim Sex by Offense Category (Raw Counts) ---


                                                                                

+----------------------------------------+-------+-------+-----+
|Offense Category                        |F      |M      |U    |
+----------------------------------------+-------+-------+-----+
|Arson                                   |4847   |5950   |504  |
|Assault Offenses                        |2038113|1536867|18296|
|Bribery                                 |548    |409    |12   |
|Burglary/Breaking & Entering            |127305 |142665 |2254 |
|Counterfeiting/Forgery                  |18401  |21947  |1599 |
|Destruction/Damage/Vandalism of Property|360006 |309946 |4367 |
|Embezzlement                            |2230   |2925   |18   |
|Extortion/Blackmail                     |4046   |7302   |39   |
|Fraud Offenses                          |131915 |117910 |6264 |
|Homicide Offenses                       |3876   |12323  |62   |
|Human Trafficking                       |2210   |792    |172  |
|Kidnapping/Abduction                    |39231  |10085  |117  |
|Larceny/Theft Offenses  

                                                                                

+----------------------------------------+----------+--------------+------------------+----------+
|Offense Category                        |Victim Sex|Incident_Count|Total_Per_Category|Percentage|
+----------------------------------------+----------+--------------+------------------+----------+
|Arson                                   |M         |5950          |11301             |52.65     |
|Arson                                   |F         |4847          |11301             |42.89     |
|Arson                                   |U         |504           |11301             |4.46      |
|Assault Offenses                        |F         |2038113       |3593276           |56.72     |
|Assault Offenses                        |M         |1536867       |3593276           |42.77     |
|Assault Offenses                        |U         |18296         |3593276           |0.51      |
|Bribery                                 |F         |548           |969               |56.55     |
|Bribery  

                                                                                

Saving percentage results...





Done.


                                                                                

In [None]:
'''
3. Are certain Victim Types more likely to appear in certain crimes?
'''
from pyspark.sql.functions import col

offense_col = "Offense Category"
victim_type_col = "Victim Type"

print(f"--- Counts of {victim_type_col} by {offense_col} ---")

# Backtick-quoted references to the two grouping columns
# (both column names contain a space).
offense_ref = f"`{offense_col}`"
victim_type_ref = f"`{victim_type_col}`"

# Number of rows for every (offense category, victim type) pair.
simple_counts_df = df.groupBy(offense_ref, victim_type_ref).count()

# Sort by offense category, then by count (largest first) within each category.
sort_keys = [col(offense_ref), col("count").desc()]
simple_counts_df.orderBy(*sort_keys).show(100, truncate=False)

--- Counts of Victim Type by Offense Category ---




+----------------------------------------+-----------------------+-------+
|Offense Category                        |Victim Type            |count  |
+----------------------------------------+-----------------------+-------+
|Arson                                   |Individual             |11297  |
|Arson                                   |Law Enforcement Officer|4      |
|Assault Offenses                        |Individual             |3517094|
|Assault Offenses                        |Law Enforcement Officer|76182  |
|Bribery                                 |Individual             |951    |
|Bribery                                 |Law Enforcement Officer|18     |
|Burglary/Breaking & Entering            |Individual             |272218 |
|Burglary/Breaking & Entering            |Law Enforcement Officer|6      |
|Counterfeiting/Forgery                  |Individual             |41940  |
|Counterfeiting/Forgery                  |Law Enforcement Officer|7      |
|Destruction/Damage/Vanda

                                                                                

In [None]:
'''
Percentage Distribution of Victim Types by Offense Category (using Join)

'''

from pyspark.sql.functions import col

offense_col = "Offense Category"
victim_type_col = "Victim Type"

print(f"--- Percentage Distribution of {victim_type_col} by {offense_col} (using Join) ---")

# Count of rows for each (offense category, victim type) combination.
# NOTE: DataFrame.alias() names the DataFrame, it does not rename a column,
# so the "count" column is renamed explicitly further down instead.
counts_df = df.groupBy(f"`{offense_col}`", f"`{victim_type_col}`").count()

# Total number of rows for each offense category.
totals_df = df.groupBy(f"`{offense_col}`") \
              .count() \
              .withColumnRenamed("count", "Total_Per_Category")

# Attach the per-category total to every combination row, compute the share
# of the category each victim type represents, then give the raw count a
# descriptive name.
percentage_df = (
    counts_df
        .join(totals_df, on=offense_col, how="left")
        .withColumn("Percentage %", (col("count") / col("Total_Per_Category")) * 100)
        .withColumnRenamed("count", "Incident_Count")
)

# Show the final results: by offense category, highest percentage first.
percentage_df.orderBy(col(f"`{offense_col}`"), col("Percentage %").desc()).show(100, truncate=False)

--- Percentage Distribution of Victim Type by Offense Category (using Join) ---




+----------------------------------------+-----------------------+-------+------------------+---------------------+
|Offense Category                        |Victim Type            |count  |Total_Per_Category|Percentage %         |
+----------------------------------------+-----------------------+-------+------------------+---------------------+
|Arson                                   |Individual             |11297  |11301             |99.96460490222104    |
|Arson                                   |Law Enforcement Officer|4      |11301             |0.035395097778957614 |
|Assault Offenses                        |Individual             |3517094|3593276           |97.87987340799872    |
|Assault Offenses                        |Law Enforcement Officer|76182  |3593276           |2.120126592001282    |
|Bribery                                 |Individual             |951    |969               |98.14241486068111    |
|Bribery                                 |Law Enforcement Officer|18    

                                                                                

In [None]:
'''
4. Does Victim Race show any pattern with specific offense categories?
'''
from pyspark.sql.functions import col

offense_col = "Offense Category"
race_col = "Victim Race"
print(f"--- Percentage Distribution of {race_col} by {offense_col} ")

# Backtick-quoted references for grouping (the names contain spaces).
offense_ref = f"`{offense_col}`"
race_ref = f"`{race_col}`"

# Rows per (offense category, victim race) pair.
counts_df = df.groupBy(offense_ref, race_ref).count()

# Rows per offense category, used as the denominator.
totals_df = (
    df.groupBy(offense_ref)
      .count()
      .withColumnRenamed("count", "Total_Per_Category")
)

# Bring the category total onto every pair row.
percentage_df = counts_df.join(totals_df, on=offense_col, how="left")

# Share of the category taken by each race, then rename the raw count column.
share_of_category = (col("count") / col("Total_Per_Category")) * 100
percentage_df = percentage_df.withColumn("Percentage %", share_of_category)
percentage_df = percentage_df.withColumnRenamed("count", "Incident_Count")

# Display by offense category with the highest percentage first.
ordered_df = percentage_df.orderBy(offense_col, col("Percentage %").desc())
ordered_df.show(200, truncate=False)

--- Percentage Distribution of Victim Race by Offense Category (using Join) ---




+----------------------------------------+-----------------------------------------+--------------+------------------+-------------------+
|Offense Category                        |Victim Race                              |Incident_Count|Total_Per_Category|Percentage %       |
+----------------------------------------+-----------------------------------------+--------------+------------------+-------------------+
|Arson                                   |White                                    |7412          |11301             |65.58711618440846  |
|Arson                                   |Black or African American                |1752          |11301             |15.503052827183437 |
|Arson                                   |Unknown                                  |1654          |11301             |14.635872931598973 |
|Arson                                   |Asian                                    |331           |11301             |2.9289443412087426 |
|Arson                     

                                                                                

In [None]:
'''5. Which age groups are most vulnerable during nighttime incidents?
'''

from pyspark.sql import functions as F
from pyspark.sql.functions import col

hour_col = "Incident Hour"

# Nighttime hours: 20-23 followed by 0-5 (8 PM through the 5 AM hour).
nighttime_hours = list(range(20, 24)) + list(range(0, 6))

# Relies on the "Age Range" column added to `df` by the age-distribution cell
# (question 1), so that cell has to be run before this one.
age_group_col = "Age Range"

print(f"--- Nighttime Incident Counts and Percentages by {age_group_col} ---")

# Keep only the rows whose incident hour is one of the nighttime hours.
is_nighttime = col(hour_col).isin(nighttime_hours)
nighttime_df = df.filter(is_nighttime)

# Number of nighttime rows per age range.
age_group_ref = F.expr(f"`{age_group_col}`")
counts_df = nighttime_df.groupBy(age_group_ref).count()
counts_df = counts_df.withColumnRenamed("count", "Nighttime_Incident_Count")

# Sum of all nighttime counts, pulled back to the driver as a plain value.
total_row = counts_df.agg(
    F.sum("Nighttime_Incident_Count").alias("Total")
).collect()[0]
total_night = total_row["Total"]

# Each age range's share of the nighttime rows, rounded to 2 decimals.
night_share = F.round(
    col("Nighttime_Incident_Count") / F.lit(total_night) * 100, 2
)
dist_df = counts_df.withColumn("Percentage", night_share)

# Display with the largest count first.
dist_df.orderBy(col("Nighttime_Incident_Count").desc()).show(100, truncate=False)

# Write the result as CSV with a header, replacing any previous output.
dist_df.write.mode("overwrite").option("header", True).csv("victim_age_distribution_nighttime")

# print("\nSaved to folder: victim_age_distribution_nighttime")

--- Nighttime Incident Counts and Percentages by Age Range ---


                                                                                

+-------------------+------------------------+----------+
|Age Range          |Nighttime_Incident_Count|Percentage|
+-------------------+------------------------+----------+
|25–34              |752222                  |28.32     |
|35–44              |573520                  |21.6      |
|18–24 (Young Adult)|438405                  |16.51     |
|45–54              |343153                  |12.92     |
|55–64              |218862                  |8.24      |
|0–17 (Minor)       |157233                  |5.92      |
|65+ (Senior)       |127974                  |4.82      |
|Unknown            |44349                   |1.67      |
+-------------------+------------------------+----------+






Saved to folder: victim_age_distribution_nighttime


                                                                                

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

hour_col = "Incident Hour"
age_group_col = "Age Range"
nighttime_hours = [20, 21, 22, 23, 0, 1, 2, 3, 4, 5]  # Nighttime definition

print(f"--- Nighttime Vulnerability Rate by {age_group_col} ---")


def _count_by_age_group(frame, count_name):
    """Return the row count per age group of `frame`, in a column named `count_name`."""
    grouped = frame.groupBy(F.expr(f"`{age_group_col}`")).count()
    return grouped.withColumnRenamed("count", count_name)


# Counts over all hours, per age group.
total_counts_df = _count_by_age_group(df, "Total_Incident_Count")

# Counts restricted to the nighttime hours, per age group.
nighttime_df = df.filter(col(hour_col).isin(nighttime_hours))
nighttime_counts_df = _count_by_age_group(nighttime_df, "Nighttime_Incident_Count")

# Left join keeps every age group from the totals; fillna(0) replaces the
# nulls left by the join for groups without a nighttime row.
vulnerability_df = total_counts_df.join(
    nighttime_counts_df, on=age_group_col, how="left"
)
vulnerability_df = vulnerability_df.fillna(0)

# Share of each age group's rows that fall in the nighttime hours (2 decimals).
night_rate = F.round(
    col("Nighttime_Incident_Count") / col("Total_Incident_Count") * 100, 2
)
vulnerability_df = vulnerability_df.withColumn("Vulnerability % (Nighttime)", night_rate)

# Highest rate first.
rate_desc = col("Vulnerability % (Nighttime)").desc()
vulnerability_df.orderBy(rate_desc).show(100, truncate=False)

# Write the result as CSV with a header, replacing any previous output.
(
    vulnerability_df.write
        .mode("overwrite")
        .option("header", True)
        .csv("victim_age_vulnerability_nighttime")
)

# print("\nSaved to folder: victim_age_vulnerability_nighttime")

--- Nighttime Vulnerability Rate by Age Range ---


                                                                                

+-------------------+--------------------+------------------------+---------------------------+
|Age Range          |Total_Incident_Count|Nighttime_Incident_Count|Vulnerability % (Nighttime)|
+-------------------+--------------------+------------------------+---------------------------+
|18–24 (Young Adult)|985317              |438405                  |44.49                      |
|25–34              |1753193             |752222                  |42.91                      |
|35–44              |1424950             |573520                  |40.25                      |
|Unknown            |110886              |44349                   |40.0                       |
|45–54              |923092              |343153                  |37.17                      |
|55–64              |654714              |218862                  |33.43                      |
|0–17 (Minor)       |540225              |157233                  |29.11                      |
|65+ (Senior)       |450926             

                                                                                

In [None]:
'''6. Which Location Types (“Location Name”) have the highest incident concentration?
'''

from pyspark.sql.functions import col

location_col = "Location Name"

print("---  top 10 Incident Locations by Concentration ---")

# Rows per location, sorted so the most frequent locations come first.
by_location = df.groupBy(f"`{location_col}`").count()
location_counts_df = by_location.orderBy(col("count").desc())

# Only the first 10 rows are displayed.
location_counts_df.show(10, truncate=False)

---  top 10 Incident Locations by Concentration ---




+----------------------------------+-------+
|Location Name                     |count  |
+----------------------------------+-------+
|Residence/Home                    |3603127|
|Highway/Road/Alley/Street/Sidewalk|870022 |
|Parking/Drop Lot/Garage           |620395 |
|Other/Unknown                     |288317 |
|Hotel/Motel/Etc.                  |131282 |
|School-Elementary/Secondary       |124610 |
|Commercial/Office Building        |112283 |
|Restaurant                        |103087 |
|Convenience Store                 |87927  |
|Department/Discount Store         |80588  |
+----------------------------------+-------+
only showing top 10 rows



                                                                                

In [None]:
'''Second way'''

from pyspark.sql.functions import col, when

location = col("Location Name")

# Ordered (condition, category) rules; the first matching rule wins.
category_rules = [
    (location == "Residence/Home", "Residential"),
    (
        location.isin(
            "Highway/Road/Alley/Street/Sidewalk",
            "Park/Playground",
            "Parking/Drop Lot/Garage",
            "Rest Area",
        ),
        "Public Space/Transport",
    ),
    (
        location.isin(
            "Bar/Nightclub",
            "Convenience Store",
            "Department/Discount Store",
            "Grocery/Supermarket",
            "Hotel/Motel/Etc.",
            "Restaurant",
            "Shopping Mall",
            "Service/Gas Station",
        ),
        "Commercial (Retail/Service)",
    ),
    (
        location.isin("Bank/Savings and Loan", "Commercial/Office Building"),
        "Commercial (Office/Financial)",
    ),
    # Any location name starting with "School".
    (location.like("School%"), "Education"),
    (
        location.isin(
            "Church/Synagogue/Temple/Mosque",
            "Government/Public Building",
            "Jail/Prison/Penitentiary/Corrections Facility",
        ),
        "Institutional",
    ),
    (
        location.isin("Field/Woods", "Lake/Waterway/Beach", "Camp/Campground"),
        "Outdoor/Rural",
    ),
    (location == "Other/Unknown", "Unknown"),
]

# Fold the rules into a single when(...).when(...) expression, keeping order.
first_condition, first_category = category_rules[0]
category_expr = when(first_condition, first_category)
for condition, category in category_rules[1:]:
    category_expr = category_expr.when(condition, category)

# Locations matched by no rule are labelled "Other".
df_with_category = df.withColumn("Location_Category", category_expr.otherwise("Other"))

print("Incident Concentration by Thematic Category")

# Rows per thematic category, largest first.
category_counts = df_with_category.groupBy("Location_Category").count()
category_counts.orderBy(col("count").desc()).show(truncate=False)

--- Incident Concentration by Thematic Category ---




+-----------------------------+-------+
|Location_Category            |count  |
+-----------------------------+-------+
|Residential                  |3603127|
|Public Space/Transport       |1551481|
|Commercial (Retail/Service)  |630178 |
|Other                        |348771 |
|Unknown                      |288317 |
|Education                    |156055 |
|Commercial (Office/Financial)|150272 |
|Institutional                |86712  |
|Outdoor/Rural                |28390  |
+-----------------------------+-------+



                                                                                

In [None]:
'''7. Which victim types (e.g., police, minors, elderly) correlate with severe crimes?

Part 1 - Severity vs. Victim Type
'''
from pyspark.sql.functions import col, when

offense_col = "Offense Category"
victim_type_col = "Victim Type"

# Offense categories this analysis treats as high severity.
high_severity_list = [
    "Homicide Offenses",
    "Human Trafficking",
    "Kidnapping/Abduction",
    "Robbery",
    "Sex Offenses",
    "Sex Offenses, Non-forcible",
    "Assault Offenses"
]

# Tag each row: "High Severity" when its offense category is in the list,
# "Lower Severity" otherwise.
is_high_severity = col(f"`{offense_col}`").isin(high_severity_list)
severity_expr = when(is_high_severity, "High Severity").otherwise("Lower Severity")
df_with_severity = df.withColumn("Severity_Level", severity_expr)

print(f"--- Percentage Distribution of {victim_type_col} by Severity_Level ---")

# Rows per (severity level, victim type) pair.
counts_df = df_with_severity.groupBy("Severity_Level", f"`{victim_type_col}`").count()

# Rows per severity level, used as the denominator.
totals_df = (
    df_with_severity
        .groupBy("Severity_Level")
        .count()
        .withColumnRenamed("count", "Total_Per_Category")
)

# Attach the level total to every pair row and compute each pair's share.
share_of_level = (col("count") / col("Total_Per_Category")) * 100
percentage_df = counts_df.join(totals_df, on="Severity_Level", how="left")
percentage_df = percentage_df.withColumn("Percentage %", share_of_level)

# By severity level, highest percentage first.
ordering = [col("Severity_Level"), col("Percentage %").desc()]
percentage_df.orderBy(*ordering).show(100, truncate=False)

--- Percentage Distribution of Victim Type by Severity_Level ---




+--------------+-----------------------+-------+------------------+--------------------+
|Severity_Level|Victim Type            |count  |Total_Per_Category|Percentage %        |
+--------------+-----------------------+-------+------------------+--------------------+
|High Severity |Individual             |4006205|4082440           |98.13261186937223   |
|High Severity |Law Enforcement Officer|76235  |4082440           |1.8673881306277618  |
|Lower Severity|Individual             |2760395|2760863           |99.98304877858844   |
|Lower Severity|Law Enforcement Officer|468    |2760863           |0.016951221411565878|
+--------------+-----------------------+-------+------------------+--------------------+



                                                                                

In [None]:
'''Part 2 - Severity vs. Victim Age Group'''

from pyspark.sql.functions import col

# Exact name of the column created by the age-distribution cell. The previous
# spelling "Age range" only resolved because Spark matches column names
# case-insensitively by default, and it mislabelled the output header.
age_group_col = "Age Range"

print(f"Percentage Distribution of {age_group_col} by Severity_Level")

# Rows per (severity level, age range) pair.
# Uses 'df_with_severity' built by the Part 1 cell, so run that cell first.
counts_df_age = df_with_severity.groupBy("Severity_Level", f"`{age_group_col}`").count()

# Rows per severity level (recomputed here rather than taken from Part 1).
totals_df = df_with_severity.groupBy("Severity_Level") \
                            .count() \
                            .withColumnRenamed("count", "Total_Per_Category")

# Attach the level total to every pair row.
percentage_df_age = counts_df_age.join(
    totals_df,
    on="Severity_Level",
    how="left"
)

# Share of each severity level taken by each age range.
percentage_df_age = percentage_df_age.withColumn(
    "Percentage %",
    (col("count") / col("Total_Per_Category")) * 100
)

# Show the final results: by severity level, highest percentage first.
percentage_df_age.orderBy(col("Severity_Level"), col("Percentage %").desc()).show(100, truncate=False)

--- Percentage Distribution of Age range by Severity_Level ---




+--------------+-------------------+-------+------------------+------------------+
|Severity_Level|Age range          |count  |Total_Per_Category|Percentage %      |
+--------------+-------------------+-------+------------------+------------------+
|High Severity |25–34              |1074406|4082440           |26.31774135076082 |
|High Severity |35–44              |829012 |4082440           |20.306777319446216|
|High Severity |18–24 (Young Adult)|663138 |4082440           |16.24366800246911 |
|High Severity |0–17 (Minor)       |499290 |4082440           |12.230185869235065|
|High Severity |45–54              |490042 |4082440           |12.003654677104869|
|High Severity |55–64              |308045 |4082440           |7.545609978346284 |
|High Severity |65+ (Senior)       |165278 |4082440           |4.048510204681514 |
|High Severity |Unknown            |53229  |4082440           |1.3038525979561244|
|Lower Severity|25–34              |678787 |2760863           |24.586044291223434|
|Low

                                                                                

In [None]:
'''8. Which Offender Age Groups lead to more severe crimes?
'''
from pyspark.sql.functions import col, when

offender_age_col = "Offender Age"
age_group_col_name = "Offender Age group"

# Bucket the offender age on top of 'df_with_severity' (built by the
# severity cell, question 7 part 1).
# The new column name is passed as-is: withColumn() treats its first argument
# as the literal column name, so wrapping it in backticks would create a
# column whose name contains the backtick characters.
offender_age = col(offender_age_col)
df_with_offender_age = df_with_severity.withColumn(
    age_group_col_name,
    when(offender_age == -1, "Unknown")
    .when(offender_age.between(0, 17), "0-17 (Minor)")
    .when(offender_age.between(18, 24), "18-24 (Young Adult)")
    .when(offender_age.between(25, 34), "25-34")
    .when(offender_age.between(35, 44), "35-44")
    .when(offender_age.between(45, 54), "45-54")
    .when(offender_age.between(55, 64), "55-64")
    .when(offender_age >= 65, "65+ (Senior)")
    # Anything matching none of the rules above.
    .otherwise("Other/Invalid")
)

print(f"Created new column: {age_group_col_name}")


Created new column: Offender Age group


In [None]:
from pyspark.sql.functions import col

print(f"--- Percentage Distribution of {age_group_col_name} by Severity_Level ---")

# If the offender age group column was stored with backticks in its name,
# rename the first such column to the clean name.
backticked_name = next(
    (
        name
        for name in df_with_offender_age.columns
        if "`" in name and name.replace("`", "") == age_group_col_name
    ),
    None,
)
if backticked_name is not None:
    df_with_offender_age = df_with_offender_age.withColumnRenamed(
        backticked_name, age_group_col_name
    )

# Rows per (severity level, offender age group) pair.
counts_df_off_age = (
    df_with_offender_age
        .groupBy("Severity_Level", age_group_col_name)
        .count()
)

# Rows per severity level, used as the denominator.
severity_totals = df_with_severity.groupBy("Severity_Level").count()
totals_df = severity_totals.withColumnRenamed("count", "Total_Per_Category")

# Attach the level total to every pair row and compute each pair's share.
share_of_level = col("count") / col("Total_Per_Category") * 100
percentage_df_off_age = counts_df_off_age.join(totals_df, on="Severity_Level", how="left")
percentage_df_off_age = percentage_df_off_age.withColumn("Percentage %", share_of_level)

# By severity level, highest percentage first.
ordering = [col("Severity_Level"), col("Percentage %").desc()]
percentage_df_off_age.orderBy(*ordering).show(100, truncate=False)


--- Percentage Distribution of Offender Age group by Severity_Level ---




+--------------+-------------------+-------+------------------+------------------+
|Severity_Level|Offender Age group |count  |Total_Per_Category|Percentage %      |
+--------------+-------------------+-------+------------------+------------------+
|High Severity |25-34              |1110606|4082440           |27.20446595663378 |
|High Severity |35-44              |821826 |4082440           |20.130755136633972|
|High Severity |18-24 (Young Adult)|621043 |4082440           |15.212544458706068|
|High Severity |Unknown            |451150 |4082440           |11.05098911435318 |
|High Severity |45-54              |406554 |4082440           |9.958603188289358 |
|High Severity |0-17 (Minor)       |367745 |4082440           |9.007970723390914 |
|High Severity |55-64              |216270 |4082440           |5.297567141219467 |
|High Severity |65+ (Senior)       |87246  |4082440           |2.137104280773263 |
|Lower Severity|Unknown            |1128611|2760863           |40.87892083018969 |
|Low

                                                                                

In [None]:
'''9. Do crimes involving strangers vs. known relationships differ by severity?

'''

from pyspark.sql.functions import col, when

relationship_col = "Victim-Offender Relationship"
relationship_type_col = "Relationship Type"

# Collapse the detailed relationship values into three groups:
# "Stranger", "Unknown" (explicitly unknown or missing) and
# "Known Relationship" (every remaining value).
relationship = col(relationship_col)
relationship_expr = (
    when(relationship == "Victim Was Stranger", "Stranger")
    .when(relationship == "Relationship Unknown", "Unknown")
    .when(relationship.isNull(), "Unknown")
    .otherwise("Known Relationship")
)
df_with_relationship = df_with_severity.withColumn(relationship_type_col, relationship_expr)

print(f"--- Percentage Distribution of {relationship_type_col} by Severity_Level ---")

# Rows per (severity level, relationship type) pair.
counts_df_rel = (
    df_with_relationship
        .groupBy("Severity_Level", relationship_type_col)
        .count()
)

# Rows per severity level, used as the denominator.
severity_totals = df_with_severity.groupBy("Severity_Level").count()
totals_df = severity_totals.withColumnRenamed("count", "Total_Per_Category")

# Attach the level total to every pair row and compute each pair's share.
share_of_level = (col("count") / col("Total_Per_Category")) * 100
percentage_df_rel = counts_df_rel.join(totals_df, on="Severity_Level", how="left")
percentage_df_rel = percentage_df_rel.withColumn("Percentage %", share_of_level)

# By severity level, highest percentage first.
ordering = [col("Severity_Level"), col("Percentage %").desc()]
percentage_df_rel.orderBy(*ordering).show(100, truncate=False)

--- Percentage Distribution of Relationship Type by Severity_Level ---




+--------------+------------------+-------+------------------+------------------+
|Severity_Level|Relationship Type |count  |Total_Per_Category|Percentage %      |
+--------------+------------------+-------+------------------+------------------+
|High Severity |Known Relationship|2763132|4082440           |67.6833462341149  |
|High Severity |Stranger          |708432 |4082440           |17.353151546624076|
|High Severity |Unknown           |610876 |4082440           |14.963502219261029|
|Lower Severity|Unknown           |1504796|2760863           |54.504551656492914|
|Lower Severity|Known Relationship|668603 |2760863           |24.217174122729016|
|Lower Severity|Stranger          |587464 |2760863           |21.27827422077807 |
+--------------+------------------+-------+------------------+------------------+



                                                                                