# To Run Standard Searches
- Define new searches in `mysets.py`.
- The beginning of the notebook is meant to be run interactively until the search requests have been checked.
- All progress and exception logging is done only in the main loop.

- `gsurl` is the GCS URL of the dataset.
- `ds_dir` is our dataset identifier (`activity_id/institution_id/.../variable_id/grid_label`), with NO version included.

In [None]:
import glob
import itertools
import os
from datetime import datetime

import gcsfs
import gspread
import pandas as pd
import xarray as xr
from oauth2client.service_account import ServiceAccountCredentials

### Local modules

In [None]:
import myconfig
from mysets import all_search
from mydataset import dir2url_, dir2dict
from mytasks import Check, Download, ReadFiles, SaveAsZarr, Upload, Cleanup
from mysearch import esgf_search
#from myidentify import gsurl2search, gsurl2dsdir
from myutilities import search_df, remove_from_GC_bucket

In [None]:
# Directly connect to Search Status Page for updating
# Anyone can view: https://docs.google.com/spreadsheets/d/1yAt7604tVt7OXXZUyL2uALtGP2WVa-Pb5NMuTluFsAc/edit?usp=sharing

# Service-account key used to authorize gspread. The default is the original
# hard-coded path; set CMIP6_JSON_KEYFILE to use a key stored elsewhere.
json_keyfile = os.environ.get('CMIP6_JSON_KEYFILE', '/home/naomi/json/CMIP6-d0cb1df722d1.json')
scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
credentials = ServiceAccountCredentials.from_json_keyfile_name(json_keyfile, scope)
gc = gspread.authorize(credentials)
sheet_name = "CMIP6_UpdateSchedule"
sh = gc.open(sheet_name)
wks = sh.worksheet("Searches")

# The column labels are read from row 3 of the "Searches" worksheet
# (gspread row numbers are 1-based).
columns = wks.row_values(3)
# list.index returns 0-based positions; later cells add 1 when they pass
# these to wks.update_cell, which is 1-based.
col_status = columns.index('status')
col_run = columns.index('last run')
col_drive = columns.index('HD')
col_dataset = columns.index('current dataset')
col_node = columns.index('search node')

In [None]:
# CONFIGURE ESGF Search here
node_pref = myconfig.node_pref
dtype = myconfig.dtype
hd = '/h123'
myconfig.local_target_prefix = hd + '/naomi/zarr-minimal/'
dir2local = dir2url_(myconfig.local_target_prefix)

clear_logs = True
update_ESGF = True
update_Needed  = True

search = 'newtest'   # if search is not a key of all_search (mysets.py), define its keywords in the next cell

# Find this search's row in the status sheet, appending a new row if it is missing.
# NOTE(review): depending on the gspread version a miss either raises
# (CellNotFound) or returns None, so that `.row` raises AttributeError.
# Catch Exception to cover both, but no longer swallow KeyboardInterrupt/SystemExit
# the way the bare `except:` did.
try:
    search_row = wks.find(search).row
    print(search_row)
except Exception:
    print('need another row in Google Form!')
    wks.append_row([search])
    search_row = wks.find(search).row

search_node = 'llnl'
ESGF_site = dtype[search_node]

# Record when, on which drive and against which node this search was run
# (col_* are 0-based, update_cell is 1-based).
date = datetime.now().strftime('%Y-%m-%d')
wks.update_cell(search_row, col_run + 1, date)
wks.update_cell(search_row, col_drive + 1, hd)
wks.update_cell(search_row, col_node + 1, search_node)

print('zarrs will be written to: ',myconfig.local_target_prefix)

In [None]:
# CONFIGURE GCS
fs     = gcsfs.GCSFileSystem(token='anon', access='read_only',cache_timeout=-1)
df_GCS = pd.read_csv('https://cmip6.storage.googleapis.com/pangeo-cmip6-noQC.csv', dtype='unicode')
df_GCS['ds_dir'] = df_GCS.apply(lambda row: myconfig.target_format % row,axis=1)

# make available to all modules (except those)
myconfig.fs = fs
myconfig.df_GCS = df_GCS

In [None]:
if search in all_search:
    # Use a pre-defined search
    asearch = all_search[search]
else:
    # Define your own search: pick keyword values to specify your search.
    # Leaving a keyword out means no restriction on it.
    asearch = {
        'table_id'      : ['SImon'],
        #'activity_id'   : ['CDRMIP'],
        #'experiment_id' : ['1pctCO2-cdr'],
        'variable_id'   : ["siconc"],
        #'member_id'     : ['r2i1p1f1'],
        'source_id'     : ['BCC-ESM1'],
        'grid_label'    : ['gn'],
    }

# Check whether ANOTHER notebook is doing the same search.
# NOTE: the assert that would stop a concurrent run is commented out, so an
# existing lock only triggers log cleanup when clear_logs is True.
os.makedirs('logs', exist_ok=True)
lock_file = f'logs/{search}.lock'
if os.path.exists(lock_file):
    if clear_logs:
        # Same effect as `rm logs/*{search}*` (this includes the old lock),
        # but without spawning a shell. Directories are left alone, as rm without -r would.
        stale = sorted(p for p in glob.glob(f'logs/*{glob.escape(search)}*') if os.path.isfile(p))
        print('removing:', stale)
        for path in stale:
            os.remove(path)
    #else:
    #    assert False, f'lockfile {lock_file} exists'

date = str(datetime.now().strftime("%Y%m%d-%H%M"))
with open(lock_file, 'w') as f:
    f.write(f'started {search} at {date}')

search, asearch

In [None]:
# make dataframe of ESGF search results

if update_ESGF:
    # One ESGF query per combination of keyword values (Cartesian product of
    # the value lists, in asearch's key order).
    search_keys = list(asearch.keys())
    searches = list(itertools.product(*asearch.values()))

    dESGF = []
    for s in searches:
        dsearch = dict(zip(search_keys, s))
        print(dsearch)
        df = esgf_search(dsearch, server=ESGF_site)
        if len(df) > 0:
            dESGF.append(df)

    # pd.concat([]) would fail with an opaque "No objects to concatenate"
    if not dESGF:
        raise ValueError(f'ESGF search {search!r} at {ESGF_site} returned no files')
    df_ESGF = pd.concat(dESGF)
    os.makedirs('csv', exist_ok=True)
    df_ESGF.to_csv(f'csv/ESGF_{search}.csv', index=False)
else:
    df_ESGF = pd.read_csv(f'csv/ESGF_{search}.csv', dtype='unicode')

print(f'ESGF: nfiles={len(df_ESGF)}, nstores={len(df_ESGF.ds_dir.unique())}')
df_ESGF.ds_dir.unique()

In [None]:
# make dataframe of all needed

if update_Needed:
    # Keep only the newest ESGF version of each file (version_id is 'v' + number).
    df_ESGF['cversion'] = [int(s[1:]) for s in df_ESGF.version_id]
    df_ESGF = df_ESGF.sort_values(by=['cversion'])
    df_ESGF = df_ESGF.drop_duplicates(subset=["ds_dir", "ncfile"], keep='last')

    #df_GCS['ds_dir'] = df_GCS.apply(lambda row: myconfig.target_format % row,axis=1) # THIS WON'T WORK FOR DCPP
    # .copy() so the column assignments below act on a real frame, not on a
    # view of the filtered one (avoids chained-assignment problems).
    df_GCS = df_GCS[df_GCS.version != 'unknown'].copy()
    df_GCS['ds_dir'] = [s.split('gs://cmip6/CMIP6/')[-1].split('/v20')[0] for s in df_GCS.zstore]
    df_GCS['cversion'] = [int(s) for s in df_GCS.version]
    # Keep only the newest GCS version of each dataset; sort numerically, like the ESGF side.
    df_GCS = df_GCS.sort_values(by=['cversion'])
    df_GCS = df_GCS.drop_duplicates(subset=["ds_dir"], keep='last')

    df = pd.merge(df_ESGF, df_GCS, how='outer', on=['ds_dir'], indicator=True, suffixes=('', '_y'))
    # Datasets at ESGF that are not in GCS at all
    df_new_dataset = df[df._merge == 'left_only']

    # We might want to add these if the ESGF version is newer than the GCS version
    df_check = df[df._merge == 'both'].copy()
    if len(df_check) > 0:
        # cversion / cversion_y hold YYYYMMDD numbers (upcast to float by the
        # outer merge when it introduces NaNs). Compare them as calendar dates,
        # because integer subtraction is not a day count across month or year
        # boundaries. Versions that are not valid dates become NaT, so their
        # dversion is NaN and they are never selected.
        esgf_date = pd.to_datetime(df_check.cversion.astype('int64').astype(str), format='%Y%m%d', errors='coerce')
        gcs_date = pd.to_datetime(df_check.cversion_y.astype('int64').astype(str), format='%Y%m%d', errors='coerce')
        df_check['dversion'] = (esgf_date - gcs_date).dt.days
        df_check = df_check.sort_values(by=['dversion'])
        # New versions which exist at ESGF and are more than 2 days newer
        df_new_version = df_check[df_check.dversion > 2]

        # Want to append together df_new_dataset and df_new_version
        keys = ['activity_id', 'institution_id', 'source_id', 'experiment_id', 'member_id', 'table_id', 'variable_id', 'grid_label', 'version_id', 'ncfile', 'file_size', 'url', 'data_node', 'ds_dir', 'node_order', 'start', 'stop']
        df_new_version = df_new_version[keys]
        df_new_dataset = df_new_dataset[keys]

        # DataFrame.append was removed in pandas 2.0; pd.concat is the equivalent.
        df_needed = pd.concat([df_new_dataset, df_new_version])
    else:
        df_needed = df_new_dataset.copy()

    df_needed['version'] = [s[1:] for s in df_needed.version_id]

    num_stores = 0
    if len(df_needed) > 0:
        num_stores = df_needed.ds_dir.nunique()
        print(f'needed: nfiles={len(df_needed)}, nstores={num_stores}')

        # Sort by realization number: the digits between 'r' and 'i' in member_id.
        df_needed['member'] = [int(s.split('r')[-1].split('i')[0]) for s in df_needed['member_id']]
        df_needed = df_needed.sort_values(by=['member'])
        #df_needed['zsize'] = [df_needed[df_needed.ds_dir==zs]['file_size'].sum() for zs in df_needed['ds_dir']]
        #df_needed = df_needed.sort_values(by=['zsize'])
        df_needed.to_csv(f'csv/needed_{search}.csv', index=False)
else:
    df_needed = pd.read_csv(f'csv/needed_{search}.csv', dtype='unicode')

# Summary: number of distinct datasets needed per variable and per experiment.
if len(df_needed) == 0:
    print('no new data available')
else:
    print('Variables')
    for var in df_needed.variable_id.unique():
        print(var, df_needed[df_needed.variable_id == var].ds_dir.nunique())

    print('\nExperiments')
    for exp in df_needed.experiment_id.unique():
        print(exp, df_needed[df_needed.experiment_id == exp].ds_dir.nunique())

In [None]:
# Distinct dataset ids still to process, in order of first appearance in df_needed.
ds_dirs = df_needed.ds_dir.unique()
numdsets = len(ds_dirs)
# Share the needed-files table with the task modules through myconfig.
myconfig.df_needed = df_needed

In [None]:
# One log file per outcome for this search; `logs` maps the ierr codes 1..3 to them.
progress_log, failure_log, success_log = (
    f'logs/{kind}_{search}.log' for kind in ('progress', 'failure', 'success')
)
logs = dict(enumerate((progress_log, failure_log, success_log), start=1))

### ierr:
- 0 : proceed with the next task
- 1 : write to progress_log, go to the next dataset (finished or try again)
- 2 : write to failure_log, go to the next dataset (mark as unusable - do not try again until the problem is solved)
- 3 : write to success_log, go to the next dataset (dataset added to the cloud)

In [None]:
def write_log(file, str, verbose=True):
    """Append one timestamped line to a log file, optionally echoing it to stdout.

    Args:
        file: path of the log file. It is opened in append mode, so the file is
            created if missing; its directory must already exist.
        str: the message to log. The name shadows the builtin ``str``; it is
            kept so that keyword callers keep working.
        verbose: if True, also print the message.

    Each line is written as ``YYYYMMDD-HHMM:<message>``.
    """
    message = str  # clearer local name for the builtin-shadowing parameter
    stamp = datetime.now().strftime('%Y%m%d-%H%M')
    if verbose:
        print(message)
    # `with` guarantees the file is closed even if the write raises.
    with open(file, 'a', encoding='utf-8') as log_file:
        log_file.write(f'{stamp}:{message}\n')

In [None]:
# refresh the gcsfs
fs.invalidate_cache()
date = str(datetime.now().strftime("%Y%m%d-%H%M"))
for log_file in logs.values():
    write_log(log_file, f'starting at {date}')


def _update_search_cell(col, value):
    """Write `value` into column `col` (0-based) of this search's row in the status sheet.

    Best effort: the sheet only displays status, so any error from gspread
    (e.g. a transient Sheets API error) is recorded in the progress log
    instead of aborting the whole dataset loop.
    """
    try:
        row = wks.find(search).row
        wks.update_cell(row, col + 1, value)
    except Exception as err:
        write_log(progress_log, f'could not update status sheet with {value!r}: {err}')


for item, ds_dir in enumerate(ds_dirs):
    #if item > 3:
    #    continue
    print(f'\n{item}/{numdsets-1}', ds_dir)

    # Set to a substring (e.g. 'EC-Earth3') to skip every ds_dir containing it;
    # None disables skipping. A string sentinel such as 'none' would also match real ids.
    skip = None
    if skip and skip in ds_dir:
        write_log(progress_log, f'{ds_dir} skipping {skip}')
        continue

    version = df_needed[df_needed.ds_dir == ds_dir].version.values[0]

    # Each task returns ierr: 0 = go on to the next task,
    # >0 = write to logs[ierr] and move on to the next dataset.
    (ierr, exc) = Check(ds_dir, version, dir2local)
    if ierr > 0:
        write_log(logs[ierr], f'{ds_dir} {exc}')
        continue

    (gfiles, version, ierr, exc) = Download(ds_dir)
    if ierr > 0:
        write_log(logs[ierr], f'{ds_dir}, {exc}')
        continue

    _update_search_cell(col_dataset, ds_dir)

    (ds, ierr, exc) = ReadFiles(ds_dir, gfiles, version, dir2dict)
    if ierr > 0:
        write_log(logs[ierr], f'{ds_dir}, {exc}')
        continue

    (version, ierr, exc) = SaveAsZarr(ds_dir, ds, dir2local)
    if ierr > 0:
        write_log(logs[ierr], f'{ds_dir}, {exc}')
        continue

    (zbdir, gsurl, ierr, exc) = Upload(ds_dir, version, dir2local)
    if ierr > 0:
        write_log(logs[ierr], f'{ds_dir}, {exc}')
        continue

    (ierr, exc) = Cleanup(ds_dir, version, gfiles, dir2local)
    if ierr > 0:
        write_log(logs[ierr], f'{ds_dir}, {exc}')
        continue

    date = datetime.now().strftime('%H:%M, %b%d')
    _update_search_cell(col_status, f'{item+1} of {numdsets} at {date}')

    write_log(success_log, f'{zbdir} saved to {gsurl}')

In [None]:
#xr.open_zarr(fs.get_mapper(gsurl),consolidated=True)
#xr.open_zarr(zbdir,consolidated=True)