# Προχωρημένα Θέματα Βάσεων Δεδομένων

**Ονοματεπώνυμο:** Κωνσταντίνος Διβριώτης

**ΑΜ:** 03114140

## Query 2: 

Να βρεθούν, για κάθε έτος, τα 3 Αστυνομικά Τμήματα με το υψηλότερο ποσοστό κλεισμένων (περατωμένων) υποθέσεων. 

Να τυπωθούν το έτος, τα ονόματα (τοποθεσίες) των τμημάτων, τα ποσοστά τους καθώς και οι αριθμοί του ranking τους στην ετήσια κατάταξη. 

Τα αποτελέσματα να δοθούν σε σειρά αύξουσα ως προς το έτος και το ranking.

In [1]:
from pyspark.sql import SparkSession
import time

spark = SparkSession \
    .builder \
    .appName("PoliceStationsAnalysis") \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1038,application_1732639283265_1005,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
DATA_BUCKET = "s3://initial-notebook-data-bucket-dblab-905418150721"
GROUP_BUCKET = "s3://groups-bucket-dblab-905418150721/group15"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DoubleType

# Ορισμός του schema των dataset
crimes_schema = StructType([
    StructField("DR_NO", StringType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", IntegerType()),
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", StringType()),
    StructField("Part 1-2", IntegerType()),
    StructField("Crm Cd", IntegerType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", StringType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", IntegerType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", IntegerType()),
    StructField("Crm Cd 2", IntegerType()),
    StructField("Crm Cd 3", IntegerType()),
    StructField("Crm Cd 4", IntegerType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType())
])

police_stations_schema = StructType([
    StructField("X", DoubleType()),
    StructField("Y", DoubleType()),
    StructField("FID", IntegerType()),
    StructField("DIVISION", StringType()),
    StructField("LOCATION", StringType()),
    StructField("PREC", IntegerType())
])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Ερώτημα α

### Υλοποίηση με DataFrame

In [7]:
import time
from pyspark.sql.functions import to_timestamp, col, year, rank, count
from pyspark.sql.window import Window

start_time = time.time()

# Διαβάζουμε τα 2 datasets (2010-2019 και 2020-σήμερα) και τα συνενώνουμε σε 1
crime_data_2010_2019 = spark.read.csv(f"{DATA_BUCKET}/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, schema=crimes_schema)
crime_data_2020_present = spark.read.csv(f"{DATA_BUCKET}/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True, schema=crimes_schema)
crime_data = crime_data_2010_2019.union(crime_data_2020_present)

# Κάνουμε parse τα datetime strings του CSV ως timestamps και
# δημιουργούμε νέα στήλη για το έτος στο οποίο συνέβη κάθε έγκλημα
datetime_format = "MM/dd/yyyy hh:mm:ss a"
crime_data = crime_data \
                .withColumn("Date Rptd", to_timestamp("Date Rptd", datetime_format)) \
                .withColumn("DATE OCC", to_timestamp("DATE OCC", datetime_format)) \
                .withColumn("year", year(col("DATE OCC"))) \
                .filter(col("Status").isNotNull())

# Βρίσκουμε το σύνολο των περιστατικών καθώς και το σύνολο
# των κλεισμένων υποθέσεων ανά τμήμα και ανά έτος
total_cases = crime_data.groupBy("AREA", "year") \
                .count() \
                .withColumnRenamed("count", "Total Cases")

# Υποθέτουμε ότι οι υποθέσεις που έχουν κλείσει είναι όσες δεν έχουν
# Status Description UNK (Unknown) ή Invest Cont (Investigation Continuing).
# Δηλαδή, όσες έχουν status AA/AO (Adult Arrest/Other) ή
# JA/JO (Juvenile Arrest/Other).
closed_cases = crime_data \
                .filter(~ col("Status Desc").isin("UNK", "Invest Cont")) \
                .groupBy("AREA", "year") \
                .agg(count("*").alias("Closed Cases"))

# Υπολογίζουμε το ποσοστό των κλεισμένων υποθέσεων ανά τμήμα
closed_cases = closed_cases.join(total_cases, on=["AREA", "year"]) \
                        .withColumn("closed_case_rate", 100 * col("Closed Cases") / col("Total Cases"))

# Διαβάζουμε το dataset των Αστυνομικών Τμημάτων
police_stations = spark.read.csv(f"{DATA_BUCKET}/LA_Police_Stations.csv",
                                 header=True, \
                                 schema=police_stations_schema)

# Ενώνουμε τους 2 πίνακες προκειμένου να έχουμε το όνομα του τμήματος σε κάθε γραμμή
closed_cases = closed_cases.join(police_stations,
                                 closed_cases["AREA"] == police_stations["PREC"])

# Υπολογίζουμε το rank του κάθε αστυνομικού τμήματος με βάση το
# closed_case_rate ανά έτος, και κρατάμε μόνο τα πρώτα 3 rank ανά έτος
window_spec = Window.partitionBy("year").orderBy(col("closed_case_rate").desc())
ranked_cases = closed_cases.withColumn("#", rank().over(window_spec)) \
                            .filter(col("#") <= 3) \
                            .orderBy("year", "#")

ranked_cases.withColumnRenamed("DIVISION", "precinct") \
            .select("year", "precinct", "closed_case_rate", "#") \
            .show(3*(2024-2010+1))

end_time = time.time()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+---------------+------------------+---+
|year|       precinct|  closed_case_rate|  #|
+----+---------------+------------------+---+
|2010|        RAMPART| 32.84713448949121|  1|
|2010|        OLYMPIC|31.515289821999087|  2|
|2010|         HARBOR| 29.36028339237341|  3|
|2011|        OLYMPIC|  35.0400600901352|  1|
|2011|        RAMPART|32.496447181430604|  2|
|2011|         HARBOR|28.513362463164313|  3|
|2012|        OLYMPIC| 34.29708533302119|  1|
|2012|        RAMPART| 32.46000463714352|  2|
|2012|         HARBOR| 29.50958584895668|  3|
|2013|        OLYMPIC| 33.58217940999398|  1|
|2013|        RAMPART|  32.1060382916053|  2|
|2013|         HARBOR|29.727164887307236|  3|
|2014|       VAN NUYS|  32.0215235281705|  1|
|2014|    WEST VALLEY| 31.49754809505847|  2|
|2014|        MISSION| 31.22493985565357|  3|
|2015|       VAN NUYS|32.265140677157845|  1|
|2015|        MISSION|30.463762673676303|  2|
|2015|       FOOTHILL|30.353001803658852|  3|
|2016|       VAN NUYS|32.194518462

In [8]:
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Time taken: 6.59 seconds

### Υλοποίηση με SQL

In [10]:
import time
from pyspark.sql.functions import to_timestamp

start_time = time.time()

# Διαβάζουμε τα 2 datasets (2010-2019 και 2020-σήμερα) και τα συνενώνουμε σε 1
crime_data_2010_2019 = spark.read.csv(f"{DATA_BUCKET}/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, schema=crimes_schema)
crime_data_2020_present = spark.read.csv(f"{DATA_BUCKET}/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True, schema=crimes_schema)
crime_data = crime_data_2010_2019.union(crime_data_2020_present)

# Κάνουμε parse τα datetime strings του CSV ως timestamps και
# δημιουργούμε νέα στήλη για το έτος στο οποίο συνέβη κάθε έγκλημα
datetime_format = "MM/dd/yyyy hh:mm:ss a"
crime_data = crime_data \
                .withColumn("Date Rptd", to_timestamp("Date Rptd", datetime_format)) \
                .withColumn("DATE OCC", to_timestamp("DATE OCC", datetime_format))

# Χρήση σαν SQL table
crime_data.createOrReplaceTempView("crime_data")

# crime_data.groupBy("Status", "Status Desc").count().show()

# Βρίσκουμε το σύνολο των περιστατικών καθώς και το σύνολο
# των κλεισμένων υποθέσεων ανά τμήμα και ανά έτος
query = "SELECT `AREA`, YEAR(`DATE OCC`) AS year, COUNT(*) AS `count` \
            FROM crime_data as cd \
            GROUP BY `AREA`, YEAR(`DATE OCC`)"

total_cases = spark.sql(query)
total_cases.createOrReplaceTempView("total_cases")

# Υποθέτουμε ότι οι υποθέσεις που έχουν κλείσει είναι όσες είναι
# σε status AA/AO (Adult Arrest/Other) ή JA/JO (Juvenile Arrest/Other).
# Εναλλακτικά, θα μπορούσαμε να υποθέσουμε ότι έχουν κλείσει όσες ΔΕΝ
# είναι σε status IC (Investigation Continuing) - αλλά προτιμούμε να
# μην συμπεριλάβουμε τα status CC, 19, 13, TH για τα οποία η περιγραφή
# (Status Desc) έχει τιμή UNK (Unknown)
query = "SELECT `AREA`, YEAR(`DATE OCC`) AS year, COUNT(*) AS `count` \
            FROM crime_data as cd \
            WHERE `Status Desc` NOT IN ('UNK', 'Invest Cont') \
            GROUP BY `AREA`, YEAR(`DATE OCC`)"

closed_cases = spark.sql(query)
closed_cases.createOrReplaceTempView("closed_cases")

# Υπολογίζουμε το ποσοστό των κλεισμένων υποθέσεων ανά τμήμα
query = "SELECT cc.AREA AS `AREA`, cc.year AS `year`, (100.0 * cc.count / tc.count) AS `closed_case_rate` \
            FROM closed_cases AS cc \
            JOIN total_cases AS tc \
            ON cc.AREA = tc.AREA AND cc.year = tc.year"

closed_cases = spark.sql(query)
closed_cases.createOrReplaceTempView("closed_cases")

# Διαβάζουμε το dataset των Αστυνομικών Τμημάτων
police_stations = spark.read.csv(f"{DATA_BUCKET}/LA_Police_Stations.csv",
                                 header=True, \
                                 schema=police_stations_schema)
police_stations.createOrReplaceTempView("police_stations")

# Ενώνουμε τους 2 πίνακες προκειμένου να έχουμε το όνομα του τμήματος σε κάθε γραμμή
query = "SELECT `year`, `DIVISION` AS `precinct`, `closed_case_rate` \
            FROM closed_cases AS cc \
            JOIN police_stations AS ps \
            ON cc.AREA = ps.PREC"

closed_cases = spark.sql(query)
closed_cases.createOrReplaceTempView("closed_cases")

# Υπολογίζουμε το rank του κάθε αστυνομικού τμήματος με βάση το
# closed_case_rate ανά έτος, και κρατάμε μόνο τα πρώτα 3 rank ανά έτος
query = "SELECT * FROM ( \
            SELECT `year`, `precinct`, `closed_case_rate`, RANK() OVER (PARTITION BY `year` ORDER BY `closed_case_rate` DESC) as `#` \
            FROM closed_cases \
            ORDER BY `year`, `#` \
            ) \
            WHERE `#` <= 3"

ranked_cases = spark.sql(query)
ranked_cases.show(3*(2024-2010+1))

end_time = time.time()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+---------------+-----------------+---+
|year|       precinct| closed_case_rate|  #|
+----+---------------+-----------------+---+
|2010|        RAMPART|32.84713448949121|  1|
|2010|        OLYMPIC|31.51528982199909|  2|
|2010|         HARBOR|29.36028339237341|  3|
|2011|        OLYMPIC|35.04006009013520|  1|
|2011|        RAMPART|32.49644718143060|  2|
|2011|         HARBOR|28.51336246316431|  3|
|2012|        OLYMPIC|34.29708533302119|  1|
|2012|        RAMPART|32.46000463714352|  2|
|2012|         HARBOR|29.50958584895668|  3|
|2013|        OLYMPIC|33.58217940999398|  1|
|2013|        RAMPART|32.10603829160530|  2|
|2013|         HARBOR|29.72363895148855|  3|
|2014|       VAN NUYS|32.02152352817050|  1|
|2014|    WEST VALLEY|31.49754809505847|  2|
|2014|        MISSION|31.22493985565357|  3|
|2015|       VAN NUYS|32.26514067715784|  1|
|2015|        MISSION|30.46376267367630|  2|
|2015|       FOOTHILL|30.35300180365885|  3|
|2016|       VAN NUYS|32.19451846212410|  1|
|2016|    

In [11]:
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Time taken: 5.57 seconds

## Συμπεράσματα

Οι δύο υλοποίησεις με χρήση του **DataFrame API** και του **SQL API** ήταν παρόμοιες όσον αφορά τον χρόνο εκτέλεσης, με χρόνους 6.59 και 5.57 δευτερόλεπτα αντίστοιχα. 

Φυσικά αναμέναμε ότι οι διαφορές θα ήταν ελάχιστες, καθώς και τα 2 APIs βασίζονται στον βελτιστοποιητή **Catalyst** του **Spark**.

## Ερώτημα β

### Μετατροπή του κυρίως Dataset σε parquet file format

In [12]:
crime_data_2010_2019 = spark.read.csv(f"{DATA_BUCKET}/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, schema=crimes_schema)
crime_data_2020_present = spark.read.csv(f"{DATA_BUCKET}/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True, schema=crimes_schema)
crime_data = crime_data_2010_2019.union(crime_data_2020_present)

crime_data.write.mode("overwrite").parquet(f"{GROUP_BUCKET}/Crime_Data_from_2010_to_Present")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Υλοποίηση με DataFrame και αρχείο parquet

In [18]:
start_time = time.time()

crime_data_parquet = spark.read.parquet(f"{GROUP_BUCKET}/Crime_Data_from_2010_to_Present")

# Κάνουμε parse τα datetime strings του CSV ως timestamps και
# δημιουργούμε νέα στήλη για το έτος στο οποίο συνέβη κάθε έγκλημα
datetime_format = "MM/dd/yyyy hh:mm:ss a"
crime_data_parquet = crime_data_parquet \
                .withColumn("Date Rptd", to_timestamp("Date Rptd", datetime_format)) \
                .withColumn("DATE OCC", to_timestamp("DATE OCC", datetime_format)) \
                .withColumn("year", year(col("DATE OCC"))) \
                .filter(col("Status").isNotNull())

# crime_data.groupBy("Status", "Status Desc").count().show()

# Βρίσκουμε το σύνολο των περιστατικών καθώς και το σύνολο
# των κλεισμένων υποθέσεων ανά τμήμα και ανά έτος
total_cases = crime_data_parquet.groupBy("AREA", "year") \
                .count() \
                .withColumnRenamed("count", "Total Cases")

# Υποθέτουμε ότι οι υποθέσεις που έχουν κλείσει είναι όσες είναι
# σε status AA/AO (Adult Arrest/Other) ή JA/JO (Juvenile Arrest/Other).
# Εναλλακτικά, θα μπορούσαμε να υποθέσουμε ότι έχουν κλείσει όσες ΔΕΝ
# είναι σε status IC (Investigation Continuing) - αλλά προτιμούμε να
# μην συμπεριλάβουμε τα status CC, 19, 13, TH για τα οποία η περιγραφή
# (Status Desc) έχει τιμή UNK (Unknown)
closed_cases = crime_data_parquet \
                .filter(~ col("Status Desc").isin("UNK", "Invest Cont")) \
                .groupBy("AREA", "year") \
                .agg(count("*").alias("Closed Cases"))

# Υπολογίζουμε το ποσοστό των κλεισμένων υποθέσεων ανά τμήμα
closed_cases = closed_cases.join(total_cases, on=["AREA", "year"]) \
                        .withColumn("closed_case_rate", 100 * col("Closed Cases") / col("Total Cases"))

# Διαβάζουμε το dataset των Αστυνομικών Τμημάτων
police_stations = spark.read.csv(f"{DATA_BUCKET}/LA_Police_Stations.csv",
                                 header=True, \
                                 schema=police_stations_schema)

# Ενώνουμε τους 2 πίνακες προκειμένου να έχουμε το όνομα του τμήματος σε κάθε γραμμή
closed_cases = closed_cases.join(police_stations,
                                 closed_cases["AREA"] == police_stations["PREC"])

# Υπολογίζουμε το rank του κάθε αστυνομικού τμήματος με βάση το
# closed_case_rate ανά έτος, και κρατάμε μόνο τα πρώτα 3 rank ανά έτος
window_spec = Window.partitionBy("year").orderBy(col("closed_case_rate").desc())
ranked_cases = closed_cases.withColumn("#", rank().over(window_spec)) \
                            .filter(col("#") <= 3) \
                            .orderBy("year", "#")

ranked_cases.withColumnRenamed("DIVISION", "precinct") \
            .select("year", "precinct", "closed_case_rate", "#") \
            .show(3*(2024-2010+1))

end_time = time.time()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+---------------+------------------+---+
|year|       precinct|  closed_case_rate|  #|
+----+---------------+------------------+---+
|2010|        RAMPART| 32.84713448949121|  1|
|2010|        OLYMPIC|31.515289821999087|  2|
|2010|         HARBOR| 29.36028339237341|  3|
|2011|        OLYMPIC|  35.0400600901352|  1|
|2011|        RAMPART|32.496447181430604|  2|
|2011|         HARBOR|28.513362463164313|  3|
|2012|        OLYMPIC| 34.29708533302119|  1|
|2012|        RAMPART| 32.46000463714352|  2|
|2012|         HARBOR| 29.50958584895668|  3|
|2013|        OLYMPIC| 33.58217940999398|  1|
|2013|        RAMPART|  32.1060382916053|  2|
|2013|         HARBOR|29.727164887307236|  3|
|2014|       VAN NUYS|  32.0215235281705|  1|
|2014|    WEST VALLEY| 31.49754809505847|  2|
|2014|        MISSION| 31.22493985565357|  3|
|2015|       VAN NUYS|32.265140677157845|  1|
|2015|        MISSION|30.463762673676303|  2|
|2015|       FOOTHILL|30.353001803658852|  3|
|2016|       VAN NUYS|32.194518462

In [19]:
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Time taken: 3.40 seconds

## Συμπεράσματα

Χρησιμοποιώντας το αρχείο *parquet* που εξάγαμε έναντι των 2 αρχικών CSV του κυρίως dataset παρατηρούμε μία βελτίωση από τα 6.59 στα 3.40 δευτερόλεπτα.

Αυτό οφείλεται αφενός στο ότι το αρχείο parquet πετυχαίνει πολύ καλή συμπίεση, αφού το εξαγόμενο αρχείο έχει μέγεθος 127 MB έναντι των 752 MB των 2 αρχικών CSV.

Η σημαντικότερη διαφορά όμως έχει να κάνει με τον τρόπο που είναι αποθηκευμένα τα δεδομένα, τα οποία είναι σε **Columnar** μορφή αντί για **row-based** όπως το CSV. Αυτό βοηθάει στη γρηγορότερη ανάγνωση και επεξεργασία των δεδομένων.

Τέλος, το αρχείο parquet υποστηρίζει **predicate pushdown**, το οποίο δίνει τη δυνατότητα να φιλτραριστούν τα δεδομένα πριν να διαβαστούν στη μνήμη και συνεπώς πετυχαίνουμε καλύτερη βελτιστοποίηση.