# First testing to read, test, ingest legacy catalog into bigquery

Got this running fairly fast (~8 hours for the whole catalog)

- [ ] Scale this out on a coiled cluster?
- [ ] Apply same methodology to retesting

In [1]:
from leap_data_management_utils.cmip_testing import test_all
from leap_data_management_utils import CMIPBQInterface, IIDEntry
# # TODO refactor
# from leap_data_management_utils.cmip_catalog import convert_cmip6_df_to_iid_df
# from catalog_utils import convert_cmip6_df_to_iid_df
import zarr
import pandas as pd
from tqdm.auto import tqdm
import dask.bag as db
from dask.diagnostics import ProgressBar
from dask.distributed import LocalCluster, Client

In [2]:
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 12,Total memory: 64.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:51629,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 12
Started: Just now,Total memory: 64.00 GiB

0,1
Comm: tcp://127.0.0.1:51640,Total threads: 3
Dashboard: http://127.0.0.1:51642/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:51632,
Local directory: /var/folders/m2/vwrjf3zx1bvdbhfkw7bxpz7r0000gn/T/dask-scratch-space/worker-7quefal2,Local directory: /var/folders/m2/vwrjf3zx1bvdbhfkw7bxpz7r0000gn/T/dask-scratch-space/worker-7quefal2

0,1
Comm: tcp://127.0.0.1:51641,Total threads: 3
Dashboard: http://127.0.0.1:51645/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:51633,
Local directory: /var/folders/m2/vwrjf3zx1bvdbhfkw7bxpz7r0000gn/T/dask-scratch-space/worker-eo5tiwa6,Local directory: /var/folders/m2/vwrjf3zx1bvdbhfkw7bxpz7r0000gn/T/dask-scratch-space/worker-eo5tiwa6

0,1
Comm: tcp://127.0.0.1:51643,Total threads: 3
Dashboard: http://127.0.0.1:51648/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:51634,
Local directory: /var/folders/m2/vwrjf3zx1bvdbhfkw7bxpz7r0000gn/T/dask-scratch-space/worker-_q32ir20,Local directory: /var/folders/m2/vwrjf3zx1bvdbhfkw7bxpz7r0000gn/T/dask-scratch-space/worker-_q32ir20

0,1
Comm: tcp://127.0.0.1:51647,Total threads: 3
Dashboard: http://127.0.0.1:51650/status,Memory: 16.00 GiB
Nanny: tcp://127.0.0.1:51635,
Local directory: /var/folders/m2/vwrjf3zx1bvdbhfkw7bxpz7r0000gn/T/dask-scratch-space/worker-a0jq5vnl,Local directory: /var/folders/m2/vwrjf3zx1bvdbhfkw7bxpz7r0000gn/T/dask-scratch-space/worker-a0jq5vnl


In [3]:
table_id = 'leap-pangeo.testcmip6.cmip6_consolidated_manual_testing'
bq = CMIPBQInterface(table_id=table_id)

In [4]:
def fix_legacy_df(df: pd.DataFrame) -> pd.DataFrame:
    """There were several errors in the original dataframe. This should fix it and bring it into official CMIP conventions
    Source: https://docs.google.com/document/d/1h0r8RZr_f3-8egBMMh7aqLwy3snpD6_MrDz1q8n5XUk/edit
    1. `member_id` and `variant_label` are mixed up. 
    2. `dcpp_init` year is not mentioned anywhere in the CMIP docs. I suspect this is `sub_experiment_id`
    3. Some values are saved as floats causing issues with generating proper iids
    """
    df = df.copy()
    # df = df.drop(columns='variant_label')
    df = df.rename(columns={'dcpp_init_year':'sub_experiment_id', 'member_id':'variant_label'})
    # convert sub_experiment column to str and prepend an `s` (see: https://github.com/WCRP-CMIP/CMIP6_CVs/blob/master/CMIP6_sub_experiment_id.json)
    df['sub_experiment_id'] = df['sub_experiment_id'].fillna(1e6).astype(int)
    #
    df['sub_experiment_id'] = 's'+df['sub_experiment_id'].astype('str')
    # remove nan values
    df.loc[df['sub_experiment_id'].isin(['s1000000']), "sub_experiment_id"] = 'none'
    return df


def _maybe_join(iterable):
    assert len(iterable) == 2 
    sub_experiment_id = iterable.iloc[0]
    variant_label = iterable.iloc[1]
    if sub_experiment_id != 'none':
        return f"{sub_experiment_id}-{variant_label}"
    else:
        return variant_label

def convert_cmip6_df_to_iid_df(df: pd.DataFrame) -> pd.DataFrame:
    # now remove the ones already in the pangeo catalog
    df = df.copy()
    df['member_id'] = df[["sub_experiment_id", "variant_label"]].agg(_maybe_join, axis=1)
    df['version'] = 'v'+df['version'].astype(str)
    df['instance_id'] = df[['activity_id', 'institution_id', 'source_id', 'experiment_id',
        'member_id', 'table_id', 'variable_id', 'grid_label', 'version']].astype(str).agg('.'.join, axis=1).tolist()
    df['instance_id'] = 'CMIP6.'+df['instance_id']
    df['store'] = df['zstore']
    # add current time as bigquery timestamp
    df['timestamp'] = pd.Timestamp.now(tz='UTC')
    df = df[['instance_id','store', 'timestamp']]
    return df

In [5]:
df_legacy_raw = pd.read_csv('https://storage.googleapis.com/cmip6/pangeo-cmip6.csv')
df_legacy_fixed_raw = fix_legacy_df(df_legacy_raw)
df_legacy = convert_cmip6_df_to_iid_df(df_legacy_fixed_raw)

In [6]:
def test(store_url,iid):
    store = zarr.storage.FSStore(store_url)
    try:
        test_all(store, iid, verbose=False)
        test_passed = True
    except Exception as e: 
        test_passed = False
    return test_passed 

In [7]:
# filter for iids that do not exist
# need to do this in batches
iids = df_legacy['instance_id'].tolist()
iids_in_bq = []
batchsize = 10000 #10k seems to be the limit 
iid_batches = [iids[i:i+batchsize] for i in range(0,len(iids), batchsize)]
for iids_batch in tqdm(iid_batches):
    iids_in_bq_batch = bq.iid_list_exists(iids_batch)
    iids_in_bq.extend(iids_in_bq_batch)

  0%|          | 0/52 [00:00<?, ?it/s]

In [8]:
df_legacy_filtered = df_legacy[~df_legacy.instance_id.isin(iids_in_bq)]

In [9]:
df_input = df_legacy_filtered.iloc[:300000, :] # for now only a representative example?

## Parallel loading and testing xarray datasets and adding to bq in batches

In [10]:
def batch_test_add(batch_df: pd.DataFrame, bq_client: CMIPBQInterface):
    seq = [(a[1].store,a[1].instance_id) for a in batch_df.iterrows()]
    bag = db.from_sequence(seq)
    with ProgressBar():
        results_dask = bag.map(lambda a: (*a, test(*a))).compute()
    results_dask_iid_entry = [IIDEntry(iid=a[1], store=a[0], retracted=False, tests_passed=a[2]) for a in results_dask]
    bq_client.insert_multiple_iids(results_dask_iid_entry)

In [None]:
n=10000
df_input_batches = [df_input[i:i+n] for i in range(0,len(df_input),n)]
for df_batch in tqdm(df_input_batches):
    batch_test_add(df_batch, bq)

  0%|          | 0/30 [00:00<?, ?it/s]

  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
  ret = ret.dtype.type(ret / rcount)
 

## show all legacy stores that had failing tests

In [None]:
df_read = bq.get_latest()
failed_legacy = df_read[df_read['tests_passed'].isin([False])]
failed_legacy

In [13]:
break this shit

SyntaxError: invalid syntax (332580067.py, line 1)

## Running in serial and profiling
- Opening the zarr in xarray takes by far the longest
    - I was able to shave quite a bit of time by only opening the dataset once and then passing that object around
- The API calls to google are not negligible (~25% of the wall time)
- I guess to speed this up we would need to parallelize the loading of datasets and testing and then add batches of results to the bq! Doable but not exactly what I want to focus on right now.
- Snakeviz adds significant overhead! Run without if we want to speed this up. 

In [None]:
def test_and_add(iid, store_url, bq_client):
    test_passed = test(iid, store_url)
    iid_entry = IIDEntry(
            iid,
            store_url, # should this be the url (str) or the store ( zarr.storage.FSStore)?
            retracted = False, 
            tests_passed=False
        ) 
    bq_client.insert_iid(iid_entry)
    return iid, store_url, test_passed

In [None]:
%load_ext snakeviz

In [None]:
%%snakeviz
results = []
for idx,row in tqdm(df_input.iterrows(),total=len(df_input)):
    iid = row.instance_id
    store_url = row.store
    results.append(test_and_add(iid, store_url, bq))

In [None]:
results = []
for idx,row in tqdm(df_input.iterrows(),total=len(df_input)):
    iid = row.instance_id
    store_url = row.store
    results.append(test_and_add(iid, store_url, bq))