In [1]:
%load_ext jupyter_black

In [3]:
import os
import uuid
from pathlib import Path
from warnings import warn
from datetime import datetime
from typing import Callable, Union, TypedDict

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from geopandas import GeoDataFrame
from requests import Session, HTTPError


PROBSEVERE_URL_TEMPLATE = (
    "https://mtarchive.geol.iastate.edu/%Y/%m/%d/mrms/ncep/ProbSevere/MRMS_PROBSEVERE_%Y%m%d_%H%M00.json"
)

FILE_OUT_DIR = os.path.abspath("../../bucket")

TimeLike = Union[datetime, str, pd.Timestamp]


class Feature(TypedDict):
    properties: dict[str, Union[float, int, str]]


class FeatureCollection(TypedDict):
    validTime: str
    features: list[Feature]

In [4]:
cluster = LocalCluster()  # Launches a scheduler and workers locally
client = Client(cluster)  # Connect to distributed cluster and override default
client

2022-08-13 10:02:25,854 - distributed.diskutils - INFO - Found stale lock file and directory '/workspaces/griblib/notebooks/probsevere/dask-worker-space/worker-u1d18jx3', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 3
Total threads: 6,Total memory: 15.57 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43905,Workers: 3
Dashboard: http://127.0.0.1:8787/status,Total threads: 6
Started: Just now,Total memory: 15.57 GiB

0,1
Comm: tcp://127.0.0.1:41879,Total threads: 2
Dashboard: http://127.0.0.1:41329/status,Memory: 5.19 GiB
Nanny: tcp://127.0.0.1:38875,
Local directory: /workspaces/griblib/notebooks/probsevere/dask-worker-space/worker-dtq66umx,Local directory: /workspaces/griblib/notebooks/probsevere/dask-worker-space/worker-dtq66umx
GPU: NVIDIA GeForce RTX 2080 SUPER,GPU memory: 8.00 GiB

0,1
Comm: tcp://127.0.0.1:41003,Total threads: 2
Dashboard: http://127.0.0.1:34051/status,Memory: 5.19 GiB
Nanny: tcp://127.0.0.1:40931,
Local directory: /workspaces/griblib/notebooks/probsevere/dask-worker-space/worker-e4e0ee2k,Local directory: /workspaces/griblib/notebooks/probsevere/dask-worker-space/worker-e4e0ee2k
GPU: NVIDIA GeForce RTX 2080 SUPER,GPU memory: 8.00 GiB

0,1
Comm: tcp://127.0.0.1:38697,Total threads: 2
Dashboard: http://127.0.0.1:42491/status,Memory: 5.19 GiB
Nanny: tcp://127.0.0.1:40787,
Local directory: /workspaces/griblib/notebooks/probsevere/dask-worker-space/worker-dbj77_0m,Local directory: /workspaces/griblib/notebooks/probsevere/dask-worker-space/worker-dbj77_0m
GPU: NVIDIA GeForce RTX 2080 SUPER,GPU memory: 8.00 GiB


In [18]:
__uuid = uuid.uuid4()


def __batch_id(validtime: datetime) -> Callable[[int], str]:
    def wrapper(n: int) -> str:
        return f"probsevere-{n}-{validtime.isoformat(timespec='minutes')}-{__uuid}.pq"

    return wrapper


def __bounds(
    df: pd.DataFrame,
    drop_columns: list[str] = ["MAXRC_EMISS", "MAXRC_ICECF", "AVG_BEAM_HGT", "geometry"],
) -> pd.DataFrame:
    return pd.concat((df, df["geometry"].bounds), axis=1).drop(columns=drop_columns)


def __dask_dataframe_from_features(
    features: list[Feature],
    validtime: datetime,
    *,
    chunksize: int = 256,
) -> dd.DataFrame:
    #
    df = GeoDataFrame.from_features(features).pipe(__bounds).astype(np.float32)
    df["valid_time"] = validtime
    return dd.from_pandas(df, chunksize=chunksize)


def direct_to_parquet(
    path_dir: Path,
    start: TimeLike,
    end: TimeLike,
    freq: str = "2min",  # probsevere generaly is avaliable at a 2 min interval
) -> None:
    """scrape data from the iastate archive over a daterange"""
    # create a DatetimeIndex using the the function arguments and format the urls using the url template
    urls = pd.date_range(start=start, end=end, freq=freq)
    # using Session as a context manager
    with Session() as session:
        # iterating over all of the urls
        for url in urls.strftime(PROBSEVERE_URL_TEMPLATE):
            # using a try/catch block in the event the download fails
            try:
                # with our session make a get request, r is a response object
                r = session.get(url, stream=True)
                # in the event of a non 200 status code we'll raise a HTTPError and trigger the except block
                r.raise_for_status()
            # if there was an error downloading, continue
            except (ConnectionError, HTTPError):
                warn(f"error downloading {url}")
                continue
            fc: FeatureCollection = r.json()

            features = fc["features"]
            # in the event no storms were record, continue
            if not features:
                continue
            # parse the datetime string
            validtime = datetime.strptime(fc["validTime"], "%Y%m%d_%H%M%S %Z")
            # transformation stack
            # GeoJSON.features -> geopandas.GeoDataFrame -> pandas.DataFrame -> dask.DataFrame -> apache.parquet
            __dask_dataframe_from_features(features, validtime).to_parquet(
                path_dir,
                engine="pyarrow",
                append=True,
                name_function=__batch_id(validtime),
                ignore_divisions=True,
            )

In [145]:
if __name__ == "__main__":
    # NOTE: the first time this ran it collected files from 2022-03-01T00:00:00Z -> 2022-05-14T15:54:00Z
    direct_to_parquet(
        Path("./hello"),
        # start="2022-03-01T00:00:00Z",
        start="2022-08-10T15:00",
        end="2022-08-10T15:02",
    )

In [190]:
from typing import Iterable, Iterator, Mapping


def __iterdaterange(
    start: datetime, end: datetime, *, freq: str = "2min"
) -> Iterator[tuple[pd.Timestamp, pd.DataFrame]]:
    dr = pd.date_range(start=start, end=end, freq=freq)
    urls = dr.strftime(PROBSEVERE_URL_TEMPLATE)
    yield from pd.DataFrame({"date": dr, "urls": urls}).set_index(dr).groupby(pd.Grouper(key="date", freq="D", axis=0))


def __bounds(
    df: pd.DataFrame,
    drop_columns: list[str] = ["MAXRC_EMISS", "MAXRC_ICECF", "AVG_BEAM_HGT", "geometry"],
) -> pd.DataFrame:
    return pd.concat((df, df["geometry"].bounds), axis=1).drop(columns=drop_columns)


def __generate_from_features(session: Session, *, urls: Iterable[str]) -> Iterable[pd.DataFrame]:
    for url in urls:
        try:
            # with our session make a get request, r is a response object
            r = session.get(url, stream=True)
            # in the event of a non 200 status code we'll raise a HTTPError and trigger the except block
            r.raise_for_status()
        # if there was an error downloading, continue
        except (ConnectionError, HTTPError):
            warn(f"error downloading {url}")
            continue
        fc: FeatureCollection = r.json()

        features = fc["features"]
        # in the event no storms were record, continue
        if not features:
            warn(f"url contained no features: {url}")
            continue

        df = GeoDataFrame.from_features(features).pipe(__bounds).astype(np.float32)
        validtime = datetime.strptime(fc["validTime"], "%Y%m%d_%H%M%S %Z")
        df["valid_time"] = validtime
        yield df


def direct_to_parquet_by_day(path: Path, *, start: str, end: str) -> None:

    with Session() as session:
        for timestamp, values in __iterdaterange(
            start="2022-08-10T15:00",
            end="2022-08-10T15:02",
        ):
            df = pd.concat(__generate_from_features(session, urls=values["urls"]))
            dd.from_pandas(df, chunksize=256).to_parquet(
                path,
                engine="pyarrow",
                append=True,
                name_function=lambda _: f"probsevere-{timestamp.strftime('%Y-%m-%dZ')}.pq",
            )


if __name__ == "__main__":
    direct_to_parquet_by_day(
        Path("./probsevere-data"),
        start="2022-03-10T00:00",
        end="2022-08-10T00:00",
    )

In [144]:
start = "2022-08-10T15:00"
end = "2022-09-10T15:02"
# 2 min freq date range
dr_2min = pd.date_range(start=start, end=end, freq="2min")
dr_1day = pd.date_range(start=start, end=end, normalize=True, freq="D")
# time delta matrix
td_matrix = abs(dr_2min.values - dr_1day.values[:, np.newaxis])
assert np.argmin(td_matrix, axis=0).shape == dr_2min.shape
dr_2min.groupby(np.argmin(td_matrix, axis=0))
# tdm = abs(dr.values - pd.date_range(start=start, end=end, normalize=True, freq="D").values[:, np.newaxis])


# np.argmin(tdm, axis=0).shape, dr.shape
# dr.shape
# tuple(pd.DataFrame({"dates": dr}).groupby(pd.Grouper(key="dates", freq="D")))
# time_delta = dr - dr[0]
# time_delta.shift(freq="D").days  # , time_delta.shift(-1)
# pd.Series(dr - dr[0]).resample("5d")
# np.unique(dr.day)
# dr.day[0]

# dr.groupby(pd.Grouper("1D"))
# pd.Series(dr).resample("5d")
# dr.groupby(pd.TimeGrouper("1D"))
# np.unique(dr.day - dr[0].day)

{1: [2022-08-10 15:00:00, 2022-08-10 15:02:00, 2022-08-10 15:04:00, 2022-08-10 15:06:00, 2022-08-10 15:08:00, 2022-08-10 15:10:00, 2022-08-10 15:12:00, 2022-08-10 15:14:00, 2022-08-10 15:16:00, 2022-08-10 15:18:00, 2022-08-10 15:20:00, 2022-08-10 15:22:00, 2022-08-10 15:24:00, 2022-08-10 15:26:00, 2022-08-10 15:28:00, 2022-08-10 15:30:00, 2022-08-10 15:32:00, 2022-08-10 15:34:00, 2022-08-10 15:36:00, 2022-08-10 15:38:00, 2022-08-10 15:40:00, 2022-08-10 15:42:00, 2022-08-10 15:44:00, 2022-08-10 15:46:00, 2022-08-10 15:48:00, 2022-08-10 15:50:00, 2022-08-10 15:52:00, 2022-08-10 15:54:00, 2022-08-10 15:56:00, 2022-08-10 15:58:00, 2022-08-10 16:00:00, 2022-08-10 16:02:00, 2022-08-10 16:04:00, 2022-08-10 16:06:00, 2022-08-10 16:08:00, 2022-08-10 16:10:00, 2022-08-10 16:12:00, 2022-08-10 16:14:00, 2022-08-10 16:16:00, 2022-08-10 16:18:00, 2022-08-10 16:20:00, 2022-08-10 16:22:00, 2022-08-10 16:24:00, 2022-08-10 16:26:00, 2022-08-10 16:28:00, 2022-08-10 16:30:00, 2022-08-10 16:32:00, 2022-08-