# How to exploit data on Pangeo

#### Pangeo Workshop - Snow and Cloud Cover
converted from https://github.com/EO-College/cubes-and-clouds/blob/main/lectures/3.1_data_processing/exercises/_alternatives/31_data_processing_stac.ipynb

original author: Michele Claus @clausmichele
conversion by: Pangeo volunteers (Pier Lorenzo Marasco @pl-marasco, Alejandro Coca-Castro @acocac, Anne Fouilloux @annefou, Justus Magin @keewis, Tina Odaka @tinaodaka)

#### Introduction
In this exercise, we will build the same complete EO workflow as in the OpenEO exercise, from data access to the final result, using cloud-hosted data discovered through a STAC catalogue and processing it locally.

We are going to follow these steps in our analysis:

- Load satellite collections
- Specify the spatial, temporal extents and the features we are interested in
- Process the satellite data to retrieve snow cover information
- Aggregate information in data cubes
- Visualize and analyse the results

### Important information

More information on Pangeo can be found here: https://pangeo.io/
More information on the STAC specification can be found here: https://stacspec.org/


#### Import libraries

In [None]:
# Data Manipulation and Analysis Libraries
import pandas as pd  
import numpy as np 

# Geospatial Data Handling Libraries
import geopandas as gpd 
from shapely.geometry import mapping  
import pyproj

# Multidimensional and Satellite Data Libraries
import xarray as xr 
import rioxarray as rio
import stackstac

# Data Visualization Libraries
import holoviews as hv
import hvplot.xarray
import hvplot.pandas

# Data parallelization and distributed computing libraries
import dask
from dask.distributed import Client, progress, LocalCluster

# STAC Catalogue Libraries
import pystac_client

Here we create a Dask client, which manages and executes the parallel computations in the rest of the notebook. In some situations you can connect to a Dask Gateway instead, but for this exercise we will use a local cluster.

In [None]:
cluster = LocalCluster()
client = Client(cluster)

The client address can be copied and pasted into the dashboard to monitor the progress of the computations.

In [None]:
client

We will use the catchment as our area of interest (AOI) for the analysis. The catchment is defined by a polygon, which we load from a GeoJSON file.
The GeoJSON file stores the geometry of the catchment in the WGS84 coordinate reference system (EPSG:4326), and this has to be declared explicitly.

In [None]:
# Declare the CRS explicitly (note the spelling: EPSG, not EPGS)
aoi = gpd.read_file('../data/catchment_outline.geojson').set_crs("EPSG:4326", allow_override=True)
aoi_geojson = mapping(aoi.iloc[0].geometry)

#### Load satellite collections
We will use the STAC API to search for satellite data in this exercise, specifically the API provided by AWS/Element84. The STAC API is a RESTful service that lets us query satellite data with filters such as spatial extent, time period, and other metadata. It is built on the STAC specification, a collaborative, community-driven standard that aims to make satellite data easier to discover and use. Numerous data providers, including AWS, Google Earth Engine, and Planet, have implemented the STAC API. The Copernicus Data Space Ecosystem (CDSE) is expected to follow, although at the time of writing its STAC catalogue is not yet fully operational.

We will limit the search to the Sentinel-2 L2A collection, which contains Sentinel-2 data processed to surface reflectance (bottom of atmosphere).
We will also limit the search to the period between 1 February 2019 and 10 June 2019 and to the extent of the catchment.

In [None]:
URL = "https://earth-search.aws.element84.com/v1"
catalog = pystac_client.Client.open(URL)
items = catalog.search(
    intersects=aoi_geojson,
    collections=["sentinel-2-l2a"],
    datetime="2019-02-01/2019-06-10"
).item_collection()
len(items)

#### Get bands information
The catalogue names its assets differently from the original Sentinel-2 band identifiers (for example `green` instead of B03), so we first need to look up the available assets.

In [None]:
# Get bands information
# selected_item = items[1]
# for key, asset in selected_item.assets.items():
#     print(f"{key}: {asset.title}")

#### Load data
We will use the stackstac library to load the data. stackstac turns a collection of STAC items into a lazy, Dask-backed xarray DataArray.
Here we load the green and swir16 bands, which we need to calculate the snow cover, and the scl band (scene classification layer), which we will use to mask out clouds.
We select a spatial resolution of 20 m and load the data in chunks of 2048x2048 pixels. Chunking is discussed in more detail later.

In [None]:
ds = stackstac.stack(items,
                    bounds_latlon=aoi.iloc[0].geometry.bounds,
                    resolution=20,
                    chunksize=2048,
                    assets=['green', 'swir16', 'scl'])
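
To get a feel for what `chunksize=2048` means, the sketch below uses a plain Dask array instead of the Sentinel-2 stack. The 5000 x 3000 shape is an arbitrary assumption chosen for illustration, not the real size of the catchment raster.

```python
import dask.array as da

# Hypothetical raster of 5000 x 3000 pixels, split into blocks of at most 2048 x 2048
toy = da.zeros((5000, 3000), chunks=2048)

# Number of blocks along each axis
print(toy.numblocks)
# Sizes of the individual blocks along each axis (edge blocks are smaller)
print(toy.chunks)
```

Each block can be processed independently by a worker, which is what allows the computation to run in parallel.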

#### Calculate snow cover
We will use the Normalized Difference Snow Index (NDSI) to derive the snow cover. The NDSI is the difference between the green and swir16 bands divided by their sum.
For clarity, we assign the green, swir16 and scl bands to separate variables. Other approaches to managing the data are possible, but this is the one we use in this exercise.

In [None]:
green = ds.sel(band='green')
swir = ds.sel(band='swir16')
scl = ds.sel(band='scl')

We first calculate the NDSI from the two bands. Clouds will be masked out afterwards using the scene classification layer (scl), which assigns a class to each pixel.

In [None]:
ndsi = (green - swir) / (green + swir)
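
To make the formula concrete, here is a small sketch with NumPy. The reflectance values are made up for illustration and do not come from the Sentinel-2 data.

```python
import numpy as np

# Made-up surface reflectance values for three pixels (assumption, not real data)
green = np.array([0.80, 0.30, 0.10])
swir16 = np.array([0.10, 0.25, 0.30])

# Normalized difference: always between -1 and 1
ndsi = (green - swir16) / (green + swir16)
print(ndsi.round(2))
```

Snow is bright in the green band and dark in the short-wave infrared, so the first pixel gets a high NDSI, while the other two stay low or negative.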

Dask can persist data in memory, which is useful to speed up subsequent computations. The persist method triggers the computation and keeps the result in memory for the rest of the analysis. persist, load and other Dask-related topics are explained in more detail in the next part of the exercise.

In [None]:
ndsi = ndsi.persist()
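
As a minimal sketch of the difference between a lazy object, `persist` and `compute`, consider a tiny Dask array. The array is a toy assumption, and the example runs on Dask's default local scheduler, so it does not need the cluster created above.

```python
import dask.array as da

# Lazy expression: nothing is computed yet, only a task graph is built
lazy = da.ones((4, 4), chunks=2) + 1

# persist() computes the chunks and keeps them in memory, but still returns a Dask array
persisted = lazy.persist()

# compute() returns a concrete in-memory result
total = persisted.sum().compute()
print(type(persisted).__name__, total)
```

Later operations on `persisted` start from the stored chunks instead of recomputing the whole graph.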

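We will mask out clouds and cloud shadows, which are identified in the scl layer by the values 8 and 9 (cloud, medium and high probability) and 3 (cloud shadow). Masked pixels are assigned the value 2.
More detailed information can be found here: https://sentinel.esa.int/web/sentinel/technical-guides/sentinel-2-msi/level-2a/algorithm-overview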

In [None]:
# Classify pixels: 1 = snow (NDSI > 0.42), 0 = no snow; NaN pixels stay NaN
snow = xr.where((ndsi > 0.42) & ~np.isnan(ndsi), 1, ndsi)
snowmap = xr.where((snow <= 0.42) & ~np.isnan(snow), 0, snow)
# Keep only pixels that are neither cloud shadow (3) nor cloud (8, 9)
# Equivalent explicit form: mask = (scl != 8) & (scl != 9) & (scl != 3)
mask = ~scl.isin([8, 9, 3])
# Flag the masked pixels with the value 2
snow_cloud = xr.where(mask, snowmap, 2)
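
The resulting coding (0 = no snow, 1 = snow, 2 = cloud or cloud shadow, NaN = no data) can be checked on a handful of pixels. The sketch below is a compact version of the same logic applied to made-up NDSI and scl values. It is not part of the original workflow.

```python
import numpy as np
import xarray as xr

# Made-up NDSI values and scene classification codes for five pixels (assumption)
ndsi = xr.DataArray([0.80, 0.10, 0.60, np.nan, 0.50], dims="x")
scl = xr.DataArray([11, 4, 9, 0, 3], dims="x")

# 1 = snow, 0 = no snow, NaN stays NaN (comparisons with NaN are always False)
snowmap = xr.where(ndsi > 0.42, 1.0, xr.where(ndsi <= 0.42, 0.0, np.nan))
# 2 = cloud or cloud shadow (scl classes 8, 9 and 3)
snow_cloud = xr.where(scl.isin([8, 9, 3]), 2.0, snowmap)
print(snow_cloud.values)
```

The third and fifth pixels have a high NDSI but are flagged as cloud and cloud shadow, so they end up with the value 2 rather than 1.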

#### Mask data
As we are only interested in the snow cover in the catchment, we will mask out the data outside the catchment.
To achieve this, we need to convert the catchment geometry to the same coordinate reference system as the data. The data is in the UTM32N coordinate reference system (EPSG:32632).

In [None]:
aoi_utm32 = aoi.to_crs(epsg=32632)
geom_utm32 = aoi_utm32.iloc[0]['geometry']
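
What the reprojection does to each vertex of the polygon can be illustrated on a single point with pyproj. The point below is hypothetical (it lies on the central meridian of UTM zone 32N) and is not taken from the catchment outline.

```python
from pyproj import Transformer

# always_xy=True fixes the axis order: (longitude, latitude) in, (easting, northing) out
to_utm32 = Transformer.from_crs("EPSG:4326", "EPSG:32632", always_xy=True)

# Hypothetical point at 9 degrees East, 46.5 degrees North
easting, northing = to_utm32.transform(9.0, 46.5)
print(round(easting), round(northing))
```

The coordinates change from degrees to metres, which is the unit used by the x and y coordinates of the data cube.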

As we are going to use the RioXarray library to mask out the data, we need to add some more information to the data: the coordinate reference system and the nodata value.
Both pieces of information can be found in the metadata of the data, but we need to set them explicitly so that RioXarray can use them.

In [None]:
snow_cloud.rio.write_crs("EPSG:32632", inplace=True)
snow_cloud.rio.set_nodata(np.nan, inplace=True)

The clipping is done by the RioXarray library, which adds geospatial operations to xarray objects. Underneath, it uses the rasterio library, which is built on top of GDAL.

In [None]:
snowmap_clipped = snow_cloud.rio.clip([geom_utm32])

It's time to persist the data in memory. We will use the persist method to load the data in memory and keep it there until the end of the analysis.

In [None]:
clipped_date = snowmap_clipped.persist()

### Aggregate data

Data aggregation is a very important step in the analysis. It reduces the amount of data and makes the analysis more efficient. Moreover, because we aggregate the data to daily values here, we will be able to compute statistics at the basin scale later on.
The groupby method groups the data along a specific dimension. Here we group by the time dimension, flooring each timestamp to its date and thereby discarding the time of day. Once the groups are formed, we aggregate each one by taking the maximum value.

In [None]:
clipped_date.time

In [None]:
clipped_date = snowmap_clipped.groupby(snowmap_clipped.time.dt.floor('D')).max(skipna=True)

As the data has been aggregated to daily values, we rename the resulting dimension from floor (the name of the method that produced it) to something more meaningful: date.

In [None]:
clipped_date = clipped_date.rename({'floor': 'date'})
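
The daily aggregation and the rename can be tried on a toy array. The sketch below assumes three made-up acquisitions for two pixels, two of them on the same day.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Two acquisitions on 1 February and one on 2 February (made-up values for two pixels)
times = pd.to_datetime(["2019-02-01T10:00", "2019-02-01T10:30", "2019-02-02T10:00"])
toy = xr.DataArray(
    [[0.0, np.nan], [1.0, 2.0], [0.0, 1.0]],
    dims=("time", "x"),
    coords={"time": times},
)

# Group by calendar day and keep the maximum per pixel, ignoring NaN
daily = toy.groupby(toy.time.dt.floor("D")).max(skipna=True)
# The new dimension is named after the accessor method, so rename it
daily = daily.rename({"floor": "date"})
print(daily.values)
```

Because the classes are coded 0, 1 and 2, taking the maximum means that, when several acquisitions fall on the same day, cloud takes priority over snow, and snow over no snow.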

In [None]:
clipped_date = clipped_date.persist()

#### Visualize data
We will use the hvplot library to visualize the data. hvplot provides interactive plotting for xarray and pandas objects. It is based on the holoviews library, which is designed for visualizing multidimensional data.
As we are going to visualize the data on a map, we need to specify the coordinate reference system of the data, which is UTM32N (EPSG:32632). This allows the library to project the data onto the map.
More information on the hvplot library can be found here: https://hvplot.holoviz.org/

In [None]:
clipped_date.hvplot.image(
    x='x',
    y='y',
    groupby='date',
    crs=pyproj.CRS.from_epsg(32632),
    cmap='Pastel2',
    clim=(-1, 2),
    frame_width=500,
    frame_height=500,
    title='Snowmap',
    geo=True, tiles='OSM')

### Compute statistics

From the original notebook:
Calculate Catchment Statistics
We are looking at a region over time. We need to make sure that the information content meets our expected quality. Therefore, we calculate the cloud percentage for the catchment for each time step. We use this information to filter the time series. All time steps with a cloud coverage of over 25% will be discarded.

Ultimately we are interested in the snow covered area (SCA) within the catchment. We count all snow covered pixels within the catchment for each time step. Multiplied by the pixel size, this would be the snow covered area. Dividing the pixel count by the total number of pixels in the catchment gives the percentage of pixels covered with snow. We will use this number.

Get number of pixels in catchment: total, clouds, snow.

In [None]:
# number of cloud pixels
cloud = xr.where(clipped_date == 2, 1, np.nan).count(dim=['x', 'y']).persist()

In [None]:
# number of valid (non-NaN) pixels in the catchment for each date
aot_total = clipped_date.count(dim=['x', 'y']).persist()

In [None]:
# cloud fraction for each date, expressed in %
cloud_fraction = (cloud / aot_total * 100).persist()
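
The counting logic relies on the fact that `count` only counts non-NaN values, so pixels outside the catchment are excluded from the total. The same idea can be sketched in NumPy on a made-up 3 x 3 classified map.

```python
import numpy as np

# Made-up classified map: 0 = no snow, 1 = snow, 2 = cloud, NaN = outside the catchment
toy = np.array([
    [np.nan, 1.0, 2.0],
    [0.0, 2.0, 2.0],
    [0.0, 1.0, np.nan],
])

valid = np.count_nonzero(~np.isnan(toy))  # pixels inside the catchment
cloud = np.count_nonzero(toy == 2)        # cloud pixels
cloud_fraction = cloud / valid * 100
print(valid, cloud, round(cloud_fraction, 1))
```

With 3 cloud pixels out of 7 valid ones, this toy date would be discarded by the cloud filter.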

In [None]:
# Visualize cloud fraction
cloud_fraction.hvplot.line(title='Cloud cover %', ylabel="%") * hv.HLine(25).opts(
    color='red',
    line_dash='dashed',
    line_width=2.0,
)

We are going to get the same information for the snow cover.

In [None]:
snow = xr.where(clipped_date == 1, 1, np.nan).count(dim=['x', 'y']).persist()

In [None]:
snow_fraction = (snow / aot_total * 100).persist()

In [None]:
# visualize snow fraction
snow_fraction.hvplot.line(title='Snow cover area (%)', ylabel="%")

In [None]:
# keep only the dates with a cloud fraction below 25%, the limit described above
masked_cloud_fraction = cloud_fraction < 25

In [None]:
snow_selected = snow_fraction.sel(date=masked_cloud_fraction)
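
Selecting with a boolean mask keeps only the dates where the mask is True. A minimal sketch with made-up percentages for four dates follows. The values are assumptions, not results from the exercise.

```python
import pandas as pd
import xarray as xr

dates = pd.date_range("2019-02-01", periods=4, freq="D")
# Made-up cloud and snow percentages for four dates
cloud_fraction = xr.DataArray([5.0, 60.0, 20.0, 90.0], dims="date", coords={"date": dates})
snow_fraction = xr.DataArray([70.0, 10.0, 65.0, 5.0], dims="date", coords={"date": dates})

# Boolean mask: True where the scene is clear enough
keep = cloud_fraction < 25
snow_selected = snow_fraction.sel(date=keep)
print(snow_selected.values)
```

Only the first and third dates remain. The snow percentages of the two cloudy dates are dropped because they are not reliable.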

In [None]:
snow_selected.name = 'SCA'
snow_selected.hvplot.line(title="Snow fraction")

Let's compare the data with the discharge data.

In [None]:
discharge_ds = pd.read_csv('data/ADO_DSC_ITH1_0025.csv', sep=',', index_col='Time', parse_dates=True)

Let's restrict the discharge data to the period of interest so that we can compare it with the snow cover data.

In [None]:
start_date = pd.to_datetime("2019/02/01")
end_date = pd.to_datetime("2019/06/30")
# filter discharge data to start and end dates
discharge_ds = discharge_ds.loc[start_date:end_date]
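
Label-based slicing on a datetime index includes both end points, which differs from positional slicing. The sketch below shows this on a made-up daily series. The values are assumptions, and only the column name mirrors the one used in this exercise.

```python
import pandas as pd

# Made-up daily discharge series from 30 January to 3 February 2019
index = pd.date_range("2019-01-30", periods=5, freq="D", name="Time")
toy = pd.DataFrame({"discharge_m3_s": [1.0, 1.2, 1.5, 1.4, 1.3]}, index=index)

# Both 1 February and 2 February are kept
subset = toy.loc[pd.to_datetime("2019/02/01"):pd.to_datetime("2019/02/02")]
print(subset)
```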

In [None]:
discharge_ds.discharge_m3_s.hvplot(title='Discharge volume', ylabel='Discharge (m$^3$/s)') * snow_selected.hvplot(ylabel='Snow cover area (%)')  

### Conclusion

In this exercise, we reproduced with Pangeo the snow cover workflow that was originally built with OpenEO, which gives a basis for comparing the two geospatial data processing frameworks. OpenEO offers a unified API that simplifies the process of accessing and processing earth observation data across various backends, allowing users to interact with different data sources seamlessly. Its standardized interface is a strong asset, making it accessible to a wide range of users, from researchers to application developers.

On the other hand, Pangeo excels in facilitating big data geoscience. Its robust ecosystem, built around existing Python libraries like Dask and Xarray, makes it a powerful tool for large-scale data analysis and visualization. Pangeo’s community-driven approach and open-source nature foster collaboration and innovation, promoting a dynamic and adaptable framework.

Each platform has its own set of advantages and constraints. OpenEO simplifies interoperability and enhances accessibility, making it particularly beneficial for users who wish to work with diverse data sources without delving deeply into the complexities of each backend. Pangeo, with its emphasis on leveraging existing Python tools and libraries, is particularly potent for those comfortable with Python and seeking to perform extensive, scalable analyses.

Choosing between OpenEO and Pangeo ultimately depends on the specific requirements and constraints of a project. Considerations such as the user's familiarity with Python, the necessity for interoperability across various data backends, and the scale of data processing required should guide the decision-making process.

