In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType
from pyspark.sql.functions import col, when, count, to_timestamp, year
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import time

spark = SparkSession \
    .builder \
    .appName("Query 2 with Dataframe") \
    .getOrCreate()

start_time = time.time()

schema = StructType([
    StructField("dr_no", IntegerType(), True),
    StructField("date_rptd", StringType(), True),
    StructField("date_occ", StringType(), True),
    StructField("time_occ", StringType(), True),
    StructField("area", StringType(), True),
    StructField("area_name", StringType(), True),
    StructField("rpt_dist_no", StringType(), True),
    StructField("part_1_2", IntegerType(), True),
    StructField("crm_cd", StringType(), True),
    StructField("crm_cd_desc", StringType(), True),
    StructField("mocodes", StringType(), True),
    StructField("vict_age", StringType(), True),
    StructField("vict_sex", StringType(), True),
    StructField("vict_descent", StringType(), True),
    StructField("premis_cd", StringType(), True),
    StructField("premis_desc", StringType(), True),
    StructField("weapon_used_cd", StringType(), True),
    StructField("weapon_desc", StringType(), True),
    StructField("status", StringType(), True),
    StructField("status_desc", StringType(), True),
    StructField("crm_cd_1", StringType(), True),
    StructField("crm_cd_2", StringType(), True),
    StructField("crm_cd_3", StringType(), True),
    StructField("crm_cd_4", StringType(), True),
    StructField("location", StringType(), True),
    StructField("cross_street", StringType(), True),
    StructField("lat", IntegerType(), True),
    StructField("lon", IntegerType(), True),
])

file_1 = "s3://initial-notebook-data-bucket-dblab-905418150721/\
CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
file_2 = "s3://initial-notebook-data-bucket-dblab-905418150721/\
CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
df1 = spark.read.csv(file_1, header=True, schema=schema)
df2 = spark.read.csv(file_2, header=True, schema=schema)
df = df1.union(df2)
# Δημιουργουμε νεα στηλη που θα δηλωνει το αν εχει κλεισει η υποθεση
# ή οχι, βασει του αν υπαρχει το Invest Cont ή UNK στην στηλη case_status
df = df.withColumn(
    "case_status",
    when(col("status_desc").isin("UNK", "Invest Cont"), "Case not closed")
    .otherwise("Case closed")
)
# Κραταμε τις στηλες που μας ενδιαφερουν
df = df.select("date_occ", "area_name", "case_status")
# Δημιουργουμε στηλη με συγκεκριμενο φορματ με τις ημερομηνιες του συμβαντος
df = df.withColumn("date", to_timestamp(col("date_occ"),
                                        "MM/dd/yyyy hh:mm:ss a"))
# για να παρουμε την χρονολογια απο καθε υποθεση
df = df.withColumn("year", year(col("date")))
df = df.select("year", "area_name", "case_status")
# Υπολογιζουμε το συνολικο αριθμο υποθεσεων και των περατωμενων υποθεσεων
# ανα περιοχη και ετος
df_stats = df.groupBy("year", "area_name").agg(
    count("*").alias("total_cases"),
    count(when(col("case_status") == "Case closed", 1)).alias("closed_cases")
)
# Υπολογιζουμε το ποσοστο περατωμενων υποθεσεων
df_stats = df_stats.withColumn("closed_case_rate",
                               (col("closed_cases") / col("total_cases"))*100)
# Δημιουργουμε παραθυρο για καταταξη ανα ετος και ποσοστο κλεισμενων υποθεσεων
window_spec = Window.partitionBy("year")\
            .orderBy(col("closed_case_rate").desc())
# Υπολογιζουμε της καταταξης
df_stats = df_stats.withColumn("#", F.rank().over(window_spec))
# Επιλεγουμε τις 3 πρωτες περιοχες για καθε ετος
df_top_3 = df_stats.filter(col("#") <= 3)
df_top_3 = df_top_3.select("year", "area_name", "closed_case_rate", "#")
df_top_3 = df_top_3.withColumnRenamed("area_name", "precinct")
# Ταξινομουμε τα αποτελεσματα κατα ετος και καταταξη και τα εμφανιζουμε
df_top_3.orderBy("year", "#").show()

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+---+
|year|   precinct|  closed_case_rate|  #|
+----+-----------+------------------+---+
|2010|    Rampart| 32.84713448949121|  1|
|2010|    Olympic|31.515289821999087|  2|
|2010|     Harbor| 29.36028339237341|  3|
|2011|    Olympic|35.040060090135206|  1|
|2011|    Rampart|  32.4964471814306|  2|
|2011|     Harbor| 28.51336246316431|  3|
|2012|    Olympic| 34.29708533302119|  1|
|2012|    Rampart| 32.46000463714352|  2|
|2012|     Harbor|29.509585848956675|  3|
|2013|    Olympic| 33.58217940999398|  1|
|2013|    Rampart|  32.1060382916053|  2|
|2013|     Harbor|29.723638951488557|  3|
|2014|   Van Nuys|  32.0215235281705|  1|
|2014|West Valley| 31.49754809505847|  2|
|2014|    Mission|31.224939855653567|  3|
|2015|   Van Nuys|32.265140677157845|  1|
|2015|    Mission|30.463762673676303|  2|
|2015|   Foothill|30.353001803658852|  3|
|2016|   Van Nuys|32.194518462124094|  1|
|2016|West Valley| 31.40146437042384|  2|
+----+-----------+----------------

In [3]:
spark = SparkSession \
    .builder \
    .appName("Query 2 with SQL") \
    .getOrCreate()

start_time = time.time()

schema = StructType([
    StructField("dr_no", IntegerType(), True),
    StructField("date_rptd", StringType(), True),
    StructField("date_occ", StringType(), True),
    StructField("time_occ", StringType(), True),
    StructField("area", StringType(), True),
    StructField("area_name", StringType(), True),
    StructField("rpt_dist_no", StringType(), True),
    StructField("part_1_2", IntegerType(), True),
    StructField("crm_cd", StringType(), True),
    StructField("crm_cd_desc", StringType(), True),
    StructField("mocodes", StringType(), True),
    StructField("vict_age", StringType(), True),
    StructField("vict_sex", StringType(), True),
    StructField("vict_descent", StringType(), True),
    StructField("premis_cd", StringType(), True),
    StructField("premis_desc", StringType(), True),
    StructField("weapon_used_cd", StringType(), True),
    StructField("weapon_desc", StringType(), True),
    StructField("status", StringType(), True),
    StructField("status_desc", StringType(), True),
    StructField("crm_cd_1", StringType(), True),
    StructField("crm_cd_2", StringType(), True),
    StructField("crm_cd_3", StringType(), True),
    StructField("crm_cd_4", StringType(), True),
    StructField("location", StringType(), True),
    StructField("cross_street", StringType(), True),
    StructField("lat", IntegerType(), True),
    StructField("lon", IntegerType(), True),
])

file_1 = "s3://initial-notebook-data-bucket-dblab-905418150721/\
CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
file_2 = "s3://initial-notebook-data-bucket-dblab-905418150721/\
CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
df1 = spark.read.csv(file_1, header=True, schema=schema)
df2 = spark.read.csv(file_2, header=True, schema=schema)
df = df1.union(df2)
# Δημιουργουμε εναν προσωρινο πινακα που θα χρησιμοποιησουμε
# στο πρωτο SQL query
df.createOrReplaceTempView("crime_data")
# Εκτελουμε SQL Query για τον υπολογισμο του case_status
query_case_status = """
    SELECT
        to_timestamp(date_occ, 'MM/dd/yyyy hh:mm:ss a') AS date,
        area_name,
        CASE
            WHEN status_desc IN ('UNK', 'Invest Cont') THEN 'Case not closed'
            ELSE 'Case closed'
        END AS case_status
    FROM crime_data
"""
df_case_status = spark.sql(query_case_status)
# Δημιουργουμε προσωρινο πινακα για τα αποτελεσματα με case_status
df_case_status.createOrReplaceTempView("case_status_data")
# Εκτελουμε SQL query για υπολογισμο των στατιστικων
query_stats = """
    SELECT 
        YEAR(date) AS year,
        area_name,
        COUNT(*) AS total_cases,
        COUNT(CASE WHEN case_status = 'Case closed' THEN 1 END) AS closed_cases
    FROM case_status_data
    GROUP BY year, area_name
"""
df_stats = spark.sql(query_stats)
# Υπολογιζουμε ποσοστο κλεισμενων υποθεσεων
df_stats.createOrReplaceTempView("stats_data")
# Εκτελουμε SQL query για υπολογισμο ποσοστου και καταταξης
query_final = """
    SELECT
        year,
        area_name,
        closed_cases / total_cases * 100 AS closed_case_rate,
        RANK() OVER (PARTITION BY year ORDER BY closed_cases
        / total_cases * 100 DESC) AS rank
    FROM stats_data
"""
df_final = spark.sql(query_final)
# Επιλεγουμε τις 3 πρωτες περιοχες για καθε ετος
df_top_3 = df_final.filter(col("rank") <= 3)
# Επιλεγουμε τις επιθυμητες στηλες και αλλαζουμε ονομα στο επιθυμητο
df_top_3 = df_top_3.select("year", "area_name", "closed_case_rate", "rank")
df_top_3 = df_top_3.withColumnRenamed("area_name", "precinct")
df_top_3 = df_top_3.withColumnRenamed("rank", "#")
# Ταξινομουμε τα αποτελεσματα κατα ετος και καταταξη και τα εμφανιζουμε
df_top_3.orderBy("year", "rank").show()

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+---+
|year|   precinct|  closed_case_rate|  #|
+----+-----------+------------------+---+
|2010|    Rampart| 32.84713448949121|  1|
|2010|    Olympic|31.515289821999087|  2|
|2010|     Harbor| 29.36028339237341|  3|
|2011|    Olympic|35.040060090135206|  1|
|2011|    Rampart|  32.4964471814306|  2|
|2011|     Harbor| 28.51336246316431|  3|
|2012|    Olympic| 34.29708533302119|  1|
|2012|    Rampart| 32.46000463714352|  2|
|2012|     Harbor|29.509585848956675|  3|
|2013|    Olympic| 33.58217940999398|  1|
|2013|    Rampart|  32.1060382916053|  2|
|2013|     Harbor|29.723638951488557|  3|
|2014|   Van Nuys|  32.0215235281705|  1|
|2014|West Valley| 31.49754809505847|  2|
|2014|    Mission|31.224939855653567|  3|
|2015|   Van Nuys|32.265140677157845|  1|
|2015|    Mission|30.463762673676303|  2|
|2015|   Foothill|30.353001803658852|  3|
|2016|   Van Nuys|32.194518462124094|  1|
|2016|West Valley| 31.40146437042384|  2|
+----+-----------+----------------

In [4]:
# Ακολουθει κωδικας για την μετατροπη ενος αρχειο CSV σε αρχειο Parquet
spark = SparkSession \
    .builder \
    .appName("CSV to Parquet Conversion") \
    .getOrCreate()

file_1 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
file_2 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"

schema = StructType([
    StructField("dr_no", IntegerType(), True),
    StructField("date_rptd", StringType(), True),
    StructField("date_occ", StringType(), True),
    StructField("time_occ", StringType(), True),
    StructField("area", StringType(), True),
    StructField("area_name", StringType(), True),
    StructField("rpt_dist_no", StringType(), True),
    StructField("part_1_2", IntegerType(), True),
    StructField("crm_cd", StringType(), True),
    StructField("crm_cd_desc", StringType(), True),
    StructField("mocodes", StringType(), True),
    StructField("vict_age", StringType(), True),
    StructField("vict_sex", StringType(), True),
    StructField("vict_descent", StringType(), True),
    StructField("premis_cd", StringType(), True),
    StructField("premis_desc", StringType(), True),
    StructField("weapon_used_cd", StringType(), True),
    StructField("weapon_desc", StringType(), True),
    StructField("status", StringType(), True),
    StructField("status_desc", StringType(), True),
    StructField("crm_cd_1", StringType(), True),
    StructField("crm_cd_2", StringType(), True),
    StructField("crm_cd_3", StringType(), True),
    StructField("crm_cd_4", StringType(), True),
    StructField("location", StringType(), True),
    StructField("cross_street", StringType(), True),
    StructField("lat", IntegerType(), True),
    StructField("lon", IntegerType(), True),
])

df1 = spark.read.csv(file_1, header=True, schema=schema)
df2 = spark.read.csv(file_2, header=True, schema=schema)
df = df1.union(df2)

# Διαδρομη αποθηκευσης του αρχειου Parquet στο S3
output_path = "s3://groups-bucket-dblab-905418150721/group25/Outputs"

# Αποθήκευση σε Parquet format
# Με το mode="overwrite" διασφαλιζουμε οτι αν υπαρχει ηδη αρχείο
# θα αντικατασταθεί
df.write.mode("overwrite").parquet(output_path)
print("Done")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Done

In [5]:
spark = SparkSession \
    .builder \
    .appName("Query 2 with Parquet") \
    .getOrCreate()

start_time = time.time()

# Διαδρομη του Parquet φακελου
parquet_path = "s3://groups-bucket-dblab-905418150721/group25/Outputs/"

# Φορτωση δεδομενων από το Parquet
df = spark.read.parquet(parquet_path)

df = df.withColumn(
    "case_status",
    when(col("status_desc").isin("UNK", "Invest Cont"), "Case not closed")
    .otherwise("Case closed")
)

df = df.select("date_occ", "area_name", "case_status")
df = df.withColumn("date", to_timestamp(col("date_occ"), "MM/dd/yyyy hh:mm:ss a"))
df = df.withColumn("year", year(col("date")))
df = df.select("year", "area_name", "case_status")


df_stats = df.groupBy("year", "area_name").agg(
    count("*").alias("total_cases"),
    count(when(col("case_status") == "Case closed", 1)).alias("closed_cases")
)

df_stats = df_stats.withColumn("closed_case_rate",
                               (col("closed_cases")/col("total_cases"))*100)


window_spec = Window.partitionBy("year")\
                .orderBy(col("closed_case_rate").desc())

df_stats = df_stats.withColumn("#", F.rank().over(window_spec))

df_top_3 = df_stats.filter(col("#") <= 3)

df_top_3 = df_top_3.select("year", "area_name", "closed_case_rate", "#")
df_top_3 = df_top_3.withColumnRenamed("area_name", "precinct")

df_top_3.orderBy("year", "#").show()

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+---+
|year|   precinct|  closed_case_rate|  #|
+----+-----------+------------------+---+
|2010|    Rampart| 32.84713448949121|  1|
|2010|    Olympic|31.515289821999087|  2|
|2010|     Harbor| 29.36028339237341|  3|
|2011|    Olympic|35.040060090135206|  1|
|2011|    Rampart|  32.4964471814306|  2|
|2011|     Harbor| 28.51336246316431|  3|
|2012|    Olympic| 34.29708533302119|  1|
|2012|    Rampart| 32.46000463714352|  2|
|2012|     Harbor|29.509585848956675|  3|
|2013|    Olympic| 33.58217940999398|  1|
|2013|    Rampart|  32.1060382916053|  2|
|2013|     Harbor|29.723638951488557|  3|
|2014|   Van Nuys|  32.0215235281705|  1|
|2014|West Valley| 31.49754809505847|  2|
|2014|    Mission|31.224939855653567|  3|
|2015|   Van Nuys|32.265140677157845|  1|
|2015|    Mission|30.463762673676303|  2|
|2015|   Foothill|30.353001803658852|  3|
|2016|   Van Nuys|32.194518462124094|  1|
|2016|West Valley| 31.40146437042384|  2|
+----+-----------+----------------