**Inputs**
* ERA-5 data
* Hazard definition(s)

**Outputs**
* ImageCollection(s) containing hazard magnitudes for each year-model pair
* Stored in users/tedwongwri/dataportal/hist-magnitudes/HAZARDNAME
* **You must manually create the empty ImageCollection** in the dataportal/magnitudes directory before trying to export results

In [1]:
import ee
ee.Authenticate()

Enter verification code: 4/1AX4XfWih0Mg1EOw1UQarexTR24vl0dFlKb0hC8Ok01CPDHKomoQxIyWcYZA

Successfully saved authorization token.


In [2]:
ee.Initialize()

In [6]:
# Define hazards here

def dryspells(yeardata):
    dry_days = yeardata.map(lambda d: d.lt(0.00001))
    return process_runs(dry_days, 5, 'count')


hazards = [
            {'definition': dryspells,
             'variable': 'pr'
            },
]

In [7]:
def process_runs(imgs, runLength, resultType): 
# from Logan Byers
    def doOne(img, data):
   #data = ee.Image(data);

        dataDict = ee.Dictionary(data)

        previousThresholdImage = ee.Image(dataDict.get('previousThresholdImage'))
        currentStreakImage = ee.Image(dataDict.get('currentStreakImage')).uint16()
        streakCountImage = ee.Image(dataDict.get('streakCountImage')).uint16()
        longestStreakImage = ee.Image(dataDict.get('longestStreakImage')).uint16()
        streakAccumulation = ee.Image(dataDict.get('streakAccumulation')).uint16()

        # WHERE yesterday AND today : 1, else 0
        #continueStreakImage = previousThresholdImage.and(ee.Image(img))
        continueStreakImage = previousThresholdImage.multiply(ee.Image(img))

        # WHERE NOT on streak :  yesterday streak length, else 0
        #streakEndedImage = currentStreakImage.multiply(currentStreakImage.and(continueStreakImage.not()))
        streakEndedImage = currentStreakImage.multiply(currentStreakImage.multiply(continueStreakImage.multiply(-1).add(1)))

        # WHERE NOT on streak AND yesterday streak length > length threshold : 1, else 0
        endedStreakExceedsLengthImage = currentStreakImage.multiply(streakEndedImage).gte(runLength)

        # update the state
        accumulator = ee.Dictionary.fromLists([
            'previousThresholdImage',
            'currentStreakImage',
            'streakCountImage',
            'longestStreakImage',
            'streakAccumulation'
          ], [
            # previousThresholdImage --> today's image
            ee.Image(img),
            # currentStreakImage --> today's image PLUS yesterday's streak (where continuing)
            currentStreakImage
              .multiply(continueStreakImage).add(ee.Image(img)),
            # streakCountImage --> PLUS 1 where long streak ended today
            streakCountImage.add(endedStreakExceedsLengthImage),
            # longestStreakImage --> larger of prev and current value
            longestStreakImage.max(currentStreakImage.multiply(continueStreakImage).add(ee.Image(img))),
            # streakAccumulation --> yesterday's accum plus current 1/0, if in streak
            streakAccumulation.add(ee.Image(img).multiply(continueStreakImage))
          ]
        )

        return accumulator
  
    resultImageName = {
        'count': 'streakCountImage',
        'max': 'longestStreakImage',
        'accum': 'streakAccumulation'
    }
  
    streakData = imgs.iterate(
    # iterate over each image in the ImageCollection
    #   accumulate a stateful Dictionary of images
    
        doOne, ee.Dictionary.fromLists([
          'previousThresholdImage', 
          'currentStreakImage',
          'streakCountImage',
          'longestStreakImage',
          'streakAccumulation'
        ], [
          ee.Image.constant(1),
          ee.Image.constant(0).uint16(),
          ee.Image.constant(0).uint16(),
          ee.Image.constant(0).uint16(),
          ee.Image.constant(0).uint16()
        ]
        )
    
    )
    return ee.Image(ee.Dictionary(streakData).get(resultImageName[resultType]))


In [8]:
1000 / 86400

0.011574074074074073

In [9]:
era5_tasmax = ee.ImageCollection("ECMWF/ERA5/DAILY").select('maximum_2m_air_temperature')
era5_tasmin = ee.ImageCollection("ECMWF/ERA5/DAILY").select('minimum_2m_air_temperature')
era5_pr = ee.ImageCollection("ECMWF/ERA5/DAILY").select('total_precipitation')

# ERA5 reports precip in m/d, while NEX-GDDP reports it as kg m^-2 s^-1
# 1kg water spread out over 1m is 1mm thick
# 1d = 86400s
#           1m/d  *  1000mm/m  *  1d/86400s  *  1kg/(mm * m^2) = 0.011574074074074073 m^-2 s^-1
# So to express ERA5 precip in NEx units:
era5_tasmin = era5_tasmin.map(lambda i: i.multiply(0.011574074074074073))

In [10]:
WHOLE_GLOBE = ee.Geometry.Rectangle([-179.999, -90, 180, 90], 'EPSG:4326', False)
START_YEAR = 1980
END_YEAR = 2019

In [12]:
for hazard in hazards:
    for year in range(START_YEAR, END_YEAR + 1):
        dataset = {'tasmax': era5_tasmax, 'tasmin': era5_tasmin, 'pr': era5_pr}[hazard['variable']]
        data = dataset.filterDate(str(year) + '-01-01', str(year) + '-12-31')
        img = hazard['definition'](data)
        task = ee.batch.Export.image.toAsset(**{
          'image': img.rename(hazard['definition'].__name__).set('model', 'era5').set('year', year),
          'description': '{0}_{1}_{2}'.format(hazard['definition'].__name__, str(year), 'ERA5'),
          'assetId': 'users/tedwongwri/dataportal/hist-magnitudes/{0}/{1}_{2}'.format(hazard['definition'].__name__, str(year), 'ERA5'),
          'region': WHOLE_GLOBE,
          'crs': 'EPSG:4326',
          'dimensions': '1440x720'
        })
        task.start()