# Handle New Data Requests Automatically
- beginning of notebook is assumed to be interactive until the requests have been checked
- all progress and exception logging is done only for main loop
- copy and paste the e-mail response and send from gcs.cmip6.ldeo@gmail.com account

In [None]:
import numpy as np
import pandas as pd
import os
import gcsfs
import xarray as xr
from functools import partial
from IPython.display import display
from glob import glob
import warnings
import datetime

### Local modules

In [None]:
from request import requests, set_request_id
from search import esgf_search, esgf_search_sites
from netcdf import get_ncfiles, concatenate
from identify import needed
from response import response, dict_to_dfcat, get_details
from utilities import getFolderSize

### Initialization

In [None]:
# Anonymous, read-only handle on Google Cloud Storage; later cells use it to
# list the bucket, measure store sizes and open uploaded stores.
fs = gcsfs.GCSFileSystem(
    token='anon',
    access='read_only',
)

# Root directory under which newly built zarr stores are written locally.
zarr_local = '/h85/naomi/zarr-minimal'

### Choose basic configuration parameters

In [None]:
# Mapping of the known ESGF search endpoints, keyed by short site name.
dtype = esgf_search_sites()
print('possible ESGF API search nodes: ',list(dtype.keys()))

# Pick exactly ONE (search endpoint, local_node flag) pair.
ESGF_site, local_node = dtype['llnl'], True
#ESGF_site, local_node = dtype['dkrz'], False
#ESGF_site, local_node = dtype['ipsl'], False
#ESGF_site, local_node = dtype['ceda'], False  # CEDA doesn't allow local-only searches

The complete archive of CMIP6 output is made available for search and download via any one of the following portals:

USA, PCMDI/LLNL (California) - https://esgf-node.llnl.gov/search/cmip6/

France, IPSL - https://esgf-node.ipsl.upmc.fr/search/cmip6-ipsl/

Germany, DKRZ - https://esgf-data.dkrz.de/search/cmip6-dkrz/

UK, CEDA - https://esgf-index1.ceda.ac.uk/search/cmip6-ceda/

If you encounter slow responses from one search interface, you might try one of the other portals (perhaps one near you). Also note that the datasets themselves are stored (and partially replicated) on a federated system of data nodes, and again you may find differences from node to node in download speeds.

In [None]:
# True: query ESGF again; False: reuse the results saved in csv/ESGF_specific.csv
update_ESGF = True

# Exactly ONE table_id per run
table_id = 'day'
#table_id = 'Amon'

# Experiments and variables are always given as LISTS
variable_ids = ['prc']
#variable_ids = ['hus','mrsos','prc','rlds','sfcWind','ua','va','wap']
experiment_ids = ['piControl']
#experiment_ids = ['ssp245', 'ssp370', 'ssp585']

# Sources and members: either the string 'All' or an explicit list
members = ['r1i1p1f1']
#members = 'All'
sources = ['CESM2-WACCM-FV2']
#sources = 'All'

### Search ESGF for the availability of requested data

In [None]:
# Columns that together identify one unique file record.
dedup_keys = ["file_name", "version", "checksum"]

if update_ESGF:
    # Query ESGF once per (experiment, variable) pair for ALL sources and
    # members; the request is narrowed to specific sources/members later.
    df_list = []
    for experiment_id in experiment_ids:
        for variable_id in variable_ids:
            print(experiment_id,variable_id)
            try:
                # NOTE(review): local_node is hard-coded to False here, so the
                # `local_node` flag chosen in the configuration cell is not
                # used by this search - confirm that this is intended.
                files = esgf_search(server=ESGF_site, mip_era='CMIP6', variable_id=variable_id,
                                    table_id=table_id, experiment_id=experiment_id,
                                    page_size=500, verbose=False, local_node=False)
            except Exception as err:
                # A failed search only skips this combination, but say so
                # instead of dropping it silently.
                print(f'ESGF search failed for {experiment_id} {table_id} {variable_id}: {err!r}')
                continue

            # The last two path components of the download URL are used as
            # the dataset version and the file name.
            urls = files['HTTPServer_url']
            files.loc[:,'version'] = [url.split('/')[-2] for url in urls]
            files.loc[:,'file_name'] = [url.split('/')[-1] for url in urls]
            # might need to set activity_id to activity_drs for some files (see old versions)
            files.loc[:,'activity_id'] = files.activity_drs
            df_list += [files.drop_duplicates(subset=dedup_keys)]

    if not df_list:
        # pd.concat([]) would fail with an unhelpful "No objects to concatenate"
        raise ValueError('ESGF search returned no results for the requested experiments/variables')

    dESGF = pd.concat(df_list,sort=False)
    dESGF = dESGF.drop_duplicates(subset=dedup_keys)
    # make sure the output directory exists before saving the search results
    os.makedirs('csv', exist_ok=True)
    dESGF.to_csv('csv/ESGF_specific.csv',index=False)
else:
    # Reuse the search results saved by a previous run.
    dESGF = pd.read_csv('csv/ESGF_specific.csv')
    dESGF = dESGF.drop_duplicates(subset=dedup_keys)

### Get the master list of existing zarr stores
- `dzLocal` (the master catalog, read from the `noQC` csv file) includes all stores, EVEN THOSE with known ES-DOC issues

In [None]:
# Master catalog of the zarr stores that already exist in the cloud; the
# 'version' column is read as text.
dzLocal = pd.read_csv(
    'https://cmip6.storage.googleapis.com/cmip6-zarr-consolidated-stores-noQC.csv',
    dtype={'version': 'unicode'},
)
# Cell output: number of rows in the cloud catalog and in the ESGF search results.
(len(dzLocal), len(dESGF))

### Check the new requests:
- already exists in `dzLocal` (what we have) - not needed
- exists in `dESGF` (what is available) - if not available, then not needed

In [None]:
# Build dESGF3: the ESGF file records, restricted to the requested
# sources/experiments/variables/members, whose zarr store is NOT yet listed
# in the cloud catalog (dzLocal). Each kept record gets a 'zstore' column.
update_files_needed = True
if update_files_needed:
    ngood = 0  # number of stores that still have to be created
    # Store path template, filled from the columns of one file record.
    zarr_format = ('/%(activity_drs)s/%(institution_id)s/%(source_id)s/%(experiment_id)s/'
                   '%(member_id)s/%(table_id)s/%(variable_id)s/%(grid_label)s/')
    df_list = []

    # Set lookup instead of scanning the whole catalog for every store.
    known_stores = set(dzLocal.zstore)

    if sources == 'All':
        source_ids = dESGF.source_id.unique()
    else:
        source_ids = sources

    # Loop-invariant, so computed once rather than per variable.
    if members == 'All':
        member_ids = dESGF.member_id.unique()
    else:
        member_ids = members

    for source_id in source_ids:
        for experiment_id in experiment_ids:
            print(experiment_id,source_id)
            for variable_id in variable_ids:
                df_grid = dESGF[(dESGF.experiment_id==experiment_id)&(dESGF.source_id==source_id)&
                               (dESGF.variable_id==variable_id)]

                for member_id in member_ids:
                    dfm = df_grid[df_grid.member_id==member_id]
                    for grid_label in dfm.grid_label.unique():
                        dfmg = dfm[dfm.grid_label==grid_label]

                        # All rows of dfmg share the columns used by the
                        # template, so the first row is enough to name the store.
                        zarr_dir = dfmg.iloc[0].to_dict()
                        zarr_file = zarr_format % zarr_dir

                        zstore = 'gs://cmip6' + zarr_file

                        #print(zstore)
                        if zstore in known_stores:
                            print('store already in cloud')
                            continue
                        ngood += 1

                        # assign() returns a new frame, so no chained-assignment
                        # warning has to be suppressed.
                        df_list += [dfmg.assign(zstore=zarr_file)]

    if df_list:
        dESGF3 = pd.concat(df_list,sort=False)
    else:
        # Nothing is needed: keep an empty frame with the expected columns so
        # the following cells can report 'no new data available' instead of
        # failing inside pd.concat.
        dESGF3 = pd.DataFrame(columns=list(dESGF.columns) + ['zstore'])
    dESGF3.to_csv('csv/ESGF3.csv',index=False)
else:
    # Reuse the selection saved by a previous run.
    dESGF3 = pd.read_csv('csv/ESGF3.csv')

keys_all = list(dESGF3.keys())
keys_show = ["source_id","experiment_id","member_id","variable_id",'zstore']
keys_drop = list(set(keys_all) - set(keys_show))

# Cell output: number of files per requested combination. `columns=` is used
# because newer pandas no longer accepts the axis as a positional argument.
dESGF3.drop(columns=keys_drop).groupby(['experiment_id','variable_id','source_id','member_id']).count()

In [None]:
df_needed = dESGF3

# Number of distinct stores to build. Defined on both branches because the
# main loop uses it in its progress message.
num_stores = df_needed.zstore.nunique() if len(df_needed) > 0 else 0

if len(df_needed) > 0:
    print(f'needed: nfiles={len(df_needed)}, nstores={num_stores}')
    #print(df_needed.zstore.unique())
else:
    # Only reported, not raised: a bare `exit` name here would not stop
    # execution, and with no rows the main loop has nothing to iterate over.
    print('no new data available')

In [None]:
#df_needed.zstore.unique()
# Echo the request as python-style assignments, one line per quantity.
print("table_id = '" + "".join(str(t) for t in df_needed.table_id.unique()) + "'")
for label, column in (('exps', 'experiment_id'),
                      ('variables', 'variable_id'),
                      ('members', 'member_id')):
    quoted = "','".join(str(v) for v in df_needed[column].unique())
    print(label + " = ['" + quoted + "']")

In [None]:
# Deliberate stop: this always raises AssertionError, so a "run all" halts here.
# Check the request printed by the cells above, then run the remaining cells by hand.
assert False 

### Start logging the progress and exceptions

In [None]:
# Tag for this run: calendar date (YYYYMMDD) followed by the whole seconds
# since the epoch. The seconds are taken from timestamp() because the "%s"
# strftime directive is a platform extension that Python does not document
# and that is not available everywhere.
now = datetime.datetime.now()
date = now.strftime("%Y%m%d") + str(int(now.timestamp()))

# Catalog and log file names for this run.
cat_file = 'csv/cmip6_'+date+'.csv'
log_file = 'txt/request_'+date+'.log'
print(log_file)

In [None]:
# open and close for each write in case of kernel interrupt
def write_log(file,str,verbose=True):
    """Append one line of text to a log file.

    Parameters
    ----------
    file : path of the log file; it is opened in append mode on every call.
    str : text to write; a newline is added after it.
    verbose : when True (the default) the text is also printed.

    Returns
    -------
    None
    """
    # The context manager closes the file even if print() or write() raises.
    with open(file,'a') as f:
        if verbose:
            print(str)
        f.write(str+'\n')
    return

In [None]:
# Integer found between the last 'r' and the following 'i' of member_id,
# e.g. 'r10i1p1f1' -> 10, so that members sort numerically rather than as text.
df_needed['member'] = [int(s.split('r')[-1].split('i')[0]) for s in df_needed['member_id']]
# One multi-key sort: by member first, then by source_id within each member.
# Two separate sort_values calls would leave only the last key in effect.
df_needed = df_needed.sort_values(by=['member', 'source_id'])
#df_needed

In [None]:
# Sites to skip when acquiring new netcdf files: broken or slow sites.
# Uncomment an entry to skip that site as well.
skip_sites = [
    'esg.lasg.ac.cn',
    #'esgf.nci.org.au',
    #'esg-cccr.tropmet.res.in',
    #'esgf.ichec.ie',
    #'esgf-data3.ceda.ac.uk',
]

### The real work is done in this next loop 
- could be done in parallel except for the writing to the log file

In [None]:
# Main loop: for every needed store, download the netcdf files, concatenate
# them, write a local zarr store, upload it to the bucket and verify the upload.
# Every skip or failure is written to the log file and the loop moves on.

# reload the catalog
df_GCS = pd.read_csv('https://cmip6.storage.googleapis.com/cmip6-zarr-consolidated-stores-noQC.csv', dtype='unicode')

# refresh the gcsfs
fs.invalidate_cache()

new_zarrs = df_needed.zstore.unique()

verbose = True

zdict = {}  # construct dictionary for new rows to add to master catalog
for item,zarr in enumerate(new_zarrs):

    zbdir  = zarr_local  + zarr

    write_log(log_file,f"\n>>{item+1}/{num_stores}:<< local file: {zbdir}",verbose=verbose)

    # is zarr already in cloud?
    gsurl = 'gs://cmip6' + zarr
    try:
        contents = fs.ls(gsurl)
    except FileNotFoundError:
        # a path that does not exist yet simply has no contents
        contents = []
    if any("zmetadata" in s for s in contents):
        write_log(log_file,'store already in cloud',verbose=verbose)
        continue

    cstore = df_GCS[df_GCS.zstore == gsurl]

    if len(cstore) > 0:
        write_log(log_file,'store already in cloud catalog',verbose=verbose)
        continue

    # has zarr store already been created locally?
    zstrs = glob(zbdir + '/.zmetadata')
    if len(zstrs) > 0 :
        write_log(log_file,'store already exists locally, but is not in cloud, please upload manually',verbose=verbose)
        continue

    # Download the needed netcdf files - reading the known trouble codes from database
    gfiles,troubles,codes,okay = get_ncfiles(zarr,df_needed,skip_sites)

    write_log(log_file,troubles,verbose=verbose)

    if okay == False:
        continue

    if len(gfiles) == 0:
        write_log(log_file,'no files available',verbose=verbose)
        continue

    variable_id = zarr.split('/')[-3]

    # concatenate in time with mfdataset
    gfiles = sorted(gfiles)
    status, ds, dstr = concatenate(zarr,gfiles,codes)

    if status == 'failure':
        write_log(log_file,status+dstr,verbose=verbose)
        continue
    else:
        write_log(log_file,dstr)

    # convert to zarr, with consolidated metadata
    ds.to_zarr(zbdir, consolidated=True, mode='w')

    if not os.path.isfile(zbdir+'/.zmetadata'):
        write_log(log_file,'to_zarr failure: ',verbose=verbose)
        continue

    vlist = get_details(ds,zbdir,zarr)

    # upload to cloud ([:-1] drops the trailing '/' from both paths)
    command = '/usr/bin/gsutil -m cp -r '+ zbdir[:-1] + ' ' + gsurl[:-1]
    write_log(log_file,command,verbose=verbose)
    # this really uploads to GCS
    exit_status = os.system(command)
    if exit_status != 0:
        write_log(log_file,f'gsutil upload failed with exit status {exit_status}',verbose=verbose)
        continue

    # Drop any listing cached before the upload so the remote size is current.
    fs.invalidate_cache()
    size_remote = fs.du(gsurl)
    size_local = getFolderSize(zbdir)
    # abs(): a remote store that is SMALLER than the local one (an incomplete
    # upload) must fail this check as well.
    # NOTE(review): assumes getFolderSize counts file bytes only - confirm.
    assert abs(size_remote - size_local) < 100, \
        f'size mismatch for {gsurl}: remote={size_remote}, local={size_local}'
    write_log(log_file,f'uploaded {zbdir} correctly',verbose=verbose)

    # Final check: the uploaded store must open from the bucket.
    try:
        ds = xr.open_zarr(fs.get_mapper(gsurl), consolidated=True)
    except Exception as err:
        write_log(log_file,f'store did not get saved to GCS properly: {err!r}')
        continue

    zdict[item] = vlist
    write_log(log_file,f'successfully saved as {zbdir}')
    # The netcdf files are only removed once the store is verified.
    for gfile in gfiles:
        try:
            os.remove(gfile)
        except FileNotFoundError:
            pass  # already gone
        except OSError as err:
            write_log(log_file,f'could not remove {gfile}: {err}')


In [None]:
# Turn the details collected for the newly uploaded stores into catalog rows.
# With nothing uploaded, only a message is printed and `dz` is not defined;
# a bare `exit` name here would not stop execution, so none is used.
if zdict:
    dz = dict_to_dfcat(zdict)
else:
    print('nothing else to do')

In [None]:
# now update cloud catalog to reflect this new data (nb3b-CloudCat.ipynb)