In [1]:
import os
import descarteslabs as dl 
from descarteslabs.client.services.tasks import AsyncTasks, as_completed
from descarteslabs.client.services.catalog import Catalog

In [2]:
def rice_sar_stats(year, tile):

    import json
    import numpy as np
    from osgeo import gdal
    import logging
    import time
    from datetime import datetime
    catalog = Catalog()


    def calculate_stats(arr):

        vx_max = arr.max(axis=0)
        vx_min = arr.min(axis=0)
        vx_mean = arr.mean(axis=0)
        vx_med = np.ma.median(arr, axis=0)
        vx_std = arr.std(axis=0)

        stats = [vx_max, vx_min, vx_mean, vx_med, vx_std]

        return stats

    def get_imagery(year, tile, s1_pass, band):

        start = str(year) + '-01-01'
        end = str(year) + '-12-31'

        dlkey = tile['properties']['key']

        # Search for available imagery for each tile
        scenes = dl.metadata.search(
                               products=["sentinel-1:GRD"],
                               start_time=start,
                               end_time=end,
                               geom=json.dumps(tile['geometry']),
                               limit = 1000
                               )

        if s1_pass == 'DESCENDING' or s1_pass == 'ASCENDING':
            img_subset = [item for item in scenes['features'] if item['properties']['pass'] == s1_pass]
            scenes['features'] = img_subset

        ids = [scene['id'] for scene in scenes['features']]
        
        arr, meta = dl.raster.stack(
                                ids,
                                bands=band,
                                data_type='Byte',
                                dltile=dlkey,
                                )
        arr = arr[:,:,:,0]
        return arr, meta[0]

    def catalog_upload(tile, stats_list, meta, year):
        # Upload the ndarray as a single scene in our new product
        product_id  = '7294028cc01114d89a473cf055d29dc5cd5ffe88:sar_img_stats:astrodigital'

        upload = catalog.upload_ndarray(
            stats_list,
            product_id=product_id,
            image_id=tile['properties']['key'],
            raster_meta=meta)
        print(upload)
#         print(upload_result(product_id, upload))

    def run_analysis(year, tile):

        bands = [['vv'], ['vh']]
        passes = ['DESCENDING', 'ASCENDING', 'BOTH']
        all_stats = []

        for band in bands:
            for s1_pass in passes:
                try:
                    imagery, meta = get_imagery(year, tile, s1_pass, band)
                    arr = np.ma.masked_equal(imagery,0)
                    stats = calculate_stats(arr)
                    all_stats.append(stats)
                except:
                    tile_size = tile['properties']['tilesize']
                    nodata = np.zeros((tile_size, tile_size), dtype=np.uint8)
                    all_stats.append([nodata, nodata, nodata, nodata, nodata])
         
        # flatten the list of lists (all_stats) into a single list
        stats_list = [item for sublist in all_stats for item in sublist]
        catalog_upload(tile, np.array(stats_list), meta, year)
    
    run_analysis(year,tile)

In [3]:
year = '2017'
loc = 'singapore'
    
res = 20.0
pad = 0
pixels = 1024 

aoi = dl.places.shape(loc, geom = 'low')
tiles = dl.raster.dltiles_from_shape(res, pixels, pad, aoi['geometry'])

try:
    at = AsyncTasks()
    async_function = at.create_function(
            rice_sar_stats, cpus=1, memory="6Gi", name="sar_img_stats",
            maximum_concurrency = 14,
            image='us.gcr.io/dl-ci-cd/images/tasks/public/py3.7/default:v2018.11.27')
    # Iterate over tiles, submit tasks
    tasks = []
    for tile in tiles['features']:

        print("TILE ID: %s" % tile['properties']['key'])

        t = async_function( 
            year,
            tile,
        ) 
        tasks.append(t)

    print('Done submitting %i tasks. Starting to collect' % len(tasks))

    for task in as_completed(tasks):

        if task.exception:
            print("Failed with exception: ", task.log)
        else:
            task.result
except:
    print ("tasks not submitted!")
    

TILE ID: 1024:0:20.0:48:-8:6
TILE ID: 1024:0:20.0:48:-8:7
TILE ID: 1024:0:20.0:48:-7:6
TILE ID: 1024:0:20.0:48:-7:7
TILE ID: 1024:0:20.0:48:-6:6
TILE ID: 1024:0:20.0:48:-6:7
TILE ID: 1024:0:20.0:48:-5:7
TILE ID: 1024:0:20.0:48:-4:7
Done submitting 8 tasks. Starting to collect


