In [1]:
#QUERY 1 WITH DF
from pyspark.sql.functions import col, when, count, lower
from pyspark.sql import SparkSession

# Δημιουργία SparkSession
spark = SparkSession.builder \
    .appName("Query 1") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Φόρτωση δεδομένων από τα δύο αρχεία CSV
crime_data_path_2010_2019 = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_data_path_2020_present = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"

crime_df_2010_2019 = spark.read.csv(crime_data_path_2010_2019, header=True, inferSchema=True)
crime_df_2020_present = spark.read.csv(crime_data_path_2020_present, header=True, inferSchema=True)

# Ένωση των δύο DataFrames
combined_crime_df = crime_df_2010_2019.union(crime_df_2020_present)

# Φιλτράρισμα εγγραφών που αναφέρονται στο Null Island
filtered_crime_df = combined_crime_df.filter((col("LAT") != 0) & (col("LON") != 0))

# Φιλτράρισμα για περιστατικά που περιέχουν τον όρο "aggravated assault"
aggr_as_df = filtered_crime_df.filter(lower(col("Crm Cd Desc")).like("%aggravated assault%"))

# Προσθήκη στήλης για την κατηγοριοποίηση των θυμάτων σε ηλικιακές ομάδες
age_grouped_df = aggr_as_df.withColumn(
    "Age Group",
    when(col("Vict Age") < 18, "Children")
    .when((col("Vict Age") >= 18) & (col("Vict Age") <= 24), "Young Adults")
    .when((col("Vict Age") >= 25) & (col("Vict Age") <= 64), "Adults")
    .when(col("Vict Age") > 64, "Seniors")
)

# Ομαδοποίηση ανά ηλικιακή ομάδα και καταμέτρηση
result_1_df = age_grouped_df.groupBy("Age Group").agg(count("*").alias("Total Cases"))

# Ταξινόμηση σε φθίνουσα σειρά
sorted_result_1_df = result_1_df.orderBy(col("Total Cases").desc())

import time
start_time = time.time()

sorted_result_1_df.show()

end_time = time.time()
print(f"Execution Time (DataFrame API): {end_time - start_time} seconds")


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
472,application_1738075734771_0472,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----------+
|   Age Group|Total Cases|
+------------+-----------+
|      Adults|     121052|
|Young Adults|      33588|
|    Children|      15923|
|     Seniors|       5985|
+------------+-----------+

Execution Time (DataFrame API): 4.208456993103027 seconds

In [2]:
#QUERY 1 WITH RDD
crime_rdd_2010_2019 = spark.read.csv(crime_data_path_2010_2019, header=True, inferSchema=True).rdd
crime_rdd_2020_present = spark.read.csv(crime_data_path_2020_present, header=True, inferSchema=True).rdd

# Ένωση των δύο RDD
combined_rdd = crime_rdd_2010_2019.union(crime_rdd_2020_present)

# Φιλτράρισμα για Null Island
filtered_rdd = combined_rdd.filter(lambda row: row["LAT"] != 0 and row["LON"] != 0)

# Φιλτράρισμα για τον όρο "aggravated assault"
assault_rdd = filtered_rdd.filter(lambda row: "aggravated assault" in row["Crm Cd Desc"].lower())

# Κατηγοριοποίηση θυμάτων σε ηλικιακές ομάδες
def categorize_age(row):
    if row["Vict Age"] < 18:
        age_group = "Children"
    elif 18 <= row["Vict Age"] <= 24:
        age_group = "Young Adults"
    elif 25 <= row["Vict Age"] <= 64:
        age_group = "Adults"
    elif row["Vict Age"] > 64:
        age_group = "Seniors"
    else:
        age_group = "Unknown"
    return (age_group, 1)

age_group_rdd = assault_rdd.map(categorize_age)

# Συγκέντρωση και υπολογισμός πλήθους ανά ηλικιακή ομάδα
age_group_count_rdd = age_group_rdd.reduceByKey(lambda a, b: a + b)

# Ταξινόμηση σε φθίνουσα σειρά
sorted_rdd = age_group_count_rdd.sortBy(lambda x: x[1], ascending=False)

import time
start_time = time.time()

result = sorted_rdd.collect()  # Χρήση collect() για να πάρουμε τα αποτελέσματα

end_time = time.time()
print(f"Execution Time (RDD API): {end_time - start_time} seconds")

# Εκτύπωση αποτελεσμάτων
for group, count in result:
    print(f"{group}: {count}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Execution Time (RDD API): 0.34343934059143066 seconds
Adults: 121052
Young Adults: 33588
Children: 15923
Seniors: 5985

In [9]:
#QUERY 2 WITH DF
from pyspark.sql.functions import col, count, when, year, to_date, row_number
from pyspark.sql.window import Window

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Μετατροπή "DATE OCC" σε ημερομηνία και εξαγωγή του έτους
filtered_crime_df = filtered_crime_df.withColumn("DATE OCC", to_date(col("DATE OCC"), "MM/dd/yyyy"))
filtered_crime_df = filtered_crime_df.withColumn("YEAR", year(col("DATE OCC")))

# Υπολογισμός του ποσοστού κλεισμένων υποθέσεων
closed_rate_df = filtered_crime_df.groupBy("YEAR", "AREA NAME").agg(
    count(when(col("Status").isin("JA", "AA", "JO", "AO"), True)).alias("closed_cases"),
    count("*").alias("total_cases")
).withColumn(
    "closed_case_rate", (col("closed_cases") / col("total_cases")) * 100
)

# Δημιουργία παραθύρου για ταξινόμηση
window_spec = Window.partitionBy("YEAR").orderBy(col("closed_case_rate").desc())

# Υπολογισμός του ranking
ranked_df = closed_rate_df.withColumn("rank", row_number().over(window_spec))

# Επιλογή των 3 πρώτων τμημάτων ανά έτος
top3_df = ranked_df.filter(col("rank") <= 3).orderBy("YEAR", "rank")

start_time_df = time.time()

# Εμφάνιση αποτελεσμάτων
top3_df.show(50)

end_time_df = time.time()
print(f"Execution Time (DataFrame API): {end_time_df - start_time_df} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------+-----------+------------------+----+
|YEAR|  AREA NAME|closed_cases|total_cases|  closed_case_rate|rank|
+----+-----------+------------+-----------+------------------+----+
|2010|    Rampart|        2860|       8706|    32.85090742017|   1|
|2010|    Olympic|        2762|       8764|31.515289821999087|   2|
|2010|     Harbor|        2818|       9598| 29.36028339237341|   3|
|2011|    Olympic|        2798|       7987| 35.03192688118192|   1|
|2011|    Rampart|        2744|       8443|32.500296103280824|   2|
|2011|     Harbor|        2806|       9840|28.516260162601625|   3|
|2012|    Olympic|        2923|       8523|34.295435879385195|   1|
|2012|    Rampart|        2791|       8598|32.461037450569904|   2|
|2012|     Harbor|        2781|       9416|29.534834324553948|   3|
|2013|    Olympic|        2789|       8305| 33.58217940999398|   1|
|2013|    Rampart|        2616|       8148|  32.1060382916053|   2|
|2013|     Harbor|        2504|       8429| 29.7

In [10]:
#QUERY 2 WITH SQL API
from pyspark.sql.functions import to_date, year

# Ρύθμιση legacy time parser
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Μετατροπή της στήλης "DATE OCC" σε ημερομηνία και εξαγωγή του έτους
filtered_crime_df = filtered_crime_df.withColumn("DATE OCC", to_date(col("DATE OCC"), "MM/dd/yyyy"))
filtered_crime_df = filtered_crime_df.withColumn("YEAR", year(col("DATE OCC")))

# Δημιουργία προσωρινού view για χρήση SQL
filtered_crime_df.createOrReplaceTempView("crime_data")

# Υλοποίηση του query
sql_query = """
WITH closed_cases_data AS (
    SELECT
        YEAR,
        `AREA NAME`,
        COUNT(CASE WHEN Status IN ('JA', 'AA', 'JO', 'AO') THEN 1 END) AS closed_cases,
        COUNT(*) AS total_cases,
        (COUNT(CASE WHEN Status IN ('JA', 'AA', 'JO', 'AO') THEN 1 END) * 100.0 / COUNT(*)) AS closed_case_rate
    FROM crime_data
    GROUP BY YEAR, `AREA NAME`
),
ranked_data AS (
    SELECT
        YEAR,
        `AREA NAME`,
        closed_cases,
        total_cases,
        closed_case_rate,
        ROW_NUMBER() OVER (PARTITION BY YEAR ORDER BY closed_case_rate DESC) AS rank
    FROM closed_cases_data
)
SELECT
    YEAR,
    `AREA NAME`,
    closed_cases,
    total_cases,
    closed_case_rate,
    rank
FROM ranked_data
WHERE rank <= 3
ORDER BY YEAR, rank
"""

start_time_sql = time.time()

# Εκτέλεση του SQL query
top3_sql_df = spark.sql(sql_query)
top3_sql_df.show(50)

end_time_sql = time.time()
print(f"Execution Time (SQL API): {end_time_sql - start_time_sql} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------+-----------+-----------------+----+
|YEAR|  AREA NAME|closed_cases|total_cases| closed_case_rate|rank|
+----+-----------+------------+-----------+-----------------+----+
|2010|    Rampart|        2860|       8706|32.85090742017000|   1|
|2010|    Olympic|        2762|       8764|31.51528982199909|   2|
|2010|     Harbor|        2818|       9598|29.36028339237341|   3|
|2011|    Olympic|        2798|       7987|35.03192688118192|   1|
|2011|    Rampart|        2744|       8443|32.50029610328082|   2|
|2011|     Harbor|        2806|       9840|28.51626016260163|   3|
|2012|    Olympic|        2923|       8523|34.29543587938519|   1|
|2012|    Rampart|        2791|       8598|32.46103745056990|   2|
|2012|     Harbor|        2781|       9416|29.53483432455395|   3|
|2013|    Olympic|        2789|       8305|33.58217940999398|   1|
|2013|    Rampart|        2616|       8148|32.10603829160530|   2|
|2013|     Harbor|        2504|       8429|29.70696405267529| 

In [13]:
#QUERY3
from sedona.spark import *
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# Create sedona context
sedona = SedonaContext.create(spark)

# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

from pyspark.sql.functions import regexp_replace

# Διαδρομή του αρχείου εισοδήματος
income_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"

# Φόρτωση του αρχείου CSV στο Spark
income_df = spark.read.csv(income_path, header=True, inferSchema=True)

# Καθαρισμός της στήλης "Estimated Median Income"
income_df = income_df.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), "[^0-9.]", "").cast("double")
)

from pyspark.sql.functions import col, sum, broadcast


# Εγγραφή Sedona functions
SedonaRegistrator.registerAll(spark)

# Υπολογισμός συνολικού πληθυσμού και αριθμού κατοικιών
census_population_df = flattened_df.groupBy("COMM", "ZCTA10").agg(
    sum("POP_2010").alias("Total Population"),
    sum("HOUSING10").alias("Total Housing Units"),
     ST_Union_Aggr("geometry").alias("geometry")
)

# Υπολογισμός μέσου μεγέθους νοικοκυριού
census_population_df = census_population_df.withColumn(
    "Average Household Size",
    (col("Total Population") / col("Total Housing Units")).cast("double")
)

# Φιλτράρισμα μηδενικών τιμών (προαιρετικό)
census_population_df = census_population_df.filter(
    (col("Total Population") > 0) & (col("Total Housing Units") > 0)
)

# Σύνδεση GeoJSON και εισοδηματικών δεδομένων με βάση το Zip Code
joined_income_df = income_df.join(
    broadcast(census_population_df),
    income_df["Zip Code"] == census_population_df["ZCTA10"],
    "inner"
)

# Υπολογισμός μέσου εισοδήματος ανά άτομο
final_income_df = joined_income_df.withColumn(
    "Mean Income Per Person",
    (col("Estimated Median Income").cast("double") / col("Average Household Size")).cast("double")
)

# Δημιουργία γεωμετρικών σημείων από LAT και LON
crime_points_df = filtered_crime_df.withColumn(
    "geom", ST_Point("LON", "LAT")
)

# Geospatial join μεταξύ σημείων εγκλημάτων και περιοχών απογραφής
crime_population_df = crime_points_df.join(
    census_population_df.hint("shuffle_hash"),
    ST_Within(crime_points_df["geom"], census_population_df["geometry"]),
    "inner"
)

# Υπολογισμός συνολικών εγκλημάτων και πληθυσμού ανά περιοχή
crime_population_df = crime_population_df.groupBy("COMM").agg(
    count("*").alias("Total Crimes"),
    sum("Total Population").alias("Total Population")
).withColumn(
    "Crime Per Capita",
    (col("Total Crimes") / col("Total Population")).cast("double")
)

from pyspark.sql.functions import col, avg

# Υπολογισμός μέσου εισοδήματος ανά περιοχή
income_aggregated_df = final_income_df.groupBy("COMM").agg(
    avg("Estimated Median Income").alias("Mean Estimated Income")
)

# Σύνδεση του Crime Per Capita με το μέσο εισόδημα
final_df = crime_population_df.hint("merge").join(
    income_aggregated_df,
    "COMM",
    "inner"
).select(
    "COMM",
    "Crime Per Capita",
    "Mean Estimated Income"
)

from pyspark.sql.functions import format_number

# Μορφοποίηση της στήλης "Crime Per Capita" σε δεκαδικούς αριθμούς
formatted_df = final_df.withColumn(
    "Crime Per Capita", format_number("Crime Per Capita", 6)  # Διατήρηση 6 δεκαδικών ψηφίων
)

# Εμφάνιση αποτελεσμάτων
formatted_df.show(20, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+----------------+---------------------+
|COMM                   |Crime Per Capita|Mean Estimated Income|
+-----------------------+----------------+---------------------+
|                       |0.001976        |82810.42857142857    |
|Acton                  |0.000127        |78868.25             |
|Adams-Normandie        |0.000254        |28084.0              |
|Agoura Hills           |0.000049        |114417.0             |
|Alhambra               |0.000020        |51822.333333333336   |
|Alsace                 |0.000085        |38330.0              |
|Altadena               |0.000028        |80547.66666666667    |
|Anaverde               |0.001211        |69699.66666666667    |
|Angeles National Forest|0.009003        |73013.5294117647     |
|Angelino Heights       |0.000638        |39784.5              |
|Arcadia                |0.000032        |66056.0              |
|Arleta                 |0.000030        |44588.0              |
|Athens Village         |

In [37]:
#JOINS
from pyspark.sql.functions import broadcast

# Ενδεικτικό Join χωρίς hint
basic_join = crime_population_df.join(
    income_aggregated_df,
    "COMM",
    "inner"
)

# Εμφάνιση του φυσικού πλάνου εκτέλεσης
print("Basic Join:")
basic_join.explain()

# Προσθήκη Broadcast Hint
broadcast_join = crime_population_df.join(
    broadcast(income_aggregated_df),
    "COMM",
    "inner"
)

print("\nBroadcast Join:")
broadcast_join.explain()

# Προσθήκη Merge Hint
merge_join = crime_population_df.hint("merge").join(
    income_aggregated_df.hint("merge"),
    "COMM",
    "inner"
)

print("\nMerge Join:")
merge_join.explain()

# Προσθήκη Shuffle Hash Hint
shuffle_hash_join = crime_population_df.hint("shuffle_hash").join(
    income_aggregated_df.hint("shuffle_hash"),
    "COMM",
    "inner"
)

print("\nShuffle Hash Join:")
shuffle_hash_join.explain()

# Προσθήκη Shuffle Replicate NL Hint
shuffle_replicate_nl_join = crime_population_df.hint("shuffle_replicate_nl").join(
    income_aggregated_df.hint("shuffle_replicate_nl"),
    "COMM",
    "inner"
)

print("\nShuffle Replicate NL Join:")
shuffle_replicate_nl_join.explain()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Basic Join:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [COMM#4107, Total Crimes#4602L, Total Population#4604L, Crime Per Capita#4608, Mean Estimated Income#10679]
   +- SortMergeJoin [COMM#4107], [COMM#10308], Inner
      :- Sort [COMM#4107 ASC NULLS FIRST], false, 0
      :  +- Project [COMM#4107, Total Crimes#4602L, Total Population#4604L, (cast(Total Crimes#4602L as double) / cast(Total Population#4604L as double)) AS Crime Per Capita#4608]
      :     +- HashAggregate(keys=[COMM#4107], functions=[count(1), sum(Total Population#4313L)], schema specialized)
      :        +- Exchange hashpartitioning(COMM#4107, 1000), ENSURE_REQUIREMENTS, [plan_id=4799]
      :           +- HashAggregate(keys=[COMM#4107], functions=[partial_count(1), partial_sum(Total Population#4313L)], schema specialized)
      :              +- Project [COMM#4107, Total Population#4313L]
      :                 +- RangeJoin geom#4462: geometry, geometry#4320: geometry, WITHIN
      :       

In [31]:
#QUERY5

from pyspark.sql import SparkSession
import time

# Συνάρτηση για τη δημιουργία νέου SparkSession
def create_spark_session(executors, cores_per_executor, memory_per_executor):
    return SparkSession.builder \
        .appName("Query5 Execution") \
        .config("spark.executor.instances", executors) \
        .config("spark.executor.cores", cores_per_executor) \
        .config("spark.executor.memory", f"{memory_per_executor}G") \
        .config("spark.driver.memory", "4G") \
        .getOrCreate()

# Διαφορετικά configurations
configs = [
    (2, 4, 8),  # 2 executors × 4 cores/8GB memory
    (4, 2, 4),  # 4 executors × 2 cores/4GB memory
    (8, 1, 2)   # 8 executors × 1 core/2GB memory
]

# Επανάληψη για κάθε configuration
for executors, cores, memory in configs:
    if 'spark' in locals():
        spark.stop()

# Δημιουργία νέου SparkSession
    spark = create_spark_session(executors, cores, memory)
    print(f"Executing Query 5 with {executors} executors × {cores} cores/{memory}GB memory")


police_stations_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv"

# Φόρτωση δεδομένων αστυνομικών τμημάτων
police_stations_df = spark.read.csv(police_stations_path, header=True, inferSchema=True)

from sedona.register import SedonaRegistrator
from sedona.sql.types import GeometryType

# Εγγραφή Sedona functions
SedonaRegistrator.registerAll(spark)

# Μετατροπή δεδομένων εγκλημάτων σε γεωμετρικά σημεία
filtered_crime_df = filtered_crime_df.withColumn("geom", ST_Point("LON", "LAT"))

# Μετατροπή δεδομένων αστυνομικών τμημάτων σε γεωμετρικά σημεία
police_stations_df = police_stations_df.withColumn("geom", ST_Point("X", "Y"))

# Υπολογισμός απόστασης μεταξύ σημείων εγκλημάτων και αστυνομικών τμημάτων
crime_with_distance = filtered_crime_df.crossJoin(police_stations_df).withColumn(
    "distance",
    ST_DistanceSphere(filtered_crime_df["geom"], police_stations_df["geom"]) / 1000  # Απόσταση σε χιλιόμετρα
)

# Εύρεση του πλησιέστερου τμήματος για κάθε έγκλημα
window_spec = Window.partitionBy("DR_NO").orderBy("distance")
crime_with_nearest_station = crime_with_distance.withColumn(
    "row_num",
    row_number().over(window_spec)
).filter("row_num = 1").drop("row_num")

# Ομαδοποίηση ανά αστυνομικό τμήμα
result_3_df = crime_with_nearest_station.groupBy("DIVISION").agg(
    count("*").alias("Total Crimes"),
    avg("distance").alias("Average Distance")
).orderBy("Total Crimes", ascending=False)


start_time = time.time()

# Εμφάνιση των αποτελεσμάτων
result_3_df.show(truncate=False)

end_time = time.time()
    print(f"Execution Time: {start_time - end_time} seconds\n")

# Τερματισμός της συνεδρίας για να επιτρέψει την αλλαγή των ρυθμίσεων
    spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
unexpected indent (<stdin>, line 76)
  File "<stdin>", line 76
    print(f"Execution Time: {start_time - end_time} seconds\n")
IndentationError: unexpected indent

