CCDR Hazard Analysis Notebook

In [1]:
from common import *  # import necessary packages

%matplotlib inline

In [2]:
# Maps ISO3 country codes (the values of the country dropdown) to the numeric
# ADM0_CODE used to filter the administrative boundary layer.
# NOTE(review): the codes look like FAO GAUL ADM0 codes - confirm the BGD value
# against the ADM0_CODE column of the boundary dataset before relying on it.
country_code_map = {
    "NPL": 175,
    "PAK": 188,
    "BGD": 23,  # selectable in the country dropdown; was missing and caused a KeyError
    # TODO: Add others
}

In [3]:
def damage_factor(x):
    """Average flood damage fraction for Asia as a function of water depth.

    Evaluates a cubic polynomial fitted to the mean damage across the
    commercial, industry, transport, agriculture, infrastructure and
    residential sectors, then clamps the result to the range [0, 1].

    Parameters
    ----------
    x : float or numpy.ndarray
        Water depth in meters. Arrays are evaluated element-wise and NaN
        entries stay NaN.

    Returns
    -------
    numpy.float64 or numpy.ndarray
        Damage fraction between 0 and 1. Negative fitted values become 0 and
        the curve saturates at 1 for depths of roughly 6 m and above.

    References
    ----------
    .. [1] JRC, 2017
    """
    cubic_term = 0.00723 * x**3
    quadratic_term = 0.1 * x**2
    linear_term = 0.506 * x
    fitted = cubic_term - quadratic_term + linear_term

    # Clamp from below first, then from above; the bounds do not overlap so
    # the order of the two clamps does not affect the result.
    return np.minimum(1.0, np.maximum(0.0, fitted))


In [4]:
def preview_impact_func(bt):
    """Show the depth-damage curve in the shared ``output`` widget.

    Button callback: clears the output area and draws ``damage_factor`` for
    water depths from 0 m to 6 m.

    Parameters
    ----------
    bt : ipywidgets.Button
        The button that triggered the callback (unused).
    """
    # Sample the curve at 0.1 m steps, including the 6 m end point so the line
    # reaches the last tick on the axis.
    depths = np.linspace(0.0, 6.0, 61)

    with output:
        output.clear_output()

        fig, ax = plt.subplots()
        # Plot against the real depth values so the axis needs no manual
        # index-to-depth relabelling.
        ax.plot(depths, damage_factor(depths))
        ax.grid(True)

        ax.xaxis.set_ticks(np.arange(0, 7))
        ax.set_xlabel("Depth")
        ax.set_ylabel("Impact Factor")

        display(fig)

        # Close the figure once it has been rendered; otherwise every click
        # leaves another open figure behind in pyplot's registry.
        plt.close(fig)

In [5]:
def _check_exposure_category(exp_cat):
    """Raise unless an impact function is available for ``exp_cat``.

    Raises
    ------
    NotImplementedError
        For the known categories that have no impact function yet.
    ValueError
        For any category that is not recognised at all.
    """
    if exp_cat == 'population':
        return
    if exp_cat in ('built_up', 'agri'):
        # TODO: add the relevant impact functions for these categories
        raise NotImplementedError(
            f"No impact function implemented yet for exposure category '{exp_cat}'")
    raise ValueError("Unknown exposure category")


def _load_worldpop_metadata(country):
    """Return the WorldPop dataset listing for ``country``, cached on disk as JSON."""
    # The cache file is keyed by country so that switching country does not
    # silently reuse the listing downloaded for a previous one.
    iso3_path = os.path.join(DATA_DIR, f"cache/iso3_{country}.json")
    if os.path.exists(iso3_path):
        with open(iso3_path, 'r') as infile:
            return json.load(infile)

    response = requests.get(f"https://www.worldpop.org/rest/data/pop/wpgp?iso3={country}")
    response.raise_for_status()  # do not cache an error page as metadata
    resp = json.loads(response.text)

    with open(iso3_path, 'w') as outfile:
        json.dump(resp, outfile)
    return resp


def _cache_population_files(metadata):
    """Download every file listed in ``metadata['files']`` into ``CACHE_DIR``.

    Files already present are kept as-is (with a warning); if cached data is
    incorrect or corrupted the user should delete it from the cache.
    """
    fid = metadata['id']
    for data_fn in tqdm(metadata['files']):
        cache_fn = f"{fid}_{os.path.basename(data_fn)}"
        cache_path = os.path.join(CACHE_DIR, cache_fn)

        if os.path.exists(cache_path):
            warnings.warn(f"Found {cache_fn} in cache, skipping...")
            continue

        # Fetch first and only then open the target file, so a failed request
        # does not leave an empty or partial file behind in the cache.
        response = requests.get(data_fn)
        response.raise_for_status()
        with open(cache_path, "wb") as handle:
            handle.write(response.content)


def _compute_flood_eai(country, adm_name, min_haz_threshold, save_intermediate):
    """Compute flood-affected population and EAI per administrative unit.

    Writes a CSV and a GeoPackage named ``{country}_{adm_name}_flood_EAI`` to
    ``OUTPUT_DIR`` and, when ``save_intermediate`` is true, the intermediate
    rasters of every return period.

    Parameters
    ----------
    country : str
        ISO3 country code; must be a key of ``country_code_map``.
    adm_name : str
        Administrative level name used to select the boundary layer.
    min_haz_threshold : float
        Flood values below this threshold are treated as no hazard.
    save_intermediate : bool
        Whether to export the intermediate rasters.

    Returns
    -------
    geopandas.GeoDataFrame
        One row per administrative unit with the population, impact and EAI
        columns plus the geometry.
    """
    valid_RPs = [10, 100, 1000]

    # Testing data file locations
    # TODO: Temp data store, to be replaced with a config spec (.env file?) before deployment
    pop_fn = f"{DATA_DIR}/cache/WorldPop20_{country}_ppp_UNadj_constrained.tif"

    # Flood data location (TODO: replace with pointer to downloaded data store)
    flood_RP_data_loc = f"{DATA_DIR}"

    # TODO: User to select population data set
    # Target population data files are taken from the cached JSON listing
    resp = _load_worldpop_metadata(country)
    _cache_population_files(resp['data'][1])

    # Open population dataset and mark -1 values as representing no data
    pop_data = rxr.open_rasterio(pop_fn)
    pop_data.rio.write_nodata(-1, inplace=True)

    # Load the ADM layer for the requested country and level
    try:
        adm_dataset = gpd.read_file(os.path.join(DATA_DIR, "SAR_ADM.gpkg"), layer=f"{country}_{adm_name}")
    except ValueError:
        # Use the national ADM layer if the regional one is not available
        adm_dataset = gpd.read_file(os.path.join(DATA_DIR, f"{country}_ADM.gpkg"), layer=f"{country}_{adm_name}")

    adm_data = adm_dataset.loc[adm_dataset.ADM0_CODE == country_code_map[country], :]

    # Result columns, one set per return period
    pop_sum_cols = [f"RP{rp_i}_pop_tot" for rp_i in valid_RPs]
    pop_imp_cols = [f"RP{rp_i}_pop_imp" for rp_i in valid_RPs]
    EAI_cols = [f"RP{rp_i}_EAI" for rp_i in valid_RPs]

    # All ADM code/name columns are saved with the results
    adm_cols = adm_data.columns
    all_adm_code_tmp = adm_cols[adm_cols.str.contains("_CODE")].to_list()
    all_adm_name_tmp = adm_cols[adm_cols.str.contains("_NAME")].tolist()

    # Work on an explicit copy: the frame is extended with new columns below
    # and must not be a view onto `adm_data`.
    result_df = adm_data.loc[:, all_adm_code_tmp + all_adm_name_tmp + ["geometry"]].copy()
    for col in pop_sum_cols + EAI_cols:
        result_df[col] = 0

    # Total population per ADM region does not depend on the return period,
    # so it is computed once instead of once per loop iteration.
    pop_per_ADM = gen_zonal_stats(vectors=adm_data["geometry"], raster=pop_fn, stats=["sum"])
    result_df[f"{adm_name}_Pop"] = [x['sum'] for x in pop_per_ADM]

    for rp in valid_RPs:
        # Load corresponding flood dataset
        flood_data = rxr.open_rasterio(os.path.join(flood_RP_data_loc, f"{country}_RP{rp}.tif"))

        # Reproject and clip raster to same bounds as population data
        flood_data = flood_data.rio.reproject_match(pop_data)

        # Get raw array values for flood
        fld_array = flood_data[0].values
        fld_array[fld_array < min_haz_threshold] = np.nan  # Set values below min threshold to nan

        # Assign impact factor (this is F_i)
        # Only the population category reaches this point (validated by the caller).
        impact_array = damage_factor(fld_array)

        # Create raster from array
        impact_rst = xr.DataArray(np.array([impact_array]).astype(np.float32),
                                  coords=flood_data.coords,
                                  dims=flood_data.dims)

        if save_intermediate:
            # Named per return period so later iterations do not overwrite earlier ones
            impact_rst.rio.to_raster(os.path.join(OUTPUT_DIR, f"impact_{rp}.tif"))

        # Calculate affected population in ADM
        # Filter down to valid areas
        valid_impact_areas = impact_rst.values > 0
        affected_pop = pop_data.where(valid_impact_areas)  # Population in affected areas
        affected_pop = affected_pop.where(affected_pop > 0)  # Of those, areas that have people

        if save_intermediate:
            affected_pop.rio.to_raster(os.path.join(OUTPUT_DIR, f"affected_pop_{rp}.tif"))

        # Calculate degree of impact over the exposure category
        impact_pop = affected_pop * impact_rst.where(valid_impact_areas)

        if save_intermediate:
            impact_pop.rio.to_raster(os.path.join(OUTPUT_DIR, f"impact_pop_{rp}.tif"))

        impact_pop_per_ADM = gen_zonal_stats(vectors=adm_data["geometry"], raster=impact_pop.data[0],
                                             stats=["sum"], affine=impact_pop.rio.transform(), nodata=0)
        result_df[f"RP{rp}_pop_imp"] = [x['sum'] for x in impact_pop_per_ADM]

        # Probability of return period
        # Essentially the same as 1/RP, but accounts for cases where RP == 1
        freq = 1 - np.exp(-1/rp)

        # EAI_i := F_i * freq
        EAI_i = impact_pop.where(valid_impact_areas) * freq

        if save_intermediate:
            EAI_i.rio.to_raster(os.path.join(OUTPUT_DIR, f"EAI_{rp}.tif"))

        # Get affected population per ADM
        affected_pop_per_ADM = gen_zonal_stats(vectors=adm_data["geometry"], raster=affected_pop.data[0],
                                               stats=["sum"], affine=affected_pop.rio.transform(), nodata=0)
        result_df[f"RP{rp}_pop_tot"] = [x['sum'] for x in affected_pop_per_ADM]

        EAI_per_ADM = gen_zonal_stats(vectors=adm_data["geometry"], raster=EAI_i.data[0],
                                      stats=["sum"], affine=EAI_i.rio.transform(), nodata=0)
        result_df[f"RP{rp}_EAI"] = [x['sum'] for x in EAI_per_ADM]

    # Sum all EAI to get total EAI across all RPs
    result_df["Pop_EAI"] = result_df[EAI_cols].sum(axis=1)

    # Calculate Pop_EAI% (Percent affected population per year)
    result_df["Pop_EAI%"] = (result_df["Pop_EAI"] / result_df[f"{adm_name}_Pop"]) * 100.0

    # Round to two decimal places to avoid giving the impression of high precision
    result_df = result_df.round(2)

    # Reorder - need ADM code, name, and pop at the front regardless of ADM level
    cols = result_df.columns
    adm_pop = cols[cols.str.contains("_Pop")].tolist()
    result_df = result_df.loc[:, all_adm_code_tmp + all_adm_name_tmp + adm_pop +
                                 pop_sum_cols + pop_imp_cols + EAI_cols +
                                 ["Pop_EAI", "Pop_EAI%", "geometry"]]

    # Write table (without geometry) of the results for each ADM unit
    df_cols = result_df.columns
    result_df.loc[:, df_cols[~df_cols.isin(['geometry'])]].fillna(0).to_csv(
        os.path.join(OUTPUT_DIR, f"{country}_{adm_name}_flood_EAI.csv"), index=False)

    # Export geopackage
    result_df.to_file(os.path.join(OUTPUT_DIR, f"{country}_{adm_name}_flood_EAI.gpkg"))

    return result_df


def run_analysis(rb):
    """Run the flood hazard analysis for the options selected in the widgets.

    Button callback. Reads the country, exposure category, ADM level, minimum
    hazard threshold and export option from the widgets, runs the analysis,
    writes the result files and optionally previews the result on a map.

    Both action buttons are disabled while the analysis runs and are always
    re-enabled afterwards, including when the analysis fails. On failure a short
    message is printed to the output widget and the exception is re-raised.

    Parameters
    ----------
    rb : ipywidgets.Button
        The run button that triggered the callback.

    Raises
    ------
    NotImplementedError
        If the selected exposure category has no impact function yet.
    ValueError
        If the exposure category is unknown.
    KeyError
        If the selected country has no entry in ``country_code_map``.
    """
    with output:
        output.clear_output()
        print("Running analysis...")
    rb.disabled = True
    preview_impact_button.disabled = True

    # Get user input. The time horizon, RCP scenario and aggregation widgets
    # are not used by the analysis yet, so they are not read here.
    country = country_dd.value
    exp_cat = exp_cat_dd.value
    adm_name = adm_dd.value.replace('_', '')
    min_haz_threshold = min_haz_slider.value
    save_intermediate = save_inter_rst_chk.value

    try:
        # Validate cheap preconditions before any download or raster work
        _check_exposure_category(exp_cat)
        if country not in country_code_map:
            raise KeyError(f"No ADM0 code configured for country '{country}'")

        result_df = _compute_flood_eai(country, adm_name, min_haz_threshold, save_intermediate)
    except Exception as err:
        with output:
            print(f"Analysis failed: {err}")
        raise
    finally:
        # Never leave the UI locked, whatever happened above
        rb.disabled = False
        preview_impact_button.disabled = False

    with output:
        print("Finished analysis.")

    if preview_chk.value:
        with output:
            display(result_df.explore(column='Pop_EAI', cmap='plasma'))

    

In [6]:
# Data option widgets
# Country selector; the option values are ISO3 codes.
country_dd = widgets.Dropdown(
    options=[('Nepal', 'NPL'), ('Pakistan', 'PAK'),('Bangladesh', 'BGD'),],
    value='NPL',
    description='Country:',
)

# Exposure category the impact is computed for.
exp_cat_dd = widgets.Dropdown(
    options=[("Population", "population"), ("Built-up", "built_up"), ("Agriculture", "agri")],
    value='population',
    description='Exposure Category:',
)

# Projection year.
time_horizon_dd = widgets.Dropdown(
    options=[2050, 2080],
    value=2050,
    description='Time Horizon:',
)

# Representative Concentration Pathway. The standard scenarios are
# RCP2.6, RCP4.5, RCP6.0 and RCP8.5 (there is no RCP6.5).
rcp_scenario_dd = widgets.Dropdown(
    options=["2.6", "4.5", "6.0", "8.5"],
    value="4.5",
    description='RCP Scenario:',
)

# Administrative level at which results are reported.
adm_dd = widgets.Dropdown(
    options=['ADM1', 'ADM2', 'ADM3'],
    value='ADM2',
    description='ADM Level:',
)

# Method used to aggregate results up to ADM1 (currently not displayed).
agg_dd = widgets.Dropdown(
    options=['mean', 'max'],
    value='mean',
    description='Aggregation method:',
    tooltip='Method to aggregate up to ADM1',
)

# Hazard values below this threshold are ignored by the analysis.
min_haz_slider = widgets.FloatSlider(
    value=0.5,
    min=0.01,
    max=10.0,
    step=0.01,
    description="Minimum Threshold:",
)


# User action widgets
save_inter_rst_chk = widgets.Checkbox(
    value=False,
    description='Export Intermediate Rasters',
    tooltip='Save rasters generated between each step (saves to nominated output directory)',
    disabled=False,
    indent=False
)

# Display results after runs
preview_chk = widgets.Checkbox(
    value=True,
    description='Preview results',
    tooltip='Display result after analysis',
    disabled=False
)


# Run button to perform analysis
run_button = widgets.Button(
    description='Run Analysis',
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click to run analysis with selected options',
    # icon='check' # (FontAwesome names without the `fa-` prefix)
)


# Button to preview hazard impact function
preview_impact_button = widgets.Button(
    description='Preview Impact Function',
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Show preview of impact function',
    # icon='check' # (FontAwesome names without the `fa-` prefix)
)

# Button to clear the output area and re-enable the action buttons
reset_display_button = widgets.Button(
    description='Reset',
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Reset display',
    # icon='check' # (FontAwesome names without the `fa-` prefix)
)


def reset_display(bt):
    """Clear the output area and make both action buttons clickable again.

    Parameters
    ----------
    bt : ipywidgets.Button
        The button that triggered the callback (unused).
    """
    for action_button in (run_button, preview_impact_button):
        action_button.disabled = False
    output.clear_output()

# Wire each action button to its callback.
for _button, _callback in (
    (run_button, run_analysis),
    (preview_impact_button, preview_impact_func),
    (reset_display_button, reset_display),
):
    _button.on_click(_callback)

# preview_button.on_click()

# class_range = range(3, 11)  # remember that python uses end-exclusive range, so this is 3-10
# selected_bin_edges = [0.5, 1, 1.5, 2, 2.5, 3]
# min_haz_threshold = np.min(selected_bin_edges)  # determine min/max values from user-selected edges
# max_haz_threshold = np.max(selected_bin_edges)
# selected_bin_edges += [np.inf] # add inf last to cover anything above max threshold.

# num_bins = len(selected_bin_edges)-1  # default number of bins, within the range of `class_range`

In [7]:
# Option widgets, shown one per row in this order.
# (agg_dd is intentionally not displayed.)
for _control in (
    country_dd,
    exp_cat_dd,
    time_horizon_dd,
    rcp_scenario_dd,
    adm_dd,
    min_haz_slider,
):
    display(_control)

# Action row, followed by the two stand-alone buttons.
action_row = HBox([run_button, preview_chk, save_inter_rst_chk])
display(action_row, preview_impact_button, reset_display_button)

# Shared output area that the button callbacks write into.
output = widgets.Output()
display(output)

Dropdown(description='Country:', options=(('Nepal', 'NPL'), ('Pakistan', 'PAK'), ('Bangladesh', 'BGD')), value…

Dropdown(description='Exposure Category:', options=(('Population', 'population'), ('Built-up', 'built_up'), ('…

Dropdown(description='Time Horizon:', options=(2050, 2080), value=2050)

Dropdown(description='RCP Scenario:', index=1, options=('2.6', '4.5', '6.5', '8.5'), value='4.5')

Dropdown(description='ADM Level:', index=1, options=('ADM1', 'ADM2', 'ADM3'), value='ADM2')

FloatSlider(value=0.5, description='Minimum Threshold:', max=10.0, min=0.01, step=0.01)

HBox(children=(Button(description='Run Analysis', style=ButtonStyle(), tooltip='Click to run analysis with sel…

Button(description='Preview Impact Function', style=ButtonStyle(), tooltip='Show preview of impact function')

Button(description='Reset', style=ButtonStyle(), tooltip='Reset display')

Output()