# Processing script
This script (1) calculates the CO2 flux for each ensemble member, (2) calculates a suite of statistics on each member, and (3) decomposes the flux into various time scales.
Processing is parallelized across ensemble members with [Dask](https://docs.dask.org/en/latest/) to speed up computation.

### 1. calculate flux
Air-sea CO2 exchange is parameterized following Wanninkhof (1992), with the Sweeney et al. (2007) gas-exchange coefficient scaled to match ERA-Interim winds.
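The bulk formula behind this parameterization can be sketched as follows. This is a minimal illustration under stated assumptions, not the notebook's `calculate_flux`: the function names are hypothetical, solubility follows Weiss (1974) from the reference list, the Schmidt number uses the Wanninkhof (1992) seawater polynomial normalized to 660, the wind second moment is assumed to be mean(U)² + var(U) (suggested by the ERA-Interim u10 variance file), seawater density is fixed at 1025 kg m⁻³, a positive flux means outgassing, and `a=0.27` is only a placeholder for the ERA-Interim-scaled coefficient.

```python
import numpy as np

def schmidt_co2(t_c):
    """Schmidt number of CO2 in seawater (Wanninkhof 1992), t_c in deg C."""
    return 2073.1 - 125.62 * t_c + 3.6276 * t_c**2 - 0.043219 * t_c**3

def solubility_co2(t_c, sal):
    """CO2 solubility K0 in mol kg-1 atm-1 (Weiss 1974)."""
    tk100 = (t_c + 273.15) / 100.0
    ln_k0 = (-60.2409 + 93.4517 / tk100 + 23.3585 * np.log(tk100)
             + sal * (0.023517 - 0.023656 * tk100 + 0.0047036 * tk100**2))
    return np.exp(ln_k0)

def co2_flux(pco2_ocn, pco2_atm, t_c, sal, u10_mean, u10_var, a=0.27, rho=1025.0):
    """Air-sea CO2 flux in mol m-2 yr-1 (assumed sign: positive = ocean to air).

    k = a * <U^2> * (Sc/660)^-0.5, with <U^2> = mean(U)^2 + var(U) (assumption).
    a is in cm hr-1 (m s-1)-2; 0.27 is a placeholder, not the scaled value.
    """
    u2 = u10_mean**2 + u10_var                         # second moment of wind speed
    k_cm_hr = a * u2 * (schmidt_co2(t_c) / 660.0) ** -0.5
    k_m_yr = k_cm_hr * 0.01 * 24 * 365.25              # cm/hr -> m/yr
    k0 = solubility_co2(t_c, sal) * rho                # mol kg-1 atm-1 -> mol m-3 atm-1
    dpco2 = (pco2_ocn - pco2_atm) * 1e-6               # uatm -> atm
    return k_m_yr * k0 * dpco2
```

Because every step is plain NumPy arithmetic, the same function applies element-wise to xarray DataArrays of pCO2, SST, salinity and wind.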

### 2. calculate statistics
Statistical metrics used in this analysis are listed below. Metrics are calculated separately for each member, and the average across all ensemble members is displayed.
- Bias: measures long-term offset
- Correlation: measures phasing
- Normalized STD: measures amplitude
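The three metrics and the per-member-then-average procedure can be sketched in NumPy. This is an illustrative assumption, not the notebook's `calculate_flux_statistics` or the `skill_metrics` package: the function names are hypothetical, bias is taken as mean(estimate − reference), correlation is Pearson's r, and normalized STD is std(estimate) / std(reference).

```python
import numpy as np

def bias(est, ref):
    """Long-term offset: mean of estimate minus reference."""
    return np.mean(np.asarray(est) - np.asarray(ref))

def correlation(est, ref):
    """Phasing: Pearson correlation coefficient."""
    return np.corrcoef(est, ref)[0, 1]

def normalized_std(est, ref):
    """Amplitude: std of the estimate divided by std of the reference."""
    return np.std(est) / np.std(ref)

def ensemble_mean_metrics(members, ref):
    """Compute each metric for every member, then average across members."""
    scores = np.array([[bias(m, ref), correlation(m, ref), normalized_std(m, ref)]
                       for m in members])
    return dict(zip(["bias", "correlation", "normalized_std"], scores.mean(axis=0)))
```

Averaging the per-member scores (rather than scoring the ensemble-mean field) matches the description above; the two generally give different results for correlation and amplitude.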

### 3. decompose_flux
This function decomposes the CO2 flux into various time scales:
- detrended
- IAV (interannual variability)
- AV
- decadal
- subdecadal
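The additive idea behind these components can be illustrated with a deliberately simple stand-in. This sketch is an assumption, not the STL method used by `decompose_flux`: it removes a linear trend, estimates the seasonal cycle as a monthly climatology, and keeps the rest as a residual. Splitting the residual further into decadal and subdecadal parts would need a low-pass filter whose choice is not specified here, so it is omitted.

```python
import numpy as np

def decompose_monthly(x):
    """Hypothetical stand-in: linear trend + monthly climatology + residual.

    Assumes x is a monthly series that starts in the first calendar month.
    """
    x = np.asarray(x, dtype=float)
    t = np.arange(x.size)
    slope, intercept = np.polyfit(t, x, 1)      # least-squares linear trend
    trend = slope * t + intercept
    detrended = x - trend
    # Mean seasonal cycle: average detrended value for each calendar month
    clim = np.array([detrended[m::12].mean() for m in range(12)])
    seasonal = clim[t % 12]
    residual = detrended - seasonal             # year-to-year and shorter anomalies
    return {"trend": trend, "detrended": detrended,
            "seasonal": seasonal, "residual": residual}
```

By construction `trend + seasonal + residual` reproduces the input exactly, which is the property any additive decomposition should satisfy.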

## References 
- Wanninkhof (1992)
- Sweeney et al. (2007)
- Weiss (1974)
- Dickson et al. (2007)
- Landschutzer et al. (2016)
- Stow et al. (2009)
- ERA-Interim

## 0. Start the DASK client
This initializes and starts the Dask client.
- `client.restart()` will restart the client
- `client.close()` will close the client

In [1]:
from distributed import Client

In [2]:
# 1. Initialize the Dask client with 25 local workers
client = Client(n_workers=25)
# 2. Display the client summary (scheduler address, dashboard link, workers)
client

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.


Client: Scheduler tcp://127.0.0.1:33128, Dashboard http://127.0.0.1:34152/status
Cluster: Workers 25, Cores 50, Memory 269.92 GB


In [4]:
# Close the client only when all processing below is finished
# client.close()

In [3]:
%matplotlib inline
import xarray as xr
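import xarray.ufuncs as xu  # deprecated (removed in recent xarray releases); NumPy ufuncs work directly on xarray objects
import numpy as np
import scipy
import scipy.io
from scipy.stats import stats  # the scipy.stats.stats submodule is deprecated; `from scipy import stats` exposes the same functions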
#from processing import processing as pr
from skill_metrics import skill_metrics as sk
import decompose as stl

In [4]:
# these are notebooks with function definitions
%run _define_model_class.ipynb
%run _define_processing_functions.ipynb

In [5]:
###======================================
### Define directories
###====================================== 
dir_raw = '/local/data/artemis/workspace/gloege/SOCAT-LE/data/raw'
dir_clean = '/local/data/artemis/workspace/gloege/SOCAT-LE/data/clean'
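The full 25-member lists below are repeated verbatim in several cells. A sketch of defining them once, using a hypothetical `LE_MEMBERS` dictionary and `get_members` helper (names not in the original), copies the lists exactly from those cells:

```python
# Hypothetical refactor: one lookup table for the full ensemble member lists
LE_MEMBERS = {
    'CESM': [1, 2, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 20, 21, 23, 24, 25,
             30, 31, 34, 35, 101, 102, 103, 104],
    'GFDL': [1, 2, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 14, 16, 17, 18, 19, 20,
             22, 23, 26, 27, 28, 29, 30],
    'MPI': [6, 9, 14, 20, 22, 24, 25, 27, 38, 43, 45, 46, 57, 60, 64, 70, 75,
            77, 78, 80, 81, 83, 91, 95, 98],
    'CanESM2': ['r1r10', 'r1r6', 'r1r7', 'r1r9', 'r2r1', 'r2r2', 'r2r8', 'r3r1',
                'r3r2', 'r3r4', 'r3r6', 'r3r7', 'r3r9', 'r4r1', 'r4r3', 'r4r5',
                'r4r6', 'r4r7', 'r4r8', 'r5r1', 'r5r10', 'r5r2', 'r5r4', 'r5r5',
                'r5r9'],
}

def get_members(LE_model):
    """Return the member list for a large-ensemble model, or fail loudly."""
    try:
        return LE_MEMBERS[LE_model]
    except KeyError:
        raise ValueError(f"Unknown LE_model {LE_model!r}; "
                         f"expected one of {sorted(LE_MEMBERS)}") from None
```

Raising on an unknown name is a deliberate design choice: with the `if` chains in the loops, a misspelled model name would silently reuse `members` and `n` from the previous iteration (or raise a `NameError` on the first one).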

# 1. Calculate flux
If you have already run a computation in this session, restart the client with `client.restart()` before continuing to free worker memory.

In [6]:
# ERA-Interim 10 m wind (u10) variance file, 1982-2016
#fl_u10_std = f'{dir_clean}/ERA_interim/ERAinterim_1x1_u10-std_1982-2016.nc'
dir_obs = '/local/data/artemis/observations'
fl_u10_var = f'{dir_obs}/ERAinterim/processed/ERAinterim_1x1_u10-var_1982-2016.nc'
# output directory
dir_out = f'{dir_clean}/CO2_flux'

###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
###=================================================================
for LE_model in ['MPI']:
    ###======================================
    ### Get members from model
    ###======================================
    if LE_model=='CESM':
        members=[1,2,9,10,11,12,13,14,15,16,17,18,20,21,23,24,25,30,31,34,35,101,102,103,104]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        members=[1,2,3,4,5,6,8,9,10,11,12,13,14,16,17,18,19,20,22,23,26,27,28,29,30]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='MPI':
        members=[6,9,14,20,22,24,25,27,38,43,45,46,57,60,64,70,75,77,78,80,81,83,91,95,98]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='CanESM2':
        members=['r1r10', 'r1r6', 'r1r7', 'r1r9', 'r2r1', 'r2r2', 'r2r8', 'r3r1', 'r3r2', 'r3r4',
                'r3r6', 'r3r7', 'r3r9', 'r4r1', 'r4r3', 'r4r5', 'r4r6', 'r4r7', 'r4r8', 'r5r1', 
                'r5r10', 'r5r2', 'r5r4', 'r5r5', 'r5r9']
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    print(f'Calculating flux from {n} members from {LE_model}')
    fut = client.map(calculate_flux, 
                     tuple(np.repeat(LE_model, n)), 
                     members,
                     tuple(np.repeat(fl_u10_var, n)), 
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    # Clear up the memory
    del fut
    
print('Complete!!')

Loading 25 members from MPI
Calculating flux from 25 members from MPI
Complete!!
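Each `client.map` call above builds `n`-element tuples with `np.repeat` just to pass the same value to every task. A sketch of an alternative, assuming a hypothetical `calculate_flux_stub` in place of the real `calculate_flux` and placeholder paths, fixes the constant arguments in a small closure so only the member varies. It runs here with the standard-library `ThreadPoolExecutor` as a local stand-in; the same `run_one` could be passed as `client.map(run_one, members)` followed by `client.gather(...)`.

```python
from concurrent.futures import ThreadPoolExecutor

def calculate_flux_stub(LE_model, member, fl_u10_var, dir_out):
    """Hypothetical stand-in for calculate_flux: just reports its arguments."""
    return f"{LE_model}:{member} -> {dir_out}"

LE_model = 'MPI'
members = [6, 9, 14]
fl_u10_var = 'u10-var.nc'   # placeholder path
dir_out = 'CO2_flux'        # placeholder path

def run_one(member):
    # Every argument except the member is the same for all tasks
    return calculate_flux_stub(LE_model, member, fl_u10_var, dir_out)

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(run_one, members))   # results keep input order
```

The closure keeps the call site readable and avoids allocating `n` copies of each constant; the argument order still has to match the real function's signature.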


In [6]:
# ERA-Interim 10 m wind (u10) variance file, 1982-2016
dir_obs = '/local/data/artemis/observations'
fl_u10_var = f'{dir_obs}/ERAinterim/processed/ERAinterim_1x1_u10-var_1982-2016.nc'
#fl_u10_std = f'{dir_obs}/ERAinterim/processed/ERAinterim_1x1_u10-var_1982-2016.nc'

# output directory
dir_out = f'{dir_clean}/CO2_flux-float'

###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
###=================================================================
for LE_model in ['GFDL']:
    if LE_model=='CESM':
        members=[1,10,11,14,17,35,103]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        #members=[2,3,8,19,27,24]
        members = [15]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')


    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    print(f'Calculating flux from {n} members from {LE_model}')
    fut = client.map(calculate_flux_float, 
                     tuple(np.repeat(LE_model, n)), 
                     members,
                     tuple(np.repeat(fl_u10_var, n)), 
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    # Clear up the memory
    del fut
    
print('Complete!!')

Loading 1 members from GFDL
Calculating flux from 1 members from GFDL
Complete!!


In [None]:
# Make sure to close the client when you're all done
# client.close()

# 2. Calculate pCO2 Statistics
This calculates pCO2 statistics for each member.

In [8]:
# Input directory with STL-decomposed pCO2 files
# (dir_out from step 5, "Decompose pCO2 -- STL")
dir_in = f'{dir_clean}/pCO2_decomp_stl'

# Output directory where statistics will be stored
dir_out = f'{dir_clean}/pCO2_stats'

# Write input/output directories to the log file
f = open("/home/gloege/log.stats_pco2.txt", "a")
f.write(f"input dir : {dir_in} \n")
f.write(f"output dir : {dir_out} \n")
f.close()

###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
### model_or_recon: ['MODEL', 'SOMFFN', 'UEASI']
###=================================================================
for LE_model in ['CESM', 'GFDL', 'CanESM2', 'MPI']:
    ###======================================
    ### Get members from model
    ###======================================
    if LE_model=='CESM':
        members=[1,2,9,10,11,12,13,14,15,16,17,18,20,21,23,24,25,30,31,34,35,101,102,103,104]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        members=[1,2,3,4,5,6,8,9,10,11,12,13,14,16,17,18,19,20,22,23,26,27,28,29,30]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='MPI':
        members=[6,9,14,20,22,24,25,27,38,43,45,46,57,60,64,70,75,77,78,80,81,83,91,95,98]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='CanESM2':
        members=['r1r10', 'r1r6', 'r1r7', 'r1r9', 'r2r1', 'r2r2', 'r2r8', 'r3r1', 'r3r2', 'r3r4',
                'r3r6', 'r3r7', 'r3r9', 'r4r1', 'r4r3', 'r4r5', 'r4r6', 'r4r7', 'r4r8', 'r5r1', 
                'r5r10', 'r5r2', 'r5r4', 'r5r5', 'r5r9']
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    f = open("/home/gloege/log.stats_pco2.txt", "a")
    f.write(f"Calculating statistics for {n} member from {LE_model} \n")
    f.close()
    print(f'Calculating statistics for {n} member from {LE_model}')
    fut = client.map(calculate_pco2_statistics, 
                     tuple(np.repeat(LE_model, n)), 
                     members,
                     tuple(np.repeat(dir_in, n)),
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    # clear up the memory
    del fut

f = open("/home/gloege/log.stats_pco2.txt", "a")
f.write(f"Complete!! \n")
f.close()
print('Complete!!')

Loading 25 members from CESM
Calculating statistics for 25 member from CESM
Loading 25 members from GFDL
Calculating statistics for 25 member from GFDL
Loading 25 members from CanESM2
Calculating statistics for 25 member from CanESM2
Loading 25 members from MPI
Calculating statistics for 25 member from MPI
Complete!!
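The cells in this notebook repeat the same open/write/close triplet for every log message. A sketch of a small helper, using a hypothetical name `log`, wraps the write in a `with` block so the file is closed even if the write raises; the usage lines write to a throwaway temporary file rather than the real log path.

```python
import os
import tempfile

def log(path, message):
    """Append one line to a log file; the with-block always closes the file."""
    with open(path, "a") as f:
        f.write(f"{message}\n")

# Usage: write to a throwaway file instead of /home/gloege/log.stats_pco2.txt
demo_log = os.path.join(tempfile.mkdtemp(), "log.stats_pco2.txt")
log(demo_log, "input dir : pCO2_decomp_stl")
log(demo_log, "Complete!!")
```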


In [None]:
# Make sure to close the client when you're all done
# client.close()

# 3. Calculate flux statistics 

In [6]:
# Input directory with STL-decomposed CO2 flux files
# (dir_out from step 4, "Decompose flux -- STL method")
dir_in = f'{dir_clean}/CO2_flux_decomp_stl'

# Output directory where statistics will be stored
dir_out = f'{dir_clean}/CO2_flux_stats'

# Write input/output directories to the log file
f = open("/home/gloege/log.stats_flux.txt", "a")
f.write(f"input dir : {dir_in} \n")
f.write(f"output dir : {dir_out} \n")
f.close()

###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
### model_or_recon: ['MODEL', 'SOMFFN', 'UEASI']
###=================================================================
for LE_model in ['MPI']:
    ###======================================
    ### Get members from model
    ###======================================
    if LE_model=='CESM':
        members=[1,2,9,10,11,12,13,14,15,16,17,18,20,21,23,24,25,30,31,34,35,101,102,103,104]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        members=[1,2,3,4,5,6,8,9,10,11,12,13,14,16,17,18,19,20,22,23,26,27,28,29,30]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='MPI':
        members=[6,9,14,20,22,24,25,27,38,43,45,46,57,60,64,70,75,77,78,80,81,83,91,95,98]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='CanESM2':
        members=['r1r10', 'r1r6', 'r1r7', 'r1r9', 'r2r1', 'r2r2', 'r2r8', 'r3r1', 'r3r2', 'r3r4',
                'r3r6', 'r3r7', 'r3r9', 'r4r1', 'r4r3', 'r4r5', 'r4r6', 'r4r7', 'r4r8', 'r5r1', 
                'r5r10', 'r5r2', 'r5r4', 'r5r5', 'r5r9']
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    f = open("/home/gloege/log.stats_flux.txt", "a")
    f.write(f"Calculating statistics for {n} member from {LE_model} \n")
    f.close()
    print(f'Calculating statistics for {n} member from {LE_model}')
    fut = client.map(calculate_flux_statistics, 
                     tuple(np.repeat(LE_model, n)), 
                     members,
                     tuple(np.repeat(dir_in, n)),
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    # clear up the memory
    del fut

f = open("/home/gloege/log.stats_flux.txt", "a")
f.write(f"Complete!! \n")
f.close()
print('Complete!!')

Loading 25 members from MPI
Calculating statistics for 25 member from MPI
Complete!!


In [8]:
# Input directory with STL-decomposed CO2 flux-float files
# (dir_out from the second cell of step 4, "Decompose flux -- STL method")
dir_in = f'{dir_clean}/CO2_flux-float_decomp_stl'

# Output directory where statistics will be stored
dir_out = f'{dir_clean}/CO2_flux-float_stats'

# Write input/output directories to the log file
#f = open("/home/gloege/log.stats_flux.txt", "a")
#f.write(f"input dir : {dir_in} \n")
#f.write(f"output dir : {dir_out} \n")
#f.close()

###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
### model_or_recon: ['MODEL', 'SOMFFN', 'UEASI']
###=================================================================
for LE_model in ['GFDL']:
    ###======================================
    if LE_model=='CESM':
        members=[1,10,11,14,17,35,103]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        #members=[2,3,8,19,27]
        members=[15,24] 
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='MPI':
        members=[18,21,28,39,41,27,64]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='CanESM2':
        members=['r1r4','r1r8','r2r3', 'r3r8','r4r10','r4r3','r5r5']
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    #f = open("/home/gloege/log.stats_flux.txt", "a")
    #f.write(f"Calculating statistics for {n} member from {LE_model} \n")
    #f.close()
    print(f'Calculating statistics for {n} member from {LE_model}')
    fut = client.map(calculate_flux_statistics, 
                     tuple(np.repeat(LE_model, n)), 
                     members,
                     tuple(np.repeat(dir_in, n)),
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    # clear up the memory
    del fut

#f = open("/home/gloege/log.stats_flux.txt", "a")
#f.write(f"Complete!! \n")
#f.close()
print('Complete!!')

Loading 2 members from GFDL
Calculating statistics for 2 member from GFDL
Complete!!


In [9]:
# Close only when all processing is finished; client.restart() frees worker memory between steps
# client.close()

In [12]:
client.restart()



Client: Scheduler tcp://127.0.0.1:44143, Dashboard http://127.0.0.1:36633/status
Cluster: Workers 25, Cores 50, Memory 269.92 GB


# 4. Decompose flux -- STL method

In [10]:
# input directory with CO2 flux files
# This dir_out from step 1.
dir_in = f'{dir_clean}/CO2_flux'

# Output directory where decomposition stored
dir_out = f'{dir_clean}/CO2_flux_decomp_stl'

# Write input/output directories to the log file
f = open("/home/gloege/log.decompose_flux.txt", "a")
f.write(f"Input directory : {dir_in} \n")
f.write(f"Output directory : {dir_out} \n")
f.close()
    
###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
### model_or_recon: ['MODEL', 'SOMFFN', 'UEASI']
###=================================================================

###======================================
### model_or_recon needs to be either MODEL or SOMFFN
###====================================== 
model_or_recon = 'SOMFFN'

for LE_model in ['MPI']:
    ###======================================
    ### Get members from model
    ###======================================
    if LE_model=='CESM':
        members=[1,2,9,10,11,12,13,14,15,16,17,18,20,21,23,24,25,30,31,34,35,101,102,103,104]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        members=[1,2,3,4,5,6,8,9,10,11,12,13,14,16,17,18,19,20,22,23,26,27,28,29,30]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='MPI':
        members=[6,9,14,20,22,24,25,27,38,43,45,46,57,60,64,70,75,77,78,80,81,83,91,95,98]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='CanESM2':
        members=['r1r10', 'r1r6', 'r1r7', 'r1r9', 'r2r1', 'r2r2', 'r2r8', 'r3r1', 'r3r2', 'r3r4',
                'r3r6', 'r3r7', 'r3r9', 'r4r1', 'r4r3', 'r4r5', 'r4r6', 'r4r7', 'r4r8', 'r5r1', 
                'r5r10', 'r5r2', 'r5r4', 'r5r5', 'r5r9']
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    # Write progress to the log file
    f = open("/home/gloege/log.decompose_flux.txt", "a")
    f.write(f"Decomposing {n} members from {LE_model} \n")
    f.close()
    print(f"Decomposing {n} members from {LE_model}")
    
    # run decompose_flux function
    fut = client.map(decompose_flux, 
                     tuple(np.repeat(LE_model, n)), 
                     members, 
                     tuple(np.repeat(model_or_recon, n)),
                     tuple(np.repeat(dir_in, n)),
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    del fut
    
# Record completion in the log file
f = open("/home/gloege/log.decompose_flux.txt", "a")
f.write(f"Complete!! \n")
f.close()
print('Complete!!')

Loading 25 members from MPI
Decomposing 25 members from MPI
Complete!!


In [6]:
# input directory with CO2 flux files
# This dir_out from step 1.
dir_in = f'{dir_clean}/CO2_flux-float'

# Output directory where decomposition stored
dir_out = f'{dir_clean}/CO2_flux-float_decomp_stl'

# Write input/output directories to the log file
#f = open("/home/gloege/log.decompose_flux.txt", "a")
#f.write(f"Input directory : {dir_in} \n")
#f.write(f"Output directory : {dir_out} \n")
#f.close()
    
###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
### model_or_recon: ['MODEL', 'SOMFFN', 'UEASI']
###=================================================================

###======================================
### model_or_recon needs to be either MODEL or SOMFFN
###====================================== 
model_or_recon = 'SOMFFN'

for LE_model in ['GFDL']:
    ###======================================
    ### Get members from model
    ###======================================
    if LE_model=='CESM':
        members=[1,10,11,14,17,35,103]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        #members=[2,3,8,19,27]
        members=[15,24] 
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='MPI':
        members=[18,21,28,39,41,27,64]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='CanESM2':
        members=['r1r4','r1r8','r2r3', 'r3r8','r4r10','r4r3','r5r5']
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    # Write progress to the log file
    #f = open("/home/gloege/log.decompose_flux.txt", "a")
    #f.write(f"Decomposing {n} members from {LE_model} \n")
    #f.close()
    print(f"Decomposing {n} members from {LE_model}")
    
    # run decompose_flux function
    fut = client.map(decompose_flux, 
                     tuple(np.repeat(LE_model, n)), 
                     members, 
                     tuple(np.repeat(model_or_recon, n)),
                     tuple(np.repeat(dir_in, n)),
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    del fut

# Record completion in the log file
#f = open("/home/gloege/log.decompose_flux.txt", "a")
#f.write(f"Complete!! \n")
#f.close()
print('Complete!!')

Loading 2 members from GFDL
Decomposing 2 members from GFDL
Complete!!


In [13]:
# Close only when all processing is finished; client.restart() frees worker memory between steps
# client.close()

# 5. Decompose pCO2 -- STL
This decomposes pCO2 using the STL (Seasonal-Trend decomposition using Loess) method.
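STL is an additive decomposition. Assuming `decompose_pco2` uses the standard additive form (its internals live in the `decompose` module and are not shown here), each monthly series $y_t$ is split into a loess-smoothed trend $T_t$, a seasonal component $S_t$ that repeats approximately every 12 months, and a remainder $R_t$; the detrended series is then everything except the trend:

```latex
y_t = T_t + S_t + R_t, \qquad S_t \approx S_{t-12}, \qquad y_t - T_t = S_t + R_t
```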

In [7]:
client.restart()



Client: Scheduler tcp://127.0.0.1:33128, Dashboard http://127.0.0.1:34152/status
Cluster: Workers 25, Cores 50, Memory 269.92 GB


In [None]:
###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
### model_or_recon: ['MODEL', 'SOMFFN', 'UEASI']
###=================================================================

# Output directory where decomposition stored
dir_out = f'{dir_clean}/pCO2_decomp_stl'

# Write the output directory to the log file
f = open("/home/gloege/log.decompose_pco2.txt", "a")
f.write(f"Output directory : {dir_out} \n")
f.close()
    
###======================================
### model_or_recon needs to be either MODEL or SOMFFN
###====================================== 
model_or_recon = 'MODEL'

for LE_model in ['CESM', 'GFDL', 'CanESM2', 'MPI']:
    ###======================================
    ### Get members from model
    ###======================================
    if LE_model=='CESM':
        members=[1,2,9,10,11,12,13,14,15,16,17,18,20,21,23,24,25,30,31,34,35,101,102,103,104]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        members=[1,2,3,4,5,6,8,9,10,11,12,13,14,16,17,18,19,20,22,23,26,27,28,29,30]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='MPI':
        members=[6,9,14,20,22,24,25,27,38,43,45,46,57,60,64,70,75,77,78,80,81,83,91,95,98]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='CanESM2':
        members=['r1r10', 'r1r6', 'r1r7', 'r1r9', 'r2r1', 'r2r2', 'r2r8', 'r3r1', 'r3r2', 'r3r4',
                'r3r6', 'r3r7', 'r3r9', 'r4r1', 'r4r3', 'r4r5', 'r4r6', 'r4r7', 'r4r8', 'r5r1', 
                'r5r10', 'r5r2', 'r5r4', 'r5r5', 'r5r9']
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    ## Print to log file
    f = open("/home/gloege/log.decompose_pco2.txt", "a")
    f.write(f"Decomposing {n} members from {LE_model} \n")
    f.close()
    print(f'Decomposing {n} members from {LE_model}')
    
    ## Run the decompose_pco2 function
    fut = client.map(decompose_pco2, 
                     tuple(np.repeat(LE_model, n)), 
                     members, 
                     tuple(np.repeat(model_or_recon, n)),
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    del fut
    
print('Complete!!')
f = open("/home/gloege/log.decompose_pco2.txt", "a")
f.write(f"Complete! \n")
f.close()

Loading 25 members from CESM
Decomposing 25 members from CESM
Loading 25 members from GFDL
Decomposing 25 members from GFDL


# OLD STUFF

In [None]:
# input directory with CO2 flux files
# This dir_out from step 1.
dir_in = f'{dir_clean}/CO2_flux'

# Output directory where decomposition stored
dir_out = f'{dir_clean}/CO2_flux_decomp'

###=================================================================
### LE_model: ['CESM', 'GFDL', 'MPI', 'CanESM2']
### model_or_recon: ['MODEL', 'SOMFFN', 'UEASI']
###=================================================================

###======================================
### model_or_recon needs to be either MODEL or SOMFFN
###====================================== 
model_or_recon = 'MODEL'

for LE_model in ['CESM', 'GFDL', 'CanESM2']:
    ###======================================
    ### Get members from model
    ###======================================
    if LE_model=='CESM':
        members=[1,2,9,10,11,12,13,14,15,16,17,18,20,21,23,24,25,30,31,34,35,101,102,103,104]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='GFDL':
        members=[1,2,3,4,5,6,8,9,10,11,12,13,14,16,17,18,19,20,22,23,26,27,28,29,30]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='MPI':
        members=[6,9,14,20,22,24,25,27,38,43,45,46,57,60,64,70,75,77,78,80,81,83,91,95,98]
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    if LE_model=='CanESM2':
        members=['r1r10', 'r1r6', 'r1r7', 'r1r9', 'r2r1', 'r2r2', 'r2r8', 'r3r1', 'r3r2', 'r3r4',
                'r3r6', 'r3r7', 'r3r9', 'r4r1', 'r4r3', 'r4r5', 'r4r6', 'r4r7', 'r4r8', 'r5r1', 
                'r5r10', 'r5r2', 'r5r4', 'r5r5', 'r5r9']
        n = len(members)
        print(f'Loading {n} members from {LE_model}')

    ###======================================
    ### Load data
    ### client.map    --> These results live on distributed workers.
    ### client.submit --> We can submit tasks on futures. 
    ###                   The function will go to the machine where 
    ###                   the futures are stored and run on the result 
    ###                   once it has completed.
    ###======================================
    print(f'Decomposing {n} members from {LE_model}')
    fut = client.map(decompose_flux, 
                     tuple(np.repeat(LE_model, n)), 
                     members, 
                     tuple(np.repeat(model_or_recon, n)),
                     tuple(np.repeat(dir_in, n)),
                     tuple(np.repeat(dir_out, n)))
    client.gather(fut)
    
    del fut
    
print('Complete!!')