This works on datarmor: 
'''
qsub -I -q mpi_1 -l walltime=1:00:00
bash
conda activate /home/datawork-lops-iaocea/conda-env/pangeo-fish_1222/
cd /home/datawork-taos-s/test
ipython
'''

## List kerchunk catalogue for intranet usage

In [23]:
names=!find /home/datawork-taos-s/intranet/kerchunk/ref-marc/ |grep json.zst

## Create directories for publishing the kerchunk to internet

In [15]:
import os
paths= [ os.path.dirname(name)   for name in names]
paths=list(set(paths))
def createpath(path):
    newpath=path.replace('intranet', 'public')
    return os.makedirs(newpath, exist_ok=True)
createpath= [ createpath(path)   for path in paths]

## Translate kerchunk catalogue for intranet access to https access

In [24]:
import fsspec
import ujson
import dask



def match_keys(mapping, value):
    for k in mapping:
        if k in value: 
            return k
        
    raise ValueError(f"could not find {value} in mapping") 
    
def match_in_keys(mapping,value):
    try:
        match_keys(mapping,value)
        return True
    except ValueError:
        return False    

def rename_target(refs, renames):
    #from kerchunk.utils import conslidate
    print('in rename_target')
    """Utility to change URLs in a reference set in a predictable way

    For reference sets including templates, this is more easily done by
    using template overrides at access time; but rewriting the references
    and saving a new file means not having to do that every time.

    Parameters
    ----------
    refs: dict
        Reference set
    renames: dict[str, str]
        Mapping from the old URL (including protocol, if this is how they appear
        in the original) to new URL

    Returns
    -------
    dict: the altered reference set, which can be saved
    """
    fs = fsspec.filesystem("reference", fo=refs)  # to produce normalised refs
    refs = fs.references
    out = {}
    for k, v in refs.items():
        if isinstance(v, list) and v[0] in renames:
            out[k] = [renames[v[0]]] + v[1:]
        elif isinstance(v, list) and match_in_keys(renames, v[0]) :
            url = v[0]
            print(url)
            key = match_keys(renames, url)
            new_url = url.replace(key, renames[key])
            out[k] = [new_url] + v[1:]
            #print(new_url)
        else:
            out[k] = v
        #    print('boo')
    return consolidate(out)

def rename_target_files(
    url_in, renames, url_out=None, storage_options_in=None, storage_options_out=None):
    print('in rename_target_files')

    """Perform URL renames on a reference set - read and write from JSON

    Parameters
    ----------
    url_in: str
        Original JSON reference set
    renames: dict
        URL renamings to perform (see ``renate_target``)
    url_out: str | None
        Where to write to. If None, overwrites original
    storage_options_in: dict | None
        passed to fsspec for opening url_in
    storage_options_out: dict | None
        passed to fsspec for opening url_out. If None, storage_options_in is used.

    Returns
    -------
    None
    """
    with fsspec.open(url_in, **(storage_options_in or {})) as f:
        old = ujson.load(f)
    new = rename_target(old, renames)
    if url_out is None:
        url_out = url_in
    if storage_options_out is None:
        storage_options_out = storage_options_in
    with fsspec.open(url_out, mode="wt", **(storage_options_out or {})) as f:
        ujson.dump(new, f)

def consolidate(refs):
    """Turn raw references into output"""
    out = {}
    for k, v in refs.items():
        if isinstance(v, bytes):
            try:
                # easiest way to test if data is ascii
                out[k] = v.decode("ascii")
            except UnicodeDecodeError:
                out[k] = (b"base64:" + base64.b64encode(v)).decode()
        else:
            out[k] = v
    return {"version": 1, "refs": out}

@dask.delayed
def translate(name):
    in_path='file:///home/datawork-taos-s/intranet/kerchunk/ref-marc/'
    out_path='file:///home/datawork-taos-s/public/kerchunk/ref-marc/'
    name=name.replace('/home/datawork-taos-s/intranet/kerchunk/ref-marc/','')
    url_in = in_path+name
    url_out = out_path+name

    renames={'file:///home/ref-marc/':'https://data-dataref.ifremer.fr/marc/'} 


    storage_options_in= {"compression": "zstd"}
    storage_options_out= {"compression": "zstd"}
    return rename_target_files(
        url_in, renames, url_out, storage_options_in=storage_options_in
        , storage_options_out=storage_options_out ) 

In [25]:
#
translated= [ translate(name)   for name in names]

## Start Dask workers to do parallel translation

In [27]:
import dask_hpcconfig
cluster = dask_hpcconfig.cluster("datarmor")
cluster.scale(60)



In [28]:
from distributed import Client

client = Client(cluster)

In [29]:
%%time
ok=dask.compute(*translated)

CPU times: user 1.4 s, sys: 252 ms, total: 1.66 s
Wall time: 28.6 s


In [18]:
names

['./toto/test.json.zstd', './toto/test-Copy1.json.zstd']

In [30]:
cluster.close()

2023-01-23 16:11:28,903 - distributed.client - ERROR - Failed to reconnect to scheduler after 30.00 seconds, closing client
