In [1]:
import netCDF4
import h5netcdf

In [2]:
import os

import geowombat as gw
from geowombat.errors import logger
from geowombat.backends import Cluster

import numpy as np

import distributed
import dask
import dask.array as da
from dask import is_dask_collection
from dask.base import tokenize
from dask.distributed import progress

import xarray as xr

import rasterio as rio
from rasterio.windows import Window
from rasterio.warp import calculate_default_transform, aligned_target
from rasterio.crs import CRS
# from dask.distributed import Client, LocalCluster

Using TensorFlow backend.


In [3]:
from geowombat.data import rgbn, rgbn_suba, rgbn_subb

In [None]:
class RasterioDaskWriter(object):

    """
    Array-like target that lets ``dask.array.store`` save chunks through ``rasterio`` windows.

    Args:
        filename (str): The raster file that is created on ``__enter__`` and written to per chunk.
        kwargs (Optional[dict]): Keyword arguments forwarded to every ``rasterio.open`` call.
    """

    def __init__(self, filename, **kwargs):

        self.kwargs = kwargs
        self.filename = filename

    def __enter__(self):

        # Opening in 'w' mode creates the output file; it is closed again immediately
        # because every chunk write reopens the file on its own.
        with rio.open(self.filename, mode='w', **self.kwargs) as dst:
            pass

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to release: no file handle is kept open between chunk writes
        pass

    def __setitem__(self, key, item):

        """Writes the chunk ``item`` into the window addressed by the slices in ``key``"""

        if len(key) != 3:

            # (rows, columns) key -> write to band index 1
            band_indexes = 1
            rows, cols = key

        else:

            bands, rows, cols = key
            band_step = bands.step or 1

            # Shift both slice bounds by one to get the band indexes handed to ``write``
            band_indexes = list(range(bands.start + 1, bands.stop + 1, band_step))

        with rio.open(self.filename, mode='r+', **self.kwargs) as dst:

            chunk_window = Window(col_off=cols.start,
                                  row_off=rows.start,
                                  width=cols.stop - cols.start,
                                  height=rows.stop - rows.start)

            dst.write(item, window=chunk_window, indexes=band_indexes)


def to_raster(data,
              filename,
              client=None,
              scheduler_address=None,
              verbose=0,
              overwrite=False,
              n_jobs=1,
              gdal_cache=512,
              **kwargs):

    """
    Writes a ``dask`` array to file

    Args:
        data (DataArray): The ``xarray.DataArray`` to write. Its ``.data`` must be a ``dask`` collection.
        filename (str): The output file name to write to.
        client (Optional[object]): An existing ``distributed.Client``. If given, the write is
            persisted on it instead of being computed locally.
        scheduler_address (Optional[str]): The address of a running scheduler. Used to create a
            ``distributed.Client`` when ``client`` is not given.
        verbose (Optional[int]): The verbosity level.
        overwrite (Optional[bool]): Whether to overwrite an existing file.
        n_jobs (Optional[int]): The number of parallel chunks to write (local computation only).
        gdal_cache (Optional[int]): The ``GDAL`` cache size (in MB).
        kwargs (Optional[dict]): Additional keyword arguments to pass to ``rasterio.open``.

    Returns:
        ``dask.delayed`` object. With a client or scheduler address this is the persisted
        object returned by ``client.persist``; otherwise it is the store object that has
        already been computed once.

    Raises:
        TypeError: If ``data.data`` is not a ``dask`` collection.
    """

    # Validate before touching the file system so a bad input never deletes an existing file
    if not is_dask_collection(data.data):
        logger.error('  The data should be a dask array.')
        raise TypeError('The data should be a dask array.')

    if overwrite and os.path.isfile(filename):
        os.remove(filename)

    with rio.Env(GDAL_CACHEMAX=gdal_cache):

        with RasterioDaskWriter(filename, **kwargs) as dst:

            # Each chunk write opens the file itself, so no dask lock is requested here
            res = da.store(da.squeeze(data.data), dst, lock=False, compute=False)

            if verbose > 0:
                logger.info('  Writing data to file ...')

            if (client is not None) or scheduler_address:

                if verbose > 0:
                    logger.info('  Sending delayed futures to the distributed client ...')

                if client is None:
                    client = distributed.Client(scheduler_address)

                # Keep the persisted object: it is what the caller gets back to track the write
                res = client.persist(res)

            else:

                res.compute(num_workers=n_jobs)

                if verbose > 0:
                    logger.info('  Finished writing data to file.')

    return res

# Quickbird test

In [None]:
# ``chunksize`` is the dask chunk size passed to ``gw.open``; ``out_chunksize`` is only
# referenced by the commented-out ``to_raster`` calls in this section.
chunksize = out_chunksize = 256
# Output raster for the Quickbird write test
outfile = ('/home/jcgr/Documents/scripts/Python/git_repos/geowombat/data/'
           'write_test.tif')

In [None]:
# from dask.distributed import LocalCluster
# LocalCluster?

In [None]:
# Configure a geowombat ``Cluster`` (4 workers, 4 threads each, processes=False) and start it.
# NOTE(review): scheduler_port=0 presumably lets the scheduler pick any free port -- confirm.
cluster = Cluster(n_workers=4,
                  threads_per_worker=4,
                  scheduler_port=0,
                  processes=False)

cluster.start()

# Last expression of the cell, so the notebook displays the client
cluster.client

In [None]:
def linear_transform():

    """
    Builds a labelled vector of four band coefficients.

    Returns:
        ``xarray.DataArray``: ``float64`` coefficients along a ``band`` dimension whose
        coordinates are ``blue``, ``green``, ``red`` and ``nir``.
    """

    band_names = ['blue', 'green', 'red', 'nir']
    band_weights = np.array([1.1, 1.3, 2.1, 0.8], dtype='float64')

    return xr.DataArray(data=band_weights,
                        coords={'band': band_names},
                        dims='band')

In [None]:
%%time

# Remove any output left over from a previous run
if os.path.isfile(outfile):
    os.remove(outfile)
    
with gw.config.update(sensor='quickbird', scale_factor=0.0001):        

    with gw.open(rgbn, 
                 band_names=['blue', 'green', 'red', 'nir'], 
                 chunks=chunksize) as ds:
        
        coeffs = linear_transform()

        # Multiply by the per-band coefficients (both operands carry a ``band`` dimension)
        dss = ds * coeffs
        
#         ndvi = ds.gw.norm_diff('red', 'nir')
#         evi2 = ds.gw.evi2()
#         dss = ndvi.sel(band='norm-diff') / evi2.sel(band='evi2')
#         dss = dss.expand_dims(dim='band')
        # Copy the source attributes onto the result
        # NOTE(review): presumably needed because the arithmetic drops attrs and the writer reads them -- confirm
        dss.attrs = ds.attrs
        #dss = ds.gw.tasseled_cap()
        
        # Write with geowombat's accessor method (not the local ``to_raster`` defined in this notebook)
        dss.gw.to_raster(outfile,
                         verbose=1,
                         overwrite=True,
                         n_jobs=4)
                         #client=cluster.client)

#         res = to_raster(dss,
#                   outfile,
#                   overwrite=True,
#                   verbose=1,
#                   scheduler_address=cluster.cluster.scheduler_address,
#                   crs=dss.crs,
#                   transform=dss.transform,
#                   width=dss.gw.ncols,
#                   height=dss.gw.nrows,
#                   count=dss.gw.nbands,
#                   driver='GTiff',
#                   sharing=False,
#                   dtype=dss.data.dtype,
#                   nodata=None,
#                   tiled=True,
#                   blockxsize=512,
#                   blockysize=512)

#         dss = dss.to_dataset(dim='band')

        #print(dss)

#         print(dss[['brightness', 'greenness', 'wetness']])

#         encoding_dict = {'brightness': {'zlib': True, 'complevel': 5, 'chunksizes': (512, 512), 'dtype': 'float64'},
#                          'greenness': {'zlib': True, 'complevel': 5, 'chunksizes': (512, 512), 'dtype': 'float64'},
#                          'wetness': {'zlib': True, 'complevel': 5, 'chunksizes': (512, 512), 'dtype': 'float64'}}

#         dss.to_netcdf(path=outfile.replace('.tif', '.nc'),
#                       mode='w',
#                       format='NETCDF4',
#                       encoding=encoding_dict,
#                       engine='netcdf4',
#                       compute=False)

#         dss.gw.to_raster(outfile,
#                          client=cluster.client,
#                          n_jobs=8,
#                          verbose=1,
#                          overwrite=True,
#                          dtype='uint8',
#                          nodata=None,
#                          tiled=True,
#                          blockxsize=out_chunksize,
#                          blockysize=out_chunksize)

In [None]:
# NOTE(review): ``res`` is only assigned by the commented-out ``to_raster(...)`` call in the
# previous cell (and by later cells), so on a fresh top-to-bottom run this name is undefined.
res

In [None]:
# Submit ``res`` to the cluster's client.
# NOTE(review): depends on ``res`` from the commented-out ``to_raster(...)`` call -- undefined on a fresh run.
cluster.client.persist(res)

In [None]:
# Stop the cluster started for the Quickbird test
cluster.stop()

# Southern Cone test

In [4]:
# from dask.distributed import Client, LocalCluster
# LocalCluster?

In [8]:
# Configure and start a geowombat ``Cluster`` for the Southern Cone test
# (4 workers, 2 threads each, processes=False).
cluster = Cluster(n_workers=4,
                  threads_per_worker=2,
                  scheduler_port=0,
                  processes=False)
                  #scheduler_port=8787,
                  #dashboard_address='localhost:8787',
                  #diagnostics_port='localhost:8787')

cluster.start()

# Last live expression of the cell, so the notebook displays the client summary
cluster.client

# print(cluster.cluster.scheduler)

0,1
Client  Scheduler: inproc://172.16.11.249/3112/18  Dashboard: http://172.16.11.249/3112/18:8787/status,Cluster  Workers: 8  Cores: 16  Memory: 16.54 GB


In [9]:
# ``chunksize`` is the dask chunk size passed to ``gw.open``; ``out_chunksize`` is only
# referenced by the commented-out ``to_raster`` calls in this section.
chunksize = out_chunksize = 512
# Output raster for the Southern Cone write test
outfile = ('/media/jcgr/data/imagery/temp/'
           'write_test.tif')

In [None]:
# def get_coeffs():
    
#     coeffs = xr.DataArray(data=np.array([1.1, 1.3, 2.1], dtype='float64'),
#                           coords={'band': ['blue', 'green', 'red']},
#                           dims='band')
    
#     return coeffs

In [17]:
%%time

# Remove any output left over from a previous run
if os.path.isfile(outfile):
    os.remove(outfile)
    
#with gw.config.update(ref_res=(1000, 1000)):

# with dask.config.set(scheduler='threads', num_workers=1):

filename = '/media/jcgr/data/imagery/temp/sc_stack.tif'

with gw.open(filename,
             band_names=['blue', 'green', 'red'], 
             chunks=chunksize) as ds:
    
    # Rebinds ``ds`` from the opened array to a Dataset with one variable per band
    ds = ds.to_dataset(dim='band')
    
    #coeffs = get_coeffs()
    
    #dss = (ds.sel(band='blue')*1.1 + ds.sel(band='green')*1.3 + ds.sel(band='red')*2.1).expand_dims(dim='band')
    
    # Per-band linear transforms (scale + offset), summed into a single layer below
    dss_blue = ds['blue']*1.1 + 1.1
    dss_green = ds['green']*1.3 + 0.3
    dss_red = ds['red']*2.1 + 0.8
#     dss_blue = ds.sel(band='blue')*1.1 + 1.1
#     dss_green = ds.sel(band='green')*1.3 + 0.3
#     dss_red = ds.sel(band='red')*2.1 + 0.7
    
    dss = (dss_blue + dss_green + dss_red)#.expand_dims(dim='band')
    # Copy the attributes of the (Dataset) ``ds`` onto the result
    # NOTE(review): presumably the writer reads crs/transform from attrs -- confirm they survive ``to_dataset``
    dss.attrs = ds.attrs
    
    #dss = ds.sel(time=1).where(ds.sel(time=1) == 1).astype('uint8')    

    # Write with geowombat's accessor method (not the local ``to_raster`` defined in this notebook)
    dss.gw.to_raster(outfile,
                     verbose=1,
                     overwrite=True,
                     n_jobs=16)
    
#     res = to_raster(dss,
#                   outfile,
#                   overwrite=True,
#                   verbose=1,
#                   scheduler_address=cluster.cluster.scheduler_address,
#                   crs=dss.crs,
#                   transform=dss.transform,
#                   width=dss.gw.ncols,
#                   height=dss.gw.nrows,
#                   count=dss.gw.nbands,
#                   driver='GTiff',
#                   sharing=False,
#                   dtype=dss.data.dtype,
#                   nodata=None,
#                   tiled=True,
#                   blockxsize=out_chunksize,
#                   blockysize=out_chunksize) 

#         dss = dss.to_dataset(dim='band')

#         encoding_dict = {'class': {'zlib': True, 'complevel': 5, 'chunksizes': (1024, 1024), 'dtype': 'uint8'}}

#         dss.to_netcdf(path=outfile.replace('.tif', '.nc'),
#                       mode='w',
#                       format='NETCDF4',
#                       encoding=encoding_dict,
#                       engine='netcdf4',
#                       compute=False)

#     dss.gw.to_raster(outfile,
#                      client=cluster.client,
#                      n_jobs=8,
#                      verbose=1,
#                      overwrite=True,
#                      dtype='uint8',
#                      nodata=None,
#                      tiled=True,
#                      blockxsize=out_chunksize,
#                      blockysize=out_chunksize)

13:09:58:INFO:622:io.to_raster:  Writing data to file ...
13:22:17:INFO:638:io.to_raster:  Finished writing data to file.


CPU times: user 5min 6s, sys: 1min 26s, total: 6min 32s
Wall time: 12min 20s




In [None]:
# x = cluster.client.persist(res)
# progress(x)

In [7]:
# Stop the cluster started for the Southern Cone test
cluster.stop()

---

In [None]:
def n_rows_cols(pixel_index, block_size, rows_cols):

    """
    Gets the size of a block that starts at ``pixel_index``, clipped to the image edge.

    Args:
        pixel_index (int): The starting row or column of the block.
        block_size (int): The requested block size.
        rows_cols (int): The total number of rows or columns.

    Returns:
        ``int``: ``block_size`` when the block ends before ``rows_cols``, otherwise the
        number of pixels left from ``pixel_index`` to ``rows_cols``.
    """

    if (pixel_index + block_size) < rows_cols:
        return block_size

    return rows_cols - pixel_index

In [None]:
@dask.delayed
def write_func(output, outfile, out_window, n_bands):

    """
    Writes one block of data into an existing raster file (wrapped in ``dask.delayed``).

    Args:
        output (ndarray): The block values. Length-1 axes are removed before writing.
        outfile (str): The raster file to update. It must already exist, since it is opened with ``mode='r+'``.
        out_window (object): The window passed to ``rasterio``'s ``write``.
        n_bands (int): The number of bands to write. ``1`` writes to band index 1, otherwise
            bands ``1..n_bands`` are written.
    """

    out_indexes = 1 if n_bands == 1 else np.arange(1, n_bands + 1)

    # ``GDAL_CACHEMAX`` is the GDAL cache option (the same one ``to_raster`` sets);
    # the previous ``gdal_cache`` keyword is not a GDAL configuration option.
    with rio.Env(GDAL_CACHEMAX=512):

        with rio.open(outfile,
                      mode='r+',
                      sharing=False) as dst:

            # NOTE(review): ``np.squeeze`` drops every length-1 axis, so a window that is one
            # pixel tall or wide would also lose its spatial axis -- confirm edge windows.
            dst.write(np.squeeze(output), window=out_window, indexes=out_indexes)

In [None]:
def gen_windows(rows, cols, chunksize):

    """
    Tiles a ``rows`` x ``cols`` grid into windows of at most ``chunksize`` x ``chunksize``.

    Args:
        rows (int): The number of rows in the grid.
        cols (int): The number of columns in the grid.
        chunksize (int): The window size along both axes. Edge windows are clipped.

    Returns:
        ``list``: ``Window`` objects in row-major order (all columns of the first row
        strip, then the next strip, ...).
    """

    return [Window(row_off=row_start,
                   col_off=col_start,
                   height=n_rows_cols(row_start, chunksize, rows),
                   width=n_rows_cols(col_start, chunksize, cols))
            for row_start in range(0, rows, chunksize)
            for col_start in range(0, cols, chunksize)]

In [None]:
def create_file(outfile, ds, out_dtype, out_count, out_chunksize):

    """
    Creates an empty, tiled GeoTiff that block writers can later open in update mode.

    Any existing file at ``outfile`` is removed first.

    Args:
        outfile (str): The file to create.
        ds (DataArray): The source data. Its ``gw.nrows``, ``gw.ncols``, ``crs`` and
            ``transform`` define the output grid.
        out_dtype (str): The output data type.
        out_count (int): The number of output bands.
        out_chunksize (int): The internal block size, used for both ``blockxsize`` and ``blockysize``.
    """

    if os.path.isfile(outfile):
        os.remove(outfile)

    # ``GDAL_CACHEMAX`` is the GDAL cache option (the same one ``to_raster`` sets);
    # the previous ``gdal_cache`` keyword is not a GDAL configuration option.
    with rio.Env(GDAL_CACHEMAX=512):

        # Opening in 'w' mode is enough to create the file; nothing is written here
        with rio.open(outfile,
                      mode='w',
                      height=ds.gw.nrows,
                      width=ds.gw.ncols,
                      count=out_count,
                      dtype=out_dtype,
                      driver='GTiff',
                      crs=ds.crs,
                      transform=ds.transform,
                      sharing=False,
                      tiled=True,
                      blockxsize=out_chunksize,
                      blockysize=out_chunksize):
            pass

In [None]:
# def feature_gen(feature):
#     yield features

In [None]:
def gen_futures(windows, data, n_bands):

    """
    Builds one ``write_func`` call per window.

    The output path is not a parameter: it is read from the notebook-level ``outfile``
    variable at call time.

    Args:
        windows (list): Window objects exposing ``row_off``, ``col_off``, ``height`` and ``width``.
        data (DataArray): The data to slice as ``[:, rows, columns]``. The ``.data`` of each slice is written.
        n_bands (int): The band count forwarded to ``write_func``.

    Returns:
        ``list``: The ``write_func`` results, in the same order as ``windows``.
    """

    def _window_block(win):
        row_stop = win.row_off + win.height
        col_stop = win.col_off + win.width
        return data[:, win.row_off:row_stop, win.col_off:col_stop].data

    return [write_func(_window_block(win), outfile, win, n_bands) for win in windows]

In [None]:
# Configure and start a geowombat ``Cluster`` for the manual window-writing test
# (8 workers, 2 threads each, processes=False).
cluster = Cluster(n_workers=8,
                  threads_per_worker=2,
                  scheduler_port=0,
                  processes=False)

cluster.start()

# Last expression of the cell, so the notebook displays the client
cluster.client

In [None]:
# def my_func(block, dummy):
#     return np.uint8(np.where(np.squeeze(block) == 1, 1, 0))

In [None]:
# ``chunksize`` sizes both the dask chunks and the write windows; ``out_chunksize`` is the
# internal block size handed to ``create_file``.
chunksize, out_chunksize = 1024, 512
# Single-band output
out_count = out_indexes = 1
outfile = ('/home/jcgr/Documents/scripts/Python/git_repos/geowombat/data/'
           'test_sc.tif')

In [None]:
# gw.apply('/media/jcgr/data/imagery/southern_cone/predictions/mosaics/sch1_2017_2018.tif',
#          '/home/jcgr/Documents/scripts/Python/git_repos/geowombat/data/test_sc.tif',
#          my_func,
#          args=(None,),
#          n_jobs=8,
#          overwrite=False,
#          count=1,
#          dtype='uint8',
#          nodata=0,
#          compress='lzw',
#          tiled=True,
#          blockxsize=512,
#          blockysize=512)

In [None]:
# with gw.open('/media/jcgr/data/imagery/southern_cone/predictions/mosaics/sch1_2017_2018.tif', chunks=chunkszie) as ds:
    
#     dss = ds.where(ds == 1).astype('uint8')
#     dss.attrs = ds.attrs
    
#     dss.gw.to_raster(outfile,
#                      n_jobs=8,
#                      verbose=1,
#                      overwrite=True,
#                      row_chunks=1024,
#                      col_chunks=1024,
#                      dtype='uint8',
#                      nodata=0,
#                      compress='lzw',
#                      tiled=True,
#                      blockxsize=512,
#                      blockysize=512)

In [None]:
# Same values as in the configuration cell; repeated so this cell can be re-run on its own
out_count = 1
out_indexes = 1

with gw.open('/media/jcgr/data/imagery/southern_cone/predictions/mosaics/sch1_2017_2018.tif', chunks=chunksize) as ds:
    
    #dss = ds.where(ds == 1).astype('uint8')
    
    # One window per ``chunksize`` x ``chunksize`` tile of the image
    windows = gen_windows(ds.gw.nrows, ds.gw.ncols, chunksize)
    
    # Create the empty output file that the delayed writers open in 'r+' mode
    create_file(outfile, ds, 'uint8', out_count, out_chunksize)
    
    # Lazy write tasks; nothing is written until they are computed/persisted.
    # ``gen_futures`` reads the notebook-level ``outfile``.
    futures = gen_futures(windows, ds, out_count)

In [None]:
# Send the delayed writes to the cluster and show a progress bar
x = cluster.client.persist(futures)
progress(x)

In [None]:
# Stop the cluster used for the window-writing test
cluster.stop()

In [None]:
chunksize = 256
out_count = 1
out_indexes = 1#np.arange(1, 5)

outfile = '/home/jcgr/Documents/scripts/Python/git_repos/geowombat/data/test2.tif'

# Remove any output left over from a previous run
if os.path.isfile(outfile):
    os.remove(outfile)

with gw.config.update(sensor='quickbird', scale_factor=0.0001):

    with gw.open([rgbn, rgbn, rgbn, rgbn, rgbn, rgbn, rgbn, rgbn],
                 band_names=['blue', 'green', 'red', 'nir'],
                 chunks=chunksize) as ds:

        # Mean EVI2 over the ``time`` dimension of the stacked images
        #ndvi = ds.gw.ndvi()
        evi2 = ds.gw.evi2()
        dss = evi2.mean(dim='time')
        #dss = ndvi / evi2
#         dss = ds.gw.tasseled_cap()
#         dss = dss.mean(dim='band')
#         dss = dss * 10.0
#         dss = dss / 2.3
        dss.attrs = ds.attrs
        print(dss)

        # Create the empty output file; the delayed writers reopen it in 'r+' mode
        with rio.open(outfile,
                      mode='w',
                      height=ds.gw.nrows, width=ds.gw.ncols,
                      count=out_count, dtype='float64', driver='GTiff',
                      crs=ds.crs, transform=ds.transform,
                      sharing=False,
                      tiled=True,
                      blockxsize=chunksize,
                      blockysize=chunksize) as dst:
            pass

        # Rebuilt window loop: the ``futures.append`` call was left orphaned (and the cell
        # unparseable) after the loop defining ``futures``, ``i``, ``j`` and ``w`` was removed.
        futures = []
        n_rows, n_cols = ds.gw.nrows, ds.gw.ncols

        for i in range(0, n_rows, chunksize):

            for j in range(0, n_cols, chunksize):

                # Clip the window at the image edge so it matches the sliced block
                w = Window(row_off=i,
                           col_off=j,
                           height=min(chunksize, n_rows - i),
                           width=min(chunksize, n_cols - j))

                # The fourth argument of ``write_func`` is the band count
                futures.append(write_func(dss[:, i:i+chunksize, j:j+chunksize].squeeze().data,
                                          outfile, w, out_count))

In [None]:
# dss.data.visualize()

In [None]:
# Send the delayed writes to the cluster and show a progress bar.
# NOTE(review): the previous ``cluster.stop()`` cell has no matching restart before this one,
# so a top-to-bottom run reaches here with a stopped cluster -- confirm.
x = cluster.client.persist(futures)
progress(x)

In [None]:
# Stop the cluster after the EVI2 write test
cluster.stop()

In [None]:
chunksize = 64

# NOTE(review): ``dummy_func`` is not defined anywhere in the visible notebook (the similar
# ``my_func`` helper is commented out), so this cell raises NameError on a fresh kernel.
gw.apply(rgbn_suba,
         '/home/jcgr/Documents/scripts/Python/git_repos/geowombat/data/test.tif',
         dummy_func,
         args=(None,),
         n_jobs=4,
         count=4,
         blockxsize=chunksize,
         blockysize=chunksize)

# with gw.open(rgbn_suba, chunks=chunksize) as ds:
#     ds.gw.to_raster('/home/jcgr/Documents/scripts/Python/git_repos/geowombat/data/test.tif',
#                     n_jobs=4,
#                     tiled=True)

# with gw.open(rgbn_suba, chunks=chunksize) as ds:
#     ds.gw.to_raster('/home/jcgr/Documents/scripts/Python/git_repos/geowombat/data/test.tif',
#                     n_jobs=4,
#                     tiled=True)

In [None]:
# NOTE(review): ``writer`` is never assigned in the visible notebook -- this relies on
# leftover kernel state and fails on a fresh run.
writer.compute()

In [None]:
# NOTE(review): ``writer`` is never assigned in the visible notebook, and the cluster was
# stopped in an earlier cell without being restarted -- confirm this cell is still needed.
res = cluster.client.persist(writer)

In [None]:
# res.compute()

In [None]:
# Stop the cluster
cluster.stop()

In [None]:
# writer.visualize()

In [None]:
# Draw the dask task graph behind ``dss`` (whichever earlier cell assigned it last)
dss.data.visualize()

In [None]:
# Display the ``rgbn_suba`` sample image using its b, g, r bands
with gw.open(rgbn_suba, band_names=['b', 'g', 'r', 'n']) as ds:
    ds.gw.imshow(band_names=['b', 'g', 'r'], flip=True, mask=True, nodata=0, robust=True)

In [None]:
# Display the ``rgbn_subb`` sample image using its b, g, r bands
with gw.open(rgbn_subb, band_names=['b', 'g', 'r', 'n']) as ds:
    ds.gw.imshow(band_names=['b', 'g', 'r'], flip=True, mask=True, nodata=0, robust=True)

In [None]:
# Open both sample images with ``mosaic=True`` and display the b, g, r bands of the result
with gw.open([rgbn_suba, rgbn_subb], band_names=['b', 'g', 'r', 'n'], mosaic=True) as ds:
    ds.gw.imshow(band_names=['b', 'g', 'r'], flip=True, mask=True, nodata=0, robust=True)

In [None]:
# Open three images labelled with time names 1, 1 and 2, then display the ``time=1`` selection
# NOTE(review): two inputs share the name 1 -- presumably geowombat merges them; confirm.
with gw.open([rgbn_suba, rgbn_subb, rgbn_subb], 
             time_names=[1, 1, 2], 
             band_names=['b', 'g', 'r', 'n']) as ds:
    
    ds.sel(time=1).gw.imshow(band_names=['b', 'g', 'r'], flip=True, mask=True, nodata=0, robust=True)

In [None]:
# Inspect the grid rasterio would use to reproject the sample image to EPSG:4326
with rio.open(rgbn) as src:
    
    # Default transform for EPSG:4326, asking for the same width/height as the source
    transform, width, height = calculate_default_transform(src.crs,
                                                           CRS.from_epsg(4326),
                                                           src.width,
                                                           src.height,
                                                           *src.bounds,
                                                           dst_width=src.width,
                                                           dst_height=src.height)
    
    print(width, height)
    # Cell size taken from the transform; element 4 is negated to get a positive y size.
    # NOTE: this rebinds ``res``, a name other cells use for dask results.
    res = (transform[0], -transform[4])
    
    print(res)
    
    # Align the target grid to the given resolution
    dst_transform, dst_width, dst_height = aligned_target(transform, width, height, res)
    
    print(dst_transform)
    print(dst_width, dst_height)

In [None]:
# Open the sample image with a (5, 5) reference resolution and cubic resampling,
# apply a moving-window maximum and display the g, r, n bands of the result
with gw.config.update(sensor='rgbn', ref_res=(5, 5)):

    with gw.open(rgbn, band_names=['b', 'g', 'r', 'n'], resampling='cubic') as ds:

        dss = ds.gw.moving(stat='max', n_jobs=8)

        dss.gw.imshow(band_names=['g', 'r', 'n'], flip=True, mask=True, nodata=0, robust=True)

In [None]:
# Open the sample image with a (5, 5) reference resolution and cubic resampling,
# print its summary and display the b, g, r bands
with gw.config.update(sensor='rgbn', ref_res=(5, 5)):

    with gw.open(rgbn, 
                 band_names=['b', 'g', 'r', 'n'],
                 resampling='cubic') as ds:
        
        print(ds)
        ds.gw.imshow(band_names=['b', 'g', 'r'], flip=True, mask=True, nodata=0, robust=True)