# Query 1

#### Dataframe 4 executors

In [1]:
from pyspark.sql.functions import col, when, count
from pyspark.sql import SparkSession
import time

# Δημιουργία SparkSession
spark = SparkSession.builder \
    .appName("Query1 Dataframe") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Φόρτωση δεδομένων από το S3 bucket και ύστερα ένωση με union δομή ώστε να έχουμε ολα τα data σε ένα Dataframe
crime_data_2010 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)
crime_data_2020 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)
# Ένωση των δύο αρχείων
crime_data = crime_data_2010.union(crime_data_2020)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4192,application_1732639283265_4132,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Μέτρηση χρόνου για DataFrame API
start_df = time.time()

# Ξεκινάμε τη διαδικασία φιλτραρίσματος και κατηγοριοποίησης
# Φιλτράρισμα για έγκυρες ηλικίες και περιστατικά
filtered_data = crime_data.filter(
    (col("Crm Cd Desc").like("%AGGRAVATED ASSAULT%")) &
    (col("Vict Age").isNotNull()) &
    (col("Vict Age") > 0)
)

# Κατηγοριοποίηση ηλικιακών ομάδων
age_grouped_data = filtered_data.withColumn(
    "Age Group",
    when(col("Vict Age") < 18, "Children")
    .when((col("Vict Age") >= 18) & (col("Vict Age") <= 24), "Young Adults")
    .when((col("Vict Age") >= 25) & (col("Vict Age") <= 64), "Adults")
    .otherwise("Seniors")
)

# Ομαδοποίηση και ταξινόμηση κατά ηλικιακή ομάδα
result = age_grouped_data.groupBy("Age Group").agg(count("*").alias("Count")).orderBy(col("Count").desc())

# Εμφάνιση αποτελεσμάτων
result.show()

end_df = time.time()
print(f"DataFrame API Time: {end_df - start_df:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------+
|   Age Group| Count|
+------------+------+
|      Adults|121093|
|Young Adults| 33605|
|    Children| 10830|
|     Seniors|  5985|
+------------+------+

DataFrame API Time: 8.412 seconds

#### RDD API 4 executors

In [1]:
from pyspark.sql.functions import col, when, count
from pyspark.sql import SparkSession
import time

# Δημιουργία SparkSession
spark = SparkSession.builder \
    .appName("Query1 RDD") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Φόρτωση δεδομένων από το S3 bucket και ύστερα ένωση με union δομή ώστε να έχουμε ολα τα data σε ένα Dataframe
crime_data_2010 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)
crime_data_2020 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)
# Ένωση των δύο αρχείων
crime_data = crime_data_2010.union(crime_data_2020)

# Μέτρηση χρόνου για RDD API
start_rdd = time.time()

crime_rdd = crime_data.rdd
# Φιλτράρισμα για aggravated assault
filtered_rdd = crime_rdd.filter(
    lambda row: (
        "AGGRAVATED ASSAULT" in row["Crm Cd Desc"] and  # Έλεγχος περιγραφής εγκλήματος
        row["Vict Age"] is not None and                 # Μη-κενές ηλικίες
        row["Vict Age"] > 0                             # Θετικές ηλικίες
    )
)

# Κατηγοριοποίηση ηλικιών και μέτρηση
age_grouped_rdd = filtered_rdd.map(lambda row: (
    "Children" if row["Vict Age"] < 18 else
    "Young Adults" if 18 <= row["Vict Age"] <= 24 else
    "Adults" if 25 <= row["Vict Age"] <= 64 else
    "Seniors", 1
))

# Ομαδοποίηση και ταξινόμηση
result_rdd = age_grouped_rdd.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False)

# Εμφάνιση αποτελεσμάτων
for group, count in result_rdd.collect():
    print(f"{group}: {count}")

end_rdd = time.time()
print(f"RDD API Time: {end_rdd - start_rdd:.3f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4200,application_1732639283265_4140,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Adults: 121093
Young Adults: 33605
Children: 10830
Seniors: 5985
RDD API Time: 26.141 seconds

# Query 2

#### Dataframe API (Default executors)

In [2]:
from pyspark.sql import SparkSession
# Δημιουργία SparkSession
# Ενεργοποιεί την παλαιότερη συμπεριφορά (Legacy Parser) για την ανάλυση ημερομηνιών
spark = SparkSession.builder \
    .appName("Query2") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

# Φόρτωση δεδομένων από το S3 bucket και ύστερα ένωση με union δομή ώστε να έχουμε ολα τα data σε ένα Dataframe
crime_data_2010 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)
crime_data_2020 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)
# Ένωση των δύο αρχείων
crime_data = crime_data_2010.union(crime_data_2020)

from pyspark.sql.functions import col, count, sum, year, rank
from pyspark.sql.window import Window
from pyspark.sql.functions import to_timestamp

start_df = time.time()

# Μετατροπή DATE OCC σε timestamp
crime_data = crime_data.withColumn(
    "DATE OCC",
    to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")
)

# Εξαγωγή έτους από τη στήλη DATE OCC
crime_data = crime_data.withColumn("year", year(col("DATE OCC")))


# Μετονομασία της στήλης AREA NAME σε precinct
crime_data = crime_data.withColumnRenamed("AREA NAME", "precinct")

# Σήμανση αν η τιμή ΔΕΝ είναι "Invest Cont" και ΔΕΝ είναι "UNK" (1 αν όχι, 0 αν ναι)
crime_data = crime_data.withColumn(
    "is_closed",
    (~(col("Status Desc").isin("Invest Cont", "UNK"))).cast("int") 
)

# Υπολογισμός συνολικών υποθέσεων και κλειστών υποθέσεων ανά έτος και precinct
case_stats = crime_data.groupBy("year", "precinct").agg(
    count("*").alias("total_cases"),          # Συνολικές υποθέσεις
    sum("is_closed").alias("closed_cases")   # Κλειστές υποθέσεις
)

# Υπολογισμός closed_case_rate
case_stats = case_stats.withColumn(
    "closed_case_rate",
    (col("closed_cases") / col("total_cases") * 100).cast("double")
)

# Δημιουργία παραθύρου για κατάταξη ανά έτος
window_spec = Window.partitionBy("year").orderBy(col("closed_case_rate").desc())

# Υπολογισμός κατάταξης
ranked_data = case_stats.withColumn("ranking", rank().over(window_spec))

# Φιλτράρισμα για τις 3 κορυφαίες κατατάξεις κάθε χρονιάς
top_3_precincts = ranked_data.filter(col("ranking") <= 3)

# Ταξινόμηση κατά έτος και ranking
result = top_3_precincts.orderBy("year", "ranking")

# Εμφάνιση αποτελεσμάτων
result.select("year", "precinct", "closed_case_rate", "ranking").show(n=45, truncate=False)

end_df = time.time()
print(f"DataFrame API Time: {end_df - start_df:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+-------+
|year|precinct   |closed_case_rate  |ranking|
+----+-----------+------------------+-------+
|2010|Rampart    |32.84713448949121 |1      |
|2010|Olympic    |31.515289821999087|2      |
|2010|Harbor     |29.36028339237341 |3      |
|2011|Olympic    |35.040060090135206|1      |
|2011|Rampart    |32.4964471814306  |2      |
|2011|Harbor     |28.51336246316431 |3      |
|2012|Olympic    |34.29708533302119 |1      |
|2012|Rampart    |32.46000463714352 |2      |
|2012|Harbor     |29.509585848956675|3      |
|2013|Olympic    |33.58217940999398 |1      |
|2013|Rampart    |32.1060382916053  |2      |
|2013|Harbor     |29.723638951488557|3      |
|2014|Van Nuys   |32.0215235281705  |1      |
|2014|West Valley|31.49754809505847 |2      |
|2014|Mission    |31.224939855653567|3      |
|2015|Van Nuys   |32.265140677157845|1      |
|2015|Mission    |30.463762673676303|2      |
|2015|Foothill   |30.353001803658852|3      |
|2016|Van Nuys   |32.1945184621240

#### SQL API (Default executors)

In [16]:
# Φόρτωση δεδομένων από το S3 bucket και ύστερα ένωση με union δομή ώστε να έχουμε ολα τα data σε ένα Dataframe
crime_data_2010 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)
crime_data_2020 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)
# Ένωση των δύο αρχείων
crime_data = crime_data_2010.union(crime_data_2020)

from pyspark.sql.functions import col, count, sum, year, rank
from pyspark.sql.window import Window
from pyspark.sql.functions import to_timestamp

start_sql = time.time()

# Μετατροπή DATE OCC σε timestamp
crime_data = crime_data.withColumn(
    "DATE OCC",
    to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")
)

# Εξαγωγή έτους από τη στήλη DATE OCC
crime_data = crime_data.withColumn("year", year(col("DATE OCC")))


# Μετονομασία της στήλης AREA NAME σε precinct
crime_data = crime_data.withColumnRenamed("AREA NAME", "precinct")

# Σήμανση αν η τιμή ΔΕΝ είναι "Invest Cont" και ΔΕΝ είναι "UNK" (1 αν όχι, 0 αν ναι)
crime_data = crime_data.withColumn(
    "is_closed",
    (~(col("Status Desc").isin("Invest Cont", "UNK"))).cast("int") 
)
# Υπολογισμός συνολικών υποθέσεων και κλειστών υποθέσεων ανά έτος και precinct
case_stats = crime_data.groupBy("year", "precinct").agg(
    count("*").alias("total_cases"),          # Συνολικές υποθέσεις
    sum("is_closed").alias("closed_cases")   # Κλειστές υποθέσεις
)

# Υπολογισμός closed_case_rate
case_stats = case_stats.withColumn(
    "closed_case_rate",
    (col("closed_cases") / col("total_cases") * 100).cast("double")
)
# Δημιουργία προσωρινού πίνακα για χρήση SQL
crime_data.createOrReplaceTempView("crime_data_table")

# Γράφουμε το SQL query
query = """
WITH ranked_data AS (
    SELECT 
        year,
        precinct,
        (SUM(is_closed) / COUNT(*) * 100) AS closed_case_rate,
        RANK() OVER (PARTITION BY year ORDER BY (SUM(is_closed) / COUNT(*) * 100) DESC) AS ranking
    FROM crime_data_table
    GROUP BY year, precinct
)
SELECT 
    year,
    precinct,
    closed_case_rate,
    ranking
FROM ranked_data
WHERE ranking <= 3
ORDER BY year, ranking
"""

# Εκτέλεση του SQL Query
result_sql = spark.sql(query)

# Εμφάνιση των αποτελεσμάτων με format παρόμοιο με την εικόνα
result_sql.show(n=45, truncate=False)


# Εκτύπωση χρόνου εκτέλεσης
end_sql = time.time()
print(f"SQL API Execution Time: {end_sql - start_sql:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+-------+
|year|precinct   |closed_case_rate  |ranking|
+----+-----------+------------------+-------+
|2010|Rampart    |32.84713448949121 |1      |
|2010|Olympic    |31.515289821999087|2      |
|2010|Harbor     |29.36028339237341 |3      |
|2011|Olympic    |35.040060090135206|1      |
|2011|Rampart    |32.4964471814306  |2      |
|2011|Harbor     |28.51336246316431 |3      |
|2012|Olympic    |34.29708533302119 |1      |
|2012|Rampart    |32.46000463714352 |2      |
|2012|Harbor     |29.509585848956675|3      |
|2013|Olympic    |33.58217940999398 |1      |
|2013|Rampart    |32.1060382916053  |2      |
|2013|Harbor     |29.723638951488557|3      |
|2014|Van Nuys   |32.0215235281705  |1      |
|2014|West Valley|31.49754809505847 |2      |
|2014|Mission    |31.224939855653567|3      |
|2015|Van Nuys   |32.265140677157845|1      |
|2015|Mission    |30.463762673676303|2      |
|2015|Foothill   |30.353001803658852|3      |
|2016|Van Nuys   |32.1945184621240

#### Paqruet & SQL API

In [20]:
# 1. Φόρτωση δεδομένων από τα δύο CSV αρχεία
crime_data_2010 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True, inferSchema=True
)

crime_data_2020 = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True, inferSchema=True
)

# 2. Ένωση των δύο DataFrames
crime_data = crime_data_2010.union(crime_data_2020)

# 3. Μετατροπή και αποθήκευση ως Parquet στο S3
start_parquet_write = time.time()

# Αποθήκευση σε μοναδικό αρχείο .parquet
crime_data.coalesce(1).write.mode("overwrite").parquet(
    "s3://groups-bucket-dblab-905418150721/group27/Crime_Data/"
)

end_parquet_write = time.time()
print(f"Time to write to Parquet: {round(end_parquet_write - start_parquet_write, 3)} seconds")

# Διαβάζοντας από Parquet
start_parquet_read_sql = time.time()

crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")

# 2. Μετατροπή DATE OCC σε timestamp (αν χρειαστεί ξανά)
crime_data = crime_data.withColumn(
    "DATE OCC",
    to_timestamp(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")
)

# 3. Εξαγωγή έτους από τη στήλη DATE OCC
crime_data = crime_data.withColumn("year", year(col("DATE OCC")))

# 4. Μετονομασία της στήλης AREA NAME σε precinct
crime_data = crime_data.withColumnRenamed("AREA NAME", "precinct")

# 5. Σήμανση αν η τιμή ΔΕΝ είναι "Invest Cont" και ΔΕΝ είναι "UNK" (1 αν όχι, 0 αν ναι)
crime_data = crime_data.withColumn(
    "is_closed",
    (~(col("Status Desc").isin("Invest Cont", "UNK"))).cast("int")
)

# 6. Δημιουργία προσωρινού πίνακα για χρήση SQL
crime_data.createOrReplaceTempView("crime_data_table")

# 7. Γράφουμε το SQL Query για υπολογισμούς και κατάταξη
query = """
WITH ranked_data AS (
    SELECT 
        year,
        precinct,
        (SUM(is_closed) / COUNT(*) * 100) AS closed_case_rate,
        RANK() OVER (PARTITION BY year ORDER BY (SUM(is_closed) / COUNT(*) * 100) DESC) AS ranking
    FROM crime_data_table
    GROUP BY year, precinct
)
SELECT 
    year,
    precinct,
    closed_case_rate,
    ranking
FROM ranked_data
WHERE ranking <= 3
ORDER BY year, ranking
"""

# 8. Εκτέλεση του SQL Query
result_sql = spark.sql(query)

# 9. Εμφάνιση αποτελεσμάτων
result_sql.show(n=45, truncate=False)

end_parquet_read_sql = time.time()
print(f"SQL API Execution Time using Parquet: {round(end_parquet_read_sql - start_parquet_read_sql, 3)} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Time to write to Parquet: 28.166 seconds
+----+-----------+------------------+-------+
|year|precinct   |closed_case_rate  |ranking|
+----+-----------+------------------+-------+
|2010|Rampart    |32.84713448949121 |1      |
|2010|Olympic    |31.515289821999087|2      |
|2010|Harbor     |29.36028339237341 |3      |
|2011|Olympic    |35.040060090135206|1      |
|2011|Rampart    |32.4964471814306  |2      |
|2011|Harbor     |28.51336246316431 |3      |
|2012|Olympic    |34.29708533302119 |1      |
|2012|Rampart    |32.46000463714352 |2      |
|2012|Harbor     |29.509585848956675|3      |
|2013|Olympic    |33.58217940999398 |1      |
|2013|Rampart    |32.1060382916053  |2      |
|2013|Harbor     |29.723638951488557|3      |
|2014|Van Nuys   |32.0215235281705  |1      |
|2014|West Valley|31.49754809505847 |2      |
|2014|Mission    |31.224939855653567|3      |
|2015|Van Nuys   |32.265140677157845|1      |
|2015|Mission    |30.463762673676303|2      |
|2015|Foothill   |30.353001803658852|3 

# Query 3

#### Broadcast Join

In [2]:
from sedona.spark import *
from sedona.register.geo_registrator import SedonaRegistrator
from pyspark.sql.functions import col, sum, count, regexp_replace
from pyspark.sql import SparkSession
import time

# Δημιουργία Spark και Sedona Context
spark = SparkSession.builder \
    .appName("GeoJSON Read and Process") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
income_csv = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = SedonaContext.create(spark).read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Μορφοποίηση δεδομένων
census_2010 = blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in
     blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")

# Καθαρισμός των δεδομένων του εισοδήματος
income_csv = income_csv.withColumn(
    "Estimated Median Income",
    regexp_replace(regexp_replace(col("Estimated Median Income"), "\\$", ""), ",", "").cast("int")
).withColumn(
    "Zip Code",
    col("Zip Code").cast("int")
)
# Μετατροπή της στήλης POP_2010 σε αριθμητικό τύπο
census_2010 = census_2010.withColumn("POP_2010", col("POP_2010").cast("int")).withColumn("HOUSING10", col("HOUSING10").cast("int"))

# Φιλτράρισμα δεδομένων
crime_data = crime_data.filter(col("LAT").isNotNull() & col("LON").isNotNull() & ((col("LAT") != 0) & (col("LON") != 0)))
income_csv = income_csv.filter(col("Zip Code").isNotNull())
census_2010 = census_2010.filter((col("ZCTA10").isNotNull()) & (col("COMM").isNotNull()) & (col("POP_2010").isNotNull()) &
    (col("HOUSING10").isNotNull()) & ((col("POP_2010") > 0) & (col("HOUSING10") > 0)) |
    ((col("HOUSING10") == col("POP_2010")) & (col("HOUSING10") < col("POP_2010")))
)


start_broad = time.time()

# Join μεταξύ εισοδήματος και απογραφής
census_income_joined = census_2010.join(
    income_csv.hint("BROADCAST"),
    census_2010["ZCTA10"] == income_csv["Zip Code"],
    "inner"
)

# Υπολογισμός συνολικού μέσου εισοδήματος ανα άτομο ανά COMM = sum(housing*income/pop) / φορες εφαρμογής του sum
aggregated_income_data = census_income_joined.groupBy("COMM").agg(
    (sum((col("Estimated Median Income") * col("HOUSING10") / col("POP_2010")).cast("double")) /
     count("*")).alias("income_per_capita")
)


# Κανονικοποίηση του census_2010 για μοναδικά COMM
census_2010_normalized = census_2010.groupBy("COMM").agg(
    sum("POP_2010").alias("POP_2010")
)


crime_data = crime_data.withColumn(
    "geom",
    ST_Point("LON", "LAT")  # Δημιουργία γεωμετρικών σημείων από το LON και LAT
)

# Σύνδεση crime_data με census_2010 με spatial join
joined_df = crime_data.join(
    census_2010.hint("BROADCAST"),
    ST_Within(crime_data["geom"], census_2010["geometry"]),
    "inner"
)

# Υπολογισμός συνολικών εγκλημάτων ανά COMM
crime_by_area = joined_df.groupBy("COMM").agg(
    count("*").alias("total_crimes")
)

# Συνένωση με πληθυσμό
crime_with_population = crime_by_area.join(
    census_2010_normalized.hint("BROADCAST"),
    "COMM",
    "inner"
)

# Υπολογισμός εγκλημάτων ανά άτομο
crime_with_population = crime_with_population.withColumn(
    "crimes_per_person",
    (col("total_crimes") / col("POP_2010")).cast("double")
)
# Συνένωση με πληθυσμό
crime_income = crime_with_population.join(
    aggregated_income_data.hint("BROADCAST"),
    "COMM",
    "inner"
)
crime_income.explain(mode="formatted")
# Εμφάνιση αποτελεσμάτων
crime_income.select("COMM", "POP_2010", "crimes_per_person", "income_per_capita").show(truncate=False)

end_broad = time.time()
print(f"Execution time with Broadcast joins: {end_broad - start_broad:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (43)
+- Project (42)
   +- BroadcastHashJoin Inner BuildRight (41)
      :- Project (25)
      :  +- BroadcastHashJoin Inner BuildRight (24)
      :     :- HashAggregate (14)
      :     :  +- Exchange (13)
      :     :     +- HashAggregate (12)
      :     :        +- Project (11)
      :     :           +- BroadcastIndexJoin (10)
      :     :              :- Project (3)
      :     :              :  +- Filter (2)
      :     :              :     +- Scan parquet  (1)
      :     :              +- SpatialIndex (9)
      :     :                 +- Project (8)
      :     :                    +- Filter (7)
      :     :                       +- Generate (6)
      :     :                          +- Filter (5)
      :     :                             +- Scan geojson  (4)
      :     +- BroadcastExchange (23)
      :        +- HashAggregate (22)
      :           +- Exchange (21)
      :              +- HashAggregate (20)
      :                 +- 

#### Merge Join

In [3]:
from sedona.spark import *
from sedona.register.geo_registrator import SedonaRegistrator
from pyspark.sql.functions import col, sum, count, regexp_replace
from pyspark.sql import SparkSession
import time

# Δημιουργία Spark και Sedona Context
spark = SparkSession.builder \
    .appName("GeoJSON Read and Process") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
income_csv = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = SedonaContext.create(spark).read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Μορφοποίηση δεδομένων
census_2010 = blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in
     blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")

# Καθαρισμός των δεδομένων του εισοδήματος
income_csv = income_csv.withColumn(
    "Estimated Median Income",
    regexp_replace(regexp_replace(col("Estimated Median Income"), "\\$", ""), ",", "").cast("int")
).withColumn(
    "Zip Code",
    col("Zip Code").cast("int")
)
# Μετατροπή της στήλης POP_2010 σε αριθμητικό τύπο
census_2010 = census_2010.withColumn("POP_2010", col("POP_2010").cast("int")).withColumn("HOUSING10", col("HOUSING10").cast("int"))

# Φιλτράρισμα δεδομένων
crime_data = crime_data.filter(col("LAT").isNotNull() & col("LON").isNotNull() & ((col("LAT") != 0) & (col("LON") != 0)))
income_csv = income_csv.filter(col("Zip Code").isNotNull())
census_2010 = census_2010.filter((col("ZCTA10").isNotNull()) & (col("COMM").isNotNull()) & (col("POP_2010").isNotNull()) &
    (col("HOUSING10").isNotNull()) & ((col("POP_2010") > 0) & (col("HOUSING10") > 0)) |
    ((col("HOUSING10") == col("POP_2010")) & (col("HOUSING10") < col("POP_2010")))
)



start_merge = time.time()
# Join μεταξύ εισοδήματος και απογραφής
census_income_joined = census_2010.join(
    income_csv.hint("MERGE"),
    census_2010["ZCTA10"] == income_csv["Zip Code"],
    "inner"
)

# Υπολογισμός συνολικού μέσου εισοδήματος ανα άτομο ανά COMM = sum(housing*income/pop) / φορες εφαρμογής του sum
aggregated_income_data = census_income_joined.groupBy("COMM").agg(
    (sum((col("Estimated Median Income") * col("HOUSING10") / col("POP_2010")).cast("double")) /
     count("*")).alias("income_per_capita")
)


# Κανονικοποίηση του census_2010 για μοναδικά COMM
census_2010_normalized = census_2010.groupBy("COMM").agg(
    sum("POP_2010").alias("POP_2010")
)

# Φόρτωση δεδομένων εγκλημάτων
crime_data = crime_data.withColumn(
    "geom",
    ST_Point("LON", "LAT")  # Δημιουργία γεωμετρικών σημείων από το LON και LAT
)

# Σύνδεση crime_data με census_2010 με spatial join
joined_df = crime_data.join(
    census_2010.hint("MERGE"),
    ST_Within(crime_data["geom"], census_2010["geometry"]),
    "inner"
)

# Υπολογισμός συνολικών εγκλημάτων ανά COMM
crime_by_area = joined_df.groupBy("COMM").agg(
    count("*").alias("total_crimes")
)

# Συνένωση με πληθυσμό
crime_with_population = crime_by_area.join(
    census_2010_normalized.hint("MERGE"),
    "COMM",
    "inner"
)

# Υπολογισμός εγκλημάτων ανά άτομο
crime_with_population = crime_with_population.withColumn(
    "crimes_per_person",
    (col("total_crimes") / col("POP_2010")).cast("double")
)
# Συνένωση με πληθυσμό
crime_income = crime_with_population.join(
    aggregated_income_data.hint("MERGE"),
    "COMM",
    "inner"
)
crime_income.explain(mode="formatted")
# Εμφάνιση αποτελεσμάτων
crime_income.select("COMM", "POP_2010", "crimes_per_person", "income_per_capita").show(truncate=False)
end_merge = time.time()
print(f"Execution time with Merge joins: {end_merge - start_merge:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (46)
+- Project (45)
   +- SortMergeJoin Inner (44)
      :- Project (25)
      :  +- SortMergeJoin Inner (24)
      :     :- Sort (14)
      :     :  +- HashAggregate (13)
      :     :     +- Exchange (12)
      :     :        +- HashAggregate (11)
      :     :           +- Project (10)
      :     :              +- RangeJoin (9)
      :     :                 :- Project (3)
      :     :                 :  +- Filter (2)
      :     :                 :     +- Scan parquet  (1)
      :     :                 +- Project (8)
      :     :                    +- Filter (7)
      :     :                       +- Generate (6)
      :     :                          +- Filter (5)
      :     :                             +- Scan geojson  (4)
      :     +- Sort (23)
      :        +- HashAggregate (22)
      :           +- Exchange (21)
      :              +- HashAggregate (20)
      :                 +- Project (19)
      :                    +- Filter (

#### SHUFFLE_HASH joins

In [4]:
from sedona.spark import *
from sedona.register.geo_registrator import SedonaRegistrator
from pyspark.sql.functions import col, sum, count, regexp_replace
from pyspark.sql import SparkSession
import time

# Δημιουργία Spark και Sedona Context
spark = SparkSession.builder \
    .appName("GeoJSON Read and Process with Suffle_Hash") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
income_csv = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = SedonaContext.create(spark).read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Μορφοποίηση δεδομένων
census_2010 = blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in
     blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")

# Καθαρισμός των δεδομένων του εισοδήματος
income_csv = income_csv.withColumn(
    "Estimated Median Income",
    regexp_replace(regexp_replace(col("Estimated Median Income"), "\\$", ""), ",", "").cast("int")
).withColumn(
    "Zip Code",
    col("Zip Code").cast("int")
)
# Μετατροπή της στήλης POP_2010 σε αριθμητικό τύπο
census_2010 = census_2010.withColumn("POP_2010", col("POP_2010").cast("int")).withColumn("HOUSING10", col("HOUSING10").cast("int"))

# Φιλτράρισμα δεδομένων
crime_data = crime_data.filter(col("LAT").isNotNull() & col("LON").isNotNull() & ((col("LAT") != 0) & (col("LON") != 0)))
income_csv = income_csv.filter(col("Zip Code").isNotNull())
census_2010 = census_2010.filter((col("ZCTA10").isNotNull()) & (col("COMM").isNotNull()) & (col("POP_2010").isNotNull()) &
    (col("HOUSING10").isNotNull()) & ((col("POP_2010") > 0) & (col("HOUSING10") > 0)) |
    ((col("HOUSING10") == col("POP_2010")) & (col("HOUSING10") < col("POP_2010")))
)

start_shuffle = time.time()
# Join μεταξύ εισοδήματος και απογραφής
census_income_joined = census_2010.join(
    income_csv.hint("SHUFFLE_HASH"),
    census_2010["ZCTA10"] == income_csv["Zip Code"],
    "inner"
)

# Υπολογισμός συνολικού μέσου εισοδήματος ανα άτομο ανά COMM = sum(housing*income/pop) / φορες εφαρμογής του sum
aggregated_income_data = census_income_joined.groupBy("COMM").agg(
    (sum((col("Estimated Median Income") * col("HOUSING10") / col("POP_2010")).cast("double")) /
     count("*")).alias("income_per_capita")
)


# Κανονικοποίηση του census_2010 για μοναδικά COMM
census_2010_normalized = census_2010.groupBy("COMM").agg(
    sum("POP_2010").alias("POP_2010")
)

# Φόρτωση δεδομένων εγκλημάτων
crime_data = crime_data.withColumn(
    "geom",
    ST_Point("LON", "LAT")  # Δημιουργία γεωμετρικών σημείων από το LON και LAT
)

# Σύνδεση crime_data με census_2010 με spatial join
joined_df = crime_data.join(
    census_2010.hint("SHUFFLE_HASH"),
    ST_Within(crime_data["geom"], census_2010["geometry"]),
    "inner"
)

# Υπολογισμός συνολικών εγκλημάτων ανά COMM
crime_by_area = joined_df.groupBy("COMM").agg(
    count("*").alias("total_crimes")
)

# Συνένωση με πληθυσμό
crime_with_population = crime_by_area.join(
    census_2010_normalized.hint("SHUFFLE_HASH"),
    "COMM",
    "inner"
)

# Υπολογισμός εγκλημάτων ανά άτομο
crime_with_population = crime_with_population.withColumn(
    "crimes_per_person",
    (col("total_crimes") / col("POP_2010")).cast("double")
)
# Συνένωση με πληθυσμό
crime_income = crime_with_population.join(
    aggregated_income_data.hint("SHUFFLE_HASH"),
    "COMM",
    "inner"
)
crime_income.explain(mode="formatted")
# Εμφάνιση αποτελεσμάτων
crime_income.select("COMM", "POP_2010", "crimes_per_person", "income_per_capita").show(truncate=False)
end_shuffle = time.time()
print(f"Execution time with Shuffle Hash joins: {end_shuffle - start_shuffle:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (41)
+- Project (40)
   +- ShuffledHashJoin Inner BuildRight (39)
      :- Project (23)
      :  +- ShuffledHashJoin Inner BuildRight (22)
      :     :- HashAggregate (13)
      :     :  +- Exchange (12)
      :     :     +- HashAggregate (11)
      :     :        +- Project (10)
      :     :           +- RangeJoin (9)
      :     :              :- Project (3)
      :     :              :  +- Filter (2)
      :     :              :     +- Scan parquet  (1)
      :     :              +- Project (8)
      :     :                 +- Filter (7)
      :     :                    +- Generate (6)
      :     :                       +- Filter (5)
      :     :                          +- Scan geojson  (4)
      :     +- HashAggregate (21)
      :        +- Exchange (20)
      :           +- HashAggregate (19)
      :              +- Project (18)
      :                 +- Filter (17)
      :                    +- Generate (16)
      :                     

#### SHUFFLE_REPLICATE_NL joins

In [5]:
from sedona.spark import *
from sedona.register.geo_registrator import SedonaRegistrator
from pyspark.sql.functions import col, sum, count, regexp_replace
from pyspark.sql import SparkSession
import time

# Δημιουργία Spark και Sedona Context
spark = SparkSession.builder \
    .appName("GeoJSON Read and Process with Suffle_Replicate_nl") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
income_csv = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = SedonaContext.create(spark).read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Μορφοποίηση δεδομένων
census_2010 = blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in
     blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")

# Καθαρισμός των δεδομένων του εισοδήματος
income_csv = income_csv.withColumn(
    "Estimated Median Income",
    regexp_replace(regexp_replace(col("Estimated Median Income"), "\\$", ""), ",", "").cast("int")
).withColumn(
    "Zip Code",
    col("Zip Code").cast("int")
)
# Μετατροπή της στήλης POP_2010 σε αριθμητικό τύπο
census_2010 = census_2010.withColumn("POP_2010", col("POP_2010").cast("int")).withColumn("HOUSING10", col("HOUSING10").cast("int"))

# Φιλτράρισμα δεδομένων
crime_data = crime_data.filter(col("LAT").isNotNull() & col("LON").isNotNull() & ((col("LAT") != 0) & (col("LON") != 0)))
income_csv = income_csv.filter(col("Zip Code").isNotNull())
census_2010 = census_2010.filter((col("ZCTA10").isNotNull()) & (col("COMM").isNotNull()) & (col("POP_2010").isNotNull()) &
    (col("HOUSING10").isNotNull()) & ((col("POP_2010") > 0) & (col("HOUSING10") > 0)) |
    ((col("HOUSING10") == col("POP_2010")) & (col("HOUSING10") < col("POP_2010")))
)

start_rep = time.time()
# Join μεταξύ εισοδήματος και απογραφής
census_income_joined = census_2010.join(
    income_csv.hint("SHUFFLE_REPLICATE_NL"),
    census_2010["ZCTA10"] == income_csv["Zip Code"],
    "inner"
)

# Υπολογισμός συνολικού μέσου εισοδήματος ανα άτομο ανά COMM = sum(housing*income/pop) / φορες εφαρμογής του sum
aggregated_income_data = census_income_joined.groupBy("COMM").agg(
    (sum((col("Estimated Median Income") * col("HOUSING10") / col("POP_2010")).cast("double")) /
     count("*")).alias("income_per_capita")
)


# Κανονικοποίηση του census_2010 για μοναδικά COMM
census_2010_normalized = census_2010.groupBy("COMM").agg(
    sum("POP_2010").alias("POP_2010")
)

# Φόρτωση δεδομένων εγκλημάτων
crime_data = crime_data.withColumn(
    "geom",
    ST_Point("LON", "LAT")  # Δημιουργία γεωμετρικών σημείων από το LON και LAT
)

# Σύνδεση crime_data με census_2010 με spatial join
joined_df = crime_data.join(
    census_2010.hint("SHUFFLE_REPLICATE_NL"),
    ST_Within(crime_data["geom"], census_2010["geometry"]),
    "inner"
)

# Υπολογισμός συνολικών εγκλημάτων ανά COMM
crime_by_area = joined_df.groupBy("COMM").agg(
    count("*").alias("total_crimes")
)

# Συνένωση με πληθυσμό
crime_with_population = crime_by_area.join(
    census_2010_normalized.hint("SHUFFLE_REPLICATE_NL"),
    "COMM",
    "inner"
)

# Υπολογισμός εγκλημάτων ανά άτομο
crime_with_population = crime_with_population.withColumn(
    "crimes_per_person",
    (col("total_crimes") / col("POP_2010")).cast("double")
)
# Συνένωση με πληθυσμό
crime_income = crime_with_population.join(
    aggregated_income_data.hint("SHUFFLE_REPLICATE_NL"),
    "COMM",
    "inner"
)
crime_income.explain(mode="formatted")
# Εμφάνιση αποτελεσμάτων
crime_income.select("COMM", "POP_2010", "crimes_per_person", "income_per_capita").show(truncate=False)
end_rep = time.time()
print(f"Execution time with SHUFFLE_REPLICATE_NL joins: {end_rep - start_rep:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (39)
+- Project (38)
   +- CartesianProduct Inner (37)
      :- Project (23)
      :  +- CartesianProduct Inner (22)
      :     :- HashAggregate (13)
      :     :  +- Exchange (12)
      :     :     +- HashAggregate (11)
      :     :        +- Project (10)
      :     :           +- RangeJoin (9)
      :     :              :- Project (3)
      :     :              :  +- Filter (2)
      :     :              :     +- Scan parquet  (1)
      :     :              +- Project (8)
      :     :                 +- Filter (7)
      :     :                    +- Generate (6)
      :     :                       +- Filter (5)
      :     :                          +- Scan geojson  (4)
      :     +- HashAggregate (21)
      :        +- Exchange (20)
      :           +- HashAggregate (19)
      :              +- Project (18)
      :                 +- Filter (17)
      :                    +- Generate (16)
      :                       +- Filter (15)
     

# Query 4

### 2 Executors

#### 1core/2 GB memory

In [11]:
from sedona.spark import *
from sedona.register.geo_registrator import SedonaRegistrator
from pyspark.sql.functions import col, sum, count, regexp_replace, desc
from pyspark.sql import SparkSession
import time

# Ρύθμιση Spark Session για 1 core και 2 GB memory
spark = SparkSession.builder \
    .appName("1core/2 GB memory") \
    .config("spark.executor.instances", 2) \
    .config("spark.executor.cores", 1) \
    .config("spark.executor.memory", "2G") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
income_csv = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = SedonaContext.create(spark).read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Μορφοποίηση δεδομένων
census_2010 = blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in
     blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")

# Καθαρισμός των δεδομένων του εισοδήματος
income_csv = income_csv.withColumn(
    "Estimated Median Income",
    regexp_replace(regexp_replace(col("Estimated Median Income"), "\\$", ""), ",", "").cast("int")
).withColumn(
    "Zip Code",
    col("Zip Code").cast("int")
)

# Μετατροπή των στηλών που φιλτράρουμε σε αριθμητικό τύπο int.
census_2010 = census_2010.withColumn("POP_2010", col("POP_2010").cast("int")).withColumn("HOUSING10", col("HOUSING10").cast("int"))

# Φιλτράρισμα δεδομένων
crime_data = crime_data.filter(col("LAT").isNotNull() & col("LON").isNotNull() & ((col("LAT") != 0) & (col("LON") != 0)))
income_csv = income_csv.filter(col("Zip Code").isNotNull())
census_2010 = census_2010.filter((col("ZCTA10").isNotNull()) & (col("COMM").isNotNull()) & (col("POP_2010").isNotNull()) &
    (col("HOUSING10").isNotNull()) & ((col("POP_2010") > 0) & (col("HOUSING10") > 0)) |
    ((col("HOUSING10") == col("POP_2010")) & (col("HOUSING10") < col("POP_2010")))
)

start_rep = time.time()

# Join μεταξύ εισοδήματος και απογραφής
census_income_joined = census_2010.join(
    income_csv,
    census_2010["ZCTA10"] == income_csv["Zip Code"],
    "inner"
)

# Υπολογισμός συνολικού εισοδήματος ανά COMM = sum(housing*income/pop) / φορες εφαρμογής του sum
aggregated_income_data = census_income_joined.groupBy("COMM").agg(
    (sum((col("Estimated Median Income") * col("HOUSING10") / col("POP_2010")).cast("double")) /
     count("*")).alias("income_per_capita")
)


# Φόρτωση δεδομένων εγκλημάτων
crime_data_2015 = crime_data.filter(col("DATE OCC").contains("2015")) # Κραταμε μονο του 2015 crime_data
crime_data = crime_data_2015.withColumn(
    "geom",
    ST_Point("LON", "LAT")  # Δημιουργία γεωμετρικών σημείων από το LON και LAT
)

# Σύνδεση crime_data με census_2010 με spatial join
joined_df = crime_data.join(
    census_2010,
    ST_Within(crime_data["geom"], census_2010["geometry"]),
    "inner"
)



# Προσθήκη εισοδήματος
crime_with_income = joined_df.join(
    aggregated_income_data,
    "COMM",
    "inner"
)

# Εύρεση των 3 περιοχών με υψηλότερο και χαμηλότερο εισόδημα
top_income_areas = aggregated_income_data.orderBy(col("income_per_capita").desc()).limit(3)
low_income_areas = aggregated_income_data.orderBy(col("income_per_capita").asc()).limit(3)

# Φιλτράρισμα εγκλημάτων για τις περιοχές αυτές
crime_top_income = crime_with_income.join(top_income_areas, "COMM", "inner")
crime_low_income = crime_with_income.join(low_income_areas, "COMM", "inner")

# Φόρτωση του RE (Race and Ethnicity codes)
re_codes = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=False, inferSchema=True)
re_codes = re_codes.withColumnRenamed("_c1", "Victim Descent")

# Αντιστοίχιση φυλετικής καταγωγής
crime_top_income = crime_top_income.join(re_codes, crime_top_income["Vict Descent"] == re_codes["_c0"], "inner")
crime_low_income = crime_low_income.join(re_codes, crime_low_income["Vict Descent"] == re_codes["_c0"], "inner")


# Ομαδοποίηση ανά Victim Descent, μέτρηση, αύξουσα ταξινόμηση
crime_top_income = crime_top_income.groupBy("Victim Descent").agg(count("*").alias("Victim Number")).orderBy(col("Victim Number").desc())
print("Table for 3 regions with the highest per capita income")
crime_top_income.show(truncate=False)

print("Table for 3 regions with the lowest per capita income")
crime_low_income = crime_low_income.groupBy("Victim Descent").agg(count("*").alias("Victim Number")).orderBy(col("Victim Number").desc())
crime_low_income.show(truncate=False)

end_rep = time.time()
print(f"Execution time with 1core/2 GB memory: {end_rep - start_rep:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Table for 3 regions with the highest per capita income
+----------------------+-------------+
|Victim Descent        |Victim Number|
+----------------------+-------------+
|White                 |252          |
|Other                 |77           |
|Hispanic/Latin/Mexican|23           |
|Black                 |13           |
|Other Asian           |12           |
|Unknown               |8            |
+----------------------+-------------+

Table for 3 regions with the lowest per capita income
+------------------------------+-------------+
|Victim Descent                |Victim Number|
+------------------------------+-------------+
|Hispanic/Latin/Mexican        |2413         |
|Black                         |638          |
|Other                         |111          |
|White                         |53           |
|Other Asian                   |6            |
|Unknown                       |3            |
|Filipino                      |1            |
|Korean                       

#### 2cores/4GB memory

In [30]:
# Ρύθμιση Spark Session για 1 core και 2 GB memory
spark = SparkSession.builder \
    .appName("2cores/4GB memory") \
    .config("spark.executor.instances", 2) \
    .config("spark.executor.cores", 2) \
    .config("spark.executor.memory", "4G") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
income_csv = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = SedonaContext.create(spark).read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Μορφοποίηση δεδομένων
census_2010 = blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in
     blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")

# Καθαρισμός των δεδομένων του εισοδήματος
income_csv = income_csv.withColumn(
    "Estimated Median Income",
    regexp_replace(regexp_replace(col("Estimated Median Income"), "\\$", ""), ",", "").cast("int")
).withColumn(
    "Zip Code",
    col("Zip Code").cast("int")
)
# Μετατροπή των στηλών που φιλτράρουμε μετά σε αριθμητικό τύπο int.
census_2010 = census_2010.withColumn("POP_2010", col("POP_2010").cast("int")).withColumn("HOUSING10", col("HOUSING10").cast("int"))

# Φιλτράρισμα δεδομένων
crime_data = crime_data.filter(col("LAT").isNotNull() & col("LON").isNotNull() & ((col("LAT") != 0) & (col("LON") != 0)))
income_csv = income_csv.filter(col("Zip Code").isNotNull())
census_2010 = census_2010.filter((col("ZCTA10").isNotNull()) & (col("COMM").isNotNull()) & (col("POP_2010").isNotNull()) &
    (col("HOUSING10").isNotNull()) & ((col("POP_2010") > 0) & (col("HOUSING10") > 0)) |
    ((col("HOUSING10") == col("POP_2010")) & (col("HOUSING10") < col("POP_2010")))
)

start_rep = time.time()

# Join μεταξύ εισοδήματος και απογραφής
census_income_joined = census_2010.join(
    income_csv,
    census_2010["ZCTA10"] == income_csv["Zip Code"],
    "inner"
)

# Υπολογισμός συνολικού εισοδήματος ανά COMM = sum(housing*income/pop) / φορες εφαρμογής του sum
aggregated_income_data = census_income_joined.groupBy("COMM").agg(
    (sum((col("Estimated Median Income") * col("HOUSING10") / col("POP_2010")).cast("double")) /
     count("*")).alias("income_per_capita")
)


# Φόρτωση δεδομένων εγκλημάτων
crime_data_2015 = crime_data.filter(col("DATE OCC").contains("2015")) # Κραταμε μονο του 2015 crime_data
crime_data = crime_data_2015.withColumn(
    "geom",
    ST_Point("LON", "LAT")  # Δημιουργία γεωμετρικών σημείων από το LON και LAT
)

# Σύνδεση crime_data με census_2010 με spatial join
joined_df = crime_data.join(
    census_2010,
    ST_Within(crime_data["geom"], census_2010["geometry"]),
    "inner"
)



# Προσθήκη εισοδήματος
crime_with_income = joined_df.join(
    aggregated_income_data,
    "COMM",
    "inner"
)

# Εύρεση των 3 περιοχών με υψηλότερο και χαμηλότερο εισόδημα
top_income_areas = aggregated_income_data.orderBy(col("income_per_capita").desc()).limit(3)
low_income_areas = aggregated_income_data.orderBy(col("income_per_capita").asc()).limit(3)

# Φιλτράρισμα εγκλημάτων για τις περιοχές αυτές
crime_top_income = crime_with_income.join(top_income_areas, "COMM", "inner")
crime_low_income = crime_with_income.join(low_income_areas, "COMM", "inner")

# Φόρτωση του RE (Race and Ethnicity codes)
re_codes = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=False, inferSchema=True)
re_codes = re_codes.withColumnRenamed("_c1", "Victim Descent")

# Αντιστοίχιση φυλετικής καταγωγής
crime_top_income = crime_top_income.join(re_codes, crime_top_income["Vict Descent"] == re_codes["_c0"], "inner")
crime_low_income = crime_low_income.join(re_codes, crime_low_income["Vict Descent"] == re_codes["_c0"], "inner")

# Ομαδοποίηση ανά Victim Descent, μέτρηση, αύξουσα ταξινόμηση
crime_top_income = crime_top_income.groupBy("Victim Descent").agg(count("*").alias("Victim Number")).orderBy(col("Victim Number").desc())
print("Table for 3 regions with the highest per capita income")
crime_top_income.show(truncate=False)

print("Table for 3 regions with the lowest per capita income")
crime_low_income = crime_low_income.groupBy("Victim Descent").agg(count("*").alias("Victim Number")).orderBy(col("Victim Number").desc())
crime_low_income.show(truncate=False)

end_rep = time.time()
print(f"Execution time with 2cores/4GB memory: {end_rep - start_rep:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Table for 3 regions with the highest per capita income
+----------------------+-------------+
|Victim Descent        |Victim Number|
+----------------------+-------------+
|White                 |252          |
|Other                 |77           |
|Hispanic/Latin/Mexican|23           |
|Black                 |13           |
|Other Asian           |12           |
|Unknown               |8            |
+----------------------+-------------+

Table for 3 regions with the lowest per capita income
+------------------------------+-------------+
|Victim Descent                |Victim Number|
+------------------------------+-------------+
|Hispanic/Latin/Mexican        |2413         |
|Black                         |638          |
|Other                         |111          |
|White                         |53           |
|Other Asian                   |6            |
|Unknown                       |3            |
|Korean                        |1            |
|American Indian/Alaskan Nativ

#### 4cores/8GB memory

In [29]:
# Ρύθμιση Spark Session για 1 core και 2 GB memory
spark = SparkSession.builder \
    .appName("4cores/8GB memory") \
    .config("spark.executor.instances", 2) \
    .config("spark.executor.cores", 4) \
    .config("spark.executor.memory", "8G") \
    .getOrCreate()

SedonaRegistrator.registerAll(spark)

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
income_csv = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True, inferSchema=True)
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = SedonaContext.create(spark).read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Μορφοποίηση δεδομένων
census_2010 = blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in
     blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")

# Καθαρισμός των δεδομένων του εισοδήματος
income_csv = income_csv.withColumn(
    "Estimated Median Income",
    regexp_replace(regexp_replace(col("Estimated Median Income"), "\\$", ""), ",", "").cast("int")
).withColumn(
    "Zip Code",
    col("Zip Code").cast("int")
)
# Μετατροπή των στηλών που φιλτράρουμε σε αριθμητικό τύπο int.
census_2010 = census_2010.withColumn("POP_2010", col("POP_2010").cast("int")).withColumn("HOUSING10", col("HOUSING10").cast("int"))

# Φιλτράρισμα δεδομένων
crime_data = crime_data.filter(col("LAT").isNotNull() & col("LON").isNotNull() & ((col("LAT") != 0) & (col("LON") != 0)))
income_csv = income_csv.filter(col("Zip Code").isNotNull())
census_2010 = census_2010.filter((col("ZCTA10").isNotNull()) & (col("COMM").isNotNull()) & (col("POP_2010").isNotNull()) &
    (col("HOUSING10").isNotNull()) & ((col("POP_2010") > 0) & (col("HOUSING10") > 0)) |
    ((col("HOUSING10") == col("POP_2010")) & (col("HOUSING10") < col("POP_2010")))
)

start_rep = time.time()

# Join μεταξύ εισοδήματος και απογραφής
census_income_joined = census_2010.join(
    income_csv,
    census_2010["ZCTA10"] == income_csv["Zip Code"],
    "inner"
)

# Υπολογισμός συνολικού εισοδήματος ανά COMM = sum(housing*income/pop) / φορες εφαρμογής του sum
aggregated_income_data = census_income_joined.groupBy("COMM").agg(
    (sum((col("Estimated Median Income") * col("HOUSING10") / col("POP_2010")).cast("double")) /
     count("*")).alias("income_per_capita")
)


# Φόρτωση δεδομένων εγκλημάτων
crime_data_2015 = crime_data.filter(col("DATE OCC").contains("2015")) # Κραταμε μονο του 2015 crime_data
crime_data = crime_data_2015.withColumn(
    "geom",
    ST_Point("LON", "LAT")  # Δημιουργία γεωμετρικών σημείων από το LON και LAT
)

# Σύνδεση crime_data με census_2010 με spatial join
joined_df = crime_data.join(
    census_2010,
    ST_Within(crime_data["geom"], census_2010["geometry"]),
    "inner"
)



# Προσθήκη εισοδήματος
crime_with_income = joined_df.join(
    aggregated_income_data,
    "COMM",
    "inner"
)

# Εύρεση των 3 περιοχών με υψηλότερο και χαμηλότερο εισόδημα
top_income_areas = aggregated_income_data.orderBy(col("income_per_capita").desc()).limit(3)
low_income_areas = aggregated_income_data.orderBy(col("income_per_capita").asc()).limit(3)



# Φιλτράρισμα εγκλημάτων για τις περιοχές αυτές
crime_top_income = crime_with_income.join(top_income_areas, "COMM", "inner")
crime_low_income = crime_with_income.join(low_income_areas, "COMM", "inner")

# Φόρτωση του RE (Race and Ethnicity codes)
re_codes = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=False, inferSchema=True)
re_codes = re_codes.withColumnRenamed("_c1", "Victim Descent")

# Αντιστοίχιση φυλετικής καταγωγής
crime_top_income = crime_top_income.join(re_codes, crime_top_income["Vict Descent"] == re_codes["_c0"], "inner")
crime_low_income = crime_low_income.join(re_codes, crime_low_income["Vict Descent"] == re_codes["_c0"], "inner")

# Ομαδοποίηση ανά Victim Descent, μέτρηση, αύξουσα ταξινόμηση
crime_top_income = crime_top_income.groupBy("Victim Descent").agg(count("*").alias("Victim Number")).orderBy(col("Victim Number").desc())
print("Table for 3 regions with the highest per capita income")
crime_top_income.show(truncate=False)

print("Table for 3 regions with the lowest per capita income")
crime_low_income = crime_low_income.groupBy("Victim Descent").agg(count("*").alias("Victim Number")).orderBy(col("Victim Number").desc())
crime_low_income.show(truncate=False)

end_rep = time.time()
print(f"Execution time with 4cores/8GB memory: {end_rep - start_rep:.3f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Table for 3 regions with the highest per capita income
+----------------------+-------------+
|Victim Descent        |Victim Number|
+----------------------+-------------+
|White                 |252          |
|Other                 |77           |
|Hispanic/Latin/Mexican|23           |
|Black                 |13           |
|Other Asian           |12           |
|Unknown               |8            |
+----------------------+-------------+

Table for 3 regions with the lowest per capita income
+------------------------------+-------------+
|Victim Descent                |Victim Number|
+------------------------------+-------------+
|Hispanic/Latin/Mexican        |2413         |
|Black                         |638          |
|Other                         |111          |
|White                         |53           |
|Other Asian                   |6            |
|Unknown                       |3            |
|Filipino                      |1            |
|Korean                       

 # Query 5

#### 2executors × 4 cores/8GB memory

In [1]:
from pyspark.sql.functions import col, count, mean, lit, udf, desc, trim
from math import radians, sin, cos, sqrt, atan2
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import time
# Ρύθμιση Spark Session για 1 core και 2 GB memory
spark = SparkSession.builder \
    .appName("1core/2 GB memory") \
    .config("spark.executor.instances", 2) \
    .config("spark.executor.cores", 4) \
    .config("spark.executor.memory", "8G") \
    .getOrCreate()

start = time.time()


# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
lapd_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv")

# Φιλτράρουμε για να αποκλείσουμε τη γραμμή που περιέχει τη λέξη "LOCATION" στη στήλη `_c4`
# Δηλαδή αφαιρούμε την 1η εγγραφη ώστε να έχουμε εγγραφες μόνο τα values.
lapd_data = lapd_data.filter(~col("_c4").contains("LOCATION"))

# Υπολογισμός απόστασης με Haversine Formula
def haversine(lat1, lon1, lat2, lon2):
    R = 6371.0  # Ακτίνα της Γης σε χιλιόμετρα
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

# UDF για τον υπολογισμό απόστασης
haversine_udf = udf(haversine, DoubleType())

# Καθαρισμός των ονομάτων στηλών στο crime_data
crime_data = crime_data.select(
    [trim(col(c)).alias(c.strip()) for c in crime_data.columns]
)

# Φιλτράρισμα για έλεγχο NULL τιμών στις στήλες LAT και LON και (0,0)
crime_data = crime_data.filter(
    (col("LAT").isNotNull()) & (col("LON").isNotNull()) & ((col("LAT") !=0) & (col("LON") != 0))
)

lapd_data = lapd_data.filter(
    (col("_c0").isNotNull()) & (col("_c1").isNotNull()) & ((col("_c0") !=0) & (col("_c1") != 0))
)

lapd_data = lapd_data.withColumn("_c0", col("_c0").cast("double")).withColumn("_c1", col("_c1").cast("double"))
crime_data = crime_data.withColumn("LON", col("LON").cast("double")).withColumn("LAT", col("LAT").cast("double"))

crime_data = crime_data.withColumn("AREA", col("AREA").cast("int"))
lapd_data = lapd_data.withColumn("_c5", col("_c5").cast("int"))

# Join μεταξύ crime_data και LAPD
crime_with_lapd = crime_data.join(
    lapd_data,
    crime_data["AREA"] == lapd_data["_c5"],
    "inner"
)

# Υπολογισμός αποστάσεων
crime_with_lapd = crime_with_lapd.withColumn(
    "distance",
    haversine_udf(
        col("_c1"), col("_c0"),
        col("LAT"), col("LON")
    )
)

# Ομαδοποίηση ανά τμήμα και υπολογισμός αποστάσεων και αριθμού περιστατικών
closest_precinct = crime_with_lapd.groupBy("_c5", "_c4").agg(
    mean("distance").alias("average_distance"),  # Μέση απόσταση
    count("*").alias("incident_count")  # Αριθμός περιστατικών
)

# Μετονομασία της στήλης _c4 σε division
results = closest_precinct.select(
    col("_c4").alias("division"),
    col("average_distance"),
    col("incident_count")
)

# Ταξινόμηση αποτελεσμάτων κατά αριθμό περιστατικών σε φθίνουσα σειρά
results = results.orderBy(col("incident_count").desc())

# Εμφάνιση αποτελεσμάτων
results.show(truncate=False)

end = time.time()
print(f"Execution time with 2executors x 4 cores/8GB memory: {end - start:.3f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
64,application_1738075734771_0065,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------+------------------+--------------+
|division                         |average_distance  |incident_count|
+---------------------------------+------------------+--------------+
|7600 S. BROADWAY                 |2.696608257256237 |206784        |
|1546 MARTIN LUTHER KING JR. BLVD.|2.6995220098942236|192226        |
|12312 CULVER BLVD.               |3.876150507307626 |170903        |
|251 E. 6TH ST.                   |1.1216874261027905|166698        |
|11640 BURBANK BLVD.              |2.614233104134838 |164532        |
|145 W. 108TH ST.                 |2.126199835576711 |161051        |
|1358 N. WILCOX AVE.              |1.5539157186661017|150663        |
|3400 S. CENTRAL AVE.             |2.059420966487143 |148757        |
|1130 S. VERMONT AVE.             |1.8494716521262073|144962        |
|11121 N. SEPULVEDA BLVD.         |4.712272528026431 |143600        |
|3353 SAN FERNANDO RD.            |3.843277713528362 |142732        |
| 6240 SYLMAR AVE.  

#### 4executors × 2 cores/4GB memory

In [1]:
from pyspark.sql.functions import col, count, mean, lit, udf, desc, trim
from math import radians, sin, cos, sqrt, atan2
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import time

# Ρύθμιση Spark Session για 1 core και 2 GB memory
spark = SparkSession.builder \
    .appName("4executors/2core/2 GB memory") \
    .config("spark.executor.instances", 4) \
    .config("spark.executor.cores", 2) \
    .config("spark.executor.memory", "4G") \
    .getOrCreate()

start = time.time()

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
lapd_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv")

# Φιλτράρουμε για να αποκλείσουμε τη γραμμή που περιέχει τη λέξη "LOCATION" στη στήλη `_c4`
# Δηλαδή αφαιρούμε την 1η εγγραφη ώστε να έχουμε εγγραφες μόνο τα values.
lapd_data = lapd_data.filter(~col("_c4").contains("LOCATION"))

# Υπολογισμός απόστασης με Haversine Formula
def haversine(lat1, lon1, lat2, lon2):
    R = 6371.0  # Ακτίνα της Γης σε χιλιόμετρα
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

# UDF για τον υπολογισμό απόστασης
haversine_udf = udf(haversine, DoubleType())

# Καθαρισμός των ονομάτων στηλών στο crime_data
crime_data = crime_data.select(
    [trim(col(c)).alias(c.strip()) for c in crime_data.columns]
)

# Φιλτράρισμα για έλεγχο NULL τιμών στις στήλες LAT και LON
crime_data = crime_data.filter(
    (col("LAT").isNotNull()) & (col("LON").isNotNull()) & ((col("LAT") !=0) & (col("LON") != 0))
)

lapd_data = lapd_data.filter(
    (col("_c0").isNotNull()) & (col("_c1").isNotNull()) & ((col("_c0") !=0) & (col("_c1") != 0))
)

lapd_data = lapd_data.withColumn("_c0", col("_c0").cast("double")).withColumn("_c1", col("_c1").cast("double"))
crime_data = crime_data.withColumn("LON", col("LON").cast("double")).withColumn("LAT", col("LAT").cast("double"))

crime_data = crime_data.withColumn("AREA", col("AREA").cast("int"))
lapd_data = lapd_data.withColumn("_c5", col("_c5").cast("int"))

# Join μεταξύ crime_data και LAPD
crime_with_lapd = crime_data.join(
    lapd_data,
    crime_data["AREA"] == lapd_data["_c5"],
    "inner"
)

# Υπολογισμός αποστάσεων
crime_with_lapd = crime_with_lapd.withColumn(
    "distance",
    haversine_udf(
        col("_c1"), col("_c0"),
        col("LAT"), col("LON")
    )
)

# Ομαδοποίηση ανά τμήμα και υπολογισμός αποστάσεων και αριθμού περιστατικών
closest_precinct = crime_with_lapd.groupBy("_c5", "_c4").agg(
    mean("distance").alias("average_distance"),  # Μέση απόσταση
    count("*").alias("incident_count")  # Αριθμός περιστατικών
)

# Μετονομασία της στήλης _c4 σε division
results = closest_precinct.select(
    col("_c4").alias("division"),
    col("average_distance"),
    col("incident_count")
)

# Ταξινόμηση αποτελεσμάτων κατά αριθμό περιστατικών σε φθίνουσα σειρά
results = results.orderBy(col("incident_count").desc())

# Εμφάνιση αποτελεσμάτων
results.show(truncate=False)

end = time.time()
print(f"Execution time with 4 executors, 2 cores/4GB memory: {end - start:.3f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
66,application_1738075734771_0067,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------+------------------+--------------+
|division                         |average_distance  |incident_count|
+---------------------------------+------------------+--------------+
|7600 S. BROADWAY                 |2.696608257256237 |206784        |
|1546 MARTIN LUTHER KING JR. BLVD.|2.6995220098942236|192226        |
|12312 CULVER BLVD.               |3.876150507307626 |170903        |
|251 E. 6TH ST.                   |1.1216874261027905|166698        |
|11640 BURBANK BLVD.              |2.614233104134838 |164532        |
|145 W. 108TH ST.                 |2.126199835576711 |161051        |
|1358 N. WILCOX AVE.              |1.5539157186661017|150663        |
|3400 S. CENTRAL AVE.             |2.059420966487143 |148757        |
|1130 S. VERMONT AVE.             |1.8494716521262073|144962        |
|11121 N. SEPULVEDA BLVD.         |4.712272528026431 |143600        |
|3353 SAN FERNANDO RD.            |3.843277713528362 |142732        |
| 6240 SYLMAR AVE.  

#### 8executors × 1 core/2 GB memory

In [1]:
from pyspark.sql.functions import col, count, mean, lit, udf, desc, trim
from math import radians, sin, cos, sqrt, atan2
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
import time

# Ρύθμιση Spark Session για 1 core και 2 GB memory
spark = SparkSession.builder \
    .appName("8executors/1core/2 GB memory") \
    .config("spark.executor.instances", 8) \
    .config("spark.executor.cores", 1) \
    .config("spark.executor.memory", "2G") \
    .getOrCreate()

start  = time.time()

# Φόρτωση δεδομένων
crime_data = spark.read.parquet("s3://groups-bucket-dblab-905418150721/group27/Crime_Data/")
lapd_data = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv")

# Φιλτράρουμε για να αποκλείσουμε τη γραμμή που περιέχει τη λέξη "LOCATION" στη στήλη `_c4`.
# Δηλαδή αφαιρούμε την 1η εγγραφη ώστε να έχουμε εγγραφες μόνο τα values.
lapd_data = lapd_data.filter(~col("_c4").contains("LOCATION"))

# Υπολογισμός απόστασης με Haversine Formula
def haversine(lat1, lon1, lat2, lon2):
    R = 6371.0  # Ακτίνα της Γης σε χιλιόμετρα
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c

# UDF για τον υπολογισμό απόστασης
haversine_udf = udf(haversine, DoubleType())

# Καθαρισμός των ονομάτων στηλών στο crime_data
crime_data = crime_data.select(
    [trim(col(c)).alias(c.strip()) for c in crime_data.columns]
)

# Φιλτράρισμα για έλεγχο NULL τιμών στις στήλες LAT και LON
crime_data = crime_data.filter(
    (col("LAT").isNotNull()) & (col("LON").isNotNull()) & ((col("LAT") !=0) & (col("LON") != 0))
)

lapd_data = lapd_data.filter(
    (col("_c0").isNotNull()) & (col("_c1").isNotNull()) & ((col("_c0") !=0) & (col("_c1") != 0))
)

lapd_data = lapd_data.withColumn("_c0", col("_c0").cast("double")).withColumn("_c1", col("_c1").cast("double"))
crime_data = crime_data.withColumn("LON", col("LON").cast("double")).withColumn("LAT", col("LAT").cast("double"))

crime_data = crime_data.withColumn("AREA", col("AREA").cast("int"))
lapd_data = lapd_data.withColumn("_c5", col("_c5").cast("int"))

# Join μεταξύ crime_data και LAPD
crime_with_lapd = crime_data.join(
    lapd_data,
    crime_data["AREA"] == lapd_data["_c5"],
    "inner"
)

# Υπολογισμός αποστάσεων
crime_with_lapd = crime_with_lapd.withColumn(
    "distance",
    haversine_udf(
        col("_c1"), col("_c0"),
        col("LAT"), col("LON")
    )
)

# Ομαδοποίηση ανά τμήμα και υπολογισμός αποστάσεων και αριθμού περιστατικών
closest_precinct = crime_with_lapd.groupBy("_c5", "_c4").agg(
    mean("distance").alias("average_distance"),  # Μέση απόσταση
    count("*").alias("incident_count")  # Αριθμός περιστατικών
)

# Μετονομασία της στήλης _c4 σε division
results = closest_precinct.select(
    col("_c4").alias("division"),
    col("average_distance"),
    col("incident_count")
)

# Ταξινόμηση αποτελεσμάτων κατά αριθμό περιστατικών σε φθίνουσα σειρά
results = results.orderBy(col("incident_count").desc())

# Εμφάνιση αποτελεσμάτων
results.show(truncate=False)

end = time.time()
print(f"Execution time with 8 executors, 1 core/2GB memory: {end - start:.3f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
68,application_1738075734771_0069,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------+------------------+--------------+
|division                         |average_distance  |incident_count|
+---------------------------------+------------------+--------------+
|7600 S. BROADWAY                 |2.696608257256237 |206784        |
|1546 MARTIN LUTHER KING JR. BLVD.|2.6995220098942236|192226        |
|12312 CULVER BLVD.               |3.876150507307626 |170903        |
|251 E. 6TH ST.                   |1.1216874261027905|166698        |
|11640 BURBANK BLVD.              |2.614233104134838 |164532        |
|145 W. 108TH ST.                 |2.126199835576711 |161051        |
|1358 N. WILCOX AVE.              |1.5539157186661017|150663        |
|3400 S. CENTRAL AVE.             |2.059420966487143 |148757        |
|1130 S. VERMONT AVE.             |1.8494716521262073|144962        |
|11121 N. SEPULVEDA BLVD.         |4.712272528026431 |143600        |
|3353 SAN FERNANDO RD.            |3.843277713528362 |142732        |
| 6240 SYLMAR AVE.  