In [13]:
from pyspark.sql import SparkSession, functions as F
from sedona.register.geo_registrator import SedonaRegistrator
from sedona.spark import SedonaContext
from pyspark.sql.types import StringType

# Create (or reuse) the Spark session with Sedona's SQL extension class
# registered, then build the Sedona context used for the GeoJSON read below.
spark = (
    SparkSession.builder
    .appName("CrimeAnalysisSpatialJoinSQL")
    .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions")
    .config("sedona.global.charset", "utf8")
    .getOrCreate()
)
sedona = SedonaContext.create(spark)

# Input locations on S3: census blocks (GeoJSON), income per zip code (CSV)
# and the crime records (Parquet directory).
census_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
income_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
crime_path = "s3://groups-bucket-dblab-905418150721/group24/results/q2_parquet_maindata/"

# Read the census GeoJSON as a single multi-line document and turn every entry
# of its `features` array into one row whose struct fields become columns.
census_df = (
    sedona.read.format("geojson")
    .option("multiLine", "true")
    .load(census_path)
    .selectExpr("explode(features) as features")
    .select("features.*")
)

# Pull the needed attributes out of the nested `properties` struct, then keep
# only blocks in the City of Los Angeles that report a positive population and
# a positive HOUSING10 count.
census_df = (
    census_df.select(
        F.col("properties.ZCTA10").alias("ZCTA10"),
        F.col("properties.POP_2010").alias("Population"),
        F.col("properties.COMM").alias("COMM"),
        F.col("geometry").alias("geometry"),
        F.col("properties.HOUSING10").alias("HOUSING10"),
        F.col("properties.CITY").alias("CITY"),
    )
    .where(F.col("CITY") == "Los Angeles")
    .where(F.col("Population") > 0)
    .where(F.col("HOUSING10") > 0)
)

# # Update rows where Population = 0 and HOUSING10 > 0 to set HOUSING10 = 0
# census_df = census_df.withColumn(
#     "HOUSING10",
#     F.when((F.col("Population") == 0) & (F.col("HOUSING10") > 0), 0).otherwise(F.col("HOUSING10"))
# ).filter(F.col("Population") > 0)  # Filter rows where Population is greater than 0

# Read the crime records (Parquet) and keep a single column: a point geometry
# built from each record's LON (x) and LAT (y) values.
crime_df = spark.read.parquet(crime_path).select(
    F.expr("ST_Point(LON, LAT)").alias("geometry")
)

# Read the income CSV (header row, inferred column types) and normalise it:
#   * "Income": the median-income text with "$" and "," removed, cast to double
#   * "Zip Code" renamed to "Zip"; the raw income text column is dropped
#   * only rows whose Community contains "Los Angeles" are kept
income_df = (
    spark.read.csv(income_path, header=True, inferSchema=True)
    .withColumn(
        "Income",
        F.regexp_replace(F.col("Estimated Median Income"), "[$,]", "").cast("double"),
    )
    .withColumnRenamed("Zip Code", "Zip")
    .drop("Estimated Median Income")
    .where(F.col("Community").contains("Los Angeles"))
)

# Per-community totals over every census block that survived the filters.
census_agg = (
    census_df.groupBy("COMM")
    .agg(
        F.sum("Population").alias("Total_Population"),
        F.sum("HOUSING10").alias("Total_Households"),
    )
)

# Per-community income estimate: each block contributes its zip code's Income
# multiplied by the block's HOUSING10 count. Blocks whose ZCTA10 has no row in
# income_df are dropped by the inner join.
income_total = (
    census_df.join(income_df, on=census_df["ZCTA10"] == income_df["Zip"], how="inner")
    .groupBy("COMM")
    .agg(F.sum(F.col("Income") * F.col("HOUSING10")).alias("Total_Income"))
)

# Combine both aggregates and derive the income per person.
# NOTE(review): Total_Population counts all blocks of a community, whereas
# Total_Income only counts blocks that matched an income row -- confirm that
# mixing the two in one ratio is intended.
census_income = (
    census_agg.join(income_total, on="COMM", how="inner")
    .withColumn(
        "Mean_Income_Per_Person",
        F.col("Total_Income") / F.col("Total_Population"),
    )
)

# Match every crime point to the census block whose geometry contains it
# (ST_Within), then count the matched crimes per community.
crime_agg = (
    crime_df.alias("cr")
    .join(
        census_df.alias("c"),
        on=F.expr("ST_Within(cr.geometry, c.geometry)"),
        how="inner",
    )
    .groupBy("c.COMM")
    .agg(F.count("*").alias("Total_Crimes"))
)

# Final join: attach the per-community crime counts to the income/population
# table and derive the crimes-per-person ratio.
#
# The join is a LEFT OUTER join on purpose. With the default inner join, a
# community that has no crime matched to any of its blocks would vanish from
# the result, and the NULL replacement below could never apply to a missing
# match. With the left outer join such communities are kept: their
# Total_Crimes (and therefore the ratio) come back as NULL and are then
# reported as 0.
final_result = (
    census_income.join(crime_agg, on="COMM", how="left_outer")
    .withColumn(
        "Crime_Per_Person_Ratio",
        F.col("Total_Crimes") / F.col("Total_Population"),
    )
    # Replace NULLs from unmatched communities with 0 in both derived columns.
    .fillna({
        "Total_Crimes": 0,
        "Crime_Per_Person_Ratio": 0,
    })
)

# Print the per-community income and crime figures ordered by community name:
# up to 140 rows, with cell contents left untruncated.
# (Total_Population, Total_Crimes and Total_Households are also available on
# final_result but are not displayed here.)
(
    final_result
    .select("COMM", "Mean_Income_Per_Person", "Crime_Per_Person_Ratio")
    .orderBy("COMM")
    .show(140, truncate=False)
)


# # Broadcast Join Hint
# broadcast_result = census_income.hint("broadcast").join(crime_agg, "COMM", "left_outer")

# # Explain Broadcast Plan
# print("Broadcast Join Plan:")
# broadcast_result.explain()

# # Merge Join Hint
# merge_result = census_income.hint("merge").join(crime_agg, "COMM", "left_outer")

# # Explain Merge Plan
# print("Merge Join Plan:")
# merge_result.explain()

# # Shuffle Hash Join Hint
# shuffle_hash_result = census_income.hint("shuffle_hash").join(crime_agg, "COMM", "left_outer")

# # Explain Shuffle Hash Plan
# print("Shuffle Hash Join Plan:")
# shuffle_hash_result.explain()

# # Shuffle Replicate NL Join Hint
# shuffle_replicate_result = census_income.hint("shuffle_replicate_nl").join(crime_agg, "COMM", "left_outer")

# # Explain Shuffle Replicate NL Plan
# print("Shuffle Replicate NL Join Plan:")
# shuffle_replicate_result.explain()

# Report how many communities (rows) the final result contains.
community_count = final_result.count()
print("Total communities:", community_count)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+----------------------+----------------------+
|COMM                   |Mean_Income_Per_Person|Crime_Per_Person_Ratio|
+-----------------------+----------------------+----------------------+
|Adams-Normandie        |8791.458301453711     |0.7148686559551135    |
|Alsace                 |11239.50119372442     |0.5416098226466576    |
|Angeles National Forest|33079.58823529412     |0.4117647058823529    |
|Angelino Heights       |18427.059814658805    |0.5732940185341197    |
|Arleta                 |12110.779170215355    |0.4264509064363061    |
|Atwater Village        |28481.236967160792    |0.5288318320448259    |
|Baldwin Hills          |17303.90612886328     |0.9950061114021302    |
|Bel Air                |62707.32126861155     |0.39922527539038855   |
|Beverly Crest          |22231.4086621278      |0.3689607087195472    |
|Beverlywood            |29267.8210229561      |0.5084977849375755    |
|Boyle Heights          |8494.108238050494     |0.61718873933788

In [None]:
# # Load Census Data (GeoJSON format)


# census_df1 = sedona.read.format("geojson") \
#     .option("multiLine", "true").load(census_path) \
#     .selectExpr("explode(features) as features") \
#     .select("features.*")
# census_df1 = census_df1.select(
#     F.col("properties.ZCTA10").alias("ZCTA10"),
#     F.col("properties.POP_2010").alias("Population"),
#     F.col("properties.COMM").alias("COMM"),
#     # F.col("geometry").alias("geometry"),
#     F.col("properties.HOUSING10").alias("HOUSING10"),
#     F.col("properties.CITY").alias("CITY"),
#     F.col("properties.CITYCOM").alias("CITYCOM"),
# ).filter(F.col("Population") == 0)  # Exclude zero or negative population
    
# # ).filter(F.col("Population") >= 0).filter(F.col("CITY") == "Unincorporated")  # Exclude zero or negative population
# # ).filter(F.col("COMM") == "Angeles National Forest")  # Exclude zero or negative population
# # ).filter((((F.col("HOUSING10") == 0) & (F.col("Population") != 0)) | \
# #          ((F.col("HOUSING10") != 0) & (F.col("Population") == 0))) & \
# #          (F.col("CITY") == "Los Angeles"))  # Exclude zero or negative population



# census_df1.show(200, truncate = False)
# census_df1.count()

In [3]:
# To evaluate the performance of the joins and decide which one is best suited, here is a consolidated analysis of the **Shuffle Hash Join**, **Broadcast Hash Join**, and **Shuffle Replicate Nested Loop Join**, based on their physical plans:

# ---

# ### **1. Shuffle Hash Join**:
# - **Description**: 
#   - The **Shuffle Hash Join** partitions the data on both sides of the join condition (`COMM`) and distributes it across the cluster. Each partition processes the join locally.
#   - It is typically used when both datasets are large but can still fit into memory after partitioning.

# - **Advantages**:
#   - Scales well for large datasets.
#   - Ensures distributed parallelism for performance.

# - **Disadvantages**:
#   - Shuffle operations are expensive in terms of network I/O.
#   - Performance can degrade significantly if data skew exists (i.e., when certain `COMM` values have significantly more rows than others).

# - **Best Suited**: 
#   - When both datasets are large and can be efficiently partitioned on the join key.
#   - Works well for **equi-joins**.

# ---

# ### **2. Broadcast Hash Join**:
# - **Description**: 
#   - The **Broadcast Hash Join** replicates the smaller dataset across all worker nodes. The larger dataset is then scanned, and join conditions are evaluated using the replicated smaller dataset.

# - **Advantages**:
#   - Extremely fast for joins where one dataset is small enough to fit in memory.
#   - Avoids the shuffle cost for the smaller dataset.

# - **Disadvantages**:
#   - Memory-intensive, as the smaller dataset must be broadcast to all nodes.
#   - Not suitable if the smaller dataset cannot fit into memory.

# - **Best Suited**: 
#   - When one dataset is much smaller than the other (e.g., a lookup table or filtered spatial data).
#   - Effective for **equi-joins** when broadcast conditions are met.

# ---

# ### **3. Shuffle Replicate Nested Loop Join (NL Join)**:
# - **Description**: 
#   - The **Shuffle Replicate NL Join** replicates one dataset across all partitions of the other and evaluates the join condition for all record pairs. It is typically used for **non-equi joins** or spatial joins involving complex predicates (e.g., `WITHIN`).

# - **Advantages**:
#   - Handles complex, non-equi join conditions (such as spatial predicates) that hash-based joins cannot evaluate.
#   - Can process joins when no other join types are applicable.

# - **Disadvantages**:
#   - Extremely expensive due to the Cartesian product nature of nested loops.
#   - Requires significant memory and processing power, especially for large datasets.

# - **Best Suited**:
#   - When the join condition involves non-equi predicates (e.g., spatial predicates like `WITHIN` or `OVERLAPS`).
#   - For datasets where one side can be replicated without memory issues.

# ---

# ### **Consolidated Comparison**:
# | Join Type                   | Best for                   | Limitations                            | Cost Efficiency |
# |-----------------------------|----------------------------|----------------------------------------|-----------------|
# | **Shuffle Hash Join**       | Large datasets, equi-joins | Expensive shuffle, sensitive to skew  | Moderate        |
# | **Broadcast Hash Join**     | One small, one large set   | Memory constraints, large broadcasts   | High (if feasible) |
# | **Shuffle Replicate NL Join** | Non-equi or spatial joins | Extremely costly for large datasets    | Low (except spatial) |

# ---

# ### **Which is the Best?**
# 1. **For Equi-Joins**:
#    - If both datasets are large: **Shuffle Hash Join**.
#    - If one dataset is small: **Broadcast Hash Join**.

# 2. **For Spatial Joins** (non-equi predicates like `WITHIN`):
#    - **Shuffle Replicate NL Join** is necessary but should be optimized by:
#      - Ensuring the replicated dataset is as small as possible.
#      - Using spatial indexes or specialized libraries (e.g., Sedona) to reduce computation costs.

# ---

# The dataset sizes are:

# 1. **Income Dataset**: ~13KB (very small).
# 2. **Census Dataset**: ~184MB (moderately large).
# 3. **Crime Dataset**: ~500MB (large, split into two parts).

# Here’s how this impacts the choice of join strategies for the different datasets:

# ---

# ### **Analysis of Joins**

# #### **1. Broadcast Hash Join**:
# - **Best Scenario**:
#   - **Broadcast the Income Dataset** (~13KB) as it is very small and fits easily in memory.
#   - This avoids shuffling the larger Census or Crime datasets.
# - **Usage**:
#   - Use Broadcast Hash Join for the join between the **Income** dataset and the **Census** dataset.
#   - Suitable for this equi-join, which matches the census `ZCTA10` code to the income `Zip` column.

# #### **2. Shuffle Hash Join**:
# - **Best Scenario**:
#   - When joining **Census** (~184MB) with **Crime** (~500MB).
#   - These datasets are too large to broadcast, so shuffling and partitioning on the join key (`COMM`) ensures distributed computation.
# - **Usage**:
#   - Apply for joins between the larger datasets (Census and Crime) where an equi-join is possible.

# #### **3. Shuffle Replicate NL Join**:
# - **Best Scenario**:
#   - Necessary for spatial joins like the **WITHIN** operation between **Census** and **Crime** datasets.
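#   - Spatial joins involve non-equi conditions, so a hash join cannot be used; without a spatial-join optimization Spark falls back to a nested-loop join (Sedona can instead plan such predicates as a spatially partitioned range join).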
# - **Optimizations**:
#   - Reduce the dataset size before the join:
#     - Use spatial filtering or pre-aggregation to limit the number of rows in the Census or Crime datasets.
#     - Partition the Crime data intelligently to minimize replication overhead.

# ---

# ### **Suggested Join Strategy**

# 1. **Income Join (Income + Census)**:
#    - Use a **Broadcast Hash Join**.
#    - Broadcast the Income dataset (13KB) and join with the Census dataset (184MB).
#    - This ensures minimal shuffle and high performance.

# 2. **Census and Crime Data Join**:
#    - If it's a **spatial join** (e.g., `WITHIN`), use **Shuffle Replicate NL Join**:
#      - Optimize by pre-aggregating or filtering both datasets to reduce their sizes.
#      - Use libraries like **Sedona** for spatial indexing and faster computation.
#    - If it's an **equi-join** on `COMM` or similar, use **Shuffle Hash Join**:
#      - Partition both datasets on the join key to avoid excessive shuffling.

# 3. **Final Join (All Data Combined)**:
#    - Perform the joins in stages, starting with smaller datasets (Income + Census), then join the resulting dataset with the larger Crime dataset.
#    - This hierarchical join approach minimizes intermediate data size and computational overhead.

# ---

# ### **Conclusion**

# - **Primary Strategy**: Use **Broadcast Hash Join** wherever possible (Income + Census), as the Income dataset is very small.
# - **Fallback Strategy**: Use **Shuffle Hash Join** for larger datasets (Census + Crime).
# - **Spatial Joins**: For spatial operations (`WITHIN`), optimize the **Shuffle Replicate NL Join** by reducing data size or using spatial indexes.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…