In [None]:
# QUERY 2 - DataFrame API
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import udf
from pyspark.sql.types import StructField, StructType, StringType, DoubleType

# Get (or reuse) the SparkSession for the DataFrame-API run.
spark = (
    SparkSession.builder
    .appName("DF query 2 execution")
    .getOrCreate()
)

# Column layout of the crime CSV extracts. Fields are listed in order because
# the CSV reader assigns a user-supplied schema to columns by position; all
# columns are strings except the four typed as doubles.
crime_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("DR_NO", StringType()),
        ("Date Rptd", StringType()),
        ("DATE OCC", StringType()),
        ("TIME OCC", StringType()),
        ("AREA", StringType()),
        ("AREA NAME", StringType()),
        ("Rpt Dist No", StringType()),
        ("Part 1-2", DoubleType()),
        ("Crm Cd", StringType()),
        ("Crm Cd Desc", StringType()),
        ("Mocodes", StringType()),
        ("Vict Age", StringType()),
        ("Vict Sex", StringType()),
        ("Vict Descent", StringType()),
        ("Premis Cd", DoubleType()),
        ("Premis Desc", StringType()),
        ("Weapon Used Cd", StringType()),
        ("Weapon Desc", StringType()),
        ("Status", StringType()),
        ("Status Desc", StringType()),
        ("Crm Cd 1", StringType()),
        ("Crm Cd 2", StringType()),
        ("Crm Cd 3", StringType()),
        ("Crm Cd 4", StringType()),
        ("LOCATION", StringType()),
        ("Cross Street", StringType()),
        ("LAT", DoubleType()),
        ("LON", DoubleType()),
    )
])


def calculate_percentage(closed_cases, total_cases):
    """Return closed_cases as a percentage of total_cases.

    This function is wrapped as a Spark Python UDF below, so SQL NULL values
    arrive here as ``None``.

    Args:
        closed_cases: Number of closed cases, or ``None``.
        total_cases: Total number of cases, or ``None``.

    Returns:
        ``0.0`` when ``total_cases`` is zero; ``None`` when either input is
        ``None`` (a NULL rate instead of a TypeError that would fail the
        Spark task); otherwise ``closed_cases / total_cases * 100``.
    """
    if total_cases == 0:
        return 0.0
    if closed_cases is None or total_cases is None:
        return None
    return (closed_cases / total_cases) * 100


# Register the plain Python function defined above as a Spark UDF (doubles).
calculate_percentage = udf(calculate_percentage, DoubleType())

# Rows to print at the end. DataFrame.show() prints only 20 rows by default,
# which truncates the top-3-per-year result once it spans more than six years.
ROWS_TO_SHOW = 1000

start_time = time.time()

# Read the crime data from CSV files.
# NOTE(review): header=True assumes each CSV starts with a header row (the
# schema's column names mirror it); with header=False that row would be
# parsed as a bogus data record. Confirm against the source files.
crime1_df = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721//CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, schema=crime_schema)
crime2_df = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721//CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, schema=crime_schema)

# Stack both periods. DataFrame.union() matches columns by position and keeps
# duplicate rows (UNION ALL semantics).
crime_df = crime1_df.union(crime2_df)

# Convert DATE OCC from a string to a timestamp.
crime_df_fixed = crime_df.withColumn(
    "DATE OCC", F.to_timestamp("DATE OCC", "MM/dd/yyyy hh:mm:ss a")
)

# Derive the calendar year. Drop rows whose year is NULL (e.g. a date that did
# not parse), as the Spark SQL version does with `year IS NOT NULL`.
crime_df_adjusted = (
    crime_df_fixed
    .withColumn("year", F.year("DATE OCC"))
    .filter(F.col("year").isNotNull())
)

# A case counts as closed unless its status is "UNK" or "Invest Cont". Rows
# with a NULL Status Desc are excluded too, since NOT isin(...) is NULL there.
crime_df_filtered = crime_df_adjusted.filter(
    ~F.col("Status Desc").isin("UNK", "Invest Cont"))

total_cases = (
    crime_df_adjusted.groupBy("year", "AREA NAME")
    .agg(F.count("*").alias("TOTAL CASES")))

closed_cases = (
    crime_df_filtered.groupBy("year", "AREA NAME")
    .agg(F.count("*").alias("CLOSED CASES")))

# Left join from the totals so an area with no closed cases in a year is kept
# with 0 closed cases instead of disappearing. This mirrors the LEFT JOIN in
# the Spark SQL version.
percentage = (
    total_cases.join(closed_cases, ["year", "AREA NAME"], "left")
    .withColumn("CLOSED CASES",
                F.coalesce(F.col("CLOSED CASES"), F.lit(0)))
    .withColumn("closed_case_rate",
                calculate_percentage(F.col("CLOSED CASES"),
                                     F.col("TOTAL CASES")))
)

# Rank areas within each year by closed-case rate, highest first.
crime_df_ranked = percentage.withColumn("#", F.row_number().over(
    Window.partitionBy("year").orderBy(F.col("closed_case_rate").desc())))

crime_df_ranked_top3 = crime_df_ranked.filter(F.col("#") <= 3)

crime_df_ranked_top3_output = crime_df_ranked_top3.select(
    "year", "AREA NAME", "closed_case_rate", "#")

# Show the whole result, ordered by year and rank.
crime_df_ranked_top3_output.orderBy("year", "#").show(
    n=ROWS_TO_SHOW, truncate=False)

end_time = time.time()

execution_time = end_time - start_time
print(f"Execution time: {execution_time:.2f} seconds")

In [None]:
# QUERY 2 - Spark SQL

# Get (or reuse) the SparkSession for the Spark SQL run.
spark = (
    SparkSession.builder
    .appName("SQL query 2 execution")
    .getOrCreate()
)

# Column layout of the crime CSV extracts. Fields are listed in order because
# the CSV reader assigns a user-supplied schema to columns by position; all
# columns are strings except the four typed as doubles.
crime_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("DR_NO", StringType()),
        ("Date Rptd", StringType()),
        ("DATE OCC", StringType()),
        ("TIME OCC", StringType()),
        ("AREA", StringType()),
        ("AREA NAME", StringType()),
        ("Rpt Dist No", StringType()),
        ("Part 1-2", DoubleType()),
        ("Crm Cd", StringType()),
        ("Crm Cd Desc", StringType()),
        ("Mocodes", StringType()),
        ("Vict Age", StringType()),
        ("Vict Sex", StringType()),
        ("Vict Descent", StringType()),
        ("Premis Cd", DoubleType()),
        ("Premis Desc", StringType()),
        ("Weapon Used Cd", StringType()),
        ("Weapon Desc", StringType()),
        ("Status", StringType()),
        ("Status Desc", StringType()),
        ("Crm Cd 1", StringType()),
        ("Crm Cd 2", StringType()),
        ("Crm Cd 3", StringType()),
        ("Crm Cd 4", StringType()),
        ("LOCATION", StringType()),
        ("Cross Street", StringType()),
        ("LAT", DoubleType()),
        ("LON", DoubleType()),
    )
])

# Rows to print at the end. DataFrame.show() prints only 20 rows by default,
# which truncates the top-3-per-year result once it spans more than six years.
ROWS_TO_SHOW = 1000

start_time = time.time()

# Read the crime data from CSV files.
# NOTE(review): header='true' assumes each CSV starts with a header row (the
# schema's column names mirror it); with header='false' that row would be
# parsed as a bogus data record. Confirm against the source files.
crime_1 = spark.read.format('csv') \
                    .options(header='true') \
                    .schema(crime_schema) \
                    .load("s3://initial-notebook-data-bucket-dblab-905418150721//CrimeData/Crime_Data_from_2010_to_2019_20241101.csv")

crime_2 = spark.read.format('csv') \
                    .options(header='true') \
                    .schema(crime_schema) \
                    .load("s3://initial-notebook-data-bucket-dblab-905418150721//CrimeData/Crime_Data_from_2020_to_Present_20241101.csv")

# Expose the raw extracts as SQL views.
crime_1.createOrReplaceTempView("crime_1")
crime_2.createOrReplaceTempView("crime_2")

# UNION ALL keeps every row, matching DataFrame.union() in the DataFrame-API
# version. A plain UNION would also de-duplicate entire rows, which changes
# the counts and adds an aggregation over the whole dataset.
crime_df = spark.sql("""
SELECT * FROM crime_1
UNION ALL
SELECT * FROM crime_2
""")

crime_df.createOrReplaceTempView("crime_df")

# Parse DATE OCC into a timestamp and derive the calendar year.
query = """
SELECT *,
       TO_TIMESTAMP(`DATE OCC`, 'MM/dd/yyyy hh:mm:ss a') AS `DATE OCC TIMESTAMP`,
       YEAR(TO_TIMESTAMP(`DATE OCC`, 'MM/dd/yyyy hh:mm:ss a')) AS `year`
FROM crime_df
"""

crime_df_adjusted = spark.sql(query)
crime_df_adjusted.createOrReplaceTempView("crime_df_adjusted")

# Per year and area: total cases and closed cases (status not 'UNK' or
# 'Invest Cont'). Then compute the closed-case rate and keep the top 3 areas
# per year. The LEFT JOIN keeps area-years with no closed cases;
# COALESCE(..., 0) gives them a 0% rate instead of a NULL one.
query = """
WITH total_cases AS (
    SELECT `year`, `AREA NAME`, COUNT(*) AS TOTAL_CASES
    FROM crime_df_adjusted
    WHERE `year` IS NOT NULL
    GROUP BY `year`, `AREA NAME`
),
closed_cases AS (
    SELECT `year`, `AREA NAME`, COUNT(*) AS CLOSED_CASES
    FROM crime_df_adjusted
    WHERE `Status Desc` NOT IN ('UNK', 'Invest Cont')
    GROUP BY `year`, `AREA NAME`
),
percentage AS (
    SELECT t.`year`, t.`AREA NAME`, t.TOTAL_CASES,
        COALESCE(c.CLOSED_CASES, 0) AS CLOSED_CASES,
        CASE WHEN t.TOTAL_CASES = 0 THEN 0
        ELSE (COALESCE(c.CLOSED_CASES, 0) / t.TOTAL_CASES) * 100 END AS closed_case_rate
    FROM total_cases t
    LEFT JOIN closed_cases c
    ON t.`year` = c.`year` AND t.`AREA NAME` = c.`AREA NAME`
),
ranked_percentage AS (
    SELECT `year`, `AREA NAME`, closed_case_rate,
        ROW_NUMBER() OVER (PARTITION BY `year` ORDER BY closed_case_rate DESC) AS rank
    FROM percentage
)
SELECT `year`, `AREA NAME`, closed_case_rate, rank
FROM ranked_percentage
WHERE rank <= 3
ORDER BY `year`, rank;
"""

crime_df_ranked_top3_output = spark.sql(query)

# Show the whole result (already ordered by year and rank).
crime_df_ranked_top3_output.show(n=ROWS_TO_SHOW, truncate=False)

end_time = time.time()

execution_time = end_time - start_time
print(f"Execution time: {execution_time:.2f} seconds")

In [None]:
# QUERY 2 - parquet

# Persist the combined crime data as Parquet for the Parquet-based query run.
#
# Never hard-code credentials in a notebook. The fs.s3a.* keys would only
# affect s3a:// URIs anyway, while this cell writes to an s3:// URI. That is
# the same scheme the CSV reads in this notebook use without any credential
# settings. NOTE(review): confirm the cluster's own credentials (e.g. its
# instance role) can write to this bucket.
path = "s3://groups-bucket-dblab-905418150721/group42/crime_data.parquet"

# coalesce(1) funnels the write through a single task, producing a single
# Parquet part file. NOTE(review): drop it if write throughput matters more.
crime_df.coalesce(1).write.mode("overwrite").parquet(path)

In [None]:
# QUERY 2 - DataFrame API - parquet
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

# Get (or reuse) the SparkSession for the Parquet-backed DataFrame run.
spark = (
    SparkSession.builder
    .appName("DF query 2 execution with Parquet")
    .getOrCreate()
)

# Define a UDF to calculate the percentage of closed cases
def calculate_percentage(closed_cases, total_cases):
    """Return closed_cases as a percentage of total_cases.

    This function is wrapped as a Spark Python UDF below, so SQL NULL values
    arrive here as ``None``.

    Args:
        closed_cases: Number of closed cases, or ``None``.
        total_cases: Total number of cases, or ``None``.

    Returns:
        ``0.0`` when ``total_cases`` is zero; ``None`` when either input is
        ``None`` (a NULL rate instead of a TypeError that would fail the
        Spark task); otherwise ``closed_cases / total_cases * 100``.
    """
    if total_cases == 0:
        return 0.0
    if closed_cases is None or total_cases is None:
        return None
    return (closed_cases / total_cases) * 100

# Register the plain Python function defined above as a Spark UDF (doubles).
calculate_percentage = udf(calculate_percentage, DoubleType())

# Rows to print at the end. DataFrame.show() prints only 20 rows by default,
# which truncates the top-3-per-year result once it spans more than six years.
ROWS_TO_SHOW = 1000

start_time = time.time()

# Read the combined crime data from the Parquet copy written earlier.
crime_df = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group42/crime_data.parquet")

# Convert DATE OCC from a string to a timestamp.
crime_df_fixed = crime_df.withColumn(
    "DATE OCC", F.to_timestamp("DATE OCC", "MM/dd/yyyy hh:mm:ss a")
)

# Derive the calendar year. Drop rows whose year is NULL (e.g. a date that did
# not parse), as the Spark SQL version does with `year IS NOT NULL`.
crime_df_adjusted = (
    crime_df_fixed
    .withColumn("year", F.year("DATE OCC"))
    .filter(F.col("year").isNotNull())
)

# A case counts as closed unless its status is "UNK" or "Invest Cont". Rows
# with a NULL Status Desc are excluded too, since NOT isin(...) is NULL there.
crime_df_filtered = crime_df_adjusted.filter(
    ~F.col("Status Desc").isin("UNK", "Invest Cont"))

total_cases = (
    crime_df_adjusted.groupBy("year", "AREA NAME")
    .agg(F.count("*").alias("TOTAL CASES"))
)

closed_cases = (
    crime_df_filtered.groupBy("year", "AREA NAME")
    .agg(F.count("*").alias("CLOSED CASES"))
)

# Left join from the totals so an area with no closed cases in a year is kept
# with 0 closed cases instead of disappearing. This mirrors the LEFT JOIN in
# the Spark SQL version.
percentage = (
    total_cases.join(closed_cases, ["year", "AREA NAME"], "left")
    .withColumn("CLOSED CASES",
                F.coalesce(F.col("CLOSED CASES"), F.lit(0)))
    .withColumn("closed_case_rate",
                calculate_percentage(F.col("CLOSED CASES"),
                                     F.col("TOTAL CASES")))
)

# Rank areas within each year by closed-case rate, highest first.
crime_df_ranked = percentage.withColumn("#", F.row_number().over(
    Window.partitionBy("year").orderBy(F.col("closed_case_rate").desc())))

crime_df_ranked_top3 = crime_df_ranked.filter(F.col("#") <= 3)

crime_df_ranked_top3_output = crime_df_ranked_top3.select(
    "year", "AREA NAME", "closed_case_rate", "#")

# Show the whole result, ordered by year and rank.
crime_df_ranked_top3_output.orderBy("year", "#").show(
    n=ROWS_TO_SHOW, truncate=False)

end_time = time.time()

execution_time = end_time - start_time
print(f"Execution time: {execution_time:.2f} seconds")