In [138]:
import os

# dask/parallelization libraries
import coiled
import dask
from dask.distributed import Client, LocalCluster
from dask.distributed import print as dask_print
import dask.config
import distributed

import numpy as np
import rasterio
import rasterio.features
import rasterio.transform
import rasterio.windows
from osgeo import gdal, ogr, osr

from numba import jit
import concurrent.futures

import boto3
import time
import math
import ctypes
import pandas as pd

<font size="6">Making cloud and local clusters</font> 

In [None]:
# Cloud cluster on Coiled. Arguments are grouped as identity, placement, then sizing.
coiled_cluster = coiled.Cluster(
    name="next_gen_forest_carbon_flux_model",
    account="jterry64",  # Necessary to use the AWS environment that Justin set up in Coiled
    region="us-east-1",
    use_best_zone=True,
    compute_purchase_option="spot_with_fallback",
    n_workers=20,
    worker_memory="32GiB",
    idle_timeout="10 minutes",
)

In [None]:
# Coiled cluster (cloud run)
# Gets a Dask client connected to `coiled_cluster`, so the cluster-creation cell must be run first.
# The bare name on the last line displays the client summary as the cell output.
coiled_client = coiled_cluster.get_client()
coiled_client

In [None]:
# Local single-process cluster (local run). Will run .compute() on just one process, not a whole cluster.
# This is one of several alternative cells that all assign `local_client`; run only the one you want,
# because each of them rebinds the same name.
local_client = Client(processes=False)
local_client

In [None]:
# Local client created with default settings (no cluster or arguments are passed in).
# This is one of several alternative cells that all assign `local_client`; run only the one you want,
# because each of them rebinds the same name.
local_client = Client()
local_client

In [2]:
# Local cluster with multiple workers
# The cluster is created explicitly (and kept as `local_cluster`), then a client is attached to it.
# This is one of several alternative cells that all assign `local_client`; run only the one you want.
local_cluster = LocalCluster()
local_client = Client(local_cluster)
local_client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 24.91 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:37687,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 24.91 GiB

0,1
Comm: tcp://127.0.0.1:40861,Total threads: 2
Dashboard: http://127.0.0.1:46177/status,Memory: 6.23 GiB
Nanny: tcp://127.0.0.1:45491,
Local directory: /tmp/dask-scratch-space/worker-kncv0ql5,Local directory: /tmp/dask-scratch-space/worker-kncv0ql5

0,1
Comm: tcp://127.0.0.1:35127,Total threads: 2
Dashboard: http://127.0.0.1:36829/status,Memory: 6.23 GiB
Nanny: tcp://127.0.0.1:44899,
Local directory: /tmp/dask-scratch-space/worker-6fwbp3b2,Local directory: /tmp/dask-scratch-space/worker-6fwbp3b2

0,1
Comm: tcp://127.0.0.1:34775,Total threads: 2
Dashboard: http://127.0.0.1:42363/status,Memory: 6.23 GiB
Nanny: tcp://127.0.0.1:33415,
Local directory: /tmp/dask-scratch-space/worker-wd0f8rok,Local directory: /tmp/dask-scratch-space/worker-wd0f8rok

0,1
Comm: tcp://127.0.0.1:46143,Total threads: 2
Dashboard: http://127.0.0.1:45167/status,Memory: 6.23 GiB
Nanny: tcp://127.0.0.1:36103,
Local directory: /tmp/dask-scratch-space/worker-dgmygjwf,Local directory: /tmp/dask-scratch-space/worker-dgmygjwf


<font size="6">Shutting down cloud and local clusters</font> 

In [None]:
# Shuts down the Coiled cloud cluster. Requires `coiled_cluster` from the cluster-creation cell.
coiled_cluster.shutdown()

In [None]:
# Shuts down whatever `local_client` currently refers to, i.e. the local-client cell that was run last.
local_client.shutdown()

<font size="6">Analysis</font> 

<font size="4">Paths and functions</font>

In [4]:
# General paths and constants

general_uri = 's3://gfw2-data/forest_change/GLAD_Europe_height_data/'

s3_out_dir = 'climate/European_height_carbon_model/outputs'

def timestr():
    """Return the current local time as a ``YYYYMMDD_HH_MM_SS`` string.

    Used to stamp log messages and output file names.
    """
    stamp_format = "%Y%m%d_%H_%M_%S"
    now = time.localtime()
    return time.strftime(stamp_format, now)

In [5]:
# Returns list of all chunk boundaries within a bounding box for chunks of a given size
def get_chunk_bounds(chunk_params):
    """Tile a bounding box into square chunks.

    Args:
        chunk_params: Sequence ``[min_x, min_y, max_x, max_y, chunk_size]``.
            The first four entries are the bounding box and the fifth is the
            edge length of each chunk, in the same units as the box.

    Returns:
        A list of ``[x_min, y_min, x_max, y_max]`` lists, ordered row by row
        starting at ``min_y``, and by increasing x within each row. If the box
        extent is not a multiple of ``chunk_size``, the last chunk in a row or
        column extends past ``max_x`` / ``max_y``. An empty or inverted box
        yields an empty list.

    Raises:
        ValueError: If ``chunk_size`` is not positive (stepping by it would
            never reach the far edge of the box).
    """
    min_x = chunk_params[0]
    min_y = chunk_params[1]
    max_x = chunk_params[2]
    max_y = chunk_params[3]
    chunk_size = chunk_params[4]

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    def _steps(low, high):
        # Number of chunks needed to cover [low, high). The small tolerance keeps a
        # quotient that lands a hair above a whole number (float rounding) from
        # producing a spurious extra chunk.
        return max(0, math.ceil((high - low) / chunk_size - 1e-9))

    n_cols = _steps(min_x, max_x)
    n_rows = _steps(min_y, max_y)

    # Edges are computed as origin + index * size rather than by repeated addition, so
    # rounding error does not accumulate across the grid and neighbouring chunks
    # share exactly the same edge coordinate.
    return [
        [
            min_x + col * chunk_size,
            min_y + row * chunk_size,
            min_x + (col + 1) * chunk_size,
            min_y + (row + 1) * chunk_size,
        ]
        for row in range(n_rows)
        for col in range(n_cols)
    ]

# Returns the encompassing tile_id string in the form YYN/S_XXXE/W based on a coordinate
def xy_to_tile_id(top_left_x, top_left_y):
    """Build the ID of the 10x10 degree tile that contains a coordinate.

    The latitude is rounded up and the longitude rounded down to the nearest
    multiple of 10, so the ID names the tile by its top-left corner.

    Args:
        top_left_x: Longitude of the point.
        top_left_y: Latitude of the point.

    Returns:
        A string such as ``"50N_010E"``: two-digit latitude with N/S suffix,
        an underscore, then three-digit longitude with E/W suffix.
    """
    tile_top = 10 * math.ceil(top_left_y / 10.0)
    tile_left = 10 * math.floor(top_left_x / 10.0)

    # Zero counts as north / east, matching the >= 0 convention
    lat_suffix = "N" if tile_top >= 0 else "S"
    lng_suffix = "E" if tile_left >= 0 else "W"

    lat_label = f"{abs(tile_top):02d}{lat_suffix}"
    lng_label = f"{abs(tile_left):03d}{lng_suffix}"

    return "_".join((lat_label, lng_label))

In [6]:
# Opens tile and reads only the window within provided bounds (i.e. one chunk), returning it as a numpy array
# If it can't open the chunk (no data in it), it returns an array of all 0s
def get_tile_dataset_rio(uri, bounds, chunk_length):
    """Read band 1 of a raster for one chunk, falling back to zeros.

    Args:
        uri: Path or URI of the raster to open with rasterio.
        bounds: The chunk bounds, unpacked positionally into
            ``rasterio.windows.from_bounds`` ahead of the dataset transform.
        chunk_length: Edge length, in pixels, of the all-zero array returned
            when nothing can be read.

    Returns:
        The array read from the window, or a ``chunk_length`` x ``chunk_length``
        array of zeros if the read fails or the window is empty.
    """
    try:
        with rasterio.open(uri) as ds:
            window = rasterio.windows.from_bounds(*bounds, ds.transform)
            data = ds.read(1, window=window)
    except Exception:
        # Deliberate best-effort: a chunk that cannot be read is treated as empty.
        # Catching Exception rather than using a bare except keeps KeyboardInterrupt
        # and SystemExit working, so a run can still be interrupted.
        data = None

    if data is None or data.size == 0:
        return np.zeros((chunk_length, chunk_length))

    return data

<font size="4">Model steps</font>

In [204]:
# Writes one chunk array to a tiled, LZW-compressed GeoTIFF and uploads it to s3
def _save_chunk_raster_to_s3(array, transform, size_pixels, block_size, file_stem, s3_client, s3_key):
    """Save an array as a tiled GeoTIFF in /tmp, upload it to s3, and clean up.

    Args:
        array: 2D array to write; it is cast to uint8.
        transform: Affine transform for the output raster.
        size_pixels: Width and height of the output raster in pixels.
        block_size: Tile width and height requested for the final raster.
        file_stem: File name without directory or extension.
        s3_client: boto3 s3 client used for the upload.
        s3_key: Destination key in the ``gfw2-data`` bucket.

    Raises:
        RuntimeError: If GDAL cannot open the intermediate raster or cannot
            create the final one.
    """
    intermediate_path = f"/tmp/{file_stem}_intermediate.tif"
    final_path = f"/tmp/{file_stem}.tif"

    try:
        # This doesn't actually successfully create rasters with 400x400 windows. They were instead chunk_length_pixels x 400.
        # Thus, rasterio creates an intermediate raster with partially correct dimensions, then gdal.Translate corrects it.
        # NOTE(review): GDAL's GTiff driver only applies BLOCKXSIZE when TILED=YES, so passing tiled=True here
        # would presumably produce the tiles directly and make the gdal.Translate pass unnecessary -- confirm before changing.
        with rasterio.open(intermediate_path, 'w', driver='GTiff', width=size_pixels, height=size_pixels, count=1, dtype='uint8',
                           crs='EPSG:4326', transform=transform, compress='lzw', blockxsize=block_size, blockysize=block_size) as dst:
            dst.write(array.astype(rasterio.uint8), 1)

        # Get information from the intermediate dataset
        input_dataset = gdal.Open(intermediate_path)
        if input_dataset is None:
            raise RuntimeError(f"GDAL could not open intermediate raster {intermediate_path}")
        width = input_dataset.RasterXSize
        height = input_dataset.RasterYSize
        dtype = input_dataset.GetRasterBand(1).DataType
        input_dataset = None  # Dereferencing is how GDAL closes a dataset

        # Set final output creation options, including block size
        options = [
            'TILED=YES',
            f'BLOCKXSIZE={block_size}',
            f'BLOCKYSIZE={block_size}',
            'COMPRESS=LZW',
        ]

        # Create the output dataset using gdal.Translate
        output_dataset = gdal.Translate(
            final_path,
            intermediate_path,
            width=width,
            height=height,
            format='GTiff',
            outputType=dtype,
            creationOptions=options,
        )
        if output_dataset is None:
            raise RuntimeError(f"gdal.Translate failed to create {final_path}")
        output_dataset = None  # Flushes the raster to disk before it is uploaded

        s3_client.upload_file(final_path, "gfw2-data", Key=s3_key)

    finally:
        # Delete both local rasters, even after a failure, so worker disks don't fill up over a long run
        for local_path in (intermediate_path, final_path):
            if os.path.exists(local_path):
                os.remove(local_path)


# TODO: is chunk_length_deg really needed for this? It could be calculated from bounds and passed to get_tile_dataset_rio that way.
def warp_to_Hansen(bounds, chunk_length_deg, start_year, last_year=2021):
    """Download the input rasters for one chunk and re-save them to s3 as tiled GeoTIFFs.

    For every year from ``start_year`` through ``last_year`` the tree height,
    tree extent and tree removal chunks are written as uint8 GeoTIFFs in
    EPSG:4326 and uploaded to the matching ``processed/`` folder.

    Args:
        bounds: Chunk bounds. ``bounds[0]`` and ``bounds[3]`` are used as the
            x and y of the top-left corner when naming the tile.
        chunk_length_deg: Chunk edge length in degrees. It is converted to
            pixels at 40000 pixels per 10 degrees.
        start_year: First year to process.
        last_year: Last year to process, inclusive. Defaults to 2021.

    Returns:
        A message string if a year with no tree height data is reached (the
        function stops there, without processing later years); otherwise None.
    """
    bounds_str = "_".join([str(round(x)) for x in bounds])
    chunk_length_pixels = int(chunk_length_deg * (40000/10))
    block_size = 400

    tile_id = xy_to_tile_id(bounds[0], bounds[3])
    years = range(start_year, last_year + 1)
    layer_types = ("tree_height", "tree_extent", "tree_removal")

    futures = {}

    # Submit requests to S3 for input chunks but don't actually download them yet. This queueing of the requests before downloading them speeds up the downloading
    # Approach is to download all the input chunks up front for every year to make downloading more efficient, even though it means storing more upfront
    with concurrent.futures.ThreadPoolExecutor() as executor:

        # NOTE(review): this layer is downloaded but not used by anything below -- confirm whether it is still needed
        tree_removal_latest_date_uri = f'{general_uri}202312_published/tree_removal_latest_date/raw/Tree_removal_latest_date.tif'
        futures[executor.submit(get_tile_dataset_rio, tree_removal_latest_date_uri, bounds, chunk_length_pixels)] = "tree_removal_latest_date"

        # Faster to just get every year of the timeseries inputs up front unless we're running into memory issues
        for year in years:
            for layer_type in layer_types:
                layer_uri = f'{general_uri}202312_published/{layer_type}/raw/{layer_type.capitalize()}_{year}.tif'
                futures[executor.submit(get_tile_dataset_rio, layer_uri, bounds, chunk_length_pixels)] = f"{layer_type}_{year}"

    # Leaving the executor block waits for every request, so all results are ready to collect here
    layers = {layer_name: future.result() for future, layer_name in futures.items()}

    # Same for every year and layer, so it is built once
    transform = rasterio.transform.from_bounds(*bounds, width=chunk_length_pixels, height=chunk_length_pixels)

    # Created on first use so that chunks without data never open an s3 connection
    s3_client = None

    # Iterates through years
    for year in years:

        # Skips chunk if it has no tree height in it
        if not np.any(layers[f"tree_height_{year}"]):
            dask_print(f"No data in chunk {bounds_str}. Skipping: {timestr()}")
            return f"No data in chunk {bounds_str}. Skipping: {timestr()}"

        dask_print(f"Data in chunk {bounds_str}. Proceeding.")

        if s3_client is None:
            s3_client = boto3.client("s3")

        dask_print(f"Saving {bounds_str} in {tile_id} for {year}: {timestr()}")

        # For every output file, saves from array to local raster, then to s3.
        # Can't save directly to s3, unfortunately, so need to save locally first.
        for layer_type in layer_types:
            file_stem = f'{tile_id}__{bounds_str}__{layer_type}__{year}__{timestr()}'
            s3_key = f"forest_change/GLAD_Europe_height_data/202312_published/{layer_type}/processed/{file_stem}.tif"

            _save_chunk_raster_to_s3(layers[f"{layer_type}_{year}"], transform, chunk_length_pixels, block_size,
                                     file_stem, s3_client, s3_key)

In [207]:
%%time

# Year to start the analysis
# Exactly one start_year line should be left uncommented.
# start_year = 2002   # full run
start_year = 2012  # last few years
# start_year = 2020  # last two years
# start_year = 2021  # final year

# Area to analyze
# Exactly one chunk_params line should be left uncommented.
# chunk_params arguments: W, S, E, N, chunk size (degrees)
# chunk_params = [-12, 34, 32, 72, 1]  # all of Europe
chunk_params = [10, 40, 20, 50, 1]    # 10x10 deg (50N_010E), 100 chunks
# chunk_params = [10, 46, 14, 50, 2]   # 4x4 deg, 4 chunks
# chunk_params = [10, 48, 12, 50, 1]   # 2x2 deg, 4 chunks
# chunk_params = [10, 49, 11, 50, 1]   # 1x1 deg, 1 chunk
# chunk_params = [10, 49, 11, 50, 0.5] # 1x1 deg, 4 chunks
# chunk_params = [10, 49.5, 10.5, 50, 0.25] # 0.5x0.5 deg, 4 chunks
# chunk_params = [10, 49.5, 10.5, 50, 0.5] # 0.5x0.5 deg, 1 chunk
# chunk_params = [10, 49.75, 10.25, 50, 0.25] # 0.25x0.25 deg, 1 chunk

# Makes list of chunks to analyze
chunks = get_chunk_bounds(chunk_params)
print("Processing", len(chunks), "chunks")

# Creates list of tasks to run (1 task = 1 chunk for all years)
# Nothing is executed here; each entry only records a call to warp_to_Hansen with the chunk bounds,
# the chunk size in degrees (chunk_params[4]), and the start year.
delayed = [dask.delayed(warp_to_Hansen)(chunk, chunk_params[4], start_year) for chunk in chunks]

# Actually runs analysis
# results gets one entry per chunk: whatever warp_to_Hansen returned for it
# (a "No data in chunk" message when the chunk was skipped, otherwise None).
results = dask.compute(*delayed)

Processing 1 chunks


2023-12-22 17:47:45,226 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
Process Dask Worker process (from Nanny):
2023-12-22 17:47:45,227 - distributed.nanny - ERROR - Worker process died unexpectedly
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/site-packages/distributed/nanny.py", line 1014, in _run
    asyncio_run(run(), loop_factory=get_loop_factory())
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/site-packages/distributed/compatibility.py", line 236, in asyncio_run
    return loop.run_until_complete(main)
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/asyncio/base_events.py", line 633, in run_until_complete
    self.run_forever()
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/site-packages/distri

{'forest_height_2021': array([[ 0,  0,  0, ...,  0,  0,  0],
       [ 0,  0,  0, ...,  0,  0,  0],
       [ 0,  0,  0, ...,  0,  0,  0],
       ...,
       [21, 22, 23, ...,  0,  0,  0],
       [21, 22, 23, ...,  0,  0,  0],
       [22, 23, 23, ...,  0,  0,  0]], dtype=uint8), 'forest_height_2020': array([[ 0,  0,  0, ...,  0,  0,  0],
       [ 0,  0,  0, ...,  0,  0,  0],
       [ 0,  0,  0, ...,  0,  0,  0],
       ...,
       [21, 22, 23, ...,  0,  0,  0],
       [21, 22, 23, ...,  0,  0,  0],
       [22, 23, 23, ...,  0,  0,  0]], dtype=uint8)}
Data in chunk 10_50_10_50. Proceeding.
Data in chunk 10_50_10_50. Proceeding.
Saving 10_50_10_50 in 50N_010E for 2021: 20231222_13_36_56
Data in chunk 10_50_10_50. Proceeding.
Saving 10_50_10_50 in 50N_010E for 2021: 20231222_13_38_56
50N_010E__10_50_10_50__tree_height__2021__20231222_13_38_56
Data in chunk 10_50_10_50. Proceeding.
Saving 10_50_10_50 in 50N_010E for 2021: 20231222_13_40_49
50N_010E__10_50_10_50__tree_height__2021__20231222_1

KeyboardInterrupt: 

2023-12-22 17:47:47,227 - distributed.nanny - ERROR - Worker process died unexpectedly
Process Dask Worker process (from Nanny):
Traceback (most recent call last):
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/site-packages/distributed/compatibility.py", line 236, in asyncio_run
    return loop.run_until_complete(main)
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/asyncio/base_events.py", line 633, in run_until_complete
    self.run_forever()
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/asyncio/base_events.py", line 600, in run_forever
    self._run_once()
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/asyncio/base_events.py", line 1860, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/dagibbs22/anaconda3/envs/dask_notebook/lib/python3.10/selectors.py", line 469, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

During handling of the abo

In [None]:
# To run without dask at all
# Calls the chunk function directly for a single 1x1 degree chunk (W, S, E, N = 10, 49, 11, 50).
# Uses `start_year` from the run cell above, so that cell's parameters must be set first.
warp_to_Hansen([10, 49, 11, 50], 1, start_year)

In [None]:
# To make 10x10 tiles:
# gdalwarp from subprocess.check_call(cmd) isn't working
# cmd = ['gdalwarp', '-tr', '0.00025', '0.00025', '-co', 'COMPRESS=DEFLATE', '-tap', '-te', str(10), str(49), str(11), str(50), '-dstnodata', '0', '-t_srs', 'EPSG:4326', 
#        '-overwrite', '-progress', '/vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/FH_2021.tif', 'C:\\GIS\\Carbon_model_Europe\\outputs\\50N_010E_FH_2021.tif']
# check_call(cmd)
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/FH_2021.tif 50N_010E_FH_2021.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/FH_2020.tif 50N_010E_FH_2020.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/FH_2019.tif 50N_010E_FH_2019.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/FH_2018.tif 50N_010E_1FH_2018.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/DFL_2021.tif 50N_010E_DFL_2021.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/DFL_2020.tif 50N_010E_DFL_2020.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/DFL_2019.tif 50N_010E_DFL_2019.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/forest_change/GLAD_Europe_height_data/202307_revision/DFL_2018.tif 50N_010E_DFL_2018.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/climate/carbon_model/other_emissions_inputs/tree_cover_loss_drivers/processed/drivers_2022/20230407/50N_010E_tree_cover_loss_driver_processed.tif 50N_010E_1deg_tree_cover_loss_driver_processed.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/climate/carbon_model/other_emissions_inputs/tree_cover_loss_fires/20230315/processed/50N_010E_tree_cover_loss_fire_processed.tif 50N_010E_1deg_tree_cover_loss_fire_processed.tif
# gdalwarp -tr 0.00025 0.00025 -co COMPRESS=DEFLATE -tap -te 10 40 20 50 -dstnodata 0 -t_srs EPSG:4326 -overwrite /vsis3/gfw2-data/climate/carbon_model/other_emissions_inputs/peatlands/processed/20230315/50N_010E_peat_mask_processed.tif 50N_010E_1deg_peat_mask_processed.tif
