In [None]:
from pyspark.sql import SparkSession

# Δημιουργία SparkSession
spark = SparkSession.builder \
    .appName("All_Queries") \
    .getOrCreate()


### Το ακόλουθο configuration για τα Queries 1 και 2 και 3

In [None]:
%%configure -f
{
    "conf": {
    "spark.executor.instances": "4"
    }
}

### Το ακόλουθο configuration για τo Query 4α)

In [None]:
%%configure -f
{
    "conf": {
    "spark.executor.instances": "2",
    "spark.executor.cores": "1",
     "spark.executor.memory": "2g"
    }
}

### Το ακόλουθο configuration για τo Query 4β)

In [None]:
%%configure -f
{
    "conf": {
    "spark.executor.instances": "2",
    "spark.executor.cores": "2",
     "spark.executor.memory": "4g"
    }
}

### Το ακόλουθο configuration για τo Query 4γ)

In [None]:
%%configure -f
{
    "conf": {
    "spark.executor.instances": "2",
    "spark.executor.cores": "4",
     "spark.executor.memory": "8g"
    }
}

### Το ακόλουθο configuration για τo Query 5α)

In [None]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.cores": "4",
        "spark.executor.memory": "8g"
    }
}

### Το ακόλουθο configuration για τo Query 5β)

In [None]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4",
        "spark.executor.cores": "2",
        "spark.executor.memory": "4g"
    }
}

### Το ακόλουθο configuration για τo Query 5γ)

In [None]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "8",
        "spark.executor.cores": "1",
        "spark.executor.memory": "2g"
    }
}

### Query 1 με χρήση DataFrame API

In [None]:
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, desc


# Φόρτωση δεδομένων

start_time = time.time()

data1 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
data2 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True, inferSchema=True)

# Ένωση των δύο DataFrames
data = data1.union(data2)


# Φιλτράρισμα για "aggravated assault"
filtered = data.filter(col("Crm Cd Desc").contains("AGGRAVATED ASSAULT"))

# Κατηγοριοποίηση σε ηλικιακές ομάδες
grouped = filtered.withColumn(
    "AgeGroup",
    when((col("Vict Age") < 18) & (col("Vict Age") > 0), "Children")
    .when((col("Vict Age") >= 18) & (col("Vict Age") <= 24), "Young Adults")
    .when((col("Vict Age") >= 25) & (col("Vict Age") <= 64), "Adults")
    .when(col("Vict Age") > 64, "Elderly")
    .when(col("Vict Age") <= 0, "Invalid Age")
)

# Ομαδοποίηση και καταμέτρηση
result_df = grouped.groupBy("AgeGroup").count().orderBy(desc("count"))

# Εμφάνιση αποτελεσμάτων
result_df.show()

end_time = time.time()

# Καταγραφή χρόνου
elapsed_time = end_time - start_time

elapsed_time_row = spark.createDataFrame([("Elapsed Time", elapsed_time)], ["AgeGroup", "count"])
result_with_time = result_df.union(elapsed_time_row)

#Αποθήκευση του αποτελέσματος
output_path = "s3://groups-bucket-dblab-905418150721/group37/query1_DataFrame_result_and_time"

result_with_time.write.mode("overwrite").option("header", "true").csv(output_path)




print(f"Execution time (DataFrame API): {elapsed_time:.2f} seconds")


### Query 1 χρησιμοποιώντας RDD (filter, map, reduceByKey και sortBy)

In [None]:
import time
from pyspark.sql import SparkSession


start_time = time.time()

# Φόρτωση δεδομένων
data1 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True).rdd
data2 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True, inferSchema=True).rdd

# Ένωση των δύο RDDs
data = data1.union(data2)


# Φιλτράρισμα για "aggravated assault"
filtered = data.filter(lambda row: "AGGRAVATED ASSAULT" in str(row["Crm Cd Desc"]))

# Κατηγοριοποίηση σε ηλικιακές ομάδες
def categorize_age(row):
    age = row["Vict Age"]
    if age is None:  # Έλεγχος για null τιμές
        return None
    if age < 18 and age > 0:
        return "Children"
    elif 18 <= age <= 24:
        return "Young Adults"
    elif 25 <= age <= 64:
        return "Adults"
    elif age > 64:
        return "Elderly"
    else:
        return "Invalid Age"


# Δημιουργία RDD με κατηγοριοποιημένες ηλικίες
age_group_rdd = filtered.map(lambda row: (categorize_age(row), 1))

# Αφαίρεση null τιμών
age_group_rdd = age_group_rdd.filter(lambda x: x[0] is not None)

# Ομαδοποίηση και Καταμέτρηση
result_rdd = age_group_rdd.reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False)



# Εμφάνιση αποτελεσμάτων
for group, count in result_rdd.collect():
    print(f"{group}: {count}")

end_time = time.time()
    
# Μετατροπή του αποτελέσματος σε DataFrame
result_df = spark.createDataFrame(result_rdd, ["AgeGroup", "count"])

# Προσθήκη του χρόνου εκτέλεσης ως νέα γραμμή
elapsed_time = end_time - start_time
elapsed_time_row = spark.createDataFrame([("Elapsed Time", elapsed_time)], ["AgeGroup", "count"])
result_with_time = result_df.union(elapsed_time_row)

# Αποθήκευση του αποτελέσματος
output_path = "s3://groups-bucket-dblab-905418150721/group37/query1_MapReduce_result_and_time"
result_with_time.write.mode("overwrite").option("header", "true").csv(output_path)

# Εμφάνιση αποτελεσμάτων
print(f"Execution time (RDD API): {elapsed_time:.2f} seconds")

RDD vs DataFrame 

Η DataFrame API είναι πιο γρήγορη από την RDD API στη συγκεκριμένη περίπτωση λόγω της αρχιτεκτονικής και των χαρακτηριστικών του Spark.

Catalyst Optimizer

Η DataFrame API αξιοποιεί τον Catalyst Optimizer, έναν ισχυρό βελτιστοποιητή ερωτημάτων που:

    Εφαρμόζει βελτιστοποιήσεις λογικών και φυσικών σχεδίων εκτέλεσης.
    Αναδιοργανώνει τις λειτουργίες (π.χ., φιλτράρισμα, ομαδοποίηση) για να ελαχιστοποιήσει το κόστος εκτέλεσης.
    Εκμεταλλεύεται ευκαιρίες για predicate pushdown, μειώνοντας τα δεδομένα που μεταφέρονται ή υποβάλλονται σε επεξεργασία.

Στην περίπτωσή μας:

    Το φιλτράρισμα (filter) για Crm Cd Desc και η ομαδοποίηση (groupBy) υλοποιούνται αποδοτικά, χωρίς περιττές λειτουργίες.


### Query 2 α) (DataFrame)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, count, sum, when, to_date, to_timestamp
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import time


# Start timing
start_time = time.time()

# Φόρτωση δεδομένων ως DataFrames
data1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True
)
data2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True
)

# Ένωση των δύο DataFrames
data = data1.union(data2)

# Correctly parse DATE OCC and extract the year
data = data.withColumn("DATE OCC", to_date(to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")))
data = data.withColumn("Year", year(col("DATE OCC")))



# Φιλτράρισμα για ανοικτές και κλεισμένες υποθέσεις
data = data.withColumn(
    "is_closed_case",
    when(~col("Status Desc").isin("UNK", "Invest Cont"), 1).otherwise(0)
)

# Υπολογισμός συνολικών και κλεισμένων υποθέσεων ανά τμήμα και έτος
aggregated = data.groupBy("Year", "AREA NAME") \
    .agg(
        count("*").alias("total_cases"),
        sum(col("is_closed_case")).alias("closed_cases")
    ) \
    .withColumn("closed_case_rate", (col("closed_cases") / col("total_cases")) * 100)

# Υπολογισμός κατάταξης για κάθε τμήμα ανά έτος
windowSpec = Window.partitionBy("Year").orderBy(col("closed_case_rate").desc())
aggregated = aggregated.withColumn("#", dense_rank().over(windowSpec))

# Επιλογή των top 3 τμημάτων ανά έτος
result = aggregated.filter(col("#") <= 3).orderBy(["Year", "#"])

# Επιλογή των απαραίτητων στηλών και μετονομασία για καλύτερη αναγνωσιμότητα
final_result = result.select(
    col("Year").alias("year"),
    col("AREA NAME").alias("precinct"),
    col("closed_case_rate").alias("closed_case_rate"),
    col("#").alias("#")
)

# Εμφάνιση του τελικού αποτελέσματος
final_result.show(truncate=False)

# End timing
end_time = time.time()

# Υπολογισμός χρόνου εκτέλεσης
execution_time = end_time - start_time

# Δημιουργία DataFrame για τον χρόνο εκτέλεσης
execution_time_schema = StructType([
    StructField("year", StringType(), True),
    StructField("precinct", StringType(), True),
    StructField("closed_case_rate", DoubleType(), True),
    StructField("#", StringType(), True)
])
execution_time_df = spark.createDataFrame(
    [("Execution Time", None, execution_time, None)],
    schema=execution_time_schema
)

# Προσθήκη του χρόνου εκτέλεσης στο αποτέλεσμα
final_result_with_time = final_result.union(execution_time_df)



# Save the final result with elapsed time to a CSV file
output_path = "s3://groups-bucket-dblab-905418150721/group37/query2_DataFrame"
final_result_with_time.write.mode("overwrite").option("header", "true").csv(output_path)

# Print the time taken
print(f"Query 2 execution time using DataFrame: {execution_time:.2f} seconds")


Query 2 α) (SQL API)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, to_timestamp, year
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import time

# Set the legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")



# Φόρτωση δεδομένων ως DataFrames
data1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True
)
data2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True
)

# Ένωση των δύο DataFrames
data = data1.union(data2)

# Correctly parse DATE OCC and extract the year
data = data.withColumn("DATE OCC", to_date(to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")))
data = data.withColumn("Year", year(col("DATE OCC")))


# Start timing
start_time = time.time()

# Create a temporary SQL view
data.createOrReplaceTempView("crime_data")

# SQL Query
query = """
WITH processed_data AS (
    SELECT
        `Year` AS year,
        `AREA NAME` AS precinct,
        COUNT(*) AS total_cases,
        SUM(CASE WHEN `Status Desc` NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) AS closed_cases,
        (SUM(CASE WHEN `Status Desc` NOT IN ('UNK', 'Invest Cont') THEN 1 ELSE 0 END) * 100.0 / COUNT(*)) AS closed_case_rate
    FROM crime_data
    GROUP BY `Year`, `AREA NAME`
),
ranked_data AS (
    SELECT
        year,
        precinct,
        closed_case_rate,
        ROW_NUMBER() OVER (PARTITION BY year ORDER BY closed_case_rate DESC) AS rank
    FROM processed_data
)
SELECT 
    year,
    precinct,
    closed_case_rate,
    rank AS `#`
FROM ranked_data
WHERE rank <= 3
ORDER BY year ASC, rank ASC
"""

# Execute the SQL query
result = spark.sql(query)


result.show()

# End timing
end_time = time.time()

# Calculate execution time
execution_time = end_time - start_time

# Add execution time to the result
execution_time_schema = StructType([
    StructField("year", StringType(), True),
    StructField("precinct", StringType(), True),
    StructField("closed_case_rate", DoubleType(), True),
    StructField("#", StringType(), True)
])

execution_time_row = spark.createDataFrame(
    [("Execution Time", None, execution_time, None)],
    schema=execution_time_schema
)

# Append the execution time row to the result
final_result = result.union(execution_time_row)

# Save the final result to a CSV file
output_path = "s3://groups-bucket-dblab-905418150721/group37/query2_SQL"
final_result.write.mode("overwrite").option("header", "true").csv(output_path)

# Print the time taken
print(f"Query 2 execution time using SQL_API: {execution_time:.2f} seconds")


Στην πράξη, το SQL API είναι πιο γρήγορο από το DataFrame API για ερωτήματα που εκφράζονται φυσικά μέσω SQL, όπως φιλτραρίσματα, ενώσεις (joins) και ομαδοποιήσεις (groupBy). Αυτό συμβαίνει επειδή το SQL API μπορεί να εκμεταλλευτεί πληρέστερα τον Catalyst Optimizer, μειώνοντας την επεξεργασία δεδομένων και τις λειτουργίες ανάγνωσης (I/O). Το DataFrame API, αν και βελτιστοποιημένο, ενδέχεται να περιλαμβάνει περιττά στάδια επεξεργασίας αν οι λειτουργίες δεν περιγράφονται με σαφήνεια.

Επιπλέον, το SQL API είναι πιο εύκολο να διαβαστεί και να διατηρηθεί για όσους έχουν ήδη εμπειρία στη SQL. Παρέχει επίσης καλύτερη υποστήριξη εργαλείων τρίτων για την παρακολούθηση και την ανάλυση ερωτημάτων. Ωστόσο, το DataFrame API είναι καλύτερο για πιο σύνθετες λογικές, όπως υπολογισμοί σε πολλαπλά στάδια, ενσωμάτωση εξωτερικών δεδομένων ή χρήση συνδυασμένων λειτουργιών προγραμματισμού.

Συμπερασματικά, το SQL API είναι η προτιμότερη επιλογή για δηλωτικές ερωτήσεις με σταθερή δομή και έμφαση στην απόδοση, ενώ το DataFrame API είναι ιδανικό για πιο σύνθετες και ευέλικτες εργασίες δεδομένων. Η επιλογή μεταξύ των δύο εξαρτάται από τις απαιτήσεις του έργου και την εμπειρία του χρήστη.

### Query 2 β) Εκτέλεση για Parquet Format με DataFrame

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, count, sum, when, to_date, to_timestamp
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import time

# Set the legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Load CSV data as DataFrames
data1 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True
)
data2 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True
)

# Combine the two DataFrames
data = data1.union(data2)

# Save combined data in Parquet format
output_path = "s3://groups-bucket-dblab-905418150721/group37/CrimeData_parquet_ReExecution"
data.write.mode("overwrite").parquet(output_path)


# Start timing optimized query
start_optimized_parquet_query = time.time()

# Load Parquet data
parquet_data = spark.read.parquet(output_path)

# Select relevant columns for processing
selected_data = parquet_data.select("DATE OCC", "AREA NAME", "Status Desc")

# Filter and preprocess data
filtered_data = selected_data.withColumn(
    "DATE OCC", to_date(to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a"))
).withColumn(
    "Year", year(col("DATE OCC"))
).filter(col("Status Desc").isNotNull())

# Add a column to indicate if a case is closed
filtered_data = filtered_data.withColumn(
    "is_closed_case",
    when(~col("Status Desc").isin("UNK", "Invest Cont"), 1).otherwise(0)
)

# Partition data by Year for efficient parallel processing
partitioned_data = filtered_data.repartition("Year")

# Aggregate data
aggregated_data = partitioned_data.groupBy("Year", "AREA NAME") \
    .agg(
        count("*").alias("total_cases"),
        sum(col("is_closed_case")).alias("closed_cases")
    ) \
    .withColumn("closed_case_rate", (col("closed_cases") / col("total_cases")) * 100)

# Use window function for ranking precincts
windowSpec = Window.partitionBy("Year").orderBy(col("closed_case_rate").desc())
ranked_data = aggregated_data.withColumn("#", dense_rank().over(windowSpec))

# Filter top 3 precincts per year and select relevant columns
result = ranked_data.filter(col("#") <= 3).orderBy(["Year", "#"]).select(
    col("Year").alias("year"),
    col("AREA NAME").alias("precinct"),
    col("closed_case_rate").alias("closed_case_rate"),
    col("#").alias("#")
)


result.show()

# End timing
end_optimized_parquet_query = time.time()

# Add execution time as a new row
execution_time = end_optimized_parquet_query - start_optimized_parquet_query

# Define the schema explicitly for the execution time DataFrame
execution_time_schema = StructType([
    StructField("year", StringType(), True),
    StructField("precinct", StringType(), True),
    StructField("closed_case_rate", DoubleType(), True),
    StructField("#", StringType(), True)
])

# Create the execution time DataFrame with the schema
execution_time_df = spark.createDataFrame(
    [("Execution Time", None, execution_time, None)],
    schema=execution_time_schema
)

# Append execution time to the result DataFrame
final_result = result.union(execution_time_df)


# Save final result to CSV
final_output_path = "s3://groups-bucket-dblab-905418150721/group37/query2_optimized_results"
final_result.write.mode("overwrite").option("header", "true").csv(final_output_path)

# Print execution time
print(f"Parquet query execution time: {execution_time:.2f} seconds")


Το Parquet είναι πιο γρήγορο και αποδοτικό από το CSV για την ανάλυση δεδομένων σε Spark, εξαιτίας του τρόπου αποθήκευσης και ανάγνωσης των δεδομένων. Το Parquet αποθηκεύει τα δεδομένα σε στηλοθετημένη μορφή (columnar storage), κάτι που επιτρέπει στο Spark να διαβάζει μόνο τις στήλες που είναι απαραίτητες για ένα ερώτημα, μειώνοντας την I/O επιβάρυνση. Αντίθετα, το CSV αποθηκεύει τα δεδομένα ανά γραμμή (row-based storage), αναγκάζοντας το Spark να διαβάσει ολόκληρες γραμμές, ακόμα και αν χρειάζονται μόνο συγκεκριμένες στήλες.

Επιπλέον, το Parquet υποστηρίζει το λεγόμενο predicate pushdown, μια τεχνική που εφαρμόζει φίλτρα, όπως ένα WHERE ερώτημα, απευθείας στο επίπεδο αποθήκευσης. Αυτό έχει ως αποτέλεσμα τη μείωση του όγκου δεδομένων που διαβάζεται και επεξεργάζεται στη μνήμη. Από την άλλη πλευρά, το CSV δεν έχει αυτή τη δυνατότητα, με αποτέλεσμα όλα τα δεδομένα να φορτώνονται πρώτα και να φιλτράρονται αργότερα, κάτι που αυξάνει τον χρόνο εκτέλεσης και τη χρήση της μνήμης.

Το Parquet χρησιμοποιεί επίσης αποδοτικούς αλγορίθμους συμπίεσης, όπως Snappy ή GZIP, για να μειώσει το μέγεθος αποθήκευσης των δεδομένων και να μειώσει την I/O επιβάρυνση κατά την ανάγνωση. Σε αντίθεση με αυτό, τα αρχεία CSV είναι συνήθως μη συμπιεσμένα, αυξάνοντας τόσο το μέγεθος όσο και τον χρόνο που απαιτείται για την ανάγνωση. Ένα άλλο σημαντικό πλεονέκτημα του Parquet είναι ότι περιλαμβάνει ενσωματωμένο σχήμα (schema information), κάτι που επιτρέπει την ταχύτερη ανάγνωση των δεδομένων χωρίς την ανάγκη ανάλυσης του σχήματος (schema inference). Αντίθετα, το Spark πρέπει να αναγνωρίσει το σχήμα των δεδομένων σε ένα CSV αρχείο, κάτι που είναι χρονοβόρο για μεγάλα datasets.

Το Parquet επιτρέπει επίσης αποδοτική παράλληλη ανάγνωση των δεδομένων σε κατανεμημένα συστήματα, αξιοποιώντας καλύτερα τους πόρους του cluster. Το CSV, αν και υποστηρίζει παράλληλη ανάγνωση, είναι λιγότερο αποδοτικό λόγω του τρόπου αποθήκευσης των δεδομένων σε γραμμές. Επίσης, το Parquet αποθηκεύει σημαντικά μεταδεδομένα, όπως τις ελάχιστες και μέγιστες τιμές για κάθε στήλη, επιτρέποντας στο Spark να παραλείπει δεδομένα που δεν είναι σχετικά με ένα ερώτημα. Το CSV, από την άλλη πλευρά, δεν περιλαμβάνει μεταδεδομένα, αναγκάζοντας το Spark να διαβάσει ολόκληρο το αρχείο.

Συνολικά, το Parquet είναι ιδανικό για την ανάλυση μεγάλων δεδομένων, προσφέροντας σημαντικά πλεονεκτήματα σε ταχύτητα, αποδοτικότητα και μείωση της I/O επιβάρυνσης. Το Parquet είναι ειδικά σχεδιασμένο για συστήματα κατανεμημένης επεξεργασίας, όπως το Spark, καθιστώντας το την καλύτερη επιλογή για την αποθήκευση και επεξεργασία μεγάλων datasets σε σύγκριση με το CSV.

### Query 3 με χρήση DataFrame API

In [None]:
import io
import sys
from sedona.spark import *
from pyspark.sql.functions import col, regexp_replace, sum, round, desc, count
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import time


def capture_explain_output(df):
    """
    Calls df.explain() but captures its stdout text in a StringIO
    and returns that string without printing to console.
    """
    buffer = io.StringIO()
    old_stdout = sys.stdout
    try:
        sys.stdout = buffer
        df.explain(mode="formatted")
    finally:
        sys.stdout = old_stdout
    return buffer.getvalue()


def query_3(join_strategy="", join_explain=True):
    """
    join_strategy: optional. Καθορίζει με ποια στρατηγική θα γίνει το join.
    Έχει default τιμη "". Πιθανές Τιμές:
    "BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"
    
    join_explain: optional. Είναι boolean και έχει default τιμή True.
    Εάν είναι True, τότε εκτυπώνονται λεπτομέρειες σχετικά με το join strategy που ακολουθήθηκε
    """
    
    if join_strategy == "":
        strategy = "DEFAULT"
    else:
        strategy = join_strategy

    # Create sedona context
    sedona = SedonaContext.create(spark)
    
    # Διαβάζουμε τα δεδομένα της απογραφής πληθυσμού (geojson)
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = (
        sedona.read
        .format("geojson")
        .option("multiLine", "true")
        .load(geojson_path)
        .selectExpr("explode(features) as features")
        .select("features.*")
    )

    # Formatting magic - Flattening
    flattened_df = (
        blocks_df.select(
            [
                col(f"properties.{col_name}").alias(col_name)
                for col_name in blocks_df.schema["properties"].dataType.fieldNames()
            ]
            + ["geometry"]
        )
        .drop("properties")
        .drop("type")
    )

    # Διαβάζουμε τα δεδομένα με το μέσο εισόδημα νοικοκυριού
    income_df = (
        spark.read.format("csv")
        .options(header="true", inferSchema="true", quote='"', escape='"')
        .load("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv")
    )

    # Αφαιρούμε τα σύμβολα $ και , από την στήλη 'Estimated Median Income'
    income_df = income_df.withColumn(
        "Estimated Median Income",
        regexp_replace(col("Estimated Median Income"), r"\$", "")
    )
    income_df = income_df.withColumn(
        "Estimated Median Income",
        regexp_replace(col("Estimated Median Income"), r",", "")
    )

    # Μετατρέπουμε την στήλη 'Estimated Median Income' από string σε decimal
    income_df = income_df.withColumn(
        "Estimated Median Income",
        col("Estimated Median Income").cast(DecimalType())
    )

    ########################## Υπολογισμός μέσου ετήσιου εισοδήματος ανά άτομο ########################
    
    # Αφαιρούμε τις περιοχές που έχουν < 0 κατοίκους και κρατάμε
    # μόνο τις περιοχές που ανήκουν στην πόλη Los Angeles
    non_zero = flattened_df.filter(
        ((flattened_df.HOUSING10 >= 0) & (flattened_df.POP_2010 >= 0))
        & (flattened_df.CITY == "Los Angeles")
    )
    
    # Ομαδοποιούμε κατά (COMM, ZCTA10) και βρίσκουμε για κάθε (COMM, ZCTA10)
    # το συνολικό αριθμό νοικοκυριών (total_houses) και κατοίκων (total_pop)
    population_agg = (
        non_zero
        .groupBy([non_zero.COMM, non_zero.ZCTA10])
        .agg(
            sum("HOUSING10").alias("total_houses"),
            sum("POP_2010").alias("total_pop")
        )
    )

    # Inner join μεταξύ population_agg και income_df βάσει του zip code
    # Με αυτόν τον τρόπο, για κάθε ζεύγος (COMM, zip_code), υπολογίζουμε
    # το γινόμενο total_houses * Estimated Median Income. Έτσι, βρίσκουμε το άθροισμα
    # όλων των εισοδημάτων των κατοίκων για κάθε ζεύγος (COMM, zip_code).
    if join_strategy == "":
        income_hint = income_df
    else:
        income_hint = income_df.hint(join_strategy)

    start_time = time.time()    # Start timer
    comm_income = (
        population_agg
        .join(income_df.hint(join_strategy), population_agg.ZCTA10 == income_df["Zip Code"], "inner")
        .select(
            population_agg.ZCTA10,
            population_agg.COMM,
            population_agg.total_pop,
            (population_agg.total_houses * income_df["Estimated Median Income"]).alias("total_money")
        )
    )
    
    comm_income.show()
    end_time = time.time()     # Stop timer   
    elapsed_time = end_time - start_time
        
    # Με τη μέθοδο explain() βλέπουμε πληροφορίες για την στρατηγική του join
    if join_explain:
        execution_plan = capture_explain_output(comm_income)
        
        # Αποθηκεύουμε το χρόνο εκτ΄έλεσης και το execution plan σε ένα CSV
        output_path = f"s3://groups-bucket-dblab-905418150721/group37/query3_{strategy}/First_Join"
        spark.createDataFrame([(strategy, elapsed_time, execution_plan)],
                             ["Join Strategy", "Elapsed Time (sec)", "Execution Plan"]) \
                            .coalesce(1) \
                            .write \
                            .mode("overwrite") \
                            .option("header", "true") \
                            .csv(output_path)
        
    # Για κάθε COMM, υπολογίζουμε το άθροισμα των κατοίκων και
    # το άθροισμα όλων των εισοδημάτων.
    comm_income = (
        comm_income
        .groupBy(comm_income.COMM)
        .agg(
            sum("total_money").alias("total_money"),
            sum("total_pop").alias("total_pop")
        )
    )

    ################ Υπολογισμός αναλογίας συνολικού αριθμού εγκλημάτων ανά άτομο ################

    # Διαβάζουμε τα δεδομένα με τα εγκλήματα 2010 - 2024
    crime_data = (
        spark.read.format("csv")
        .options(header="true", inferSchema="true", quote='"', escape='"')
        .load(
            [
                "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
                "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
            ]
        )
        .filter((col("LAT") != 0) & (col("LON") != 0))
        .select(col("DR_NO"), col("LAT"), col("LON"))
        .withColumn("geom", ST_Point("LON", "LAT"))
    )

    # Εντοπίζουμε τα crimes που βρίσκονται εντός κάθε COMM
    if join_strategy == "":
        non_zero_hint = non_zero
    else:
        non_zero_hint = non_zero.hint(join_strategy)
        
    start_time = time.time()    # Start timer
    crime_comm = crime_data.join(
        non_zero_hint,
        ST_Within(crime_data["geom"], non_zero["geometry"]),
        "inner"
    )
    crime_comm.show()
    end_time = time.time()    # Stop timer
    elapsed_time = end_time - start_time

    # Με τη μέθοδο explain() βλέπουμε πληροφορίες για την στρατηγική του join
    if join_explain:
        execution_plan = capture_explain_output(crime_comm)
        
        # Αποθηκεύουμε το χρόνο εκτ΄έλεσης και το execution plan σε ένα CSV
        output_path = f"s3://groups-bucket-dblab-905418150721/group37/query3_{strategy}/Second_Join"
        spark.createDataFrame([(strategy, elapsed_time, execution_plan)],
                             ["Join Strategy", "Elapsed Time (sec)", "Execution Plan"]) \
                            .coalesce(1) \
                            .write \
                            .mode("overwrite") \
                            .option("header", "true") \
                            .csv(output_path)

    # Μετράμε το πλήθος των εγκλημάτων κάθε COMM
    num_of_crimes = (
        crime_comm
        .groupBy(crime_comm["COMM"])
        .count()
    )

    ####################################### Τελικά αποτελέσματα #######################################

    # Τελικό join: Συνδυάζουμε num_of_crimes και comm_income για να 
    # υπολογίσουμε τους λόγους εισόδημα/κάτοικο και εγκλήματα/κάτοικο
    if join_strategy == "":
        num_of_crimes_hint = num_of_crimes
    else:
        num_of_crimes_hint = num_of_crimes.hint(join_strategy)
    
    start_time = time.time()   # Start timer
    crimes_income_per_comm = (
        comm_income
        .join(num_of_crimes.hint(join_strategy), comm_income["COMM"] == num_of_crimes["COMM"], "outer")
        .select(
            comm_income["COMM"],
            (col("total_money") / col("total_pop")).alias("Annual Average Income Per Person ($)"),
            (col("count") / col("total_pop")).alias("Rate of Total Crimes Per Person")
        )
    )
    crimes_income_per_comm.show()
    end_time = time.time()     # Stop timer
    elapsed_time = end_time - start_time


    # Με τη μέθοδο explain() βλέπουμε πληροφορίες για την στρατηγική του join
    if join_explain:
        execution_plan = capture_explain_output(crimes_income_per_comm)
        
        # Αποθηκεύουμε το χρόνο εκτ΄έλεσης και το execution plan σε ένα CSV
        output_path = f"s3://groups-bucket-dblab-905418150721/group37/query3_{strategy}/Third_Join"
        spark.createDataFrame([(strategy, elapsed_time, execution_plan)],
                             ["Join Strategy", "Elapsed Time (sec)", "Execution Plan"]) \
                            .coalesce(1) \
                            .write \
                            .mode("overwrite") \
                            .option("header", "true") \
                            .csv(output_path)
    
        
    # Επιστρέφουμε το dataframe με τα τελικά αποτελέσματα
    return crimes_income_per_comm

<p>Τώρα, δοκιμάζουμε διάφορα join strategies</p>

In [None]:
strategies = ["", "BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]

for strategy in strategies:
    result_query_3 = query_3(strategy, True)

# Αποθηκέυουμε τα αποτελέσματα σε ένα CSV
# output_path = f"s3://groups-bucket-dblab-905418150721/group37/query3_results"
# result_query_3.write.mode("overwrite").option("header", "true").csv(output_path)

### Query 3 (αποτυχημένη προσπάθεια)

<p>Προσπαθήσαμε να κάνουμε enforce τα διάφορα join strategies, αλλά δεν το καταφέραμε.
Ο catalyst optimizer συνέχισε να αγνοεί τα hints που δίναμε.</p>

In [None]:
import sys
import io
import time
from sedona.register import SedonaRegistrator
from pyspark.sql.functions import broadcast
from sedona.spark import SedonaContext
from sedona.spark import *

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, sum, count
from pyspark.sql.types import DecimalType



# --------------------------------------------------------------------------------
# Function to capture explain() output in a string
# --------------------------------------------------------------------------------
def capture_explain_output(df):
    """
    Calls df.explain() but captures its stdout text in a StringIO
    and returns that string without printing to console.
    """
    buffer = io.StringIO()
    old_stdout = sys.stdout
    try:
        sys.stdout = buffer
        df.explain()
    finally:
        sys.stdout = old_stdout
    return buffer.getvalue()

# --------------------------------------------------------------------------------
# The main query function
# --------------------------------------------------------------------------------
def query_3(join_strategy=""):
    """
    :param join_strategy: Enforces a specific join strategy using Spark hints.
                         Valid values: "BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"
                         If empty, no hint is used, and Spark picks the plan.
    """

    # ------------------------------------------------------------------------
    # 1) Force no auto-broadcast inside this function.
    # ------------------------------------------------------------------------
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.crossJoin.enabled", True)
    # ------------------------------------------------------------------------
    # 2) Create SedonaContext
    # ------------------------------------------------------------------------
    sedona = SedonaContext.create(spark)

    # ------------------------------------------------------------------------
    # 3) Read Census Blocks (GEOJSON)
    # ------------------------------------------------------------------------
    geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
    blocks_df = (
        sedona.read
        .format("geojson")
        .option("multiLine", "true")
        .load(geojson_path)
        .selectExpr("explode(features) as features")
        .select("features.*")
    )

    # Flatten
    flattened_df = (
        blocks_df.select(
            [
                col(f"properties.{c}").alias(c)
                for c in blocks_df.schema["properties"].dataType.fieldNames()
            ] + ["geometry"]
        )
        .drop("properties")
        .drop("type")
    )

    # ------------------------------------------------------------------------
    # 4) Read Income CSV
    # ------------------------------------------------------------------------
    income_df = (
        spark.read.format("csv")
        .options(header="true", inferSchema="true", quote='"', escape='"')
        .load("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv")
    )

    # Clean up 'Estimated Median Income' column
    income_df = (
        income_df
        .withColumn("Estimated Median Income",
                    regexp_replace(col("Estimated Median Income"), r"\$", ""))
        .withColumn("Estimated Median Income",
                    regexp_replace(col("Estimated Median Income"), r",", ""))
        .withColumn("Estimated Median Income",
                    col("Estimated Median Income").cast(DecimalType(10, 2)))
    )

    # ------------------------------------------------------------------------
    # 5) Prepare data: only LA City, remove negative values
    # ------------------------------------------------------------------------
    non_zero = flattened_df.filter(
        (col("HOUSING10") >= 0) & 
        (col("POP_2010") >= 0) & 
        (col("CITY") == "Los Angeles")
    )

    population_agg = (
        non_zero
        .groupBy("COMM", "ZCTA10")
        .agg(
            sum("HOUSING10").alias("total_houses"),
            sum("POP_2010").alias("total_pop")
        )
    )

    # --------------------------------------------------------------------------------
    # FIRST JOIN: population_agg vs income_df
    # --------------------------------------------------------------------------------
    if join_strategy:
        income_hint = income_df.hint(join_strategy)
    else:
        income_hint = income_df

    # Build the join DataFrame
    join_df_1 = (
        population_agg
        .join(income_hint, population_agg["ZCTA10"] == col("Zip Code"), "inner")
        .select(
            population_agg.ZCTA10,
            population_agg.COMM,
            population_agg.total_pop,
            (population_agg.total_houses * col("Estimated Median Income")).alias("total_money")
        )
    )

    # Measure time by forcing execution with .show()
    start_time = time.time()
    join_df_1.show(5, False)   # triggers actual job
    end_time = time.time()
    elapsed_1 = end_time - start_time

    # Capture plan (without printing)
    explain_text_1 = capture_explain_output(join_df_1)

    # Write plan/time to S3
    first_join_output = [("Execution Plan", explain_text_1),
                         ("Elapsed Time (seconds)", str(elapsed_1))]
    first_join_df = spark.createDataFrame(first_join_output, ["Description", "Details"])
    first_join_df.write.mode("overwrite").option("header", "true").csv(
        f"s3://groups-bucket-dblab-905418150721/group37/query3_{join_strategy}/First_Join"
    )

    # Aggregate partial result by COMM
    comm_income = (
        join_df_1
        .groupBy("COMM")
        .agg(
            sum("total_money").alias("total_money"),
            sum("total_pop").alias("total_pop")
        )
    )

    # --------------------------------------------------------------------------------
    # SECOND JOIN: crime_data vs non_zero (spatial join via ST_Within)
    # --------------------------------------------------------------------------------
    crime_data = (
        spark.read.format("csv")
        .options(header="true", inferSchema="true", quote='"', escape='"')
        .load([
            "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
            "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
        ])
        .filter((col("LAT") != 0) & (col("LON") != 0))
        .select(col("DR_NO"), col("LAT"), col("LON"))
        .withColumn("geom", ST_Point("LON", "LAT"))
    )

   
 
    if join_strategy == "SHUFFLE_HASH":
        spark.conf.set("spark.sql.join.preferShuffledHashJoin", True)
        non_zero_hint = non_zero.hint(join_strategy)

   
    elif join_strategy == "MERGE":
        spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
        spark.conf.set("spark.sql.join.preferSortMergeJoin", True)
        spark.conf.set("spark.sql.join.preferShuffledHashJoin", False)
        spark.conf.set("spark.sql.rangeJoin.enabled", False)  # Disable Range Join (if supported)
        non_zero_hint = non_zero.hint(join_strategy)
    
    elif join_strategy == "":
        non_zero_hint = non_zero
        
    else:
        non_zero_hint = non_zero.hint(join_strategy)


    start_time = time.time()    
    join_df_2 = crime_data.join(
        non_zero_hint,
        ST_Within(crime_data["geom"], non_zero_hint["geometry"]),
        "inner"
    )

   
    join_df_2.show(5, False)  # triggers execution
    end_time = time.time()
    elapsed_2 = end_time - start_time

    # Capture plan
    explain_text_2 = capture_explain_output(join_df_2)

    # Write plan/time to S3
    second_join_output = [("Execution Plan", explain_text_2),
                          ("Elapsed Time (seconds)", str(elapsed_2))]
    second_join_df = spark.createDataFrame(second_join_output, ["Description", "Details"])
    second_join_df.write.mode("overwrite").option("header", "true").csv(
        f"s3://groups-bucket-dblab-905418150721/group37/query3_{join_strategy}/Second_Join"
    )

    num_of_crimes = join_df_2.groupBy("COMM").count()

    # --------------------------------------------------------------------------------
    # THIRD JOIN: comm_income vs num_of_crimes
    # --------------------------------------------------------------------------------
    start_time = time.time()

# Ensure `crimes_hint` is always assigned
    if join_strategy == "BROADCAST":
        crimes_hint = broadcast(num_of_crimes)  # Apply broadcast hint
   
    elif join_strategy != "":
        crimes_hint = num_of_crimes.hint(join_strategy)
    
    else:
        crimes_hint = num_of_crimes  # Default case when no join strategy is given

# Perform the join
    join_df_3 = (
    comm_income
    .join(crimes_hint, "COMM", "outer")
    .select(
        comm_income["COMM"],
        (col("total_money") / col("total_pop")).alias("Annual Average Income Per Person ($)"),
        (col("count") / col("total_pop")).alias("Rate of Total Crimes Per Person")
    )
)

    join_df_3.show(5, False)
    end_time = time.time()
    elapsed_3 = end_time - start_time

# Capture plan
    explain_text_3 = capture_explain_output(join_df_3)

# Write plan/time to S3
    third_join_output = [("Execution Plan", explain_text_3),
                     ("Elapsed Time (seconds)", str(elapsed_3))]
    third_join_df = spark.createDataFrame(third_join_output, ["Description", "Details"])
    third_join_df.write.mode("overwrite").option("header", "true").csv(
    f"s3://groups-bucket-dblab-905418150721/group37/query3_{join_strategy}/Third_Join"
)

# Return the final result DataFrame
    return join_df_3

# --------------------------------------------------------------------------------
# 4) Example Usage: run across multiple strategies
# --------------------------------------------------------------------------------

strategies = ["BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]
for strategy in strategies:
        print(f"\n>>> Running with strategy = {strategy}")
        final_df = query_3(strategy)
        print(">>> Final Results (first 10 rows):")
        final_df.show(10, truncate=False)


### Query 4 με χρήση DataFrame API

<p>Πρέπει να εκτελέσουμε πρώτα το query 3, γιατί χρησιμοποιούμε τα αποτελέσματά του</p>

In [None]:
# Access configuration
conf = spark.sparkContext.getConf()

exec_instances = conf.get("spark.executor.instances")
exec_mem = conf.get("spark.executor.memory")
exec_cores = conf.get("spark.executor.cores")

# Print relevant executor settings
print("Executor Instances:", exec_instances)
print("Executor Memory:", exec_mem)
print("Executor Cores:", exec_cores)

In [None]:
import time
from sedona.spark import *
from pyspark.sql.functions import (
    col, count, desc, asc, lit
)
from pyspark.sql import SparkSession

# Create Sedona context
sedona = SedonaContext.create(spark)

# Start timing
start = time.time()

# Διαβάζουμε τα δεδομένα με τα εγκλήματα 2010 - 2019
# Αφαιρούμε τα εγκλήματα που αναφέρονται στο Null Island (0, 0) και
# τα εγκλήματα που δεν αναφέρουν το φυλετικό προφιλ του θύματος (Null)
# Κρατάμε μόνο τα εγκλήματα που καταγράφηκαν (όχι συνέβησαν) το 2015
crime_data = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", quote='"', escape='"')
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv")
    .filter(
        (col("LAT") != 0) &
        (col("LON") != 0) &
        (col("Vict Descent").isNotNull()) &
        (col("Date Rptd").contains("/2015"))
    )
)

# Κρατάμε μόνο τις στήλες DR_NO, Vict Descent, LAT και LON του crime_data για απλότητα
# Δημιουργούμε μία στήλη geometry type από τις συντεταγμένες (lat, lon)
crime_data_simplified = (
    crime_data.select(
        col("DR_NO"),
        col("Vict Descent"),
        col("LAT"),
        col("LON")
    )
    .withColumn("geom", ST_Point("LON", "LAT"))
)

# Διαβάζουμε το σύνολο δεδομενων Race and Ethnicity codes
re_codes = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true")
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv")
)

# Διαβάζουμε τα δεδομένα της απογραφής πληθυσμού (geojson)
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = (
    sedona.read.format("geojson")
    .option("multiLine", "true")
    .load(geojson_path)
    .selectExpr("explode(features) as features")
    .select("features.*")
)
# Formatting magic - Flattening
flattened_df = (
    blocks_df.select(
        [
            col(f"properties.{c}").alias(c)
            for c in blocks_df.schema["properties"].dataType.fieldNames()
        ] + ["geometry"]
    )
    .drop("properties")
    .drop("type")
)

# Κρατάμε τα rows που βρίσκονται στην πόλη του Los Angeles.
comm_in_la = flattened_df.filter(col("CITY") == "Los Angeles")

# Θα χρησιμοποιήσουμε τα αποτελέσματα του προηγούμενου ερωτη΄ματος,
# τα οποία βρίσκονται στο dataframe result_query_3. Από αυτά, θα
# κρατήσουμε τα top 3 rows με το υψηλότερο μέσο εισόδημα. Χρειαζόμαστε
# μόνο τις στήλες COMM και Annual Average Income Per Person ($)
three_highest_paid = (
    result_query_3
    .select(
        col("COMM").alias("community"),
        col("Annual Average Income Per Person ($)")
    )
    .orderBy(desc("Annual Average Income Per Person ($)"))
    .limit(3)
    .withColumn("rank_label", lit("highest"))  # Add label
)

# Ομοίως, φτιάχνουμε και ένα dataframe με τις top 3 περιοχές με το
# χαμηλότερο μέσο εισόδημα
three_lowest_paid = (
    result_query_3
    .select(
        col("COMM").alias("community"),
        col("Annual Average Income Per Person ($)")
    )
    .orderBy(asc("Annual Average Income Per Person ($)"))
    .limit(3)
    .withColumn("rank_label", lit("lowest"))   # Add label
)

# Ενώνουμε τα δύο dataframes three_highest_paid και three_lowest_paid
# σε ένα ενιαίο dataframe
top_3_comms_union = three_highest_paid.union(three_lowest_paid)

# Πραγματοποιούμε ένα join μεταξύ του top_3_comms_union και comm_in_la
# για να προσθέσουμε τις συντεταγμένες κάθε περιοχής του top_3_comms_union
top_3_comms_coords = (
    comm_in_la.join(
        top_3_comms_union,
        comm_in_la["COMM"] == top_3_comms_union["community"],
        "inner"
    )
    .select(
        col("COMM"),
        col("geometry"),
        col("rank_label")
    )
)

# Βρίσκουμε ποιες αστυνομικές υποθέσεις αναφέρονται σε σημεία εντός των 3 αυτών περιοχών
# και κρατάμε μόνο τις στήλες DR_NO, Vict Descent και rank_label.
top_3_comms_crimes = (
    crime_data_simplified.join(
        top_3_comms_coords,
        ST_Within(crime_data_simplified["geom"], top_3_comms_coords["geometry"]),
        "inner"
    )
    .select(
        col("DR_NO"),
        col("Vict Descent"),
        col("rank_label")  # χρειαζόμαστε την στήλη για να ξεχωρίσουμε
    )                      # αργ΄ότερα τα αποτελέσματα σε δύο dataframes
)

# Ομαδοποιούμε κατά (rank_label, Vict Descent) και
# βρίσκουμε, για κάθε φυλή, πόσα θύματα καταγράφηκαν
vict_count_per_race = (
    top_3_comms_crimes
    .groupBy(col("rank_label"), col("Vict Descent"))
    .count()
)

# Join race-ethnicity codes
vict_race_count = (
    vict_count_per_race
    .join(
        re_codes,
        vict_count_per_race["Vict Descent"] == re_codes["Vict Descent"],
        "inner"
    )
    .select(
        col("rank_label"),
        col("Vict Descent Full").alias("Victim Descent"),
        col("count")
    )
)

# Το end θα περιέχει τη χρονική στιγμή τέλους. Προς το παρόν, όμως, αποθη-
# κεύουμε σε αυτό τη χρονική στιγμή μόλις πριν το for loop.
end = time.time()

# Δημιουργούμε δύο ξεχωριστά dataframes: ένα για τις περιοχές με το υψηλότερο
# “highest” εισόδημα και ένα για τις περιοχές με το χαμηλότερο “lowest” εισόδημα
for label in ["highest", "lowest"]:
    start_loop = time.time()        # η χρονική στιγμή έναρξης του iteration
    df_label = vict_race_count.filter(col("rank_label") == label).orderBy(desc("count"))
    df_label = df_label.drop('rank_label')
    print(f"Victim Descent in top 3 {label} paid communities in Los Angeles:")
   
    df_label.show()
    
    # ενημερώνουμε τη μεταβλητή end
    end = end + (time.time() - start_loop)

    output_path = f"s3://groups-bucket-dblab-905418150721/group37/query4_{label}_paid"
    (
        df_label
        .write
        .mode("overwrite")
        .option("header", "true")
        .csv(output_path)
    )

# Αποθηκεύουμε το χρόνο εκτέλεσης του query σε ένα CSV αρχείο

elapsed_time = end - start
output_path = f"s3://groups-bucket-dblab-905418150721/group37/query4_time_{exec_instances}executors_{exec_cores}cores_{exec_mem}mem"

spark.createDataFrame([(exec_instances, exec_cores, exec_mem, elapsed_time)],
                      [
                          "Executor Instances", "Executor Cores",
                           "Executor Memory", "Elapsed Time (sec)"
                      ]) \
                    .coalesce(1) \
                    .write \
                    .mode("overwrite") \
                    .option("header", "true") \
                    .csv(output_path)


print(f"Elapsed time: {elapsed_time} sec")

### Query 5 με χρήση DataFrame API

In [None]:
# Access configuration
conf = spark.sparkContext.getConf()

exec_instances = conf.get("spark.executor.instances")
exec_mem = conf.get("spark.executor.memory")
exec_cores = conf.get("spark.executor.cores")

print(exec_instances)
print(exec_mem)
print(exec_cores)

In [None]:
from sedona.spark import *
from pyspark.sql.functions import (
    col, count, desc, asc, lit, expr, row_number, broadcast, avg
)
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import time

# Create Sedona context
sedona = SedonaContext.create(spark)

start = time.time()

# Load and filter crime data
crime_data1 = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", quote='"', escape='"')
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv")
    .filter((col("LAT") != 0) & (col("LON") != 0))
)

crime_data2 = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", quote='"', escape='"')
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv")
    .filter((col("LAT") != 0) & (col("LON") != 0))
)

# Combine the datasets
crime_data = crime_data1.union(crime_data2)

# Keep needed columns and create points
crime_data_simplified = (
    crime_data
    .select(col("LAT"), col("LON"), col("DR_NO"))
    .withColumn("Crime_Location_Point", ST_Point("LON", "LAT"))
    .drop("LON")
    .drop("LAT")
)

# Load the Police Stations data
police_stations = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", quote='"', escape='"')
    .load("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv")
)

# Simplify police stations data
police_stations_simplified = (
    police_stations
    .select(
        col("X").alias("LON"),
        col("Y").alias("LAT"),
        col("DIVISION").alias("division")
    )
    .withColumn("Police_Station_Location_Point", ST_Point("LON", "LAT"))
    .drop("LON")
    .drop("LAT")
)

# Perform cross join with broadcast
cross_df = crime_data_simplified.crossJoin(broadcast(police_stations_simplified))

# Compute distance in km
cross_df = cross_df.withColumn(
    "distance_km",
    expr("ST_DistanceSphere(Crime_Location_Point, Police_Station_Location_Point) / 1000.0")
)

# For each crime pick the station with the minimal distance
window = Window.partitionBy("DR_NO").orderBy(col("distance_km").asc())
ranked_df = cross_df.withColumn("rn", row_number().over(window))

assigned_stations = ranked_df.filter(col("rn") == 1)

# Group by station: average distance and count (#)
query_5_df = (
    assigned_stations
    .groupBy("division")
    .agg(
        avg("distance_km").alias("average_distance"),
        count("*").alias("#")
    )
    .orderBy(desc("#"))
)


query_5_df.show()

end = time.time()

elapsed_time = end - start

print(f"Elapsed time: {elapsed_time} sec")


# Define output path
output_path = f"s3://groups-bucket-dblab-905418150721/group37/query5_results_{exec_instances}executors_{exec_cores}cores_{exec_mem}mem"

output_path_2 = f"s3://groups-bucket-dblab-905418150721/group37/query5_results"

query_5_df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(f"{output_path_2}")

# Save results
spark.createDataFrame([(exec_instances, exec_cores, exec_mem, elapsed_time)],
                      [
                          "Executor Instances", "Executor Cores",
                          "Executor Memory", "Elapsed Time (sec)"
                      ]) \
    .coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(output_path)
