In [96]:
%load_ext jupyter_black
import os
from glob import glob
import pandas as pd
import numpy as np
from wxlab import forecast, probsevere
from wxlab.probsevere import Probsevere
import requests


class GeoAccessor:
    """Placeholder for an accessor class that was declared but never implemented.

    The cell originally declared this class with a stray leading indent and no
    body, which is a syntax error that prevents the whole cell from running.
    TODO: implement the accessor or delete this stub.
    """

In [159]:
def name_to_datetime(names: pd.Series) -> pd.DatetimeIndex:
    """Parse ProbSevere file names into a ``validTime`` DatetimeIndex.

    Every ``_`` in each name is replaced by ``T``. The ``<digits>T<digits>`` run
    that sits directly before a literal ``.json`` is then parsed as a timestamp.
    Names without such a run become NaT.

    Parameters
    ----------
    names : pd.Series
        File names or paths, e.g. ``.../MRMS_PROBSEVERE_20220519_220041.json``.

    Returns
    -------
    pd.DatetimeIndex
        One timestamp per input name, named ``validTime``.
    """
    # "\." keeps the dot literal. An unescaped "." would also accept e.g. "xjson".
    stamps = names.str.replace("_", "T", regex=False).str.extract(r"(\d+T\d+)\.json")[0]
    return pd.DatetimeIndex(stamps).rename("validTime")


def get_local_files(ps_files: list[str]) -> pd.Series:
    """Index local ProbSevere file paths by the valid time encoded in their names.

    Parameters
    ----------
    ps_files : list[str]
        Paths to ProbSevere JSON files.

    Returns
    -------
    pd.Series
        The paths (Series name ``"ProbSevere"``) indexed by ``validTime`` as
        parsed by ``name_to_datetime``.
    """
    # The original signature line was missing its trailing colon (SyntaxError).
    ps = pd.Series(ps_files, name="ProbSevere")
    ps.index = name_to_datetime(ps)
    return ps


# Build the validTime-indexed table of every locally cached ProbSevere JSON file,
# in sorted path order, and display it.
ps_data = get_local_files(
    sorted(
        glob(os.path.join("data", "probsevere", "*.json"))
    )
)
ps_data

validTime
2022-05-19 22:00:41    data/probsevere/MRMS_PROBSEVERE_20220519_22004...
2022-05-19 22:02:40    data/probsevere/MRMS_PROBSEVERE_20220519_22024...
2022-05-19 22:04:40    data/probsevere/MRMS_PROBSEVERE_20220519_22044...
2022-05-19 22:06:39    data/probsevere/MRMS_PROBSEVERE_20220519_22063...
2022-05-19 22:08:40    data/probsevere/MRMS_PROBSEVERE_20220519_22084...
                                             ...                        
2022-05-20 07:50:38    data/probsevere/MRMS_PROBSEVERE_20220520_07503...
2022-05-20 07:52:40    data/probsevere/MRMS_PROBSEVERE_20220520_07524...
2022-05-20 07:54:39    data/probsevere/MRMS_PROBSEVERE_20220520_07543...
2022-05-20 07:56:41    data/probsevere/MRMS_PROBSEVERE_20220520_07564...
2022-05-20 07:58:35    data/probsevere/MRMS_PROBSEVERE_20220520_07583...
Name: ProbSevere, Length: 300, dtype: object

In [94]:
from datetime import datetime
from glob import glob
import os
import pandas as pd
import json
import numpy as np
import requests
from geopandas import GeoDataFrame
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger

# Blocking scheduler intended for the interval job below. That job's decorator is
# commented out and scheduler.start() is never called in this cell, so nothing
# is actually scheduled here.
scheduler = BlockingScheduler()


def name_to_datetime(names: pd.Series) -> pd.DatetimeIndex:
    """Parse ProbSevere file names into a ``validTime`` DatetimeIndex.

    Underscores become ``T``, and the ``<digits>T<digits>`` run directly before a
    literal ``.json`` is parsed. Names without that run become NaT.

    Parameters
    ----------
    names : pd.Series
        File names/URLs such as ``MRMS_PROBSEVERE_20220519_220041.json``.

    Returns
    -------
    pd.DatetimeIndex
        Parsed timestamps (tz-naive), named ``validTime``.
    """
    # Escape the dot so only a real ".json" suffix matches.
    stamps = names.str.replace("_", "T", regex=False).str.extract(r"(\d+T\d+)\.json")[0]
    return pd.DatetimeIndex(stamps).rename("validTime")


def read_mrms(*args: str) -> pd.DataFrame:
    """Fetch the HTML directory listing of an MRMS data folder as a table.

    ``args`` are path segments appended to ``https://mrms.ncep.noaa.gov/data``.
    The ``?C=M;O=D`` query presumably requests the (Apache-style) listing sorted
    by modification time, descending; TODO confirm. Rows containing any
    missing value are dropped.
    """
    base = "/".join(("https://mrms.ncep.noaa.gov/data", *args))
    listing_url = f"{base}/?C=M;O=D"
    first_table = pd.read_html(listing_url)[0]
    return first_table.dropna()


def read_probsevere() -> pd.Series:
    """List the ProbSevere files currently published on the MRMS server.

    Returns
    -------
    pd.Series
        Full download URL of each file (Series name ``"url"``), indexed by the
        ``validTime`` parsed from its file name. The previous annotation claimed
        a DataFrame, but a Series has always been returned.
    """
    listing = read_mrms("ProbSevere", "PROBSEVERE")
    listing.index = name_to_datetime(listing["Name"])
    return ("https://mrms.ncep.noaa.gov/data/ProbSevere/PROBSEVERE/" + listing["Name"]).rename("url")


def to_dataframe(urls: pd.Series, source="URL") -> pd.DataFrame:
    """Load ProbSevere GeoJSON files and flatten their features into one table.

    Parameters
    ----------
    urls : pd.Series
        Index is the validTime; each value is where to read that file from:
        a URL when ``source == "URL"`` or a local path when ``source == "PATH"``.
    source : {"URL", "PATH"}
        How each value is read. Unreachable URLs (requests ConnectionError)
        are skipped.

    Returns
    -------
    pd.DataFrame
        One row per feature, indexed by
        ``(validTime, ID, minx, miny, maxx, maxy)``. The bounding-box columns
        come from each geometry's bounds. The geometry column itself is dropped,
        and every remaining column is cast to float32.

    Raises
    ------
    ValueError
        If ``source`` is not ``"URL"`` or ``"PATH"``, or if no file could be
        loaded (``pd.concat`` has nothing to concatenate).
    """
    if source not in ("URL", "PATH"):
        # Previously an unknown source fell through both branches and crashed
        # with UnboundLocalError on ``feat``.
        raise ValueError(f"source must be 'URL' or 'PATH', got {source!r}")

    def generate():
        for vt, path in urls.items():
            if source == "URL":
                try:
                    feat = requests.get(path).json()
                except requests.exceptions.ConnectionError:
                    # requests raises its own ConnectionError, which does not
                    # subclass the builtin ConnectionError the old code caught.
                    continue
            else:
                with open(path, mode="r", encoding="utf-8") as f:
                    feat = json.load(f)
            df = GeoDataFrame.from_features(feat["features"])
            df["validTime"] = vt
            yield df

    # concatenate one frame per file, then append the geometry bounding boxes
    ps = pd.concat(generate(), ignore_index=True)
    ps = pd.concat([ps, ps["geometry"].bounds], axis=1)
    # convert AVG_BEAM_HGT strings to numbers: drop letters (presumably units)
    # and evaluate what remains.
    # NOTE(review): pd.eval runs on text taken from downloaded files; consider a
    # stricter numeric parser if the source is not fully trusted.
    ps["AVG_BEAM_HGT"] = ps["AVG_BEAM_HGT"].str.replace(r"[A-Za-z]", "", regex=True).apply(pd.eval)

    # map the parenthesised label (weak/moderate/strong) to 1/2/3; anything
    # without a recognised label becomes 0
    ps[["MAXRC_EMISS", "MAXRC_ICECF"]] = (
        ps[["MAXRC_EMISS", "MAXRC_ICECF"]]
        .stack()
        .str.extract(r"(?:\()([a-z]*)(?:\))")
        .replace({"weak": 1, "moderate": 2, "strong": 3})
        .fillna(0)
        .unstack(-1)
        .droplevel(0, axis=1)
    )

    return (
        ps.set_index(["validTime", "ID", "minx", "miny", "maxx", "maxy"]).drop("geometry", axis=1).astype(np.float32)
    )


# @scheduler.scheduled_job(IntervalTrigger(minutes=10))
def on_interval():
    """Append up to 20 not-yet-downloaded ProbSevere files to data/PROBSEVERE.parquet.

    The existing parquet file is read, and the MRMS listing is compared against
    the validTimes it already holds. At most the last 20 missing entries of the
    listing are downloaded, concatenated and written back to the same file.
    Returns early if nothing is missing.
    """
    print(f"begining interval scheduled task at {datetime.now()}\n")
    # the previously downloaded dataset
    # TODO: save the data into files based on the UTC date
    df = pd.read_parquet("data/PROBSEVERE.parquet", engine="pyarrow")
    # READ FROM MRMS DATASET
    urls = read_probsevere()
    # LOCATE FILES THAT HAVE NOT BEEN DOWNLOADED
    # Mask on the index itself so the mask length always matches ``urls``
    # (``index.unique()`` would be shorter if the listing had duplicates).
    already_downloaded = df.index.unique("validTime")
    data_to_get = urls.loc[~urls.index.isin(already_downloaded)]
    if data_to_get.empty:
        print("NO NEW FILES FOUND")
        return
    getting = data_to_get.tail(20)

    print(f"getting {len(getting)} files valid for", ", ".join(getting.index.astype(str).tolist()), "\n\n")
    # DOWNLOAD NEW FILES
    new_files = to_dataframe(getting, source="URL")
    # JOIN THE FILES
    pd.concat([df, new_files]).to_parquet("data/PROBSEVERE.parquet", engine="pyarrow")
    print("FILE SAVED")


# Append the current UTC day's ProbSevere files to the per-day parquet store
# (data/ps/<YYYY-MM-DD>.parquet), or start a new day file.
# NOTE(review): when a new day starts, any still-missing files from the previous
# day are never collected.
template = "data/ps/{0}.parquet"
# dtype=object keeps the .str accessor usable when no files exist yet
db = pd.Series(glob(template.format("*")), dtype=object)
# str.strip() removes a *character set* from both ends, not a prefix/suffix,
# so extract the YYYY-MM-DD stamp from the file name explicitly.
db.index = pd.DatetimeIndex(db.str.extract(r"(\d{4}-\d{2}-\d{2})\.parquet$", expand=False))
urls = read_probsevere()

# Use UTC throughout. Valid times (and therefore file names) are UTC; mixing in
# the local clock could append one day's data to another day's file.
today = datetime.utcnow().date()
latest = db.index.max()
same_day = pd.notna(latest) and latest.date() == today
# compare full dates, not just day-of-month
data_for_current_day = urls[urls.index.date == today]
if same_day:
    print("ADDING FILE TO EXSITING")
    df = pd.read_parquet(db[latest], engine="pyarrow")
    # keep only files whose validTime is not already stored
    already_stored = df.index.unique("validTime")
    data_to_get = data_for_current_day.loc[~data_for_current_day.index.isin(already_stored)]
    getting = data_to_get.tail(20)
    if not getting.empty:
        # to_dataframe cannot build a frame from zero files, so only call it when needed
        new_files = to_dataframe(getting, source="URL")
        df = pd.concat([df, new_files])
else:
    print("CREATING NEWFILE")
    df = to_dataframe(data_for_current_day.tail(20), source="URL")
# name the file after the (UTC) date of the newest validTime it contains
file_name: datetime = df.index.get_level_values("validTime").max()
df.to_parquet(template.format(file_name.strftime("%Y-%m-%d")), engine="pyarrow")



ADDING FILE TO EXSITING


In [2]:
from typing import Mapping
from datetime import datetime
import requests
import time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger
import pandas as pd


def name_to_datetime(names: pd.Series) -> pd.DatetimeIndex:
    """Parse ProbSevere file names into a UTC ``validTime`` DatetimeIndex.

    Underscores become ``T``, and the ``<digits>T<digits>`` run directly before a
    literal ``.json`` is parsed and localized to UTC. Names without that run
    become NaT.

    Parameters
    ----------
    names : pd.Series
        File names/URLs such as ``MRMS_PROBSEVERE_20220528_235841.json``.

    Returns
    -------
    pd.DatetimeIndex
        UTC-aware timestamps, named ``validTime``.
    """
    # Escape the dot so only a real ".json" suffix matches.
    stamps = names.str.replace("_", "T", regex=False).str.extract(r"(\d+T\d+)\.json")[0]
    return pd.DatetimeIndex(stamps, tz="utc").rename("validTime")


def read_mrms(*args: str) -> pd.DataFrame:
    """Return the first HTML table of an MRMS directory listing, minus incomplete rows.

    Each positional argument is one path segment under
    ``https://mrms.ncep.noaa.gov/data``. The ``?C=M;O=D`` suffix presumably asks
    the listing for modification-time descending order; TODO confirm.
    """
    segments = ["https://mrms.ncep.noaa.gov/data"]
    segments.extend(args)
    tables = pd.read_html("/".join(segments) + "/?C=M;O=D")
    return tables[0].dropna()


def read_probsevere() -> pd.Series:
    """List the ProbSevere files currently published on the MRMS server.

    Returns
    -------
    pd.Series
        Download URL of each file (Series name ``"url"``), indexed by the UTC
        ``validTime`` parsed from its name. The old annotation said DataFrame,
        but a Series was always returned.
    """
    listing = read_mrms("ProbSevere", "PROBSEVERE")
    listing.index = name_to_datetime(listing["Name"])
    return ("https://mrms.ncep.noaa.gov/data/ProbSevere/PROBSEVERE/" + listing["Name"]).rename("url")


def to_dataframe(vt_url: Mapping[pd.Timestamp, str]) -> pd.DataFrame:
    """Download ProbSevere GeoJSON files and flatten their features into one table.

    Parameters
    ----------
    vt_url : Mapping[pd.Timestamp, str]
        validTime -> download URL. Files are fetched in iteration order, with a
        one-second pause after each file.

    Returns
    -------
    pd.DataFrame
        One row per feature, indexed by ``(validTime, ID)``. The feature's
        GeoJSON geometry is kept as-is in a ``geometry`` column, and every other
        column is cast to float32.
    """

    def generate():
        for vt, url in vt_url.items():
            print(vt.strftime("%Y-%m-%dT%H:%M:%SZ"))
            for feat in requests.get(url).json()["features"]:
                props = feat["properties"]
                props["validTime"] = vt
                props["geometry"] = feat["geometry"]
                yield props
            # presumably to rate-limit requests to the MRMS server
            time.sleep(1)

    print("begining file collection, collecting file for....")
    ps = pd.DataFrame(generate()).set_index(["validTime", "ID"])
    print("all files collected")
    # drop letters (presumably units) and evaluate what remains.
    # NOTE(review): pd.eval runs on downloaded text; consider a stricter parser.
    ps["AVG_BEAM_HGT"] = ps["AVG_BEAM_HGT"].str.replace(r"[A-Za-z]", "", regex=True).apply(pd.eval)

    # parenthesised label weak/moderate/strong -> 1/2/3, anything else -> 0
    ps[["MAXRC_EMISS", "MAXRC_ICECF"]] = (
        ps[["MAXRC_EMISS", "MAXRC_ICECF"]]
        .stack()
        .str.extract(r"(?:\()([a-z]*)(?:\))")
        .replace({"weak": 1, "moderate": 2, "strong": 3})
        .fillna(0)
        .unstack(-1)
        .droplevel(0, axis=1)
    )
    # Replace the columns via astype. Assigning through ps.loc[:, mask] may write
    # into the existing (e.g. object) columns in place and keep their dtype.
    # The "float32" string also avoids relying on a numpy import this cell never makes.
    numeric_columns = [column for column in ps.columns if column != "geometry"]
    ps = ps.astype(dict.fromkeys(numeric_columns, "float32"))
    print("dataframe processed")
    return ps

# Output path pattern; on_newday fills "{0}" with a UTC date string.
template = "data/{0}.parquet"
# Scheduler that the daily job below registers itself on.
scheduler = BlockingScheduler()

@scheduler.scheduled_job(IntervalTrigger(days=1))
def on_newday():
    """Download every listed ProbSevere file not valid on the current UTC date into one parquet file.

    The file is written to ``template`` formatted with the current UTC date.

    NOTE(review): IntervalTrigger(days=1) fires every 24 h counted from scheduler
    start, not at 00:00 UTC. The file is also named with the run date rather
    than the date of the data it contains. Confirm both are intended.
    """
    print("ON NEW DAY EVENT TRIGGERED")
    available_data = read_probsevere()

    # Read the clock once so the filter and the file name cannot straddle midnight.
    # Compare full dates: matching on day-of-month alone would also keep files
    # from the same day number of another month. ``.date`` works for naive and
    # UTC-aware indexes alike.
    today = datetime.utcnow().date()
    mapping = available_data[available_data.index.date != today]
    print(f"there are {len(mapping)} files queued for download")
    if mapping.empty:
        # to_dataframe cannot build a frame from zero files
        return
    file_name = today.strftime("%Y-%m-%d")
    to_dataframe(mapping[::-1]).to_parquet(template.format(file_name))
    print(f"file saved as {template.format(file_name)}")


if __name__ == "__main__":
    # Run one collection pass immediately. scheduler.start() is not called here,
    # so the registered daily IntervalTrigger job never fires from this cell.
    on_newday()

ON NEW DAY EVENT TRIGGERED
there are 210 files queued for download
begining file collection, collecting file for....
2022-05-28T23:58:41Z
2022-05-28T23:56:37Z
2022-05-28T23:54:41Z
2022-05-28T23:52:38Z
2022-05-28T23:50:38Z


KeyboardInterrupt: 

In [156]:
def name_to_datetime(names: pd.Series) -> pd.DatetimeIndex:
    """Parse ProbSevere file names into a ``validTime`` DatetimeIndex.

    Underscores become ``T``, and the ``<digits>T<digits>`` run directly before a
    literal ``.json`` is parsed. Names without that run become NaT.

    Parameters
    ----------
    names : pd.Series
        File names or paths, e.g. ``data/probsevere/MRMS_PROBSEVERE_20220519_220041.json``.

    Returns
    -------
    pd.DatetimeIndex
        Parsed timestamps, named ``validTime``.
    """
    # Escape the dot so only a real ".json" suffix matches.
    stamps = names.str.replace("_", "T", regex=False).str.extract(r"(\d+T\d+)\.json")[0]
    return pd.DatetimeIndex(stamps).rename("validTime")


def get_local_files(ps_files: list[str]) -> pd.Series:
    """Index local ProbSevere file paths by the valid time encoded in their names.

    Parameters
    ----------
    ps_files : list[str]
        Paths to ProbSevere JSON files.

    Returns
    -------
    pd.Series
        The paths (Series name ``"ProbSevere"``) indexed by ``validTime``.
    """
    ps = pd.Series(ps_files, name="ProbSevere")
    ps.index = name_to_datetime(ps)
    return ps


# Display the validTime-indexed list of locally cached ProbSevere JSON files.
# (Leftover commented-out experiments were removed; they had no effect.)
get_local_files(sorted(glob(os.path.join("data", "probsevere", "*.json"))))

validTime
2022-05-19 22:00:41    data/probsevere/MRMS_PROBSEVERE_20220519_22004...
2022-05-19 22:02:40    data/probsevere/MRMS_PROBSEVERE_20220519_22024...
2022-05-19 22:04:40    data/probsevere/MRMS_PROBSEVERE_20220519_22044...
2022-05-19 22:06:39    data/probsevere/MRMS_PROBSEVERE_20220519_22063...
2022-05-19 22:08:40    data/probsevere/MRMS_PROBSEVERE_20220519_22084...
                                             ...                        
2022-05-20 07:50:38    data/probsevere/MRMS_PROBSEVERE_20220520_07503...
2022-05-20 07:52:40    data/probsevere/MRMS_PROBSEVERE_20220520_07524...
2022-05-20 07:54:39    data/probsevere/MRMS_PROBSEVERE_20220520_07543...
2022-05-20 07:56:41    data/probsevere/MRMS_PROBSEVERE_20220520_07564...
2022-05-20 07:58:35    data/probsevere/MRMS_PROBSEVERE_20220520_07583...
Name: ProbSevere, Length: 300, dtype: object

In [3]:
# Wrap the validTime-indexed local file table (ps_data) in the project's Probsevere class.
ps_0 = Probsevere(ps_data)

In [4]:
# Flatten the ProbSevere features. Then move each geometry's bounding box
# (minx, miny, maxx, maxy) into the index and drop the geometry column itself.
df = (
    ps_0.to_dataframe()
    .pipe(lambda frame: pd.concat([frame, frame.bounds], axis=1))
    .set_index(["minx", "miny", "maxx", "maxy"], append=True)
    .drop(columns="geometry")
)
df

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,MUCAPE,MLCAPE,MLCIN,EBSHEAR,SRH01KM,MEANWIND_1-3kmAGL,MESH,VIL_DENSITY,FLASH_RATE,FLASH_DENSITY,...,MAXRC_ICECF,WETBULB_0C_HGT,PWAT,CAPE_M10M30,LJA,SIZE,AVG_BEAM_HGT,MOTION_EAST,MOTION_SOUTH,PS
validTime,ID,minx,miny,maxx,maxy,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1
1652997641000000000,25310,-90.70,38.16,-90.40,38.62,3408.0,3008.0,-7.0,40.400002,120.0,27.799999,0.69,1.86,66.0,0.85,...,0.0,11.4,1.8,741.0,0.0,708.0,3.258065,4.446000,-9.957,85.0
1652997641000000000,25384,-85.93,37.89,-85.76,38.06,3970.0,3273.0,-36.0,44.400002,140.0,20.700001,0.92,1.79,15.0,0.60,...,0.0,11.5,1.7,996.0,0.0,241.0,3.333333,13.457000,1.008,69.0
1652997641000000000,25496,-91.07,44.91,-90.86,45.07,133.0,0.0,0.0,60.700001,310.0,39.900002,0.79,2.26,6.0,0.19,...,1.0,7.7,1.3,0.0,0.0,257.0,3.287293,11.908000,-4.059,55.0
1652997641000000000,25504,-87.00,38.86,-86.66,39.18,2515.0,2091.0,-122.0,42.700001,94.0,21.500000,2.37,3.48,47.0,0.50,...,0.0,10.4,1.5,617.0,1.0,538.0,3.270492,12.612000,-1.374,99.0
1652997641000000000,25505,-87.44,38.65,-87.26,38.84,3138.0,2562.0,-43.0,45.700001,123.0,15.100000,1.32,2.91,21.0,0.51,...,0.0,10.8,1.5,747.0,1.8,235.0,3.289855,10.627000,-6.493,91.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1653033515000000000,30717,-79.67,39.71,-79.59,39.79,166.0,19.0,0.0,26.500000,238.0,30.700001,0.00,0.60,0.0,0.00,...,0.0,7.4,1.4,6.0,0.0,55.0,3.268116,9.409000,-4.166,0.0
1653033515000000000,30718,-84.54,39.35,-84.43,39.56,1071.0,485.0,-94.0,38.200001,256.0,42.799999,0.00,0.60,0.0,0.00,...,0.0,10.2,1.9,231.0,0.0,200.0,3.291139,14.880000,-1.683,2.0
1653033515000000000,30719,-83.85,39.10,-83.78,39.19,756.0,508.0,-103.0,38.299999,431.0,42.799999,0.08,0.63,2.0,0.09,...,0.0,10.0,1.5,181.0,0.0,64.0,3.240000,17.424999,-8.546,3.0
1653033515000000000,30720,-75.48,34.26,-75.41,34.33,3864.0,1798.0,-125.0,20.299999,103.0,16.600000,0.12,0.62,0.0,0.03,...,0.0,11.3,1.8,792.0,0.0,53.0,3.287879,11.863000,4.165,1.0


In [15]:
import pyarrow.parquet as pq

ps_1 = Probsevere(df)

# Round-trip the float32 table through parquet, then display what was read back.
(
    ps_1.to_dataframe()
    .astype(np.float32)
    .to_parquet("prob.parquet")
)
pd.read_parquet("prob.parquet")

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,MUCAPE,MLCAPE,MLCIN,EBSHEAR,SRH01KM,MEANWIND_1-3kmAGL,MESH,VIL_DENSITY,FLASH_RATE,FLASH_DENSITY,...,MAXRC_ICECF,WETBULB_0C_HGT,PWAT,CAPE_M10M30,LJA,SIZE,AVG_BEAM_HGT,MOTION_EAST,MOTION_SOUTH,PS
validTime,ID,minx,miny,maxx,maxy,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1
1652997641000000000,25310,-90.70,38.16,-90.40,38.62,3408.0,3008.0,-7.0,40.400002,120.0,27.799999,0.69,1.86,66.0,0.85,...,0.0,11.4,1.8,741.0,0.0,708.0,3.258065,4.446000,-9.957,85.0
1652997641000000000,25384,-85.93,37.89,-85.76,38.06,3970.0,3273.0,-36.0,44.400002,140.0,20.700001,0.92,1.79,15.0,0.60,...,0.0,11.5,1.7,996.0,0.0,241.0,3.333333,13.457000,1.008,69.0
1652997641000000000,25496,-91.07,44.91,-90.86,45.07,133.0,0.0,0.0,60.700001,310.0,39.900002,0.79,2.26,6.0,0.19,...,1.0,7.7,1.3,0.0,0.0,257.0,3.287293,11.908000,-4.059,55.0
1652997641000000000,25504,-87.00,38.86,-86.66,39.18,2515.0,2091.0,-122.0,42.700001,94.0,21.500000,2.37,3.48,47.0,0.50,...,0.0,10.4,1.5,617.0,1.0,538.0,3.270492,12.612000,-1.374,99.0
1652997641000000000,25505,-87.44,38.65,-87.26,38.84,3138.0,2562.0,-43.0,45.700001,123.0,15.100000,1.32,2.91,21.0,0.51,...,0.0,10.8,1.5,747.0,1.8,235.0,3.289855,10.627000,-6.493,91.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1653033515000000000,30717,-79.67,39.71,-79.59,39.79,166.0,19.0,0.0,26.500000,238.0,30.700001,0.00,0.60,0.0,0.00,...,0.0,7.4,1.4,6.0,0.0,55.0,3.268116,9.409000,-4.166,0.0
1653033515000000000,30718,-84.54,39.35,-84.43,39.56,1071.0,485.0,-94.0,38.200001,256.0,42.799999,0.00,0.60,0.0,0.00,...,0.0,10.2,1.9,231.0,0.0,200.0,3.291139,14.880000,-1.683,2.0
1653033515000000000,30719,-83.85,39.10,-83.78,39.19,756.0,508.0,-103.0,38.299999,431.0,42.799999,0.08,0.63,2.0,0.09,...,0.0,10.0,1.5,181.0,0.0,64.0,3.240000,17.424999,-8.546,3.0
1653033515000000000,30720,-75.48,34.26,-75.41,34.33,3864.0,1798.0,-125.0,20.299999,103.0,16.600000,0.12,0.62,0.0,0.03,...,0.0,11.3,1.8,792.0,0.0,53.0,3.287879,11.863000,4.165,1.0
