In [1]:
# Cell 1: Imports & Spark setup
import folium
import matplotlib.pyplot as plt
import pandas as pd
from folium.plugins import MarkerCluster
from matplotlib.colors import to_hex
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain the Spark session for this notebook (getOrCreate returns an existing one if present)
spark = (
    SparkSession.builder
    .appName("PlotCrimeHotspots")
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/09 00:51:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Cell 2: Load the clustered points, coerce coordinates to double, drop incomplete rows
df = (
    spark.read.parquet("hdfs://localhost:9000/user/yashwanthreddy/crime/clusters")
    .withColumn("latitude", col("latitude").cast("double"))
    .withColumn("longitude", col("longitude").cast("double"))
    # A point needs both coordinates and a cluster label to be plotted
    .dropna(subset=["latitude", "longitude", "prediction"])
)


In [3]:
# Cell 3: Collect a seeded ~10% sample into pandas so plotting stays fast
pdf = (
    df.select("latitude", "longitude", "prediction")
    .sample(fraction=0.1, seed=42)
    .toPandas()
)


In [4]:
# Cell 4: Centroid of each cluster = mean latitude/longitude of its sampled points
centroids = (
    pdf.groupby("prediction")[["latitude", "longitude"]]
    .mean()
    .reset_index()
)


In [5]:
# Cell 5: Create a Folium map
# Center the initial view on the mean coordinate of the sampled points
m = folium.Map(location=[pdf.latitude.mean(), pdf.longitude.mean()], zoom_start=12)

# One hex string per tab10 entry. Folium hands `color` to Leaflet, which needs a
# CSS colour string; a matplotlib colormap call returns an RGBA float tuple instead.
palette = [to_hex(rgba) for rgba in plt.cm.tab10.colors]

# Add clustered markers, coloured by cluster id
marker_cluster = MarkerCluster().add_to(m)
# itertuples keeps each column's dtype. iterrows would upcast the cluster id to
# float, and a colormap reads floats as a position in [0, 1] rather than an index.
for point in pdf.itertuples(index=False):
    folium.CircleMarker(
        location=(point.latitude, point.longitude),
        radius=3,
        color=palette[int(point.prediction) % len(palette)],
        fill=True, fill_opacity=0.6
    ).add_to(marker_cluster)

# Add one labelled marker per cluster centroid
for centroid in centroids.itertuples(index=False):
    folium.Marker(
        location=(centroid.latitude, centroid.longitude),
        icon=folium.Icon(color="red", icon="info-sign"),
        popup=f"Cluster {int(centroid.prediction)}"
    ).add_to(m)

# Save to HTML
m.save("cluster_map.html")


In [None]:
# Bare last expression: the notebook renders the map inline as this cell's output
m