# Pipeline

This notebook runs the pipeline which creates the sea ice indicators dataset.

## 0: Setup

Run the cell below before running any other cell in this notebook.

In [1]:
# User parameters
# Set begin and end years for running the pipeline
begin_year = 1978
end_year = 2019

# setup environment and handle all file-pathing
# Allow printing some things to the terminal
terminal_output = open("/dev/stdout", "w")
print("Executing pipeline Step 0 (setup)", file=terminal_output, flush=True)

import os
import time
from pathlib import Path

base_dir = Path(os.getenv("BASE_DIR"))
output_dir = Path(os.getenv("OUTPUT_DIR"))
scratch_dir = Path(os.getenv("SCRATCH_DIR"))

# raw NSIDC-0051 data dir
raw_0051_dir = base_dir.joinpath("nsidc_0051/raw")
raw_0051_dir.mkdir(exist_ok=True, parents=True)

# prepped NSIDC-0051 data dir
prepped_0051_dir = base_dir.joinpath("nsidc_0051/prepped")
prepped_0051_dir.mkdir(exist_ok=True, parents=True)

# smoothed daily time series data
smoothed_0051_fp = base_dir.joinpath(f"nsidc_0051/nsidc_0051_sic_{begin_year}-{end_year}_smoothed.nc")

# final output indicators
# final dataset starts in 1979
fubu_fp = output_dir.joinpath(f"arctic_seaice_fubu_dates_{begin_year + 1}-{end_year}.nc")

print("Pipeline Step 0 (setup) complete\n", file=terminal_output, flush=True)

## 1: Download NSIDC-0051 daily data

Download the NSIDC-0051 passive microwave sea ice concentration data from NSIDC.

In [3]:
# User parameters
# clobber - set to True to always re-download all files, 
#   regardless of presence in out_0051_dir. 
# Default is False, download only files already found.
clobber = False

print("Executing pipeline Step 1 (download NSIDC-0051)", file=terminal_output, flush=True)

import download_nsidc as dsic

version = "1"
time_start = f"{begin_year}-10-26T00:00:00Z"
time_end = f"{end_year}-12-31T23:59:59Z"
polygon = ""
filename_filter = "*"

dsic.cmr_url = "https://cmr.earthdata.nasa.gov"
dsic.urs_url = "https://urs.earthdata.nasa.gov"
dsic.cmr_page_size = 2000
dsic.cmr_file_url = (
    "{0}/search/granules.json?provider=NSIDC_ECS"
    "&sort_key[]=start_date&sort_key[]=producer_granule_id"
    "&scroll=true&page_size={1}".format(dsic.cmr_url, dsic.cmr_page_size)
)

urls = dsic.cmr_search(
    "NSIDC-0051",
    version,
    time_start,
    time_end,
    polygon=polygon,
    filename_filter=filename_filter,
)

# dunno what these new files are yet, but remove them
urls = [url for url in urls if "s.bin" not in Path(url).name]
# also filter out monthly data urls
urls = [url for url in urls if len(Path(url).name.split("_")[1]) == 8]  

if not clobber:
    # filter out already downloaded files
    local_fps = [fp.name for fp in list(raw_0051_dir.glob("*"))]
    urls = [url for url in urls if Path(url).name not in local_fps]
    print(
        f"Not overwriting existing NSIDC-0051 files. {len(urls)} files will be downloaded", 
        file=terminal_output, 
        flush=True
    )

if len(urls) > 0:
    results_di = dsic.cmr_download(urls, raw_0051_dir)
    print(f"NSIDC-0051 data written to {raw_0051_dir}", file=terminal_output, flush=True)
    
print("Pipeline Step 1 (download NSIDC-0051) complete\n", file=terminal_output, flush=True)

Querying for data:
	https://cmr.earthdata.nasa.gov/search/granules.json?provider=NSIDC_ECS&sort_key[]=start_date&sort_key[]=producer_granule_id&scroll=true&page_size=2000&short_name=NSIDC-0051&version=001&version=01&version=1&temporal[]=1978-10-26T00:00:00Z,2019-12-31T23:59:59Z&producer_granule_id[]=*&options[producer_granule_id][pattern]=true

Found 27770 matches.
...

NameError: name 'quit' is not defined

## 2: Convert NSIDC-0051 binary data files to GeoTIFF

Convert the raw binary files downloaded from NSIDC to GeoTIFFs for assembling a complete daily timeseries object. Writes converted GeoTIFFs to `$BASE_DIR/nsidc_0051/prepped`.

In [15]:
# User parameters
# set clobber to True to download even if NSIDC-0747 
#   dataset exists locally.
clobber = False
# set the number of CPUs to use for converting the 
ncpus = 32

print("Executing pipeline Step 2 (convert binary data to GeoTIFF)", file=terminal_output, flush=True)

from multiprocessing import Pool
from convert_nsidc import convert_bin_to_gtiff


# list filepaths from input dir
fps = [fp for fp in raw_0051_dir.glob("*.bin")]

# if clobber not specifed, check for existing files in out_dir
if not clobber:
    converted_fns = [fp.name.replace("v1-1", "v1.1").split(".tif")[0] for fp in prepped_0051_dir.glob("*")]
    fps = [fp for fp in fps if fp.name.split(".bin")[0] not in converted_fns]
    print(
        f"Not overwriting existing converted files. {len(fps)} files will be converted", 
        file=terminal_output, 
        flush=True
    )

if len(fps) > 0:
    print(f"Converting {len(fps)} files", file=terminal_output, end="...", flush=True)
    tic = time.perf_counter()
    args = [(fp, prepped_0051_dir) for fp in fps]
    with Pool(ncpus) as pool:
        out = pool.starmap(convert_bin_to_gtiff, args)
    print(f"done, {round(time.perf_counter() - tic)}s", file=terminal_output, flush=True)
    
print("Pipeline Step 2 (convert binary data to GeoTIFF) complete", file=terminal_output, flush=True)

Not overwriting existing converted files. 0 files will be converted


## 3: Make smoothed daily time series

Create a complete time series object of statistically smoothed daily SIC observations in preparation for the FUBU thresholding algorithm. Writes the smoothed output to `$BASE_DIR/nsidc_0051/nsidc_0051_sic_<begin year>-<end year>_smoothed.nc`.

In [6]:
# User parameters
# set the number of CPUs to use for creating the daily timeseries.
ncpus = 32

print("Executing pipeline Step 3 (make smoothed daily time series)", file=terminal_output, flush=True)
start_time = time.perf_counter()

# standard
import os
import time
import datetime as dt
from multiprocessing import Pool
from pathlib import Path
# non-standard
import numpy as np
import pandas as pd
import rasterio as rio
# project
import make_daily_timeseries as mds


fps = sorted(list(prepped_0051_dir.glob("*")))
data_times = [mds.make_datetimes(fp.name.split(".")[0].split("_")[1]) for fp in fps]

# stack the irregularly spaced data to a netcdf
print("Stack the irregularly spaced data to an xarray.DataSet", end="...", file=terminal_output, flush=True)
tic = time.perf_counter()

with rio.open(fps[0]) as template:
    meta = template.meta.copy()
    height, width = template.shape

arr = mds.stack_rasters(fps, ncpus=ncpus)
ds = mds.make_xarray_dset(arr.copy(), pd.DatetimeIndex(data_times), meta)
da = ds["sic"].copy()
print(f"done, {np.round(time.perf_counter() - tic)}s", file=terminal_output, flush=True)

# interpolate to daily
print("Interpolate to daily values", end="...", file=terminal_output, flush=True)
tic = time.perf_counter()

da_interp = da.resample(time="1D").asfreq()

# get a masks layer from the raw files.  These are all values > 250
# ------------ ------------ ------------ ------------ ------------ ------------ ------------
# 251 Circular mask used in the Arctic to cover the irregularly-shaped data
#       gap around the pole (caused by the orbit inclination and instrument swath)
# 252 Unused
# 253 Coastlines
# 254 Superimposed land mask
# 255 Missing data
# make a mask of the known nodata values when we start...
mask = (arr[0] > 250) & (arr[0] < 300)

# set masks to nodata
dat = da_interp.values.copy()

# make the nodata mask np.nan for computations
out_masked = []
for i in dat:
    i[mask] = np.nan
    out_masked = out_masked + [i]

# put the cleaned up data back into the stacked NetCDF
da_interp.data = np.array(out_masked)
da_interp.data = np.apply_along_axis(mds.interpolate, axis=0, arr=da_interp).round(4)
print(f"done, {np.round(time.perf_counter() - tic)}s", file=terminal_output, flush=True)

# spatially smooth the 2-D daily slices of data using a 
#   mean generic filter (without any aggregation)
print(f"Perform spatial smooth", end="...", file=terminal_output, flush=True)
tic = time.perf_counter()

footprint = np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]])

# 'run using multiprocessing -- YMMV this is a tad flaky at times.'
# ^^ comment from original author, not sure where flakiness manifested
# previously but left this note anyway.

# Some profiling revealed that paritioning the interpolated array
#   and looping over it with 10 cores on the dev machine (Atlas) was 
#   about optimal.
filter_ncpus = 10
filter_slices = mds.chunkit(da_interp.values.shape[0], filter_ncpus)
args = [(da_interp.values[sl], footprint) for sl in filter_slices]
with Pool(filter_ncpus) as pool:
    out = pool.starmap(mds.run_jit_mean_filter, args)

# mask the spatial smoothed outputs with the mask at each 2D slice.
smoothed = np.array([mds._maskit(i, mask) for i in np.concatenate(out)]).copy()
print(f"done, {np.round(time.perf_counter() - tic)}s", file=terminal_output, flush=True)

print("Perform three rounds of Hanning smoothing", end="...", file=terminal_output, flush=True)
tic = time.perf_counter()

n = 3  # perform 3 iterative smooths on the same series
for i in range(n):
    smoothed = np.apply_along_axis(mds.hanning_smooth, arr=smoothed, axis=0)
    print(f"pass {i + 1} complete", end="...", file=terminal_output)
print(f"done, {np.round(time.perf_counter() - tic)}s", file=terminal_output, flush=True)

print("Clean up data and re-mask", end="...", file=terminal_output, flush=True)
tic = time.perf_counter()

# make sure no values < 0, set to 0
smoothed[np.where((smoothed < 0) & (~np.isnan(smoothed)))] = 0

# make sure no values > 1, set to 1
smoothed[np.where((smoothed > 1) & (~np.isnan(smoothed)))] = 1

# mask it again to make sure the nodata and land are properly masked following hanning.
smoothed = np.array([mds._maskit(i, mask) for i in smoothed]).copy()
print(f"done, {np.round(time.perf_counter() - tic)}s", flush=True)

print("Write smoothed daily dataset to a single netCDF", end="...", file=terminal_output, flush=True)
# write it out as a NetCDF
out_ds = da_interp.copy(deep=True)
out_ds.values = smoothed.astype(np.float32)
out_ds = out_ds.to_dataset(name="sic")
out_ds.attrs = ds.attrs
# output encoding
encoding = out_ds.sic.encoding.copy()
encoding.update({"zlib": True, "comp": 5, "contiguous": False, "dtype": "float32"})
out_ds.sic.encoding = encoding

out_ds.to_netcdf(smoothed_0051_fp, format="NETCDF4")
print(
    f"done, {np.round(time.perf_counter() - tic)}s. Smoothed daily SIC time series written to {smoothed_0051_fp}", 
    file=terminal_output, 
    flush=True
)
print(
    ("Pipeline Step 3 (make smoothed daily timeseries) completed at "
     f"{dt.datetime.utcfromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')}"
     ", elapsed time: {round(time.perf_counter() - start_time) / 60}m\n"),
    file=terminal_output,
    flush=True
)

Stack the irregularly spaced data to an xarray.DataSet...done, 36.6s
Interpolate to daily values...done, 37.6s
Perform spatial smooth...done, 95.6s
Perform three rounds of Hanning smoothing...pass 1 complete...pass 2 complete...pass 3 complete...done, 103.0s
Smoothed dataset written to /workspace/Shared/Tech_Projects/SeaIce_NOAA_Indicators/project_data/nsidc_0051/nsidc_0051_sic_1978-2019_smoothed.nc


## 4. Compute Freeze-up / Break-up dates

Compute the freeze-up / break-up start/end dates from the smoothed SIC daily time series. 

In [2]:
# User parameters
# set the number of CPUs to use for creating the daily timeseries.
ncpus = 32

print("Executing pipeline Step 4 (compute freeze-up/break-up dates)", file=terminal_output, flush=True)
start_time = time.perf_counter()

# standard
import os
import time
from datetime import datetime
from functools import partial
from multiprocessing import Pool
from pathlib import Path
# non-standard
import numpy as np
import pandas as pd
import xarray as xr
from pyproj.crs import CRS
# project
import compute_fubu as cpf


# Open dataset of smoothed daily SIC
# load it so it processes a LOT faster plus it is small...
print("Load smoothed SIC daily time series data", end="...", file=terminal_output, flush=True)
tic = time.perf_counter()
ds = xr.load_dataset(smoothed_0051_fp)  

# slice the data to the full years... currently this is 1979-20**
ds_sic = ds.sel(time=slice("1979", str(end_year)))["sic"]
years = (
    ds_sic.time.to_index().map(lambda x: x.year).unique().tolist()[:-1]
)  # cant compute last year

# set all nodata pixels to np.nan
ds_sic.data[ds_sic.data > 1] = np.nan

# make a no data mask
mask = np.isnan(ds_sic.isel(time=0).data)

# get the summer and winter seasons that were determined in table 1 in the paper
summer = ds_sic.sel(time=cpf.get_summer(ds_sic["time.month"]))
winter = ds_sic.sel(time=cpf.get_winter(ds_sic["time.month"]))
print(f"done, {np.round(time.perf_counter() - tic)}s", file=terminal_output, flush=True)

# get the means and standard deviations of these season aggregates
print("Get statistics for FUBU thresholding algorithm", end="...", file=terminal_output, flush=True)

summer_mean = summer.groupby("time.year").mean(dim="time").round(4)
summer_std = summer.groupby("time.year").std(dim="time", ddof=1).round(4)
winter_mean = winter.groupby("time.year").mean(dim="time").round(4)
winter_std = winter.groupby("time.year").std(dim="time", ddof=1).round(4)
print(f"done, {np.round(time.perf_counter() - tic)}s", file=terminal_output)

print("Running the FUBU thresholding algorithm", end="...", file=terminal_output, flush=True)
f = partial(
    cpf.wrap_fubu,
    ds_sic=ds_sic,
    summer_mean=summer_mean,
    summer_std=summer_std,
    winter_mean=winter_mean,
    winter_std=winter_std,
    ncpus=ncpus,
)

# run serial
fubu_years = {year: f(year) for year in years}
print(f"done, {np.round(time.perf_counter() - tic)}s", file=terminal_output, flush=True)

# Make xarray.DataSet from yearly results, stack into arrays by indicator type
print("Make xarray.DataSet from yearly results and write as CF-compliant netCDF to $OUTPUT_DIR", end="...", file=terminal_output, flush=True)
stacked = {
    indicator: np.array([fubu_years[year][indicator] for year in years])
    for indicator in fubu_years[list(fubu_years.keys())[0]].keys()
}
fubu = cpf.make_cf_dataset(stacked, years, ds.coords)

# dump to disk
fubu.to_netcdf(fubu_fp)    
print(f"done, {np.round(time.perf_counter() - tic)}s. Indicators dataset written to {fubu_fp}.", file=terminal_output, flush=True)
print(
    ("Pipeline Step 4 (compute freeze-up/break-up dates) completed at "
     f"{datetime.utcfromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')}"
     f", elapsed time: {round((time.perf_counter() - start_time) / 60)}m\n"), 
    file=terminal_output,
    flush=True
)

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# SOME NOTES ABOUT TRANSLATING FROM MLAB :
# ----------------------------------------
# [ 1 ]: to make an ordinal date that matches the epoch used by matlab in python
# ordinal_date = date.toordinal(date(1971,1,1)) + 366
# if not the number will be 366 days off due to the epoch starting January 0, 0000 whereas in Py Jan 1, 0001.
# [ 2 ]: when computing stdev it is important to set the ddof=1 which is the matlab default.  Python leaves it at 0 default.
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

Starting indicators program.
Load smoothed SIC daily time series data...done, 45.0s
Get statistics for FUBU thresholding algorithm...done, 53.0s
Running the FUBU thresholding algorithm...1979 complete. 1980 complete. 1981 complete. 1982 complete. 1983 complete. 1984 complete. 1985 complete. 1986 complete. 1987 complete. 1988 complete. 1989 complete. 1990 complete. 1991 complete. 1992 complete. 1993 complete. 1994 complete. 1995 complete. 1996 complete. 1997 complete. 1998 complete. 1999 complete. 2000 complete. 2001 complete. 2002 complete. 2003 complete. 2004 complete. 2005 complete. 2006 complete. 2007 complete. 2008 complete. 2009 complete. 2010 complete. 2011 complete. 2012 complete. 2013 complete. 2014 complete. 2015 complete. 2016 complete. 2017 complete. 2018 complete. done, 467.0s
Make xarray.DataSet from yearly results and write as CF-compliant netCDF to $OUTPUT_DIR...done, 468.0s. Indicators dataset written to /workspace/Shared/Tech_Projects/SeaIce_NOAA_Indicators/final_produ