<br>

# ERA5 in GEE - Functions

<br>

This notebook lists all functions that are useful to bring ERA5 reanalysis data into Google Earth Engine.

The functions can be grouped into the following categories:



**[Useful data handling functions](#useful_functions)**
* [createFileList](#create_filelist)
* [createListOfLists](#create_list_of_lists)
* [getEpochTimes](#epoch_times)
* [getEpochTimes_daily](#epoch_times_daily)
* [getEpochTimes_monthly](#epoch_times_monthly)

**[Functions to generate a GeoTiff file with gdal](#generate_geotiff)**
* [initTiff](#initTiff)
* [createTiff](#create_tiff)
* [getScaleFactor](#scale_factor)
* [getOffset](#offset)
* [setSpatialReference](#spatial_ref)

**[Functions to convert NetCDF files to GeoTiffs](#convert_ncs_to_geotiffs)**
* [ncToTiff](#nc_tiff)
* [ncToTiff_hourly](#nc_tiff_hourly)
* [convertFilesToTiff](#convert_to_tiff)

**[Functions to temporally aggregate data](#aggregate)**
* [createDailyFiles](#aggregate_daily)
* [createMonthlyFiles](#aggregate_monthly)

**[Functions to create / update manifests](#manifests)**
* [updateManifest_hourly](#manifest_hourly)
* [updateManifest_daily](#manifest_daily)
* [updateManifest_monthly](#manifest_monthly)
* [manifestToJSON](#manifest_json)
* [createManifestCombined_hourly](#manifest_combined_hourly)
* [createManifestCombined_daily](#manifest_combined_daily)
* [createManifestCombined_monthly](#manifest_combined_monthly)

**[Functions to upload files to Google Cloud Platform](#gcp_upload)**
* [upload_blob](#upload_blob)
* [uploadMonthlyFileToGCP](#upload_gcp_monthly)
* [uploadToGCP](#upload_gcp)


#### Load libraries

In [None]:
import os
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import glob
from osgeo import gdal, osr
import pytz
import re
import json
from google.cloud import storage
import xarray as xr

<hr>

## <a id='useful_functions'></a>Useful data handling functions

### <a id='create_filelist'></a>`createFileList`

In [None]:
def createFileList(directory,file_pattern):
    os.chdir(directory)
    return glob.glob(file_pattern)

### <a id='create_list_of_lists'></a>`createListOfLists`

In [None]:
def createListOfLists(directory_list,aggregation,year):
    fileList=[]
    for i in directory_list:
        os.chdir(i)
        fileList_tmp = createFileList(i,'./tiff/'+aggregation+'/'+year+'/*')
        fileList_tmp.sort()
        fileList.append(fileList_tmp)
        os.chdir('..')
    return(fileList)

### <a id='epoch_times'></a>`getEpochTimes`

In [None]:
def getEpochTimes(file, noOfBands):
    base = datetime(1900,1,1,0,0,0,0).replace(tzinfo=pytz.UTC)
    ls_epochtime = []
    
    for i in range(1,noOfBands+1):
        tmp = file.GetRasterBand(i)
        tmp_time = tmp.GetMetadata()['NETCDF_DIM_time']
        epoch_time = base + timedelta(hours=int(tmp_time))
        ls_epochtime.append(int(epoch_time.timestamp()))
    epoch_time = base + timedelta(hours=int(tmp_time)+1)
    ls_epochtime.append(int(epoch_time.timestamp()))
    return ls_epochtime

### <a id='epoch_times_daily'></a>`getEpochTimes_daily`

In [None]:
def getEpochTimes_daily(year,month,day):
    ls_epochtime = []
    startTime = datetime(year,month,day, tzinfo=pytz.utc)
    endTime = startTime + timedelta(days=1)
    ls_epochtime.append(startTime.timestamp())
    ls_epochtime.append(endTime.timestamp())
    return ls_epochtime

### <a id='epoch_times_monthly'></a>`getEpochTimes_monthly`

In [None]:
def getEpochTimes_monthly(year,month):
    ls_epochtime = []
    startTime = datetime(year,month, 1, tzinfo=pytz.utc)
    endTime = startTime + relativedelta(months=+1)
    ls_epochtime.append(startTime.timestamp())
    ls_epochtime.append(endTime.timestamp())
    return ls_epochtime

<hr>

## <a id='generate_geotiff'></a>Functions to generate a GeoTiff with `gdal`

### <a id='init_tiff'></a>`initTiff`

In [None]:
def initTiff(filename, file, noOfBands):
    outFile = gdal.GetDriverByName('GTiff').Create(filename, file.RasterXSize, file.RasterYSize, noOfBands, gdal.GDT_Float32)
    geotransform = (-180.0, 0.25, 0.0, 90.0, 0.0, -0.25)
    outFile.SetGeoTransform(geotransform)
    return outFile

### <a id='create_tiff'></a>`createTiff`

In [None]:
def createTiff(file, outfile, scale_factor, offset):
    for j in range(1, file.RasterCount+1):
        fileLayer = file.GetRasterBand(j).ReadAsArray().astype('float')
        finalArray = float(offset) + (fileLayer * float(scale_factor))
        finalArray[finalArray<0] = 0.0
        outBand = outfile.GetRasterBand(j)
        outBand.WriteArray(finalArray)
    return outBand

### <a id='scale_factor'></a>`getScaleFactor`

In [None]:
def getScaleFactor(file, parameter):
    return float(file.GetMetadataItem(parameter+"#scale_factor"))

### <a id='offset'></a>`getOffset`

In [None]:
def getOffset(file, parameter):
    return float(file.GetMetadataItem(parameter+"#add_offset"))

### <a id='spatial_ref'></a>`setSpatialReference`

In [None]:
def setSpatialReference(file,EPSGCode):
    srs = osr.SpatialReference()
    srs.ImportFromEPSG(EPSGCode)
    file.SetProjection(srs.ExportToWkt())

<hr>

## <a id='convert_ncs_to_geotiffs'></a>Functions to convert NetCDF files to GeoTiffs

### <a id='nc_tiff'></a>`ncToTiff`

In [None]:
def ncToTiff(file, noOfBands, year,epsgCode,outfile):
    outfile = outfile
    print(outfile)
    ncFile=gdal.Open(file)
    outTiff = initTiff(outfile,ncFile,noOfBands)
    fileLayer = ncFile.GetRasterBand(1).ReadAsArray().astype('float')
    fileLayer[fileLayer<0] = 0.0
    outBand = outTiff.GetRasterBand(1)
    outBand.WriteArray(fileLayer)
    setSpatialReference(outTiff, epsgCode)
    outBand.FlushCache()
    outTiff=None

### <a id='nc_tiff_hourly'></a>`ncToTiff_hourly`

### <a id='convert_to_tiff'></a>`convertFilesToTiff`

In [None]:
def convertFilesToTiff(directory, time_step, parameter, year, epsg):
    fileList = createFileList(directory, './era5_'+parameter+'/nc/'+time_step+'/'+year+'/era5_surface_pressure_1985_06_12*.nc')
    print(fileList)
    print(len(fileList))
 #   if(len(fileList)<365):
 #       return

    fileList.sort()
    for file in fileList:
        tmp = file.split('/')
        print(tmp[5][:-3])      
        outfile = directory+'era5_'+parameter+'/tiff/'+time_step+'/'+year+'/'+str(tmp[5][:-3])+'.tif'
        print(outfile)
        if(time_step!='hourly'):
            ncToTiff(file,1,year,epsg, outfile)
        else:
            ncToTiff_hourly(file,24,year, epsg,outfile,parameter)

<hr>

## <a id='aggregate'></a>Functions to temporally aggregate data

### <a id='aggregate_daily'></a>`createDailyFiles`

In [None]:
def createDailyFiles(directory, parameter, year, aggregation):
#    fileList = createFileList(directory, './era5_'+parameter+'/nc/hourly/'+year+'/era5_'+parameter+'_'+year+'*')
    fileList = createFileList(directory, './era5_'+parameter+'/nc/hourly/'+year+'/era5_'+parameter+'*')
    fileList.sort()
    print(fileList)
    
    for i in range(0,len(fileList)-1):
        if(parameter=='tp'):
            array=xr.open_mfdataset([fileList[i],fileList[i+1]],concat_dim='time', combine='nested')
            print(array)
        else:
            array = xr.open_dataset(fileList[i], mask_and_scale=True, decode_times=True)
            print(array)
        tmp = fileList[i].split('/')
        print(tmp)

        outFileName = directory+'./era5_'+parameter+'/nc/daily/'+year+'/'+tmp[5][:-3]+'_daily_'+aggregation+'.nc'
        
        print(outFileName)
        if(aggregation=='mean'):
            print('mean')
            array.resample(time='1D').mean().to_netcdf(outFileName, mode='w', compute=True)
        elif(aggregation=='sum'):
            print('sum')
            array.resample(time='1D',closed='right').sum().isel(time=1).to_netcdf(outFileName, mode='w', compute=True)
        elif(aggregation=='min'):
            print('min')
            array.resample(time='1D').min().to_netcdf(outFileName, mode='w', compute=True)
        else:
            print('max')
            array.resample(time='1D').max().to_netcdf(outFileName, mode='w', compute=True)

### <a id='aggregate_monthly'></a>`createMonthlyFiles`

In [None]:
def createMonthlyFiles(directory, parameter, year, aggregation):
    month_list = ['01','02','03','04','05','06','07','08','09','10','11','12']
#    month_list = ['08']
    for i in month_list:
        fileList_param = createFileList(directory,'./era5_'+parameter+'/nc/'+year+'/era5_'+parameter+'_'+year+'_'+i+'*')
        fileList_param.sort()
        print(fileList_param)
        os.chdir(directory)
        array_param = xr.open_mfdataset(fileList_param,combine='nested', concat_dim='time')
        print(array_param)
        tmp = fileList_param[0].split('/')
        outFileName_param = directory+'./era5_'+parameter+'/nc/monthly/'+year+'/'+tmp[4][:-6]+'_monthly_'+aggregation+'.nc'
        if(aggregation=='mean'):
            print('mean')
            array_param.resample(time='1M').mean().to_netcdf(outFileName_param, mode='w', compute=True)
        elif(aggregation=='sum'):
            print('sum')
            array_param.resample(time='1M').sum().to_netcdf(outFileName_param, mode='w', compute=True)
        elif(aggregation=='min'):
            print('min')
            array_param.resample(time='1M').min().to_netcdf(outFileName_param, mode='w', compute=True)
        else:
            print('max')
            array_param.resample(time='1M').max().to_netcdf(outFileName_param, mode='w', compute=True)


<hr>

## <a id='manifests'></a>Functions to create / update manifests

### <a id='manifest_hourly'></a>`updateManifest_hourly`

In [None]:
def updateManifest_hourly(directory, eeCollectionName, assetName, startTime, endTime, bandIndex, gs_bucket_list, uris1, uris2, uris3, uris4, uris5, uris6, uris7, uris8, uris9, year,month, day, hour):
    with open(directory+'manifest_structure_hourly.json','r') as f:
        jsonFile = json.load(f)

    jsonFile['name']=eeCollectionName+assetName
    jsonFile['tilesets'][0]['sources'][0]['uris']='gs://'+gs_bucket_list[0]+'/'+uris1
    jsonFile['tilesets'][1]['sources'][0]['uris']='gs://'+gs_bucket_list[1]+'/'+uris2
    jsonFile['tilesets'][2]['sources'][0]['uris']='gs://'+gs_bucket_list[2]+'/'+uris3
    jsonFile['tilesets'][3]['sources'][0]['uris']='gs://'+gs_bucket_list[3]+'/'+uris4
    jsonFile['tilesets'][4]['sources'][0]['uris']='gs://'+gs_bucket_list[4]+'/'+uris5
    jsonFile['tilesets'][5]['sources'][0]['uris']='gs://'+gs_bucket_list[5]+'/'+uris6
    jsonFile['tilesets'][6]['sources'][0]['uris']='gs://'+gs_bucket_list[6]+'/'+uris7
    jsonFile['tilesets'][7]['sources'][0]['uris']='gs://'+gs_bucket_list[7]+'/'+uris8
    jsonFile['tilesets'][8]['sources'][0]['uris']='gs://'+gs_bucket_list[8]+'/'+uris9

    jsonFile['bands'][0]['tileset_band_index']=bandIndex
    jsonFile['bands'][1]['tileset_band_index']=bandIndex
    jsonFile['bands'][2]['tileset_band_index']=bandIndex
    jsonFile['bands'][3]['tileset_band_index']=bandIndex
    jsonFile['bands'][4]['tileset_band_index']=bandIndex
    jsonFile['bands'][5]['tileset_band_index']=bandIndex
    jsonFile['bands'][6]['tileset_band_index']=bandIndex
    jsonFile['bands'][7]['tileset_band_index']=bandIndex
    jsonFile['bands'][8]['tileset_band_index']=bandIndex

    jsonFile['start_time']['seconds']=startTime
    jsonFile['end_time']['seconds']=endTime
    jsonFile['properties']['year']=year
    jsonFile['properties']['month']=month
    jsonFile['properties']['day']=day 
    jsonFile['properties']['hour']=hour
    return jsonFile

### <a id='manifest_daily'></a>`updateManifest_daily`

In [None]:
def updateManifest_daily(directory, eeCollectionName, assetName, startTime, endTime, gs_bucket_list, uris1, uris2, uris3, uris4, uris5, uris6, uris7, uris8, year,month, day):
    with open(directory+'manifest_structure_daily.json','r') as f:
        jsonFile = json.load(f)

    jsonFile['name']=eeCollectionName+assetName
    jsonFile['tilesets'][0]['sources'][0]['uris']='gs://'+gs_bucket_list[0]+'/'+uris1
    jsonFile['tilesets'][1]['sources'][0]['uris']='gs://'+gs_bucket_list[1]+'/'+uris2
    jsonFile['tilesets'][2]['sources'][0]['uris']='gs://'+gs_bucket_list[2]+'/'+uris3
    jsonFile['tilesets'][3]['sources'][0]['uris']='gs://'+gs_bucket_list[3]+'/'+uris4
    jsonFile['tilesets'][4]['sources'][0]['uris']='gs://era5_tp_daily/era5_tp_'+str(year)+'_'+str(month).zfill(2)+'_'+str(day).zfill(2)+'_daily_sum.tif'
    jsonFile['tilesets'][5]['sources'][0]['uris']='gs://'+gs_bucket_list[4]+'/'+uris5
    jsonFile['tilesets'][6]['sources'][0]['uris']='gs://'+gs_bucket_list[5]+'/'+uris6
    jsonFile['tilesets'][7]['sources'][0]['uris']='gs://'+gs_bucket_list[6]+'/'+uris7
    jsonFile['tilesets'][8]['sources'][0]['uris']='gs://'+gs_bucket_list[7]+'/'+uris8
 #   jsonFile['tilesets'][8]['sources'][0]['uris']=gs_bucket_list[8]+uris9    
    jsonFile['start_time']['seconds']=startTime
    jsonFile['end_time']['seconds']=endTime
    jsonFile['properties']['year']=year
    jsonFile['properties']['month']=month
    jsonFile['properties']['day']=day   
    return jsonFile

### <a id='manifest_monthly'></a>`updateManifest_monthly`

In [None]:
def updateManifest_monthly(directory,eeCollectionName, assetName, startTime, endTime, gs_bucket_list, uris1, uris2, uris3, uris4, uris5, uris6, uris7, uris8, year, month):
    with open(directory+'manifest_structure_monthly.json','r') as f:
        jsonFile = json.load(f)

    jsonFile['name']=eeCollectionName+assetName
    jsonFile['tilesets'][0]['sources'][0]['uris']='gs://'+gs_bucket_list[0]+'/'+uris1
    jsonFile['tilesets'][1]['sources'][0]['uris']='gs://'+gs_bucket_list[1]+'/'+uris2
    jsonFile['tilesets'][2]['sources'][0]['uris']='gs://'+gs_bucket_list[2]+'/'+uris3
    jsonFile['tilesets'][3]['sources'][0]['uris']='gs://'+gs_bucket_list[3]+'/'+uris4
    jsonFile['tilesets'][4]['sources'][0]['uris']='gs://era5_tp_monthly/era5_tp_'+str(year)+'_'+str(month).zfill(2)+'_monthly_sum.tif'
    jsonFile['tilesets'][5]['sources'][0]['uris']='gs://'+gs_bucket_list[4]+'/'+uris5
    jsonFile['tilesets'][6]['sources'][0]['uris']='gs://'+gs_bucket_list[5]+'/'+uris6
    jsonFile['tilesets'][7]['sources'][0]['uris']='gs://'+gs_bucket_list[6]+'/'+uris7
    jsonFile['tilesets'][8]['sources'][0]['uris']='gs://'+gs_bucket_list[7]+'/'+uris8
    jsonFile['start_time']['seconds']=startTime
    jsonFile['end_time']['seconds']=endTime
    jsonFile['properties']['year']=year
    jsonFile['properties']['month']=month
    return jsonFile

### <a id='manifest_json'></a>`manifestToJSON`

In [None]:
def manifestToJSON(manifestDict, path,outFile):
    with open(path+outFile+'.json','w') as fp:
        json.dump(manifestDict,fp,indent=4)

### <a id='manifest_combined_hourly'></a>`createManifestCombined_hourly`

In [None]:
ef createManifestCombined_hourly(fileList, ncFileList, year, bucket_list, directory_manifest,directory_outfile):
    print
    for i in range(0,len(fileList[0])):
        print(len(fileList[0]))
        item = list(zip(*fileList))[i]
        print('item', item)
        tmp = re.findall('\d+', item[0])
        print(tmp)
        assetName=tmp[3]+tmp[4]+tmp[5]
        print('assetname', assetName)
        print(item[0])
        ncFile = gdal.Open(ncFileList[i])

        ls_epochtimes = getEpochTimes(ncFile,24)
        print(ls_epochtimes)
        uris_list = []
        for i in item:
             tmp2 = i.split('/')
             uris_list.append(tmp2[4])
        print(uris_list)

        for k in range(0,len(ls_epochtimes)-1):
            print(k)
            hour= str(k).zfill(2)
            manifest = updateManifest_hourly(directory=directory_manifest,
                                  eeCollectionName='projects/earthengine-legacy/assets/projects/ecmwf/era5_hourly/',
                                  assetName=assetName+'T'+hour,
                                  startTime=int(ls_epochtimes[k]),
                                  endTime=int(ls_epochtimes[k+1]),
                                  bandIndex=k,
                                  gs_bucket_list=bucket_list,
                                  uris1=uris_list[0],
                                  uris2=uris_list[1],
                                  uris3=uris_list[2],
                                  uris4=uris_list[3],
                                  uris5=uris_list[4],
                                  uris6=uris_list[5],
                                  uris7=uris_list[6],
                                  uris8=uris_list[7],
                                  uris9=uris_list[8],
                                  year=int(tmp[3]),
                                  month=int(tmp[4]),
                                  day=int(tmp[5]),
                                  hour=int(hour))
            outfile='manifest_'+assetName+hour+'_hourly'
            manifestToJSON(manifest,directory_outfile+year+'/',outfile)


### <a id='manifest_combined_daily'></a>`createManifestCombined_daily`

In [None]:
def createManifestCombined_daily(fileList, year,bucket_list, directory_manifest,directory_outfile):
    item = list(zip(*fileList))[0]
    print(item)
    for i in range(0,len(fileList[0])):
#        print(i)
        item = list(zip(*fileList))[i]
#        print(item)
        tmp = re.findall('\d+', item[0])
        assetName=tmp[3]+tmp[4]+tmp[5]
        ls_epochtimes = getEpochTimes_daily(int(tmp[3]),int(tmp[4]),int(tmp[5]))
        uris_list = []
        for i in item:
            tmp2 = i.split('/')
            uris_list.append(tmp2[4])
        manifest = updateManifest_daily(directory=directory_manifest,
                                        eeCollectionName='projects/earthengine-legacy/assets/projects/ecmwf/era5_daily/',
                                        assetName=assetName,
                                        startTime = int(ls_epochtimes[0]),
                                        endTime = int(ls_epochtimes[1]),
                                        gs_bucket_list = bucket_list,
                                        uris1=uris_list[0],
                                        uris2=uris_list[1],
                                        uris3=uris_list[2],
                                        uris4=uris_list[3],
                                        uris5=uris_list[4],
                                        uris6=uris_list[5],
                                        uris7=uris_list[6],
                                        uris8=uris_list[7],
                                        year=int(tmp[3]),
                                        month=int(tmp[4]),
                                        day=int(tmp[5]))
        outfile='manifest_'+assetName+'_daily'
        manifestToJSON(manifest,directory_outfile+year+'/',outfile)

### <a id='manifest_combined_monthly'></a>`createManifestCombined_monthly`

In [None]:
def createManifestCombined_monthly(fileList, year,bucket_list, directory_manifest,directory_outfile):
    item = list(zip(*fileList))[0]
    print(item)
    for i in range(0,len(fileList[0])):
#        print(i)
        item = list(zip(*fileList))[i]
#        print(item)
        tmp = re.findall('\d+', item[0])
        assetName=tmp[3]+tmp[4]
        ls_epochtimes = getEpochTimes_monthly(int(tmp[3]),int(tmp[4]))
        uris_list = []
        for i in item:
            tmp2 = i.split('/')
            print(tmp2)
            uris_list.append(tmp2[4])
        manifest = updateManifest_monthly(directory=directory_manifest,
                                        eeCollectionName='projects/earthengine-legacy/assets/projects/ecmwf/era5_monthly/',
                                        assetName=assetName,
                                        startTime = int(ls_epochtimes[0]),
                                        endTime = int(ls_epochtimes[1]),
                                        gs_bucket_list = bucket_list,
                                        uris1=uris_list[0],
                                        uris2=uris_list[1],
                                        uris3=uris_list[2],
                                        uris4=uris_list[3],
                                        uris5=uris_list[4],
                                        uris6=uris_list[5],
                                        uris7=uris_list[6],
                                        uris8=uris_list[7],
                                        year=int(tmp[3]),
                                        month=int(tmp[4]))
        outfile='manifest_'+assetName+'_monthly'
        manifestToJSON(manifest,directory_outfile+year+'/',outfile)

<hr>

## <a id='gcp_upload'></a>Functions to upload files to Google Cloud Platform (GCP)

### <a id='upload_blob'></a>`upload_blob`

In [None]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    if(blob.exists()):
        print('File {} already exists'.format(destination_blob_name))
        next
    else:
        blob.upload_from_filename(source_file_name)

        print('File {} uploaded to {}.'.format(
                source_file_name,
                destination_blob_name))

### <a id='upload_gcp_monthly'></a>`uploadMonthlyFilesToGCP`

In [None]:
def uploadMonthlyFilesToGCP(directory,parameter,year,bucket):
    fileList = createFileList(directory,'./era5_'+parameter+'/tiff/monthly/'+year+'/*08_monthly*')
    fileList.sort()
    for file in fileList:
        tmp = file.split('/')
        print(tmp)
        destname = tmp[5]
        print(destname)
        upload_blob(bucket,file,destname)

### <a id='upload_gcp'></a>`uploadToGCP`

In [None]:
def uploadToGCP(directory,year,time_step,parameter,bucket):
    fileList = createFileList(directory, 'era5_'+parameter+'/tiff/'+time_step+'/'+year+'/*.tif')
    fileList.sort()

    for file in fileList:
        print(file)
        tmp = file.split('/')
        print(tmp)

        upload_blob(bucket,file,tmp[4])

<hr>

<a rel="license" href="http://creativecommons.org/licenses/by-nc/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-nc/4.0/88x31.png" align='right' /></a>