In [1]:
from download_manager import DownloadManager
from file_metadata import GFSForecastMetadata
import pandas as pd
import xarray as xr
import pygrib
from pathlib import Path
import cfgrib
import os
import re
from datetime import datetime
from nimbus.common.io import run_command

In [None]:
# WRF_DIFFUSION_CONFIG = {
#     'plevels': [200, 300, 500, 700, 850, 950, 1000],
#     'pressure_variable_keys': [
#         # Pressure variables
#         "HGT",  # geopotential height [gpm]
#         "SPFH", # specific humidity [kg/kg]
#         "VVEL", # Vertical Velocity [Pa/s]
#         "TMP",  # Temperature [K]
#         "UGRD", # U-component of wind [m/s]
#         "VGRD", # V-component of wind [m/s]
#     ],
#     'surface_variables': [
#         ":UGRD:10 m above ground:", # U-wind at 10 m
#         ":VGRD:10 m above ground:", # V-wind at 10 m
#         ":TMP:2 m above ground:",   # air temp. at 2m
#         ":PWAT:"                    # Precipitable water [kg / m^2]
#     ],
#     'region_bound': "198.5:207.5 17.5:23.5"
# }

In [2]:
VARIABLES = [ # I know, this looks repetitive but trust me this is the most flexible way...
    # Check https://www.nco.ncep.noaa.gov/pmb/products/gfs/gfs.t00z.pgrb2.0p25.f003.shtml at https://www.nco.ncep.noaa.gov/pmb/products/gfs/ for variable names.
    # Pressure level variables
    ":HGT:200 mb:",  ":HGT:300 mb:",  ":HGT:500 mb:",  ":HGT:700 mb:",  ":HGT:850 mb:",  ":HGT:950 mb:",  ":HGT:1000 mb:",  # HGT: geopotential height [gpm]
    ":SPFH:200 mb:", ":SPFH:300 mb:", ":SPFH:500 mb:", ":SPFH:700 mb:", ":SPFH:850 mb:", ":SPFH:950 mb:", ":SPFH:1000 mb:", # SPFH: specific humidity [kg/kg]
    ":VVEL:200 mb:", ":VVEL:300 mb:", ":VVEL:500 mb:", ":VVEL:700 mb:", ":VVEL:850 mb:", ":VVEL:950 mb:", ":VVEL:1000 mb:", # VVEL: Vertical Velocity [Pa/s]
    ":TMP:200 mb:",  ":TMP:300 mb:",  ":TMP:500 mb:",  ":TMP:700 mb:",  ":TMP:850 mb:",  ":TMP:950 mb:",  ":TMP:1000 mb:",  # TMP: Temperature [K]
    ":UGRD:200 mb:", ":UGRD:300 mb:", ":UGRD:500 mb:", ":UGRD:700 mb:", ":UGRD:850 mb:", ":UGRD:950 mb:", ":UGRD:1000 mb:", # UGRD: U-component of wind [m/s]
    ":VGRD:200 mb:", ":VGRD:300 mb:", ":VGRD:500 mb:", ":VGRD:700 mb:", ":VGRD:850 mb:", ":VGRD:950 mb:", ":VGRD:1000 mb:", # VGRD: V-component of wind [m/s]
    # Surface variables
    ":UGRD:10 m above ground:", # U-wind at 10 m
    ":VGRD:10 m above ground:", # V-wind at 10 m
    ":TMP:2 m above ground:",   # air temp. at 2m
    ":PWAT:"                    # Precipitable water [kg / m^2]
]
REGION_BOUND = "198.5:207.5 17.5:23.5"

In [45]:
# '|'.join(VARIABLES)

In [3]:
class GFSForecastDownloadManager(DownloadManager):
    def __init__(
            self, 
            base_dir,
            buffer_dir,
            forecast_horizon_hours,
            issue_times,
            variables,
            region_bound,
            log_filename,
        ):
        super().__init__(base_dir=base_dir, buffer_dir=buffer_dir, db_field_dtypes=GFSForecastMetadata.dtypes, log_filename=log_filename)
        self.variables = variables
        self.region_bound = region_bound
        self.forecast_horizon_hours = forecast_horizon_hours
        self.issue_times = issue_times

    def callback(self, source_filename, buffer_filename, local_filename):
        def parse_source_filename(url):
            # E.g. "https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs.20220626/00/atmos/gfs.t00z.pgrb2.0p25.f024"
            
            d = re.search(r'\.(\d{8})/', url).group(1)    # look for . followed by 8 digits
            h = re.search(r'/(\d{2})/', url).group(1)     # look for two digits surrounded by two /'s
            f = int(re.search(r'\.f(\d{3})', url).group(1)) # look for . followed by 3 digits
            t0 = datetime.strptime(d + h, "%Y%m%d%H")
            return t0, f

        temp_filename = f"{buffer_filename}.partial.grib2"
        # filter_key = f":({'|'.join(PRESSURE_VARIABLE_KEYS)}):({'|'.join(map(str, PLEVELS))}) mb:|" + '|'.join(SURFACE_VARIABLES)
        filter_key = '|'.join(self.variables)
        Path(local_filename).parent.mkdir(parents=True, exist_ok=True)
        compress_command = f"wgrib2 {buffer_filename} -match '{filter_key}' -small_grib {self.region_bound} {temp_filename}"
        conversion_command = f"wgrib2 {temp_filename} -netcdf {local_filename}"

        run_command(compress_command)
        run_command(conversion_command)

        Path(buffer_filename).unlink()
        Path(temp_filename).unlink()

        utc_issue_timestamp, forecast_horizon = parse_source_filename(source_filename)
        return GFSForecastMetadata(
                source_filename=source_filename,
                local_filename=local_filename,
                size=os.stat(local_filename).st_size,
                last_modified=pd.Timestamp.now('UTC'),
                utc_issue_timestamp=utc_issue_timestamp,
                forecast_horizon=forecast_horizon
            )

    def _calculate_filenames_for_range(self, start: pd.Timestamp, end: pd.Timestamp):
        utc_issue_timestamps = pd.date_range(start.floor('D'), end.ceil('D'), freq='6h')
        utc_issue_timestamps = utc_issue_timestamps[
            utc_issue_timestamps.hour.isin(self.issue_times)
            & (utc_issue_timestamps >= start)
            & (utc_issue_timestamps <= end)]

        source_filenames, buffer_filenames, local_filenames, db_filenames = [], [], [], []
        for utc_issue_timestamp in utc_issue_timestamps:
            for forecast_horizon_hour in self.forecast_horizon_hours:
                str_format = utc_issue_timestamp.strftime(f"gfs.%Y%m%d/%H/atmos/gfs.t%Hz.pgrb2.0p25.f{forecast_horizon_hour:03d}")
                source_filenames.append(f"https://noaa-gfs-bdp-pds.s3.amazonaws.com/{str_format}")
                buffer_filenames.append(str_format)
                local_filenames.append(utc_issue_timestamp.strftime(f"%Y_%m/%d/%Y_%m_%d_%H:00_f{forecast_horizon_hour:03d}.nc"))
                db_filenames.append(utc_issue_timestamp.strftime("%Y_%m.db"))

        return source_filenames, buffer_filenames, local_filenames, db_filenames

    def cleanup(self):
        root = Path(self.buffer_dir)
        for path in sorted(root.rglob('*'), key=lambda p: len(p.parts), reverse=True): # deepest directories first
            if path.is_dir() and not any(path.iterdir()): # remove if empty (Path.rmdir() cannot remove non-empty dir anyways, so it is safe.)
                path.rmdir()

In [4]:
dm = GFSForecastDownloadManager(base_dir=f"/mnt/lustre/koa/koastore/sadow_group/shared/wrf_hawaii/raw_data/gfs/", 
                                buffer_dir='/mnt/lustre/koa/scratch/yusukemh/wrf-diffusion/gfs_buffer',
                                log_filename='stdout', forecast_horizon_hours=range(49), issue_times=[0,6,12,18],
                                variables=VARIABLES, region_bound=REGION_BOUND)

In [7]:
start = pd.Timestamp('2023-01-03 04:34:23')
end = pd.Timestamp('2023-01-04 05:24:23')

In [8]:
dm.download_files(mode='range', start=start, end=end)

02/24 11:30 HST [INFO]:</home/yusukemh/github/DownloadManager/download_manager/download_manager.py:94> in download_files, Calculating filenames
02/24 11:30 HST [INFO]:</home/yusukemh/github/DownloadManager/download_manager/download_manager.py:108> in download_files, Checking database for already existing files.
02/24 11:30 HST [INFO]:</home/yusukemh/github/DownloadManager/download_manager/download_manager.py:119> in download_files, Found 0 new files.
02/24 11:30 HST [INFO]:</home/yusukemh/github/DownloadManager/download_manager/download_manager.py:121> in download_files, Completed downloading.


In [12]:
dm.cleanup()

In [9]:
df = dm.export_databases()

In [11]:
df[0].sort_values('last_modified')

Unnamed: 0,product,datatype,source_filename,local_filename,size,last_modified,forecast_horizon,utc_issue_timestamp
0,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 08:26:21.141528+00:00,1,2023-01-04 06:00:00
1,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 08:26:26.532577+00:00,9,2023-01-04 06:00:00
2,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 08:26:27.082952+00:00,2,2023-01-04 06:00:00
3,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 08:26:27.470124+00:00,6,2023-01-04 06:00:00
4,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 08:26:35.154431+00:00,8,2023-01-04 06:00:00
...,...,...,...,...,...,...,...,...
436,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 21:29:00.502360+00:00,43,2023-01-04 00:00:00
437,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 21:29:01.628911+00:00,39,2023-01-04 00:00:00
438,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 21:29:14.356270+00:00,40,2023-01-04 00:00:00
439,GFS,forecast,https://noaa-gfs-bdp-pds.s3.amazonaws.com/gfs....,/mnt/lustre/koa/koastore/sadow_group/shared/wr...,181540,2026-02-24 21:29:24.462313+00:00,48,2023-01-04 00:00:00
