<center>
<img src='./img/nsidc_logo.png'/>

# **Using Coiled and h5coro to Produce ICESat-2 Sea Ice Height Time Series**

</center>

---

## **1. Tutorial Introduction/Overview**

Tutorial designed for the "DAAC data access in the cloud hands-on experience" session at the 2023 NSIDC DAAC User Working Group (UWG) Meeting. This is a copy of the `2_ATL07_timeseries` notebook for use with Coiled.


TODOS:
* Explain Coiled
* Question for Luis: Why would I use the decorator function (` @coiled.function()`) vs:

```
cluster = coiled.Cluster(n_workers=20, region="us-west-2")
client = cluster.get_client()
client
```
* How do we incorporate https://medium.com/coiled-hq/processing-a-250-tb-dataset-with-coiled-dask-and-xarray-574370ba5bde ? 


### Installing last versions from earthaccess and coiled

**NOTE**: Restart the kernel and clean output after the next cell

In [1]:
%%capture 

#!pip install coiled==0.9.26

!pip uninstall -y earthaccess
!pip install git+https://github.com/nsidc/earthaccess.git@main

## **2. Tutorial steps**

Resoruces: each granule is approx 60-120 MB, A month of data for the Ross ocean returns 59 granules ~4.6 GB. We should use an instance preferable double the memory of the aprox data size we use.

### **Import Packages**

In [2]:
# For Coiled cloud compute
#import coiled

# For searching NASA data
import earthaccess

from h5coro import h5coro, s3driver
import geopandas as gpd

# For reading data, analysis and plotting
#import xarray as xr
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import hvplot.xarray

#import pprint
from affine import Affine
#from pyproj import CRS

from pqdm.threads import pqdm

#print(coiled.__version__)
print(earthaccess.__version__)


import os
os.environ['USE_PYGEOS'] = '0'
import geopandas

In a future release, GeoPandas will switch to using Shapely by default. If you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).
  import geopandas as gpd


0.5.4


### **Authenticate**

In [3]:
auth = earthaccess.login()

EARTHDATA_USERNAME and EARTHDATA_PASSWORD are not set in the current environment, try setting them or use a different strategy (netrc, interactive)
You're now authenticated with NASA Earthdata Login
Using token with expiration date: 11/18/2023
Using .netrc file for EDL


### **Search for ICESat-2 ATL07 data**

Using spatial/temporal range from https://icesat-2-2023.hackweek.io/tutorials/sea_ice/1_sea_ice_tutorial.html :


```
# Spatial extent: Ross Sea, Antarctica
spatial_extent = [-180, -78, -160, -74]

# Time range
date_range = ['2019-09-16','2019-09-16'] # first time period
# date_range = ['2019-11-13','2019-11-13'] # second time period
```

This code cell helps to avoid copying and pasting region tuples

In [4]:
region = "Ross Sea"
ross_sea = (-180, -78, -160, -74)
antarctic = (-180, -90, 180, -60)
this_region = antarctic if region == "Antarctica" else ross_sea

In [5]:
atl10 = {}
total_results = 0

for year in range(2019,2020):
    
    print(f"Searching year {year} ...")
    granules = earthaccess.search_data(
        short_name = 'ATL10',
        version = '006',
        cloud_hosted = True,
        bounding_box = this_region,
        temporal = (f'{year}-09-01',f'{year}-09-30'),
    )
    total_results += len(granules)
    atl10[str(year)] = granules
print(f"Total: {total_results}")

Searching year 2019 ...
Granules found: 59
Total: 59


In [31]:
#r = [display(r) for r in atl10["2019"][0:2]]

### **Extract freeboard segments**

We now create a geopandas dataset from our results. 

Because ATL10 is not a gridded prduct we need to extract coordinates and variables from their groups inside the HDF5 file.

#### Open the files using the `open` method. 

The auth object created at the start of the notebook is used to provide Earthdata Login authentication and AWS credentials.

In [6]:
file_tree = {}

for year, granules in atl10.items():
    file_tree[year] = earthaccess.open(granules)


 Opening 59 granules, approx size: 4.6 GB
using provider: NSIDC_CPRD


QUEUEING TASKS | : 0it [00:00, ?it/s]

PROCESSING TASKS | :   0%|          | 0/59 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/59 [00:00<?, ?it/s]

In [7]:
# files[0].f.s3.storage_options
print(file_tree["2019"][0].f.info())

{'ETag': '"e123bb7ed68661d31fea92a0abdf8fc0-1"', 'LastModified': datetime.datetime(2023, 6, 24, 0, 19, 44, tzinfo=tzutc()), 'size': 46754892, 'name': 'nsidc-cumulus-prod-protected/ATLAS/ATL10/006/2019/09/01/ATL10-02_20190901100614_09980401_006_02.h5', 'type': 'file', 'StorageClass': 'STANDARD_IA', 'VersionId': None, 'ContentType': 'binary/octet-stream'}


In [34]:
# import h5py


# with h5py.File(file_tree["2019"][0],'r') as f:
#     obj = f["gt1r"]['freeboard_segment/delta_time']
#     for attr, value in obj.attrs.items():
#         print(f"{attr}: {value}")
#     time = obj[:]
    
# time

In [35]:
# ds = xr.open_dataset(file_tree["2019"][0], group="gt1r/freeboard_segment/")
# ds

### Pre-warming the Coiled instance.

Once we get to run this with Coiled it would be good to instantiate the cluster beforehand

In [36]:
# @coiled.function(region="us-west-2",
#                  memory="16 GiB")
# def trivial(param):
#     print(param)
#     return param

In [37]:
# trivial("test")

In [8]:
## Based on the READ function form Younghyun Koo for the sea ice tutorial at the IS2 hackweek

# @coiled.function(region="us-west-2",
#                  memory="16 GiB")

# Modifications to streamline
# - helper function for orinetation
# - helper function to reformat credentials
# - use datasets to read arrays
# - add data to dictionary

#from h5coro import h5coro, s3driver, filedriver
from itertools import product
#import geopandas as gpd
# import pandas as pd
# import numpy as np
# import gc
    
GPS_EPOCH = pd.to_datetime('1980-01-06 00:00:00')

def get_strong_beams(f):
    """Returns ground track for strong beams based on IS2 orientation"""
    orient  = f['orbit_info/sc_orient'][0]

    if orient == 0:
        return [f"gt{i}l" for i in [1, 2, 3]]
    elif orient == 1:
        return [f"gt{i}r" for i in [1, 2, 3]]
    else:
        raise KeyError("Spacecraft orientation neither forward nor backward")


def get_credentials(file):
    """Returns credentials dict with keys expected by h5coro
    
    TODO: could add as option for earthaccess
    """
    return {
        "aws_access_key_id": file.s3.storage_options["key"],
        "aws_secret_access_key": file.s3.storage_options["secret"],
        "aws_session_token": file.s3.storage_options["token"]
    }
    
    
def read_atl10_local(files, executors):
    """Returns a consolidated GeoPandas dataframe for a set of ATL10 file pointers.
    
    Parameters:
        files (list[S3FSFile]): list of authenticated fsspec file references to ATL10 on S3 (via earthaccess)
        executors (int): number of threads
    
    """
    def read_atl10(file):
        """Reads datasets required for creating gridded freeboard from a single ATL10 file
        
        file: an authenticated fsspec file reference on S3 (returned by earthaccess)
        
        returns: a list of geopandas dataframes
        """
        
        # Open file object
        f = h5coro.H5Coro(file.info()["name"], s3driver.S3Driver, credentials=get_credentials(file))
        
        # Get strong beams based on orientation
        ancillary_datasets = ["orbit_info/sc_orient", "ancillary_data/atlas_sdp_gps_epoch"]
        f.readDatasets(datasets=ancillary_datasets, block=True)
        strong_beams = get_strong_beams(f)
        atlas_sdp_gps_epoch = f["ancillary_data/atlas_sdp_gps_epoch"][:]
        
        # Create list of datasets to load
        datasets = ["freeboard_segment/latitude",
                    "freeboard_segment/longitude",
                    "freeboard_segment/delta_time",
                    "freeboard_segment/seg_dist_x",
                    "freeboard_segment/heights/height_segment_length_seg",
                    "freeboard_segment/beam_fb_height",
                    "freeboard_segment/heights/height_segment_type"]
        ds_list = ["/".join(p) for p in list(product(strong_beams, datasets))]
        # Load datasets
        f.readDatasets(datasets=ds_list, block=True)
        
        # Create a list of geopandas.DataFrames containing beams
        tracks = []
        for beam in strong_beams:
            ds = {dataset.split("/")[-1]: f[dataset][:] for dataset in ds_list if dataset.startswith(beam)}
            
            # Convert delta_time to datetime
            ds["delta_time"] = GPS_EPOCH + pd.to_timedelta(ds["delta_time"]+atlas_sdp_gps_epoch, unit='s')

            # Add beam identifier
            ds["beam"] = beam
            
            # Set fill values to NaN - assume 100 m as threshold
            ds["beam_fb_height"] = np.where(ds["beam_fb_height"] > 100, np.nan, ds["beam_fb_height"])
            
            geometry = gpd.points_from_xy(ds["longitude"], ds["latitude"])
            del ds["longitude"]
            del ds["latitude"]
            
            gdf = gpd.GeoDataFrame(ds, geometry=geometry, crs="EPSG:4326")
            gdf.dropna(axis=0, inplace=True)
            tracks.append(gdf)

#             gc.collect()
        return tracks
    
    df = pqdm(files, read_atl10, n_jobs=executors)
    combined = pd.concat([t[0] for t in df if type(t) is list])
    
    return combined

The idea would be to split each year into its own Dask worker

In [9]:
%%time
tracks = read_atl10_local(file_tree["2019"], executors=16)

QUEUEING TASKS | :   0%|          | 0/59 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/59 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/59 [00:00<?, ?it/s]

CPU times: user 2min 17s, sys: 1min 3s, total: 3min 20s
Wall time: 1min 34s


In [40]:
tracks

Unnamed: 0,delta_time,seg_dist_x,height_segment_length_seg,beam_fb_height,height_segment_type,beam,geometry
1831,2019-09-01 11:10:21.645610094,2.720752e+07,20.290331,0.253510,1,gt1l,POINT (11.39429 -64.01932)
1832,2019-09-01 11:10:21.647197247,2.720753e+07,19.569931,0.265972,1,gt1l,POINT (11.39427 -64.01942)
1833,2019-09-01 11:10:21.648262024,2.720754e+07,16.069815,0.276316,1,gt1l,POINT (11.39426 -64.01948)
1834,2019-09-01 11:10:21.649195671,2.720755e+07,14.670819,0.303078,1,gt1l,POINT (11.39424 -64.01954)
1835,2019-09-01 11:10:21.650213718,2.720755e+07,14.671224,0.324290,1,gt1l,POINT (11.39423 -64.01960)
...,...,...,...,...,...,...,...
69215,2019-09-29 21:27:06.509730577,3.355327e+07,44.508583,0.181769,7,gt1r,POINT (24.15736 -59.13850)
69216,2019-09-29 21:27:06.512042761,3.355329e+07,39.595520,0.149934,7,gt1r,POINT (24.15733 -59.13835)
69217,2019-09-29 21:27:06.514954329,3.355331e+07,36.785496,0.146254,1,gt1r,POINT (24.15730 -59.13817)
69218,2019-09-29 21:27:06.516793250,3.355332e+07,28.306177,0.137055,1,gt1r,POINT (24.15727 -59.13805)


In [10]:
tracks.info(memory_usage='deep')  # what does this do?

<class 'geopandas.geodataframe.GeoDataFrame'>
Int64Index: 5205785 entries, 1831 to 69219
Data columns (total 7 columns):
 #   Column                     Dtype         
---  ------                     -----         
 0   delta_time                 datetime64[ns]
 1   seg_dist_x                 float64       
 2   height_segment_length_seg  float32       
 3   beam_fb_height             float32       
 4   height_segment_type        int8          
 5   beam                       object        
 6   geometry                   geometry      
dtypes: datetime64[ns](1), float32(2), float64(1), geometry(1), int8(1), object(1)
memory usage: 506.4 MB


### For future IO eficient operations we save the geodataframe as parquet

In [13]:
tracks["delta_time"] = tracks["delta_time"].astype('datetime64[s]')
tracks.info()

<class 'geopandas.geodataframe.GeoDataFrame'>
Int64Index: 5205785 entries, 1831 to 69219
Data columns (total 7 columns):
 #   Column                     Dtype         
---  ------                     -----         
 0   delta_time                 datetime64[ns]
 1   seg_dist_x                 float64       
 2   height_segment_length_seg  float32       
 3   beam_fb_height             float32       
 4   height_segment_type        int8          
 5   beam                       object        
 6   geometry                   geometry      
dtypes: datetime64[ns](1), float32(2), float64(1), geometry(1), int8(1), object(1)
memory usage: 243.3+ MB


In [11]:
#tracks.to_parquet("atl10-2019.parquet")

ArrowInvalid: Casting from timestamp[ns] to timestamp[us] would lose data: 1567336221645610094

#### Geopandas Read function 

The function below extracts latitude, longitude, segment distance, segment length, surface type, and freeboard height. See the [NSIDC's ATL10 User Guide](https://nsidc.org/sites/default/files/documents/user-guide/atl10-v006-userguide.pdf) for more details on these variables.

## Grid track data

This follows the processing steps described in the ATL20 - Gridded Sea Ice Freeboard - ATBD but gridding to a EASE-Grid v2 6.25 km grid.  Any projected coordinate system or grid could be chosen.  The procedure could be modified with extra QC steps or modifications.  **The world is your oyster - or [Aplacophoran](https://antarcticsun.usap.gov/science/4447/)**.

The processing steps are:

- remove non-ice and low quality segments 
- resample freeboard segments to a grid
- calculate aggregate statistics
    + mean segment length
    + segment count
    + length weighted mean freeboard
    + length weighted standard deviation of freeboard
    
#### Grid Cell Mean Segment Length $\bar{L}$

$$
\bar{L}(x, y, D) = \frac{\sum L_i}{N}
$$

where $L_i$ is `/gtx/freeboard_beam_segment/height_segments/height_segment_length_seg`, $x$ and $y$ are projected coordinates for grid centers, and $D$ is day. 

#### Grid Cell Mean Freeboard $\bar{h}$

$$
\bar{h}(x, y, D) = \frac{\sum L_i h_i}{\sum L_i}
$$

where $h_i$ is `gtx/freeboard_beam_segment/beam_freeboard/beam_fb_height`.

#### Grid Cell Standard Deviation of Freeboard $\sigma^2 (x, y, D)$

$$
\sigma^2 (x, y, D) = \frac{\sum L_i (h_i)^2}{\sum L_i} - \bar{h}^2 (x, y, D)
$$

### Resample Freeboard Segments to a Grid

Following the ATL20 ATBD, we will use a _drop-in-the-bucket_ resampling scheme.  This is simple and relatively easy to implement.  More complex resampling schemes could be substituted.

To demonstrate resampling we will resample freeboard segments to WGS84 / NSIDC EASE-Grid v2.0 South with a grid resolution of 6.25 km.  The EPSG code for the WGS84 / NSIDC EASE-Grid South coordinate reference system is [6932](https://epsg.org/crs_6932/WGS-84-NSIDC-EASE-Grid-2-0-South.html).

We will use the standard 6.25 km grid.  To define the grid, we need the grid dimensions (nrows and ncols), the x and y projected coordinates of the upper-left corner of the upper-left grid cell, and the height and width of the grid cells in the same units as the projected coordinates.  In this case, the units are meters.

In [34]:
easegrid2_epsg = 6932

nrow = 2880
ncol = 2880
upper_left_x = -9000000.0
upper_left_y = 9000000.0
width = 10000.0
height = -10000.0

The first step is to reproject the points from geodetic coordinates (latitude and longitude) to projected coordinates (x, y).  Because the data are in a `geopandas.DataFrame` we can use the `to_crs` method.  This takes an EPSG code either as a numeric value (`6932`) or as a string (`"EPSG:6932"`).

You can see that the `POINT` objects in the `geometry` have changed from having latitudes and longitudes as coordinates to x and y in meters.

In [16]:
%%time
tracks = tracks.to_crs(easegrid2_epsg)
tracks.head()

CPU times: user 256 ms, sys: 162 ms, total: 418 ms
Wall time: 417 ms


Unnamed: 0,delta_time,seg_dist_x,height_segment_length_seg,beam_fb_height,height_segment_type,beam,geometry
1831,2019-09-01 11:10:21.645610094,27207520.0,20.290331,0.25351,1,gt1l,POINT (568023.081 2818528.976)
1832,2019-09-01 11:10:21.647197247,27207530.0,19.569931,0.265972,1,gt1l,POINT (568019.787 2818518.673)
1833,2019-09-01 11:10:21.648262024,27207540.0,16.069815,0.276316,1,gt1l,POINT (568017.576 2818511.765)
1834,2019-09-01 11:10:21.649195671,27207550.0,14.670819,0.303078,1,gt1l,POINT (568015.636 2818505.708)
1835,2019-09-01 11:10:21.650213718,27207550.0,14.671224,0.32429,1,gt1l,POINT (568013.520 2818499.103)


A _Drop-in-the-Bucket_ resampling scheme collects points into the grid cells that they intersect with, and then calculates aggregate statistics for each grid cell using attributes associated with those points.

We'll find the grid cell that contains each segment by calculating the row and column coordinates for each segment from the projected coordinates.  This is done by creating an _Affine_ transformation matrix for the grid.  The Affine matrix is just a matrix representation of the algebraic expressions to convert row and column indices of the grid to projected coordinates.  The equations below give the forward transformation from `(row, col)` to `(x, y)`. 

$$
x = width * col + upper\_left\_x \\
y = height * row + upper\_left\_y
$$

These are expressed in matrix form:

$$
\begin{bmatrix}
x \\
y \\
0
\end{bmatrix} = 
\begin{bmatrix}
a & 0 & c \\
0 & d & e \\
0 & 0 & 1
\end{bmatrix}
\begin{bmatrix}
col \\
row \\
1
\end{bmatrix}
$$

where $a$ is $\mathsf{width}$, $c$ is $\mathsf{upper\_left\_x}$, $d$ is $height$, and $e$ is $upper\_left\_y$.

```{note}
The projected coordinate system we are using is a cartesian plane with the origin at the South Pole.  The `x` coordinates increase to the right, and `y` coordinates increase up.  For raster data, which includes grids and images, have the origin at the upper-left corner of the grid.  Column indices increase from right to left, and row indices increase from top to bottom.
```

We use the `affine` package to create a forward transformation matrix (`fwd`) using the grid parameters above.  To transform `(x, y)` projected coordinates to `(row, col)`, we can calculate the reverse transformation matrix using `~fwd`.

`(row, col)` coordinates are still rational numbers.  We want an integer row and column indices for grid cells.  We can use the `floor` function to get integers.  `row` and `column` indices are zero based.

We want to be able to leverage the `geopandas.Dataframe.groupby` functionality to collect points into grid cells, so we need a unique identifier to group the data.  We can calculate a unique cell index from `row` and `column` indices as follows:

$$
cell\_index = row * ncol + col
$$

This is encapsulated in the function `get_grid_index`.  This function is then applied to the `geometry` of tracks.

In [40]:
def get_grid_index(xy):
    geotransform = (upper_left_x, width, 0., upper_left_y, 0., height)
    fwd = Affine.from_gdal(*geotransform)
    row, col = ~fwd * xy
    return (np.floor(row) * ncol) + np.floor(col)


In [41]:
%%time
tracks["grid_index"] = [get_grid_index((x, y)) for x, y in zip(tracks.geometry.x, tracks.geometry.y)]

CPU times: user 31.3 s, sys: 0 ns, total: 31.3 s
Wall time: 31.3 s


In [42]:
tracks.head()

Unnamed: 0,delta_time,seg_dist_x,height_segment_length_seg,beam_fb_height,height_segment_type,beam,geometry,grid_index
1831,2019-09-01 11:10:21.645610094,27207520.0,20.290331,0.25351,1,gt1l,POINT (568023.081 2818528.976),2753898.0
1832,2019-09-01 11:10:21.647197247,27207530.0,19.569931,0.265972,1,gt1l,POINT (568019.787 2818518.673),2753898.0
1833,2019-09-01 11:10:21.648262024,27207540.0,16.069815,0.276316,1,gt1l,POINT (568017.576 2818511.765),2753898.0
1834,2019-09-01 11:10:21.649195671,27207550.0,14.670819,0.303078,1,gt1l,POINT (568015.636 2818505.708),2753898.0
1835,2019-09-01 11:10:21.650213718,27207550.0,14.671224,0.32429,1,gt1l,POINT (568013.520 2818499.103),2753898.0


In [43]:
cell_segment_counts = tracks.groupby("grid_index")["beam_fb_height"].count()

In [44]:
grid = np.zeros((nrow, ncol)).flatten()
grid[cell_segment_counts.index.values.astype(int)] = cell_segment_counts
grid = grid.reshape((nrow, ncol))
#grid = np.where(grid > 0, grid, np.nan)

In [45]:
grid.shape

(2880, 2880)

In [117]:
grid.min(), grid.max()

(0.0, 2317.0)

### **Assign to grid and calculate grid cell mean**

## **3. Learning outcomes recap (optional)**

Provide a brief summary of the learning outcomes of the tutorial


## **4. Additional resources (optional)**

List some additional resources for users to consult, if applicable/desired.

________

### **When your tutorial is ready for review,  please read our [Contributor Guide](https://github.com/nsidc/NSIDC-Data-Tutorials/blob/main/contributor_guide.md) for next steps.**