In [1]:
"""
This script was developed to parallel process preformatted time series of input data needed for
the Kljun et. al 2d flux footprint prediction code and ultimately create daily-ETo-weighted
footprint georeferenced footprint rasters.

Checks are performed on the input data to handle data quality issues. The weighting method
uses normalized hourly proportions of ASCE ETo computed from NLDAS v2 data for the closest cell.
NLDAS data is automatically downloaded using OpenDAP given Earthdata login info. Only days with
5 or more hours of data (only from hours between 6:00AM to 8:00 PM) must exist in a day.
Checks are performed to ensure final weighting procedure was successful at different steps of
the process.

This script is not intended to be used by others but to document a workflow that was employed for
scientific purposes.
"""
from pathlib import Path

import sys
import numpy as np
import pandas as pd
import rasterio
import refet
import pyproj as proj
import xarray
import requests
import multiprocessing as mp
__author__='John Volk'

In [2]:
#sys.path.append("//")
sys.path.append("../../../Micromet")
import micromet
from micromet.volk import ffp_climatology as ffp
from micromet import AmerifluxDataProcessor

In [3]:
# read metadata that has each sites' elevation used in ETr/ETo calcs
AMF_meta_path = Path('path/to/site_metadata')
AMF_meta = pd.read_csv(AMF_meta_path, index_col='SITE_ID')

FileNotFoundError: [Errno 2] No such file or directory: 'path\\to\\site_metadata'

In [None]:

# specify path with input CSV files for each station with
# input time series of needed data, e.g. zm, u_star, L,...
in_dir = Path('dir/with/input')
hourly_files = list(in_dir.glob('*.csv'))

In [3]:
import configparser
# load initial flux data
station = 'US-UTW'
config_path = f'../../station_config/{station}.ini'
config = configparser.ConfigParser()
config.read(config_path)

spath = "../../secrets/config.ini"
sconfig = configparser.ConfigParser()
sconfig.read(spath)

ed_pass = sconfig['NLDAS']['pw']
ed_user = sconfig['NLDAS']['user']

In [None]:
import datetime

#nldas_out_dir = Path('C:/Users/paulinkenbrandt/Documents/GitHub/MicroMet/Notebooks/Footprints')


for date in pd.date_range(start='2022-01-01', end='2025-12-31',freq='h'):
    hour = date.hour
    if 5 <= hour <= 21:
        micromet.download_nldas(date,
                           hour,
                           ed_user,
                           ed_pass,)

In [None]:
def runner(path, ed_user, ed_pass):
    """
    Given path to time series of site hourly (or finer) input data,
    compute daily ETo weighted footprint rasters.

    Requires NASA Earthdata username and password to download NLDAS-v2
    primary forcing at point locations for estimated ASCE short ref. ET.
    """

    df, latitude, longitude = micromet.read_compiled_input(path)
    station = path.stem
    elevation = AMF_meta.loc[station, 'station_elevation']

    station_coord = (longitude, latitude)
    # get EPSG code from lat,long, convert to UTM
    EPSG=32700-np.round((45+latitude)/90.0)*100+np.round((183+longitude)/6.0)
    EPSG = int(EPSG)
    in_proj = proj.Proj(init='EPSG:4326')
    out_proj = proj.Proj(init='EPSG:{}'.format(EPSG))
    (station_x,station_y) = proj.transform(in_proj,out_proj,*station_coord)
    print('original coordinates:',station_x,station_y)
    # move coord to snap centroid to 30m grid, minimal distortion
    station_x, station_y = micromet.snap_centroid(station_x, station_y)

    #Other model parameters
    h_s = 2000. #Height of atmos. boundary layer [m] - assumed
    dx = 30. #Model resolution [m]
    origin_d = 300. #Model bounds distance from origin [m]
    #modify if needed
    start_hr = 6 # hours from 1 to 24
    end_hr = 18
    hours_zero_indexed = np.arange(start_hr-1,end_hr)
    hours_one_indexed = np.arange(start_hr,end_hr+1)

    n_hrs = len(hours_zero_indexed)

    nldas_out_dir = Path('NLDAS_data')
    if not nldas_out_dir.is_dir():
        nldas_out_dir.mkdir(parents=True, exist_ok=True)

    out_dir = Path('All_output')/'AMF'/f'{station}'

    if not out_dir.is_dir():
        out_dir.mkdir(parents=True, exist_ok=True)

    #Loop through each day in the dataframe
    for date in df.index.date:
        #Subset dataframe to only values in day of year
        print(f'Date: {date}')
        temp_df = df[df.index.date == date]
        temp_df=temp_df.between_time(f'{start_hr:02}:00', f'{end_hr:02}:00')
        # check on n hours per day
        if len(temp_df) < 5:
            print(f'Less than 5 hours of data on {date}, skipping.')
            continue

        new_dat = None

        out_f = out_dir/ f'{date}.tif'

        final_outf = out_dir/f'{date.year}-{date.month:02}-{date.day:02}_weighted.tif'
        if final_outf.is_file():
            print(f'final daily weighted footprint already wrote to: {final_outf}\nskipping.')
            continue # do not overwrite date/site raster

        # make hourly band raster for the day
        for indx, hour in enumerate(hours_zero_indexed):

            band = indx + 1
            print(f'Hour: {hour}')

            try:
                temp_line = temp_df.loc[temp_df.index.hour == hour,:]
                if temp_line.empty:
                    print(f'Missing all data for {date,hour} skipping')
                    continue
                zm = temp_line.zm.values - temp_line.d.values
                z0 = temp_line.z0.values if 'z0' in temp_line.columns else None
                u_mean = temp_line.u_mean.values if 'u_mean' in temp_line.columns else None
                if u_mean is not None: z0 = None

                #Calculate footprint
                temp_ffp = ffp(domain=[-origin_d,origin_d,-origin_d,origin_d],dx=dx,dy=dx,
                                        zm=zm, h=h_s, rs=None, z0=z0,
                                        ol=temp_line['L'].values,sigmav=temp_line['sigma_v'].values,
                                        ustar=temp_line['u_star'].values, umean=u_mean,
                                        wind_dir=temp_line['wind_dir'].values,
                                        crop=0,fig=0,verbosity=0)
                f_2d = np.array(temp_ffp['fclim_2d'])
                x_2d = np.array(temp_ffp['x_2d']) + station_x
                y_2d = np.array(temp_ffp['y_2d']) + station_y
                f_2d = f_2d*dx**2

                #Calculate affine transform for given x_2d and y_2d
                affine_transform = micromet.find_transform(x_2d,y_2d)

                #Create data file if not already created
                if new_dat is None:
                    #print(f_2d.shape)
                    new_dat = rasterio.open(
                        out_f,'w',driver='GTiff',dtype=rasterio.float64,
                        count=n_hrs,height=f_2d.shape[0],width=f_2d.shape[1],
                        transform=affine_transform, crs=out_proj.srs,
                        nodata=0.00000000e+000
                    )

            except Exception as e:
                print(f'Hour {hour} footprint failed, band {band} not written.')

                temp_ffp = None

                continue

            #Mask out points that are below a % threshold (defaults to 90%)
            f_2d = micromet.mask_fp_cutoff(f_2d)

            #Write the new band
            new_dat.write(f_2d, indx+1)

            #Update tags with metadata
            tag_dict = {'hour':f'{hour*100:04}',
                        'wind_dir':temp_line['wind_dir'].values,
                        'total_footprint':np.nansum(f_2d)}

            new_dat.update_tags(indx+1,**tag_dict)

        #Close dataset if it exists
        try:
            new_dat.close()
        except:
            print(f'ERROR: could not write footprint for site: {station}:\nto: {out_f}')
            continue # skip to next day...

        # for NLDAS data from pymetric
        for hour in hours_zero_indexed:
            micromet.download_nldas(date, hour, ed_user, ed_pass)
            nldas_df = micromet.calc_nldas_refet(date, hour, nldas_out_dir, latitude, longitude, elevation, zm)

        # do hourly weighting - do not necessarily need to do this all in the same loop
        src = rasterio.open(out_f)
        # hourly fetch scalar sums
        global_sum = np.zeros(shape=(n_hrs))
        for hour in range(1,n_hrs+1):
            arr = src.read(hour)
            global_sum[hour-1] = arr.sum()
        # normalized fetch rasters
        normed_fetch_rasters = []
        for hour in range(1,n_hrs+1):
            arr = src.read(hour)
            tmp = arr / global_sum[hour-1]
            if np.isnan(tmp).all():
                tmp = np.zeros_like(tmp)
            normed_fetch_rasters.append(tmp)

        out_dir = Path('All_output')/'AMF'/f'{station}'

        if not out_dir.is_dir():
            out_dir.mkdir(parents=True, exist_ok=True)

        # this one is saved under the site_ID subdir
        nldas_ts_outf = out_dir/ f'nldas_ETr.csv'
        # get NLDAS ts calc fraction of daily ETo
        nldas_df = pd.read_csv(nldas_ts_outf, index_col='date', parse_dates=True).sort_index()
        ETo = nldas_df.loc[nldas_df.index.date == date, 'ETo']
        min_max_normed_ETo = (ETo-min(ETo))/(max(ETo)-min(ETo)) # deal with negative ETo value proportions
        # take out hours where footprint does not exist
        i = 0
        for e, s in zip(min_max_normed_ETo.values, global_sum):
            if s == 0:
                min_max_normed_ETo.iloc[i] = 0
            i+=1
        # after removing hours now calculate hourly proportions
        nldas_df.loc[nldas_df.index.date == date, 'ETo_hr_props'] = min_max_normed_ETo / min_max_normed_ETo.sum()
        # weight normed hourly fetch rasters by hourly ETo proportions
        for i,hour in enumerate(hours_zero_indexed): # everything here is hours 0-23
            normed_fetch_rasters[i] =\
                normed_fetch_rasters[i]*nldas_df.loc[
                    (nldas_df.index.date == date) & (nldas_df.index.hour == hour), 'ETo_hr_props'
            ].values[0]
        # save hourly proportions to time series file
        nldas_df.round(4).to_csv(nldas_ts_outf)

        # Last calculation, sum the weighted hourly rasters to a single daily fetch raster
        final_footprint = sum(normed_fetch_rasters)
        assert np.isclose(final_footprint.sum(), 1), f'check 1 failed! {final_footprint.sum()}\n{temp_line}'
        # next check
        for hour, raster in enumerate(normed_fetch_rasters):
            assert np.isclose(
                nldas_df.loc[
                    (nldas_df.index.date == date) & (nldas_df.index.hour == hour+start_hr-1), 'ETo_hr_props'
                ].values[0], raster.sum()
            ), f'check 2 failed for hour {hour+start_hr-1}!!!'

        # finally, write daily corrected raster with UTM zone reference
        corr_raster_path = final_outf
        out_raster = rasterio.open(
            corr_raster_path,'w',driver='GTiff',dtype=rasterio.float64,
            count=1,height=final_footprint.shape[0],width=final_footprint.shape[1],
            transform=src.transform, crs=out_proj.srs, nodata=0.00000000e+000
        )
        out_raster.write(final_footprint,1)
        out_raster.close()


In [None]:
pool = mp.Pool(processes=8)
pool.map(runner,hourly_files)