In [4]:
from sedona.spark import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import time

#Αρχικοποίηση SparkSession
spark = SparkSession.builder \
    .appName("Query 3 - Income/Crimes Per Person") \
    .getOrCreate()

#Δημιουργία sedona context για χρήση geospatial δεδομένων
sedona = SedonaContext.create(spark)

#Διάβασμα του .geojson αρχείου από s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")
#Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

#Φιλτράρισμα για περιοχές του Los Angeles
LA_areas = flattened_df.filter(col("CITY") == "Los Angeles") \
                .groupBy("COMM", "ZCTA10", "POP_2010", "HOUSING10") \
                .agg(ST_Union_Aggr("geometry").alias("geometry"))


#Φόρτωση δεδομένων εισοδήματος από το αρχείο CSV
file_path = 's3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv'  
income_df = spark.read.csv(file_path, header=True, inferSchema=True)

#Μετατροπή του εισοδήματος από string της μορφής "$33,887" σε double της μορφής 33887.0
income_df = income_df.withColumn("Estimated Median Income", 
                                 regexp_replace(substring(col("Estimated Median Income"), 2, 100), ",", "").cast("double"))
income_df = income_df.withColumn("ZipCode", col("Zip Code").cast("double"))

#Μετατροπή του ZCTA10 των περιοχών του Los Angeles από string σε double
LA_areas = LA_areas.withColumn("ZCTA10", col("ZCTA10").cast("double"))


#Join μεταξύ των περιοχών του Los Angeles και των δεδομένων εισοδήματος με βάση το ZCTA10
joined_df = LA_areas.join(income_df,
    LA_areas.ZCTA10 == income_df.ZipCode,"inner").select("COMM", "ZCTA10", "POP_2010", "HOUSING10", *income_df.columns)

print("Joined_df Explain:")
joined_df.explain()

#Υπολογισμός συνολικού πληθυσμού και συνολικού αριθμού νοικοκυριών ανά ZIP Code
zip_totals = joined_df.groupBy("ZCTA10").agg(
    sum("POP_2010").alias("TOTAL_ZIP_POP"),
    sum("HOUSING10").alias("TOTAL_ZIP_HOUSING"))

#Join των συνολικών δεδομένων (πληθυσμού και κατοικιών) με τα δεδομένα του κύριου DataFrame
joined_with_totals = joined_df.join(zip_totals,"ZCTA10","left")

print("joined_with_totals Explain:")
joined_with_totals.explain()

#Ομαδοποίηση ανά περιοχή (COMM) και υπολογισμός εκτιμώμενου εισοδήματος ανά άτομο
result_df = joined_with_totals.groupBy("COMM").agg(
    sum("TOTAL_ZIP_HOUSING").alias("TOTAL_HOUSING"),
    sum("TOTAL_ZIP_POP").alias("TOTAL_POP"),
    avg("Estimated Median Income").alias("AVG_MEDIAN_INCOME")).withColumn("Estimated_Income_Per_Person",
    (col("AVG_MEDIAN_INCOME") * col("TOTAL_HOUSING")) / col("TOTAL_POP"))

#Φιλτράρισμα για να αφαιρεθούν οι περιοχές με κενά δεδομένα (NULL)
result_df = result_df.filter(result_df.Estimated_Income_Per_Person.isNotNull())

#Εμφάνιση των αποτελεσμάτων
result_df.select("COMM", "Estimated_Income_Per_Person").show(result_df.count(), truncate=False)


###################################################################################################

#Φόρτωση δεδομένων εγκλημάτων από αρχείο CSV
file_path = 's3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv'  
crimeData_df = spark.read.csv(file_path, header=True, inferSchema=True)

#Προσθήκη στήλης για το έτος και φιλτράρισμα εγκλημάτων που έγιναν το 2015
crimeData_df = crimeData_df.withColumn("year", year(to_date("Date Rptd", "MM/dd/yyyy hh:mm:ss a")))
crimeData_df = crimeData_df.filter(col("year") == 2015)

#Δημιουργία γεωμετρικής στήλης από τα πεδία (LAT, LON)
crimeData_df = crimeData_df.withColumn("geom", expr("ST_Point(LON, LAT)"))


#Ορισμός συνάρτησης για την εκτέλεση και μέτρηση του χρόνου για διαφορετικές στρατηγικές join
def execute_join(strategy):
    print(f"\nExecuting with {strategy} hint")
    start_time = time.time()

    #Spatial join δεδομένων εγκλημάτων με τις περιοχές του Los Angeles
    crimeDataGEO_df = crimeData_df.hint(strategy).join(LA_areas.hint(strategy), expr("ST_Within(geom, geometry)"), "inner")

    #Ομαδοποίηση κατά περιοχή (COMM) και υπολογισμός των εγκλημάτων ανά περιοχή
    community_crime_stats = crimeDataGEO_df.groupBy("COMM").agg(expr("count(*)").alias("Total_Crimes_Per_Community"))

    #Υπολογισμός του συνολικού πληθυσμού για κάθε κοινότητα
    community_population = LA_areas.groupBy("COMM").agg(sum("POP_2010").alias("TOTAL_COMM_POP"))

    #Join των στατιστικών εγκλημάτων με τα δεδομένα του πληθυσμού
    final_stats = community_crime_stats.hint(strategy).join(community_population.hint(strategy), "COMM", "inner")

    #Υπολογισμός εγκλημάτων ανά άτομο
    final_stats = final_stats.withColumn("Crimes_Per_Person", col("Total_Crimes_Per_Community") / col("TOTAL_COMM_POP"))

    #Μέτρηση του χρόνου εκτέλεσης
    execution_time = time.time() - start_time
    print(f"Execution Time for {strategy}: {execution_time:.4f} seconds")

    #Εμφάνιση των τελικών αποτελεσμάτων
    final_stats.show()
    return execution_time

#Δοκιμή με τις τέσσερις στρατηγικές join
strategies = ["BROADCAST", "SHUFFLE_HASH", "MERGE", "SHUFFLE_REPLICATE_NL"]
results = {}

for strategy in strategies:
    results[strategy] = execute_join(strategy)

#Σύνοψη των αποτελεσμάτων
for strategy, time_taken in results.items():
    print(f"Strategy: {strategy}, Execution Time: {time_taken:.4f} seconds")
    
    
    
    
    
#GENERAL COMMENTS    

#OLA TA COMM TOY CENSUS_BLOCKS ME TA ZCTA10 TOUS, TO IDIO COMM EXEI POLLA ZCTA (ARA POLLA ZIP CODES)
#onlyLA_areas=LA_areas.select("COMM", "ZCTA10")
#onlyLA_areas.sort(col("COMM").asc()).show(truncate=False)


#TA ZCTA10 APO COMMS TOY CENSUS_BLOCKS TA OPOIA DEN YPARXOUN STO LA_income_2015 WS ZIP CODES
#missing_zcta10 = LA_areas.select("ZCTA10").distinct().subtract(
#    income_df.select(col("Zip Code").alias("ZCTA10")).distinct()
#)
#missing_zcta10.show()




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Joined_df Explain:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [knownfloatingpointnormalized(normalizenanandzero(ZCTA10#4844))], [knownfloatingpointnormalized(normalizenanandzero(ZipCode#4839))], Inner, BuildRight, false
   :- HashAggregate(keys=[COMM#4641, ZCTA10#4658, POP_2010#4650L, HOUSING10#4647L], functions=[], schema specialized)
   :  +- Exchange hashpartitioning(COMM#4641, ZCTA10#4658, POP_2010#4650L, HOUSING10#4647L, 1000), ENSURE_REQUIREMENTS, [plan_id=15988]
   :     +- HashAggregate(keys=[COMM#4641, ZCTA10#4658, POP_2010#4650L, HOUSING10#4647L], functions=[], schema specialized)
   :        +- Project [features#4625.properties.COMM AS COMM#4641, features#4625.properties.HOUSING10 AS HOUSING10#4647L, features#4625.properties.POP_2010 AS POP_2010#4650L, features#4625.properties.ZCTA10 AS ZCTA10#4658]
   :           +- Filter ((isnotnull(features#4625.properties.CITY) AND (features#4625.properties.CITY = Los Angeles)) AND isnotnull(cast(featur