In [None]:
from osgeo import gdal
import numpy as np
from osgeo import gdal
from scipy.ndimage import generic_filter
#import arcpy
#from arcpy.sa import Curvature
import os
import tempfile
from scipy.ndimage import gaussian_filter
import rasterio
from rasterio.enums import Resampling
from rasterio.windows import from_bounds
from skimage.transform import resize
from numpy.lib.stride_tricks import as_strided

"""
# List all registered drivers and their capabilities
for i in range(gdal.GetDriverCount()):
    driver = gdal.GetDriver(i)
    print(f"{driver.ShortName} - {driver.LongName}")
"""

#Hier Paramter für PRA berechnung Angeben

slope_range = (28, 60)  # Degrees range
ruggedness_threshold =  7 # Normalized ruggedness value
curvature_threshold = -0.1  # Negative for concave areas
output_raster_path = "D:\\Bachelerarbeit\\Daten\\Gis_Daten\\PRA_PF\\pra_mask__.tif"
dem_path = r"D:\Bachelerarbeit\Daten\Gis_Daten\Passeiertal DGM\DGM_2_5_Burggrafenamt.tif"
curvature_raster_path = r"D:\Bachelerarbeit\Daten\Gis_Daten\Passeiertal DGM\Curv_Plan_BG.tif"
forest_path = r"D:\Bachelerarbeit\Daten\Gis_Daten\Wald_Suedtirol\ForestSuedtirol.tif"
wind_direction = 0  # Windrichtung in Grad (z.B. 270 für Westwind)
tolerance_angle = 180  # Toleranzwinkel in Grad
cell_size = 2.5  # Zellengröße des DEMs in Metern
radius = 5


def read_raster_data(raster_path):
    """
    Reads a raster file and returns it as a numpy array.

    Parameters:
    - raster_path: str, path to the raster file.

    Returns:
    - raster_array: numpy array, the raster data.
    """
    with rasterio.open(raster_path) as src:
        raster_array = src.read(1)  # Read the first band into a numpy array
    return raster_array

#------------------------------------------------------------------------------------------------------------------
def read_dem_gdal(dem_path):
    """
    Reads a DEM using GDAL and returns it as a numpy array.
    """
    dem_ds = gdal.Open(dem_path)
    if dem_ds is None:
        print("Error: unable to open DEM file.")
        return None

    dem_band = dem_ds.GetRasterBand(1)
    dem_array = dem_band.ReadAsArray()
    print("DEM Geladen")
    return dem_array
#-------------------------------------------------------------------------------------------------------------
def calculate_slope_gdal(dem_path):

    """
    Calculates the slope from a DEM using GDAL and returns the slope as a numpy array.

    Parameters:
    - dem_path: str, path to the DEM file.

    Returns:
    - slope_array: numpy array, the slope data in degrees.
    """
    # Create a temporary file
    slope_path = 'temp_slope.tif'
    # Use GDAL's DEMProcessing to calculate the slope
    gdal.DEMProcessing(slope_path, dem_path, 'slope', format='GTiff')

    # Open
    slope_ds = gdal.Open(slope_path)
    if slope_ds is None:
        print("Error: unable to calculate slope.")
        return None

    slope_band = slope_ds.GetRasterBand(1)
    slope_array = slope_band.ReadAsArray()

    # Clean up: close dataset
    slope_ds = None
    gdal.Unlink(slope_path)
    print("slope errechnet")
    return slope_array
#-------------------------------------------------------------------------------------------------------------
def calculate_aspect_gdal(dem_path):
    """
    Calculates the aspect from a DEM using GDAL and returns it as a numpy array.

    Parameters:
    - dem_path: str, path to the DEM file.

    Returns:
    - aspect_array: numpy array, the aspect data in degrees.
    """
    aspect_path = 'temp_aspect.tif'
    gdal.DEMProcessing(aspect_path, dem_path, 'aspect', format='GTiff')

    aspect_ds = gdal.Open(aspect_path)
    if aspect_ds is None:
        print("Error: unable to calculate aspect.")
        return None

    aspect_band = aspect_ds.GetRasterBand(1)
    aspect_array = aspect_band.ReadAsArray()

    aspect_ds = None
    gdal.Unlink(aspect_path)

    return aspect_array
#-------------------------------------------------------------------------------------------------------------
def calculate_ruggedness(dem_array):
    """
    Calculates the ruggedness of a DEM based on the deviation of elevation within a 9x9 window.
    """
    def ruggedness_func(window):
        center = window[window.size // 2]
        return np.sqrt(np.sum((window - center) ** 2) / window.size)

    ruggedness_array = generic_filter(dem_array, ruggedness_func, size=9)
    print("ruggedness berrechnet")
    return ruggedness_array


#-------------------------------------------------------------------------------------------------------------
def load_curvature_raster(curvature_tiff_path):
    """
    Loads a curvature raster file generated in ArcGIS and returns it as a numpy array.

    Parameters:
    - curvature_tiff_path: str, path to the curvature TIFF file.

    Returns:
    - curvature_array: numpy array, the curvature data loaded from the TIFF file.
    """
    with rasterio.open(curvature_tiff_path) as src:
        curvature_array = src.read(1)  # Read the first band into a numpy array
    print("curvature geladen")
    return curvature_array

#-------------------------------------------------------------------------------------------------------------
def sector_mask(shape, centre, radius, angle_range):
    """
    Erstellt eine Maske für einen Kreissektor.
    """
    x, y = np.ogrid[:shape[0], :shape[1]]
    cx, cy = centre
    tmin, tmax = np.deg2rad(angle_range)

    if tmax < tmin:
        tmax += 2 * np.pi

    r2 = (x - cx) ** 2 + (y - cy) ** 2
    theta = np.arctan2(x - cx, y - cy) - tmin

    theta %= (2 * np.pi)

    circmask = r2 <= radius ** 2
    anglemask = theta <= (tmax - tmin)
    print("sektor mask berechnet")
    return circmask * anglemask
#-------------------------------------------------------------------------------------------------------------
def windshelter_prep(radius, direction, tolerance, cellsize):
    """
    Bereitet Distanz und Maske für die Windshelter-Berechnung vor.
    """
    x_size = y_size = 2 * radius + 1
    centre = radius, radius
    x_arr, y_arr = np.mgrid[0:x_size, 0:y_size]
    dist = np.sqrt((x_arr - centre[0]) ** 2 + (y_arr - centre[1]) ** 2) * cellsize

    mask = sector_mask(dist.shape, centre, radius, (direction - tolerance, direction + tolerance))
    mask[radius, radius] = True  # Zentrumspunkt immer einbeziehen
    print("windshelter prerp fertig")
    return dist, mask

def calculate_wind_shelter(dem_array, wind_direction, tolerance_angle, cell_size, radius):
    """
    Berechnet den Windshelter-Index für ein DEM-Array.
    """
    dist, mask = windshelter_prep(radius, wind_direction, tolerance_angle, cell_size)
    wind_shelter_array = np.full(dem_array.shape, np.nan, dtype=float)

    for i in range(radius, dem_array.shape[0] - radius):
        for j in range(radius, dem_array.shape[1] - radius):
            window = dem_array[i-radius:i+radius+1, j-radius:j+radius+1]
            center = window[radius, radius]
            relative_heights = window - center
            epsilon = 1e-10  # Ein kleiner Wert, um Division durch Null zu vermeiden
            shelter_values = np.arctan(relative_heights / (dist + epsilon)) * mask
            wind_shelter_array[i, j] = np.nanquantile(shelter_values[mask], 0.75)  # Verwenden des dritten Quartils
    print("windshelter berechnet")
    return wind_shelter_array
#----------------------------------------------------------------------------------------------------------------------

def process_forest_tiff(forest_tiff_path, reference_tiff_path):
    # Öffnen der Referenz-TIFF-Datei, um die Grenzen zu erhalten
    with rasterio.open(reference_tiff_path) as ref_ds:
        bounds = ref_ds.bounds
        transform = ref_ds.transform
        crs = ref_ds.crs

    # Öffnen der Wald-TIFF
    with rasterio.open(forest_tiff_path) as forest_ds:
        # Erstellen eines Fensters basierend auf den Grenzen der Referenz-TIFF-Datei
        window = rasterio.windows.from_bounds(*bounds, transform=forest_ds.transform)

        # Lesen des Wald-Datensatzes und Neuskalieren auf die Größe der Referenz-TIFF-Datei
        forest_data = forest_ds.read(1, window=window, out_shape=(ref_ds.height, ref_ds.width), resampling=rasterio.enums.Resampling.nearest)

    # Erstellen einer Maske, die Waldgebiete ausschließt (Werte 1 und 2 für Wald, 0 für Nicht-Wald)
    non_forest_mask = np.isin(forest_data, [1, 2], invert=True)
    print("Wald Tiff geladen")
    return non_forest_mask

def identify_pra(slope, ruggedness, wind_shelter, forest_data,
                 slope_range, ruggedness_threshold, wind_shelter_threshold):
    """
    Identifies potential avalanche release areas based on weighted criteria.
    This function assumes that higher slope and wind shelter values, and lower ruggedness values,
    indicate higher risk. Areas covered by forest are excluded.
    """

    slope = slope.astype(float)   #Ensure input arrays are floats
    ruggedness = ruggedness.astype(float)
    wind_shelter = wind_shelter.astype(float)
    forest_data = forest_data.astype(float)

    valid_area_mask = (forest_data == 1) # Mask for valid (non-forest) areas

    # Normalize slope
    normalized_slope = np.clip((slope - slope_range[0]) / (slope_range[1] - slope_range[0]), 0, 1)

    normalized_ruggedness = np.clip(1 - (ruggedness / ruggedness_threshold), 0, 1) #invert and normalize


    # Normalize wind shelter: higher values indicate higher risk
    max_wind_shelter = np.max(wind_shelter[valid_area_mask])  # Consider max within valid areas
    normalized_wind_shelter = np.clip((wind_shelter - wind_shelter_threshold) / (max_wind_shelter - wind_shelter_threshold), 0, 1)

    # Calculate the weighted scores
    slope_weight = 0.6
    ruggedness_weight = 0.2
    wind_shelter_weight = 0.2
    total_score = (normalized_slope * slope_weight +
                   normalized_ruggedness * ruggedness_weight +
                   normalized_wind_shelter * wind_shelter_weight)

    # Apply thresholds
    pra_threshold = 0.5  # Adjust desired sensitivity
    pra_mask = np.zeros_like(slope, dtype=np.uint8)
    pra_mask[valid_area_mask & (total_score > pra_threshold)] = 1

    return pra_mask


def identify_pra_with_weighted_criteria(slope, ruggedness, wind_shelter, forest_data, slope_range, ruggedness_threshold, wind_shelter_threshold, weights={'slope': 0.6, 'ruggedness': 0.2, 'wind_shelter': 0.2}, score_threshold=0.5):
    """
    Identifies potential avalanche release areas based on weighted criteria of slope, ruggedness, wind shelter, and exclusion of forest areas.

    Parameters:
    - slope: numpy array, the slope data in degrees.
    - ruggedness: numpy array, the ruggedness data.
    - wind_shelter: numpy array, the wind shelter data.
    - forest_data: numpy array, the forest classification data (0 for non-forest, 1 and 2 for forest).
    - slope_range: tuple, the minimum and maximum slope in degrees for PRA.
    - ruggedness_threshold: float, the maximum ruggedness value for PRA.
    - wind_shelter_threshold: float, the threshold for wind shelter value to be considered for PRA.
    - weights: dict, the weights for each criteria (slope, ruggedness, wind_shelter).
    - score_threshold: float, the minimum combined score to identify an area as a potential PRA.

    Returns:
    - pra_mask: numpy array, a binary mask indicating potential avalanche release areas excluding forest regions.
    """

    score = np.zeros_like(slope, dtype=float) #initialisieren score array

    slope_criteria = (slope >= slope_range[0]) & (slope <= slope_range[1]) # Apply slope criteria and update score
    score += slope_criteria * weights['slope']


    ruggedness_criteria = ruggedness < ruggedness_threshold# Apply ruggedness criteria and update score
    score += ruggedness_criteria * weights['ruggedness']


    wind_shelter_criteria = wind_shelter > wind_shelter_threshold # Apply wind shelter criteria and update score
    score += wind_shelter_criteria * weights['wind_shelter']

    # Identify areas meeting the score threshold excluding forest areas
    non_forest_mask = (forest_data == 0)
    pra_mask = (score >= score_threshold) & non_forest_mask

    return pra_mask



def save_pra_mask_to_raster(input_dem_path, pra_mask, output_raster_path):
    """
    Saves the PRA mask to a raster file.

    Parameters:
    - input_dem_path: str, path to the input DEM file used to obtain georeferencing information.
    - pra_mask: numpy array, binary mask indicating potential avalanche release areas.
    - output_raster_path: str, path for the output raster file.
    """
    # Open input DEM
    dem_ds = gdal.Open(input_dem_path)
    if dem_ds is None:
        raise RuntimeError("Error: unable to open input DEM file.")

    # Get geotransform and projection from the input DEM
    geotransform = dem_ds.GetGeoTransform()
    projection = dem_ds.GetProjection()

    # output raster dataset
    driver = gdal.GetDriverByName('GTiff')
    output_ds = driver.Create(output_raster_path, dem_ds.RasterXSize, dem_ds.RasterYSize, 1, gdal.GDT_Byte)
    if output_ds is None:
        raise RuntimeError("Error: unable to create output raster file.")

    # Set geotransform and projection
    output_ds.SetGeoTransform(geotransform)
    output_ds.SetProjection(projection)

    output_band = output_ds.GetRasterBand(1)
    output_band.WriteArray(pra_mask.astype(np.uint8))  # Convert boolean mask to uint8
    output_band.SetNoDataValue(0)


    output_band.FlushCache() # Flush data to disk
    output_ds = None
    print("daten gespeichert")
    return pra_mask



dem_path = r"D:\Bachelerarbeit\Daten\Gis_Daten\Passeiertal DGM\PasseiertalDGM.tif"
dem_array = read_dem_gdal(dem_path)
slope_array = calculate_slope_gdal(dem_path)#aspect = calculate_aspect_gdal(dem_path)
curvature = load_curvature_raster(curvature_raster_path)
ruggedness = calculate_ruggedness(dem_array)
wind_shelter_array = calculate_wind_shelter(dem_array, wind_direction, tolerance_angle, cell_size, radius)
non_forest_mask = process_forest_tiff(forest_path, dem_path)

ruggedness_path = r"D:\Bachelerarbeit\Daten\Gis_Daten\PRA_PF\roughness.tif"
wind_shelter_path = r"D:\Bachelerarbeit\Daten\Gis_Daten\PRA_PF\Windshelter_180.tif"


ruggedness = read_raster_data(ruggedness_path) # Read the additional raster data
wind_shelter = read_raster_data(wind_shelter_path)

wind_shelter_threshold = 0.1  # This is an example value
pra_mask = identify_pra_with_ruggedness_and_windshelter(slope_array,
                                                        ruggedness,
                                                        wind_shelter,
                                                        non_forest_mask,
                                                        slope_range,
                                                        ruggedness_threshold,
                                                        wind_shelter_threshold)
save_pra = save_pra_mask_to_raster(dem_path, pra_mask, output_raster_path)


DEM Geladen




In [None]:
import rasterio
from rasterio.features import shapes
from scipy.ndimage import binary_closing
import geopandas as gpd
from shapely.geometry import shape, Polygon
from shapely.ops import unary_union
import numpy as np

# Open the raster file
with rasterio.open('D:\\Bachelerarbeit\\Daten\\Gis_Daten\\PRA_PF\\pra_mask.tif') as src:
    raster_data = src.read(1)
    transform = src.transform

# Apply a morphological closing operation to fill small holes and connect nearby regions
closed_raster = binary_closing(raster_data == 1, structure=np.ones((5,5))).astype(np.uint8)

# Vectorize the morphologically processed raster
vectorized_features = shapes(closed_raster, transform=transform)
geometries = [shape(geometry) for geometry, value in vectorized_features if value == 1]

# Combine adjacent polygons to create larger shapes
combined_polygons = unary_union([Polygon(g) for g in geometries])

# Simplify the combined polygon geometries to generalize the shapes
simplified_polygons = combined_polygons.simplify(10.0, preserve_topology=False)

# Create a GeoDataFrame
gdf = gpd.GeoDataFrame({'geometry': [simplified_polygons]}, crs=src.crs)

# Save the GeoDataFrame to a new shapefile
gdf.to_file('D:\\Bachelerarbeit\\Daten\\Gis_Daten\\PRA_PF\\pra_mask_shp_.shp')


In [None]:
def identify_pra(slope, ruggedness, plan_curvature, forest_data, slope_range=(30, 60), ruggedness_threshold=0.06, curvature_threshold=-0.1):
    """
    Identifies potential avalanche release areas based on slope, curvature criteria, and exclusion of forest areas.

    Parameters:
    - slope: numpy array, the slope data in degrees.
    - ruggedness: numpy array, the ruggedness data.
    - plan_curvature: numpy array, the plan curvature data.
    - forest_data: numpy array, the forest classification data (0 for non-forest, 1 and 2 for forest).
    - slope_range: tuple, the minimum and maximum slope in degrees for PRA.
    - ruggedness_threshold: float, the maximum ruggedness value for PRA.
    - curvature_threshold: float, the maximum plan curvature value for PRA (negative values indicate concave areas).

    Returns:
    - pra_mask: numpy array, a binary mask indicating potential avalanche release areas excluding forest regions.
    """
    # Apply slope criteria
    slope_criteria = (slope >= slope_range[0]) & (slope <= slope_range[1])

    # Convert plan_curvature to float, if not already
    curvature_float = plan_curvature.astype(float)

    # Resize curvature to match slope array dimensions if necessary
    # This step assumes curvature and slope arrays might be of different sizes; adjust as needed
    curvature_resized = resize(curvature_float, slope.shape, mode='edge', preserve_range=True, anti_aliasing=True)

    # Apply curvature criteria (focusing on concave areas for snow accumulation)
    curvature_criteria = curvature_resized < curvature_threshold

    # Create a mask to exclude forest areas (assuming values 1 and 2 represent forests)
    #non_forest_mask = np.isin(forest_data, [0], invert=False)

    # Apply ruggedness criteria
    ruggedness_criteria = ruggedness < ruggedness_threshold

    # Combine criteria to identify PRAs, excluding forest areas
    pra_mask = slope_criteria & curvature_criteria & ruggedness_criteria & non_forest_mask
    print("PRA mask applied, excluding forest areas")

    return pra_mask