In [16]:
%%configure -f
{
    "driverMemory": "2G",
    "executorMemory": "2G",
    "executorCores": 1,
    "numExecutors": 4
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
749,application_1765289937462_0742,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
729,application_1765289937462_0722,pyspark,idle,Link,Link,,
730,application_1765289937462_0723,pyspark,idle,Link,Link,,
735,application_1765289937462_0728,pyspark,idle,Link,Link,,
737,application_1765289937462_0730,pyspark,idle,Link,Link,,
739,application_1765289937462_0732,pyspark,idle,Link,Link,,
740,application_1765289937462_0733,pyspark,idle,Link,Link,,
747,application_1765289937462_0740,pyspark,idle,Link,Link,,
748,application_1765289937462_0741,pyspark,idle,Link,Link,,
749,application_1765289937462_0742,pyspark,idle,Link,Link,,✔
750,,pyspark,starting,,,,


In [17]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

print(f"Executors: {sc.getConf().get('spark.executor.instances')}")
print(f"Master: {sc.master}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Executors: 4
Master: yarn

## Query 1
Να ταξινομημηθούν, σε φθίνουσα σειρά, οι ηλικιακές ομάδες των θυμάτων σε περιστατικά που περιλαμβάνουν οποιαδήποτε μορφή “βαριάς σωματικής βλάβης”. Θεωρείστε τις εξής ηλικιακές ομάδες:


• Παιδιά: < 18
• Νεαροί ενήλικοι: 18 – 24
• Ενήλικοι: 25 – 64
• Ηλικιωμένοι: >64


Ως εγκλήματα που περιλαμβάνουν οποιαδήποτε μορφή “βαριάς σωματικής βλάβης” θεωρούμε
όλα εκείνα τα περιστατικά που περιέχουν τον όρο “aggravated assault” στη σχετική περιγραφή.


In [18]:
# Dataframes without UDF

crimes_old_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2010_2019.csv",\
    header=True,
    inferSchema=True)
crimes_new_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Crime_Data/LA_Crime_Data_2020_2025.csv",\
    header=True,
    inferSchema=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
from pyspark.sql.functions import col, lower, when, desc
import time

times = []
crimes_df = crimes_old_df.unionByName(crimes_new_df)
assaults_df = crimes_df.filter(lower(col("Crm Cd Desc")).contains("aggravated assault"))

assaults_grouped_df = assaults_df.withColumn(
    "Age_Group",
    when(col("Vict Age") < 18, "Children")
    .when((col("Vict Age") >= 18) & (col("Vict Age") <= 24), "Young Adults")
    .when((col("Vict Age") >= 25) & (col("Vict Age") <= 64), "Adults")
    .when(col("Vict Age") > 64, "Elderly")
)

result = assaults_grouped_df \
    .filter(col("Age_Group").isNotNull()) \
    .groupBy("Age_Group") \
    .count() \
    .orderBy(desc("count"))

start_time = time.time()
result.show()
end_time = time.time()
times.append(end_time - start_time)

print(f"Native DF Time: {end_time - start_time} sec")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------+
|   Age_Group| count|
+------------+------+
|      Adults|121660|
|Young Adults| 33758|
|    Children| 16014|
|     Elderly|  6011|
+------------+------+

Native DF Time: 5.490391254425049 sec

In [20]:
# Dataframes with UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def age_group(age):
    if age is None:
        return None
    if age < 18:
        return "Children"
    elif 18 <= age <= 24:
        return "Young Adults"
    elif 25 <= age <= 64:
        return "Adults"
    else:
        return "Elderly"

age_group_udf = udf(age_group, StringType())

udf_df = assaults_df.withColumn("Age_Group", age_group_udf(col("Vict Age")))

result_udf = udf_df.groupBy("Age_Group").count().orderBy(desc("count"))

start_time = time.time()
result_udf.show()
end_time = time.time()
times.append(end_time - start_time)

print(f"DF with UDF Time: {end_time - start_time} sec")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------+
|   Age_Group| count|
+------------+------+
|      Adults|121660|
|Young Adults| 33758|
|    Children| 16014|
|     Elderly|  6011|
+------------+------+

DF with UDF Time: 6.8933844566345215 sec

In [21]:
# RDDs
def map_to_age_groups(row):
    crime_desc = row['Crm Cd Desc']
    if not crime_desc or 'aggravated assault' not in crime_desc.lower():
        return None

    age = row['Vict Age']
    if age is None:
        return
    if age < 18:
        age_group = "Children"
    elif 18 <= age <= 24:
        age_group = "Young Adults"
    elif 25 <= age <= 64:
        age_group = "Adults"
    else:
        age_group = "Elderly"

    return (age_group, 1)


assaults_rdd = assaults_df.rdd
counts_rdd = assaults_rdd \
    .map(map_to_age_groups) \
    .filter(lambda x: x is not None) \
    .reduceByKey(lambda a, b: a + b)

sorted_rdd = counts_rdd.map(lambda x: (x[1], x[0])) \
                       .sortByKey(ascending=False)

result_rdd = sorted_rdd.map(lambda x: (x[1], x[0]))


start_time = time.time()
result = result_rdd.collect()
end_time = time.time()
times.append(end_time - start_time)

print(f"RDD Time: {end_time - start_time} sec")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

RDD Time: 0.41512179374694824 sec

In [22]:
#Times comparison 
for time in times:
    print(time)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5.490391254425049
6.8933844566345215
0.41512179374694824

# Query 2

### Εκφώνηση:

    Ανά έτος, να βρεθούν τα 3 φυλετικά γκρουπ με τα περισσότερα θύματα καταγεγραμμένων εγκλημάτων
    (Vict Descent) στο Los Angeles. Τα αποτελέσματα να εμφανιστούν με φθίνουσα σειρά αριθμού θυμάτων
    ανά φυλετικό γκρουπ – να υπολογιστεί και να εμφανιστεί επίσης το ποσοστό επί του συνολικού αριθμού θυμάτων ανα περίπτωση


In [23]:
from pyspark.sql import functions as F
from pyspark.sql import Window


descent_diction = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/RE_codes.csv",\
    header=True,
    inferSchema=True)

descent_diction.show(5,False)


year_df = crimes_df.withColumn(
    "year",
    F.year(F.to_timestamp("DATE OCC", "yyyy MMM dd hh:mm:ss a"))
)

year_df.select("year").show(2)

clean_df = year_df.filter(
    (F.col("year").isNotNull()) &
    (F.col("Vict Descent").isNotNull())
)

per_race = (
    clean_df
      .groupBy("year", "Vict Descent")
      .agg(F.count("*").alias("cnt"))
)

per_race.show(5)

# Window: total victims per year

w_year = Window.partitionBy("year")

ranked = per_race.withColumn(
    "year_total",
    F.sum("cnt").over(w_year)
)

#  Percentage per year

ranked = ranked.withColumn(
    "percent",
    F.round(F.col("cnt") / F.col("year_total") * 100, 1)
)

#  Window: rank per year (top-3 per race)

w_rank = Window.partitionBy("year").orderBy(F.desc("cnt"))

ranked = ranked.withColumn(
    "rank",
    F.dense_rank().over(w_rank)
)

#  Keep only Top-3 per year

top3 = ranked.filter(F.col("rank") <= 3)


top3.show(5,False)

#lets join with Racial codes

result = top3.join(
    descent_diction,
    on="Vict Descent",
    how="left"
)

result.show(5,False)


#Final view

result = (
    result
      .select(
          "year",
          F.col("Vict Descent Full").alias("Vict_Descent"),
          "cnt",
          "percent"
      )
      .orderBy(F.desc("year"), F.desc("cnt"))
)

result.show(5,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----------------+
|Vict Descent|Vict Descent Full|
+------------+-----------------+
|A           |Other Asian      |
|B           |Black            |
|C           |Chinese          |
|D           |Cambodian        |
|F           |Filipino         |
+------------+-----------------+
only showing top 5 rows

+----+
|year|
+----+
|2010|
|2010|
+----+
only showing top 2 rows

+----+------------+-----+
|year|Vict Descent|  cnt|
+----+------------+-----+
|2016|           A| 7249|
|2016|           C|  170|
|2017|           S|    4|
|2017|           X|12081|
|2017|           A| 5849|
+----+------------+-----+
only showing top 5 rows

+----+------------+-----+----------+-------+----+
|year|Vict Descent|cnt  |year_total|percent|rank|
+----+------------+-----+----------+-------+----+
|2010|H           |73558|188969    |38.9   |1   |
|2010|W           |53835|188969    |28.5   |2   |
|2010|B           |33937|188969    |18.0   |3   |
|2011|H           |70845|182588    |38.8   |1   |
|2

# SQL API

In [24]:
crimes_df.createOrReplaceTempView("crimes")
descent_diction.createOrReplaceTempView("descent_diction")

query = """
WITH year_df AS (
    SELECT
        *,
        year(to_timestamp(`DATE OCC`, 'yyyy MMM dd hh:mm:ss a')) AS year
    FROM crimes
),
clean_df AS (
    SELECT
        *
    FROM year_df
    WHERE year IS NOT NULL
      AND `Vict Descent` IS NOT NULL
),
per_race AS (
    SELECT
        year,
        `Vict Descent`,
        COUNT(*) AS cnt
    FROM clean_df
    GROUP BY year, `Vict Descent`
),
ranked AS (
    SELECT
        year,
        `Vict Descent`,
        cnt,
        SUM(cnt) OVER (PARTITION BY year) AS year_total,
        ROUND(cnt / SUM(cnt) OVER (PARTITION BY year) * 100, 1) AS percent,
        DENSE_RANK() OVER (PARTITION BY year ORDER BY cnt DESC) AS rank
    FROM per_race
),
top3 AS (
    SELECT *
    FROM ranked
    WHERE rank <= 3
),
final AS (
    SELECT
        t.year,
        d.`Vict Descent Full` AS Vict_Descent,
        t.cnt,
        t.percent
    FROM top3 t
    LEFT JOIN descent_diction d
      ON t.`Vict Descent` = d.`Vict Descent`
)
SELECT
    *
FROM final
ORDER BY year DESC, cnt DESC
"""

result_sql = spark.sql(query)
result_sql.show(5, False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----------------------+-----+-------+
|year|Vict_Descent          |cnt  |percent|
+----+----------------------+-----+-------+
|2025|Hispanic/Latin/Mexican|34   |40.5   |
|2025|Unknown               |24   |28.6   |
|2025|White                 |13   |15.5   |
|2024|Hispanic/Latin/Mexican|28576|29.1   |
|2024|White                 |22958|23.3   |
+----+----------------------+-----+-------+
only showing top 5 rows

# Query 4

In [26]:
from sedona.spark import *
from sedona.spark import SedonaContext
from pyspark.sql import functions as F

spark = SedonaContext.create(spark)


stations_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/project_data/LA_Police_Stations.csv",\
    header=True,
    inferSchema=True)

stations_df.show(5,False)

#lets change y=lat and x=lon

stations_geo = stations_df.withColumnRenamed("X", "lon") \
       .withColumnRenamed("Y", "lat")


# filter Null Island + null geo
crimes_df_clean = crimes_df.filter(
    (F.col("LAT").isNotNull()) &
    (F.col("LON").isNotNull()) &
    ~((F.col("LAT") == 0) & (F.col("LON") == 0))
)

# new dataframe with our selected features
crimes_geo = crimes_df_clean.select("AREA", "LAT", "LON")

crimes_geo.show(5,False)

# lets create geom for both dataframes

stations_geo = stations_geo.withColumn(
    "station_geom",
    ST_Point(col("lon"), col("lat"))
)

crimes_geo = crimes_geo.withColumn(
    "crime_geom",
    ST_Point(col("LON"), col("LAT"))
)

stations_geo.show(5,False)

# lets have a join

c = crimes_geo.alias("c")
s = stations_geo.alias("s")

joined_df = c.join(
    s,
    c.AREA == s.PREC,
    "inner"
)

final_df = joined_df.select(
    col("s.FID"),
    col("s.DIVISION"),
    col("s.LOCATION"),
    col("s.PREC"),
    col("s.station_geom"),
    col("c.crime_geom")
)

final_df.show(5, False)

# lets calculate the distances

with_dist = final_df.withColumn(
    "distance_m",
    ST_DistanceSphere("station_geom", "crime_geom")
)

# average dist and count
result = (
    with_dist
    .groupBy("DIVISION")
    .agg(
        F.round(F.avg("distance_m"), 3).alias("average_distance"),
        F.count("*").alias("count")
    )
    .orderBy(F.desc("count"))
)


result.show(5, False)





FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+----------------+---+-----------+---------------------------------+----+
|X             |Y               |FID|DIVISION   |LOCATION                         |PREC|
+--------------+----------------+---+-----------+---------------------------------+----+
|-118.289241553|33.7576608970001|1  |HARBOR     |2175 JOHN S. GIBSON BLVD.        |5   |
|-118.275394206|33.9386273800001|2  |SOUTHEAST  |145 W. 108TH ST.                 |18  |
|-118.277669655|33.9703073800001|3  |77TH STREET|7600 S. BROADWAY                 |12  |
|-118.419841576|33.9916553210001|4  |PACIFIC    |12312 CULVER BLVD.               |14  |
|-118.305141563|34.0105753400001|5  |SOUTHWEST  |1546 MARTIN LUTHER KING JR. BLVD.|3   |
+--------------+----------------+---+-----------+---------------------------------+----+
only showing top 5 rows

+----+-------+---------+
|AREA|LAT    |LON      |
+----+-------+---------+
|13  |33.9825|-118.2695|
|14  |33.9599|-118.3962|
|13  |34.0224|-118.2524|
|6   |34.1016|-118.3295|

In [27]:
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…