In [1]:
import os
import os.path
from glob import glob
import rioxarray
from dask.distributed import Client, LocalCluster, Lock
import xarray as xr

In [2]:
# set up the dask client
# docs.das.org/en/latest/deploying-python.html
cluster = LocalCluster()
# client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 3
Total threads: 6,Total memory: 32.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:54669,Workers: 3
Dashboard: http://127.0.0.1:8787/status,Total threads: 6
Started: Just now,Total memory: 32.00 GiB

0,1
Comm: tcp://127.0.0.1:54692,Total threads: 2
Dashboard: http://127.0.0.1:54693/status,Memory: 10.67 GiB
Nanny: tcp://127.0.0.1:54673,
Local directory: D:\Users\gparrish\PycharmProjects\cloud_ssebop\data_prep\dask-worker-space\worker-78g13hih,Local directory: D:\Users\gparrish\PycharmProjects\cloud_ssebop\data_prep\dask-worker-space\worker-78g13hih

0,1
Comm: tcp://127.0.0.1:54689,Total threads: 2
Dashboard: http://127.0.0.1:54690/status,Memory: 10.67 GiB
Nanny: tcp://127.0.0.1:54672,
Local directory: D:\Users\gparrish\PycharmProjects\cloud_ssebop\data_prep\dask-worker-space\worker-xe21p9nf,Local directory: D:\Users\gparrish\PycharmProjects\cloud_ssebop\data_prep\dask-worker-space\worker-xe21p9nf

0,1
Comm: tcp://127.0.0.1:54699,Total threads: 2
Dashboard: http://127.0.0.1:54700/status,Memory: 10.67 GiB
Nanny: tcp://127.0.0.1:54674,
Local directory: D:\Users\gparrish\PycharmProjects\cloud_ssebop\data_prep\dask-worker-space\worker-5zdrp86p,Local directory: D:\Users\gparrish\PycharmProjects\cloud_ssebop\data_prep\dask-worker-space\worker-5zdrp86p


#### you can loose information critical to the geospatial nature of the rioxarray when doing certain xarray functions.

##### https://corteva.github.io/rioxarray/stable/getting_started/manage_information_loss.html

In [3]:
def qa_band_to_raster_masks(cloud_root: str, strong_cloud_root: str, water_root: str,
                            ice_snow_root: str, high_cirrus_root:str,
                            average_cirrus_root:str, qa_raster: str,  dummy_raster: str):
    qa = rioxarray.open_rasterio(qa_raster, masked=True, chunks=(4500, 4500)).squeeze('band', drop=True)
    green = rioxarray.open_rasterio(dummy_raster, masked=True, chunks=(4500, 4500)).squeeze('band', drop=True)
    # Make the bits and sort
    bits = 16                        # Define number of bits
    vals = list(range(0,(2**bits))) # Generate a list of all possible bit values
    bitvals = []
    for i in vals:
        bv = format(vals[i],'b').zfill(bits)             # Convert to binary based on values and # of bits defined above:
        bitvals.append(bv)
    # Make a list of the bits that indicate absence of all clouds, cloud-adjacent and cloud shadow
    # Bitvalues are RIGHT to LEFT, then the bit code is Left to right. Which is crazy.
    strong_clear_bitvals = [] # <- demands absolutely no clouds, cloud adjacent or cirrus clouds of any kind
    clear_bitvals = []  # <- does not count 'low' cirrus amounts as cloud contam --> removed "...and val[-9:-7] != '01'..." for low cirrus
    land_bitvals = []  # <- make a list of the bits that indicate LAND pixels. All water is thrown OUT.
    snow_free_bitvals = []  # <- a list of the bits that include snow free values...
    high_cirrus = []  # <- Cirrus masks for testing.
    average_cirrus = []
    # BIT VALUES
    for val in bitvals:
        # two cirrus instances.
        if val[-9:-7] == '11' or val[-9:-7] == '10':
            high_cirrus.append(val)
        if  val[-14] == '0':
            average_cirrus.append(val)
        # strong clear
        if val[-2:] == '00' and val[-3] == '0' and val[-14] == '0' and val[-9:-7] != '11' and val[-9:-7] != '10':
            strong_clear_bitvals.append(val)
        # clear (see comment above)
        if val[-2:] == '00' and val[-3] == '0' and val[-14] == '0' and val[-9:-7] != '11' and val[-9:-7] != '10':
            clear_bitvals.append(val)
        # land
        if val[-6:-3] != '000' and val[-6:-3] != '010' and val[-6:-3] != '011' and val[-6:-3] != '100'\
                and val[-6:-3] != '110' and val[-6:-3] != '111':
            land_bitvals.append(val)
        # snow-free
        if val[-13] == '0':
            snow_free_bitvals.append(val)

    # converting them to intergers isn't too bad.
    # https://www.delftstack.com/howto/python/convert-binary-to-int-python/
    snow_free_ints = [int(i, 2) for i in snow_free_bitvals]
    land_ints = [int(i, 2) for i in land_bitvals]
    clear_ints = [int(i, 2) for i in clear_bitvals]
    strong_clear_ints = [int(i, 2) for i in strong_clear_bitvals]
    high_cirrus_ints = [int(i, 2) for i in high_cirrus]
    average_cirrus_ints = [int(i, 2) for i in average_cirrus]

    # Now, keeping the attributes of the data array, we make boolean arrays for each prospective mask...
    with xr.set_options(keep_attrs=True):
        cloud_bool = qa.isin(test_elements=clear_ints)
        land_bool = qa.isin(test_elements=land_ints)
        snow_bool = qa.isin(test_elements=snow_free_ints)
        strong_cloud_bool = qa.isin(test_elements=strong_clear_ints)
        high_cirrus_bool = qa.isin(test_elements=high_cirrus_ints)
        average_cirrus_bool = qa.isin(test_elements=average_cirrus_ints)

    cloud_mask = xr.where(cloud_bool, x=1, y=0)
    land_mask = xr.where(land_bool, x=1, y=0)
    snow_mask = xr.where(snow_bool, x=1, y=0)
    strong_cloud_mask = xr.where(strong_cloud_bool, x=1, y=0)
    high_cirrus_mask = xr.where(high_cirrus_bool, x=1, y=0)
    average_cirrus_mask = xr.where(average_cirrus_bool, x=1, y=0)

    # get the data from the filename
    # filename example: 2018001.1_km_8_days_QA.tif
    filename = os.path.split(qa_raster)[-1]
    name_chunks = filename.split('.')[0:2]
    unique_id = '.'.join(name_chunks)
    cloud_name = '_cloud_mask'
    strong_cloud_name = '_cloud_mask_strong'
    water_name = '_water_mask'
    snow_name = '_snow_mask'
    high_cirrus_name = '_high_cirrus'
    average_cirrus_name = '_average_cirrus'

    # cloud_mask.rio.to_raster(os.path.join(cloud_root, f'{unique_id}{cloud_name}.tif'), tiled=True, lock=Lock('rio', client=client))
    land_mask.rio.to_raster(os.path.join(water_root, f'{unique_id}{water_name}.tif'), tiled=True, lock=Lock('rio', client=client))
    snow_mask.rio.to_raster(os.path.join(ice_snow_root, f'{unique_id}{snow_name}.tif'), tiled=True, lock=Lock('rio', client=client))
    # strong_cloud_mask.rio.to_raster(os.path.join(strong_cloud_root, f'{unique_id}{strong_cloud_name}.tif'), tiled=True, lock=Lock('rio', client=client))
    # high_cirrus_mask.rio.to_raster(os.path.join(high_cirrus_root, f'{unique_id}{high_cirrus_name}.tif'), tiled=True, lock=Lock('rio', client=client))
    # average_cirrus_mask.rio.to_raster(os.path.join(average_cirrus_root, f'{unique_id}{average_cirrus_name}.tif'), tiled=True, lock=Lock('rio', client=client))

In [9]:
root = r'W:\Data\WaterMask\VIIRS\QA_masks\2014'
dirs_to_create = ['STRONG_CLOUD_MASK', 'CLOUD_MASK', 'WATER_MASK_QA', 'SNOW_MASK', 'HIGH_CIRRUS', 'AVERAGE_CIRRUS']
out_roots = []
for d in dirs_to_create:
    thing = os.path.join(root, d)
    out_roots.append(thing)
    if not os.path.exists(thing):
        os.mkdir(thing)

# # grab the rasters we need.
qa_2018 = sorted(glob(r'W:\Data\WaterMask\VIIRS\QA\2014\*_QA.tif'))
# turn green into just another qa -> cause it's for a dummy raster ayway
green_2018 = sorted(glob(r'W:\Data\WaterMask\VIIRS\GREEN\2017\*.tif'))
print(out_roots)
print(f'len of qa {len(qa_2018)}, len of green {len(green_2018)}')

['W:\\Data\\WaterMask\\VIIRS\\QA_masks\\2014\\STRONG_CLOUD_MASK', 'W:\\Data\\WaterMask\\VIIRS\\QA_masks\\2014\\CLOUD_MASK', 'W:\\Data\\WaterMask\\VIIRS\\QA_masks\\2014\\WATER_MASK_QA', 'W:\\Data\\WaterMask\\VIIRS\\QA_masks\\2014\\SNOW_MASK', 'W:\\Data\\WaterMask\\VIIRS\\QA_masks\\2014\\HIGH_CIRRUS', 'W:\\Data\\WaterMask\\VIIRS\\QA_masks\\2014\\AVERAGE_CIRRUS']
len of qa 46, len of green 46


In [10]:
# produce 'em!!!
for q, g in zip(qa_2018[-7:], green_2018[-7:]):
    qa_band_to_raster_masks(cloud_root=out_roots[1], strong_cloud_root=out_roots[0],
                            water_root=out_roots[2], ice_snow_root=out_roots[3],
                            high_cirrus_root=out_roots[4], average_cirrus_root=out_roots[5],
                            qa_raster=q, dummy_raster=g)

Task exception was never retrieved
future: <Task finished name='Task-4250' coro=<Client._gather.<locals>.wait() done, defined at D:\programs\Anaconda3\envs\satpy_env\lib\site-packages\distributed\client.py:2003> exception=AllExit()>
Traceback (most recent call last):
  File "D:\programs\Anaconda3\envs\satpy_env\lib\site-packages\distributed\client.py", line 2008, in wait
    raise AllExit()
distributed.client.AllExit
Task exception was never retrieved
future: <Task finished name='Task-4253' coro=<Client._gather.<locals>.wait() done, defined at D:\programs\Anaconda3\envs\satpy_env\lib\site-packages\distributed\client.py:2003> exception=AllExit()>
Traceback (most recent call last):
  File "D:\programs\Anaconda3\envs\satpy_env\lib\site-packages\distributed\client.py", line 2008, in wait
    raise AllExit()
distributed.client.AllExit


MemoryError: Unable to allocate 640. MiB for an array with shape (167841000,) and data type int32