In [1]:
%matplotlib inline

import os
from dask.distributed import progress,wait,Client, LocalCluster
import dask.array as da
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
import xarray as xr
import multiprocessing
import h5py
import matplotlib.pyplot as plt
import rioxarray



from sklearn.cluster import MiniBatchKMeans
from dask_ml.wrappers import Incremental
from dask_ml.wrappers import ParallelPostFit
from dask_ml.preprocessing import StandardScaler
from dask_ml.cluster import KMeans

# number of logical CPUs reported for this machine; the bare name on the
# last line makes the notebook display the value
ncores = multiprocessing.cpu_count()
ncores

24

In [2]:
# launch a scheduler and workers locally; no arguments are passed, so the
# worker/thread layout is left to LocalCluster's defaults (`ncores` from the
# first cell is not used here)
cluster = LocalCluster()

In [3]:
# connect a client to the local cluster created in the previous cell
client = Client(cluster)

In [4]:
# display the client summary (dashboard address, workers, threads, memory)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 6
Total threads: 24,Total memory: 31.21 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:42095,Workers: 6
Dashboard: http://127.0.0.1:8787/status,Total threads: 24
Started: Just now,Total memory: 31.21 GiB

0,1
Comm: tcp://127.0.0.1:36153,Total threads: 4
Dashboard: http://127.0.0.1:36229/status,Memory: 5.20 GiB
Nanny: tcp://127.0.0.1:43613,
Local directory: /home/michael/Work/NEON/dask-worker-space/worker-uyxijycg,Local directory: /home/michael/Work/NEON/dask-worker-space/worker-uyxijycg

0,1
Comm: tcp://127.0.0.1:39393,Total threads: 4
Dashboard: http://127.0.0.1:42043/status,Memory: 5.20 GiB
Nanny: tcp://127.0.0.1:45945,
Local directory: /home/michael/Work/NEON/dask-worker-space/worker-f7kys1pz,Local directory: /home/michael/Work/NEON/dask-worker-space/worker-f7kys1pz

0,1
Comm: tcp://127.0.0.1:44309,Total threads: 4
Dashboard: http://127.0.0.1:44817/status,Memory: 5.20 GiB
Nanny: tcp://127.0.0.1:33135,
Local directory: /home/michael/Work/NEON/dask-worker-space/worker-n93viq3o,Local directory: /home/michael/Work/NEON/dask-worker-space/worker-n93viq3o

0,1
Comm: tcp://127.0.0.1:41985,Total threads: 4
Dashboard: http://127.0.0.1:34851/status,Memory: 5.20 GiB
Nanny: tcp://127.0.0.1:40633,
Local directory: /home/michael/Work/NEON/dask-worker-space/worker-wxuvi5q5,Local directory: /home/michael/Work/NEON/dask-worker-space/worker-wxuvi5q5

0,1
Comm: tcp://127.0.0.1:41297,Total threads: 4
Dashboard: http://127.0.0.1:40741/status,Memory: 5.20 GiB
Nanny: tcp://127.0.0.1:46129,
Local directory: /home/michael/Work/NEON/dask-worker-space/worker-e1tavjx6,Local directory: /home/michael/Work/NEON/dask-worker-space/worker-e1tavjx6

0,1
Comm: tcp://127.0.0.1:36777,Total threads: 4
Dashboard: http://127.0.0.1:46353/status,Memory: 5.20 GiB
Nanny: tcp://127.0.0.1:35223,
Local directory: /home/michael/Work/NEON/dask-worker-space/worker-n_3p1h94,Local directory: /home/michael/Work/NEON/dask-worker-space/worker-n_3p1h94




In [5]:
# path and sitename
site = 'TALL'
data = '/media/data/NEON/TALL/hyperspectral/DP3.30006.001/2021/FullSite/D08/2021_TALL_6/L3/Spectrometer/Reflectance'

# find the HDF5 tiles:
#  * match on the file extension instead of a substring, so names that merely
#    contain '.h5' somewhere (side-car files, backups) are not picked up
#  * sort the result, because os.listdir() returns entries in arbitrary order
#    and the batches processed further down are taken by position
files = sorted(os.path.join(data, f) for f in os.listdir(data) if f.endswith('.h5'))

# list_dataset lists the names of datasets in an hdf5 file
def list_dataset(name,node):
    '''Visitor callback: print `name` when `node` is an h5py dataset.

    Written to be passed to `visititems`, which calls it with each member's
    name and object. Anything that is not an `h5py.Dataset` is skipped
    silently. The function always returns None.
    '''
    if not isinstance(node, h5py.Dataset):
        return
    print(name)

# print the dataset names of the first tile. The file is opened explicitly
# (read-only) because no open handle named `f` exists on a fresh kernel, and
# the context manager closes it again once the listing is done.
if files:
    with h5py.File(files[0], 'r') as h5:
        h5.visititems(list_dataset)

In [6]:
def band_list():
    '''Return the band indices to keep and the band indices to discard.

    The discarded bands are the ones the original author excludes for
    H2O or CO2 absorption.

    Returns
    -------
    tuple of (numpy.ndarray, numpy.ndarray)
        `(good_bands, bad_bands)`, both 1-D integer arrays in ascending
        order: good = 0-188, 211-269, 316-425; bad = 189-210, 270-315.
    '''
    # (first, last) band index of every contiguous run, both ends inclusive
    keep_runs = [(0, 188), (211, 269), (316, 425)]
    drop_runs = [(189, 210), (270, 315)]

    def expand(runs):
        # turn the inclusive runs into one flat index array
        return np.concatenate([np.arange(first, last + 1) for first, last in runs])

    return expand(keep_runs), expand(drop_runs)


def read_h5to_xarray_with_spectral_indices(f):
    '''Read a reflectance HDF5 tile and add a set of spectral indices.

    The tile is loaded with `read_h5to_xarray` (defined in the next cell and
    resolved when this function is called), so both readers share one
    implementation instead of two diverging copies.

    Parameters
    ----------
    f : str
        Path of the HDF5 file to read.

    Returns
    -------
    xarray.Dataset
        The dataset returned by `read_h5to_xarray(f)` with an extra
        `indices` variable on dims `('y', 'x', 'index')`. The `index`
        coordinate holds, in this order: 'ndvi', 'cai', 'ndli', 'mrendvi',
        'sipi', 'cri1', 'cri2', 'ndni'.

    Notes
    -----
    * Bands are picked with nearest-neighbour selection on the bands that
      survive `band_list()`. NOTE(review): a requested wavelength that lies
      inside a removed window snaps to the closest retained band - confirm
      this is acceptable for every wavelength used below.
    * No masking of the no-data value is done here.
    '''
    d_all = read_h5to_xarray(f)
    refl = d_all.reflectance

    def band(wl):
        # unscaled reflectance of the retained band closest to `wl`
        return refl.sel(wl=wl, method='nearest')

    def scaled(wl):
        # same band divided by the fixed 10000.0 factor used throughout
        return band(wl) / 10000.0

    def as_index(values, label):
        # Give a 2-D index map a length-1 'index' dimension named `label`.
        # Any leftover scalar 'wl' coordinate is removed first so all maps
        # concatenate cleanly; errors='ignore' because arithmetic between
        # two different bands has usually dropped it already.
        return (values.drop_vars('wl', errors='ignore')
                      .assign_coords(index=label)
                      .expand_dims('index'))

    # normalised difference of the 858.6 and 648.2 bands
    nir, red = band(858.6), band(648.2)
    ndvi = as_index((nir - red) / (nir + red), 'ndvi')

    # mean of the 2000 and 2200 bands minus the 2100 band
    cai = as_index(0.5 * (scaled(2000) + scaled(2200)) - scaled(2100.0), 'cai')

    # Normalised difference of log(1/R) at 1754 and 1680. The denominator
    # previously summed log(R) instead of log(1/R), which flipped the sign
    # of the index; it now uses the same terms as the numerator, matching
    # the form of ndni below.
    log_inv_1754 = np.log(1.0 / scaled(1754.0))
    log_inv_1680 = np.log(1.0 / scaled(1680.0))
    ndli = as_index((log_inv_1754 - log_inv_1680) /
                    (log_inv_1754 + log_inv_1680), 'ndli')

    # (R750 - R705) / (R750 + R705 - 2 * R445)
    mrendvi = as_index((band(750.0) - band(705.0)) /
                       (band(750.0) + band(705.0) - (2.0 * band(445.0))),
                       'mrendvi')

    # (R800 - R445) / (R800 - R680)
    sipi = as_index((band(800.0) - band(445.0)) /
                    (band(800.0) - band(680.0)), 'sipi')

    # normalised difference of log(10000/R) at 1510 and 1680
    log_inv_1510 = np.log(10000.0 / band(1510.0))
    log_inv_1680_raw = np.log(10000.0 / band(1680.0))
    ndni = as_index((log_inv_1510 - log_inv_1680_raw) /
                    (log_inv_1510 + log_inv_1680_raw), 'ndni')

    # differences of reciprocal reflectance: 510 vs 550 and 510 vs 700
    cri1 = as_index((1.0 / scaled(510.0)) - (1.0 / scaled(550.0)), 'cri1')
    cri2 = as_index((1.0 / scaled(510.0)) - (1.0 / scaled(700.0)), 'cri2')

    # ndni used to be computed and then left out of the stack; it is appended
    # last so the positions of the original seven entries do not move
    stacked = xr.concat(
        [ndvi, cai, ndli, mrendvi, sipi, cri1, cri2, ndni],
        dim='index')

    # one index per chunk, spatial chunks aligned with the reflectance array
    # (named dims and integer sizes instead of a positional tuple with 1.0)
    x_chunk, y_chunk = refl.data.chunksize[:2]
    stacked = stacked.chunk({'index': 1, 'x': x_chunk, 'y': y_chunk})

    d_all['indices'] = (stacked.transpose('y', 'x', 'index')
                               .chunk({'y': 'auto', 'x': 'auto', 'index': 1}))

    return d_all

In [7]:

def read_h5to_xarray(f):
    '''Read one reflectance HDF5 tile into a chunked xarray Dataset.

    Parameters
    ----------
    f : str
        Path of the HDF5 file. The group that is read is selected with the
        module-level `site` variable.

    Returns
    -------
    xarray.Dataset
        Dataset with a single `reflectance` variable on dims
        `('x', 'y', 'wl')`, restricted to the bands kept by `band_list()`
        and chunked along x and y only (`wl` stays in one chunk). The CRS is
        written through rioxarray, and `scale_factor` / `no_data_value` are
        stored in `attrs`.

    Notes
    -----
    The whole reflectance cube is loaded into memory (`np.rot90` converts
    the HDF5 dataset to a NumPy array), so the file can be closed before
    returning.
    '''
    # the context manager closes the HDF5 handle; everything needed later is
    # copied out of the file before the block ends
    with h5py.File(f, 'r') as h5:
        # separate out reflectance
        refl = h5[site]['Reflectance']
        refl_data = refl['Reflectance_Data']

        # k=3 swaps the first two axes and reverses the new second one.
        # NOTE(review): assumes the stored layout is (rows, columns, bands)
        # with rows running north to south, which gives (x, y ascending,
        # bands) here - confirm against the product documentation.
        refl_array = da.from_array(np.rot90(refl_data, k=3))

        # wavelength of every band, read into memory
        wavelengths = refl['Metadata']['Spectral_Data']['Wavelength'][()]

        # geographic info
        coord_sys = refl['Metadata']['Coordinate_System']
        epsg = coord_sys['EPSG Code'][()].decode("utf-8")
        crs_info = coord_sys['Map_Info'][()].decode("utf-8").split(',')

        # scale factor and data ignore value
        scale_factor = refl_data.attrs['Scale_Factor']
        no_data_value = refl_data.attrs['Data_Ignore_Value']

    epsg = f'EPSG:{epsg}'

    # Map_Info fields used here: [3] x of the upper-left corner, [4] y of the
    # upper-left corner, [5]/[6] pixel size in x/y
    xmin = float(crs_info[3])
    ymax = float(crs_info[4])
    res = (float(crs_info[5]), float(crs_info[6]))

    # Axis 0 is labelled 'x' and axis 1 'y' below, so the coordinate lengths
    # must come from those axes. They were previously swapped, which only
    # worked because the tiles are square.
    nx, ny = refl_array.shape[0], refl_array.shape[1]
    xmax = xmin + (nx * res[0])
    ymin = ymax - (ny * res[1])

    # pixel-centre coordinates: start at the lower edge, shift by half a pixel
    x = np.linspace(xmin, xmax, nx, endpoint=False) + res[0] * 0.5
    y = np.linspace(ymin, ymax, ny, endpoint=False) + res[1] * 0.5

    # make dataset
    d_all = xr.DataArray(refl_array,
                         dims=['x', 'y', 'wl'],
                         coords={'x': x, 'y': y, 'wl': wavelengths},
                         name='reflectance').to_dataset()

    # assign crs
    d_all.rio.write_crs(epsg, inplace=True)

    d_all.attrs = {'scale_factor': scale_factor, 'no_data_value': no_data_value}

    # select only good bands
    return d_all.isel(wl=band_list()[0]).chunk({'x':'auto','y':'auto','wl':-1})

In [8]:
def plot_all_spectral_indices(d_all):
    '''Draw every entry of `d_all.indices` into its own panel.

    A 4 x 2 grid of axes is created and filled in the order of the `index`
    coordinate. Pixels whose reflectance in band position 22 is not greater
    than -10 are masked before plotting. A progress line is printed for
    each index. Nothing is returned.
    '''
    fig, axes = plt.subplots(nrows=4,ncols=2,figsize=(24,30))
    panels = axes.flatten()

    # stride of 1 keeps every pixel
    every_pixel = slice(None,None,1)

    for panel_no, ind in enumerate(d_all.coords['index'].values):
        print('Plotting '+ind+' ...')

        layer = d_all.indices.sel(index=ind)
        valid = d_all.reflectance.isel(wl=22)>-10.
        layer.where(valid).isel(x=every_pixel, y=every_pixel).plot(ax=panels[panel_no])

In [12]:
# restart the workers before the conversion run starts
client.restart()
# global read by read_h5to_xarray() to pick the group inside each HDF5 file
# and by scabies() to build the output file name
site = 'TALL'

def scabies(f):
    '''Convert one reflectance HDF5 tile to a NetCDF file in the same folder.

    The output name is `<site>_<a>_<b>.nc`, where `<a>_<b>` are the fifth and
    sixth underscore-separated fields of the input file name (text before the
    first dot) and `site` is the module-level variable. Returns None.
    '''
    folder, filename = os.path.split(f)
    stem = filename.split('.')[0]
    tile_id = '_'.join(stem.split('_')[4:6])
    ncdf = os.path.join(folder, f'{site}_{tile_id}.nc')

    # Optional checks, left disabled:
    #  - with the spectral-index version of the reader, write an NDVI raster
    #    for a sanity check:
    #    d_all.indices.sel(index='ndvi').rio.to_raster(f'/home/michael/tmp/{tile_id}_ndvi.tiff')
    #  - plot every index (produces a lot of output below the cell):
    #    plot_all_spectral_indices(d_all)

    # read the tile and write it straight out as netcdf
    read_h5to_xarray(f).to_netcdf(ncdf)



# number of tiles converted between two worker restarts
BATCH_SIZE = 7

# One loop replaces the seven copy-pasted blocks with hard-coded slices, so
# the batching also stays regular when the number of files changes.
for start in range(0, len(files), BATCH_SIZE):
    # restart the workers between batches (presumably to release worker
    # memory); the first batch relies on the restart at the top of this cell
    if start > 0:
        client.restart()

    for f in files[start:start + BATCH_SIZE]:
        scabies(f)


In [15]:
# print the NetCDF files found next to the tiles; a plain loop avoids building
# a throwaway list of None values and rebinding `_`, which IPython uses for
# the last cell output
for nc_name in os.listdir(data):
    if '.nc' in nc_name:
        print(nc_name)

463000_3645000.nc
TALL_464000_3646000.nc
TALL_464000_3643000.nc
TALL_463000_3648000.nc
TALL_466000_3648000.nc
TALL_463000_3643000.nc
TALL_461000_3644000.nc
TALL_465000_3643000.nc
TALL_466000_3643000.nc
TALL_463000_3647000.nc
TALL_463000_3644000.nc
TALL_465000_3646000.nc
TALL_462000_3646000.nc
TALL_465000_3642000.nc
TALL_466000_3646000.nc
TALL_460000_3644000.nc
TALL_463000_3646000.nc
TALL_464000_3644000.nc
TALL_465000_3647000.nc
TALL_462000_3645000.nc
TALL_461000_3642000.nc
TALL_463000_3645000.nc
TALL_465000_3645000.nc
TALL_461000_3648000.nc
TALL_462000_3643000.nc
TALL_462000_3642000.nc
TALL_464000_3642000.nc
TALL_461000_3645000.nc
TALL_460000_3642000.nc
TALL_465000_3648000.nc
TALL_462000_3644000.nc
TALL_464000_3647000.nc
TALL_463000_3642000.nc
TALL_466000_3645000.nc
TALL_466000_3647000.nc
TALL_466000_3644000.nc
465000_3647000.nc
TALL_465000_3644000.nc
TALL_461000_3646000.nc
TALL_464000_3645000.nc
TALL_460000_3643000.nc
TALL_461000_3643000.nc
460000_3642000.nc
TALL_460000_3647000.nc
TAL

In [12]:
# Open one converted tile for inspection. The original call used a garbled
# function name and `ncdf`, which only exists inside scabies(), so the cell
# could not run on a fresh kernel.
# NOTE(review): the first NetCDF file in the data folder is used here -
# confirm which tile was meant.
nc_files = sorted(os.path.join(data, name) for name in os.listdir(data) if '.nc' in name)
ncdf = nc_files[0]
a = xr.open_dataset(ncdf)

In [12]:
# number of HDF5 tiles found in the data folder
len(files)


49