In [1]:
import rioxarray as riox
import rasterio as rio
import xarray as xr
import os
import re
import numpy as np
import pandas as pd
import geopandas as gpd
from datetime import datetime, timedelta
from hlsstack.hls_funcs.masks import shp2mask
from tqdm import tqdm

In [2]:
# set the prefix (currently only works for cper)
prefix = 'cper'
# set the year for the template dataset for mask creation (suggest using most recent/current year)
yr = 2023
# flag for whether to process daily rather than weekly for most recent year (currently must be set to 'False')
keep_recent_days = False
# location of the cluster ('hpc' for CERES, 'local' for laptop)
cluster_loc = 'hpc'

In [3]:
if cluster_loc == 'local':
    #os.chdir(wkDIR)
    print('   setting up Local cluster...')
    from dask.distributed import LocalCluster, Client
    import dask
    cluster = LocalCluster(n_workers=8, threads_per_worker=2)
    client = Client(cluster)
    display(client)
    inDIR = 'data/'
    hlsDIR = 'data/hls_nrt/'
elif cluster_loc == 'hpc':
    from dask.distributed import LocalCluster, Client
    import dask_jobqueue as jq
    import dask
    from jupyter_server import serverapp
    wkDIR = '/project/cper_neon_aop/hls_nrt/'
    inDIR = '/90daydata/cper_neon_aop/hls_nrt/'
    hlsDIR = inDIR
    os.chdir(wkDIR)
    # get the server address for porting
    try:
        jupServer = [x for x in serverapp.list_running_servers()][0]
    except IndexError:
        # manually copy/paste the server address
        jupServer = {'base_url': '/node/ceres19-compute-98-eth.scinet.local/17710/'}
    print('   setting up cluster on HPC...')
    dask.config.set({'distributed.dashboard.link': jupServer['base_url'] + 'proxy/{port}/status'})
    partition='short',#'short','debug', 'mem', 'mem-low',
    num_processes = 4
    num_threads_per_processes = 2
    mem = 2.5*num_processes*num_threads_per_processes
    n_cores_per_job = num_processes*num_threads_per_processes
    clust = jq.SLURMCluster(queue=partition,
                            processes=num_processes,
                            cores=n_cores_per_job,
                            memory=str(mem)+'GB',
                            #interface='ib0',
                            interface='ens7f0',
                            # interface='enp24s0f0',
                            local_directory='$TMPDIR',
                            death_timeout=30,
                            walltime='02:00:00',
                            job_extra=["--output=/dev/null","--error=/dev/null"])
    client=Client(clust)
    #Scale Cluster 
    num_jobs=16
    clust.scale(jobs=num_jobs)
    try:
        client.wait_for_workers(n_workers=num_jobs*num_processes, timeout=60)
    except dask.distributed.TimeoutError as e:
        print(str(num_jobs*num_processes) + ' workers not available. Continuing with available workers.')
        #print(e)
        pass
    display(client)

  from distributed.utils import tmpfile


   setting up cluster on HPC...
64 workers not available. Continuing with available workers.


0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.SLURMCluster
Dashboard: /node/ceres19-compute-28-eth.scinet.local/15625/proxy/8787/status,

0,1
Dashboard: /node/ceres19-compute-28-eth.scinet.local/15625/proxy/8787/status,Workers: 4
Total threads: 8,Total memory: 18.64 GiB

0,1
Comm: tcp://10.1.5.128:39973,Workers: 4
Dashboard: /node/ceres19-compute-28-eth.scinet.local/15625/proxy/8787/status,Total threads: 8
Started: 1 minute ago,Total memory: 18.64 GiB

0,1
Comm: tcp://10.1.6.87:35023,Total threads: 2
Dashboard: /node/ceres19-compute-28-eth.scinet.local/15625/proxy/46025/status,Memory: 4.66 GiB
Nanny: tcp://10.1.6.87:41013,
Local directory: /local/bgfs/sean.kearney/10469228/dask-worker-space/worker-0d4ge4cs,Local directory: /local/bgfs/sean.kearney/10469228/dask-worker-space/worker-0d4ge4cs

0,1
Comm: tcp://10.1.6.87:41955,Total threads: 2
Dashboard: /node/ceres19-compute-28-eth.scinet.local/15625/proxy/33413/status,Memory: 4.66 GiB
Nanny: tcp://10.1.6.87:40719,
Local directory: /local/bgfs/sean.kearney/10469228/dask-worker-space/worker-7rd_qpbs,Local directory: /local/bgfs/sean.kearney/10469228/dask-worker-space/worker-7rd_qpbs

0,1
Comm: tcp://10.1.6.87:34089,Total threads: 2
Dashboard: /node/ceres19-compute-28-eth.scinet.local/15625/proxy/41559/status,Memory: 4.66 GiB
Nanny: tcp://10.1.6.87:38029,
Local directory: /local/bgfs/sean.kearney/10469228/dask-worker-space/worker-lp8h4pwb,Local directory: /local/bgfs/sean.kearney/10469228/dask-worker-space/worker-lp8h4pwb

0,1
Comm: tcp://10.1.6.87:38695,Total threads: 2
Dashboard: /node/ceres19-compute-28-eth.scinet.local/15625/proxy/35105/status,Memory: 4.66 GiB
Nanny: tcp://10.1.6.87:44569,
Local directory: /local/bgfs/sean.kearney/10469228/dask-worker-space/worker-qvwgx0vj,Local directory: /local/bgfs/sean.kearney/10469228/dask-worker-space/worker-qvwgx0vj


In [4]:
# create output path
outPATH = os.path.join(inDIR, 'gcloud', 'hls_' + prefix + '_means.csv')

# load existing data if it has been previously computed
if os.path.exists(outPATH):
    df_out = pd.read_csv(outPATH, parse_dates=[0])
else:
    df_out = None

In [5]:
# open existing HLS dataset from disk for year specified above as template
ds = riox.open_rasterio(os.path.join(inDIR, 'gcloud', 'hls_' + prefix + '_' + str(yr) + '_gcloud.nc'), masked=True)

# open existing long-term average NDVI (calculated from earth engine)
ds_ndvi_lta = riox.open_rasterio(os.path.join(inDIR, 'ee_lta', prefix + '_ee_ndvi_landsat_wkly_lta.nc'), masked=True)
# set meaningless date for long-term average data to allow overlay in the app
ds_ndvi_lta['date'] = [datetime.strptime(re.sub('2020', '2099', str(x)),'%Y-%m-%d %H:%M:%S') for x in ds_ndvi_lta['date'].values]
# reset the index of long-term data to match the HLS data
ds_ndvi_lta = ds_ndvi_lta.reindex({'y': ds.y, 'x': ds.x}, method='nearest', tolerance=30)

In [6]:
# load CPER pasture data and covert to mask (currently only works for CPER)
if prefix == 'cper':
    cper_f = 'data/ground/cper_pastures_2017_dissolved.shp'
    cper = gpd.read_file(cper_f).to_crs(ds.rio.crs.to_epsg())
    cper_info = cper[['Pasture', 'geometry']].reset_index(drop=True).reset_index().rename(columns={'index': 'id'})
    past_dict = {row.id+1: row.Pasture for _, row in cper_info.iterrows()}
    past_dict[0] = 'UNK'
    cper_mask_shp = [(row.geometry, row.id+1) for _, row in cper_info.iterrows()]
    cper_mask = shp2mask(shp=cper_mask_shp, 
                         transform=ds.rio.transform(), 
                         outshape=ds['NDVI'].shape[1:], 
                         xr_object=ds['NDVI'])
    past_mask = np.array([past_dict[i] for i in cper_mask.values.flatten()]).reshape(cper_mask.shape)

# assign pasture mask to HLS dataset
ds = ds.assign(Pasture=(['y', 'x'], past_mask)).chunk({'y': 50, 'x': 50})
ds = ds.set_coords('Pasture')

In [7]:
# get long-term average NDVI data as a dataframe
df_ndvi_lta = ds_ndvi_lta.groupby(ds['Pasture']).mean(dim='stacked_y_x').to_dataframe().reset_index().drop(columns='spatial_ref')
df_ndvi_lta['Year'] = '30-yr avg.'

In [8]:
# create list of potential years to be analyzed
yr_list = [2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023]

# loop through each year and process weekly HLS data
for idx, yr_i in enumerate(tqdm(yr_list, miniters=1)):
    # set Jan. date of first week
    #mon_day = (8 - datetime(2020, 1, 1).weekday()) % 7
    mon_day = 6
    # create list of dates of each week
    yr_dates_tmp = [datetime(yr_i, 1, mon_day) + timedelta(weeks=w) for w in range(52)]
    # skip this year if already completely processed and saved to disk
    if df_out is not None and pd.Series(yr_dates_tmp).isin(df_out['date']).all():
        print('skipping year - already in saved output')
        continue
    # process weekly means for year
    else:
        # remove any existing saved data for process year
        df_out = df_out[df_out['date'].dt.isocalendar().year != yr].copy()
        # use template year if process year matches
        if yr_i == yr:
            ds_i = ds
        # open dataset for year 
        else:
            ds_i = riox.open_rasterio(os.path.join(inDIR, 'gcloud', 'hls_' + prefix + '_' + str(yr_i) + '_gcloud.nc'), masked=True)
        # reformat data for dataset
        ds_i['date'] = [datetime.strptime(str(x), '%Y-%m-%d %H:%M:%S') for x in ds_i['date'].values]

        # update the date range to match the dataset
        yr_dates_tmp = [x for x in yr_dates_tmp if (x <= pd.to_datetime(ds_i['NDVI'].date.max().values) + timedelta(days=1)) and 
                        (x >= pd.to_datetime(ds_i['NDVI'].date.min().values) - timedelta(days=1))]
        # get weekly value based on weekly date for each vegetation variable
        if (yr_i != yr_list[-1]) or not keep_recent_days:
            ds_ndvi_yr_wkly = ds_i['NDVI'].sel(date=yr_dates_tmp, method='nearest', tolerance=timedelta(days=1), drop=True)
            ds_bm_yr_wkly = ds_i['Biomass'].sel(date=yr_dates_tmp, method='nearest', tolerance=timedelta(days=1), drop=True)
            ds_bare_yr_wkly = ds_i['BARE'].sel(date=yr_dates_tmp, method='nearest', tolerance=timedelta(days=1), drop=True)
            ds_sd_yr_wkly = ds_i['SD'].sel(date=yr_dates_tmp, method='nearest', tolerance=timedelta(days=1), drop=True)
            ds_green_yr_wkly = ds_i['GREEN'].sel(date=yr_dates_tmp, method='nearest', tolerance=timedelta(days=1), drop=True)
            ds_litt_yr_wkly = ds_i['LITT'].sel(date=yr_dates_tmp, method='nearest', tolerance=timedelta(days=1), drop=True)
            # merge all individual vegetation variables together
            df_yr_wkly = xr.merge([ds_ndvi_yr_wkly,
                                   ds_bm_yr_wkly,
                                   ds_bare_yr_wkly,
                                   ds_sd_yr_wkly,
                                   ds_green_yr_wkly,
                                   ds_litt_yr_wkly]).groupby(
                ds['Pasture']).mean(
                dim='stacked_y_x').to_dataframe().reset_index().drop(
                columns='spatial_ref')
        # get daily values, because specified with 'keep_recent_days' parameter and because most recent date
        else:
            df_yr_wkly = xr.merge([ds_i['NDVI'],
                               ds_i['Biomass'],
                               ds_i['BARE'],
                               ds_i['SD'],
                               ds_i['GREEN'],
                               ds_i['LITT']]).groupby(
            ds['Pasture']).mean(
            dim='stacked_y_x').to_dataframe().reset_index().drop(
            columns='spatial_ref')
        # add year to the output dataframe
        df_yr_wkly['Year'] = str(yr_i)
        # create output dataframe if it doesn't exist
        if df_out is None and idx == 0:
            df_out = df_yr_wkly.copy()
        # merge yearly dataset to full output dataset
        else:
            df_out = pd.concat([df_out, df_yr_wkly])

  0%|          | 0/11 [00:00<?, ?it/s]

skipping year - already in saved output
skipping year - already in saved output
skipping year - already in saved output
skipping year - already in saved output
skipping year - already in saved output
skipping year - already in saved output
skipping year - already in saved output
skipping year - already in saved output
skipping year - already in saved output
skipping year - already in saved output


100%|██████████| 11/11 [00:16<00:00,  1.46s/it]


In [9]:
# add long-term NDVI to output if not already there
if not df_ndvi_lta['date'].isin(df_out['date'][df_out['NDVI'].notnull()]).all():
    print('adding long-term average NDVI to output')
    df_out = pd.concat([df_ndvi_lta, df_out])

In [10]:
# get AOI-wide average as a separate dataframe
df_out_aoi = df_out.groupby('date').mean().reset_index()

In [11]:
# remove any existing AOI-wide averages
df_out = df_out[df_out['Pasture'] != prefix].copy()
# name the AOI-wide average based on 'prefix' parameter
df_out_aoi['Pasture'] = prefix
# rename the long-term average for the AOI-wide average
df_out_aoi['Year'] = df_out_aoi['date'].dt.isocalendar().year.transform(lambda x: '30-yr avg.' if x == 2099 else str(x))
# add the AOI-wide average to the output
df_out = pd.concat([df_out, df_out_aoi])

In [12]:
# reset the index after merge
df_out = df_out.reset_index(drop=True)

In [13]:
# get the sum of fractional cover
df_out['cov_sum'] = df_out[['BARE', 'SD', 'GREEN', 'LITT']].sum(axis=1)

# make sure pasture-scale means of fractional cover sum to 1
for c in tqdm(['BARE', 'SD', 'GREEN', 'LITT']):
    df_out[c] = df_out.groupby(['date', 'Year', 'Pasture']).apply(lambda x: x[c]/x['cov_sum']).reset_index(level=[0, 1, 2])[0]

100%|██████████| 4/4 [00:36<00:00,  9.13s/it]


In [14]:
# drop the temporary column, no longer needed
df_out = df_out.drop(columns=['cov_sum'])

In [15]:
# round all variables for better readability later
df_out[['NDVI', 
        'Biomass',
        'BARE',
        'SD', 
        'GREEN',
        'LITT']] = df_out.transform({'NDVI': lambda x: np.round(x, 3),
                  'Biomass': lambda x: np.round(x, 0),
                  'BARE': lambda x: np.round(x * 100, 1),
                  'SD': lambda x: np.round(x * 100, 1),
                  'GREEN': lambda x: np.round(x * 100, 1),
                  'LITT': lambda x: np.round(x * 100, 1)})

In [16]:
# delete output if it already exists
if os.path.exists(os.path.join(inDIR, 'gcloud', 'hls_' + prefix + '_means.csv')):
    os.remove(os.path.join(inDIR, 'gcloud', 'hls_' + prefix + '_means.csv'))
# save output to disk
df_out.to_csv(os.path.join(inDIR, 'gcloud', 'hls_' + prefix + '_means.csv'), index=False)

In [17]:
# set permissions to be read/write/execute for all users
os.chmod(os.path.join(inDIR, 'gcloud', 'hls_' + prefix + '_means.csv'), 0o777)