In [53]:
# 
!pip install numpy==1.23.5
packages = ['pysptools', 'cvxopt','sklearn','rioxarray','xarray','scipy']
for a in packages:
    !pip install --upgrade {a}

import io
import os
import math
import time
import stackstac
import geojson
import dask_gateway
import planetary_computer
import rasterio.features
import azure.storage.blob
import numpy as np
import xarray as xr
import rioxarray as rioxr
import matplotlib.pyplot as plt
import dask
import pysptools
import cvxopt
from dask.distributed import PipInstall, Lock
import pysptools.abundance_maps as amp
from scipy.stats import mode
from dask_gateway import GatewayCluster
from pystac_client import Client
from sklearn import linear_model
from sklearn.metrics import mean_squared_error

Collecting numpy==1.23.5
  Downloading numpy-1.23.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.1/17.1 MB[0m [31m45.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: numpy
  Attempting uninstall: numpy
    Found existing installation: numpy 1.24.3
    Uninstalling numpy-1.24.3:
      Successfully uninstalled numpy-1.24.3
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
cmip6-preprocessing 0.6.0 requires pint-xarray, which is not installed.
esmpy 8.4.1 requires pytest-json-report, which is not installed.
cmip6-preprocessing 0.6.0 requires xgcm<0.7.0, but you have xgcm 0.8.1 which is incompatible.[0m[31m
[0mSuccessfully installed numpy-1.23.5


In [54]:
def read_file(file):
    """Return the entire text content of ``file`` (opened with default encoding)."""
    with open(file) as handle:
        text = handle.read()
    return text

def load_blob_grid(blob_client):
    """Download the whole blob behind ``blob_client`` and parse it as GeoJSON."""
    raw_bytes = blob_client.download_blob().readall()
    grid = geojson.loads(raw_bytes)
    return grid

def get_tile(grid, h, v):
    """Return the geometry of the first grid feature whose properties match (h, v).

    Parameters
    ----------
    grid : mapping
        GeoJSON-like object with a ``'features'`` list; each feature has a
        ``'properties'`` mapping containing ``'h'`` and ``'v'`` and a
        ``'geometry'`` entry.
    h, v :
        Values compared with ``==`` against each feature's ``'h'`` / ``'v'``.

    Returns
    -------
    The ``'geometry'`` of the first matching feature.

    Raises
    ------
    IndexError
        If no feature matches (same exception type as before, now with a message).
    """
    missing = object()
    # Stop at the first match instead of materialising every match in a list.
    match = next(
        (feature for feature in grid['features']
         if feature['properties']['h'] == h and feature['properties']['v'] == v),
        missing,
    )
    if match is missing:
        raise IndexError('no grid tile with h=%r, v=%r' % (h, v))
    return match['geometry']

def get_bbox(geometry):
    """Bounding box of a GeoJSON-like geometry via ``rasterio.features.bounds``.

    The tuple is later used as ``bounds_latlon`` for ``stackstac.stack``.
    """
    bounds = rasterio.features.bounds(geometry)
    return bounds

def get_container(container, connection_string):
    """Return an Azure ``ContainerClient`` for ``container`` built from a connection string."""
    make_client = azure.storage.blob.ContainerClient.from_connection_string
    return make_client(connection_string, container_name=container)

def get_blob(container_client, blob_name):
    """Return the blob client for ``blob_name`` inside ``container_client``."""
    return container_client.get_blob_client(blob_name)

def get_cluster(n=20, ncore=8, memory=16):
    """Start a Dask Gateway cluster and return ``(cluster, client)``.

    Parameters
    ----------
    n : int
        Number of workers requested through ``cluster.scale``.
    ncore : int
        Value for the gateway's ``worker_cores`` option.
    memory : int
        Value for the gateway's ``worker_memory`` option (units defined by the
        gateway deployment -- presumably GiB; confirm).
    """
    gateway = dask_gateway.Gateway()
    options = gateway.cluster_options()
    for option_name, option_value in (("worker_cores", ncore), ("worker_memory", memory)):
        options[option_name] = option_value

    cluster = gateway.new_cluster(options)
    # Fixed size; for autoscaling use cluster.adapt(minimum=n, maximum=400) instead.
    cluster.scale(n)
    return cluster, cluster.get_client()

# Install pip packages on every Dask worker.
# NOTE(review): Dask warns when package versions differ between the client and
# the scheduler/workers, and mismatches can make task-graph deserialization fail
# ("This frequently occurs if the Scheduler and Client have different
# environments"). Pin versions in `packages` (e.g. "numpy==1.23.5") to keep the
# environments consistent.
def register_package(client, packages, pip_options=None, restart=False):
    """Register a ``PipInstall`` worker plugin that installs ``packages``.

    Parameters
    ----------
    client : dask.distributed.Client
        Client connected to the cluster whose workers should install the packages.
    packages : list of str
        pip requirement strings, e.g. ``['numpy==1.23.5', 'xarray']``.
    pip_options : list of str, optional
        Extra pip arguments; defaults to ``['--upgrade']`` (the previous behaviour).
    restart : bool, default False
        Passed to ``PipInstall``; when True the workers restart after installing,
        so newly installed versions are the ones actually imported.
    """
    if pip_options is None:
        pip_options = ['--upgrade']
    plugin = PipInstall(packages=packages, pip_options=list(pip_options), restart=restart)
    client.register_worker_plugin(plugin)

In [55]:
def boxes(x, y, scale=0.5):
    """Build a square polygon ring anchored at (x, y) and its grid indices (h, v).

    The square spans ``[x, x + scale] x [y, y + scale]``. The indices are
    ``h = 360/scale/2 + floor(x/scale)`` and ``v = 180/scale/2 + floor(y/scale)``;
    with ``scale=0.5``, h ranges over 0..720 and v over 0..360.

    Returns
    -------
    (box, h, v)
        ``box`` is ``[[[l, d], [r, d], [r, u], [l, u], [l, d]]]``; h and v are
        numpy integers.
    """
    left, bottom = x, y
    right, top = x + scale, y + scale
    ring = [[left, bottom], [right, bottom], [right, top], [left, top], [left, bottom]]

    h_index = np.floor(x / scale) + (360 / scale) / 2
    v_index = np.floor(y / scale) + (180 / scale) / 2
    return [ring], h_index.astype(int), v_index.astype(int)
def geometry_from_box(bbox):
    """Wrap polygon coordinates (as returned by ``boxes``) in a GeoJSON Polygon object.

    The polygon carries an extra ``"geodesic": false`` member.
    """
    template = '{"coordinates": %s, "geodesic": false, "type": "Polygon"}'
    coordinates_json = geojson.dumps(bbox)
    return geojson.loads(template % coordinates_json)


In [56]:

def get_epsg(items):
    """Return the dominant ``proj:epsg`` of STAC items as an ``'EPSG: <code>'`` string.

    A search area can straddle projection zones, so items may carry different
    EPSG codes. The most frequent code is used (ties resolve to the code seen
    first), rather than whichever item happens to come first.

    Raises
    ------
    ValueError
        If ``items`` is empty (nothing to derive a CRS from).
    """
    import collections

    epsgs = [item.properties['proj:epsg'] for item in items]
    if not epsgs:
        raise ValueError('cannot determine EPSG: no STAC items were provided')
    # Counter.most_common keeps first-encountered order among equal counts.
    dominant_epsg, _count = collections.Counter(epsgs).most_common(1)[0]
    return 'EPSG: ' + str(dominant_epsg)

def search_sentinel_2_images(start, end, geometry, limit=1000, max_cloud_cover=90):
    """Search the module-level STAC ``catalog`` for Sentinel-2 L2A scenes.

    Parameters
    ----------
    start, end : str
        Date strings joined into the STAC ``datetime`` range ``"start/end"``.
    geometry : GeoJSON-like
        Area of interest passed as ``intersects``.
    limit : int, default 1000
        ``limit`` forwarded to ``catalog.search`` (previously ignored: 1000
        was always sent).
    max_cloud_cover : number, default 90
        Only scenes with ``eo:cloud_cover`` strictly below this value are kept.

    Returns
    -------
    The result of ``search.get_all_items()`` (all pages of matching items).
    Items are signed because ``catalog`` is opened with
    ``planetary_computer.sign_inplace``.
    """
    search = catalog.search(
        intersects=geometry,
        datetime=start + '/' + end,
        collections=["sentinel-2-l2a"],
        limit=limit,
        query={'eo:cloud_cover': {'lt': max_cloud_cover}},
    )
    return search.get_all_items()
def get_sentinel_2_stack(start, end, geometry, chunksize=128, resolution=30):
    """Build a lazy (dask-backed) Sentinel-2 reflectance stack for ``geometry``.

    Parameters
    ----------
    start, end : str
        Date range forwarded to ``search_sentinel_2_images``.
    geometry : GeoJSON-like
        Area of interest; its bounding box becomes ``bounds_latlon``.
    chunksize : int, default 128
        Spatial chunk size; time and band are kept in a single chunk (-1),
        so each chunk holds a pixel's full time series.
    resolution : number, default 30
        Output pixel size in units of the output CRS. NOTE(review): 30 is
        coarser than Sentinel-2's native 10/20 m, presumably to match Landsat.
        Confirm this is intended.

    Returns
    -------
    xarray.DataArray
        Values <= 0 replaced by NaN (Sentinel-2 uses 0 as nodata) and bands
        renamed Blue, Green, Red, NIR, SWIR1, SWIR2.
    """
    # No manual planetary_computer.sign() here: `catalog` was opened with
    # planetary_computer.sign_inplace, so searched items are already signed.
    items = search_sentinel_2_images(start, end, geometry)
    bbox = get_bbox(geometry)
    epsg = get_epsg(items)
    stack = stackstac.stack(
        items,
        assets=['B02', 'B03', 'B04', 'B08', 'B11', 'B12'],
        chunksize=(-1, -1, chunksize, chunksize),
        resolution=resolution,
        epsg=epsg,
        bounds_latlon=bbox,
    )
    return (
        stack
        .where(lambda x: x > 0, other=np.nan)  # sentinel-2 uses 0 as nodata
        .assign_coords(band=['Blue', 'Green', 'Red', 'NIR', 'SWIR1', 'SWIR2'])
    )
def array_to_frac_year(array, days_in_year=365.25):
    """Convert the ``time`` coordinate of ``array`` to fractional years.

    Returns ``year + dayofyear / days_in_year``, e.g. 2020-02-01 -> 2020 + 32/365.25.
    """
    time = array.time
    # dayofyear (1..366), not dt.day (day of month, 1..31): with day-of-month
    # every date fell in the first ~8.5% of its year, which flattened the
    # seasonal cos/sin regressors built from this value.
    return time.dt.year + time.dt.dayofyear / days_in_year

def construct_dependents(array, days_in_year=365.25):
    """Build the (time, x) design matrix for the harmonic regression.

    Columns: ``x1`` = fractional year, ``x2`` = cos(2*pi*x1), ``x3`` = sin(2*pi*x1).
    """
    omega = 2 * math.pi
    frac_year = array_to_frac_year(array, days_in_year)
    angle = frac_year * omega
    regressors = [frac_year, np.cos(angle), np.sin(angle)]

    design = xr.concat(regressors, dim='x')
    design = design.assign_coords(x=['x1', 'x2', 'x3'])
    return design.transpose('time', 'x')
def fnrt_sentinel_2(M, U, X, scale=10000, min_obs=1, apply_mask=False):
    """Unmix one pixel's time series, derive NDFI and fit a harmonic linear model.

    Parameters
    ----------
    M : array-like, shape (time, band)
        Reflectance observations of a single pixel. Rows containing NaN are
        treated as missing (get_sentinel_2_stack maps values <= 0 to NaN).
    U : array-like, shape (endmember, band)
        Endmember spectra passed to pysptools FCLS. The unmixed fractions are
        read in column order gv, npv, soil, shade, cloud.
    X : array-like, shape (time, 3)
        Design matrix whose rows align with the rows of M (see construct_dependents).
    scale : number, default 10000
        Multiplier applied to NDFI.
    min_obs : int, default 1
        Minimum number of usable observations required to fit the model.
    apply_mask : bool, default False
        If True, also discard observations failing
        ``cloud < 0.2 and shade < 1 and gv_frac > 0``. This check was computed
        but never applied before, so it is off by default.

    Returns
    -------
    numpy.ndarray, shape (1, 5), float64
        ``[intercept, coef_x1, coef_x2, coef_x3, rmse]``; all NaN when fewer
        than ``min_obs`` usable observations remain. The previous fallback
        ``fit([0], [0])`` raised instead.
    """
    result = np.full((1, 5), np.nan, dtype='float64')
    required = max(int(min_obs), 1)

    M = np.asarray(M, dtype='float64')
    X = np.asarray(X, dtype='float64')

    # Remove missing observations *before* unmixing. Casting NaN to an integer
    # dtype (as the old int16 cast did) yields arbitrary values that were then
    # unmixed and fitted as if they were real reflectances. Staying in float
    # also avoids int16 overflow for reflectances above 32767.
    observed = np.isfinite(M).all(axis=1)
    if observed.sum() < required:
        return result
    sr = M[observed]
    X = X[observed]

    unmixed = amp.amaps.FCLS(sr, U)
    gv = unmixed[:, 0]
    npv = unmixed[:, 1]
    soil = unmixed[:, 2]
    shade = unmixed[:, 3]
    cloud = unmixed[:, 4]

    # shade == 1 or gv_frac == 0 produce inf/NaN; such observations are dropped below.
    with np.errstate(divide='ignore', invalid='ignore'):
        gv_shade = gv / (1 - shade)
        gv_frac = gv_shade + (npv + soil)
        ndfi = (gv_shade - (npv + soil)) / gv_frac * scale

    # isfinite (not just ~isnan) so inf values never reach the regression.
    usable = np.isfinite(ndfi)
    if apply_mask:
        usable &= (cloud < 0.2) & (shade < 1) & (gv_frac > 0)
    if usable.sum() < required:
        return result

    y_true = ndfi[usable]
    x_true = X[usable, :]
    lm = linear_model.LinearRegression().fit(x_true, y_true)
    y_pred = lm.predict(x_true)
    # RMSE computed directly: the `squared=False` option of
    # sklearn.metrics.mean_squared_error is deprecated in 1.4 and removed in 1.6.
    residuals = np.asarray(y_true, dtype='float64') - np.asarray(y_pred, dtype='float64')
    rmse = np.sqrt(np.mean(residuals ** 2))

    coef = lm.coef_
    return np.array(
        [lm.intercept_, coef[0], coef[1], coef[2], rmse],
        ndmin=2,
        dtype='float64',
    )

def xr_fnrt_sentinel_2(col, endmembers, scale=10000):
    """Apply ``fnrt_sentinel_2`` to every pixel of a (time, band, y, x) stack.

    Parameters
    ----------
    col : xarray.DataArray
        Reflectance stack with ``time`` and ``band`` dimensions (e.g. from
        get_sentinel_2_stack); both are consumed per pixel.
    endmembers : array-like
        Endmember matrix forwarded to fnrt_sentinel_2 as ``U``.
    scale : number, default 10000
        NDFI scale factor forwarded to fnrt_sentinel_2.

    Returns
    -------
    xarray.DataArray
        Per-pixel model parameters along ``band`` =
        ``['incpt', 'slope', 'cos', 'sin', 'rmse']``, with the length-1
        ``time`` dimension squeezed away.
    """
    # Pass the design matrix as a plain ndarray. Otherwise an xarray object (and
    # its pandas time index) is pickled into every dask task, which makes the
    # graph heavier and ties deserialization on the workers to pandas/xarray versions.
    X = np.asarray(construct_dependents(col))
    return (
        xr.apply_ufunc(
            fnrt_sentinel_2, col,
            input_core_dims=[['time', 'band']],
            output_core_dims=[['time', 'fit']],
            exclude_dims={'time', 'band'},
            kwargs={'X': X, 'U': endmembers, 'scale': scale},
            dask='parallelized',
            vectorize=True,
            output_dtypes=[col.dtype],
            dask_gufunc_kwargs=dict(output_sizes={'time': 1, 'fit': 5}),
        )
        .rename({'fit': 'band'})
        .assign_coords(band=['incpt', 'slope', 'cos', 'sin', 'rmse'])
        .transpose(*col.dims)
        .squeeze()
    )

def _to_georeferenced_dataset(img, nodata):
    """Split ``img`` into one variable per band, set its CRS from the ``epsg`` coord and nodata on each band."""
    dataset = (img
               .to_dataset(dim='band')
               .rio.write_crs(img.coords['epsg'].item())
              )
    for data_var in dataset.data_vars:
        dataset[data_var].rio.write_nodata(nodata, inplace=True)
    return dataset


def _write_raster(dataset, target, driver, dask, client):
    """Write ``dataset`` to ``target`` (path or file-like), using a distributed lock when ``dask`` is True."""
    if dask:
        # With dask-backed data rioxarray can write chunks from parallel tasks;
        # the named distributed Lock lets only one task write to the file at a time.
        dataset.rio.to_raster(target, driver=driver, tiled=True, lock=Lock('fnrtm', client=client))
    else:
        dataset.rio.to_raster(target, driver=driver)


def export_to_drive(img, des, driver='COG', nodata=0, dask=False, client=None):
    """Write ``img`` (with ``band`` dim and ``epsg`` coord) to the raster path ``des``.

    Parameters
    ----------
    img : xarray.DataArray
        Multi-band image; each band becomes a raster band.
    des : str
        Destination path.
    driver : str, default 'COG'
        GDAL driver name.
    nodata : number, default 0
        Nodata value written to every band.
    dask : bool, default False
        Write with a distributed lock (for dask-backed data on a cluster).
    client : dask.distributed.Client, optional
        Client used to create the lock when ``dask`` is True.
    """
    dataset = _to_georeferenced_dataset(img, nodata)
    _write_raster(dataset, des, driver, dask, client)


def export_to_blob(img, container_client, blob, driver='GTiff', nodata=0, dask=False, client=None):
    """Render ``img`` to an in-memory raster and upload it as ``blob`` (overwriting).

    Parameters are as for export_to_drive, plus ``container_client`` (Azure
    container) and ``blob`` (blob name).

    NOTE(review): with ``dask=True`` the target is a BytesIO living in this
    process. Cluster workers presumably cannot write into it, so confirm that
    path works before relying on it (the notebook calls it with dask=False).
    """
    dataset = _to_georeferenced_dataset(img, nodata)
    with io.BytesIO() as buffer:
        _write_raster(dataset, buffer, driver, dask, client)
        buffer.seek(0)
        blob_client = container_client.get_blob_client(blob)
        blob_client.upload_blob(buffer, overwrite=True)

def divide_box(bbox, scale=4):
    """Split a ``(left, bottom, right, top)`` box into a ``scale x scale`` grid of polygon rings.

    Tiles are ordered with the x index as the outer loop and the y index as the
    inner loop. Each tile is ``[[[l, d], [r, d], [r, u], [l, u], [l, d]]]``.
    """
    left, bottom, right, top = bbox
    step_x = (right - left) / scale
    step_y = (top - bottom) / scale
    tiles = []
    for idx in range(scale):
        for idy in range(scale):
            # Offsets are always taken from the original lower-left corner. The
            # previous version overwrote the corner inside the loop, so later
            # tiles drifted away from the grid.
            l = left + idx * step_x
            d = bottom + idy * step_y
            r = l + step_x
            u = d + step_y
            tiles.append([[[l, d], [r, d], [r, u], [l, u], [l, d]]])
    return tiles


In [57]:
# Endmember spectra: one row per endmember, one column per band in the stack's
# band order (B02, B03, B04, B08, B11, B12). Rows follow the order in which
# fnrt_sentinel_2 reads the unmixing output: gv, npv, soil, shade, cloud.
endmembers = np.array([[500, 900, 400, 6100, 3000, 1000],
                       [1400, 1700, 2200, 3000, 5500, 3000],
                       [2000, 3000, 3400, 5800, 6000, 5800],
                       [0, 0, 0, 0, 0, 0],
                       [9000, 9600, 8000, 7800, 7200, 6500]], dtype=np.int16)
# sign_inplace signs every item returned by searches on this catalog.
catalog = Client.open('https://planetarycomputer.microsoft.com/api/stac/v1', modifier=planetary_computer.sign_inplace)
# The connection-string file location can be overridden with FNRTM_CONNECTION_FILE.
connection_file = os.environ.get('FNRTM_CONNECTION_FILE', '/home/jovyan/fnrtm/files/connect.txt')
# strip() removes a trailing newline/whitespace so it is not parsed as part of the last key's value.
connection_string = read_file(connection_file).strip()
container_client = get_container('misc', connection_string)
blob_client = get_blob(container_client, 'one_degree_grid.geojson')
training_container = get_container('training', connection_string)
grid_blob = load_blob_grid(blob_client)
grid_lists = [feature['properties'] for feature in grid_blob['features']]

In [60]:
# 100 single-core workers with memory=4 each (get_cluster defaults: n=20, ncore=8, memory=16),
# then install `packages` on the workers and show the dashboard URL.
cluster, client = get_cluster(n=100, ncore=1, memory=4)
register_package(client, packages)
print(cluster.dashboard_link)

https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway/clusters/prod.1c9d98c2ec434d6e90677621a942096d/status


Exception in callback None()
handle: <Handle cancelled>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/tornado/iostream.py", line 1367, in _do_ssl_handshake
    self.socket.do_handshake()
  File "/srv/conda/envs/notebook/lib/python3.10/ssl.py", line 1342, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate (_ssl.c:997)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/tornado/platform/asyncio.py", line 192, in _handle_events
    handler_func(fileobj, events)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/tornado/iostream.py", line 691, in _handle_events
    self._ha

In [63]:
# Report the Dask Gateway clusters currently registered for this user,
# e.g. ones left running by an earlier cell.
gateway = dask_gateway.Gateway()
clusters = gateway.list_clusters()
print(repr(clusters))

[ClusterReport<name=prod.1c9d98c2ec434d6e90677621a942096d, status=RUNNING>]


In [64]:
# Connect to the first cluster listed above, print its dashboard and shut it down.
# Guarded so the cell does not fail with IndexError when no cluster is running.
if clusters:
    cluster = gateway.connect(clusters[0].name)
    client = cluster.get_client()
    print(cluster.dashboard_link)
    cluster.shutdown()
else:
    print('No running Dask Gateway clusters to shut down.')

https://pccompute.westeurope.cloudapp.azure.com/compute/services/dask-gateway/clusters/prod.1c9d98c2ec434d6e90677621a942096d/status


In [62]:
# Fit the FNRT model (2019-2021, hence "1921" in the output name) for each
# half-degree tile and upload the result, skipping tiles already in the container.
chunksize = 64
# List existing outputs once, not on every iteration; a set gives O(1) membership tests.
existing_outputs = {blob.name for blob in training_container.list_blobs()}
for x, y in zip([-67, -67, -66.5, -66.5], [-9, -8.5, -9, -8.5]):
    bbox, h, v = boxes(x, y, scale=0.5)
    output_name = 'FNRT_%03d%03d_1921_sentinel_2.tif' % (h, v)
    if output_name in existing_outputs:
        print(output_name, 'exists')
        continue
    print(output_name, 'working')
    geometry = geometry_from_box(bbox)
    lst = get_sentinel_2_stack('2019-01-01', '2021-12-31', geometry, chunksize)
    trained = xr_fnrt_sentinel_2(lst, endmembers)
    export_to_blob(trained, training_container, output_name, dask=False)
    del trained

FNRT_226162_1921_sentinel_2.tif exists
FNRT_226163_1921_sentinel_2.tif exists
FNRT_227162_1921_sentinel_2.tif exists
FNRT_227163_1921_sentinel_2.tif working


  times = pd.to_datetime(


RuntimeError: Error during deserialization of the task graph. This frequently occurs if the Scheduler and Client have different environments. For more information, see https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments


In [51]:
# Show package versions of the client, scheduler and workers to diagnose
# environment mismatches (check=False only reports; it does not raise).
client.get_versions(check=False)

{'scheduler': {'host': {'python': '3.10.10.final.0',
   'python-bits': 64,
   'OS': 'Linux',
   'OS-release': '5.4.0-1101-azure',
   'machine': 'x86_64',
   'processor': 'x86_64',
   'byteorder': 'little',
   'LC_ALL': 'C.UTF-8',
   'LANG': 'C.UTF-8'},
  'packages': {'python': '3.10.10.final.0',
   'dask': '2023.4.1',
   'distributed': '2023.4.1',
   'msgpack': '1.0.5',
   'cloudpickle': '2.2.1',
   'tornado': '6.3',
   'toolz': '0.12.0',
   'numpy': '1.23.5',
   'pandas': '2.0.1',
   'lz4': '4.3.2'}},
 'workers': {'tls://10.244.117.173:40251': {'host': {'python': '3.10.10.final.0',
    'python-bits': 64,
    'OS': 'Linux',
    'OS-release': '5.4.0-1101-azure',
    'machine': 'x86_64',
    'processor': 'x86_64',
    'byteorder': 'little',
    'LC_ALL': 'C.UTF-8',
    'LANG': 'C.UTF-8'},
   'packages': {'python': '3.10.10.final.0',
    'dask': '2023.4.1',
    'distributed': '2023.4.1',
    'msgpack': '1.0.5',
    'cloudpickle': '2.2.1',
    'tornado': '6.3',
    'toolz': '0.12.0',
    '