In [1]:
import os

# dask/parallelization libraries
import coiled
import dask
from dask.distributed import Client, LocalCluster
from dask.distributed import print as dask_print
import dask.config
import distributed

# scipy basics
import numpy as np
import rasterio
import rasterio.features
import rasterio.transform
import rasterio.windows

from numba import jit
import concurrent.futures

import boto3
import time
import math
import ctypes
import pandas as pd

<font size="6">Making cloud and local clusters</font> 

In [None]:
# Full cluster
coiled_cluster = coiled.Cluster(
    n_workers=20,
    use_best_zone=True, 
    compute_purchase_option="spot_with_fallback",
    idle_timeout="10 minutes",
    region="us-east-1",
    name="next_gen_forest_carbon_flux_model", 
    account='jterry64', # Necessary to use the AWS environment that Justin set up in Coiled
    worker_memory = "32GiB" 
)

# Coiled cluster (cloud run)
coiled_client = coiled_cluster.get_client()
coiled_client

In [None]:
# Test cluster
coiled_cluster = coiled.Cluster(
    n_workers=1,
    use_best_zone=True, 
    compute_purchase_option="spot_with_fallback",
    idle_timeout="10 minutes",
    region="us-east-1",
    name="next_gen_forest_carbon_flux_model", 
    account='jterry64', # Necessary to use the AWS environment that Justin set up in Coiled
    worker_memory = "32GiB" 
)

# Coiled cluster (cloud run)
coiled_client = coiled_cluster.get_client()
coiled_client

In [None]:
# Local single-process cluster (local run). Will run .compute() on just one process, not a whole cluster.
local_client = Client(processes=False)
local_client

In [None]:
local_client = Client()
local_client

In [2]:
# Local cluster with multiple workers
local_cluster = LocalCluster()  
local_client = Client(local_cluster)
local_client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 24.91 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:41321,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 24.91 GiB

0,1
Comm: tcp://127.0.0.1:44941,Total threads: 2
Dashboard: http://127.0.0.1:41215/status,Memory: 6.23 GiB
Nanny: tcp://127.0.0.1:44581,
Local directory: /tmp/dask-scratch-space/worker-9q0ee9_t,Local directory: /tmp/dask-scratch-space/worker-9q0ee9_t

0,1
Comm: tcp://127.0.0.1:42289,Total threads: 2
Dashboard: http://127.0.0.1:33249/status,Memory: 6.23 GiB
Nanny: tcp://127.0.0.1:44009,
Local directory: /tmp/dask-scratch-space/worker-dqxi1xdo,Local directory: /tmp/dask-scratch-space/worker-dqxi1xdo

0,1
Comm: tcp://127.0.0.1:33367,Total threads: 2
Dashboard: http://127.0.0.1:45081/status,Memory: 6.23 GiB
Nanny: tcp://127.0.0.1:35905,
Local directory: /tmp/dask-scratch-space/worker-wdm36zl7,Local directory: /tmp/dask-scratch-space/worker-wdm36zl7

0,1
Comm: tcp://127.0.0.1:33479,Total threads: 2
Dashboard: http://127.0.0.1:38465/status,Memory: 6.23 GiB
Nanny: tcp://127.0.0.1:45789,
Local directory: /tmp/dask-scratch-space/worker-jzyrukvk,Local directory: /tmp/dask-scratch-space/worker-jzyrukvk


<font size="6">Shutting down cloud and local clusters</font> 

In [None]:
coiled_cluster.shutdown()

In [None]:
local_client.shutdown()

<font size="6">Analysis</font> 

<font size="4">Paths and functions</font>

In [3]:
# General paths and constants

general_uri = 's3://gfw2-data/landcover/composite/'

s3_out_dir = 'climate/AFOLU_flux_model/LULUCF/outputs'

def timestr():
    return time.strftime("%Y%m%d_%H_%M_%S")

In [4]:
# Returns list of all chunk boundaries within a bounding box for chunks of a given size
def get_chunk_bounds(chunk_params):

    min_x = chunk_params[0]
    min_y = chunk_params[1]
    max_x = chunk_params[2]
    max_y = chunk_params[3]
    chunk_size = chunk_params[4]
    
    x, y = (min_x, min_y)
    chunks = []

    # Polygon Size
    while y < max_y:
        while x < max_x:
            bounds = [
                x,
                y,
                x + chunk_size,
                y + chunk_size,
            ]
            chunks.append(bounds)
            x += chunk_size
        x = min_x
        y += chunk_size

    return chunks

# Returns the encompassing tile_id string in the form YYN/S_XXXE/W based on a coordinate
def xy_to_tile_id(top_left_x, top_left_y):

    lat_ceil = math.ceil(top_left_y/10.0) * 10
    lng_floor = math.floor(top_left_x/10.0) * 10
    
    lng: str = f"{str(lng_floor).zfill(3)}E" if (lng_floor >= 0) else f"{str(-lng_floor).zfill(3)}W"
    lat: str = f"{str(lat_ceil).zfill(2)}N" if (lat_ceil >= 0) else f"{str(-lat_ceil).zfill(2)}S"

    return f"{lat}_{lng}"

In [5]:
# Lazily opens tile within provided bounds (i.e. one chunk) and returns as a numpy array
# If it can't open the chunk (no data in it), it returns an array of all 0s
def get_tile_dataset_rio(uri, bounds, chunk_length):

    try:
        with rasterio.open(uri) as ds:
            window = rasterio.windows.from_bounds(*bounds, ds.transform)
            data = ds.read(1, window=window)
    except:
        data = np.zeros((chunk_length, chunk_length))

    if data.size==0:
        # dask_print("No data in chunk")
        return np.zeros((chunk_length, chunk_length))
    else:
        # dask_print("Data in chunk")
        return data

In [63]:
def save_and_upload(bounds, chunk_length_pixels, tile_id, bounds_str, year, output_dict):

    transform = rasterio.transform.from_bounds(*bounds, width=chunk_length_pixels, height=chunk_length_pixels)

    file_info = f'{tile_id}__{bounds_str}__{year}'

    s3_client = boto3.client("s3")

    dask_print(f"Saving {bounds_str} in {tile_id} for {year}: {timestr()}")

    # For every output file, saves from array to local raster, then to s3.
    # Can't save directly to s3, unfortunately, so need to save locally first.
    for key, value in output_dict.items():

        file_name = f"{key}__{file_info}__{timestr()}"

        with rasterio.open(f"/tmp/{file_name}.tif", 'w', driver='GTiff', width=chunk_length_pixels, height=chunk_length_pixels, count=1, dtype='uint8', crs='EPSG:4326', transform=transform, compress='lzw', blockxsize=400, blockysize=400) as dst:
            dst.write(value[0].astype(rasterio.uint8), 1)

        s3_client.upload_file(f"/tmp/{file_name}.tif", "gfw2-data", Key=f"{s3_out_dir}/{file_name}.tif")

        # Deletes the local raster. It won't be used again.
        os.remove(f"/tmp/{file_name}.tif")

<font size="4">Model steps</font>

In [46]:
def classify_array(GLCLU_block):

    # Outputs
    IPCC_classes = np.zeros(GLCLU_block.shape)

    IPCC_classes[np.where(GLCLU_block <= 1)] = 6                                 # Other land
    IPCC_classes[np.where((GLCLU_block >= 2) & (GLCLU_block <= 26))] = 5          # Grassland
    IPCC_classes[np.where((GLCLU_block >= 27) & (GLCLU_block <= 48))] = 1         # Forest
    IPCC_classes[np.where((GLCLU_block >= 100) & (GLCLU_block <= 101))] = 4       # Wetland
    IPCC_classes[np.where((GLCLU_block >= 102) & (GLCLU_block <= 126))] = 5       # Grassland
    IPCC_classes[np.where((GLCLU_block >= 127) & (GLCLU_block <= 148))] = 1       # Forest
    IPCC_classes[np.where((GLCLU_block >= 200) & (GLCLU_block <= 204))] = 4       # Wetland   
    IPCC_classes[np.where(GLCLU_block == 241)] = 6                               # Other
    IPCC_classes[np.where(GLCLU_block == 244)] = 2                               # Cropland
    IPCC_classes[np.where(GLCLU_block == 250)] = 3                               # Settlement
    
    return IPCC_classes

In [62]:
# Runs model on each chunk for all years.
# Chunks are defined by a bounding box and a starting year for iteration
def process_chunk(bounds, start_year):
 
    futures = {}
    layers = {}

    bounds_str = "_".join([str(round(x)) for x in bounds])
    chunk_length_pixels = int((bounds[3]-bounds[1]) * (40000/10))

    # Submit requests to S3 for input chunks but don't actually download them yet. This queueing of the requests before downloading them speeds up the downloading
    # Approach is to download all the input chunks up front for every year to make downloading more efficient, even though it means storing more upfront
    with concurrent.futures.ThreadPoolExecutor() as executor:
        tile_id = xy_to_tile_id(bounds[0], bounds[3])

        dask_print(f"Making requests for data in chunk {bounds_str} in {tile_id}: {timestr()}")

        # TODO: Create tiles of last year of forest (new utitilites script) and add as another input file
        download_dict = {
            "iso": f"s3://gfw2-data/gadm_administrative_boundaries/v3.6/raster/epsg-4326/10/40000/adm0/gdal-geotiff/{tile_id}.tif",  # Originally from gfw-data-lake, so it's in 400x400 windows,

            "land_cover_2000": f"s3://gfw2-data/landcover/composite/2000/raw/{tile_id}.tif",
            "land_cover_2005": f"s3://gfw2-data/landcover/composite/2005/raw/{tile_id}.tif",
            "land_cover_2010": f"s3://gfw2-data/landcover/composite/2010/raw/{tile_id}.tif",
            "land_cover_2015": f"s3://gfw2-data/landcover/composite/2015/raw/{tile_id}.tif",
            "land_cover_2020": f"s3://gfw2-data/landcover/composite/2020/raw/{tile_id}.tif"   
        }

        for key, value in download_dict.items():
            futures[executor.submit(get_tile_dataset_rio, value, bounds, chunk_length_pixels)] = key

    # Waits for requests to come back with data from S3
    for future in concurrent.futures.as_completed(futures):
        layer = futures[future]
        layers[layer] = future.result()

    
    for year in list(range(2000, 2021, 5)):
        
        dask_print(f"Reclassifying {bounds_str} in {tile_id} for {year}: {timestr()}")

        IPCC_classes = classify_array(
            layers[f"land_cover_{year}"]
        )

        # Output files to upload to s3
        output_dict = {
            "IPCC_classes": [IPCC_classes, "uint8"]                 
       }

        save_and_upload(bounds, chunk_length_pixels, tile_id, bounds_str, year, output_dict)

        # Clear memory of unneeded arrays
        del IPCC_classes

    return f"success for {bounds_str}: {timestr()}"

In [64]:
%%time

# Year to start the analysis
# start_year = 2000   # full run
start_year = 2020  # final year

# Area to analyze
# chunk_params arguments: W, S, E, N, chunk size (degrees)
# chunk_params = [-12, 34, 32, 72, 1]  # all of Europe
# chunk_params = [-10, 40, 20, 70, 1]    # 30x30 deg (70N_010W), 900 chunks
# chunk_params = [10, 40, 20, 50, 1]    # 10x10 deg (50N_010E), 100 chunks
# chunk_params = [10, 46, 14, 50, 2]   # 4x4 deg, 4 chunks
# chunk_params = [10, 48, 12, 50, 1]   # 2x2 deg, 4 chunks
# chunk_params = [10, 49, 11, 50, 1]   # 1x1 deg, 1 chunk
# chunk_params = [10, 49, 11, 50, 0.5] # 1x1 deg, 4 chunks
# chunk_params = [10, 49.5, 10.5, 50, 0.25] # 0.5x0.5 deg, 4 chunks
chunk_params = [10, 49.75, 10.25, 50, 0.25] # 0.25x0.25 deg, 1 chunk (has data)
# chunk_params = [0, 79.75, 0.25, 80, 0.25] # 0.25x0.25 deg, 1 chunk (no data)

# Makes list of chunks to analyze
chunks = get_chunk_bounds(chunk_params)  
print("Processing", len(chunks), "chunks")

# Creates list of tasks to run (1 task = 1 chunk for all years)
delayed = [dask.delayed(process_chunk)(chunk, start_year) for chunk in chunks]

# Actually runs analysis
results = dask.compute(*delayed)
results

Processing 1 chunks
Making requests for data in chunk 10_50_10_50 in 50N_010E: 20240124_12_47_19
Reclassifying 10_50_10_50 in 50N_010E for 2000: 20240124_12_47_36
Saving 10_50_10_50 in 50N_010E for 2000: 20240124_12_47_36
Reclassifying 10_50_10_50 in 50N_010E for 2005: 20240124_12_47_36
Saving 10_50_10_50 in 50N_010E for 2005: 20240124_12_47_36
Reclassifying 10_50_10_50 in 50N_010E for 2010: 20240124_12_47_36
Saving 10_50_10_50 in 50N_010E for 2010: 20240124_12_47_36
Reclassifying 10_50_10_50 in 50N_010E for 2015: 20240124_12_47_37
Saving 10_50_10_50 in 50N_010E for 2015: 20240124_12_47_37
Reclassifying 10_50_10_50 in 50N_010E for 2020: 20240124_12_47_37
Saving 10_50_10_50 in 50N_010E for 2020: 20240124_12_47_37
CPU times: user 1.64 s, sys: 169 ms, total: 1.81 s
Wall time: 18 s


('success for 10_50_10_50: 20240124_12_47_37',)

In [None]:
# To run without dask at all
process_chunk([10, 49, 11, 50], 1, start_year)

In [None]:
# Download test-- checks that uri is found and recognized
tile_id = "50N_010E"
# uri = f"s3://gfw2-data/climate/carbon_model/BGB_AGB_ratio/processed/20230216/{tile_id}_BGB_AGB_ratio.tif"
uri = f"s3://gfw2-data/gadm_administrative_boundaries/v3.6/raster/epsg-4326/10/40000/adm0/gdal-geotiff/{tile_id}.tif"  # Originally from gfw-data-lake, so it's in 400x400 windows
# uri = f"s3://gfw2-data/fao_ecozones/v2000/raster/epsg-4326/10/40000/class/gdal-geotiff/{tile_id}.tif"   # Originally from gfw-data-lake, so it's in 400x400 windows 
bounds = [10, 49.75, 10.25, 50]

get_tile_dataset_rio(uri, bounds, 1000)

In [None]:
coiled_client.restart() 

In [None]:
client.cancel(future) # per https://github.com/dask/distributed/issues/3898#issuecomment-645590511

In [None]:
# aws s3 cp s3://gfw2-data/climate/European_height_carbon_model/outputs/ . --recursive --exclude "*" --include "*10_49_11_50*"
# aws s3 cp s3://gfw2-data/climate/European_height_carbon_model/outputs/ . --recursive --exclude "*" --include "*2002*10_49_11_50*"