In [1]:
%load_ext swagpy.jupyter

import io
import asyncio
import warnings
from pathlib import Path
from datetime import datetime, timedelta
from typing import Literal, Iterable, Union, TypedDict

import s3fs
import requests
from requests import Session, HTTPError

import xarray as xr
import pandas as pd
import numpy as np
from geopandas import GeoDataFrame

# paths
DATA = Path.cwd().parent / "data"
PS_STORE = DATA / "PROBSEVERE"
GMGSI_STORE = DATA / "GMGSI"
# date range
START = datetime.fromisoformat("2022-03-01")
END = datetime.fromisoformat("2022-09-30")
DATE_RANGE = pd.date_range(START, END, freq="D")

PROBSEVRE

In [2]:
class Properties(TypedDict):
    MUCAPE: int
    MLCAPE: int
    MLCIN: int
    EBSHEAR: float
    SRH01KM: int
    MESH: float
    VIL_DENSITY: float
    FLASH_RATE: int
    FLASH_DENSITY: float
    MAXLLAZ: float
    P98LLAZ: float
    P98MLAZ: float
    MAXRC_EMISS: str
    MAXRC_ICECF: str
    WETBULB_0C_HGT: float
    PWAT: float
    CAPE_M10M30: int
    LJA: float
    SIZE: int
    AVG_BEAM_HGT: str
    MOTION_EAST: float
    MOTION_SOUTH: float
    PS: int
    ID: int


class Geometry(TypedDict):
    type: Literal["Polygon"]
    coordinates: list[list[tuple[float, float]]]


class Feature(TypedDict):
    type: Literal["Feature"]
    geometry: Geometry
    models: dict[str, dict[str, str]]
    properties: Properties


class FeatureCollection(TypedDict):
    source: Literal["NOAA/NCEP Central Operations"]
    product: Literal["ProbSevere"]
    type: Literal["FeatureCollection"]
    validTime: str
    productionTime: str
    machine: str
    features: list[Feature]


TimeLike = Union[datetime, str, pd.Timestamp]

VALIDTIME_TEMPLATE = "%Y%m%d_%H%M%S %Z"

FLOAT32_COLS = [
    "EBSHEAR",
    "MEANWIND_1-3kmAGL",
    "MESH",
    "VIL_DENSITY",
    "FLASH_DENSITY",
    "MOTION_EAST",
    "MOTION_SOUTH",
    "MAXLLAZ",
    "P98LLAZ",
    "P98MLAZ",
    "WETBULB_0C_HGT",
    "PWAT",
    "LJA",
]

INT32_COLS = ["MLCIN"]

UINT32_COLS = [
    "MUCAPE",
    "MLCAPE",
    "SRH01KM",
    "FLASH_RATE",
    "CAPE_M10M30",
    "SIZE",
    "ID",
]

UINT8_COLS = ["PS"]

ALL_COLUMNS = UINT8_COLS + UINT32_COLS + INT32_COLS + FLOAT32_COLS

In [3]:
def extract_probsevere(date: TimeLike) -> Iterable[FeatureCollection]:
    base_url = "https://mtarchive.geol.iastate.edu"
    url = f"{base_url}/{date:%Y}/{date:%m}/{date:%d}/mrms/ncep/ProbSevere/"
    r = requests.get(url)
    if r.status_code == 200:
        (df,) = pd.read_html(r.text, skiprows=[1, 2], keep_default_na=False)

        with Session() as session:
            for file in tuple(url + df.loc[df["Name"] != "", "Name"]):
                try:
                    # with our session make a get request, r is a response object
                    r = session.get(file, stream=True)
                    # in the event of a non 200 status code we'll raise a HTTPError and trigger the except block
                    r.raise_for_status()
                # if there was an error downloading, continue
                except (ConnectionError, HTTPError):
                    warnings.warn(f"error downloading {url}")
                    continue
                yield r.json()


def wrangle_geometry(df: GeoDataFrame) -> pd.DataFrame:
    # to keep things consistent uppercase all of the bounds
    bounds = df.bounds
    df[bounds.columns.str.upper()] = bounds
    with warnings.catch_warnings():
        # /opt/conda/envs/rapids/lib/python3.9/site-packages/geopandas/array.py:524:
        # ShapelyDeprecationWarning: The array interface is deprecated and will no longer work in Shapely 2.0. Convert the '.coords' to a numpy array instead.
        #   return GeometryArray(vectorized.representative_point(self.data), crs=self.crs)
        warnings.simplefilter("ignore")
        points = df.representative_point()
    df["Y"] = points.x
    df["X"] = points.y
    return df


def update_dtypes(
    df: pd.DataFrame, float32: list[str], int32: list[str], uint32: list[str], uint8: list[str]
) -> pd.DataFrame:
    df[float32] = df[float32].astype(np.float32)
    # 32-bit signed integer (``-2_147_483_648`` to ``2_147_483_647``)
    df[int32] = df[int32].astype(np.int32)
    # 32-bit unsigned integer (``0`` to ``4_294_967_295``)
    df[uint32] = df[uint32].astype(np.uint32)
    # numpy.uint8`: 8-bit unsigned integer (``0`` to ``255``)
    df[uint8] = df[uint8].astype(np.uint8)
    return df


def transfer_probsevere(data: Iterable[FeatureCollection]) -> xr.Dataset:
    def generate():
        for fc in data:
            df = GeoDataFrame.from_features(fc["features"], columns=ALL_COLUMNS + ["geometry"])
            df["time"] = datetime.strptime(fc["validTime"], VALIDTIME_TEMPLATE)
            yield df

    df = pd.concat(generate()).set_index("time").pipe(wrangle_geometry).drop(columns="geometry")

    df = update_dtypes(
        df,
        float32=FLOAT32_COLS + ["MINX", "MINY", "MAXX", "MAXY", "X", "Y"],
        int32=INT32_COLS,
        uint32=UINT32_COLS,
        uint8=UINT8_COLS,
    )
    return xr.Dataset.from_dataframe(df)


def load(ds: xr.Dataset, store: Path) -> None:
    zarr_kwargs = {}
    if store.exists():
        zarr_kwargs = {"mode": "a", "append_dim": "time"}
    ds.to_zarr(store, **zarr_kwargs)

In [4]:
def main():
    for date in DATE_RANGE:
        collection_list = extract_probsevere(date)
        ds = transfer_probsevere(collection_list)
        load(ds, PS_STORE)


main()

# NOAA Global Mosaic of Geostationary Satellite Imagery (GMGSI)

https://www.ospo.noaa.gov/Products/imagery/gmgsi/

https://registry.opendata.aws/noaa-gmgsi/

In [21]:
GMGSIProducts = Literal["GMGSI_LW", "GMGSI_SSR", "GMGSI_SW", "GMGSI_VIS", "GMGSI_WV"]

fs = s3fs.S3FileSystem(anon=True)


def daily_file_report(date: datetime, product: GMGSIProducts = "GMGSI_LW") -> Iterable[str]:
    url_path = f"s3://noaa-gmgsi-pds/{product}/{date:%Y}/{date:%m}/{date:%d}"
    for path in fs.ls(url_path):
        for file in fs.ls(path):
            yield file


def generate_objs(files: Iterable[str]) -> Iterable[xr.Dataset]:
    for file in files:
        with fs.open(file, "rb") as f:
            yield xr.open_dataset(io.BytesIO(f.read()), engine="h5netcdf", chunks={})


def extract_and_transfer(
    start: datetime, stop: datetime, /, product: GMGSIProducts = "GMGSI_LW"
) -> Iterable[xr.Dataset]:
    """extract from the aws s3 bucket"""
    for date in pd.date_range(start, stop, freq="D"):
        files = tuple(daily_file_report(date, product))
        if not files:
            continue
        ds = xr.concat(generate_objs(files), dim="time")
        ds["yc"] = np.unique(ds.lat)
        ds["xc"] = np.unique(ds.lon)
        yield ds.drop(["lat", "lon"]).rename({"xc": "lon", "yc": "lat", "data": product})


ds, *_ = extract_and_transfer("2022-01-01", "2022-01-02", product="GMGSI_LW")
ds

Unnamed: 0,Array,Chunk
Bytes,1.34 GiB,57.21 MiB
Shape,"(24, 3000, 4999)","(1, 3000, 4999)"
Count,72 Tasks,24 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.34 GiB 57.21 MiB Shape (24, 3000, 4999) (1, 3000, 4999) Count 72 Tasks 24 Chunks Type float32 numpy.ndarray",4999  3000  24,

Unnamed: 0,Array,Chunk
Bytes,1.34 GiB,57.21 MiB
Shape,"(24, 3000, 4999)","(1, 3000, 4999)"
Count,72 Tasks,24 Chunks
Type,float32,numpy.ndarray


In [22]:
def main():
    for ds in extract_and_transfer(START, END, product="GMGSI_LW"):
        load(ds, GMGSI_STORE)

In [30]:
gmgsi = xr.open_zarr(GMGSI_STORE)
