In [1]:
# Install packages not in this env in the current Jupyter kernel
# can comment out or delete when they are installed in environment
import sys

!{sys.executable} -m pip install s3fs
!{sys.executable} -m pip install boto3

Collecting botocore<1.24.22,>=1.24.21
  Using cached botocore-1.24.21-py3-none-any.whl (8.6 MB)
Installing collected packages: botocore
  Attempting uninstall: botocore
    Found existing installation: botocore 1.25.0
    Uninstalling botocore-1.25.0:
      Successfully uninstalled botocore-1.25.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
boto3 1.22.0 requires botocore<1.26.0,>=1.25.0, but you have botocore 1.24.21 which is incompatible.[0m
Successfully installed botocore-1.24.21
Collecting botocore<1.26.0,>=1.25.0
  Using cached botocore-1.25.0-py3-none-any.whl (8.7 MB)
Installing collected packages: botocore
  Attempting uninstall: botocore
    Found existing installation: botocore 1.24.21
    Uninstalling botocore-1.24.21:
      Successfully uninstalled botocore-1.24.21
[31mERROR: pip's dependency resolver does not currently take into account al

In [2]:
import numpy as np
import s3fs
import argparse
from datetime import datetime
import os.path
from pathlib import Path
import time
import glob

from boto3 import client
from botocore import UNSIGNED
from botocore.client import Config

In [3]:
def download_s3_file(bucket, ingest_path, output_path):
    """Copy one object from a public S3 bucket to a local path.

    Args:
        bucket: Name of the S3 bucket (accessed anonymously).
        ingest_path: Key of the object inside ``bucket``.
        output_path: Local destination passed to the filesystem's ``download``.

    Returns:
        None. The outcome (downloaded or not found) is reported via ``print``.
    """
    filesystem = s3fs.S3FileSystem(anon=True, asynchronous=False)
    remote_key = f"{bucket}/{ingest_path}"

    # Guard clause: report a missing object and stop without attempting a download.
    if not filesystem.exists(remote_key):
        print(f"‼️ file not found {ingest_path}")
        return

    filesystem.download(remote_key, output_path)
    print(f"✅ downloading {ingest_path} & saving to {output_path}")


def list_s3_files(bucket, model, date, init_time, data_type):
    """List the downloadable object keys for one model run in a public bucket.

    Args:
        bucket: Name of the S3 bucket to query (unsigned/anonymous requests).
        model: One of ``"nam"``, ``"gfs"`` or ``"hrrr"``; selects the key layout.
        date: Run date string used in the key prefix (e.g. ``"20220425"``).
        init_time: Initialization hour string used in the key (e.g. ``"00"``).
        data_type: Product token that follows ``t{init_time}z.`` in the key.

    Returns:
        Sorted list of matching keys, excluding keys ending in ``"idx"`` or
        ``"anl"``. Empty list when nothing matches.

    Raises:
        ValueError: If ``model`` is not one of the supported names.
    """
    if model == "nam":
        prefix = f"{model}.{date}/"
    elif model == "gfs":
        prefix = f"{model}.{date}/{init_time}/atmos/"
    elif model == "hrrr":
        prefix = f"{model}.{date}/conus/"
    else:
        # Previously this fell through and failed later with an UnboundLocalError.
        raise ValueError(
            f"unsupported model {model!r}; expected 'nam', 'gfs' or 'hrrr'"
        )

    conn = client("s3", config=Config(signature_version=UNSIGNED))
    request = {
        "Bucket": bucket,
        "Prefix": f"{prefix}{model}.t{init_time}z.{data_type}",
    }
    run_token = f"t{init_time}z.{data_type}"

    keys = []
    while True:
        response = conn.list_objects_v2(**request)
        for entry in response.get("Contents") or []:
            key = entry.get("Key")
            if run_token in key and not key.endswith(("idx", "anl")):
                keys.append(key)

        # A single response holds a limited number of keys; follow the
        # continuation token so long listings are not silently truncated.
        next_token = response.get("NextContinuationToken")
        if not response.get("IsTruncated") or not next_token:
            break
        request["ContinuationToken"] = next_token

    keys.sort()
    return keys


def get_avail_files(
    s3_bucket,
    model,
    year,
    month,
    day,
    init_time,
    data_type,
    split_loc,
    fh_loc,
    fxx_max,
    zfill,
    download_dir,
    fname_out,
    fname_end,
    full_filelist_len,
    poll_interval=90,
    max_stalled_polls=10,
):
    """Poll a bucket and download a model run's files as they become available.

    Each pass lists the run's keys, keeps those whose forecast hour is at most
    ``fxx_max``, downloads the ones not yet present locally, and then either
    stops (all expected files present, or no progress for too long) or sleeps
    and tries again.

    Args:
        s3_bucket: Bucket name passed to ``list_s3_files``/``download_s3_file``.
        model: Model name passed to ``list_s3_files``.
        year, month, day: Concatenated as ``f"{year}{month}{day}"`` for the listing.
        init_time: Initialization hour string.
        data_type: Product token passed to ``list_s3_files``.
        split_loc: Index of the dot-separated key component holding the forecast hour.
        fh_loc: Start index of the forecast-hour digits within that component.
        fxx_max: Largest forecast hour (inclusive) to download.
        zfill: Width the forecast hour is zero-padded to in the local file name.
        download_dir: Local directory prefix; concatenated directly with the
            file name, so it must end with a path separator.
        fname_out: Local file name prefix (before the forecast hour).
        fname_end: Local file name suffix (after the forecast hour).
        full_filelist_len: Number of local files that counts as a complete run.
        poll_interval: Seconds to sleep between passes.
        max_stalled_polls: Give up once more than this many consecutive passes
            see the same number of available files as the pass before.

    Returns:
        True when at least ``full_filelist_len`` matching files exist locally,
        False when polling was abandoned because no new files appeared.
    """

    def _forecast_hour_text(key):
        """Return the forecast-hour substring of ``key``, or None if it is not an integer."""
        try:
            text = key.split(".")[split_loc][fh_loc:]
            int(text)
        except (IndexError, ValueError):
            return None
        return text

    stalled_polls = 0
    previous_count = 0  # the first pass is compared against "nothing available"
    while True:
        available = list_s3_files(
            s3_bucket, model, f"{year}{month}{day}", init_time, data_type
        )
        # Keep (key, forecast-hour text) pairs; keys whose hour cannot be parsed
        # are skipped rather than aborting the whole poll with a ValueError.
        wanted = []
        for key in available:
            fxx = _forecast_hour_text(key)
            if fxx is not None and int(fxx) <= fxx_max:
                wanted.append((key, fxx))
        print([key for key, _ in wanted])

        # Create the destination once per pass, and only when there is something to save.
        if wanted and not os.path.isdir(download_dir):
            print("making directory: ", download_dir)
            Path(download_dir).mkdir(parents=True, exist_ok=True)

        for key, fxx in wanted:
            output_path = f"{download_dir}{fname_out}{str(fxx).zfill(zfill)}{fname_end}"
            if os.path.exists(output_path):
                # if the file already exists, do not redownload
                print(f"file has already been downloaded: {output_path}")
                continue
            download_s3_file(s3_bucket, key, output_path)
            # call the rest of the pipeline here, run_pipeline
            # running cleaning through the pipeline could be the "sleep" period
            print("FXX IS: ", fxx)

        # Stop once all desired files have been downloaded on our side.
        if os.path.isdir(download_dir):
            num_files_downloaded = len(
                glob.glob(f"{download_dir}{fname_out}*{fname_end}")
            )
            print(num_files_downloaded)
            if num_files_downloaded >= full_filelist_len:
                print("exiting from while loop")
                return True

        # Count consecutive passes without new files. The counter resets on
        # progress so idle polls between arrivals do not add up to an early exit.
        if len(wanted) == previous_count:
            stalled_polls += 1
            print("same number of available files as last try. ii=", stalled_polls)
            if stalled_polls > max_stalled_polls:
                print("waited too long for new file, exiting while loop")
                return False
        else:
            stalled_polls = 0
        previous_count = len(wanted)

        # try again after the polling interval
        print("sleep: ", datetime.now())
        time.sleep(poll_interval)

In [4]:
def main(
    model, data_type, init_date, init_time, download_root="/home/lgaudet/model-data"
):
    """Download every expected forecast file for one model initialization.

    Builds the model-specific bucket name, key-parsing offsets, local file
    naming and expected file count, then hands them to ``get_avail_files``.

    Args:
        model: ``"nam"``, ``"gfs"`` or ``"hrrr"``.
        data_type: Product token forwarded to ``get_avail_files``.
        init_date: Object exposing ``year``, ``month`` and ``day`` (e.g. a datetime).
        init_time: Initialization hour string (e.g. ``"00"``).
        download_root: Directory under which ``MODEL/year/month/`` is created.
            Defaults to the path this notebook originally hard-coded.

    Raises:
        ValueError: If ``model`` is not one of the supported names.
    """
    month = str(init_date.month).zfill(2)
    year = init_date.year
    day = str(init_date.day).zfill(2)
    run_stamp = f"{year}{month}{day}"

    if model == "nam":
        s3_bucket = f"noaa-{model}-pds"
        fxx_max = 84
        split_loc, fh_loc = -3, -2
        fname_out = f"nam_218_{run_stamp}_{init_time}00_"
        fname_end = ".grb2"
        zfill = 3
        # forecast hours 0-36 every hour, then 39-84 every 3 hours
        full_filelist_len = len(range(0, 37)) + len(range(39, 85, 3))
    elif model == "gfs":
        s3_bucket = f"noaa-{model}-bdp-pds"
        fxx_max = 96
        split_loc, fh_loc = -1, 1
        fname_out = f"gfs_4_{run_stamp}_{init_time}00_"
        fname_end = ".grb2"
        zfill = 3
        # forecast hours 0-96 every 3 hours
        full_filelist_len = len(range(0, 99, 3))
    elif model == "hrrr":
        s3_bucket = f"noaa-{model}-bdp-pds"
        fxx_max = 18
        split_loc, fh_loc = -2, -2
        fname_out = f"{run_stamp}_hrrr.t{init_time}z.wrfsfcf"
        fname_end = ".grib2"
        zfill = 2
        # forecast hours 0-18 every hour
        full_filelist_len = len(range(0, 19))
    else:
        # Previously an unknown model fell through and failed with UnboundLocalError.
        raise ValueError(
            f"unsupported model {model!r}; expected 'nam', 'gfs' or 'hrrr'"
        )

    # Trailing slash is required: get_avail_files concatenates the directory
    # and file name directly.
    root = str(download_root).rstrip("/")
    download_dir = f"{root}/{model.upper()}/{year}/{month}/"

    get_avail_files(
        s3_bucket,
        model,
        year,
        month,
        day,
        init_time,
        data_type,
        split_loc,
        fh_loc,
        fxx_max,
        zfill,
        download_dir,
        fname_out,
        fname_end,
        full_filelist_len,
    )

    print(
        f"full download for {init_time}z initialization of the {model.upper()} complete!"
    )

In [5]:
# Maps each supported model name to the product token passed to main() as data_type.
data_type_dict = {"gfs": "pgrb2.0p50", "nam": "awphys", "hrrr": "wrfsfc"}

In [6]:
# Model run to fetch: initialization hour (as a two-character string) and initialization date.
init_time = "00"
init_date = datetime(2022, 4, 25)

In [7]:
# Download the GFS files for the configured init_date / init_time.
main("gfs", data_type_dict.get("gfs"), init_date, init_time)

['gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f000', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f003', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f006', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f009', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f012', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f015', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f018', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f021', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f024', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f027', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f030', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f033', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f036', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f039', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f042', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f045', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f048', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f051', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f054', 'gfs.20220425/00/atmos/gfs.t00z.pgrb2.0p50.f057',

In [8]:
# Download the NAM files for the configured init_date / init_time.
main("nam", data_type_dict.get("nam"), init_date, init_time)

['nam.20220425/nam.t00z.awphys00.tm00.grib2', 'nam.20220425/nam.t00z.awphys01.tm00.grib2', 'nam.20220425/nam.t00z.awphys02.tm00.grib2', 'nam.20220425/nam.t00z.awphys03.tm00.grib2', 'nam.20220425/nam.t00z.awphys04.tm00.grib2', 'nam.20220425/nam.t00z.awphys05.tm00.grib2', 'nam.20220425/nam.t00z.awphys06.tm00.grib2', 'nam.20220425/nam.t00z.awphys07.tm00.grib2', 'nam.20220425/nam.t00z.awphys08.tm00.grib2', 'nam.20220425/nam.t00z.awphys09.tm00.grib2', 'nam.20220425/nam.t00z.awphys10.tm00.grib2', 'nam.20220425/nam.t00z.awphys11.tm00.grib2', 'nam.20220425/nam.t00z.awphys12.tm00.grib2', 'nam.20220425/nam.t00z.awphys13.tm00.grib2', 'nam.20220425/nam.t00z.awphys14.tm00.grib2', 'nam.20220425/nam.t00z.awphys15.tm00.grib2', 'nam.20220425/nam.t00z.awphys16.tm00.grib2', 'nam.20220425/nam.t00z.awphys17.tm00.grib2', 'nam.20220425/nam.t00z.awphys18.tm00.grib2', 'nam.20220425/nam.t00z.awphys19.tm00.grib2', 'nam.20220425/nam.t00z.awphys20.tm00.grib2', 'nam.20220425/nam.t00z.awphys21.tm00.grib2', 'nam.2022

In [9]:
# Download the HRRR files for the configured init_date / init_time.
main("hrrr", data_type_dict.get("hrrr"), init_date, init_time)

['hrrr.20220425/conus/hrrr.t00z.wrfsfcf00.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf01.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf02.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf03.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf04.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf05.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf06.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf07.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf08.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf09.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf10.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf11.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf12.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf13.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf14.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf15.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf16.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf17.grib2', 'hrrr.20220425/conus/hrrr.t00z.wrfsfcf18.grib2']
file has already been downloaded: /home/lgaudet/model-data/HRRR/2022