<a href="https://colab.research.google.com/github/lkodwani/Lucky_Backup/blob/main/crime_rate_prediction_LuckyKodwani.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 0.0 Spark Environment Setup

In [2]:
#pyspark setup

!apt-get update
# Install Java 8 (required by Spark)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install Spark
!pip install pyspark

# setup environment variables
import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/usr/local/lib/python3.10/dist-packages/pyspark"

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,452 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,223 kB]
Get:13 http://security.ubuntu.com/ubuntu jammy-securit

# 0.1 Crime Data Generation

In [3]:
import pyspark
from pyspark.sql import SparkSession

# Create or get an existing Spark session
spark = SparkSession.builder.master("local[*]").appName("Crime-rate-prediction").getOrCreate()

In [4]:
import pandas as pd
import numpy as np

def generate_city_crime_data(num_cities=100, seed=42):
    """
    Generates synthetic data for city crime rates with logical correlations and occasional null values.

    Args:
    - num_cities (int): Number of cities to generate data for.
    - seed (int): Random seed for reproducibility.

    Returns:
    - pd.DataFrame: A DataFrame containing the generated data.
    """
    np.random.seed(seed)

    # Generate city names
    cities = [f"City{i}" for i in range(1, num_cities + 1)]

    # Generate population density (people per square mile)
    population_density = np.random.randint(500, 20000, size=num_cities)

    # Poverty rate (%) - Higher in densely populated cities
    poverty_rate = np.clip(15 + (population_density / 2000) + np.random.normal(0, 5, num_cities), 5, 30)

    # Police budget ($) - Correlated with population density
    police_budget = (population_density * 50) + np.random.randint(500000, 2000000, size=num_cities)

    # Unemployment rate (%) - Higher in cities with high poverty
    unemployment_rate = np.clip(5 + (poverty_rate / 3) + np.random.normal(0, 2, num_cities), 3, 15)

    # Housing quality index (1 to 10) - Inversely related to poverty rate
    housing_quality_index = np.clip(10 - (poverty_rate / 5) + np.random.normal(0, 1, num_cities), 1, 10)

    # Traffic violation rate (per 1000 people) - Higher in densely populated areas
    traffic_violation_rate = np.clip(100 + (population_density / 100) + np.random.normal(0, 30, num_cities), 50, 500)

    # Drug abuse rate (%) - Higher in cities with low housing quality
    drug_abuse_rate = np.clip(10 + (20 - housing_quality_index) + np.random.normal(0, 5, num_cities), 5, 40)

    # Mental health services availability (1 to 10) - Better in cities with higher police budgets
    mental_health_services = np.clip(5 + (police_budget / 200000) + np.random.normal(0, 2, num_cities), 1, 10)

    # Gun ownership rate (%) - Randomly distributed but generally higher in cities with lower population density
    gun_ownership_rate = np.clip(20 + (15000 / (population_density + 1)) + np.random.normal(0, 10, num_cities), 20, 60)

    # Crime rate (per 1000 people) - Correlated with poverty, unemployment, and drug abuse
    crime_rate = np.clip(20 + (poverty_rate * 0.8) + (unemployment_rate * 1.2) +
                         (drug_abuse_rate * 0.5) - (mental_health_services * 2) +
                         np.random.normal(0, 10, num_cities), 10, 100)

    # Introduce occasional null values randomly
    for col in ['poverty_rate', 'housing_quality_index', 'traffic_violation_rate']:
        null_indices = np.random.choice(num_cities, size=int(0.1 * num_cities), replace=False)
        locals()[col][null_indices] = np.nan

    # Create DataFrame
    data = pd.DataFrame({
        'city_name': cities,
        'population_density': population_density,
        'poverty_rate': poverty_rate,
        'police_budget': police_budget,
        'unemployment_rate': unemployment_rate,
        'housing_quality_index': housing_quality_index,
        'traffic_violation_rate': traffic_violation_rate,
        'drug_abuse_rate': drug_abuse_rate,
        'mental_health_services': mental_health_services,
        'gun_ownership_rate': gun_ownership_rate,
        'crime_rate': crime_rate
    })

    return data

# Generate and display the synthetic data
crime_data = generate_city_crime_data()

Preview top 10 rows of data

In [None]:
crime_data.head(10)

Unnamed: 0,city_name,population_density,poverty_rate,police_budget,unemployment_rate,housing_quality_index,traffic_violation_rate,drug_abuse_rate,mental_health_services,gun_ownership_rate,crime_rate
0,City1,16295,27.321477,1317619,10.986045,,301.213897,21.255302,10.0,30.417632,66.092051
1,City2,1360,17.162807,1349474,14.721632,8.710644,110.853018,14.995146,8.745041,27.411433,46.255446
2,City3,5890,12.755851,1248851,6.851687,7.106241,187.012176,28.837764,10.0,20.0,34.298549
3,City4,12464,20.852981,2484028,14.353755,6.078718,207.560216,23.71474,10.0,23.778102,62.9
4,City5,11784,25.756818,1725784,13.885273,5.173155,230.251384,25.696677,10.0,24.11753,60.217491
5,City6,6765,22.360477,1456717,12.735307,7.203198,207.912655,27.336162,10.0,22.602482,44.876219
6,City7,17350,30.0,2506348,15.0,4.888984,195.23636,25.067063,10.0,20.0,43.59249
7,City8,4926,19.153906,784767,7.62401,6.665283,129.607441,25.591773,10.0,21.829983,13.551667
8,City9,14923,30.0,2282835,15.0,4.055572,267.042265,30.023686,10.0,43.78988,52.635186
9,City10,11863,16.329546,1970994,9.421293,6.794324,255.014452,26.039456,10.0,20.0,25.525565


Find out missing or null values in columns

In [None]:

missing_columns = crime_data.columns[crime_data.isnull().any()].tolist()

print(missing_columns)




['poverty_rate', 'housing_quality_index', 'traffic_violation_rate']


**Task 1 - Handle Missing Data**

In [5]:
from pyspark.sql.functions  import col, when, lit, mean as _mean, stddev as _stddev, expr,isnan, count
from pyspark.sql import DataFrame
from pyspark.ml.feature import VectorAssembler

# Step 1: Convert pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(crime_data)

# Step 1: Handle Missing Values
numerical_cols = ['poverty_rate', 'housing_quality_index', 'traffic_violation_rate']

# Check for missing data (for debugging/insights)
spark_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()

# Impute missing values with median for numerical columns
for col_name in numerical_cols:
    median_value = crime_data[col_name].median()  # Calculate median using pandas
    spark_df = spark_df.withColumn(
        col_name,
        when(isnan(col(col_name)) | col(col_name).isNull(), lit(median_value)).otherwise(col(col_name))
    )


# Step 2: Create New Features
spark_df = spark_df.withColumn(
    "income_to_police_budget_ratio",
    (lit(100000) * (1 - col("poverty_rate") / 100)) / col("police_budget")
)

spark_df = spark_df.withColumn(
    "unemployment_population_ratio",
    col("unemployment_rate") / col("population_density")
)

# Assume a city size of 50 square miles for spatial density features
city_size = 50  # in square miles
spark_df = spark_df.withColumn(
    "crime_rate_per_sq_mile",
    col("crime_rate") / lit(city_size)
)

spark_df = spark_df.withColumn(
    "police_officer_per_sq_mile",
    (col("police_budget") / lit(100000)) / lit(city_size)
)

# Step 3: Handle Outliers
outlier_cols = ['housing_quality_index', 'drug_abuse_rate', 'crime_rate']
for col_name in outlier_cols:
    mean_stddev = spark_df.select(
        _mean(col_name).alias('mean'), _stddev(col_name).alias('stddev')
    ).first()
    upper_limit = mean_stddev['mean'] + 3 * mean_stddev['stddev']
    lower_limit = mean_stddev['mean'] - 3 * mean_stddev['stddev']
    spark_df = spark_df.withColumn(
        col_name, when(col(col_name) > upper_limit, lit(upper_limit))
                   .when(col(col_name) < lower_limit, lit(lower_limit))
                   .otherwise(col(col_name))
    )

# Step 4: Assemble Features
# Step 4: Assemble Features
assembler = VectorAssembler(
    inputCols=[
        "population_density", "poverty_rate", "police_budget",
        "unemployment_rate", "housing_quality_index",
        "traffic_violation_rate", "drug_abuse_rate",
        "mental_health_services", "gun_ownership_rate",
        "income_to_police_budget_ratio", "unemployment_population_ratio"
    ],
    outputCol="features"
)
spark_df = assembler.transform(spark_df)

# Final Dataset


spark_df.show(5)




+---------+------------------+------------+-------------+-----------------+---------------------+----------------------+---------------+----------------------+------------------+----------+
|city_name|population_density|poverty_rate|police_budget|unemployment_rate|housing_quality_index|traffic_violation_rate|drug_abuse_rate|mental_health_services|gun_ownership_rate|crime_rate|
+---------+------------------+------------+-------------+-----------------+---------------------+----------------------+---------------+----------------------+------------------+----------+
|        0|                 0|          10|            0|                0|                   10|                    10|              0|                     0|                 0|         0|
+---------+------------------+------------+-------------+-----------------+---------------------+----------------------+---------------+----------------------+------------------+----------+

+---------+------------------+------------------+

Task2: Train Baseline Model and Evaluate

In [6]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Step 1: Split the data
train_data, test_data = spark_df.randomSplit([0.8, 0.2], seed=42)

# Step 2: Train the Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="crime_rate")
lr_model = lr.fit(train_data)

# Step 3: Make predictions on the test data
predictions = lr_model.transform(test_data)

# Step 4: Evaluate the model
rmse_evaluator = RegressionEvaluator(labelCol="crime_rate", predictionCol="prediction", metricName="rmse")
rmse = rmse_evaluator.evaluate(predictions)

r2_evaluator = RegressionEvaluator(labelCol="crime_rate", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)

mae_evaluator = RegressionEvaluator(labelCol="crime_rate", predictionCol="prediction", metricName="mae")
mae = mae_evaluator.evaluate(predictions)

# Print evaluation metrics
print(f"Root Mean Square Error (RMSE): {rmse}")
print(f"R-Squared (R²): {r2}")
print(f"Mean Absolute Error (MAE): {mae}")

# Display sample predictions
predictions.select("features", "crime_rate", "prediction").show(5, truncate=False)


Root Mean Square Error (RMSE): 9.512012966565628
R-Squared (R²): 0.3046881849685754
Mean Absolute Error (MAE): 7.087921291132444
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|features                                                                                                                                                                                      |crime_rate       |prediction        |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+------------------+
|[16523.0,21.26830804992399,1806197.0,14.510233282914607,4.306876793381472,271.3234064383952,27.12302403755995,10.0,29.60517491789778,0.043589759007503615,8.781839425597413E-4]     

Task 3: Feature Selection and Interpretability

In [7]:
from pyspark.ml.regression import DecisionTreeRegressor, LinearRegression
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Step 1: Train a Decision Tree Regressor to Get Feature Importance
dt = DecisionTreeRegressor(featuresCol="features", labelCol="crime_rate")
dt_model = dt.fit(train_data)

# Extract feature importances
importances = dt_model.featureImportances.toArray()

# Map feature names to their importance scores
feature_names = [
    "population_density", "poverty_rate", "police_budget", "unemployment_rate",
    "housing_quality_index", "traffic_violation_rate", "drug_abuse_rate",
    "mental_health_services", "gun_ownership_rate", "income_to_police_budget_ratio",
    "unemployment_population_ratio", "crime_rate_per_sq_mile", "police_officer_per_sq_mile"
]
feature_importance = list(zip(feature_names, importances))
sorted_features = sorted(feature_importance, key=lambda x: x[1], reverse=True)

# Print the top 5 features
print("# Top 5 Features from Decision Tree Regressor:")
for idx, (feature, importance) in enumerate(sorted_features[:5], start=1):
    print(f"# {idx}. {feature} (Importance: {importance:.2f})")

# Step 2: Compute Correlation Matrix
correlation_assembler = VectorAssembler(inputCols=feature_names, outputCol="correlation_features")
correlation_data = correlation_assembler.transform(spark_df).select("correlation_features")
correlation_matrix = Correlation.corr(correlation_data, "correlation_features").head()[0].toArray()

# Convert to a pandas DataFrame
correlation_df = pd.DataFrame(correlation_matrix, columns=feature_names, index=feature_names)

# Plot the heatmap
plt.figure(figsize=(12, 10))
sns.heatmap(correlation_df, annot=True, cmap="coolwarm", fmt=".2f")
plt.title("Feature Correlation Heatmap")
plt.show()

# Identify and print highly correlated features
print("\n# Correlation Matrix Heatmap:")
high_corr_threshold = 0.8
high_corr_pairs = []
for i in range(len(feature_names)):
    for j in range(i + 1, len(feature_names)):
        if abs(correlation_matrix[i, j]) > high_corr_threshold:
            high_corr_pairs.append((feature_names[i], feature_names[j], correlation_matrix[i, j]))

removed_features = []
for feature_a, feature_b, corr in high_corr_pairs:
    print(f"# High correlation detected between '{feature_a}' and '{feature_b}' (Correlation: {corr:.2f}).")
    if feature_a == "poverty_rate":  # Example logic: Remove 'poverty_rate'
        print(f"# Decided to remove '{feature_a}' to reduce multicollinearity.")
        removed_features.append(feature_a)
    else:
        print(f"# Decided to remove '{feature_b}' to reduce multicollinearity.")
        removed_features.append(feature_b)

# Step 3: Remove Highly Correlated Features
selected_features = [feature for feature in feature_names if feature not in removed_features]

print("\n# Features Selected After Removing Highly Correlated Ones:")
print(selected_features)

# Step 4: Retrain and Evaluate the Linear Regression Model
# Assemble the selected features
selected_assembler = VectorAssembler(inputCols=selected_features, outputCol="selected_features")
selected_df = selected_assembler.transform(spark_df).select("selected_features", "crime_rate")

# Split the data into training and testing sets
train_data_selected, test_data_selected = selected_df.randomSplit([0.8, 0.2], seed=42)

# Retrain the Linear Regression model
lr_selected = LinearRegression(featuresCol="selected_features", labelCol="crime_rate")
lr_selected_model = lr_selected.fit(train_data_selected)

# Make predictions on the testing set
predictions_selected = lr_selected_model.transform(test_data_selected)

# Evaluate the model
rmse_evaluator = RegressionEvaluator(labelCol="crime_rate", predictionCol="prediction", metricName="rmse")
r2_evaluator = RegressionEvaluator(labelCol="crime_rate", predictionCol="prediction", metricName="r2")
mae_evaluator = RegressionEvaluator(labelCol="crime_rate", predictionCol="prediction", metricName="mae")

rmse_selected = rmse_evaluator.evaluate(predictions_selected)
r2_selected = r2_evaluator.evaluate(predictions_selected)
mae_selected = mae_evaluator.evaluate(predictions_selected)

# Print evaluation metrics
print("\n# Evaluation Metrics After Removing Correlated Features:")
print(f"# RMSE: {rmse_selected:.2f}")
print(f"# R²: {r2_selected:.2f}")
print(f"# MAE: {mae_selected:.2f}")

# Interpretation of Results
print("\n# Interpretation:")
print(f"# The RMSE decreased from {rmse:.2f} to {rmse_selected:.2f}, indicating improved model accuracy.")
print(f"# R² increased from {r2:.2f} to {r2_selected:.2f}, showing that the model now explains more variance in the crime rates.")


NameError: name 'feature_names' is not defined

Task 4: Advanced Model Tuning and Comparison

In [1]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor

# Common evaluator
evaluator = RegressionEvaluator(labelCol="crime_rate", predictionCol="prediction", metricName="rmse")

# Linear Regression
lr = LinearRegression(featuresCol="selected_features", labelCol="crime_rate")
lr_param_grid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.01, 0.1]).build()
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_param_grid, evaluator=evaluator, numFolds=5)
lr_cv_model = lr_cv.fit(train_data_selected)

# Random Forest Regressor
rf = RandomForestRegressor(featuresCol="selected_features", labelCol="crime_rate")
rf_param_grid = ParamGridBuilder().addGrid(rf.numTrees, [20, 50, 100]).addGrid(rf.maxDepth, [5, 10, 15]).build()
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_param_grid, evaluator=evaluator, numFolds=5)
rf_cv_model = rf_cv.fit(train_data_selected)

# Gradient-Boosted Trees Regressor
gbt = GBTRegressor(featuresCol="selected_features", labelCol="crime_rate")
gbt_param_grid = ParamGridBuilder().addGrid(gbt.maxIter, [10, 20, 30]).addGrid(gbt.stepSize, [0.1, 0.2, 0.3]).build()
gbt_cv = CrossValidator(estimator=gbt, estimatorParamMaps=gbt_param_grid, evaluator=evaluator, numFolds=5)
gbt_cv_model = gbt_cv.fit(train_data_selected)


AssertionError: 