# Cloudbutton Geospatial: Water Consumption Workflow

---

In [None]:
import sys
sys.path.append('/work')

In [None]:
from collections import defaultdict
from cloudbutton_geospatial.io_utils.plot import plot_results
from cloudbutton_geospatial.utils.notebook import date_picker
from rasterio.windows import Window
from scipy.spatial import distance_matrix
from shapely.geometry import Point, MultiPoint, box
from pprint import pprint
import functools
import collections
import datetime
import os
import shutil
import math
import numpy as np
import pandas as pd
import lithops
import requests
import rasterio
import fiona
import json
import random
import re
import tempfile
import concurrent.futures
from IPython.display import Image
import matplotlib.pyplot as plt
from lithops.storage import Storage
from lithops.storage.utils import StorageNoSuchKeyError
from io import BytesIO


from cloud_data_cockpit import DataCockpit


## Workflow parameters

Area outside the processed tile that we want to consider for taking SIAM stations into account:

In [None]:
AREA_OF_INFLUENCE = 16000

Lithops Variables:

In [None]:
DATA_BUCKET = 'pyrundemo'
COMPUTE_BACKEND = 'aws_lambda'
STORAGE_BACKEND = 'aws_s3'
STORAGE_PREFIX = 's3://'
RUNTIME_MEMORY = 4096

In [None]:
storage = lithops.storage.Storage(backend=STORAGE_BACKEND)

In [None]:
data_loader = DataCockpit()

In [None]:
chunks = data_loader.get_data_slices()
SPLITS = data_loader.get_batch_size()

In [None]:
SPLITS

In [None]:
chunks

In [None]:
DTM_PREFIX = 'DTMs/'
DTM_ASC_PREFIX = 'DTMs/asc/'
DTM_GEOTIFF_PREFIX = 'DTMs/chunks/'

Split tile into square chunks (number of tiles = SPLITS^2):

Correlation coefficient between elevation and temperature:

In [None]:
r = -0.0056

Elevation to interpolate temperature:

In [None]:
zdet = 2000

Day of year to calculate solar irradiation:

In [None]:
date = date_picker(default=datetime.date(2022, 5, 15))

In [None]:
DAY_OF_YEAR = date.value.timetuple().tm_yday
DAY_OF_YEAR

Initialize Lithops Storage and Function Executor:

In [None]:
fexec = lithops.FunctionExecutor(backend=COMPUTE_BACKEND, storage=STORAGE_BACKEND, runtime_memory=RUNTIME_MEMORY)

## Data preparation

### SIAM data

In [None]:
siam_data_key = 'siam_data.csv'
try:
    siam_data_head = storage.head_object(bucket=DATA_BUCKET, key=siam_data_key)
    print(f'SIAM meteo data already in storage: {siam_data_head}')
except StorageNoSuchKeyError:
    print('Uploading SIAM meteo data to Object Storage...')
    with open(siam_data_key, 'rb') as f:
        storage.put_object(bucket=DATA_BUCKET, key=siam_data_key, body=f)

### Shapefile

In [None]:
shapefile_key = 'shapefile_murcia.zip'
try:
    shapefile_head = storage.head_object(bucket=DATA_BUCKET, key=shapefile_key)
    print(f'Shapefile already in storage: {siam_data_head}')
except StorageNoSuchKeyError:
    print('Uploading shapefile to Object Storage...')
    with open(shapefile_key, 'rb') as f:
        storage.put_object(bucket=DATA_BUCKET, key=shapefile_key, body=f)

Download DTM files for free from http://centrodedescargas.cnig.es/CentroDescargas/buscadorCatalogo.do?codFamilia=MDT05# and put them in `input_DTMs` folder.

## Raster Data Interpolation

Split data tiles in subtiles for increased parallelism:

In [None]:
def compute_solar_irradiation(inputFile, outputFile, crs='32630'):
    # Define grass working set
    GRASS_GISDB = '/tmp/grassdata'
    #GRASS_GISDB = 'grassdata'
    GRASS_LOCATION = 'GEOPROCESSING'
    GRASS_MAPSET = 'PERMANENT'
    GRASS_ELEVATIONS_FILENAME = 'ELEVATIONS'

    os.environ['GRASSBIN'] = 'grass76'

    from grass_session import Session
    import grass.script as gscript
    import grass.script.setup as gsetup
    from grass.pygrass.modules.shortcuts import general
    from grass.pygrass.modules.shortcuts import raster
    
    os.environ.update(dict(GRASS_COMPRESS_NULLS='1'))

    # Clean previously processed data
    if os.path.isdir(GRASS_GISDB):
        shutil.rmtree(GRASS_GISDB)
    
    with Session(gisdb=GRASS_GISDB, location=GRASS_LOCATION, mapset=GRASS_MAPSET, create_opts='EPSG:32630') as ses:
        # Set project projection to match elevation raster projection
        general.proj(epsg=crs, flags='c') 
        # Load raster file into working directory
        raster.import_(input=inputFile, output=GRASS_ELEVATIONS_FILENAME, flags='o')    
        
        # Set project region to match raster region
        general.region(raster=GRASS_ELEVATIONS_FILENAME, flags='s')    
        # Calculate solar irradiation
        gscript.run_command('r.slope.aspect', elevation=GRASS_ELEVATIONS_FILENAME,
                            slope='slope', aspect='aspect')
        gscript.run_command('r.sun', elevation=GRASS_ELEVATIONS_FILENAME,
                            slope='slope', aspect='aspect', beam_rad='beam',
                            step=1, day=DAY_OF_YEAR)
        
        # Get extraterrestrial irradiation from history metadata
        regex = re.compile(r'\d+\.\d+')
        output = gscript.read_command("r.info", flags="h", map=["beam"])
        splits = str(output).split('\n')
        line = next(filter(lambda line: 'Extraterrestrial' in line, splits))
        extraterrestrial_irradiance = float(regex.search(line)[0])
        
        # Export generated results into a GeoTiff file
        if os.path.isfile(outputFile):
            os.remove(outputFile)

        raster.out_gdal(input='beam', output=outputFile)
        
        return extraterrestrial_irradiance

Get stations contained in the area of interest:

In [None]:
def filter_stations(bounds, stations):
    total_points = MultiPoint([Point(x, y) for x, y in stations[['X', 'Y']].to_numpy()])
    total_points_list = list(total_points.geoms)
    intersection = bounds.buffer(AREA_OF_INFLUENCE).intersection(total_points)
    filtered_stations = [point for point in total_points_list if intersection.contains(point)]

    return stations[[point in filtered_stations for point in total_points_list]]

Inverse Distance Weighting interpolation:

In [None]:
def compute_basic_interpolation(shape, stations, field_value, offset = (0,0)):
    station_pixels = [[pixel[0], pixel[1]] for pixel in stations['pixel'].to_numpy()]
    
    # Get an array where each position represents pixel coordinates
    tile_pixels = np.indices(shape).transpose(1,2,0).reshape(shape[0]*shape[1], 2) + offset
    dist = distance_matrix(station_pixels, tile_pixels)
    weights = np.where(dist == 0, np.finfo('float32').max, 1.0 / dist )
    weights /=  weights.sum(axis=0)
    
    return np.dot(weights.T, stations[field_value].to_numpy()).reshape(shape).astype('float32')

Interpolate temperatures from a subset of the tile:

In [None]:
import os
import tempfile
import numpy as np
import rasterio

def radiation_interpolation(
    tile_key: str,
    block_x: int,
    block_y: int,
    chunk_slice,
    storage
) -> list[tuple]:
    """
    For a given COG slice, compute:
     - extraterrestrial irradiation raster
     - beam radiation raster
    Returns two entries:
      (tile_key, 'extr', block_x, block_y, extr_cloudobject)
      (tile_key, 'rad',  block_x, block_y, rad_cloudobject)
    """
    tile_id, _ = os.path.splitext(tile_key)

    # 1) Write the slice to a temporary GeoTIFF
    chunk_file = os.path.join(
        tempfile.gettempdir(),
        f"{tile_id}_{block_x}_{block_y}.tif"
    )
    chunk_slice.to_file(chunk_file)

    # 2) Open that chunk to get elevation data and profile
    with rasterio.open(chunk_file) as src:
        elevation = src.read(1)
        profile   = src.profile.copy()
        height, width = elevation.shape

    # 3) Update profile transform, size, dtype
    #    (transform already baked into to_file output)
    profile.update({
        "height": height,
        "width":  width,
        "driver": "GTiff",
        "dtype":  elevation.dtype,
    })

    # 4) Paths for output rasters
    extr_path = os.path.join(
        tempfile.gettempdir(),
        f"{tile_id}_extr_{block_x}_{block_y}.tif"
    )
    rad_path = os.path.join(
        tempfile.gettempdir(),
        f"{tile_id}_rad_{block_x}_{block_y}.tif"
    )

    # 5) Compute beam radiation via your GRASS/r.sun helper
    #    It will write out rad_path and return the 
    #    extraterrestrial irradiation constant.
    extraterrestrial_irradiation = compute_solar_irradiation(
        inputFile=chunk_file,
        outputFile=rad_path
    )

    # 6) Write a constant‐value raster for extraterrestrial irradiation
    with rasterio.open(extr_path, "w", **profile) as dst:
        layer = np.full((height, width),
                        extraterrestrial_irradiation,
                        dtype=elevation.dtype)
        dst.write(layer, 1)

    # 7) Upload both rasters to object storage
    print(extr_path)
    print(rad_path)
    with open(extr_path, "rb") as f:
        extr_co = storage.put_cloudobject(body=f, bucket=DATA_BUCKET)

    with open(rad_path, "rb") as f:
        rad_co = storage.put_cloudobject(body=f, bucket=DATA_BUCKET)

    return [
        (tile_key, "extr", block_x, block_y, extr_co),
        (tile_key, "rad",  block_x, block_y, rad_co),
    ]


In [None]:
import os
import tempfile
import numpy as np
import pandas as pd
import rasterio
from shapely.geometry import box
from lithops.storage import Storage

def map_interpolation(
    tile_key: str,
    block_x: int,
    block_y: int,
    chunk_slice,
    data_field: str
) -> list[tuple]:
    """
    Interpolate a meteorological field over one COG slice.
    Returns [(tile_key, data_field, block_x, block_y, CloudObject), …].
    """

    # Re-create Storage client inside the worker
    storage = Storage(backend=STORAGE_BACKEND)

    # 1) Read SIAM stations
    siam_stream = storage.get_object(bucket=DATA_BUCKET,
                                     key=siam_data_key,
                                     stream=True)
    siam_data = pd.read_csv(siam_stream)

    # 2) Write the single-window chunk to a temp GeoTIFF
    tile_id = os.path.splitext(tile_key)[0]
    tmp_chunk = os.path.join(tempfile.gettempdir(),
                             f"{tile_id}_{block_x}_{block_y}.tif")
    chunk_slice.to_file(tmp_chunk)

    # 3) Open that chunk to get elevation + metadata
    with rasterio.open(tmp_chunk) as src:
        elevation = src.read(1)
        profile   = src.profile.copy()
        transform = src.transform
        nodata    = src.nodata
        height, width = elevation.shape

    # 4) Compute the slice’s bounding box and buffer it
    minx, miny, maxx, maxy = src.bounds
    bbox = box(minx, miny, maxx, maxy).buffer(AREA_OF_INFLUENCE)

    # 5) Filter stations inside the buffered bbox
    stations = pd.DataFrame(filter_stations(bbox, siam_data))
    if stations.empty:
        return [(tile_key, data_field, block_x, block_y, None)]

    # 6) Convert station coords to pixel indices
    stations["pixel"] = stations.apply(
        lambda r: rasterio.transform.rowcol(transform, r["X"], r["Y"]),
        axis=1
    )

    # 7) Prepare output filename & profile (already correct size/transform)
    out_file = os.path.join(
        tempfile.gettempdir(),
        f"{tile_id}_{data_field}_{block_x}_{block_y}.tif"
    )

    # 8) Perform the interpolation
    with rasterio.open(out_file, "w", **profile) as dst:
        if data_field == "temp":
            interp = compute_basic_interpolation(elevation.shape,
                                                 stations,
                                                 "tdet",
                                                 (0, 0))
            interp += r * (elevation - zdet)
            layer = np.where(elevation == nodata, np.nan, interp)
        elif data_field == "humi":
            layer = compute_basic_interpolation((height, width),
                                                stations,
                                                "hr",
                                                (0, 0))
        elif data_field == "wind":
            layer = compute_basic_interpolation((height, width),
                                                stations,
                                                "v",
                                                (0, 0))
        else:
            raise ValueError(f"Unknown data_field {data_field!r}")

        dst.write(layer.astype(profile["dtype"]), 1)

    # 9) Upload result and return
    print(out_file)
    with open(out_file, "rb") as f:
        co = storage.put_cloudobject(body=f, bucket=DATA_BUCKET)

    return [(tile_key, data_field, block_x, block_y, co)]


In [None]:
from typing import List, Tuple


def generate_iterdata(chunks) -> List[Tuple]:
    """Generates the iterdata array with the data blocks extracted from the COG."""
    iterdata = []
    
    for i, window in enumerate(chunks):
        tile_key = window.tile_key
        block_x = window.block_x
        block_y = window.block_y
        chunk_data = window

        iterdata.append((tile_key, block_x, block_y, chunk_data))
    
    return iterdata

In [None]:
iterdata = generate_iterdata(chunks)

In [None]:
iterdata

Lithops serverless computation:

In [None]:
res_rad = fexec.map(radiation_interpolation, iterdata, runtime_memory=2048).get_result()

In [None]:
iterdata

In [None]:
res_temp = fexec.map(map_interpolation, iterdata, extra_args=('temp', ), runtime_memory=2048).get_result()

In [None]:
res_temp

In [None]:
res_humi = fexec.map(map_interpolation, iterdata, extra_args=('humi', ), runtime_memory=2048).get_result()
res_wind = fexec.map(map_interpolation, iterdata, extra_args=('wind', ), runtime_memory=2048).get_result()

In [None]:
res_flatten = []
for l in [res_rad, res_temp, res_humi, res_wind]:
    for elem in l:
        for sub_elem in elem:
            print(sub_elem)
            res_flatten.append(sub_elem)

In [None]:
# res_flatten

In [None]:
grouped_chunks = collections.defaultdict(list)

for chunk_result in res_flatten:
    tile_key, data_field, block_x, block_y, co = chunk_result
    grouped_chunks[(tile_key, data_field)].append((block_x, block_y, co))

In [None]:
grouped_chunks

Join split subsets into a tile:

In [None]:
import os
import math
import boto3
import tempfile
from io import BytesIO

import rasterio
from affine import Affine
from rasterio.windows import Window

from dataplug.cloudobject import CloudObject
from dataplug.formats.geospatial.cog import CloudOptimizedGeoTiff

def merge_blocks(tile_data, chunks, storage):
    """
    tile_data: (tile_key, data_field)
    chunks:    [(block_x, block_y, chunk_co), ...]
    storage:   Lithops Storage client
    """
    tile_key, data_field = tile_data

    session = boto3.Session()
    creds = session.get_credentials().get_frozen_credentials()
    s3_config = {
        "credentials": {
            "AccessKeyId": creds.access_key,
            "SecretAccessKey": creds.secret_key,
            "SessionToken": creds.token,
        },
        "region_name": session.region_name,
    }

    # 1) Load the original COG metadata (no full download)
    source_uri = f"s3://{DATA_BUCKET}/{tile_key}"
    print(source_uri)
    orig_co    = CloudObject.from_s3(CloudOptimizedGeoTiff, source_uri, s3_config=s3_config)
    attrs      = orig_co.attributes

    width, height = attrs.width, attrs.height
    base_tf       = Affine(*attrs.transform[:6])  # six values only

    # 2) Seed a profile from the first chunk’s profile
    bx0, by0, first_co = chunks[0]
    first_bytes        = storage.get_cloudobject(first_co)
    with rasterio.open(BytesIO(first_bytes)) as src:
        profile = src.profile.copy()

    # 3) Update to full‐tile size & transform
    profile.update({
        "width":     width,
        "height":    height,
        "transform": base_tf,
        # keep driver, dtype, nodata, count from the chunk
    })

    # 4) Create an empty output file
    merged_file = os.path.join(
        tempfile.gettempdir(),
        f"{data_field}_{os.path.splitext(tile_key)[0]}.tif"
    )
    with rasterio.open(merged_file, "w", **profile) as dest:
        # compute step sizes
        step_w = math.floor(width  / SPLITS)
        step_h = math.floor(height / SPLITS)

        for block_x, block_y, chunk_co in chunks:
            # 5) Read only that small chunk
            chunk_bytes = storage.get_cloudobject(chunk_co)
            with rasterio.open(BytesIO(chunk_bytes)) as src:
                arr = src.read(1)
                h, w = arr.shape

            # 6) Compute window in the big tile
            col_off = block_y * step_w
            row_off = block_x * step_h
            window  = Window(col_off, row_off, w, h)

            # 7) Write it in
            dest.write(arr, 1, window=window)

    # 8) Upload final merged tile
    output_key = os.path.join(DTM_PREFIX, data_field, tile_key)
    with open(merged_file, "rb") as f:
        storage.put_object(bucket=DATA_BUCKET, key=output_key, body=f)

    return output_key


Combine previous split subsets:

In [None]:
iterdata = []
for (tile_id, data_field), chunks in grouped_chunks.items():
    iterdata.append(((tile_id, data_field), chunks))

In [None]:
iterdata

In [None]:
tiles_merged = fexec.map(merge_blocks, iterdata, runtime_memory=4096).get_result()

In [None]:
tile_keys_merged = set([os.path.basename(t) for t in tiles_merged])

In [None]:
# tile_keys_merged

## Computation of potential evaporation

In [None]:
def compute_crop_evapotranspiration(temperatures,
                                    humidities,
                                    wind_speeds,
                                    external_radiations,
                                    global_radiations,
                                    KCs):
    gamma = 0.665*101.3/1000
    eSat = 0.6108 * np.exp((17.27*temperatures)/(temperatures+237.3))
    delta = 4098 * eSat / np.power((temperatures + 237.3),2)
    eA = np.where(humidities < 0, 0, eSat * humidities / 100)     # Avoid sqrt of a negative number
    T4 = 4.903 * np.power((273.3 + temperatures),4)/1000000000
    rSrS0 = global_radiations/(external_radiations * 0.75)
    rN = 0.8* global_radiations-T4*(0.34-0.14*np.sqrt(eA))*((1.35*rSrS0)-0.35)
    den = delta + gamma *(1 + 0.34* wind_speeds)
    tRad = 0.408 * delta * rN / den
    tAdv = gamma * (900/(temperatures+273))*wind_speeds * (eSat - eA)/den
    return ((tRad + tAdv) * 7 * KCs).astype('float32')

In [None]:
vineyard = ['VI', 'VO', 'VF', 'FV', 'CV' ]
olive_grove = ['OV', 'VO', 'OF', 'FL', 'OC']
fruit = ['FY', 'VF', 'OF', 'FF', 'CF']
nuts = ['FS', 'FV', 'FL', 'FF', 'CS' ]
citrus = ['CI', 'CV', 'OC', 'CF', 'CS' ]

def get_kc(feature):
    # TODO: Get more precise values of Kc
    print(feature['properties'].keys())
    # sigpac_use = feature['properties']['uso_sigpac']
    sigpac_use = 'FF'
    if sigpac_use in vineyard:
        # Grapes for wine - 0.3, 0.7, 0.45
        return 0.7  
    if sigpac_use in olive_grove:
        # Olive grove - ini: 0.65, med: 0.7, end: 0.7
        return 0.7 
    if sigpac_use in fruit:
        # Apples, Cherries, Pears - 0.45, 0.95, 0.7
        return 0.95
    if sigpac_use in nuts:
        # Almonds - 0.4, 0.9, 0.65
        return 0.9
    if sigpac_use in citrus:
        # Citrus, without ground coverage - 0.7, 0.65, 0.7
        return 0.65
    
    return None

In [None]:
def get_geometry_window(src, geom_bounds):
    left, bottom, right, top = geom_bounds
    src_left, src_bottom, src_right, src_top = src.bounds
    window = src.window(max(left,src_left), max(bottom,src_bottom), min(right,src_right), min(top,src_top))
    window_floored = window.round_offsets(op='floor', pixel_precision=3)
    w = math.ceil(window.width + window.col_off - window_floored.col_off)
    h = math.ceil(window.height + window.row_off - window_floored.row_off)
    return Window(window_floored.col_off, window_floored.row_off, w, h)     

In [None]:
def compute_evapotranspiration_by_shape(tem, hum, win, rad, extrad, dst):

    import fiona
    from shapely.geometry import shape, box
    from rasterio import features

    non_arable_land = ['AG', 'CA', 'ED', 'FO', 'IM', 'PA', 'PR', 'ZU', 'ZV']

    #with fiona.open('zip://home/docker/shape.zip') as shape_src:
    with fiona.open('zip:///tmp/shape.zip') as shape_src:
        for feature in shape_src.filter(bbox=tem.bounds):
            KC = get_kc(feature)
            if KC is not None:
                geom = shape(feature['geometry'])
                window = get_geometry_window(tem, geom.bounds)
                win_transform = rasterio.windows.transform(window, tem.transform)
                # Convert shape to raster matrix
                image = features.rasterize([geom],
                                           out_shape=(window.height, window.width),
                                           transform = win_transform,
                                           fill = 0,
                                           default_value = 1).astype('bool')
                # Get values to compute evapotranspiration
                temperatures = tem.read(1, window=window)
                humidities = hum.read(1, window=window)
                wind_speeds = win.read(1, window=window)
                # Convert from W to MJ (0.0036)
                global_radiations = rad.read(1, window=window) * 0.0036
                external_radiations = extrad.read(1, window=window) * 0.0036
                KCs = np.full(temperatures.shape, KC)
                # TODO: compute external radiation
                #external_radiations = np.full(temperatures.shape, 14)
                # TODO: compute global radiation
                # global_radiations = np.full(temperatures.shape, 10)
                etc = compute_crop_evapotranspiration(
                        temperatures,
                        humidities,
                        wind_speeds,
                        external_radiations,
                        global_radiations,
                        KCs
                )
                etc[temperatures == tem.nodata] = dst.nodata
                etc[np.logical_not(image)] = dst.nodata
                dst.write(etc + dst.read(1, window=window), 1, window=window)

In [None]:
def compute_global_evapotranspiration(tem, hum, win, rad, extrad, dst):    
    for ji, window in tem.block_windows(1):
        bounds = rasterio.windows.bounds(window, tem.transform)
        temperatures = tem.read(1, window=window)
        humidities = hum.read(1, window=window)
        wind_speeds = win.read(1, window=window)
         # Convert from W to MJ (0.0036)
        global_radiations = rad.read(1, window=window) * 0.0036
        external_radiations = extrad.read(1, window=window) * 0.0036
        # TODO: compute external radiation
        #external_radiations = np.full(temperatures.shape, 14)
        # TODO: compute global radiation
        # global_radiations = np.full(temperatures.shape, 10)
        # TODO: compute KCs
        KCs = np.full(temperatures.shape, 1)
        etc = compute_crop_evapotranspiration(
                temperatures,
                humidities,
                wind_speeds,
                external_radiations,
                global_radiations,
                KCs
        )
        dst.write(np.where(temperatures == tem.nodata, dst.nodata, etc), 1, window=window)

In [None]:
def combine_calculations(tile_key, storage):
    from functools import partial
      
    # Download shapefile
    shapefile = storage.get_object(bucket=DATA_BUCKET, key='shapefile_murcia.zip', stream=True)

    with open('/tmp/shape.zip', 'wb') as shapf:
        for chunk in iter(partial(shapefile.read, 200 * 1024 * 1024), ''):
            if not chunk:
                break
            shapf.write(chunk)
    try:
        temp = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'temp', tile_key))
        humi = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'humi', tile_key))
        rad = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'rad', tile_key))
        extrad = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'extr', tile_key))
        wind = storage.get_object(bucket=DATA_BUCKET, key=os.path.join(DTM_PREFIX, 'wind', tile_key))
    except StorageNoSuchKeyError:
        print("Storage error")
        return None
    
    output_file = os.path.join(tempfile.gettempdir(), 'eva' + '_' + tile_key)
    with rasterio.open(BytesIO(temp)) as temp_raster:
        with rasterio.open(BytesIO(humi)) as humi_raster:
            with rasterio.open(BytesIO(rad)) as rad_raster:
                with rasterio.open(BytesIO(extrad)) as extrad_raster:
                    with rasterio.open(BytesIO(wind)) as wind_raster:
                        profile = temp_raster.profile
                        profile.update(nodata=0)
                        with rasterio.open(output_file, 'w+', **profile) as dst:
#                             compute_global_evapotranspiration(temp_raster, humi_raster, wind_raster,
#                                                               rad_raster, extrad_raster, dst)
                            compute_evapotranspiration_by_shape(temp_raster, humi_raster, wind_raster,
                                                                rad_raster, extrad_raster, dst)
    
    output_key = os.path.join(DTM_PREFIX, 'eva', tile_key)
    with open(output_file, 'rb') as output_f:
        storage.put_object(bucket=DATA_BUCKET, key=output_key, body=output_f)
    return output_key

In [None]:
fs_eva = fexec.map(combine_calculations, tile_keys_merged, runtime_memory=2048)
res_eva = fexec.get_result(fs=fs_eva)

In [None]:
res_eva

---

In [None]:
fexec.clean(clean_cloudobjects=True)

In [None]:
fexec.job_summary()

---

## Visualization of results

In [None]:
%matplotlib inline


In [None]:
from matplotlib import pyplot as plt

tile = res_eva[0]
tile_key = os.path.basename(tile)
tile_id, _ = os.path.splitext(tile_key)
fig, ax = plt.subplots()

with rasterio.open(BytesIO(storage.get_object(bucket=DATA_BUCKET, key=tile))) as src:
    arr = src.read(1, out_shape=(src.height, src.width))
    ax.set_title(tile_id)
    img = ax.imshow(arr, cmap='Greens')
    fig.colorbar(img, shrink=0.5)

fig.set_size_inches(18.5, 10.5)
plt.show()

# obj.seek(0)

---