In [19]:
from pathlib import Path
from typing import Any, Dict, List

import dask
import dask.dataframe
import dask_geopandas
import geopandas
import pandas

from stactools.noaa_climate_normals.tabular import constants
from stactools.noaa_climate_normals.tabular.parquet import load_column_metadata
from stactools.noaa_climate_normals.tabular.utils import id_string

In [None]:
def pandas_datatypes(
    frequency: constants.Frequency, period: constants.Period
) -> Dict[str, Any]:
    """Generates a dictionary of pandas datatypes for the CSV file columns from
    metadata stored in JSON files."""

    column_metadata = load_column_metadata(frequency, period)
    dtypes = {}
    for key, value in column_metadata.items():
        if value["pandas_dtype"] == "category":
            dtypes[key] = pandas.CategoricalDtype(value["categories"], ordered=False)
        else:
            dtypes[key] = value["pandas_dtype"]
    return dtypes

In [20]:
def create_parquet(
    csv_hrefs: List[str],
    frequency: constants.Frequency,
    period: constants.Period,
    parquet_path: str,
) -> str:
    """Creates a GeoParquet file from a list of CSV files."""

    @dask.delayed
    def dataframe_from_csv(
        csv_href: str, pd_dtypes: Dict[str, Any], empty_df: pandas.DataFrame
    ) -> pandas.DataFrame:
        """Returns a dataframe with an ordered, complete set of columns."""
        df = pandas.read_csv(csv_href, dtype=pd_dtypes)
        return pandas.concat([df, empty_df])[empty_df.columns]

    pd_dtypes = pandas_datatypes(frequency, period)
    empty_df = pandas.DataFrame({c: pandas.Series(dtype=t) for c, t in pd_dtypes.items()})

    pandas_dataframes = [
        dataframe_from_csv(csv_href, pd_dtypes, empty_df) for csv_href in csv_hrefs
    ]

    dask_dataframe = dask.dataframe.from_delayed(pandas_dataframes, meta=empty_df)

    dask_geodataframe = dask_geopandas.from_dask_dataframe(dask_dataframe)

    # This produces "ValueError: Length of values (x) does not match length of
    # index (y)". Appears the delayed dataframe only has a few rows, but
    # `points_from_xy` has generated data for all rows, which also seems to
    # break the delayed concept. If I comment this out, parquet files are generated.
    dask_geodataframe.assign(
        geometry=lambda df: geopandas.points_from_xy(
            x=df.LONGITUDE,
            y=df.LATITUDE,
            z=df.ELEVATION,
            crs=constants.CRS,
        )
    )

    dask_geodataframe.repartition(10).to_parquet(parquet_path, write_index=False)

    return parquet_path

In [21]:
# NOTE: Only the "annualseasonal/2006-2020" data has updated JSON metadata. You
# can't run any of the other CSV data piles yet.

csv_dir = "/Volumes/Samsung_T5/data/ncn/tabular/normals-annualseasonal/2006-2020/access"
csv_paths = list(Path(csv_dir).glob("*.csv"))
csv_hrefs = [path.as_posix() for path in csv_paths]

period = constants.Period(csv_dir.split("/")[-2])
frequency = constants.Frequency(csv_dir.split("/")[-3].split("-")[1])

create_parquet(
    csv_hrefs=csv_hrefs[0:100],
    frequency=frequency,
    period=period,
    parquet_path=Path("geoparquet_test", f"{id_string(frequency, period)}.parquet"),
)

ValueError: Length of values (100) does not match length of index (2)

In [None]:
gdf = geopandas.read_parquet("geoparquet_test/2006_2020-annualseasonal.parquet")
gdf