In [1]:
from satsearch import Search
import stackstac, os, requests
from netrc import netrc
from subprocess import Popen
from getpass import getpass
import rasterio
from distributed import LocalCluster,Client
import datetime
import dask.array as dask_array
import dask
from utils import DevNullStore,DiagnosticTimer,total_nthreads,total_ncores,total_workers,get_chunksize

In [2]:
# data = 'hls'
# data = 'sentinel_2a'
data = 'landsat8'
s3 = True

if data == 'sentinel_2a':
    band='B05'
if data == 'hls':
    band='B01'
if data == 'landsat8':
    band='B1'

In [3]:
if data == 'hls':
    #Setup NASA Credentials
    urs = 'urs.earthdata.nasa.gov'    # Earthdata URL to call for authentication
    prompts = ['Enter NASA Earthdata Login Username \n(or create an account at urs.earthdata.nasa.gov): ',
               'Enter NASA Earthdata Login Password: ']
    try:
        netrcDir = os.path.expanduser("~/.netrc")
        netrc(netrcDir).authenticators(urs)[0]
        del netrcDir

    # Below, create a netrc file and prompt user for NASA Earthdata Login Username and Password
    except FileNotFoundError:
        homeDir = os.path.expanduser("~")
        Popen('touch {0}.netrc | chmod og-rw {0}.netrc | echo machine {1} >> {0}.netrc'.format(homeDir + os.sep, urs), shell=True)
        Popen('echo login {} >> {}.netrc'.format(getpass(prompt=prompts[0]), homeDir + os.sep), shell=True)
        Popen('echo password {} >> {}.netrc'.format(getpass(prompt=prompts[1]), homeDir + os.sep), shell=True)
        del homeDir, urs, prompts

In [4]:
base_env = stackstac.DEFAULT_GDAL_ENV.updated(dict(
    GDAL_MAX_RAW_BLOCK_CACHE_SIZE='200000000',
    GDAL_SWATH_SIZE='200000000',
    VSI_CURL_CACHE_SIZE='200000000',
    GDAL_HTTP_UNSAFESSL='YES',
    GDAL_HTTP_COOKIEFILE=os.path.expanduser('~/cookies.txt'),
    GDAL_HTTP_COOKIEJAR=os.path.expanduser('~/cookies.txt'),
))

In [5]:
if s3 and data=='hls':
    #Get NASA Temp AWS Credentials
    s3_cred = requests.get('https://lpdaac.earthdata.nasa.gov/s3credentials').json()
    s3_cred
    
    env = base_env.updated(
        dict(
            AWS_REGION='us-west-2',
            AWS_NO_SIGN_REQUEST='NO',
            AWS_REQUEST_PAYER='REQUESTER',
            AWS_SECRET_ACCESS_KEY=s3_cred['secretAccessKey'],
            AWS_ACCESS_KEY_ID=s3_cred['accessKeyId'],
            AWS_SESSION_TOKEN=s3_cred['sessionToken']
        )
    )

# Default to the StackStac ~.LayeredEnv
if s3==False and data=='hls':
    env = base_env.updated(dict(AWS_NO_SIGN_REQUEST='YES'))

if s3 and (data == 'landsat8' or data == 'sentinel_2a'):
    env = base_env.updated(
        dict(
            AWS_REGION='us-west-2',
            AWS_REQUEST_PAYER='REQUESTER',
            region_name='us-west-2',
            AWS_NO_SIGN_REQUEST='YES',
        )
    )
env

LayeredEnv(
    always={'CPL_VSIL_CURL_ALLOWED_EXTENSIONS': 'tif', 'GDAL_HTTP_MULTIRANGE': 'YES', 'GDAL_HTTP_MERGE_CONSECUTIVE_RANGES': 'YES', 'GDAL_MAX_RAW_BLOCK_CACHE_SIZE': '200000000', 'GDAL_SWATH_SIZE': '200000000', 'VSI_CURL_CACHE_SIZE': '200000000', 'GDAL_HTTP_UNSAFESSL': 'YES', 'GDAL_HTTP_COOKIEFILE': '/home/jovyan/cookies.txt', 'GDAL_HTTP_COOKIEJAR': '/home/jovyan/cookies.txt', 'AWS_REGION': 'us-west-2', 'AWS_REQUEST_PAYER': 'REQUESTER', 'region_name': 'us-west-2', 'AWS_NO_SIGN_REQUEST': 'YES'},
    open={'GDAL_DISABLE_READDIR_ON_OPEN': 'EMPTY_DIR', 'VSI_CACHE': True},
    open_vrt={},
    read={'VSI_CACHE': False},
)

In [6]:
def get_STAC_items(url, collection, dates, bbox):
  results = Search.search(url=url,
                      collections=collection, 
                      datetime=dates,
                      bbox=bbox)

  return(results)

def remap_s3_url(stac):
    for i,entry in enumerate(stac):
        for asset in entry['assets'].keys():
            stac[i]['assets'][asset]['href'] = stac[i]['assets'][asset]['href'].replace('https://lpdaac.earthdata.nasa.gov/',
                                                                                          '/vsis3/')
            stac[i]['assets'][asset]['href'] = stac[i]['assets'][asset]['href'].replace('https://sentinel-cogs.s3.us-west-2.amazonaws.com/',
                                                                                          '/vsis3/sentinel-cogs/')
            stac[i]['assets'][asset]['href'] = stac[i]['assets'][asset]['href'].replace('https://landsat-pds.s3.us-west-2.amazonaws.com/',
                                                                                        '/vsis3/landsat-pds/')
    return(stac)

bbox = [-104.79107047,   40.78311181, -104.67687336,   40.87008987]

if data == 'hls':
    url = 'https://cmr.earthdata.nasa.gov/stac/LPCLOUD/' 
    collection = ['HLSS30.v1.5']#'C1711924822-LPCLOUD' #HLS
    dates = '2020-01-01/2021-02-10'
    stac_items = get_STAC_items(url,collection,dates,','.join(map(str, bbox))).items()
    s_col = stac_items.geojson()['features']
    if s3:
        s_col = remap_s3_url(s_col)
if data == 'sentinel_2a':
    url = 'https://earth-search.aws.element84.com/v0'
    collection = ['sentinel-s2-l2a-cogs']
    dates = '2019-01-20/2022-02-10'
    stac_items = get_STAC_items(url,collection,dates,bbox).items()
    s_col = stac_items.geojson()['features']
    if s3:
        s_col = remap_s3_url(s_col)
if data == 'landsat8':
    url = 'https://earth-search.aws.element84.com/v0'
    collection = ['landsat-8-l1-c1']
    dates = '2013-11-01/2022-02-10'
    stac_items = get_STAC_items(url,collection,dates,bbox).items()
    s_col = stac_items.geojson()['features']
    if s3:
        s_col = remap_s3_url(s_col)
print('Number of Items: ',len(s_col))

Number of Items:  30


In [7]:
with env.open, rasterio.open(s_col[0]['assets']['B7']['href']) as src:
    b= src.bounds
    prof = src.profile
    dtype = prof['dtype']
    nodata = prof['nodata']
    res = src.res
    print(prof)

{'driver': 'GTiff', 'dtype': 'uint16', 'nodata': None, 'width': 7791, 'height': 7911, 'count': 1, 'crs': CRS.from_epsg(32613), 'transform': Affine(30.0, 0.0, 315585.0,
       0.0, -30.0, 4583115.0), 'blockxsize': 512, 'blockysize': 512, 'tiled': True, 'compress': 'deflate', 'interleave': 'band'}


In [8]:
#lab_extension = /user/<username>/proxy/8787/status
cluster = LocalCluster(threads_per_worker=1)
cl = Client(cluster)
cl

0,1
Client  Scheduler: tcp://127.0.0.1:34209  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 8.00 GiB


In [9]:
if data=='landsat8':
    band = ['B1','B2','B3','B4','B5','B6','B7']
else:
    band=[band]
da = stackstac.stack(s_col,dtype=dtype,
                     fill_value=nodata,
                     resolution=res[0],
                     epsg=32613,
                     properties=None,
                     snap_bounds=True,
                     chunksize=-1,
                     assets=band,
                     bounds=list(b),
                     gdal_env=env)
da

Unnamed: 0,Array,Chunk
Bytes,24.11 GiB,117.59 MiB
Shape,"(30, 7, 7912, 7792)","(1, 1, 7912, 7792)"
Count,631 Tasks,210 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 24.11 GiB 117.59 MiB Shape (30, 7, 7912, 7792) (1, 1, 7912, 7792) Count 631 Tasks 210 Chunks Type uint16 numpy.ndarray",30  1  7792  7912  7,

Unnamed: 0,Array,Chunk
Bytes,24.11 GiB,117.59 MiB
Shape,"(30, 7, 7912, 7792)","(1, 1, 7912, 7792)"
Count,631 Tasks,210 Chunks
Type,uint16,numpy.ndarray


In [10]:
%pdb

Automatic pdb calling has been turned ON


In [11]:
diag_timer = DiagnosticTimer()
devnull = DevNullStore()

dat = da.data
chunksize = get_chunksize(dat)
totalsize = dat.nbytes*1e-9

if s3:
    method='s3'
else:
    method='http'

diag_kwargs = dict(nbytes=dat.nbytes,
                   nGBytes=totalsize,
                   chunksize=chunksize,
                   method=method,
                   cloud_source='aws us-west-2',
                   system='aws us-west-2',
                   format='cog')

runtime = datetime.datetime.now().strftime("%Y%m%d_%H%M")
with diag_timer.time(nthreads=total_nthreads(cl),
                     ncores=total_ncores(cl),
                     nworkers=total_workers(cl),
                     **diag_kwargs):
    future = dask_array.store(dat, devnull, lock=False, compute=False)
    dask.compute(future, retries=5)
df = diag_timer.dataframe()
df['throughput_MBps'] = df.nbytes/1e6/df.runtime
df

Unnamed: 0,nthreads,ncores,nworkers,nbytes,nGBytes,chunksize,method,cloud_source,system,format,runtime,throughput_MBps
0,4,4,4,25893127680,25.893128,123300608,s3,aws us-west-2,aws us-west-2,cog,154.046489,168.086451
