# Quantifying Flood and Landslide Impacts Using the OPERA DIST-HLS Product

### This example showcases an application of the OPERA DIST-HLS dataset to visualize and explore land surface disturbance related to flooding and landslides in the Appalachian Mountains (North Carolina, USA) associated with Hurricane Helene (09/24/2024 to 09/29/2024).

This workflow uses the Leafmap library, which provides a suite of tools for interactive mapping and visualization in Jupyter Notebooks. Leafmap versions 0.30.0 and later offer tools specifically for accessing NASA Earthdata by building on the NASA Earthaccess library. Earthaccess provides streamlined access to NASA Earthdata and simplifies authentication and querying compared with earlier approaches. This notebook is designed to leverage tools within Earthaccess and Leafmap to facilitate access to and visualization of OPERA data products for a user-specified area of interest (AOI).


## OPERA info
See website https://www.jpl.nasa.gov/go/opera/products/

## Import Libraries
Notebook dependencies may be installed into a self-contained Python environment using the `environment.yml` file available in the [OPERA Applications GitHub repository](https://github.com/OPERA-Cal-Val/OPERA_Applications), or they may be installed manually.

In [None]:
from datetime import datetime
import numpy as np
import leafmap
import os
from osgeo import gdal
import pandas as pd
import rasterio
from rasterio.merge import merge
import shutil

In [None]:
# Make and set working directory (modify to your desired path)
working_directory = os.path.expanduser('~/opera/disaster_response/hurricane_helene/')
os.makedirs(working_directory, exist_ok=True)

# Change to working directory
os.chdir(working_directory)

# Verify the working directory
print("Current working directory:", os.getcwd())

## Authentication 
A [NASA Earthdata Login](https://urs.earthdata.nasa.gov/) account is required to download the data used in this tutorial. You can create an account at the link provided. After establishing an account, the code in the next cell will verify authentication. If this is your first time running the notebook, you will be prompted to enter your Earthdata login credentials, which will be saved in ~/.netrc.

In [None]:
leafmap.nasa_data_login()

## View NASA Earthdata datasets
A tab-separated values (TSV) file, made available through the opengeos GitHub repository, catalogues metadata for more than 9,000 datasets available through NASA Earthdata. In the next cell we load the TSV into a pandas dataframe and view the metadata for the first five (5) Earthdata products.

In [None]:
### Load Earthdata datasets from .tsv file into a pandas dataframe
earthdata_url = 'https://github.com/opengeos/NASA-Earth-Data/raw/main/nasa_earth_data.tsv'
earthdata_df = pd.read_csv(earthdata_url, sep='\t')
earthdata_df.head()  # View the metadata for the first five products

## View the available OPERA products
Note above that `earthdata_df` contains a number of columns with metadata about each available product. The `ShortName` column will be used to produce a new dataframe containing only OPERA products. Let's view the available products and their metadata.

In [None]:
opera_df = earthdata_df[earthdata_df['ShortName'].str.contains('OPERA', case=False, na=False)]  # na=False skips rows with no ShortName
opera_df
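
The filtering step above relies on a case-insensitive substring match on `ShortName`. The following is a minimal sketch of that behavior on a toy dataframe; the rows and the name `toy_df` are made up for illustration and are not real Earthdata records.

```python
import pandas as pd

# Toy catalogue: these rows are illustrative only, not real Earthdata records
toy_df = pd.DataFrame({
    "ShortName": ["OPERA_L3_DIST-ALERT-HLS_V1", "HLSL30", None, "opera_example_product"],
})

# case=False matches 'OPERA' in any letter case;
# na=False treats missing names as non-matches instead of propagating NaN
is_opera = toy_df["ShortName"].str.contains("OPERA", case=False, na=False)
toy_opera_df = toy_df[is_opera]

print(toy_opera_df["ShortName"].tolist())
```

Passing `na=False` is a defensive choice: without it, a missing `ShortName` would yield a missing value in the boolean mask, which pandas cannot use for row selection.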

## Define an area of interest (AOI) and time period of interest (TOI)
Below we define an area of interest (AOI) and the time period over which we would like to discover DIST-HLS data. The AOI is selected based on reported flood and landslide impacts. We select data from 02/22/2025 in order to explore vegetation loss and disturbance long after the floodwaters have receded, and to allow multiple Landsat and Sentinel-2 overpasses to constrain the areal extent of land surface change.

In [None]:
# AOI selected based on region of known significant flooding and landsliding associated with Hurricane Helene in Western NC, USA
AOI = (-82.80, 35.45, -82.20, 35.83) #W, S, E, N; Western NC, USA

#A single Landsat 8 granule acquired on 02/22/2025 captures a region affected by substantial flooding and landslides associated with Hurricane Helene
StartDate_PostFlood="2025-02-22T00:00:00"  #Post-flood image start date
EndDate_PostFlood="2025-02-22T23:59:59"    #Post-flood image end date
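
Before querying, it can help to sanity-check the bounding box order and get a rough sense of its size. The sketch below is not part of the original workflow: the helper `bbox_area_km2` is a hypothetical name, and it assumes a spherical Earth with a mean radius of 6371.0088 km, so the result is only an approximation.

```python
import math

def bbox_area_km2(west, south, east, north, radius_km=6371.0088):
    """Approximate area of a lon/lat bounding box on a sphere, in km^2."""
    # Reject boxes whose corners are out of range or in the wrong order
    if not (-180 <= west < east <= 180 and -90 <= south < north <= 90):
        raise ValueError("Expected (west, south, east, north) in degrees, with west < east and south < north")
    lon_span = math.radians(east - west)
    lat_term = math.sin(math.radians(north)) - math.sin(math.radians(south))
    return radius_km ** 2 * lon_span * lat_term

AOI = (-82.80, 35.45, -82.20, 35.83)  # W, S, E, N; same values as defined above
area_km2 = bbox_area_km2(*AOI)
print(f"Approximate AOI area: {area_km2:.0f} km^2")
```

The AOI is small relative to a single MGRS/HLS tile, which is consistent with the query returning a single granule for this date.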

## Query Earthdata and return metadata for OPERA products within the AOI
The `earthaccess` library makes it simple to quickly query NASA's Common Metadata Repository (CMR) and return the associated metadata as a GeoDataFrame. `Leafmap` has recently added functionality that builds on `earthaccess` to enable interactive viewing of this data.
In the next cell, the user should specify the OPERA product and the date range of interest. The AOI defined previously is used as the boundary in the query.

### View OPERA Product Shortnames

In [None]:
### Print the available OPERA datasets 
print('Available OPERA datasets:', opera_df['ShortName'].values)

### Query the OPERA DIST-ALERT-HLS dataset for the AOI
The below query should return a single granule covering a large region near Asheville, North Carolina, USA. It is also possible to return and merge multiple granules to explore a larger area.

In [None]:
dist_results_PostFlood, dist_gdf_PostFlood = leafmap.nasa_data_search(
    short_name='OPERA_L3_DIST-ALERT-HLS_V1',
    cloud_hosted=True,
    bounding_box= AOI,
    temporal=(StartDate_PostFlood, EndDate_PostFlood),
    count=-1,  # use -1 to return all datasets
    return_gdf=True,
)

### See the available DIST-ALERT-HLS layers
Functionality within earthaccess provides a more aesthetic view of the available layers and displays a thumbnail. The links are clickable and will download the files in the browser when clicked.

In [None]:
dist_results_PostFlood[0] #Note this just shows a single MGRS/HLS tile

### View the DIST-ALERT-HLS metadata and footprints
We can use the geopandas `GeoDataFrame.explore()` method to visualize the footprint of our granule.

In [None]:
### Plot the location of the tiles 
dist_gdf_PostFlood.explore(fill=False)

## Download data with leafmap
Let's download the data from one of our above queries. In the cell below we create a directory where the OPERA DIST-HLS data will be stored.

### Create a subdirectory for the OPERA DIST-HLS data
This will be the location on your file system where OPERA DIST-HLS files are downloaded. It will be a subdirectory inside of a directory called `data`, and the directory name will be the date that it was created. In this way, you can return to this notebook at a later date, and the data will not overwrite previous data versions.

In [None]:
def create_data_directory(dirname=None):
    # Get the current date and time
    current_datetime = datetime.now().strftime("%m_%d_%Y")

    # Define the base directory
    base_directory = "data"

    # Create the full path for the new directory
    if dirname is None:
        new_directory_path = os.path.join(base_directory, f"data_{current_datetime}")
    else:
        new_directory_path = os.path.join(base_directory, f"data_{current_datetime}", dirname)

    # Create the new directory
    os.makedirs(new_directory_path, exist_ok=True)

    print(f"Directory '{new_directory_path}' created successfully.")

    return new_directory_path 

directory_path_PostFlood = create_data_directory()

### Download the data
One OPERA DIST-HLS granule intersects our AOI, and each granule consists of 19 unique data layers. The cell below downloads the data to your newly created subdirectory. Look on your file system for a directory `data/data_<date>`, where `<date>` is the date the directory was created (formatted `MM_DD_YYYY`).

In [None]:
dist_data_PostFlood = leafmap.nasa_data_download(dist_results_PostFlood, out_dir=directory_path_PostFlood)     
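
Each downloaded GeoTIFF holds one layer, and the merge step later in this notebook identifies the layer from the last underscore-separated part of the filename. The sketch below illustrates that parsing. The filename is hypothetical and constructed only for illustration (real granule names will differ), and `extract_layer_name` is a helper name introduced here, not part of Leafmap.

```python
# Hypothetical filename for illustration only; real granule names will differ
example_filename = (
    "OPERA_L3_DIST-ALERT-HLS_T17SLV_20250222T160000Z_"
    "20250224T000000Z_L8_30_v1_VEG-DIST-DATE.tif"
)

def extract_layer_name(filename):
    """Return the layer name (last '_'-separated part) or None if the name does not fit."""
    if not filename.endswith('.tif'):
        return None
    parts = filename.split('_')
    if len(parts) < 6:  # Too few parts to be a DIST-HLS layer file
        return None
    return parts[-1].replace('.tif', '')

print(extract_layer_name(example_filename))
```

Because layer names such as `VEG-DIST-DATE` use hyphens rather than underscores, splitting on `_` keeps the layer name intact.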

### Filter to post-event disturbance
We would like to view only disturbance detected after Hurricane Helene, in order to pinpoint the regions affected by flooding and landsliding. To do so, we produce a 'filtered' derivative of the DIST-ALERT-HLS layers that is constrained by the date of initial disturbance detection (determined by the `VEG-DIST-DATE` layer). Pixel values in the `VEG-DIST-DATE` layer correspond to the date of initial disturbance detection in units of days since December 31, 2020. Because we are interested only in post-hurricane disturbance, we filter out disturbance first detected before the first post-flood DIST-HLS product (10/02/2024). This corresponds to `VEG-DIST-DATE` pixel values less than 1371, the number of days from 12/31/2020 to 10/02/2024.

Below we calculate the number of days since December 31, 2020 and store it as a variable, which will be used as input in a subsequent step.

In [None]:
# Define the reference date and the target date
reference_date = datetime(2020, 12, 31)
target_date = datetime(2024, 10, 2)

# Calculate the difference between the two dates
delta = target_date - reference_date

# Get the number of days from the timedelta object
days_since_reference = delta.days

print("Number of days:", days_since_reference)
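
When inspecting `VEG-DIST-DATE` pixel values it is often useful to go the other way, from a day count back to a calendar date. A minimal sketch follows, assuming the same December 31, 2020 reference date described above; the helper names are introduced here for illustration.

```python
from datetime import date, timedelta

REFERENCE_DATE = date(2020, 12, 31)  # Day 0 of the VEG-DIST-DATE layer, per the text above

def dist_date_to_calendar(days_since_reference):
    """Convert a VEG-DIST-DATE pixel value to a calendar date."""
    return REFERENCE_DATE + timedelta(days=int(days_since_reference))

def calendar_to_dist_date(calendar_date):
    """Convert a calendar date to a VEG-DIST-DATE pixel value."""
    return (calendar_date - REFERENCE_DATE).days

print(dist_date_to_calendar(1371))               # 2024-10-02
print(calendar_to_dist_date(date(2024, 10, 2)))  # 1371
```

The `int(...)` cast lets the function accept NumPy integer pixel values read from a raster as well as plain Python integers.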

### Filtering by time and slope
The two cells below define and execute a series of functions that produce two subdirectories of DIST-HLS data:
- (1) The DIST-HLS layers filtered to post-hurricane disturbance
- (2) The DIST-HLS layers filtered by the date of disturbance detection and by slope.

**Note:** Slope filtered layers are time filtered by default.

The filtered data are saved in a new subdirectory called `filtered` and within individual subdirectories `time_filtered` and `slope_filtered`.

An additional function called `color_filtered_layers()` colorizes the filtered data by their corresponding color schema from the original DIST-HLS layers. This enables direct visualization in GIS software.

Docstrings are provided to aid in the user's understanding of each function's purpose and internal functionality.

#### Define a slope threshold
Modify the value of `slope_threshold` in the cell below. DIST-HLS pixels with corresponding slopes less than this value will be discarded in the resulting slope filtered DIST-HLS product. Default value of `slope_threshold` is 20 degrees.

In [None]:
slope_threshold = 20 # degrees
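
The effect of the two filters can be seen on a small toy example before running them on real rasters. This sketch uses made-up 2x2 arrays and an assumed nodata value of 255. Neither comes from the DIST-HLS files, and the `toy_` names are introduced here only for illustration.

```python
import numpy as np

TOY_NODATA = 255               # Assumed nodata value for this toy example only
toy_date_threshold = 1371      # Days since 12/31/2020 (10/02/2024)
toy_slope_threshold = 20       # Degrees

# Made-up pixel values: date of first detection, slope, and a layer to be filtered
toy_veg_dist_date = np.array([[1200, 1380], [1400, 1371]])
toy_slope_deg = np.array([[35.0, 10.0], [25.0, 20.0]])
toy_layer = np.array([[50, 60], [70, 80]])

# Keep pixels first detected on or after the threshold date
time_mask = toy_veg_dist_date >= toy_date_threshold
# Keep pixels at least as steep as the slope threshold
slope_mask = toy_slope_deg >= toy_slope_threshold

time_filtered = np.where(time_mask, toy_layer, TOY_NODATA)
time_and_slope_filtered = np.where(time_mask & slope_mask, toy_layer, TOY_NODATA)

print(time_filtered)            # [[255 60] [70 80]]
print(time_and_slope_filtered)  # [[255 255] [70 80]]
```

Pixels that fail either test are replaced with nodata rather than dropped, so the filtered arrays keep the shape and georeferencing of the input.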

In [None]:
def merge_layers(directory_path_PostFlood):
    """Merge the layers in the data directory and save the merged rasters. This function is necessary to produce a mosaicked product from multiple granules, if needed.
      This will produce merged rasters for each DIST-HLS layer in a subdirectory called 'merged'.
      :param directory_path_PostFlood: The directory containing the downloaded HLS data.
      :return layer_dict: Dictionary of lists of filenames for each DIST-HLS layer
      :return merged_dir: Merged directory path
    """
    # Create a directory for merged rasters
    merged_dir = os.path.join(directory_path_PostFlood, 'merged')
    os.makedirs(merged_dir, exist_ok=True)
    
    # Dictionary to hold lists of filenames for each layer
    layer_dict = {}

    # Iterate through the downloaded rasters to populate the dictionary
    for filename in os.listdir(directory_path_PostFlood):
        if filename.endswith('.tif'):
            # Split the filename to extract the layer name
            parts = filename.split('_')
            if len(parts) < 6:  # Skip files that do not follow the expected naming pattern
                continue
            layer_name = parts[-1].replace('.tif', '')  # Extract the unique layer name (last part)
            layer_dict.setdefault(layer_name, []).append(os.path.join(directory_path_PostFlood, filename))

    # Merge rasters for each layer and save them
    for layer_name, files in layer_dict.items():
        # Open the rasters and extract nodata values
        src_files_to_mosaic = [rasterio.open(f) for f in files]
        
        # Get the consistent nodata value for the layer
        nodata_value = src_files_to_mosaic[0].nodata
        
        # Perform the merge with preference for data pixels
        mosaic, out_trans = merge(src_files_to_mosaic, nodata=nodata_value, method='first')
        
        # Create metadata for the merged raster
        out_meta = src_files_to_mosaic[0].meta.copy()
        out_meta.update({
            "driver": "GTiff",
            "height": mosaic.shape[1],
            "width": mosaic.shape[2],
            "transform": out_trans,
            "nodata": nodata_value
        })

        # Save the merged raster
        merged_filename = f"{layer_name}_merged.tif"
        merged_filepath = os.path.join(merged_dir, merged_filename)

        with rasterio.open(merged_filepath, 'w', **out_meta) as dest:
            dest.write(mosaic)

        print(f"Merged raster saved as: {merged_filename}")

        # Close all opened raster files
        for src in src_files_to_mosaic:
            src.close()

    return layer_dict, merged_dir

def generate_filtered_rasters(date_threshold, slope_threshold=20, slope_filter = False):
    """Generate a filtered version of each raster based on date/slope thresholds. Date threshold is the number of days since the reference date.
    Production of slope filtered rasters is optional. Filtered files are stored in subdirectories within the 'merged/filtered' directory.
    If 'slope_filter' is True, additional filtered rasters are generated based on a slope threshold.
    :param date_threshold: The number of days since the reference date to filter the data.
    :param slope_threshold: The slope threshold value to use for filtering. Default is 20 degrees.
    :param slope_filter: If True, additional filtered rasters are generated based on a slope threshold.
    """

    # Merge layers
    layer_dict, merged_dir = merge_layers(directory_path_PostFlood)

    # Ensure the output subdirectory exists
    filtered_dir = 'filtered'
    time_filtered_dirname = 'time_filtered'
    time_filtered_dir = os.path.join(merged_dir, filtered_dir, time_filtered_dirname)
    os.makedirs(time_filtered_dir, exist_ok=True)
    generate_time_filtered_rasters(merged_dir, time_filtered_dir, date_threshold)
    color_filtered_layers(layer_dict, time_filtered_dir)

    if slope_filter:
        slope_filtered_dirname = 'slope_filtered'
        slope_filtered_dir = os.path.join(merged_dir, filtered_dir, slope_filtered_dirname)
        os.makedirs(slope_filtered_dir, exist_ok=True)
        generate_slope_filtered_rasters(time_filtered_dir, slope_filtered_dir, slope_threshold)
        color_filtered_layers(layer_dict, slope_filtered_dir)
    return

def generate_time_filtered_rasters(merged_dir, time_filtered_dir, date_threshold):
    """Generate a filtered version of each raster based on the date threshold. 
    These files are stored in subdirectory called 'time_filtered' within the 'merged/filtered' directory.
    :param merged_dir: The directory containing the merged rasters.
    :param time_filtered_dir: The directory to save the time-filtered rasters.
    :param date_threshold: The number of days since the reference date to filter the data.
    """

    # Process each merged layer
    date_file = 'VEG-DIST-DATE_merged.tif'
    date_file_path = os.path.join(merged_dir, date_file)
    with rasterio.open(date_file_path) as src:
        date_data = src.read(1)  # Read the first (and only) band
        date_mask = date_data >= date_threshold  # True where disturbance was first detected on or after the threshold date
        
    # Apply the mask to each layer file
    for file in os.listdir(merged_dir):
        if not file.endswith('.tif'):
            continue
        
        print("working on file:", file)
        # If the file is the VEG-DIST-DATE layer, apply the date_threshold and save a filtered version
        if file == date_file:
            with rasterio.open(date_file_path) as src:
                date_filtered_data = np.where(date_mask, date_data, src.nodata)  # Apply the mask
                date_filtered_filename = file.replace('.tif', '_time_filtered.tif')  # Match the naming of the other layers
                date_filtered_path = os.path.join(time_filtered_dir, date_filtered_filename)
                
                # Save the filtered _VEG-DIST-DATE.tif raster
                src_meta = src.meta
                src_meta.update({"nodata": src.nodata})
                with rasterio.open(date_filtered_path, 'w', **src_meta) as dest:
                    dest.write(date_filtered_data, 1)  # Write to the first band
                    print(f"Generated filtered file: {date_filtered_filename}")

        # If the file is the DATA-MASK layer, copy it unchanged to the output directory with "_time_filtered" added
        elif file.endswith('DATA-MASK_merged.tif'):
            data_mask_filtered_filename = file.replace('.tif', '_time_filtered.tif')
            data_mask_filtered_path = os.path.join(time_filtered_dir, data_mask_filtered_filename)
            shutil.copy(os.path.join(merged_dir, file), data_mask_filtered_path)
            print(f"Copied _DATA-MASK file: {data_mask_filtered_filename}")

        else:
            print(f"Processing file: {file}")
            # Open the layer file
            file_path = os.path.join(merged_dir, file)
            with rasterio.open(file_path) as src:
                layer_data = src.read(1)  # Read the first band
                layer_meta = src.meta  # Metadata to use for the output file
                layer_nodata = src.nodata  # Get the nodata value for this layer

                # Apply the mask: where date_mask is False, set layer_data to layer_nodata
                filtered_data = np.where(date_mask, layer_data, layer_nodata)

                # Update the filename to include "_filtered"
                filtered_filename = file.replace('.tif', '_time_filtered.tif')
                filtered_file_path = os.path.join(time_filtered_dir, filtered_filename)

                # Save the filtered raster with the same metadata
                layer_meta.update({"nodata": layer_nodata})
                with rasterio.open(filtered_file_path, 'w', **layer_meta) as dest:
                    dest.write(filtered_data, 1)  # Write to the first band
                    print(f"Generated filtered file: {filtered_filename}")

def generate_slope_filtered_rasters(time_filtered_dir, slope_filtered_dir, slope_threshold=20):
    """Generate a filtered version of each raster based a slope mask. Slope is derived from COP30 DEM.
    These files are stored in subdirectory called 'slope_filtered' within the 'merged/filtered' directory.
    :param time_filtered_dir: The directory containing the time-filtered rasters.
    :param slope_filtered_dir: The directory to save the slope-filtered rasters.
    :param slope_threshold: The slope threshold value to use for filtering. Default is 20 degrees.
    """
    
    # Download the COP30 DEM data (band 10 of OPERA DSWx-HLS dataset)
    dem_results, dem_gdf = leafmap.nasa_data_search(
        short_name='OPERA_L3_DSWX-HLS_V1',
        cloud_hosted=True,
        bounding_box= AOI,
        temporal=(StartDate_PostFlood, EndDate_PostFlood),
        count=1,  # return the first granule
        return_gdf=True,
    )
    dem_directory = create_data_directory(dirname='DEM')
    leafmap.nasa_data_download(dem_results, out_dir=dem_directory)

    # Delete unwanted layers, retain DEM.tif
    for filename in os.listdir(dem_directory):
        file_path = os.path.join(dem_directory, filename)
        if os.path.isfile(file_path) and not filename.endswith("DEM.tif"):
            os.remove(file_path)
            print(f"Deleted: {file_path}")

    # Make merged directory
    merged_dir = os.path.join(dem_directory, "merged")
    os.makedirs(merged_dir, exist_ok=True)

    # Create a list of DEM files for the mosaic
    dem_files = [os.path.join(dem_directory, f) for f in os.listdir(dem_directory) if f.endswith("DEM.tif")]

    # Define the output mosaic file path
    output_mosaic = os.path.join(merged_dir, "mosaic.tif")

    # Get the UTM EPSG code from the first DEM file
    with rasterio.open(dem_files[0]) as src:
        utm_epsg = f"EPSG:{src.crs.to_epsg()}"

    # Merge the DEMs, reprojecting to the UTM zone of the first file in the dem_files list
    gdal.Warp(
        output_mosaic,
        dem_files,
        format="GTiff",
        dstSRS=utm_epsg  # Target projection: UTM zone of the first DEM (Zone 17N for this AOI)
    )

    print("Reprojected mosaic created successfully and saved in:", output_mosaic)

    # Make new directory for slope output
    slope_directory = create_data_directory(dirname='SLOPE')

    # Generate the slope (in degrees, the GDAL default) from the DEM mosaic
    gdal.DEMProcessing(os.path.join(slope_directory, 'slope.tif'), output_mosaic, 'slope', format='GTiff', computeEdges=True)
    print(f"Slope output saved to: {slope_directory}")
    
    # Open the slope file
    slope_file_path = os.path.join(slope_directory, 'slope.tif')
    print("Slope file path:", slope_file_path)

    with rasterio.open(slope_file_path) as src:
        slope_data = src.read(1)
        slope_mask = slope_data >= slope_threshold  # True where slope is greater than or equal to the threshold (pixels to keep)

    # Perform the slope filtering
    for file in os.listdir(time_filtered_dir):
        if not file.endswith('.tif'):
            continue  # Skip anything that is not a GeoTIFF
        # Open the layer file
        file_path = os.path.join(time_filtered_dir, file)
        filtered_filename = file.replace('merged_time_filtered.tif', 'merged_time_and_slope_filtered.tif')
        filtered_file_path = os.path.join(slope_filtered_dir, filtered_filename)

        # If the file ends with _DATA-MASK.tif, copy it directly to the output directory
        if file.endswith('DATA-MASK_merged_time_filtered.tif'):
            shutil.copy(file_path, filtered_file_path)
            print(f"Copied _DATA-MASK file: {filtered_filename}")
            continue  # Move to the next file in the list

        else:
            with rasterio.open(file_path) as src:
                layer_data = src.read(1)
                layer_meta = src.meta
                layer_nodata = src.nodata
                filtered_data = np.where(slope_mask, layer_data, layer_nodata)

                layer_meta.update({"nodata": layer_nodata})

                with rasterio.open(filtered_file_path, 'w', **layer_meta) as dest:
                    dest.write(filtered_data, 1)

                print(f"Generated slope filtered file: {filtered_filename}")

    return

def color_filtered_layers(layer_dict, filtered_dir):
    """Colorize the filtered rasters using the symbology from the original HLS data.
    The files are colorized in place within the time/slope filtered directories.
    :param layer_dict: Dictionary of lists of filenames for each DIST-HLS layer
    :param filtered_dir: The directory containing the filtered rasters.
    """
    symbology_layers = {}

    # Loop over each layer in the layer_dict
    for layer in layer_dict:
        # Check if we already found a file for this layer
        if layer not in symbology_layers:
            # Loop over each file in the directory
            for filename in os.listdir(directory_path_PostFlood):
                # Check if the file is a .tif file
                if filename.endswith('.tif'):
                    # Extract the layer name (last part before the extension)
                    layer_name = filename.split('_')[-1].split('.')[0]
                    
                    # Check if the layer name matches the current layer
                    if layer_name == layer:
                        # Save the full file path for the first match
                        symbology_layers[layer] = os.path.join(directory_path_PostFlood, filename)
                        break  # Stop once the first file for this layer is found

    for file in os.listdir(filtered_dir):
        parts = file.split('_')
        layer_name = parts[0]
        print(f"Layer {layer_name}: {symbology_layers.get(layer_name)}")
        try:
            # Read the reference symbology raster
            with rasterio.open(symbology_layers.get(layer_name)) as src:

                # Check if the symbology raster has a colormap
                if 1 in src.colormap(1):
                    print(f"Colormap found for {symbology_layers.get(layer_name)}")
                    src_colormap = src.colormap(1)  # Assuming symbology is in band 1
                else:
                    print(f"No colormap found for {symbology_layers.get(layer_name)}")
                    continue  # Skip this file if no colormap exists

            # Open the merged raster in write mode
            filename = os.path.join(filtered_dir, file)
            print(f"Opening file: {filename}")
            with rasterio.open(filename, 'r+') as dst:
                # Write the color map to the first band
                dst.write_colormap(1, src_colormap)
                print(f"Colormap written to: {filename}")

        except Exception as e:
            print(f"Symbology not present for {symbology_layers.get(layer_name)}: {e}...skipping")

generate_filtered_rasters(days_since_reference, slope_threshold, slope_filter=True)

### View one of the layers

In [None]:
### Create a map and add the merged VEG-ANOM-MAX filtered raster
merged_dir = os.path.join(directory_path_PostFlood, 'merged')
time_filtered_dir = os.path.join(merged_dir, 'filtered', 'time_filtered')
filename = 'VEG-ANOM-MAX_merged_time_filtered.tif'
m = leafmap.Map(basemap="Esri.WorldImagery")
m.add_raster(os.path.join(time_filtered_dir, filename), opacity=1)
m  # Display the map

### Conclusions
The filtered DIST-HLS data are now available on your file system for use in GIS software or for further Python post-processing.
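
As one example of such post-processing, the disturbed area can be estimated by counting retained pixels and multiplying by the pixel footprint. The sketch below is not part of the original workflow. It uses a toy array rather than a real raster, assumes a nodata value of 255, and assumes that any retained value greater than zero counts as disturbed. The 30 m pixel size reflects the HLS grid, and `disturbed_area_km2` is a helper name introduced here.

```python
import numpy as np

PIXEL_SIZE_M = 30   # HLS-based products are gridded at 30 m
ASSUMED_NODATA = 255  # Assumption for this sketch; read the real value from the raster metadata

def disturbed_area_km2(filtered, nodata=ASSUMED_NODATA, pixel_size_m=PIXEL_SIZE_M):
    """Estimate disturbed area from a filtered layer, counting valid pixels greater than zero."""
    disturbed = (filtered != nodata) & (filtered > 0)
    n_pixels = int(np.count_nonzero(disturbed))
    return n_pixels * pixel_size_m ** 2 / 1e6  # m^2 to km^2

# Toy array standing in for a filtered layer: three pixels are valid and nonzero
toy_filtered = np.array([[255, 40, 0], [60, 255, 75]])
toy_area = disturbed_area_km2(toy_filtered)
print(f"Disturbed area: {toy_area:.4f} km^2")
```

For a real raster, the array would come from `rasterio.open(path).read(1)` and the nodata value from the dataset's `nodata` attribute, as in the filtering functions above.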