In [1]:
import os
import configparser
import json
from functools import partial

import boto3
import numpy as np
import pandas as pd
import rasterio as rio
from rasterio.windows import Window
from pystac_client import Client
import zarr

from typing import List, TypedDict, Tuple

ModuleNotFoundError: No module named 'zarr'

In [None]:
!which python

In [45]:
os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'
os.environ['AWS_REQUEST_PAYER'] = 'requester'

In [46]:
def session() -> rio.session.AWSSession:
    """
    Generate a rasterio AWS Session for reading data
    """
    return rio.session.AWSSession(boto3.Session(), requester_pays=True)

def open_tif(path: str):
    """
    Open a tif file
    """
    with rio.Env(session(), GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR'):
        return rio.open(path)

def read_tif(path: str, band: int = 1, window: Window = None):
    return open_tif(path).read(band, window=window)

def read_ls_data(item, band, window):
    return read_tif(item['assets'][band]['alternate']['s3']['href'], window=window)

def read_obs_chip(item, window, bands):
    return {b: read_ls_data(item, b, window) for b in bands}

In [47]:
class DataStack(TypedDict):
    """
    Container for storing chip data
    """
    h: int
    v: int
    window: Window
    acquired: List[str]
    data: np.ndarray

def sum_squares(arrs: List[np.ndarray]) -> np.ndarray:
    """
    Square and then sum all the values (element wise) in the given arrays
    """
    return np.ma.sum([np.ma.power(a, 2) for a in arrs], axis=0)

def difference(arr1: np.ndarray, arr2: np.ndarray) -> np.ndarray:
    """
    Difference the two given arrays
    """
    return arr1 - arr2

def difference_absolute(arr1: np.ndarray, arr2: np.ndarray) -> np.ndarray:
    """
    Calculate the absolute distance between the values in the two different arrays
    """
    return np.abs(difference(arr1, arr2))

def difference_median(arr: np.ndarray) -> np.ndarray:
    """
    Calculate the absolute difference each value is from the median
    """
    return difference_absolute(arr,
                               np.ma.median(arr, axis=0))
def dstack_idx(idxs: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Takes 2-d index returns from numpy.argmin or numpy.argmax on a stack where axis=0 and turns it into
    a tuple of tuples for indexing back into the 3-d array
    """
    return (idxs,
            np.repeat(np.arange(256).reshape(-1, 1), repeats=256, axis=1),
            np.array(list(range(256)) * 256).reshape(256, 256))

def distance_overall(spectral_stacks: dict) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Return the per-pixel index location for observations that come closest to the overall median value
    """
    #euc_dist = np.ma.sqrt(sum_squares([difference_median(spectral_stacks[k]) for k in spectral_stacks]))
    euc_dist = ((spectral_stacks[k]) for k in spectral_stacks)

    return dstack_idx(np.ma.argmin(euc_dist, axis=0))

def value_mask(arr: np.ndarray, value: int):
    """
    Create a mask where the specified value exists in the given array
    """
    return arr == value

def bitmask(arr: np.ndarray, bit: int) -> np.ndarray:
    """
    Create a boolean mask for where the bit is set in the given array
    """
    bit_val = 1 << bit
    return value_mask(arr & bit_val, bit_val)

def landsat_collection2(qa_arr: np.ndarray) -> np.ndarray:
    """
    Provide a default interpretation of the Landsat Collection 2 pixel quality
    layer for masking

    Reference:
    https://www.usgs.gov/media/files/landsat-8-9-olitirs-collection-2-level-2-data-format-control-book
    """
    return (bitmask(qa_arr, 0) |
             bitmask(qa_arr, 2) |
             bitmask(qa_arr, 3) |
             bitmask(qa_arr, 4) |
             bitmask(qa_arr, 5) |
             bitmask(qa_arr, 7))

def assemble_stacks(items, bands, window):
    out = {b: np.zeros(shape=(len(items), 256, 256), dtype=np.uint16) for b in bands}

    for idx, item in enumerate(items):
        for b in bands:
            lsd = read_ls_data(item, b, window)
            out[b][idx][:lsd.shape[0], :lsd.shape[1]] = lsd
    

   # mask = landsat_collection2(out.pop('qa_pixel'))
    #return {k: np.ma.masked_array(val, dtype=val.dtype, mask=mask)
      #      for k, val in out.items()}
    return out

def median_distance(items, bands, window):
    data = assemble_stacks(items, bands, window)
    
    indxs = distance_overall(data)
    return {k: val[indxs] for k, val in data.items()}
    #return dstack_idx(np.ma.argmin(data, axis=0))
    #return data

def dask_worker(window, items, bands, z_path, key, secret):
    data = median_distance(items, bands, window)
    
    z1 = zarr.open(z_path, 'a', storage_options={'key': key, 'secret': secret})
    # Window(col_off=4, row_off=4, width=2, height=2)
    start_row = window.row_off
    end_row = window.row_off + window.height
    start_col = window.col_off
    end_col = window.col_off + window.width
    
    end_sub_row = 256
    end_sub_col = 256
    
    if end_row > 5000:
        end_row = 5000
        end_sub_row = 5000 - start_row
    if end_col > 5000:
        end_col = 5000
        end_sub_col = 5000 - start_col
    
    z1[:, start_row:end_row, start_col:end_col] = np.stack(list(data.values()))[:, :end_sub_row, :end_sub_col]

In [31]:
h = 16
v = 5
region = 'CU'
#bands = ['blue', 'green', 'red', 'nir08', 'swir16', 'swir22', 'qa_pixel']
#bands = ['diag', 'intr', 'intsm', 'inwam', 'mask', 'shade']
#bands = ['diag']
bands = ['intsm']

#cp = configparser.ConfigParser()
#cp.read(os.path.join(os.environ['HOME'],'.aws','credentials'))
#key = cp['default']['aws_access_key_id']
#secret = cp['default']['aws_secret_access_key']

stac = Client.open('https://landsatlook.usgs.gov/stac-server')

query = stac.search(collections=['landsat-c2l3-dswe'],
                    datetime='1980-01-01/2024-01-01',
                    query={'landsat:grid_horizontal': {'eq': f'{h:02}'},
                           'landsat:grid_vertical': {'eq': f'{v:02}'},
                           'landsat:grid_region': {'eq': region},
                           #'',
                           #'platform': {'eq': 'LANDSAT_8'},
                          }
                   )

stuff = query.get_all_items_as_dict()['features']
# dates = [pd.to_datetime(r['properties']['datetime'], utc=True) for r in stuff]


print(len(stuff))

4405


In [48]:
print(stuff[-1])

{'type': 'Feature', 'stac_version': '1.0.0', 'stac_extensions': ['https://landsat.usgs.gov/stac/landsat-ard-extension/v1.0.0/schema.json', 'https://stac-extensions.github.io/projection/v1.0.0/schema.json', 'https://stac-extensions.github.io/eo/v1.0.0/schema.json', 'https://stac-extensions.github.io/alternate-assets/v1.1.0/schema.json', 'https://stac-extensions.github.io/storage/v1.0.0/schema.json'], 'id': 'LT04_CU_016005_19821118_20210421_02_DSWE', 'description': 'Landsat Collection 2 Level-3 Dynamic Surface Water Extent Product', 'bbox': [-98.13649364738521, 45.58567137749687, -97.9356570419568, 46.05781825562636], 'geometry': {'type': 'Polygon', 'coordinates': [[[-98.13649364738521, 46.05481255042304], [-98.12148664400478, 45.58567137749687], [-98.12110240954793, 45.58567740351473], [-97.9356570419568, 46.05781825562636], [-98.13649364738521, 46.05481255042304]]]}, 'properties': {'datetime': '1982-11-18T16:51:35.3450250Z', 'platform': 'LANDSAT_4', 'instruments': ['TM'], 'landsat:grid

In [49]:
stuff[0].keys()

dict_keys(['type', 'stac_version', 'stac_extensions', 'id', 'description', 'bbox', 'geometry', 'properties', 'assets', 'links', 'collection'])

In [50]:
in_path = stuff[0]['assets']['intsm']['alternate']['s3']['href']

In [51]:
in_path

's3://usgs-landsat-level-3/collection02/DSWE/2023/CU/016/005/LC09_CU_016005_20231105_20231109_02_DSWE/LC09_CU_016005_20231105_20231109_02_INTSM.TIF'

In [38]:
#in_path.replace('s3:/','/vsis3')

'/vsis3/usgs-landsat-level-3/collection02/DSWE/2023/CU/016/005/LC09_CU_016005_20231105_20231109_02_DSWE/LC09_CU_016005_20231105_20231109_02_INTSM.TIF'

In [52]:
open_tif(in_path)

RasterioIOError: '/vsis3/usgs-landsat-level-3/collection02/DSWE/2023/CU/016/005/LC09_CU_016005_20231105_20231109_02_DSWE/LC09_CU_016005_20231105_20231109_02_INTSM.TIF' does not exist in the file system, and is not recognized as a supported dataset name.

In [43]:
in_path = 's3://usgs-landsat-level-3/collection02/DSWE/2022/CU/016/005/LC08_CU_016005_20220101_20220109_02_DSWE/LC08_CU_016005_20220101_20220109_02_INTSM.TIF'

In [44]:
open_tif(in_path)

RasterioIOError: '/vsis3/usgs-landsat-level-3/collection02/DSWE/2022/CU/016/005/LC08_CU_016005_20220101_20220109_02_DSWE/LC08_CU_016005_20220101_20220109_02_INTSM.TIF' does not exist in the file system, and is not recognized as a supported dataset name.

In [20]:
from osgeo import gdal

In [35]:
#gdal.Open(stuff[0]['assets']['intsm']['alternate']['s3']['href'])
gdal.Open(in_path.replace('s3:/','/vsis3'))

ERROR 4: `/vsis3/usgs-landsat-level-3/collection02/DSWE/2022/CU/016/005/LC08_CU_016005_20220101_20220109_02_DSWE/LC08_CU_016005_20220101_20220109_02_INTSM.TIF' does not exist in the file system, and is not recognized as a supported dataset name.


In [6]:
z_path = f's3://chs-pangeo-data-bucket/pdanielson//level3/data/{region}.zarr'

z1 = zarr.open(z_path,
               mode='w',
               shape=(1, 5000, 5000),
               chunks=(1, 256, 256),
               dtype='uint16',
               write_empty_chunks=False,
               fill_value=0,
               storage_options={'key': key, 'secret': secret})

func = partial(dask_worker,
               items=stuff,
               bands=bands,
               z_path=z_path,
               key=key,
               secret=secret)

windows = [Window(col_off=col, row_off=row, width=256, height=256) for col in range(0, 5000, 256) for row in range(0, 5000, 256)]

In [7]:
from dask.distributed import Client

client = Client("tcp://10.12.76.172:37011")
client

0,1
Connection method: Direct,
Dashboard: /user/pdanielson@contractor.usgs.gov/proxy/8787/status,

0,1
Comm: tcp://10.12.76.172:37011,Workers: 0
Dashboard: /user/pdanielson@contractor.usgs.gov/proxy/8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [8]:
res = client.map(func, windows)
yes = client.gather(res)

In [9]:
stuff[0]['properties']

{'datetime': '2015-04-01T18:28:07.9233150Z',
 'platform': 'LANDSAT_8',
 'instruments': ['OLI', 'TIRS'],
 'landsat:grid_horizontal': '03',
 'landsat:grid_vertical': '13',
 'landsat:grid_region': 'CU',
 'landsat:scene_count': 1,
 'eo:cloud_cover': 5.9727,
 'landsat:cloud_shadow_cover': 4.307,
 'landsat:snow_ice_cover': 0,
 'landsat:fill': 52.0756,
 'proj:epsg': None,
 'proj:shape': [5000, 5000],
 'proj:transform': [30, 0, -2115585, 0, -30, 1364805],
 'created': '2022-06-30T14:02:12.501Z',
 'updated': '2022-06-30T14:02:12.501Z'}

In [10]:
plt.imshow(z1[0])

NameError: name 'plt' is not defined

In [12]:
import rasterio as rio
from rasterio import crs

int_meta = {'driver': 'GTiff',
            'dtype': np.uint16,
            'nodata': None,
            'width': 5000,
            'height': 5000,
            'count': 6,
            'crs': crs.CRS.from_epsg(5070),
            'compress': 'lzw',
            'transform': stuff[0]['properties']['proj:transform'],
            'tiled': True,
            'blockxsize': 256,
            'blockysize': 256,
            'predictor': 2}

with rio.Env():
    with rio.open('h03v10-dswe-test4.tif', 'w', **int_meta) as dst:
        for b in range(6):
            print(b)
            dst.write(z1[b], b + 1)

0
1


BoundsCheckError: index out of bounds for dimension with length 1

In [52]:
%%time
window = Window(0, 0, 256, 256)

data = median_distance(stuff, bands, window)

CPU times: user 262 ms, sys: 49.2 ms, total: 311 ms
Wall time: 2.28 s


In [53]:
import matplotlib.pyplot as plt

In [67]:
plt.imshow(dat['intr'])

KeyError: 0