In [1]:
!pip install pyspark datasets



In [2]:
import os
import pandas as pd
from datetime import datetime
from datasets import load_dataset

from google.colab import drive
drive.mount("/content/drive")

PROJECT_DIR = "/content/drive/MyDrive/7006SCN_UK_Counties"
DATA_DIR    = f"{PROJECT_DIR}/data"
METRICS_DIR = f"{PROJECT_DIR}/metrics"
MODELS_DIR  = f"{PROJECT_DIR}/models"

for p in [PROJECT_DIR, DATA_DIR, METRICS_DIR, MODELS_DIR]:
    os.makedirs(p, exist_ok=True)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
from datasets import load_dataset

ds = load_dataset("liberatoratif/UK-Counties")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Resolving data files:   0%|          | 0/42 [00:00<?, ?it/s]



In [4]:
subset = ds["train"].select(range(100_000))  # sample 100k rows
pdf = subset.to_pandas()

# Drop index-like column if present
for c in ["Unnamed: 0", "unnamed: 0", "Unnamed:0"]:
    if c in pdf.columns:
        pdf = pdf.drop(columns=[c])

print("Sample rows:", len(pdf))
print("Crime_type value counts (top):")
print(pdf["Crime type"].value_counts().head(10))

Sample rows: 100000
Crime_type value counts (top):
Crime type
Violence and sexual offences    38932
Public order                    15185
Criminal damage and arson       11468
Other theft                      7547
Vehicle crime                    6721
Burglary                         5694
Shoplifting                      5499
Drugs                            2451
Bicycle theft                    2291
Other crime                      1723
Name: count, dtype: int64


In [5]:

# 2) Choose top 2 crimes for binary target

top2 = pdf["Crime type"].value_counts().head(2).index.tolist()
print("Top 2 crimes:", top2)

pdf_bin = pdf[pdf["Crime type"].isin(top2)].copy()

# Binary label: 1 for most frequent, 0 for second most frequent
major_class = top2[0]
minor_class = top2[1]
pdf_bin["label"] = (pdf_bin["Crime type"] == major_class).astype(int)

print("Binary dataset rows:", len(pdf_bin))
print(pdf_bin["label"].value_counts())

Top 2 crimes: ['Violence and sexual offences', 'Public order']
Binary dataset rows: 54117
label
1    38932
0    15185
Name: count, dtype: int64


In [6]:
# Save Tableau CSV (binary subset)

tableau_csv_path = f"{DATA_DIR}/uk_counties_top2_binary_sample.csv"
pdf_bin.to_csv(tableau_csv_path, index=False)
print("Saved Tableau CSV:", tableau_csv_path)


Saved Tableau CSV: /content/drive/MyDrive/7006SCN_UK_Counties/data/uk_counties_top2_binary_sample.csv


In [7]:
# 3) Start Spark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

spark = SparkSession.builder \
    .appName("UK_Counties_Top2_Binary") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

In [8]:
# 4) Pandas -> Spark + cleaning

df = spark.createDataFrame(pdf_bin)

# Clean column names (spaces -> underscores)
rename_map = {}
for c in df.columns:
    clean = c.strip().replace(" ", "_").replace("-", "_")
    if clean != c:
        rename_map[c] = clean
for old, new in rename_map.items():
    df = df.withColumnRenamed(old, new)

In [9]:
# Ensure label is integer 0/1
df = df.withColumn("label", F.col("label").cast(IntegerType()))

# Cast numerics if present
for num_col in ["Longitude", "Latitude"]:
    if num_col in df.columns:
        df = df.withColumn(num_col, F.col(num_col).cast(DoubleType()))

# Fill missing categoricals
categorical_cols = [c for c in ["Month", "Location", "Last_outcome_category"] if c in df.columns]
numeric_cols     = [c for c in ["Longitude", "Latitude"] if c in df.columns]

if categorical_cols:
    df = df.fillna("unknown", subset=categorical_cols)

df = df.dropna(subset=["label"])  # safety

In [10]:
# Save Parquet to Drive (binary subset)

parquet_path = f"{DATA_DIR}/uk_counties_top2_binary_parquet"
df.write.mode("overwrite").parquet(parquet_path)
print("Saved Parquet:", parquet_path)

df = spark.read.parquet(parquet_path)

Saved Parquet: /content/drive/MyDrive/7006SCN_UK_Counties/data/uk_counties_top2_binary_parquet


In [12]:
# 5) Feature pipeline

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

indexers = []
encoder_inputs, encoder_outputs = [], []

for c in categorical_cols:
    idx = f"{c}_idx"
    ohe = f"{c}_ohe"
    indexers.append(StringIndexer(inputCol=c, outputCol=idx, handleInvalid="keep"))
    encoder_inputs.append(idx)
    encoder_outputs.append(ohe)

encoder = None
if encoder_inputs:
    encoder = OneHotEncoder(inputCols=encoder_inputs, outputCols=encoder_outputs, handleInvalid="keep")

feature_cols = numeric_cols + (encoder_outputs if encoder_outputs else [])
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")

train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

evaluator_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
evaluator_pr  = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")

def run_and_log(model, model_name):
    stages = []
    stages += indexers
    if encoder is not None:
        stages.append(encoder)
    stages += [assembler, model]

    pipeline = Pipeline(stages=stages)
    fitted = pipeline.fit(train_df)
    preds = fitted.transform(test_df)

    auc = float(evaluator_auc.evaluate(preds))
    pr  = float(evaluator_pr.evaluate(preds))

    model_path = f"{MODELS_DIR}/{model_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
    fitted.write().overwrite().save(model_path)

    return {
        "run_time_utc": datetime.utcnow().isoformat(),
        "dataset": "liberatoratif/UK-Counties",
        "rows_used_from_100k": int(df.count()),
        "target_definition": f"Binary: 1={major_class}, 0={minor_class}",
        "categorical_cols": ",".join(categorical_cols),
        "numeric_cols": ",".join(numeric_cols),
        "model": model_name,
        "AUC_ROC": auc,
        "AUC_PR": pr,
        "saved_model_path": model_path
    }


In [13]:
# 6) Train 4 models

models = [
    (LogisticRegression(featuresCol="features", labelCol="label", maxIter=50), "LogisticRegression"),
    (RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100), "RandomForest"),
    (DecisionTreeClassifier(featuresCol="features", labelCol="label"), "DecisionTree"),
    (GBTClassifier(featuresCol="features", labelCol="label", maxIter=50), "GradientBoostedTrees"),
]

all_metrics = []
for m, name in models:
    print(f"\n--- Training {name} ---")
    row = run_and_log(m, name)
    all_metrics.append(row)
    print("AUC(ROC):", row["AUC_ROC"], "AUC(PR):", row["AUC_PR"])
    print("Saved model:", row["saved_model_path"])


--- Training LogisticRegression ---


  model_path = f"{MODELS_DIR}/{model_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
  "run_time_utc": datetime.utcnow().isoformat(),


AUC(ROC): 0.6172698344548451 AUC(PR): 0.7842550678323783
Saved model: /content/drive/MyDrive/7006SCN_UK_Counties/models/LogisticRegression_20260226_133233

--- Training RandomForest ---
AUC(ROC): 0.63575670615918 AUC(PR): 0.7954931761920535
Saved model: /content/drive/MyDrive/7006SCN_UK_Counties/models/RandomForest_20260226_133334

--- Training DecisionTree ---
AUC(ROC): 0.5877651473833843 AUC(PR): 0.7737510761520723
Saved model: /content/drive/MyDrive/7006SCN_UK_Counties/models/DecisionTree_20260226_133407

--- Training GradientBoostedTrees ---
AUC(ROC): 0.6356313571858963 AUC(PR): 0.7972183970289018
Saved model: /content/drive/MyDrive/7006SCN_UK_Counties/models/GradientBoostedTrees_20260226_134146


In [15]:
# 7) Save metrics to CSV in Drive

metrics_df = pd.DataFrame(all_metrics)
metrics_csv_path = f"{METRICS_DIR}/model_metrics.csv"

if os.path.exists(metrics_csv_path):
    metrics_df.to_csv(metrics_csv_path, mode="a", header=False, index=False)
else:
    metrics_df.to_csv(metrics_csv_path, index=False)

print("\nSaved metrics CSV:", metrics_csv_path)
display(metrics_df)


Saved metrics CSV: /content/drive/MyDrive/7006SCN_UK_Counties/metrics/model_metrics.csv


Unnamed: 0,run_time_utc,dataset,rows_used_from_100k,target_definition,categorical_cols,numeric_cols,model,AUC_ROC,AUC_PR,saved_model_path
0,2026-02-26T13:32:37.976747,liberatoratif/UK-Counties,54117,"Binary: 1=Violence and sexual offences, 0=Publ...","Month,Location,Last_outcome_category","Longitude,Latitude",LogisticRegression,0.61727,0.784255,/content/drive/MyDrive/7006SCN_UK_Counties/mod...
1,2026-02-26T13:33:39.625446,liberatoratif/UK-Counties,54117,"Binary: 1=Violence and sexual offences, 0=Publ...","Month,Location,Last_outcome_category","Longitude,Latitude",RandomForest,0.635757,0.795493,/content/drive/MyDrive/7006SCN_UK_Counties/mod...
2,2026-02-26T13:34:11.883473,liberatoratif/UK-Counties,54117,"Binary: 1=Violence and sexual offences, 0=Publ...","Month,Location,Last_outcome_category","Longitude,Latitude",DecisionTree,0.587765,0.773751,/content/drive/MyDrive/7006SCN_UK_Counties/mod...
3,2026-02-26T13:41:51.324762,liberatoratif/UK-Counties,54117,"Binary: 1=Violence and sexual offences, 0=Publ...","Month,Location,Last_outcome_category","Longitude,Latitude",GradientBoostedTrees,0.635631,0.797218,/content/drive/MyDrive/7006SCN_UK_Counties/mod...
