# How to exploit data on Pangeo

#### Pangeo Workshop - Snow and Cloud Cover
converted from https://github.com/EO-College/cubes-and-clouds/blob/main/lectures/3.1_data_processing/exercises/_alternatives/31_data_processing_stac.ipynb

original author: Michele Clous @clausmichele
conversion by: Pangeo volunteers (Pier Lorenzo Marasco @pl-marasco, Alejandro Coca-Castro @acocac, Anne Fouilloux @annefou, Justus Magin @keewis, Tina Odaka @tinaok)

#### Introduction
In this exercise, we will build a complete the same EO workflow as OpenEO using cloud provided data (STAC Catalogue), processing it locally; from data access to obtaining the result.

We are going to follow these steps in our analysis:

- Load satellite collections
- Specify the spatial, temporal extents and the features we are interested in
- Process the satellite data to retrieve snow cover information
- aggregate information in data cubes
- Visualize and analyse the results

###
Important Infos 

More information on Pangeo can be found here: https://pangeo.io/
More information on the STAC specification can be found here: https://stacspec.org/


#### Import libraries

In [None]:
# Data Manipulation and Analysis Libraries
import pandas as pd  
import numpy as np 

# Geospatial Data Handling Libraries
import geopandas as gpd 
from shapely.geometry import mapping  
import pyproj

# Multidimensional and Satellite Data Libraries
import xarray as xr 
import rioxarray as rio
import stackstac

# Data Visualization Libraries
import holoviews as hv
import hvplot.xarray
import hvplot.pandas

# Data parallelization and distributed computing libraries
import dask
from dask.distributed import Client, progress, LocalCluster

# STAC Catalogue Libraries
import pystac_client

Here we creates a Dask client, which is essential for managing and executing parallel computations efficiently in the subsequent parts of the notebook. There are situation where you can connect to a Dask Gateway, but for this exercise we will use a local cluster.

In [None]:
cluster = LocalCluster()
client = Client(cluster)

Client address can be copy and pasted to the dashboard to monitor the progress of the computations.

In [None]:
client

#### Calculate snow cover with apply_ufunc

<div class="alert alert-warning">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Calculate snow cover using Xarray's apply_ufunc </b>
    <br>
    <ul>
        <li>The procedure for computing snow cover can be summed up as following python function.  
</li> 
        <li>We first verify that Green, swir16 and scr are in the order of 0,1,2 th variable in band variable.   Then we simply copy and past all the python codes in a function.  </li>
            </ul>
</div>

In [None]:
def calculate_ndsi_snow_cloud(data):
    green = data[0]
    swir = data[1]
    scl = data[2]
    ndsi = (green - swir) / (green + swir)
    ndsi_mask = ( ndsi > 0.4 )& ~np.isnan(ndsi)
    snow = np.where(ndsi_mask, 1, ndsi)
    snowmap = np.where((snow <= 0.42) & ~np.isnan(snow), 0, snow)
    mask = ~( (scl == 8) | (scl == 9) | (scl == 3) )
    snow_cloud = np.where(mask, snowmap, 2)
    return snow_cloud

In [None]:
aoi = gpd.read_file('../data/catchment_outline.geojson', crs="EPGS:4326")
aoi_geojson = mapping(aoi.iloc[0].geometry)

#### Mask data before apply_ufunc
<div class="alert alert-warning">
    <i class="fa-check-circle fa" style="font-size: 22px;color:#666;"></i> <b>Apply mask then persist the data, then apply_ufunc to perform computation.   </b>
    <br>
    <ul>
        <li>The masking procedure can be applied before the computation to not to download the data we do not use.     
</li> 
            </ul>
</div>

In [None]:
%%time
URL = "https://earth-search.aws.element84.com/v1"
catalog = pystac_client.Client.open(URL)
items = catalog.search(
    intersects=aoi_geojson,
    collections=["sentinel-2-l2a"],
    datetime="2019-02-01/2019-06-10"
).item_collection()
len(items)

ds = stackstac.stack(items,
                    bounds_latlon=aoi.iloc[0].geometry.bounds,
                    resolution=20,
                    chunksize=(1,-1,-1,-1),
                    assets=['green', 'swir16', 'scl'])

aoi_utm32 = aoi.to_crs(epsg=32632)
geom_utm32 = aoi_utm32.iloc[0]['geometry']
ds.rio.write_crs("EPSG:32632", inplace=True)
ds.rio.set_nodata(np.nan, inplace=True)
ds = ds.rio.clip([geom_utm32]).persist()
ds

snow_cloud=xr.apply_ufunc(
    calculate_ndsi_snow_cloud
    ,ds
    ,input_core_dims=[["band","y","x"]]
    ,output_core_dims=[["y","x"]]
    ,exclude_dims=set(["band"])
    ,vectorize=True
    ,dask="parallelized"
    ,output_dtypes=[ds.dtype]
    ).assign_attrs({'long_name': 'snow_cloud'}).to_dataset(name='snow_cloud')


snowmap_clipped=snow_cloud

clipped_date = snowmap_clipped.groupby(snowmap_clipped.time.dt.floor('D')).max(skipna=True).rename({'floor': 'date'})

#clipped_date = clipped_date.compute()
clipped_date


test=clipped_date


def remove_attrs(obj, to_remove):
    new = obj.copy()
    new.attrs = {k: v for k, v in obj.attrs.items() if k not in to_remove}

    return new

def encode(obj):
    object_coords = [name for name, coord in obj.coords.items() if coord.dtype.kind == "O"]
    return obj.drop_vars(object_coords).pipe(remove_attrs, ["spec", "transform"])


test.pipe(encode).to_zarr('test_applyufunc.zarr',mode='w')

In [None]:
import xarray as xr
clipped_date=xr.open_zarr('test_applyufunc.zarr').snow_cloud

In [None]:
%%time
clipped_date.hvplot.image(
    x='x',
    y='y',
    groupby='date',
    crs=pyproj.CRS.from_epsg(32632),
    cmap='Pastel2',
    clim=(-1, 2),
    frame_width=500,
    frame_height=500,
    title='Snowmap',
    geo=True, tiles='OSM')

### Compute statistics

from the orinal notebook:
Calculate Catchment Statistics
We are looking at a region over time. We need to make sure that the information content meets our expected quality. Therefore, we calculate the cloud percentage for the catchment for each timestep. We use this information to filter the timeseries. All timesteps that have a cloud coverage of over 25% will be discarded.

Ultimately we are interested in the snow covered area (SCA) within the catchment. We count all snow covered pixels within the catchment for each time step. Multiplied by the pixel size that would be the snow covered area. Divided the pixel count by the total number of pixels in the catchment is the percentage of pixels covered with snow. We will use this number.

Get number of pixels in catchment: total, clouds, snow.

In [None]:
# number of cloud pixels
cloud = xr.where(clipped_date == 2, 1, np.nan).count(dim=['x', 'y']).persist()

In [None]:
# number of all pixels per each single date
aot_total = clipped_date.count(dim=['x', 'y']).persist()

In [None]:
# Cloud fraction per each single date expressed in % 
cloud_fraction = (cloud / aot_total * 100).persist()

In [None]:
# Visualize cloud fraction
cloud_fraction.hvplot.line(title='Cloud cover %', ylabel="&") * hv.HLine(25).opts(
    color='red',
    line_dash='dashed',
    line_width=2.0,
)

We are going to get the same information for the snow cover.

In [None]:
snow = xr.where(clipped_date == 1, 1, np.nan).count(dim=['x', 'y']).persist()

In [None]:
snow_fraction = (snow / aot_total * 100).persist()

In [None]:
# viaualize snow fraction
snow_fraction.hvplot.line(title='Snow cover area (%)', ylabel="%")

In [None]:
# mask out cloud fraction > 30% 
masked_cloud_fraction = cloud_fraction < 30

In [None]:
snow_selected = snow_fraction.sel(date=masked_cloud_fraction)

In [None]:
snow_selected.name = 'SCA'
snow_selected.hvplot.line(title="Snow fraction")

Let's compare the date with the discharge data.

In [None]:
discharge_ds = pd.read_csv('data/ADO_DSC_ITH1_0025.csv', sep=',', index_col='Time', parse_dates=True)

Let's refine a little bit the data so that we can compare it with the snow cover data

In [None]:
start_date = pd.to_datetime("2019/02/01")
end_date = pd.to_datetime("2019/06/30")
# filter discharge data to start and end dates
discharge_ds = discharge_ds.loc[start_date:end_date]

In [None]:
discharge_ds.discharge_m3_s.hvplot(title='Discharge volume', ylabel='Discharge (m$^3$/s)') * snow_selected.hvplot(ylabel='Snow cover area (%)')  

### Conclusion

In this analysis, we have comprehensively examined the features, capabilities, and limitations of two prominent geospatial data processing frameworks: OpenEO and Pangeo. OpenEO offers a unified API that simplifies the process of accessing and processing earth observation data across various backends, allowing users to interact with different data sources seamlessly. Its standardized interface is a strong asset, making it accessible to a wide range of users, from researchers to application developers.

On the other hand, Pangeo excels in facilitating big data geoscience. Its robust ecosystem, built around existing Python libraries like Dask and Xarray, makes it a powerful tool for large-scale data analysis and visualization. Pangeo’s community-driven approach and open-source nature foster collaboration and innovation, promoting a dynamic and adaptable framework.

Each platform has its own set of advantages and constraints. OpenEO simplifies interoperability and enhances accessibility, making it particularly beneficial for users who wish to work with diverse data sources without delving deeply into the complexities of each backend. Pangeo, with its emphasis on leveraging existing Python tools and libraries, is particularly potent for those comfortable with Python and seeking to perform extensive, scalable analyses.

Choosing between OpenEO and Pangeo ultimately depends on the specific requirements and constraints of a project. Considerations such as the user's familiarity with Python, the necessity for interoperability across various data backends, and the scale of data processing required should guide the decision-making process.

