In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, first, regexp_replace, concat, lit, lower
from sedona.spark import *
import time

# Δημιουργία Spark Session
spark = SparkSession.builder \
    .appName("Query 3") \
    .getOrCreate()

# Sedona Context για GeoJSON δεδομένα
sedona = SedonaContext.create(spark)

# Φόρτωση δεδομένων
crime_2010_2019_path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_2020_present_path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"
income_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"

crime_df_2010_2019 = spark.read.csv(crime_2010_2019_path, header=True, inferSchema=True)
crime_df_2020_present = spark.read.csv(crime_2020_present_path, header=True, inferSchema=True)
crime_df = crime_df_2010_2019.union(crime_df_2020_present)
income_df = spark.read.csv(income_path, header=True, inferSchema=True)

# Φόρτωση και επεξεργασία GeoJSON δεδομένων
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

# Φιλτράρισμα των τετραγώνων του Los Angeles και επιλογή μόνο των επιθυμητών στηλών
LA_blocks = flattened_df.filter(col("CITY") == "Los Angeles") \
    .select("COMM", "HOUSING10", "POP_2010", "ZCTA10", "geometry")

# Φιλτράρισμα των περιοχών του Los Angeles
LA_areas = LA_blocks.groupBy("COMM").agg(ST_Union_Aggr("geometry").alias("geometry"))

# Αφαίρεση του "$" και οποιωνδήποτε κενών ή άλλων μη αριθμητικών χαρακτήρων
income_df = income_df.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), r"[^0-9.]", "").cast("double")
)

# Δημιουργία γεωμετρικών σημείων για τα εγκλήματα
crime_points = crime_df.filter(col("LAT").isNotNull() & col("LON").isNotNull()) \
    .withColumn("geometry", ST_Point(col("LON"), col("LAT"))) \
    .select("geometry")

# Υπολογισμός αριθμού εγκλημάτων ανά περιοχή
total_crimes = crime_points.join(
    LA_areas.hint("BROADCAST"),                                        #κάνουμε BROADCAST των π΄΄ινακα LA_areas επειδή είναι μικρός
    ST_Within(crime_points["geometry"], LA_areas["geometry"]),
    "inner"
).groupBy("COMM").agg(
    count("*").alias("Total Crimes")
)

# Υπολογισμός κατα κεφαλήν εισοδήματος ανά περιοχή
average_income_per_person = LA_blocks.join(
    income_df.hint("BROADCAST"),                                       #κάνουμε BROADCAST των π΄΄ινακα income_df επειδή είναι μικρός
    LA_blocks["ZCTA10"] == income_df["Zip Code"],
    "inner"
).withColumn(
    "Total Income Per Block",
    (col("HOUSING10") * col("Estimated Median Income")).alias("Total Income Per Block")
).groupBy("COMM").agg(
    sum("Total Income Per Block").alias("Total Income"),
    sum("POP_2010").alias("Total Population")
).withColumn(
    "Avg Income Per Person ($)",
    (col("Total Income") / col("Total Population"))
).select(
    "COMM", 
    "Avg Income Per Person ($)",
    "Total Population"
)

# Στρατηγική BROADCAST Join

# Χρόνος πριν την εκτέλεση
broadcast_start_time = time.time()

# Υπολογισμός μέσου αριθμού εγκλημάτων ανά άτομο
broadcast_result = total_crimes.join(
    average_income_per_person.hint("BROADCAST"),
    "COMM",
    "inner"
).withColumn(
    "Crime Ratio Per Person",
    (col("Total Crimes") / col("Total Population"))
).select(
    col("COMM").alias("Area"), 
    "Avg Income Per Person ($)", 
    "Crime Ratio Per Person"
).orderBy("Avg Income Per Person ($)", ascending=False)

# Εμφάνιση αποτελεσμάτων για BROADCAST
print("BROADCAST Join Strategy:\n")
broadcast_result.show()

# Χρόνος μετά την εκτέλεση
broadcast_end_time = time.time()

# Υπολογισμός του χρόνου εκτέλεσης
broadcast_execution_time = broadcast_end_time - broadcast_start_time
print(f"Execution time for BROADCAST join: {broadcast_execution_time} seconds\n")

# Explain για BROADCAST
broadcast_result.explain(mode="formatted")

# Στρατηγική MERGE Join

# Χρόνος πριν την εκτέλεση
merge_start_time = time.time()

# Υπολογισμός μέσου αριθμού εγκλημάτων ανά άτομο
merge_result = total_crimes.join(
    average_income_per_person.hint("MERGE"),
    "COMM",
    "inner"
).withColumn(
    "Crime Ratio Per Person",
    (col("Total Crimes") / col("Total Population"))
).select(
    col("COMM").alias("Area"), 
    "Avg Income Per Person ($)", 
    "Crime Ratio Per Person"
).orderBy("Avg Income Per Person ($)", ascending=False)

# Εμφάνιση αποτελεσμάτων για MERGE
print("MERGE Join Strategy:\n")
merge_result.show()

# Χρόνος μετά την εκτέλεση
merge_end_time = time.time()

# Υπολογισμός του χρόνου εκτέλεσης
merge_execution_time = merge_end_time - merge_start_time
print(f"Execution time for MERGE join: {merge_execution_time} seconds\n")

# Explain για MERGE
merge_result.explain(mode="formatted")

# Στρατηγική SHUFFLE_HASH Join

# Χρόνος πριν την εκτέλεση
shuffle_hash_start_time = time.time()

# Υπολογισμός μέσου αριθμού εγκλημάτων ανά άτομο
shuffle_hash_result = total_crimes.join(
    average_income_per_person.hint("SHUFFLE_HASH"),
    "COMM",
    "inner"
).withColumn(
    "Crime Ratio Per Person",
    (col("Total Crimes") / col("Total Population"))
).select(
    col("COMM").alias("Area"), 
    "Avg Income Per Person ($)", 
    "Crime Ratio Per Person"
).orderBy("Avg Income Per Person ($)", ascending=False)

# Εμφάνιση αποτελεσμάτων για SHUFFLE_HASH
print("SHUFFLE_HASH Join Strategy:\n")
shuffle_hash_result.show()

# Χρόνος μετά την εκτέλεση
shuffle_hash_end_time = time.time()

# Υπολογισμός του χρόνου εκτέλεσης
shuffle_hash_execution_time = shuffle_hash_end_time - shuffle_hash_start_time
print(f"Execution time for SHUFFLE_HASH join: {shuffle_hash_execution_time} seconds\n")

# Explain για SHUFFLE_HASH
shuffle_hash_result.explain(mode="formatted")

# Στρατηγική SHUFFLE_REPLICATE_NL Join

# Χρόνος πριν την εκτέλεση
shuffle_replicate_nl_start_time = time.time()

# Υπολογισμός μέσου αριθμού εγκλημάτων ανά άτομο
shuffle_replicate_nl_result = total_crimes.join(
    average_income_per_person.hint("SHUFFLE_REPLICATE_NL"),
    "COMM",
    "inner"
).withColumn(
    "Crime Ratio Per Person",
    (col("Total Crimes") / col("Total Population"))
).select(
    col("COMM").alias("Area"), 
    "Avg Income Per Person ($)", 
    "Crime Ratio Per Person"
).orderBy("Avg Income Per Person ($)", ascending=False)

# Εμφάνιση αποτελεσμάτων για SHUFFLE_REPLICATE_NL
print("SHUFFLE_REPLICATE_NL Join Strategy:\n")
shuffle_replicate_nl_result.show()

# Χρόνος μετά την εκτέλεση
shuffle_replicate_nl_end_time = time.time()

# Υπολογισμός του χρόνου εκτέλεσης
shuffle_replicate_nl_execution_time = shuffle_replicate_nl_end_time - shuffle_replicate_nl_start_time
print(f"Execution time for SHUFFLE_REPLICATE_NL join: {shuffle_replicate_nl_execution_time} seconds\n")

# Explain για SHUFFLE_REPLICATE_NL
shuffle_replicate_nl_result.explain(mode="formatted")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
384,application_1738075734771_0385,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

BROADCAST Join Strategy:

+-------------------+-------------------------+----------------------+
|               Area|Avg Income Per Person ($)|Crime Ratio Per Person|
+-------------------+-------------------------+----------------------+
|  Pacific Palisades|        70656.11282274863|   0.45085501138400425|
|Palisades Highlands|        66867.43986433603|    0.2055830941821028|
|   Marina Peninsula|        65235.69402813004|    0.6124048881715471|
|            Bel Air|        63041.33809466166|    0.4275511439293064|
|      Beverly Crest|        60947.49019768682|    0.3713395127553113|
|          Brentwood|       60840.624859219824|    0.5036005597078598|
|  Mandeville Canyon|        55572.10949582431|   0.26229508196721313|
|        Playa Vista|        50264.47187990141|    0.8157069235939951|
|            Carthay|       49841.164527155335|    0.9322445879225219|
|             Venice|       47614.883340996166|    1.2549272030651342|
|       Century City|       46103.510597140456|    