In [None]:
# Import necessary libraries
import geopandas as gpd
import pandas as pd


In [None]:
# Load the GeoJSON with administrative areas
admin_areas = gpd.read_file('../data/raw/Prognoseräume.json')

# Load the GeoJSON with soil sealing data (vg_2021 attribute)
sealing_blocks = gpd.read_file('../data/raw/Versiegelung_2021_-_Block-_und_Blockteilflächen_(Umweltatlas).json')

# Load the second GeoJSON with additional soil sealing data (vg_0 attribute)
sealing_streets = gpd.read_file('../data/raw/Versiegelung_2021_-_Straßenflächen_(Umweltatlas).json')


In [None]:
# Ensure both GeoDataFrames have the same CRS (coordinate reference system)
if admin_areas.crs != sealing_blocks.crs:
    sealing_blocks = sealing_blocks.to_crs(admin_areas.crs)

if admin_areas.crs != sealing_streets.crs:
    sealing_streets = sealing_streets.to_crs(admin_areas.crs)


In [None]:
# Re-project to a projected CRS, e.g., UTM zone 33N (EPSG:32633)
projected_crs = 'EPSG:32633'  # Choose a suitable CRS for your region

# Re-project both datasets to a projected CRS
admin_areas = admin_areas.to_crs(projected_crs)
sealing_blocks = sealing_blocks.to_crs(projected_crs)
sealing_streets = sealing_streets.to_crs(projected_crs)

In [None]:
# Check for invalid geometries in the administrative areas
print("Checking for invalid geometries in admin areas...")
invalid_admin_areas = admin_areas[~admin_areas.is_valid]
print(f"Number of invalid admin areas: {len(invalid_admin_areas)}")

# Fix invalid geometries in the admin areas by applying a zero-width buffer
admin_areas['geometry'] = admin_areas['geometry'].buffer(0)

# Check for invalid geometries in the soil sealing dataset (vg_2021)
print("Checking for invalid geometries in soil sealing (vg_2021)...")
invalid_sealing_blocks = sealing_blocks[~sealing_blocks.is_valid]
print(f"Number of invalid geometries in sealing_blocks: {len(invalid_sealing_blocks)}")

# Fix invalid geometries in the soil sealing dataset (vg_2021)
sealing_blocks['geometry'] = sealing_blocks['geometry'].buffer(0)

# Check for invalid geometries in the second soil sealing dataset (vg_0)
print("Checking for invalid geometries in soil sealing (vg_0)...")
invalid_sealing_streets = sealing_streets[~sealing_streets.is_valid]
print(f"Number of invalid geometries in sealing_streets: {len(invalid_sealing_streets)}")

# Fix invalid geometries in the soil sealing dataset (vg_0)
sealing_streets['geometry'] = sealing_streets['geometry'].buffer(0)

In [None]:
# Safe conversion to float with error handling for empty strings
def safe_float(value):
    try:
        # Strip any potential surrounding whitespace before conversion
        value = value.strip()
        if value == '':
            return 0.0  # Return 0.0 if the string is empty
        return float(value)
    except (ValueError, TypeError):
        return 0.0  # Return 0.0 if conversion fails


In [None]:
# Updated function to calculate the intersection area and multiply it by the sealing factor for each dataset
def calculate_sealing(admin_area, sealing_blocks, sealing_streets):
    # Find intersections for the first dataset (vg_2021)
    intersections_blocks = sealing_blocks[sealing_blocks.intersects(admin_area.geometry)]
    # Find intersections for the second dataset (vg_0)
    intersections_streets = sealing_streets[sealing_streets.intersects(admin_area.geometry)]
    
    # Get the area of the admin_area
    admin_area_size = admin_area.geometry.area
    
    # Calculate the total sealing area for blocks
    sealing_area_blocks = 0
    for _, row in intersections_blocks.iterrows():
        intersection = admin_area.geometry.intersection(row.geometry)
        if not intersection.is_empty:
            # Calculate the proportion of the intersection area relative to the admin area
            sealing_area_blocks += ((intersection.area / admin_area_size) * 100) * (safe_float(row['vg_2021'])/100)  # Safe float conversion
    
    # Calculate the total sealing area for streets
    sealing_area_streets = 0
    for _, row in intersections_streets.iterrows():
        intersection = admin_area.geometry.intersection(row.geometry)
        if not intersection.is_empty:
            # Calculate the proportion of the intersection area relative to the admin area
            sealing_area_streets += ((intersection.area / admin_area_size) * 100) * (safe_float(row['vg_0'])/100)  # Safe float conversion

    # Return the combined sealing area from both datasets
    return sealing_area_blocks + sealing_area_streets


In [None]:
# Calculate soil sealing for each administrative area considering both datasets
admin_areas['total_sealing_area'] = admin_areas.apply(
    lambda row: calculate_sealing(row, sealing_blocks, sealing_streets), axis=1
)


In [None]:
# Display the results as a DataFrame
admin_areas[['pgr_name', 'total_sealing_area']].head()  # Replace with actual admin area name column

In [None]:
# Save the DataFrame (without geometry) as CSV
admin_areas[['pgr_name', 'total_sealing_area']].to_csv('../data/out/admin_areas_sealing.csv', index=False)


In [None]:
# Save the result to a new GeoJSON file if needed
# admin_areas.to_file('admin_areas_with_sealing_combined.geojson', driver='GeoJSON')


In [None]:
# Optionally, display a summary table of administrative areas and their sealing areas
# admin_areas[['admin_area_name_column', 'total_sealing_area']].head()  # Replace with actual admin area name column