In [None]:
import pandas as pd
import numpy as np
import sys, os
from glob import glob
import xarray as xr
import datetime
import warnings
import gcsfs
import datetime

In [None]:
# local modules
from myidentify import gsurl2tracks, tracks2version

In [None]:
# Record when this notebook was last run (month/day/year, 24-hour time)
date = f"{datetime.datetime.now():%m/%d/%Y, %H:%M:%S}"
print(date)

In [None]:
# Does this host have extra local drives holding new zarr stores?
# (knowing this helps limit the search for new datasets)

mach = os.uname()[1]  # second field of uname: the node (host) name
local_storage = 'haden' in mach
if not local_storage:
    # usually matches location in GetSpecified and Requests
    zarr_local = '/d1/naomi/cmip6-zarrs'

In [None]:
# A. Make new noQC catalog
#    1. Collect listings of all files in GC to make dataframe 'dz_GC'
#       a. work out which file listings need to be refreshed

if not local_storage:
    # This must match the location used in Requests.ipynb or GetSpecified.ipynb
    drives = [zarr_local]
else:
    # earlier drive list: ['/net/abbey/a2','/net/abbey/a3','/net/carney/d2','/h120']
    new_drives = ['/net/abbey/a4','/net/carney/d3','/h119','/h121','/h122']
    drives = [s + '/naomi/zarr-minimal' for s in new_drives]

# de-duplicate the search roots
new_drives = list(set(drives))

# Each path two levels below a search root contributes the key
# '<level-1 name>/<level-2 name>'; the set removes repeats across drives.
new_activities = sorted({
    '/'.join(d.split('/')[-2:])
    for drive in new_drives
    for d in glob(drive + '/*/*')
})

# override, if necessary:
#new_activities = []

print(new_activities)

In [None]:
# Refresh one listing file per entry in new_activities, then concatenate
# every per-activity listing into the combined ncsv/GC_files.csv.
for activity_id in new_activities:
    print(activity_id)
    listing_csv = f"ncsv/GC_files_{activity_id.replace('/','-')}.csv"
    os.system(f"/usr/bin/gsutil -m ls gs://cmip6/{activity_id}/**/.zmetadata > {listing_csv}")

os.system("cat ncsv/GC_files_*.csv > ncsv/GC_files.csv")

In [None]:
# A. Make new noQC catalog
#    1. Collect listings of all files in GC to make dataframe 'dz_GC'
#       b. read in list of zarr stores and turn into df with 8-tuple dataset id

df = pd.read_csv('ncsv/GC_files.csv', names=['zstore'])
print(len(df.zstore.unique()))

files = df.zstore.values
records = []
for tdir in files:
    # the store URL is everything in front of the '.zmetadata' object name
    store = tdir.split('.zmetadata')[0]
    # the eight path components directly above '.zmetadata' form the dataset id
    vlist = tdir.split('/')[-9:-1]
    if vlist[-2] == vlist[-1]:
        # The last two components repeat, so the listing cannot be mapped onto
        # the 8-tuple. Raise explicitly: unlike `assert`, this is not stripped
        # under `python -O` and the message names the offending entry.
        raise ValueError(f'must fix: {tdir}')
    records.append(vlist + [store])

dz_GC = pd.DataFrame(
    records,
    columns=["activity_id", "institution_id", "source_id", "experiment_id",
             "member_id", "table_id", "variable_id", "grid_label", "zstore"],
)

# member_ids that start with 's' are split at '-': the digits after the 's'
# of the first piece become dcpp_init_year (NaN for all other members) and
# member_id keeps only the piece after the last '-'.
dz_GC["dcpp_init_year"] = dz_GC.member_id.map(lambda x: float(x.split("-")[0][1:] if x.startswith("s") else np.nan))
dz_GC["member_id"] = dz_GC["member_id"].map(lambda x: x.split("-")[-1] if x.startswith("s") else x)

In [None]:
# A. Make new noQC catalog
#    2. read dz_exclude from errata files

def _file_id_to_zstore(file_id):
    """Turn an errata file_id into the matching gs://cmip6/ store URL.

    Keeps the text after the last 'CMIP6.', drops its final 9 characters
    (presumably the version suffix -- confirm against the errata file),
    and converts the dotted id into a '/'-separated path ending in '/'.
    """
    dataset = file_id.split('CMIP6.')[-1][:-9]
    return 'gs://cmip6/' + dataset.replace('.','/') + '/'

dz_exclude = pd.read_csv('csv/errata-files.csv')
dz_exclude['zstore'] = dz_exclude.file_id.map(_file_id_to_zstore)
# vstore = zstore + 'v' + version; used to match against the catalog later on
dz_exclude['vstore'] = [
    zs + 'v' + str(ver) for zs, ver in zip(dz_exclude.zstore, dz_exclude.version)
]
len(dz_exclude)

In [None]:
# A. Make new noQC catalog
#    3. read in dz_old from old noQC to get known versions from old catalog

# everything is read as text so that version values stay strings
dz_old = pd.read_csv('https://cmip6.storage.googleapis.com/cmip6-zarr-consolidated-stores-noQC.csv', dtype='unicode')

# left join: keep every store found in GC; stores missing from the old
# catalog end up with a NaN version
known_versions = dz_old[['zstore','version']]
dz_new = dz_GC.merge(known_versions, how='left', on='zstore', sort=False)

In [None]:
# A. Make new noQC catalog
#   4a. update version if store has been replaced by a newer version

df_newversions = pd.read_csv('csv/newversions.csv', dtype='unicode')

# Stores listed in newversions.csv always get the version recorded there.
# An entry whose store is not in the current GC listing is reported and
# skipped rather than aborting the whole cell with an IndexError.
known_stores = set(dz_new.zstore)
for zstore, newver in zip(df_newversions.zstore, df_newversions.version):
    if zstore not in known_stores:
        print('newversions.csv store not found in GC listing:', zstore)
        continue
    dz_new.loc[dz_new.zstore == zstore, 'version'] = newver

In [None]:
# A. Make new noQC catalog
#    4b. fix ambiguous or missing versions in dz_new

version = []
print(dz_new.zstore.values[0],dz_new.zstore.values[-1])
for zstore, ver in zip(dz_new.zstore.values, dz_new.version.values):
    # only stores without a usable version are looked up again
    if ver == 'ambiguous' or pd.isna(ver):
        try:
            tracks = gsurl2tracks(zstore)
            (newver,jdict) = tracks2version(tracks,verbose=False)
            ver = newver
        except Exception as err:
            # Best effort: keep going, but show what went wrong. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit stop this long-running loop.
            print('trouble determining version for ',zstore,repr(err))
            ver = 'ambiguous'

    version.append(ver)

dz_new['version'] = version

In [None]:
# A. Make new noQC catalog
#    5. using vstore=zstore+version, update dz_new with status, severity and url columns

# combine zstore and version
dz_new['vstore'] = [zs + 'v' + str(ver) for zs, ver in zip(dz_new.zstore, dz_new.version)]

# Find vstores (= zstore+version) in dz_GC which have issues at ES-DOC
set_A = set(dz_new.vstore.values)
set_B = set(dz_exclude.vstore.values)

in_both = sorted(set_A & set_B)
print(len(set_A),len(set_B),len(in_both))

# One dictionary lookup per catalog row instead of a list scan plus a
# DataFrame filter per row. setdefault keeps the FIRST errata row for each
# vstore, matching the previous `.values[0]` selection.
issue_by_vstore = {}
for vs, st, sev, link in zip(dz_exclude.vstore, dz_exclude.status,
                             dz_exclude.severity, dz_exclude.issue_url):
    issue_by_vstore.setdefault(vs, (st, sev, link))

# rows without a matching errata entry get these defaults
no_issue = ('good', 'none', 'none')
issue_info = [issue_by_vstore.get(vs, no_issue) for vs in dz_new.vstore]

status = [info[0] for info in issue_info]
severity = [info[1] for info in issue_info]
url = [info[2] for info in issue_info]

dz_new['status'] = status
dz_new['severity'] = severity
dz_new['issue_url'] = url

In [None]:
# Sanity check before anything is overwritten: the new noQC catalog must
# have strictly more rows than the old one.
n_new, n_old = len(dz_new), len(dz_old)
print(n_new,n_old)
assert n_old < n_new

In [None]:
# A. Make new noQC catalog
#    6a. save old noQC catalog and then upload to GC

date = str(datetime.datetime.now().strftime("%Y%m%d"))
old_cat = f'old_catalogs/pangeo-cmip6-{date}-noQC.csv'

# Copy, compress and upload the previous catalog. Every step is still
# attempted, but a non-zero exit status is now reported instead of being
# silently ignored (consistent with the upload check in step 6b).
backup_cmds = [
    f'cp csv/pangeo-cmip6-noQC.csv {old_cat}',
    f'gzip {old_cat}',
    f'/usr/bin/gsutil -m cp {old_cat}.gz gs://cmip6/{old_cat}.gz',
]
for cmd in backup_cmds:
    ret = os.system(cmd)
    if ret != 0:
        print('old noQC catalog backup step failed:', cmd)

In [None]:
# A. Make new noQC catalog
#    6b. save new noQC catalog and then upload to GC

# vstore is only a helper column, so it is left out of the published file.
# `columns=` replaces the positional axis argument (`drop([...], 1)`), which
# was deprecated in pandas 1.3 and removed in pandas 2.0.
dz_new.drop(columns=['vstore']).to_csv('csv/pangeo-cmip6-noQC.csv', mode='w+', index=False)

ret = os.system('/usr/bin/gsutil -m cp csv/pangeo-cmip6-noQC.csv gs://cmip6/cmip6-zarr-consolidated-stores-noQC.csv')
if ret != 0:
    print('noQC upload not working')

In [None]:
# B. Make new standard catalog
#    1. eliminate harmless issues from dz_exclude
#       these should all be properly evaluated - I just made a first guess

# issue_uids treated as harmless for the standard catalog
harmless_uids = [
    'b6302400-3620-c8f1-999b-d192c0349084',
    '45f9e7b9-1844-7a92-8b54-10d954e621db',
    '4aa40e49-b2d4-0b29-a6b1-c80ee8dce11a',
    '2f6b5963-f87e-b2df-a5b0-2f12b6b68d32',
    '61fb170e-91bb-4c64-8f1d-6f5e342ee421',
    '90cac29b-eaff-c450-8621-ea31e305a40e',
    '8fbd8df5-c349-315b-9ec3-5a2f4ec4ec63',
    'ad5ca671-39d0-39ed-bf4f-6c8fb1a06047',
]

# keep only issues that are unresolved, not low severity and not on the list
unresolved = dz_exclude.status != 'resolved'
not_low = dz_exclude.severity != 'low'
not_harmless = ~dz_exclude.issue_uid.isin(harmless_uids)
dz_exclude = dz_exclude[unresolved & not_low & not_harmless]

In [None]:
# B. Make new standard catalog
#    2. use this (smaller) list of issues to eliminate the more serious issues from standard catalog

# Find vstores present in both the catalog and the remaining errata list:
set_A = set(dz_new.vstore.values)
set_B = set(dz_exclude.vstore.values)

in_both = sorted(set_A & set_B)
print(len(set_A),len(set_B),len(in_both))

dfz = dz_new.copy()
# isin() does hashed membership; the previous per-row `value in in_both`
# scanned the whole list for every catalog row.
dfz['issue'] = dfz['vstore'].isin(in_both)
dz_issues = dfz[dfz.issue]
dz_clean  = dfz[~dfz.issue]

# The standard catalog is the clean rows, minus any row that is an exact
# duplicate of another row (all copies dropped). This gives the same result
# as the earlier concat([dfz, dz_issues, dz_issues]).drop_duplicates(keep=False)
# without building a frame three times the size.
dz_orig = dz_clean.drop_duplicates(keep=False)
# `columns=` replaces the positional axis argument removed in pandas 2.0
dz_orig = dz_orig.drop(columns=['issue','vstore'])

In [None]:
# Sanity check: the new standard catalog must have strictly more rows than
# the one currently published.
dfcat = pd.read_csv('https://cmip6.storage.googleapis.com/cmip6-zarr-consolidated-stores.csv')
n_candidate, n_published = len(dz_orig), len(dfcat)
print(n_candidate,n_published)
assert n_published < n_candidate

In [None]:
# B. Make new standard catalog
#    3a. save old standard catalog and then upload to GC

date = str(datetime.datetime.now().strftime("%Y%m%d"))
old_cat = f'old_catalogs/pangeo-cmip6-{date}.csv'

# Copy, compress and upload the previous catalog. Every step is still
# attempted, but a non-zero exit status is now reported instead of being
# silently ignored (consistent with the upload check in step 3b).
backup_cmds = [
    f'cp csv/pangeo-cmip6.csv {old_cat}',
    f'gzip {old_cat}',
    f'/usr/bin/gsutil -m cp {old_cat}.gz gs://cmip6/{old_cat}.gz',
]
for cmd in backup_cmds:
    ret = os.system(cmd)
    if ret != 0:
        print('old standard catalog backup step failed:', cmd)

In [None]:
# B. Make new standard catalog
#    3b. save new standard catalog and then upload to GC

# The errata columns are left out of the standard catalog file.
# `columns=` replaces the positional axis argument (`drop([...], 1)`), which
# was deprecated in pandas 1.3 and removed in pandas 2.0.
dz_orig.drop(columns=['status','severity','issue_url']).to_csv('csv/pangeo-cmip6.csv', mode='w+', index=False)

ret = os.system('/usr/bin/gsutil -m cp csv/pangeo-cmip6.csv gs://cmip6/cmip6-zarr-consolidated-stores.csv')
if ret != 0:
    print('standard catalog upload not working')

# the same file is also published under a second object name
ret = os.system('/usr/bin/gsutil -m cp csv/pangeo-cmip6.csv gs://cmip6/pangeo-cmip6.csv')
if ret != 0:
    print('duplicate standard catalog upload not working')

In [None]:
# Show the (de-duplicated) list of drives that was searched for new zarr stores
print(new_drives)