https://medium.com/swlh/automated-weather-model-processing-with-foss4g-lessons-learned-8aaaeda1e3bc

In [62]:
import io
import os
from datetime import datetime, timedelta, timezone

import numpy as np
import psycopg
import xarray as xr
from numpy import typing as npt
from psycopg import sql

In [63]:
# Keyword arguments for psycopg.connect(), shared by every cell below.
#
# Values come from the standard libpq environment variables so that no secret
# is stored in the notebook. The password deliberately has NO fallback: when
# PGPASSWORD is unset the entry is None, which psycopg leaves out of the
# connection string so libpq can fall back to its own sources (e.g. ~/.pgpass).
# NOTE(review): the password that used to be hard-coded here has been exposed
# in the notebook history and should be rotated.
pg_connection_dict = {
    "dbname": os.environ.get("PGDATABASE", "mydb"),
    "user": os.environ.get("PGUSER", "myn"),
    "password": os.environ.get("PGPASSWORD"),
    "port": os.environ.get("PGPORT", "5432"),
    "host": os.environ.get(
        "PGHOST",
        "terraform-20221218184631507400000002.c2x7llrlmsr3.us-east-2.rds.amazonaws.com",
    ),
}

# Smoke test: open a connection, then print the client encoding and the
# server's current timestamp to confirm the database is reachable.
with psycopg.connect(**pg_connection_dict, autocommit=True) as conn, conn.cursor() as curr:
    print(conn.info.encoding)
    curr.execute(r"SELECT now()")
    print(curr.fetchall())

utf-8
[(datetime.datetime(2022, 12, 19, 17, 1, 31, 345199, tzinfo=datetime.timezone.utc),)]


In [64]:
# Read the schema script first, inside a `with` block, so the file handle is
# closed deterministically and a missing file fails before a connection is opened.
with open("../database.sql", "rt") as schema_file:
    schema_sql = schema_file.read()

# Run the whole script against the database (autocommit, no explicit transaction).
with psycopg.connect(**pg_connection_dict, autocommit=True) as conn:
    with conn.cursor() as curr:
        curr.execute(schema_sql)

In [65]:
# Load the GRIB2 file with the cfgrib engine.
# NOTE(review): indexpath="" is forwarded to the backend — presumably it stops
# cfgrib from writing an .idx sidecar next to the data file; confirm in cfgrib docs.
grib_path = "../data/CMC_reg_SNOD_SFC_0_ps10km_2022121518_P001.grib2"
ds = xr.open_dataset(grib_path, engine="cfgrib", indexpath="")

In [66]:
def to_utcdatetime(date: np.datetime64) -> datetime:
    """
    Converts a numpy datetime64 value to a timezone-aware (UTC) python datetime.

    The conversion uses integer microsecond arithmetic relative to the Unix
    epoch, so sub-second precision is kept (down to python's microsecond
    resolution) and instants before 1970 are handled without going through
    platform-dependent timestamp functions.

    Input:
      date - a np.datetime64 scalar (or 0-d array, e.g. `ds["time"].values`),
             interpreted as UTC
    Output:
      a python datetime object with tzinfo=timezone.utc
    Raises:
      ValueError - if `date` is NaT
    """
    if np.isnat(date):
        raise ValueError("Cannot convert NaT to a datetime")

    epoch = np.datetime64("1970-01-01T00:00:00")
    # Floor-divide to whole microseconds: exact integer math, no float rounding.
    microseconds = int((date - epoch) // np.timedelta64(1, "us"))
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(
        microseconds=microseconds
    )


def to_timedelta(td: np.timedelta64) -> timedelta:
    """
    Converts a numpy timedelta64 value to a python timedelta object.

    The duration is converted through whole microseconds (python's timedelta
    resolution), so sub-hour components such as a 90-minute forecast step are
    preserved instead of being truncated to whole hours.

    Input:
      td - a np.timedelta64 scalar (or 0-d array, e.g. `ds["step"].values`)
    Output:
      a python timedelta object of the same duration
    Raises:
      ValueError - if `td` is NaT
    """
    if np.isnat(td):
        raise ValueError("Cannot convert NaT to a timedelta")

    # Integer floor division keeps the conversion exact (no float days).
    return timedelta(microseconds=int(td // np.timedelta64(1, "us")))

In [67]:
# Name of the first entry in the dataset; the rest of the notebook treats it
# as the single variable to upload.
ds_var = list(ds.keys())[0]

# Row for public.variables, built from the variable's attributes.
variable = {
    field: ds[ds_var].attrs[attr_name]
    for field, attr_name in (
        ("short_name", "GRIB_shortName"),
        ("long_name", "GRIB_name"),
        ("unit", "units"),
    )
}

In [68]:
# Register the variable's metadata; values are bound by name from `variable`.
with psycopg.connect(**pg_connection_dict, autocommit=True) as conn, conn.cursor() as curr:
    curr.execute("""
        INSERT INTO public.variables(short_name, long_name, unit) VALUES (
            %(short_name)s,
            %(long_name)s,
            %(unit)s
        );""", variable)

In [69]:
# Identify the forecast run: model name, reference time and lead time (step),
# with the numpy values converted to python datetime/timedelta objects.
forecast = dict(
    model="rdps",
    reference_time=to_utcdatetime(ds["time"].values),
    step=to_timedelta(ds["step"].values),
)

# Register the forecast; values are bound by name from `forecast`.
with psycopg.connect(**pg_connection_dict, autocommit=True) as conn, conn.cursor() as curr:
    curr.execute("""
        INSERT INTO public.forecasts(model, forecast_reference_time, forecast_step) VALUES (
            %(model)s,
            %(reference_time)s,
            %(step)s
        );""", forecast)

In [70]:
# Target columns of public.predictions, in the order values are supplied.
columns = ["forecast_id", "variable_id", "longitude", "latitude", "value"]

# Compose the INSERT with psycopg's sql module so identifiers are quoted safely;
# one positional placeholder is emitted per column.
_comma = sql.SQL(', ')
write_query = sql.SQL("INSERT INTO {table} ({columns}) VALUES ({placeholders})").format(
    table=sql.Identifier("public", "predictions"),
    columns=_comma.join(sql.Identifier(name) for name in columns),
    placeholders=_comma.join(sql.Placeholder() for _ in columns),
)

In [71]:
def write_data():
    """
    Uploads every grid point of the notebook's dataset to public.predictions.

    Reads the module-level `ds`, `ds_var`, `forecast`, `variable`, `columns`,
    `write_query` and `pg_connection_dict`. The forecast and variable rows must
    already exist in the database; their ids are looked up and attached to
    every uploaded row.

    The bulk insert runs inside a single transaction, so a failure part-way
    through rolls back the whole batch instead of leaving a partial forecast.

    Raises:
      ValueError - if no matching forecast or variable row is found
    """
    with psycopg.connect(**pg_connection_dict, autocommit=True) as conn:
        with conn.cursor() as curr:
            # Single read-only lookups: autocommit already makes each one its
            # own transaction, so no explicit transaction block is needed.
            forecast_id = curr.execute(
                """
                    SELECT forecast_id
                    FROM public.forecasts
                    WHERE 
                        model = %(model)s AND
                        forecast_reference_time = %(reference_time)s AND
                        forecast_step = %(step)s
                """,
                forecast,
            ).fetchone()
            if not forecast_id:
                raise ValueError("No forecast_id obtained")

            variable_id = curr.execute(
                """
                    SELECT variable_id
                    FROM public.variables
                    WHERE short_name = %s
                """,
                [variable["short_name"]],
            ).fetchone()
            if not variable_id:
                raise ValueError("No variable_id obtained")

            # Prepare data to upload: one row per grid point.
            # NOTE(review): missing values are stored as 0, which makes them
            # indistinguishable from a real zero — confirm this is intended.
            df = (
                ds.to_dataframe()
                .fillna({ds_var: 0})
                .reset_index(drop=True)
                .rename(columns={ds_var: "value"})
                .assign(forecast_id=forecast_id[0], variable_id=variable_id[0])
                # Same order as the placeholders in write_query.
                .loc[:, columns]
            )

            # The connection is in autocommit mode, so without this block each
            # row would be committed on its own; make the batch atomic.
            with conn.transaction():
                curr.executemany(write_query, list(df.itertuples(index=False)))

In [72]:
write_data()