In [None]:
!pip install netcdf4
import os
import sys
import urllib.request
from datetime import datetime, timedelta
from itertools import count
from pathlib import Path

import boto3
import numpy as np
import pandas as pd
import xarray as xr

In [None]:
# Repository locations, resolved relative to the current working directory
# (normally the folder this notebook lives in).
THIS_DIR = Path.cwd()
CORE_DIR = Path(THIS_DIR).parent
SERVICES_DIR = CORE_DIR.joinpath("LAMBDA", "viz_functions", "viz_publish_service", "services")

In [None]:
# Make the shared helper modules importable. The membership check keeps sys.path
# from accumulating a duplicate entry every time this cell is re-run.
HELPER_FUNCTIONS_DIR = str(CORE_DIR / "Manual_Workflows" / "helper_functions")
if HELPER_FUNCTIONS_DIR not in sys.path:
    sys.path.append(HELPER_FUNCTIONS_DIR)
from shared_functions import sql_to_dataframe

In [None]:
# Load KEY=VALUE pairs from the secrets file into os.environ (presumably read later by
# sql_to_dataframe for database credentials -- verify). The default path can be
# overridden with the SECRETS_ENV_PATH environment variable.
SECRETS_PATH = Path(os.environ.get("SECRETS_ENV_PATH", "/home/ec2-user/SageMaker/Secrets.env"))
with open(SECRETS_PATH) as secrets_file:
    for raw_line in secrets_file:
        line = raw_line.strip()
        # Skip blanks, comments and anything that is not an assignment; the previous
        # parser raised ValueError on any non-blank line without "=".
        if not line or line.startswith("#") or "=" not in line:
            continue
        var, val = line.split("=", 1)
        # Double quotes are removed from the value, as before.
        os.environ[var.strip()] = val.strip().replace('"', "")

In [None]:
# Replacement streamflow values per domain: one DataFrame per domain with columns
# (feature_id, streamflow), ordered by feature_id. Non-CONUS domains read rf_10_0;
# CONUS reads rf_10_0_17c and substitutes 0.0 wherever the joined channel is not flagged
# public_fim_domain (including reaches with no channels_conus row, via the LEFT JOIN).
# The queries are executed in the order listed.
DOMAIN_TO_RF_DF = {
    domain: sql_to_dataframe(query)
    for domain, query in (
        ("alaska", "SELECT feature_id, rf_10_0 as streamflow FROM derived.recurrence_flows_ak ORDER BY feature_id"),
        ("conus", '''
        SELECT 
            rf.feature_id, 
            CASE WHEN ch.public_fim_domain IS TRUE 
                THEN rf.rf_10_0_17c 
                ELSE 0.0 
            END as streamflow
        FROM derived.recurrence_flows_conus rf
        LEFT JOIN derived.channels_conus ch on ch.feature_id = rf.feature_id
        ORDER BY feature_id
    '''),
        ("hawaii", "SELECT feature_id, rf_10_0 as streamflow FROM derived.recurrence_flows_hi ORDER BY feature_id"),
        ("puertorico", "SELECT feature_id, rf_10_0 as streamflow FROM derived.recurrence_flows_prvi ORDER BY feature_id"),
    )
}

In [None]:
# Products to fabricate test data for: model -> domain -> list of output types, each of
# which appears as the type component of the NWM file name (e.g. ...channel_rt...).
DATASET_CONFIGURATIONS = {
    "analysis_assim": {
        "alaska": ["channel_rt", "forcing"],
        "conus": ["channel_rt", "land", "forcing", "reservoir"],
        "hawaii": ["channel_rt", "forcing"],
        "puertorico": ["channel_rt", "forcing"]
    },
    "medium_range": {
        "alaska": ["channel_rt", "forcing"],
        "conus": ["channel_rt", "forcing"] + ["reservoir"],
    },
    "medium_range_blend": {
        # channel_rt only. An earlier duplicate "alaska"/"conus" entry that also listed
        # "forcing" was silently overridden by these keys (later dict keys win), so it
        # never took effect; it has been removed.
        "alaska": ["channel_rt"],
        "conus": ["channel_rt"],
    },
    "short_range": {
        "alaska": ["channel_rt", "forcing"],
        "conus": ["channel_rt", "forcing", "reservoir"],
        "hawaii": ["channel_rt", "forcing"],
        "puertorico": ["channel_rt", "forcing"]
    },
}

In [None]:
# Fabricate "apocalyptic" NWM test data. For every model/domain/output type in
# DATASET_CONFIGURATIONS this cell:
#   1. downloads yesterday's files from NOMADS (00z; 06z for Puerto Rico short_range),
#   2. overwrites the variables of interest, and
#   3. uploads the result to s3://DEST_BUCKET/test_nwm_outputs/<config>/.

S3 = boto3.client("s3")
DEST_BUCKET = "hydrovis-ti-deployment-us-east-1"
TMP_FILE = "tmp.nc"
# Computed once, with timedelta: the previous `datetime(year, month, day - 1)` raised
# ValueError on the 1st of every month, and recomputing it per file could straddle midnight.
DATE = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')


def _config_name(model, domain, pe):
    """Return the NOMADS product directory, e.g. 'forcing_short_range_hawaii' or 'medium_range_alaska_mem1'."""
    prefix = "forcing_" if pe == "forcing" else ""
    domain_suffix = f"_{domain}" if domain != "conus" else ""
    member_suffix = "_mem1" if model == "medium_range" and pe != "forcing" else ""
    return f"{prefix}{model}{domain_suffix}{member_suffix}"


def _step_settings(model, domain, pe):
    """Return (hour, start_step, end_step, step, step_format) for one model/domain/output type.

    step_format is the printf-style format of the tm/f step number in the file name.
    Every model is handled explicitly so that no setting leaks in from a previous loop
    iteration.

    Raises:
        ValueError: if the model has no settings defined.
    """
    hour = "00"
    if model == "analysis_assim":
        step_format = "%04d" if (domain == "hawaii" and pe == "channel_rt") else "%02d"
        return hour, 0, 0, 1, step_format
    if model == "short_range":
        if domain == "alaska":
            return hour, 1, 15, 1, "%03d"
        if domain == "puertorico":
            return "06", 1, 48, 1, "%03d"
        if domain == "hawaii":
            if pe == "forcing":
                return hour, 1, 48, 1, "%03d"
            return hour, 100, 4800, 100, "%05d"
        return hour, 1, 18, 1, "%03d"
    if model in ("medium_range", "medium_range_blend"):
        # medium_range_blend used to have no branch and silently inherited these exact
        # medium_range values from the previous iteration; the behavior is now explicit.
        return hour, 1, 240, 1, "%03d"
    raise ValueError(f"No step settings defined for model: {model}")


def _interest_vars(config, pe):
    """Return the NetCDF variable names to overwrite for a product config / output type.

    Raises:
        ValueError: if the output type is not recognised.
    """
    if 'forcing' in config:
        return ['RAINRATE']
    if 'coastal' in config:
        return ['elevation']
    pe_vars = {
        'land': ['SNEQV', 'SNOWH', 'SOILICE', 'SOILSAT_TOP'],
        'channel_rt': ['streamflow'],
        'reservoir': ['water_sfc_elev'],
    }
    if pe not in pe_vars:
        raise ValueError(f"Unknown PE: {pe}")
    return pe_vars[pe]


def _apocalyptic_streamflow(ds, domain, model_count, missing_value=100.0):
    """Return replacement streamflow values in the file's own feature_id order.

    Values come from DOMAIN_TO_RF_DF and are aligned by feature_id rather than by position.
    Reaches absent from that table get `missing_value`, which replaces the old hard-coded
    padding of 6 leading (Alaska) / 341 trailing (Hawaii) values. For CONUS, every 4th reach
    starting at index `model_count` is set to 0, so each model zeroes a different subset.
    """
    rf_series = DOMAIN_TO_RF_DF[domain].set_index('feature_id')['streamflow']
    values = rf_series.reindex(ds['feature_id'].values, fill_value=missing_value).to_numpy(dtype=float, copy=True)
    if domain == "conus":
        values[model_count::4] = 0
    return values


def _process_file(url, fname, dest_key, interest_vars, domain, model_count):
    """Download one NWM file, overwrite its variables of interest, and upload it to S3.

    The downloaded file and TMP_FILE are removed even if a step fails.
    """
    try:
        urllib.request.urlretrieve(url, fname)
        with xr.open_dataset(fname) as ds:
            for interest_var in interest_vars:
                if interest_var == "streamflow":
                    ds[interest_var].values = _apocalyptic_streamflow(ds, domain, model_count)
                else:
                    # NOTE(review): xarray arithmetic drops variable attrs by default;
                    # confirm downstream consumers do not rely on them.
                    ds[interest_var] = (ds[interest_var] + 1) * 100
            ds.to_netcdf(TMP_FILE)
        S3.upload_file(TMP_FILE, DEST_BUCKET, dest_key)
    finally:
        for path in (TMP_FILE, fname):
            Path(path).unlink(missing_ok=True)


for model_count, (MODEL, DOMAINS) in enumerate(DATASET_CONFIGURATIONS.items()):
    print("***********************")
    print(f"MODEL: {MODEL} (count: {model_count})")
    for DOMAIN, PEs in DOMAINS.items():
        print(f"... DOMAIN: {DOMAIN}")
        for PE in PEs:
            print(f"...... PE: {PE}")
            CONFIG = _config_name(MODEL, DOMAIN, PE)
            HOUR, START_STEP, END_STEP, STEP, STEP_FORMAT = _step_settings(MODEL, DOMAIN, PE)
            INTEREST_VARS = _interest_vars(CONFIG, PE)
            TIME_TAG = "tm" if "analysis" in MODEL else "f"
            # Non-forcing medium_range files carry a "_1" suffix, matching the "_mem1" directory.
            file_pe = PE + ("_1" if MODEL == "medium_range" and PE != "forcing" else "")
            for step_number in range(START_STEP, END_STEP + STEP, STEP):
                fname = f"nwm.t{HOUR}z.{MODEL}.{file_pe}.{TIME_TAG}{STEP_FORMAT % step_number}.{DOMAIN}.nc"
                url = f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/nwm/prod/nwm.{DATE}/{CONFIG}/{fname}"
                dest_key = f"test_nwm_outputs/{CONFIG}/{fname}"
                _process_file(url, fname, dest_key, INTEREST_VARS, DOMAIN, model_count)

print("ALL DONE!")