In [None]:
import xarray as xr
import numpy as np
import dask
import dask.array as da
import distributed
from dask.distributed import wait as dask_wait
import os
from IPython.display import display
from IPython.display import Image

In [None]:
from datacube.utils.rio import (
    configure_s3_access,
)

from kk.gmc2 import (
    keep_good_only, 
    xr_geomedian, 
    to_f32,
    from_float,
    reshape_for_geomedian,
    c2_geomedian,
    get_paths,
    mk_yaml,
    chunked_persist,
    save_to_s3,
)
from odc.index import bin_dataset_stream


In [None]:
# Connect to the Dask scheduler. The address can be overridden with the
# DASK_SCHEDULER_ADDRESS environment variable; the default is the original
# hard-coded scheduler.
client = distributed.Client(
    os.environ.get('DASK_SCHEDULER_ADDRESS', 'tcp://10.0.5.26:8786'),
    direct_to_workers=False,
)
display(client)

# Configure S3 access in this notebook process only (client=None). The returned
# credentials are later passed explicitly to save_to_s3(..., creds=creds).
# NOTE(review): passing client=client would presumably also configure the
# workers; the previous dead `if False:` branch did that and has been removed.
creds = configure_s3_access(region_name='auto',
                            client=None)

# Show only a short prefix of the access key id, so the credential does not
# end up verbatim in saved notebook outputs.
f"access key: {creds.access_key[:4]}..." if creds is not None else "no S3 credentials found"

In [None]:
from datacube.utils.cog import write_cog, to_cog
from datacube.utils.dask import save_blob_to_s3
# Destination for the per-tile outputs written by save_to_s3.
# NOTE(review): the "2018" in this prefix is hard-coded; keep it in sync with
# `year`, which is set in a later cell.
s3_upload_prefix = 's3://deafrica-staging-west/kk-temp/gm-2018/'
output_product = 'ga_ls8c_gm_2_annual'

# COG writer options, handed to save_to_s3 as `cog_params`.
cog_opts = {
    'overview_levels': [4, 8, 16],
    'overview_resampling': 'average',
    'zlevel': 9,
}


In [None]:
from datacube.storage import measurement_paths
import datacube
from datacube.storage import masking
from kk.wofs_summary import mk_africa_albers_gs

# Tiling grid: used to bin datasets into tiles and passed as `grid` to save_to_s3.
gs = mk_africa_albers_gs()

# NOTE(review): presumably the surface-reflectance scale/offset of the input
# product; neither name is referenced by the cells shown here — confirm need.
scale = 2.75e-5
offset = -0.2

year = 2018            # calendar year selected from the index
chunk_sz = 5000 // 2   # chunk size handed to c2_geomedian (2500)

dc = datacube.Datacube()

In [None]:
%%time
dss_all = dc.find_datasets_lazy(product='usgs_ls8c_level2_2', 
                                time=str(year))

bins = bin_dataset_stream(gs, dss_all, persist=lambda ds: ds)
tasks = sorted(bins.values(), key=lambda s: s.idx[::-1])

In [None]:
# Peek at the first tile: (tile index, number of input datasets, output geobox).
task = tasks[0]
(task.idx, len(task.dss), task.geobox)

In [None]:
def task_proc(tasks):
    """Lazily yield, per tile task, the object returned by ``save_to_s3``.

    For each task the geomedian of ``task.dss`` on ``task.geobox`` is built
    with ``c2_geomedian`` and its ``.gm`` result is handed to ``save_to_s3``
    together with the tile index. The caller is expected to compute what is
    yielded (the processing loop does ``client.compute(...).result()``).

    This is a generator: nothing is built for a task until the caller asks for
    the next item.

    Relies on notebook globals: ``dc``, ``chunk_sz``, ``year``, ``gs``,
    ``s3_upload_prefix``, ``output_product``, ``cog_opts`` and ``creds``.
    """
    for tile in tasks:
        geomedian = c2_geomedian(
            dc,
            tile.dss,
            chunk_sz=chunk_sz,
            geobox=tile.geobox,
            min_good_obs=3,
        ).gm
        yield save_to_s3(
            geomedian,
            tile.idx,
            year,
            grid=gs,
            prefix=s3_upload_prefix,
            dst_product=output_product,
            cog_params=cog_opts,
            creds=creds,
        )
        

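Single-tile equivalent of one `task_proc` iteration, written out step by step so that intermediate results (`gm_array`, `ok_pix`) can be inspected. To try it, copy it into a code cell after binding the tile of interest, e.g. `dss, geobox, tidx = task.dss, task.geobox, task.idx`:

```python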
ss = c2_geomedian(dc, dss, chunk_sz=chunk_sz, geobox=geobox,
                  min_good_obs=3)
gm, gm_array, ok_pix = ss.gm, ss.gm_array, ss.ok_pix
yaml_fut = save_to_s3(gm, tidx, year,
                      grid=gs, 
                      prefix=s3_upload_prefix, 
                      dst_product=output_product, 
                      cog_params=cog_opts, creds=creds)
```

In [None]:
import time

# Tiles whose graph construction, compute or save failed; kept for a retry pass.
failures = []

t0 = time.time()

for i, tile_task in enumerate(tasks):
    t_start = time.time()
    try:
        print(f"Processing {i} {tile_task.idx} ", end='')
        # Build this tile's graph *inside* the try. Driving a single
        # task_proc(tasks) generator from the `for` statement ran graph
        # construction outside the handler, so one bad tile raised straight out
        # of the loop and also finished the generator, ending the whole run.
        save_fut = next(task_proc([tile_task]))
        path, ok = client.compute(save_fut).result()
        t_finish = time.time()
        if ok:
            # Per-tile seconds (now including graph construction), then total elapsed.
            print(f"OK {t_finish-t_start:.3f} {t_finish-t0:.2f}")
            print(path)
        else:
            raise ValueError("not ok save")
    except Exception as e:
        # `Exception` rather than a bare `except:`, so KeyboardInterrupt still
        # aborts the run instead of being recorded as a failed tile.
        print(f"Failed: {i} {tile_task.idx}")
        print(f"  {e!r}")
        failures.append(tile_task)