In [None]:
import itertools
import json
from pathlib import Path

import zict
from odc.emit import cmr_to_stac
from odc.emit._md import emit_id
from tqdm.auto import tqdm
from utils.tar import tar_doc_stream
from utils.txt import from_njson, to_njson
from zstandard import ZstdCompressor, ZstdDecompressor


def open_zict(fname, mode="a", level=6):
    """Open a zip-backed mapping whose values are zstd-compressed.

    :param fname: location of the zip archive, handed to ``zict.Zip``.
    :param mode: open mode forwarded to ``zict.Zip`` (``"a"`` by default).
    :param level: zstd compression level given to ``ZstdCompressor``.
    :return: a ``zict.Func`` layered over the zip store, with the zstd
        compressor as its first callable and the decompressor as its second.
    """
    compressor = ZstdCompressor(level)
    decompressor = ZstdDecompressor()

    return zict.Func(
        compressor.compress,
        decompressor.decompress,
        zict.Zip(fname, mode=mode),
    )


def open_zict_json(src, mode="a", level=6):
    """Expose a byte-valued store as a mapping of JSON documents.

    :param src: either a path (``str`` or ``Path``), in which case a store is
        opened with :func:`open_zict` using ``mode`` and ``level``, or an
        already opened mapping, which is used as is (``mode`` and ``level``
        are then ignored).
    :param mode: open mode, only used when ``src`` is a path.
    :param level: zstd compression level, only used when ``src`` is a path.
    :return: a ``zict.Func`` that serialises documents to compact UTF-8 JSON
        on the way in and parses them with ``json.loads`` on the way out.
    """

    def _encode(doc):
        # Compact separators: no whitespace after "," or ":".
        return json.dumps(doc, separators=(',', ':')).encode("utf-8")

    backing = (
        open_zict(src, mode=mode, level=level)
        if isinstance(src, (str, Path))
        else src
    )
    return zict.Func(_encode, json.loads, backing)

def emit_md_stream(fname):
    """Stream paired metadata documents out of a source-metadata archive.

    For every key in the archive ending in ``.cmr`` the matching ``.dmrpp``
    entry is looked up under the same stem.

    :param fname: path of the archive, opened read-only via :func:`open_zict`.
    :return: generator of ``(stem, (cmr_doc, dmr_doc))`` where ``cmr_doc`` is
        the JSON-decoded ``.cmr`` entry and ``dmr_doc`` is the ``.dmrpp`` entry
        as returned by the store (decompressed, not parsed).
    :raises KeyError: if a ``.cmr`` entry has no ``.dmrpp`` counterpart.
    """
    cmr_suffix = ".cmr"
    store = open_zict(fname, "r")
    try:
        as_json = open_zict_json(store)

        for key in store:
            if not key.endswith(cmr_suffix):
                continue
            kid = key[: -len(cmr_suffix)]
            yield kid, (as_json[key], store[kid + ".dmrpp"])
    finally:
        # Release the archive handle once the generator is exhausted, closed
        # early, or fails part-way; previously it was never closed.
        store.close()

# Input tarballs, one per kind of source document; each is read with
# tar_doc_stream when the source-metadata archive is built.
srcs = dict(
    cmr=Path("Data/emit-jsons.tar.gz"),
    dmrpp=Path("Data/emit-dmrpp.tar.gz"),
)

# Where the converted STAC documents are written (to_njson) and later read
# back from (from_njson).
stac_njson = Path("Data/emit-stac.njson.xz")

In [None]:
# Archive that gathers every source document (both kinds) under emit_id keys.
emit_src_md_zip = Path("Data/emit-src-md.zip")
if not emit_src_md_zip.exists():
    store = open_zict(emit_src_md_zip, "w")
    try:
        # Both streams are lazy; chaining them writes all .cmr entries first,
        # then all .dmrpp entries, without holding either tarball in memory.
        cmrs = ((emit_id(p, ".cmr"), doc) for p, doc in tar_doc_stream(srcs["cmr"]))
        dmrpp = ((emit_id(p, ".dmrpp"), doc) for p, doc in tar_doc_stream(srcs["dmrpp"]))

        store.update(tqdm(itertools.chain(cmrs, dmrpp)))
    except BaseException:
        # A failed or interrupted build (including Ctrl-C in the notebook)
        # must not leave a truncated archive behind: the exists() guard above
        # would accept it on the next run and silently skip regeneration.
        store.close()
        if emit_src_md_zip.exists():
            emit_src_md_zip.unlink()
        raise
    store.close()
else:
    print(f"Skipping generation of {emit_src_md_zip}, exists")

In [None]:
if not stac_njson.exists():
    # Lazily convert every (cmr, dmr) pair from the source archive with
    # cmr_to_stac and stream the results straight into the njson file.
    mdd = (
        cmr_to_stac(cmr_doc, dmr_doc)
        for _kid, (cmr_doc, dmr_doc) in emit_md_stream(emit_src_md_zip)
    )
    to_njson(tqdm(mdd), stac_njson)
else:
    print(f"Skipping generation of `{stac_njson}`, exists")
    

## Load back

In [None]:
import fsspec
import zarr

# Zip cache of the STAC documents, keyed by each document's "id".
stacs_path = Path("/tmp/emit.zip")

if not stacs_path.exists():
    # Use stacs_path here rather than repeating the literal, so the guard,
    # the writer and the reader below can never point at different files.
    stacs = open_zict_json(stacs_path, "w")
    try:
        stacs.update((doc['id'], doc) for doc in tqdm(from_njson(stac_njson)))
    except BaseException:
        # Drop a partially written cache; otherwise the exists() guard would
        # accept the truncated file on the next run.
        stacs.close()
        if stacs_path.exists():
            stacs_path.unlink()
        raise
    stacs.close()

# Re-open read-only for the lookups in the following cells.
stacs = open_zict_json(stacs_path, "r")

In [None]:
# Fetch one STAC document from the cache by its id.
doc = stacs["EMIT_L2A_RFL_001_20230316T045133_2307503_005"]

# The RFL asset's "zarr:spec" entry is passed as the `fo` argument of fsspec's
# "reference" filesystem; the resulting mapper is opened as a zarr group.
rfl_asset = doc['assets']['RFL']
rfs = fsspec.filesystem("reference", fo=rfl_asset['zarr:spec'])
zg = zarr.open_group(rfs.get_mapper(""))

# Show the group hierarchy twice: as printed text and as an expanded display.
print(zg.tree())
display(zg.tree(expand=True))

In [None]:
import odc.stac
from odc.stac import parse_item
from pystac.item import Item as StacItem


# Wrap the raw STAC dict in a pystac Item, then run it through odc.stac's
# parse_item. The parsed item is left as the last expression so the notebook
# displays it.
sit = StacItem.from_dict(doc)
pit = parse_item(sit)
pit

In [None]:
# NOTE(review): presumably the item's geometry expressed in a UTM CRS —
# confirm against the odc.stac documentation for `safe_geometry`.
pit.safe_geometry('UTM')

In [None]:
# Take the first geobox of the parsed item and display the bounding box of its
# footprint. NOTE(review): 4326 is presumably an EPSG code (lon/lat) and 2 a
# resolution/precision argument — confirm against the `footprint` API.
gbx = pit.geoboxes()[0]
gbx.footprint(4326, 2).boundingbox

In [None]:
# Single-element unpacking: raises ValueError unless stac2ds yields exactly
# one dataset for the one item passed in.
[ds] = odc.stac.stac2ds([sit])

ds.metadata_doc

In [None]:
# Report on-disk sizes of the files this notebook works with.
# NOTE(review): the source tarballs are configured under Data/ (see `srcs`),
# but the last command globs /tmp/*gz — confirm which location is intended.
!du -h Data/emit-src-md.zip
!du -h Data/emit-stac.njson.xz
!du -h /tmp/emit.zip
!du -h /tmp/*gz
#!unzip -lv Data/emit-src-md.zip | head -20

------------------------------------------------