In [13]:
from google.cloud import storage
import os
import netCDF4
import numpy as np
import numpy.ma as ma
from dask import delayed
import dask.array as da
from dask.distributed import Client
import glob
import tempfile
import subprocess
import datetime, time
from urllib import request
from multiprocessing import Pool
import json
import gc
import boto3
import botocore

# Endpoints/paths can be overridden through environment variables so the
# notebook is not tied to one machine's filesystem layout; the defaults are the
# values this notebook originally hard-coded.
GCS_CREDENTIALS_PATH = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', '/home/jovyan/work/credentials.json')
DASK_SCHEDULER_ADDRESS = os.environ.get('DASK_SCHEDULER_ADDRESS', 'scheduler:8786')

# Google Cloud Storage bucket that receives the processed outputs.
storage_client = storage.Client.from_service_account_json(GCS_CREDENTIALS_PATH)
bucket = storage_client.get_bucket('nex-gddp')

# Source data: LOCA NetCDF files in the 'nasanex' S3 bucket, under the 'LOCA/' prefix.
loca_bucket = 'nasanex'
base_key_path = 'LOCA'
# FTP location of the same archive (not referenced by the code shown here).
base_url = 'ftp://gdo-dcp.ucllnl.org/pub/dcp/archive/cmip5/loca/LOCA_2016-04-02/'
all_models = ["ACCESS1-0","ACCESS1-3","CCSM4","CESM1-BGC","CESM1-CAM5","CMCC-CM","CMCC-CMS","CNRM-CM5","CSIRO-Mk3-6-0","CanESM2","EC-EARTH","FGOALS-g2","GFDL-CM3","GFDL-ESM2G","GFDL-ESM2M","GISS-E2-H","GISS-E2-R","HadGEM2-AO","HadGEM2-CC","HadGEM2-ES","IPSL-CM5A-LR","IPSL-CM5A-MR","MIROC-ESM","MIROC-ESM-CHEM","MIROC5","MPI-ESM-LR","MPI-ESM-MR","MRI-CGCM3","NorESM1-M","bcc-csm1-1","bcc-csm1-1-m","inmcm4"]
# Small subset for quick test runs.
some_models = ["ACCESS1-0","ACCESS1-3","CCSM4"]

# Connect to the running Dask cluster.
client = Client(DASK_SCHEDULER_ADDRESS)

# boto3 S3 resource used by download_file().
s3 = boto3.resource('s3')

# Entry point: process one model / scenario / year
def process_model_year(model, scenario, year):
    """Run every per-year processing step for one model and scenario.

    Temperatures are processed first, then precipitation; nothing is returned.
    """
    step_args = (model, scenario, year)
    process_year_temps(*step_args)
    process_pr_temps(*step_args)

def process_year_temps(model, scenario, year):
    """Download and inspect the daily tasmax/tasmin files for one model-year.

    Exploratory for now: it prints the local tasmax path, the dataset summary,
    the ``tasmax`` variable and its shape. No metric is computed yet.

    Raises:
        RuntimeError: if either file could not be downloaded
            (``download_file`` returned ``None``).
    """
    print(f"Processing temperatures for {model} {year} ({scenario})")
    ids = (gen_netcdf_id(model, scenario, year, 'tasmax'),
           gen_netcdf_id(model, scenario, year, 'tasmin'))
    print(f"File ids are: {ids}")
    tasmax_file, tasmin_file = [download_file(file_id) for file_id in ids]
    # download_file signals failure with None; fail loudly here instead of
    # letting netCDF4.Dataset(None) raise an unrelated-looking error.
    missing = [file_id for file_id, path in zip(ids, (tasmax_file, tasmin_file)) if path is None]
    if missing:
        raise RuntimeError(f"Could not download: {missing}")
    # Context managers guarantee both NetCDF handles are closed afterwards
    # (the original left them open). tasmin is opened but not inspected yet.
    with netCDF4.Dataset(tasmax_file) as tasmax_dataset, netCDF4.Dataset(tasmin_file) as tasmin_dataset:
        print(tasmax_file)
        print(tasmax_dataset)
        print(tasmax_dataset['tasmax'])
        print(tasmax_dataset['tasmax'].shape)

def process_pr_temps(model, scenario, year):
    """Placeholder for precipitation processing of one model-year.

    So far it only builds the ``pr`` file id; nothing is downloaded or returned.
    """
    _pr_file_id = gen_netcdf_id(model, scenario, year, 'pr')  # TODO: download and process

def gen_netcdf_id(model, scenario, year, var):
    """Build the S3 key of a LOCA daily NetCDF file.

    The key contains the model, scenario, the ``r1i1p1`` ensemble member,
    the variable name and a Jan 1 - Dec 31 date range for ``year``.
    """
    year_str = str(year)
    key_dir = f'LOCA/{model}/16th/{scenario}/r1i1p1/{var}'
    key_name = (f'{var}_day_{model}_{scenario}_r1i1p1_'
                f'{year_str}0101-{year_str}1231.LOCA_2016-04-02.16th.nc')
    return f'{key_dir}/{key_name}'

def download_file(file_id, loca_bucket = loca_bucket, download_location = '/temp'):
    """Download one object from the LOCA S3 bucket into ``download_location``.

    If the key returns HTTP 404, the same key with ``r1i1p1`` replaced by
    ``r6i1p1`` is tried. It is saved under the original (r1i1p1) local name.

    Args:
        file_id: S3 object key, e.g. as built by ``gen_netcdf_id``.
        loca_bucket: S3 bucket name. The default is the module-level
            ``loca_bucket`` value captured when this function was defined.
        download_location: local directory to write the file into.

    Returns:
        The local file path on success. Returns ``None`` on any failure; the
        reason is printed, and no path to a missing file is ever returned.
    """
    filename = f'{download_location}/{file_id.split("/")[-1]}'
    print(f"Downloading {filename}")
    try:
        s3.Bucket(loca_bucket).download_file(file_id, filename)
        return filename
    except botocore.exceptions.ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        if error_code != "404":
            # Previously swallowed while still returning `filename`.
            print(f"Download of {file_id} failed ({error_code}): {e}")
            return None
    except Exception as e:
        # Was a bare `except:`, which also trapped KeyboardInterrupt/SystemExit.
        print(f"Download of {file_id} failed: {e}")
        return None

    # 404 on the r1i1p1 key: presumably the model only provides r6i1p1.
    fallback_id = file_id.replace('r1i1p1', 'r6i1p1')
    if fallback_id == file_id:
        print(f"{file_id} not found and contains no r1i1p1 member to substitute")
        return None
    try:
        s3.Bucket(loca_bucket).download_file(fallback_id, filename)
        return filename
    except Exception as e:
        # Keep the None-on-failure contract instead of raising only on this path.
        print(f"Download of fallback {fallback_id} failed: {e}")
        return None

def cleanup(download_location = '/temp'):
    """Delete the regular files (and symlinks) directly inside ``download_location``.

    Subdirectories are skipped, so a stray directory no longer aborts the
    cleanup with an error. Dot-files are not matched by ``glob`` and are kept.

    Args:
        download_location: scratch directory to empty. The default matches
            ``download_file``'s default download directory.
    """
    for path in glob.glob(os.path.join(download_location, '*')):
        if os.path.isfile(path) or os.path.islink(path):
            os.remove(path)

# Actual processing
def hdd(a, axis = 0, baseline = 291.483):
    """Sum of heating-degree contributions ``max(baseline - a, 0)`` along ``axis``.

    Args:
        a: array or masked array of temperatures.
        axis: axis to reduce over. It was previously ignored, and axis 0 was
            always used; the default keeps that result.
        baseline: base temperature in the same units as ``a``. The default
            291.483 equals 65 degF expressed in kelvin, so it assumes ``a`` is
            in K. TODO: confirm against the data's units attribute.

    Returns:
        A ``numpy.ndarray`` with ``axis`` removed. Elements at or above the
        baseline, and elements already masked in ``a``, contribute 0.
    """
    a_to_baseline = baseline - a
    # Mask non-positive differences (days at/above baseline); masked_where also
    # keeps any mask already present on `a`. Filling with 0 drops them from the sum.
    masked = ma.masked_where(a_to_baseline <= 0, a_to_baseline)
    intermediate_matrix = ma.filled(masked, fill_value = 0)
    return np.sum(intermediate_matrix, axis = axis)

## BASELINE

# Multi-model (ensemble) average of daily tasmax, computed one historical year at a time
def process_baseline_year_tasmax(year, models = all_models, file_prefix = ""):
    """Compute the multi-model mean of daily ``tasmax`` for one historical year.

    Steps:
        1. Download each model's ``historical`` tasmax file in parallel.
        2. Stack the files along a new leading axis with dask.
        3. Average over that axis.
        4. Save the result to
           ``/temp/{file_prefix}_{year}_tasmax_baseline_average.npy``.
        5. Upload it to ``baseline/tasmax/intermediate/<basename>`` in the
           GCS ``bucket``.

    Raises:
        RuntimeError: if any model's file could not be downloaded. Averaging a
            silently reduced ensemble would bias the baseline.

    Returns:
        None.
    """
    print("Generating ids")
    ids = [gen_netcdf_id(model, 'historical', year, 'tasmax') for model in models]
    print("Downloading files")
    # The context manager shuts the worker processes down once map() returns
    # (the original never closed the pool).
    with Pool() as pool:
        filenames = pool.map(download_file, ids)
    failed = [file_id for file_id, fn in zip(ids, filenames) if fn is None]
    if failed:
        raise RuntimeError(f"Failed to download {len(failed)} file(s): {failed}")
    # Observed per-file shape = (365, 490, 960); the 366 time chunk also covers leap years.
    print("Stacking files")
    datasets = []
    try:
        for fn in filenames:
            datasets.append(netCDF4.Dataset(fn))
        # lock=True serialises reads from the NetCDF/HDF5 library, which is not thread-safe.
        # NOTE(review): netCDF4 returns masked arrays; confirm whether dask keeps the mask
        # or averages raw fill values. Also confirm the distributed workers can access these
        # local /temp files when compute() runs on the cluster.
        arr_list = [da.from_array(ds['tasmax'], chunks = (366, 245, 240), lock = True) for ds in datasets]
        arr_stack = da.stack(arr_list)
        print("Calculating average")
        year_avg = arr_stack.mean(axis = 0).compute()
    finally:
        # The handles must stay open until compute() has finished reading them.
        for ds in datasets:
            ds.close()
    result_filename = f"/temp/{file_prefix}_{year}_tasmax_baseline_average.npy"
    np.save(result_filename, year_avg)
    # Bug fix: the original string lacked the `f` prefix, so every year was uploaded
    # to the same literal key '.../{result_filename.split("/")[-1]}'.
    blob = bucket.blob(f'baseline/tasmax/intermediate/{os.path.basename(result_filename)}')
    blob.upload_from_filename(result_filename)
    print(result_filename)
    return None

In [15]:
# Start from an empty scratch directory, then build the 1971 multi-model tasmax baseline.
cleanup()
process_baseline_year_tasmax(year=1971, models=all_models, file_prefix='baseline_v1')

Generating ids
Downloading files
Downloading /temp/tasmax_day_CESM1-CAM5_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_ACCESS1-0_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_CCSM4_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_CMCC-CMS_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_CMCC-CM_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_CNRM-CM5_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_CESM1-BGC_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_ACCESS1-3_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_CSIRO-Mk3-6-0_historical_r1i1p1_19710101-19711231.LOCA_2016-04-02.16th.nc
Downloading /temp/tasmax_day_EC-EARTH_historical_r1i1p1_19710101-19711231.LOCA_

In [11]:
# Restart the Dask workers attached to `client`.
# NOTE(review): this cell was executed as In [11], i.e. before In [15] above;
# execution order matters for reproducing the recorded outputs.
client.restart()

0,1
Client  Scheduler: tcp://scheduler:8786  Dashboard: http://scheduler:8787,Cluster  Workers: 1  Cores: 4  Memory: 16.81 GB
