In [3]:
%load_ext autoreload
%autoreload 2

import concurrent.futures
from cmipper import utils, config, parallelised_download_and_process, file_ops
import xarray as xa

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [4]:
# Open the merged multi-variable regridded file for EC-Earth3P-HR / r1i1p2f1 and display it.
xa.open_dataset(
    filename_or_obj="/maps/rt582/cmipper/data/env_vars/cmip6/EC-Earth3P-HR/r1i1p2f1/regridded/concatted_vars_N0_S-32_W130_E170/hfds_mlotst_rsdo_so_thetao_tos_umo_uo_vmo_vo_wfo_N0_S-32_W130_E170_levs_0-20_ll_195000-204912.nc"
)

In [5]:
# Load both YAML configs, then restrict the model info using the download config
# (via the project helper limit_model_info_dict) and display the result.
model_info_dict, download_config_dict = (
    utils.read_yaml(fp) for fp in (config.model_info, config.download_config)
)
limited_dict = utils.limit_model_info_dict(model_info_dict, download_config_dict)
limited_dict

{'EC-Earth3P-HR': {'resolution': 0.25,
  'experiment_ids': ['hist-1950', 'highres-future'],
  'member_ids': ['r1i1p2f1'],
  'data_nodes': ['esgf-data1.llnl.gov'],
  'frequency': 'mon',
  'variable_dict': {'rsdo': {'include': True,
    'table_id': 'Omon',
    'plevels': [-1]},
   'umo': {'include': True, 'table_id': 'Omon', 'plevels': [-1]},
   'vmo': {'include': True, 'table_id': 'Omon', 'plevels': [-1]},
   'mlotst': {'include': True, 'table_id': 'Omon', 'plevels': [None]},
   'so': {'include': True, 'table_id': 'Omon', 'plevels': [-1]},
   'thetao': {'include': True, 'table_id': 'Omon', 'plevels': [-1]},
   'uo': {'include': True, 'table_id': 'Omon', 'plevels': [-1]},
   'vo': {'include': True, 'table_id': 'Omon', 'plevels': [-1]},
   'wfo': {'include': True, 'table_id': 'Omon', 'plevels': [None]},
   'tos': {'include': True, 'table_id': 'Omon', 'plevels': [None]}}}}

In [17]:
from pathlib import Path

# NetCDF files in the regridded, variable-concatenated directory (a lazy generator).
# NOTE(review): this returned [] when last run. The file opened earlier from this directory
# is named ..._195000-204912.nc, which appears to stop before 2060; presumably
# find_files_for_time needs the whole year range covered. Verify.
fps = Path(
    "/maps/rt582/cmipper/data/env_vars/cmip6/EC-Earth3P-HR/r1i1p2f1/regridded/concatted_vars_N0_S-32_W130_E170"
).glob(pattern="*.nc")

file_ops.find_files_for_time(fps, [1950, 2060])

[]

In [32]:
# Model / member / variables / region used by the lookup cells that follow.
source_id = "EC-Earth3P-HR"
variables = [*limited_dict[source_id]["variable_dict"]]
member_id = limited_dict[source_id]["member_ids"][0]
lats, lons = download_config_dict["lats"], download_config_dict["lons"]

In [33]:
# Find existing files matching these variables, model, member, region and years.
# Uses `member_id` from the selection cell instead of re-deriving it, so changing the
# selection there is actually honoured here.
file_ops.find_intersecting_cmip(
    variables=variables,
    source_id=source_id,
    member_id=member_id,
    lats=lats,
    lons=lons,
    year_range=(1950, 2040),
)

(<xarray.Dataset> Size: 900MB
 Dimensions:    (time: 1092, latitude: 128, longitude: 161)
 Coordinates:
   * time       (time) datetime64[ns] 9kB 1950-01-16T12:00:00 ... 2040-12-16T1...
   * longitude  (longitude) float64 1kB 130.0 130.2 130.5 ... 169.5 169.8 170.0
   * latitude   (latitude) float64 1kB -31.99 -31.74 -31.49 ... -0.4917 -0.2417
 Data variables:
     rsdo       (time, latitude, longitude) float32 90MB ...
     umo        (time, latitude, longitude) float32 90MB ...
     vmo        (time, latitude, longitude) float32 90MB ...
     mlotst     (time, latitude, longitude) float32 90MB ...
     so         (time, latitude, longitude) float32 90MB ...
     thetao     (time, latitude, longitude) float32 90MB ...
     uo         (time, latitude, longitude) float32 90MB ...
     vo         (time, latitude, longitude) float32 90MB ...
     wfo        (time, latitude, longitude) float32 90MB ...
     tos        (time, latitude, longitude) float32 90MB ...
 Attributes: (12/49)
     C

In [None]:
# For every (model, member, experiment) in the limited config: concatenate each variable's
# files along time, then merge across variables.
# Loop variables are deliberately named differently from the `source_id` / `member_id`
# globals set in the selection cell, so running this cell does not silently change
# what the lookup cells operate on.
source_ids = list(limited_dict.keys())
for src_id in source_ids:
    print(f"Processing {src_id}")
    for mem_id in limited_dict[src_id]["member_ids"]:
        for exp_id in limited_dict[src_id]["experiment_ids"]:
            for var_id in limited_dict[src_id]["variable_dict"].keys():
                print(mem_id, exp_id, var_id)
                parallelised_download_and_process.concat_cmip_files_by_time(
                    source_id=src_id, experiment_id=exp_id, member_id=mem_id, variable_id=var_id
                )
            # once every variable is concatenated by time, merge across variables
            parallelised_download_and_process.merge_cmip_data_by_variables(
                source_id=src_id, experiment_id=exp_id, member_id=mem_id
            )

In [None]:
# Open a single-variable (rsdo) file from the "newtest" output directory and display it.
xa.open_dataset(
    filename_or_obj="/maps/rt582/cmipper/data/env_vars/cmip6/EC-Earth3P-HR/r1i1p2f1/newtest/regridded/concatted_vars_N0_S-32_W130_E170/rsdo_N0_S-32_W130_E170_sfl-20_ll_195000-201412.nc"
)

In [None]:
# Look up "tos" restricted to latitudes [-10, 0]; every other argument of
# find_intersecting_cmip is left to whatever defaults it defines (not visible here — check).
found = file_ops.find_intersecting_cmip(variables=["tos"], lats=[-10, 0])

In [None]:
# Display the second element of the tuple returned by find_intersecting_cmip
# (in the earlier call its first element was an xarray Dataset); what this element
# holds is not shown in this notebook — TODO document.
found[1]

In [None]:
# MVP, from https://stackoverflow.com/questions/44989473/nesting-concurrent-futures-threadpoolexecutor

def inner(i, j):
    """Unit of work for the nested-pool demo: return ``(i, j, i ** j)``."""
    power = pow(i, j)
    return (i, j, power)


def outer(i):
    """Run ``inner(i, j)`` for j in 0..4 on a private 5-worker thread pool.

    Returns the five result tuples in completion order, which is not necessarily j order.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        pending = [pool.submit(inner, i, j) for j in range(5)]
        results = [done.result() for done in concurrent.futures.as_completed(pending)]
    return results


def main():
    """Nested-pool demo: run ``outer(i)`` for i in 0..9 on a 5-worker pool and print results.

    Each ``outer`` call opens its own inner pool, so pools are nested. All result
    tuples are printed as one flat list in completion order.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
        pending = [pool.submit(outer, i) for i in range(10)]
        results = []
        for done in concurrent.futures.as_completed(pending):
            results += done.result()
    print(results)


# In a notebook kernel __name__ is "__main__", so this runs when the cell executes.
if __name__ == "__main__":
    main()

In [None]:
# Re-read both configs and show the variable names kept for EC-Earth3P-HR
# after limit_model_info_dict is applied.
model_info_dict, download_config_dict = (
    utils.read_yaml(fp) for fp in (config.model_info, config.download_config)
)
utils.limit_model_info_dict(model_info_dict, download_config_dict)["EC-Earth3P-HR"][
    "variable_dict"
].keys()


    

In [None]:
import concurrent.futures
from cmipper import utils, config

def test_func(arg1, arg2, arg3):
    print(arg1, arg2, arg3)

def execute_functions_in_threadpool(args, func=None, max_workers=12):
    """Run ``func(*arg)`` for every argument tuple in ``args`` on a thread pool.

    Parameters
    ----------
    args : iterable of tuple
        Positional-argument tuples; one task is submitted per tuple.
    func : callable, optional
        Function to run. Defaults to
        ``parallelised_download_and_process.download_cmip_variable_data``, resolved at
        call time so that module is only needed when the default is used.
    max_workers : int, default 12
        Size of the thread pool.

    Returns
    -------
    list of concurrent.futures.Future or None
        One already-finished future per tuple in ``args``. Returns ``None`` if setting up
        or submitting the work raised.

    Errors are printed as ``"An error occurred: ..."`` rather than raised (best-effort).
    This includes exceptions raised *inside* tasks, which were previously left on the
    futures and never reported.
    """
    try:
        if func is None:
            func = parallelised_download_and_process.download_cmip_variable_data
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(func, *arg) for arg in args]
        # Leaving the `with` block waits for every task, so all futures are done here.
    except Exception as e:
        print(f"An error occurred: {e}")
        return None

    # Task exceptions are captured on the future, not propagated; surface them.
    for future in futures:
        exc = future.exception()
        if exc is not None:
            print(f"An error occurred: {exc}")
    return futures


def main():
    """Download every (source_id, member_id, variable_id) combination for EC-Earth3P-HR.

    Reads the model-info and download configs and limits the model info with
    ``utils.limit_model_info_dict``. It then submits one
    ``execute_functions_in_threadpool`` call per combination to an outer thread pool and
    blocks until all of them finish. Member and variable ids come from the first (only)
    source's entry.

    Errors raised while setting up or submitting work are printed. ``wait`` does not
    raise task exceptions, so those stay on the futures and are not inspected here.
    NOTE: this cell redefines the ``main`` from the earlier demo cell.
    """
    limited_download_dict = utils.limit_model_info_dict(
        utils.read_yaml(config.model_info),
        utils.read_yaml(config.download_config),
    )

    source_ids = ["EC-Earth3P-HR"]
    first_model = limited_download_dict[source_ids[0]]
    member_ids = first_model["member_ids"]
    variable_ids = first_model["variable_dict"].keys()

    try:
        with concurrent.futures.ThreadPoolExecutor() as outer_pool:
            jobs = (
                (source_id, member_id, variable_id)
                for source_id in source_ids
                for member_id in member_ids
                for variable_id in variable_ids
            )
            # Each job is wrapped in a one-element list because
            # execute_functions_in_threadpool iterates over a sequence of argument tuples.
            futures = [outer_pool.submit(execute_functions_in_threadpool, [job]) for job in jobs]
            concurrent.futures.wait(futures)
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    main()


# WIP: Attempt at parallelised logging

In [None]:
def main():
    """WIP: download each (source_id, member_id, variable_id) combination on a thread pool,
    attempting to give every combination its own log file.

    For each combination, the function:
    - builds ``config.logging_dir / source_id / member_id / "<variable_id>_download.log"``;
    - creates the parent directories;
    - calls ``utils.redirect_stdout_stderr_to_file`` on that path;
    - submits exactly one ``utils.execute_functions_in_threadpool`` task.

    Errors raised while setting up or submitting work, and exceptions raised by the
    submitted tasks, are printed rather than raised.

    NOTE(review): unlike the previous cell, member ids come from the *un-limited* model
    info and variable ids from ``download_config_dict["variable_ids"]`` — confirm intended.
    TODO: if ``redirect_stdout_stderr_to_file`` rebinds the process-wide streams (as its
    name suggests — verify), all worker threads share whichever file was redirected last
    and the streams are never restored, so per-variable logging will not work from threads.
    NOTE: this cell redefines ``main`` from the earlier cells.
    """
    model_info_dict = utils.read_yaml(config.model_info)
    download_config_dict = utils.read_yaml(config.download_config)

    source_ids = ["EC-Earth3P-HR"]
    member_ids = model_info_dict[source_ids[0]]["member_ids"]
    variable_ids = download_config_dict["variable_ids"]

    try:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for source_id in source_ids:
                for member_id in member_ids:
                    for variable_id in variable_ids:
                        log_fp = config.logging_dir / source_id / member_id / "_".join([variable_id, "download.log"])
                        print(log_fp)
                        log_fp.parent.mkdir(parents=True, exist_ok=True)
                        utils.redirect_stdout_stderr_to_file(log_fp)
                        # Submit this combination exactly once. Previously a comprehension
                        # here re-submitted the full cartesian product on every iteration
                        # (N**2 downloads) and overwrote `futures` each time.
                        futures.append(
                            executor.submit(
                                utils.execute_functions_in_threadpool,
                                [(source_id, member_id, variable_id)],
                            )
                        )

            # Wait for all futures to complete
            concurrent.futures.wait(futures)
            # wait() does not raise task exceptions; report them instead of dropping them.
            for future in futures:
                exc = future.exception()
                if exc is not None:
                    print(f"An error occurred: {exc}")
    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    main()