# Energy Grid Mapping with Apache Sedona on AWS Glue

This notebook demonstrates how to use Apache Sedona for distributed geospatial analysis of energy grid infrastructure in AWS Glue.

## Topics Covered:
1. Sedona setup and configuration in AWS Glue
2. Creating synthetic power grid infrastructure data
3. Spatial operations: distance calculations, buffers, spatial joins
4. Advanced analysis: coverage areas, transmission corridors, gap analysis
5. Performance optimization with spatial indexing
6. Scaling demonstrations
7. Exporting geospatial data to S3

In [None]:
# Glue Environment Configuration
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 2

# Sedona (for Spark 3.1 / Glue 4.0)
%extra_jars s3://YOUR-BUCKET-NAME/sedona-spark-shaded-3.3_2.12-1.7.0.jar,s3://YOUR-BUCKET-NAME/geotools-wrapper-1.7.1-28.5.jar
%additional_python_modules apache-sedona==1.7.0

## 1. Initialize Sedona Context

In [None]:
## Initialize Sedona Contextfrom sedona.spark import SedonaContext
# Import required libraries
from sedona.spark import SedonaContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import numpy as np

# Initialize Spark session and Sedona context
spark = SparkSession.builder.getOrCreate()
sedona = SedonaContext.create(spark)

In [2]:
# Configure Sedona using spark.conf.set()
spark.conf.set("sedona.global.charset", "utf8")
spark.conf.set("sedona.global.index", "quadtree")

# Test Sedona installation
print("Testing Sedona installation...")
test_df = sedona.sql("SELECT ST_Point(1.0, 2.0) AS geom")
test_df.show(truncate=False)
print(f"✓ Sedona initialized successfully!")
print(f"✓ Spark version: {spark.version}")
print(f"✓ Available workers: {sedona.sparkContext.defaultParallelism}")

Testing Sedona installation...
+-----------+
|geom       |
+-----------+
|POINT (1 2)|
+-----------+

✓ Sedona initialized successfully!
✓ Spark version: 3.3.0-amzn-1
✓ Available workers: 4


## 2. Create Synthetic Energy Grid Infrastructure

We'll create a realistic power grid dataset including:
- Power generation facilities (coal, solar, wind, natural gas, hydro, nuclear)
- Substations with different voltage classes
- Transmission lines connecting the infrastructure
- Demand points (cities and industrial zones)

In [8]:
# Define study area bounds (Texas-sized region)
study_area_bounds = {
    'min_lon': -100.0,
    'max_lon': -94.0,
    'min_lat': 29.0,
    'max_lat': 34.0
}

# Create study area polygon using Sedona SQL
study_area_wkt = f"POLYGON(({study_area_bounds['min_lon']} {study_area_bounds['min_lat']}, " \
                 f"{study_area_bounds['max_lon']} {study_area_bounds['min_lat']}, " \
                 f"{study_area_bounds['max_lon']} {study_area_bounds['max_lat']}, " \
                 f"{study_area_bounds['min_lon']} {study_area_bounds['max_lat']}, " \
                 f"{study_area_bounds['min_lon']} {study_area_bounds['min_lat']}))"

study_area_df = sedona.sql(f"""
    SELECT 
        'Study Area' AS name,
        ST_GeomFromWKT('{study_area_wkt}') AS geometry
""")

# Register as temp view FIRST before querying
study_area_df.createOrReplaceTempView("study_area_view")

# Now calculate area (convert to meters using Web Mercator approximation)
study_area_with_area = sedona.sql("""
    SELECT 
        name,
        geometry,
        ST_Area(ST_Transform(geometry, 'EPSG:4326', 'EPSG:3857')) / 1000000 AS area_km2
    FROM study_area_view
""")

print("Study Area Created:")
study_area_with_area.show(truncate=False)

Study Area Created:
+----------+-----------------------------------------------------+------------------+
|name      |geometry                                             |area_km2          |
+----------+-----------------------------------------------------+------------------+
|Study Area|POLYGON ((-100 29, -94 29, -94 34, -100 34, -100 29))|436253.95410384226|
+----------+-----------------------------------------------------+------------------+


In [9]:
# Create power plants data
power_plants_data = [
    ('Coal Creek Power Station', 'Coal', 800, -99.5, 31.5),
    ('Solar Valley Array', 'Solar', 150, -96.5, 30.2),
    ('Wind River Farm', 'Wind', 300, -95.2, 32.8),
    ('Natural Gas Plant 1', 'Natural Gas', 500, -97.8, 29.8),
    ('Hydroelectric Dam', 'Hydro', 200, -98.5, 33.2),
    ('Nuclear Power Station', 'Nuclear', 1200, -96.0, 31.8),
    ('Solar Farm East', 'Solar', 200, -94.8, 31.0),
    ('Wind Farm North', 'Wind', 350, -99.0, 33.8),
    ('Natural Gas Plant 2', 'Natural Gas', 450, -96.8, 30.5)
]

# Create DataFrame
power_plants_df = spark.createDataFrame(
    power_plants_data,
    ['name', 'type', 'capacity_mw', 'lon', 'lat']
)

# Add geometry using Sedona
power_plants_df.createOrReplaceTempView("power_plants_temp")
power_plants_geo = sedona.sql("""
    SELECT 
        name,
        type,
        capacity_mw,
        lon,
        lat,
        ST_Point(CAST(lon AS Decimal(24,20)), CAST(lat AS Decimal(24,20))) AS geometry
    FROM power_plants_temp
""")

power_plants_geo.createOrReplaceTempView("power_plants")

print("Power Generation Facilities:")
power_plants_geo.show(truncate=False)

# Calculate total capacity
total_capacity = power_plants_geo.selectExpr("SUM(capacity_mw) as total_mw").collect()[0]['total_mw']
print(f"\nTotal Generation Capacity: {total_capacity} MW")

# Capacity by type
capacity_by_type = sedona.sql("""
    SELECT type, SUM(capacity_mw) as total_mw, COUNT(*) as count
    FROM power_plants
    GROUP BY type
    ORDER BY total_mw DESC
""")
print("\nCapacity by Type:")
capacity_by_type.show(truncate=False)

Power Generation Facilities:
+------------------------+-----------+-----------+-----+----+------------------+
|name                    |type       |capacity_mw|lon  |lat |geometry          |
+------------------------+-----------+-----------+-----+----+------------------+
|Coal Creek Power Station|Coal       |800        |-99.5|31.5|POINT (-99.5 31.5)|
|Solar Valley Array      |Solar      |150        |-96.5|30.2|POINT (-96.5 30.2)|
|Wind River Farm         |Wind       |300        |-95.2|32.8|POINT (-95.2 32.8)|
|Natural Gas Plant 1     |Natural Gas|500        |-97.8|29.8|POINT (-97.8 29.8)|
|Hydroelectric Dam       |Hydro      |200        |-98.5|33.2|POINT (-98.5 33.2)|
|Nuclear Power Station   |Nuclear    |1200       |-96.0|31.8|POINT (-96 31.8)  |
|Solar Farm East         |Solar      |200        |-94.8|31.0|POINT (-94.8 31)  |
|Wind Farm North         |Wind       |350        |-99.0|33.8|POINT (-99 33.8)  |
|Natural Gas Plant 2     |Natural Gas|450        |-96.8|30.5|POINT (-96.8 30.5)|

### 2.2 Substations

In [11]:
# Generate substations distributed across the study area
np.random.seed(42)
n_substations = 25

substation_ids = [f'SUB-{i+1:03d}' for i in range(n_substations)]
# Convert numpy types to Python native types for Spark
substation_voltages = [int(v) for v in np.random.choice([138, 230, 345, 500], n_substations)]
substation_lons = [float(lon) for lon in np.random.uniform(
    study_area_bounds['min_lon'], 
    study_area_bounds['max_lon'], 
    n_substations
)]
substation_lats = [float(lat) for lat in np.random.uniform(
    study_area_bounds['min_lat'], 
    study_area_bounds['max_lat'], 
    n_substations
)]

substations_data = list(zip(substation_ids, substation_voltages, substation_lons, substation_lats))

# Create DataFrame
substations_df = spark.createDataFrame(
    substations_data,
    ['id', 'voltage_kv', 'lon', 'lat']
)

# Add geometry
substations_df.createOrReplaceTempView("substations_temp")
substations_geo = sedona.sql("""
    SELECT 
        id,
        voltage_kv,
        lon,
        lat,
        ST_Point(CAST(lon AS Decimal(24,20)), CAST(lat AS Decimal(24,20))) AS geometry
    FROM substations_temp
""")

substations_geo.createOrReplaceTempView("substations")

print(f"Created {n_substations} substations")
print("\nVoltage Class Distribution:")
sedona.sql("""
    SELECT voltage_kv, COUNT(*) as count
    FROM substations
    GROUP BY voltage_kv
    ORDER BY voltage_kv
""").show()

print("\nSample substations:")
substations_geo.show(5, truncate=False)

Created 25 substations

Voltage Class Distribution:
+----------+-----+
|voltage_kv|count|
+----------+-----+
|       138|    5|
|       230|    3|
|       345|    9|
|       500|    8|
+----------+-----+


Sample substations:
+-------+----------+------------------+------------------+---------------------------------------------+
|id     |voltage_kv|lon               |lat               |geometry                                     |
+-------+----------+------------------+------------------+---------------------------------------------+
|SUB-001|345       |-94.3686837459055 |30.154469128110744|POINT (-94.3686837459055 30.154469128110744) |
|SUB-002|500       |-99.99532740495391|30.205127330130058|POINT (-99.99532740495391 30.205127330130058)|
|SUB-003|138       |-94.04673064425269|32.41631759412729 |POINT (-94.04673064425269 32.41631759412729) |
|SUB-004|345       |-96.2951109422337 |32.049983288913104|POINT (-96.2951109422337 32.049983288913104) |
|SUB-005|345       |-96.33008103707031|

### 2.3 Transmission Lines

In [12]:
# Create transmission lines connecting power plants to substations
# We'll use a cross join with distance calculation to find nearest connections

transmission_lines = sedona.sql("""
    WITH plant_substation_distances AS (
        SELECT 
            p.name as from_name,
            'Plant' as from_type,
            s.id as to_name,
            'Substation' as to_type,
            s.voltage_kv,
            p.geometry as from_geom,
            s.geometry as to_geom,
            ST_Distance(
                ST_Transform(p.geometry, 'EPSG:4326', 'EPSG:3857'),
                ST_Transform(s.geometry, 'EPSG:4326', 'EPSG:3857')
            ) / 1000 as distance_km,
            ROW_NUMBER() OVER (PARTITION BY p.name ORDER BY ST_Distance(p.geometry, s.geometry)) as rank
        FROM power_plants p
        CROSS JOIN substations s
    ),
    plant_connections AS (
        SELECT 
            from_name,
            from_type,
            to_name,
            to_type,
            voltage_kv,
            ST_MakeLine(from_geom, to_geom) as geometry,
            distance_km
        FROM plant_substation_distances
        WHERE rank <= 3  -- Connect each plant to 3 nearest substations
    ),
    substation_connections AS (
        SELECT 
            s1.id as from_name,
            'Substation' as from_type,
            s2.id as to_name,
            'Substation' as to_type,
            s1.voltage_kv,
            ST_MakeLine(s1.geometry, s2.geometry) as geometry,
            ST_Distance(
                ST_Transform(s1.geometry, 'EPSG:4326', 'EPSG:3857'),
                ST_Transform(s2.geometry, 'EPSG:4326', 'EPSG:3857')
            ) / 1000 as distance_km
        FROM substations s1
        CROSS JOIN substations s2
        WHERE s1.id < s2.id  -- Avoid duplicates
            AND ST_Distance(s1.geometry, s2.geometry) < 2.0  -- Only connect nearby substations
            AND RAND() > 0.5  -- Randomly connect about half
    )
    SELECT * FROM plant_connections
    UNION ALL
    SELECT * FROM substation_connections
""")

transmission_lines.createOrReplaceTempView("transmission_lines")

print("Transmission Lines Created:")
print(f"Total lines: {transmission_lines.count()}")
print("\nStatistics:")
transmission_lines.selectExpr(
    "COUNT(*) as total_lines",
    "ROUND(SUM(distance_km), 2) as total_length_km",
    "ROUND(AVG(distance_km), 2) as avg_length_km",
    "ROUND(MIN(distance_km), 2) as min_length_km",
    "ROUND(MAX(distance_km), 2) as max_length_km"
).show(truncate=False)

print("\nLines by type:")
sedona.sql("""
    SELECT from_type, to_type, COUNT(*) as count
    FROM transmission_lines
    GROUP BY from_type, to_type
""").show()

Transmission Lines Created:
Total lines: 56

Statistics:
+-----------+---------------+-------------+-------------+-------------+
|total_lines|total_length_km|avg_length_km|min_length_km|max_length_km|
+-----------+---------------+-------------+-------------+-------------+
|56         |7363.76        |131.5        |12.46        |225.06       |
+-----------+---------------+-------------+-------------+-------------+


Lines by type:
+----------+----------+-----+
| from_type|   to_type|count|
+----------+----------+-----+
|     Plant|Substation|   27|
|Substation|Substation|   29|
+----------+----------+-----+


### 2.4 Demand Points (Cities and Industrial Zones)

In [13]:
# Create demand points (cities and industrial zones)
demand_points_data = [
    ('City A', 'City', 150, -98.2, 30.5),
    ('City B', 'City', 200, -96.8, 32.1),
    ('City C', 'City', 100, -95.5, 30.8),
    ('Industrial Zone 1', 'Industrial', 300, -99.2, 32.5),
    ('City D', 'City', 180, -97.5, 31.2),
    ('Industrial Zone 2', 'Industrial', 250, -95.8, 33.0),
    ('City E', 'City', 120, -98.8, 29.5)
]

demand_points_df = spark.createDataFrame(
    demand_points_data,
    ['name', 'type', 'demand_mw', 'lon', 'lat']
)

# Add geometry
demand_points_df.createOrReplaceTempView("demand_points_temp")
demand_points_geo = sedona.sql("""
    SELECT 
        name,
        type,
        demand_mw,
        lon,
        lat,
        ST_Point(CAST(lon AS Decimal(24,20)), CAST(lat AS Decimal(24,20))) AS geometry
    FROM demand_points_temp
""")

demand_points_geo.createOrReplaceTempView("demand_points")

print("Demand Points Created:")
demand_points_geo.show(truncate=False)

total_demand = demand_points_geo.selectExpr("SUM(demand_mw) as total_mw").collect()[0]['total_mw']
print(f"\nTotal Demand: {total_demand} MW")

Demand Points Created:
+-----------------+----------+---------+-----+----+------------------+
|name             |type      |demand_mw|lon  |lat |geometry          |
+-----------------+----------+---------+-----+----+------------------+
|City A           |City      |150      |-98.2|30.5|POINT (-98.2 30.5)|
|City B           |City      |200      |-96.8|32.1|POINT (-96.8 32.1)|
|City C           |City      |100      |-95.5|30.8|POINT (-95.5 30.8)|
|Industrial Zone 1|Industrial|300      |-99.2|32.5|POINT (-99.2 32.5)|
|City D           |City      |180      |-97.5|31.2|POINT (-97.5 31.2)|
|Industrial Zone 2|Industrial|250      |-95.8|33.0|POINT (-95.8 33)  |
|City E           |City      |120      |-98.8|29.5|POINT (-98.8 29.5)|
+-----------------+----------+---------+-----+----+------------------+


Total Demand: 1300 MW


## 3. Basic Spatial Operations

### 3.1 Distance Calculations - Find Nearest Substation to Each Demand Point

In [14]:
# Find nearest substation for each demand point using spatial join
nearest_substations = sedona.sql("""
    WITH distances AS (
        SELECT 
            d.name as demand_point,
            d.type as demand_type,
            d.demand_mw,
            s.id as nearest_substation,
            s.voltage_kv,
            ST_Distance(
                ST_Transform(d.geometry, 'EPSG:4326', 'EPSG:3857'),
                ST_Transform(s.geometry, 'EPSG:4326', 'EPSG:3857')
            ) / 1000 as distance_km,
            ROW_NUMBER() OVER (PARTITION BY d.name ORDER BY ST_Distance(d.geometry, s.geometry)) as rank
        FROM demand_points d
        CROSS JOIN substations s
    )
    SELECT 
        demand_point,
        demand_type,
        demand_mw,
        nearest_substation,
        voltage_kv,
        ROUND(distance_km, 2) as distance_km
    FROM distances
    WHERE rank = 1
    ORDER BY distance_km DESC
""")

print("Nearest Substation Analysis:")
nearest_substations.show(truncate=False)

# Calculate statistics
avg_distance = nearest_substations.selectExpr("AVG(distance_km) as avg_km").collect()[0]['avg_km']
max_distance = nearest_substations.selectExpr("MAX(distance_km) as max_km").collect()[0]['max_km']
print(f"\nAverage distance to nearest substation: {avg_distance:.2f} km")
print(f"Maximum distance to nearest substation: {max_distance:.2f} km")

Nearest Substation Analysis:
+-----------------+-----------+---------+------------------+----------+-----------+
|demand_point     |demand_type|demand_mw|nearest_substation|voltage_kv|distance_km|
+-----------------+-----------+---------+------------------+----------+-----------+
|City A           |City       |150      |SUB-015           |500       |111.65     |
|Industrial Zone 1|Industrial |300      |SUB-012           |345       |109.45     |
|City E           |City       |120      |SUB-013           |345       |85.21      |
|Industrial Zone 2|Industrial |250      |SUB-014           |345       |61.33      |
|City B           |City       |200      |SUB-004           |345       |56.59      |
|City C           |City       |100      |SUB-019           |500       |51.34      |
|City D           |City       |180      |SUB-015           |500       |23.71      |
+-----------------+-----------+---------+------------------+----------+-----------+


Average distance to nearest substation: 71.33

### 3.2 Service Areas - Create Buffers Around Substations

In [15]:
# Create service areas (buffers) around substations
# Service radius varies by voltage: higher voltage = larger service area
service_areas = sedona.sql("""
    SELECT 
        id,
        voltage_kv,
        CASE 
            WHEN voltage_kv >= 500 THEN 75000  -- 75 km for 500 kV
            WHEN voltage_kv >= 345 THEN 60000  -- 60 km for 345 kV
            WHEN voltage_kv >= 230 THEN 50000  -- 50 km for 230 kV
            ELSE 40000  -- 40 km for 138 kV
        END as service_radius_m,
        geometry as original_geom,
        ST_Buffer(
            ST_Transform(geometry, 'EPSG:4326', 'EPSG:3857'),
            CASE 
                WHEN voltage_kv >= 500 THEN 75000
                WHEN voltage_kv >= 345 THEN 60000
                WHEN voltage_kv >= 230 THEN 50000
                ELSE 40000
            END
        ) as service_area_geom
    FROM substations
""")

service_areas.createOrReplaceTempView("service_areas")

# Calculate total service area (with overlaps)
total_service_area = sedona.sql("""
    SELECT 
        SUM(ST_Area(service_area_geom) / 1000000) as total_area_km2
    FROM service_areas
""").collect()[0]['total_area_km2']

print(f"Service Areas Created:")
print(f"Total service area (with overlaps): {total_service_area:,.0f} km²")

# Show service area breakdown by voltage class
sedona.sql("""
    SELECT 
        voltage_kv,
        COUNT(*) as num_substations,
        service_radius_m / 1000 as service_radius_km,
        ROUND(SUM(ST_Area(service_area_geom) / 1000000), 2) as total_area_km2
    FROM service_areas
    GROUP BY voltage_kv, service_radius_m
    ORDER BY voltage_kv DESC
""").show(truncate=False)


Service Areas Created:
Total service area (with overlaps): 289,982 km²
+----------+---------------+-----------------+--------------+
|voltage_kv|num_substations|service_radius_km|total_area_km2|
+----------+---------------+-----------------+--------------+
|500       |8              |75.0             |140465.03     |
|345       |9              |60.0             |101134.82     |
|230       |3              |50.0             |23410.84      |
|138       |5              |40.0             |24971.56      |
+----------+---------------+-----------------+--------------+


### 3.3 Point-in-Polygon Analysis - Which Demand Points are Covered?

In [16]:
# Check which demand points fall within service areas using ST_Within
demand_coverage = sedona.sql("""
    SELECT 
        d.name,
        d.type,
        d.demand_mw,
        COUNT(DISTINCT s.id) as num_covering_substations,
        COLLECT_LIST(s.id) as covering_substations
    FROM demand_points d
    LEFT JOIN service_areas s 
        ON ST_Within(
            ST_Transform(d.geometry, 'EPSG:4326', 'EPSG:3857'),
            s.service_area_geom
        )
    GROUP BY d.name, d.type, d.demand_mw
    ORDER BY num_covering_substations DESC
""")

print("Demand Point Coverage Analysis:")
demand_coverage.show(truncate=False)

# Calculate coverage statistics
covered_points = demand_coverage.filter("num_covering_substations > 0").count()
total_points = demand_coverage.count()
coverage_pct = (covered_points / total_points) * 100

print(f"\nCoverage Statistics:")
print(f"Demand points covered: {covered_points} / {total_points} ({coverage_pct:.1f}%)")
print(f"Demand points with redundant coverage (2+ substations): {demand_coverage.filter('num_covering_substations >= 2').count()}")


Demand Point Coverage Analysis:
+-----------------+----------+---------+------------------------+--------------------+
|name             |type      |demand_mw|num_covering_substations|covering_substations|
+-----------------+----------+---------+------------------------+--------------------+
|City B           |City      |200      |1                       |[SUB-004]           |
|City C           |City      |100      |1                       |[SUB-019]           |
|City D           |City      |180      |1                       |[SUB-015]           |
|City A           |City      |150      |0                       |[]                  |
|Industrial Zone 2|Industrial|250      |0                       |[]                  |
|Industrial Zone 1|Industrial|300      |0                       |[]                  |
|City E           |City      |120      |0                       |[]                  |
+-----------------+----------+---------+------------------------+--------------------+


Coverage 

## 4. Advanced Spatial Analysis

### 4.1 Transmission Corridors - Buffers Around Transmission Lines

In [17]:
# Create transmission corridors with varying widths based on voltage
transmission_corridors = sedona.sql("""
    SELECT 
        from_name,
        to_name,
        voltage_kv,
        CASE 
            WHEN voltage_kv >= 500 THEN 500  -- 500m for 500kV lines
            WHEN voltage_kv >= 345 THEN 400  -- 400m for 345kV lines
            WHEN voltage_kv >= 230 THEN 300  -- 300m for 230kV lines
            ELSE 200  -- 200m for lower voltage
        END as corridor_width_m,
        geometry as line_geom,
        ST_Buffer(
            ST_Transform(geometry, 'EPSG:4326', 'EPSG:3857'),
            CASE 
                WHEN voltage_kv >= 500 THEN 500
                WHEN voltage_kv >= 345 THEN 400
                WHEN voltage_kv >= 230 THEN 300
                ELSE 200
            END
        ) as corridor_geom
    FROM transmission_lines
""")

transmission_corridors.createOrReplaceTempView("transmission_corridors")

# Calculate total corridor area
corridor_stats = sedona.sql("""
    SELECT 
        COUNT(*) as num_corridors,
        ROUND(AVG(corridor_width_m), 0) as avg_width_m,
        ROUND(SUM(ST_Area(corridor_geom) / 1000000), 2) as total_area_km2
    FROM transmission_corridors
""")

print("Transmission Corridor Analysis:")
corridor_stats.show(truncate=False)

# Breakdown by voltage class
sedona.sql("""
    SELECT 
        voltage_kv,
        corridor_width_m,
        COUNT(*) as num_lines,
        ROUND(SUM(ST_Area(corridor_geom) / 1000000), 2) as total_corridor_area_km2
    FROM transmission_corridors
    GROUP BY voltage_kv, corridor_width_m
    ORDER BY voltage_kv DESC
""").show(truncate=False)

Transmission Corridor Analysis:
+-------------+-----------+--------------+
|num_corridors|avg_width_m|total_area_km2|
+-------------+-----------+--------------+
|56           |398.0      |6087.47       |
+-------------+-----------+--------------+

+----------+----------------+---------+-----------------------+
|voltage_kv|corridor_width_m|num_lines|total_corridor_area_km2|
+----------+----------------+---------+-----------------------+
|500       |500             |17       |2367.46                |
|345       |400             |28       |3337.09                |
|230       |300             |4        |117.06                 |
|138       |200             |7        |265.85                 |
+----------+----------------+---------+-----------------------+


### 4.2 Coverage Gap Analysis

In [None]:
# Calculate coverage gaps using ST_Union
# First, union all service areas to get total coverage (this removes overlaps)
union_coverage = sedona.sql("""
    SELECT ST_Union_Aggr(service_area_geom) as total_coverage
    FROM service_areas
""")

union_coverage.createOrReplaceTempView("union_coverage")

# Calculate coverage statistics - the gap should equal study_area - union_coverage
coverage_stats = sedona.sql("""
    SELECT 
        ST_Area(ST_Transform(s.geometry, 'EPSG:4326', 'EPSG:3857')) / 1000000 as study_area_km2,
        ST_Area(c.total_coverage) / 1000000 as union_coverage_km2
    FROM study_area_view s
    CROSS JOIN union_coverage c
""")

coverage_stats.createOrReplaceTempView("coverage_stats")

# Calculate gap as simple subtraction
gap_analysis = sedona.sql("""
    SELECT 
        ROUND(study_area_km2, 2) as study_area_km2,
        ROUND(union_coverage_km2, 2) as actual_coverage_km2,
        ROUND(study_area_km2 - union_coverage_km2, 2) as gap_area_km2,
        ROUND((union_coverage_km2 / study_area_km2) * 100, 1) as coverage_pct,
        ROUND(((study_area_km2 - union_coverage_km2) / study_area_km2) * 100, 1) as gap_pct
    FROM coverage_stats
""")

print("Coverage Gap Analysis:")
print("Note: Percentages based on union of service areas (overlaps removed)")
print("      Coverage % + Gap % should equal 100%")
gap_analysis.show(truncate=False)

# Verify the math
verification = sedona.sql("""
    SELECT 
        ROUND((union_coverage_km2 / study_area_km2) * 100, 1) + 
        ROUND(((study_area_km2 - union_coverage_km2) / study_area_km2) * 100, 1) as total_pct
    FROM coverage_stats
""")
print("\nVerification (should be 100.0%):")
verification.show(truncate=False)

# Show overlap statistics
overlap_stats = sedona.sql("""
    SELECT 
        COUNT(*) as num_substations,
        ROUND(SUM(ST_Area(service_area_geom) / 1000000), 2) as total_individual_areas_km2,
        ROUND((SELECT union_coverage_km2 FROM coverage_stats), 2) as actual_coverage_km2,
        ROUND(SUM(ST_Area(service_area_geom) / 1000000) - (SELECT union_coverage_km2 FROM coverage_stats), 2) as overlap_area_km2
    FROM service_areas
""")

print("\nService Area Overlap Statistics:")
print("Note: Individual areas sum to more than actual coverage due to overlaps")
overlap_stats.show(truncate=False)

# Identify substations that contribute most to coverage
substation_coverage_contribution = sedona.sql("""
    SELECT 
        id,
        voltage_kv,
        service_radius_m / 1000 as service_radius_km,
        ROUND(ST_Area(service_area_geom) / 1000000, 2) as individual_area_km2
    FROM service_areas
    ORDER BY individual_area_km2 DESC
    LIMIT 10
""")

print("\nTop 10 Substations by Individual Service Area:")
substation_coverage_contribution.show(truncate=False)


Coverage Gap Analysis:
Note: Percentages based on union of service areas (overlaps removed)
      Coverage % + Gap % should equal 100%
+--------------+-------------------+------------+------------+-------+
|study_area_km2|actual_coverage_km2|gap_area_km2|coverage_pct|gap_pct|
+--------------+-------------------+------------+------------+-------+
|436253.95     |218665.95          |217588.0    |50.1        |49.9   |
+--------------+-------------------+------------+------------+-------+


Verification (should be 100.0%):
+---------+
|total_pct|
+---------+
|100.0    |
+---------+


Service Area Overlap Statistics:
Note: Individual areas sum to more than actual coverage due to overlaps
+---------------+--------------------------+-------------------+----------------+
|num_substations|total_individual_areas_km2|actual_coverage_km2|overlap_area_km2|
+---------------+--------------------------+-------------------+----------------+
|25             |289982.25                 |218665.95         

### 4.3 Spatial Relationships - Intersections and Connectivity

In [22]:
# Find overlapping service areas (ST_Intersects)
overlapping_service_areas = sedona.sql("""
    SELECT 
        s1.id as substation_1,
        s1.voltage_kv as voltage_1,
        s2.id as substation_2,
        s2.voltage_kv as voltage_2,
        ROUND(ST_Area(ST_Intersection(s1.service_area_geom, s2.service_area_geom)) / 1000000, 2) as overlap_area_km2
    FROM service_areas s1
    JOIN service_areas s2 
        ON s1.id < s2.id 
        AND ST_Intersects(s1.service_area_geom, s2.service_area_geom)
    WHERE ST_Area(ST_Intersection(s1.service_area_geom, s2.service_area_geom)) > 0
    ORDER BY overlap_area_km2 DESC
    LIMIT 15
""")

print("Overlapping Service Areas (Top 15 by overlap size):")
overlapping_service_areas.show(truncate=False)

# Find transmission lines that cross service areas
lines_through_service_areas = sedona.sql("""
    SELECT 
        t.from_name,
        t.to_name,
        s.id as service_area_id,
        s.voltage_kv,
        ST_Length(
            ST_Intersection(
                ST_Transform(t.geometry, 'EPSG:4326', 'EPSG:3857'),
                s.service_area_geom
            )
        ) / 1000 as length_in_service_area_km
    FROM transmission_lines t
    JOIN service_areas s 
        ON ST_Intersects(
            ST_Transform(t.geometry, 'EPSG:4326', 'EPSG:3857'),
            s.service_area_geom
        )
    WHERE ST_Length(
        ST_Intersection(
            ST_Transform(t.geometry, 'EPSG:4326', 'EPSG:3857'),
            s.service_area_geom
        )
    ) > 0
""")

lines_through_service_areas.createOrReplaceTempView("lines_through_service")

print(f"\nTransmission Lines Through Service Areas:")
print(f"Total line-service area intersections: {lines_through_service_areas.count()}")

sedona.sql("""
    SELECT 
        service_area_id,
        voltage_kv,
        COUNT(*) as num_lines_crossing,
        ROUND(SUM(length_in_service_area_km), 2) as total_line_length_km
    FROM lines_through_service
    GROUP BY service_area_id, voltage_kv
    ORDER BY num_lines_crossing DESC
    LIMIT 10
""").show(truncate=False)

Overlapping Service Areas (Top 15 by overlap size):
+------------+---------+------------+---------+----------------+
|substation_1|voltage_1|substation_2|voltage_2|overlap_area_km2|
+------------+---------+------------+---------+----------------+
|SUB-009     |345      |SUB-024     |500      |11237.2         |
|SUB-002     |500      |SUB-006     |500      |11117.76        |
|SUB-005     |345      |SUB-014     |345      |10336.24        |
|SUB-001     |345      |SUB-011     |345      |7945.63         |
|SUB-021     |230      |SUB-025     |500      |7803.61         |
|SUB-005     |345      |SUB-023     |230      |5766.55         |
|SUB-014     |345      |SUB-023     |230      |5402.09         |
|SUB-007     |138      |SUB-010     |230      |3824.61         |
|SUB-010     |230      |SUB-025     |500      |3091.21         |
|SUB-015     |500      |SUB-020     |345      |2959.59         |
|SUB-017     |500      |SUB-023     |230      |2448.82         |
|SUB-010     |230      |SUB-021     |2

## 5. Performance Optimization with Spatial Indexing

Sedona supports spatial indexing to dramatically improve query performance on large datasets.

In [25]:
# Demonstrate spatial query optimization
import time

print("="*60)
print("SPATIAL JOIN PERFORMANCE OPTIMIZATION")
print("="*60)
print("\nNote: With small datasets (25 substations, 7 demand points),")
print("performance differences are minimal. Benefits appear with larger datasets.")

# Test 1: Standard spatial join
print("\n1. Standard Spatial Join:")
start_time = time.time()
result_standard = sedona.sql("""
    SELECT COUNT(*) as count
    FROM demand_points d
    JOIN service_areas s 
        ON ST_Within(
            ST_Transform(d.geometry, 'EPSG:4326', 'EPSG:3857'),
            s.service_area_geom
        )
""")
count_standard = result_standard.collect()[0]['count']
time_standard = time.time() - start_time

print(f"   Results: {count_standard} demand points covered")
print(f"   Time: {time_standard:.3f} seconds")

# Test 2: With broadcast hint (good when one side is small)
print("\n2. Broadcast Join (for small dataset joins):")
start_time = time.time()
result_broadcast = sedona.sql("""
    SELECT /*+ BROADCAST(d) */ COUNT(*) as count
    FROM demand_points d
    JOIN service_areas s 
        ON ST_Within(
            ST_Transform(d.geometry, 'EPSG:4326', 'EPSG:3857'),
            s.service_area_geom
        )
""")
count_broadcast = result_broadcast.collect()[0]['count']
time_broadcast = time.time() - start_time

print(f"   Results: {count_broadcast} demand points covered")
print(f"   Time: {time_broadcast:.3f} seconds")
print(f"   Note: Broadcast overhead may not help with very small datasets")

# Test 3: Pre-filtering before spatial join (always beneficial)
print("\n3. Pre-filtered Spatial Join (best practice):")
start_time = time.time()
result_filtered = sedona.sql("""
    SELECT COUNT(*) as count
    FROM demand_points d
    JOIN service_areas s 
        ON ST_Within(
            ST_Transform(d.geometry, 'EPSG:4326', 'EPSG:3857'),
            s.service_area_geom
        )
    WHERE s.voltage_kv >= 230  -- Filter before spatial operation
""")
count_filtered = result_filtered.collect()[0]['count']
time_filtered = time.time() - start_time

print(f"   Results: {count_filtered} demand points covered by high-voltage substations")
print(f"   Time: {time_filtered:.3f} seconds")
print(f"   Note: Pre-filtering reduces data before expensive spatial operations")

print("\n" + "="*60)
print("KEY OPTIMIZATION TECHNIQUES:")
print("="*60)
print("1. BROADCAST JOIN: Use /*+ BROADCAST(table) */ for small (<10MB) to large joins")
print("   - Sends small dataset to all workers")
print("   - Most effective with 1:many join ratios")
print()
print("2. PRE-FILTERING: Apply non-spatial filters BEFORE spatial operations")
print("   - Filter by attributes first (voltage_kv, capacity, etc.)")
print("   - Reduces data volume for expensive spatial computations")
print()
print("3. CACHING: Use .cache() for datasets used multiple times")
print("   - Stores intermediate results in memory")
print("   - Avoids recomputing expensive operations")
print()
print("4. SPATIAL PARTITIONING: Sedona automatically partitions spatially")
print("   - Groups nearby geometries together")
print("   - Reduces cross-partition communication")
print()
print("5. PREDICATE PUSHDOWN: Use spatial predicates efficiently")
print("   - ST_Within, ST_Intersects often faster than ST_Distance")
print("   - Avoid ST_Distance when spatial predicates suffice")


SPATIAL JOIN PERFORMANCE OPTIMIZATION

Note: With small datasets (25 substations, 7 demand points),
performance differences are minimal. Benefits appear with larger datasets.

1. Standard Spatial Join:
   Results: 3 demand points covered
   Time: 1.205 seconds

2. Broadcast Join (for small dataset joins):
   Results: 3 demand points covered
   Time: 1.083 seconds
   Note: Broadcast overhead may not help with very small datasets

3. Pre-filtered Spatial Join (best practice):
   Results: 3 demand points covered by high-voltage substations
   Time: 1.126 seconds
   Note: Pre-filtering reduces data before expensive spatial operations

KEY OPTIMIZATION TECHNIQUES:
1. BROADCAST JOIN: Use /*+ BROADCAST(table) */ for small (<10MB) to large joins
   - Sends small dataset to all workers
   - Most effective with 1:many join ratios

2. PRE-FILTERING: Apply non-spatial filters BEFORE spatial operations
   - Filter by attributes first (voltage_kv, capacity, etc.)
   - Reduces data volume for expensi

### 5.1 Caching for Better Performance

In [26]:
# Cache frequently used spatial datasets
service_areas_cached = service_areas.cache()
substations_cached = substations_geo.cache()

# Force materialization
print(f"Service areas cached: {service_areas_cached.count()} records")
print(f"Substations cached: {substations_cached.count()} records")

# Now subsequent queries on these datasets will be faster
print("\nCached datasets ready for fast repeated queries!")
print("Tip: Use .cache() on DataFrames that will be reused multiple times")

Service areas cached: 25 records
Substations cached: 25 records

Cached datasets ready for fast repeated queries!
Tip: Use .cache() on DataFrames that will be reused multiple times


## 6. Scaling Demonstration

Let's scale up our dataset to show how Sedona handles larger data volumes efficiently.

In [27]:
# Generate a larger dataset of substations (scale from 25 to 150)
# This is reasonable for 2 DPUs and demonstrates scaling without long wait times
np.random.seed(123)
n_substations_scaled = 150

scaled_substation_ids = [f'SUB-SCALED-{i+1:03d}' for i in range(n_substations_scaled)]
# Convert numpy types to Python native types for Spark
scaled_voltages = [int(v) for v in np.random.choice([138, 230, 345, 500], n_substations_scaled)]
scaled_lons = [float(lon) for lon in np.random.uniform(
    study_area_bounds['min_lon'], 
    study_area_bounds['max_lon'], 
    n_substations_scaled
)]
scaled_lats = [float(lat) for lat in np.random.uniform(
    study_area_bounds['min_lat'], 
    study_area_bounds['max_lat'], 
    n_substations_scaled
)]

scaled_substations_data = list(zip(scaled_substation_ids, scaled_voltages, scaled_lons, scaled_lats))

# Create scaled DataFrame
scaled_substations_df = spark.createDataFrame(
    scaled_substations_data,
    ['id', 'voltage_kv', 'lon', 'lat']
)

# Add geometry
scaled_substations_df.createOrReplaceTempView("scaled_substations_temp")
scaled_substations_geo = sedona.sql("""
    SELECT 
        id,
        voltage_kv,
        lon,
        lat,
        ST_Point(CAST(lon AS Decimal(24,20)), CAST(lat AS Decimal(24,20))) AS geometry
    FROM scaled_substations_temp
""")

scaled_substations_geo.createOrReplaceTempView("scaled_substations")

print("="*60)
print("SCALING DEMONSTRATION")
print("="*60)
print(f"Original substations: 25")
print(f"Scaled substations: {n_substations_scaled}")
print(f"Scale factor: {n_substations_scaled/25:.1f}x")
print("\nVoltage distribution:")
sedona.sql("""
    SELECT voltage_kv, COUNT(*) as count
    FROM scaled_substations
    GROUP BY voltage_kv
    ORDER BY voltage_kv
""").show()

SCALING DEMONSTRATION
Original substations: 25
Scaled substations: 150
Scale factor: 6.0x

Voltage distribution:
+----------+-----+
|voltage_kv|count|
+----------+-----+
|       138|   32|
|       230|   36|
|       345|   42|
|       500|   40|
+----------+-----+


In [28]:
# Create scaled service areas
scaled_service_areas = sedona.sql("""
    SELECT 
        id,
        voltage_kv,
        CASE 
            WHEN voltage_kv >= 500 THEN 75000
            WHEN voltage_kv >= 345 THEN 60000
            WHEN voltage_kv >= 230 THEN 50000
            ELSE 40000
        END as service_radius_m,
        geometry as original_geom,
        ST_Buffer(
            ST_Transform(geometry, 'EPSG:4326', 'EPSG:3857'),
            CASE 
                WHEN voltage_kv >= 500 THEN 75000
                WHEN voltage_kv >= 345 THEN 60000
                WHEN voltage_kv >= 230 THEN 50000
                ELSE 40000
            END
        ) as service_area_geom
    FROM scaled_substations
""")

scaled_service_areas.createOrReplaceTempView("scaled_service_areas")

# Perform spatial analysis on scaled dataset
print("Analyzing scaled service areas...")

start_time = time.time()
scaled_coverage = sedona.sql("""
    SELECT 
        d.name,
        d.demand_mw,
        COUNT(DISTINCT s.id) as num_covering_substations
    FROM demand_points d
    LEFT JOIN scaled_service_areas s 
        ON ST_Within(
            ST_Transform(d.geometry, 'EPSG:4326', 'EPSG:3857'),
            s.service_area_geom
        )
    GROUP BY d.name, d.demand_mw
    ORDER BY num_covering_substations DESC
""")

result_count = scaled_coverage.count()
query_time = time.time() - start_time

print(f"\nScaled Analysis Results:")
print(f"Query time: {query_time:.3f} seconds")
print(f"Results: {result_count} demand points analyzed")

scaled_coverage.show(truncate=False)

# Compare coverage between original and scaled datasets
covered_scaled = scaled_coverage.filter("num_covering_substations > 0").count()
coverage_pct_scaled = (covered_scaled / result_count) * 100
redundant_scaled = scaled_coverage.filter("num_covering_substations >= 2").count()

print(f"\nCoverage Comparison:")
print(f"Original (25 substations): {coverage_pct:.1f}% coverage")
print(f"Scaled (150 substations): {coverage_pct_scaled:.1f}% coverage")
print(f"Improvement: +{coverage_pct_scaled - coverage_pct:.1f}%")
print(f"\nRedundancy (2+ substations covering each point):")
print(f"Scaled dataset: {redundant_scaled} / {result_count} points")

Analyzing scaled service areas...

Scaled Analysis Results:
Query time: 0.725 seconds
Results: 7 demand points analyzed
+-----------------+---------+------------------------+
|name             |demand_mw|num_covering_substations|
+-----------------+---------+------------------------+
|City A           |150      |5                       |
|City C           |100      |4                       |
|Industrial Zone 1|300      |3                       |
|City B           |200      |3                       |
|Industrial Zone 2|250      |2                       |
|City D           |180      |2                       |
|City E           |120      |0                       |
+-----------------+---------+------------------------+


Coverage Comparison:
Original (25 substations): 42.9% coverage
Scaled (150 substations): 85.7% coverage
Improvement: +42.9%

Redundancy (2+ substations covering each point):
Scaled dataset: 6 / 7 points


## 7. Data Export

Export processed geospatial data to S3 for further analysis or sharing.

In [None]:
# Export to GeoParquet format (efficient for large geospatial datasets)
# GeoParquet is columnar and supports spatial indexing

# IMPORTANT: Replace with your S3 bucket path
s3_output_base = "s3://{YOUR-BUCKET-NAME}/apache_sedona_output_files/"

# Prepare data for export (convert geometry to WKT for Parquet)
power_plants_export = sedona.sql("""
    SELECT 
        name,
        type,
        capacity_mw,
        lon,
        lat,
        ST_AsText(geometry) as geometry_wkt
    FROM power_plants
""")

substations_export = sedona.sql("""
    SELECT 
        id,
        voltage_kv,
        lon,
        lat,
        ST_AsText(geometry) as geometry_wkt
    FROM substations
""")

transmission_lines_export = sedona.sql("""
    SELECT 
        from_name,
        from_type,
        to_name,
        to_type,
        voltage_kv,
        distance_km,
        ST_AsText(geometry) as geometry_wkt
    FROM transmission_lines
""")

# Export to Parquet (comment out if you don't have S3 write access)
power_plants_export.write.mode("overwrite").parquet(f"{s3_output_base}power_plants/")
substations_export.write.mode("overwrite").parquet(f"{s3_output_base}substations/")
transmission_lines_export.write.mode("overwrite").parquet(f"{s3_output_base}transmission_lines/")

print("GeoParquet Export Configuration:")
print(f"Output path: {s3_output_base}")
print("\nTo export, uncomment the write commands and ensure:")
print("1. S3 bucket exists and you have write permissions")
print("2. AWS credentials are configured")
print("3. Replace 'YOUR-BUCKET' with your actual bucket name")
print("\nDatasets ready for export:")
print(f"  - Power plants: {power_plants_export.count()} records")
print(f"  - Substations: {substations_export.count()} records")
print(f"  - Transmission lines: {transmission_lines_export.count()} records")

In [42]:
%stop_session

Stopping session: 39b9a8bf-691d-4ed1-a548-6e7514e551b0
Stopped session.
