In [1]:
from __future__ import annotations
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import xarray as xr
import fsspec
import s3fs
from datetime import date, datetime
from typing import Any, Dict, Iterable, List, Optional
import asyncio
from pathlib import Path
import zarr

In [4]:
# Endpoint URL of the POWER daily "point" API.
# NOTE(review): not referenced by any of the cells visible in this notebook.
NASA_POWER_BASE = "https://power.larc.nasa.gov/api/temporal/daily/point"

# Known daily Zarr roots (LST) for POWER ARD on AWS S3 (public/anonymous).
# get_power_s3_daily falls back to these when no explicit URL is passed and
# bucket discovery fails.
SYN1DAILY_ZARR_HINT = (
    "https://nasa-power.s3.us-west-2.amazonaws.com/"
    "syn1deg/temporal/power_syn1deg_daily_temporal_lst.zarr"
)
MERRA2DAILY_ZARR_HINT = (
    "https://nasa-power.s3.us-west-2.amazonaws.com/"
    "merra2/temporal/power_merra2_daily_temporal_lst.zarr"
)

# Default variable sets requested from each store.
# Solar: SRAD source (W m^-2), later multiplied by 0.0864 to get MJ m^-2 d^-1.
SOLAR_VARS = ["ALLSKY_SFC_SW_DWN"]  # SRAD source (W m^-2) -> convert to MJ m^-2 d^-1
# Meteorology: T2M_MAX, T2M_MIN and PRECTOTCORR are renamed to TMAX, TMIN and
# RAIN after loading; the other names are kept as-is.
MET_VARS = ["T2M", "T2M_MAX", "T2M_MIN", "PRECTOTCORR", "T2MDEW", "WS2M", "RH2M"]

In [2]:
# Helper functions

# Find the daily LST Zarr store under a given bucket prefix.
def _discover_daily_zarr(prefix: str) -> str:
    """Discover a DAILY temporal LST Zarr under a given POWER product prefix.
    prefix examples: "nasa-power/syn1deg/temporal/" or "nasa-power/merra2/temporal/"
    Returns an HTTPS URL.
    """
    fs = s3fs.S3FileSystem(anon=True)
    keys = [p for p in fs.ls(prefix) if p.endswith(".zarr")]
    # Prefer names containing daily + temporal + lst
    for k in keys:
        low = k.lower()
        if ("daily" in low) and ("temporal" in low) and ("lst" in low):
            # Strip leading bucket name when forming HTTPS URL
            path = k.split("nasa-power/", 1)[1]
            return f"https://nasa-power.s3.us-west-2.amazonaws.com/{path}"
    # Fallback: if nothing matches, raise
    raise RuntimeError(f"No DAILY LST Zarr found under {prefix}")

def _open_power_zarr(zarr_url: str) -> xr.Dataset:
    """Open the Zarr store at ``zarr_url`` as an xarray Dataset.

    The URL is wrapped in an fsspec mapper and opened with consolidated
    metadata.
    """
    return xr.open_zarr(fsspec.get_mapper(zarr_url), consolidated=True)


def _slice_point(ds: xr.Dataset,
                 latitude: float,
                 longitude: float,
                 start_date: date,
                 end_date: date,
                 variables: Iterable[str]) -> xr.Dataset:
    avail = [v for v in variables if v in ds.data_vars]
    if not avail:
        raise KeyError("None of the requested variables are present. Available examples: "
                       + ", ".join(list(ds.data_vars)[:25]))
    sub = ds[avail].sel(lat=latitude, lon=longitude, method="nearest").sel(
        time=slice(datetime.combine(start_date, datetime.min.time()),
                   datetime.combine(end_date, datetime.min.time()))
    )
    return sub

In [5]:
async def get_power_s3_daily(latitude: float,
                             longitude: float,
                             start_date: date,
                             end_date: date,
                             include_srad: bool = True,
                             include_met: bool = True,
                             syn1_url: Optional[str] = None,
                             merra2_url: Optional[str] = None):
    """Fetch daily POWER data for one point directly from the S3/Zarr stores.

    - Solar: ALLSKY_SFC_SW_DWN from the SYN1deg store (treated as W m^-2) is
      converted to SRAD in MJ m^-2 d^-1 by multiplying by 0.0864.
    - Meteorology: the MET_VARS variables from the MERRA-2 store, with
      T2M_MAX / T2M_MIN / PRECTOTCORR renamed to TMAX / TMIN / RAIN.

    Each store URL is resolved in this order: the explicit ``syn1_url`` /
    ``merra2_url`` argument, then bucket discovery, then the module-level
    ``*_ZARR_HINT`` constant.

    Args:
        latitude, longitude: point of interest; the nearest grid cell of each
            store is used.
        start_date, end_date: bounds of the time window passed to _slice_point.
        include_srad: load the solar variable and add the ``SRAD`` column.
        include_met: load the meteorology variables.
        syn1_url, merra2_url: optional explicit Zarr URLs that bypass discovery.

    Returns:
        A ``pandas.DataFrame`` with a ``time`` column and one column per loaded
        variable (meteorology columns first, then ``SRAD``). When both sources
        are requested they are inner-joined on ``time``. Request metadata
        (source, coordinates, date range, resolved URLs) is attached as
        ``DataFrame.attrs``.

        If both ``include_srad`` and ``include_met`` are false, a dict holding
        the request metadata plus an ``"error"`` message is returned instead
        (kept for backward compatibility).

    Raises:
        Any exception raised while opening, slicing or loading a store (for
        example the KeyError from _slice_point) propagates to the caller.
        Previously such errors were printed and None or a partial frame was
        returned, which only failed later and less clearly.
    """
    # 86400 s/day divided by 1e6 J/MJ: mean power in W m^-2 -> MJ m^-2 d^-1.
    w_m2_to_mj_m2_day = 0.0864

    meta: Dict[str, Any] = {
        "source": "s3-zarr",
        "latitude": latitude,
        "longitude": longitude,
        "start": start_date.isoformat(),
        "end": end_date.isoformat(),
    }

    # Nothing requested: keep the error-dict contract, but decide before any
    # network access is attempted.
    if not (include_srad or include_met):
        return {**meta, "error": "No data sources selected: set include_srad and/or include_met."}

    def _resolve_url(explicit: Optional[str], prefix: str, fallback: str) -> str:
        """Pick a store URL: explicit argument, else discovery, else the hint."""
        if explicit:
            return explicit
        try:
            return _discover_daily_zarr(prefix)
        except Exception:
            # Discovery is deliberately best-effort; the known hint is the fallback.
            return fallback

    def _load_point_frame(zarr_url: str, variables: Iterable[str]) -> pd.DataFrame:
        """Open a store, slice the point/time window and load it as a DataFrame."""
        ds = _open_power_zarr(zarr_url)
        sub = _slice_point(ds, latitude, longitude, start_date, end_date, variables)
        # The selection is lazy; to_dataframe() is where values are actually
        # read, so it has to run in the worker thread with the open/slice steps.
        return sub.to_dataframe().reset_index()

    df = None

    if include_met:
        # Discovery lists the bucket over the network, so it runs in a thread too.
        url_met = await asyncio.to_thread(
            _resolve_url, merra2_url, "nasa-power/merra2/temporal/", MERRA2DAILY_ZARR_HINT
        )
        meta["merra2_url"] = url_met
        df_met = await asyncio.to_thread(_load_point_frame, url_met, MET_VARS)
        df = df_met.rename(
            columns={"T2M_MAX": "TMAX", "T2M_MIN": "TMIN", "PRECTOTCORR": "RAIN"}
        )

    if include_srad:
        url_sol = await asyncio.to_thread(
            _resolve_url, syn1_url, "nasa-power/syn1deg/temporal/", SYN1DAILY_ZARR_HINT
        )
        meta["syn1_url"] = url_sol
        df_sol = await asyncio.to_thread(_load_point_frame, url_sol, SOLAR_VARS)
        df_sol = df_sol.rename(columns={"ALLSKY_SFC_SW_DWN": "SRAD_WM2"})
        df_sol["SRAD"] = df_sol["SRAD_WM2"].astype(float) * w_m2_to_mj_m2_day
        # Keep only the join key and the converted value; the solar store's own
        # lat/lon columns are dropped before merging.
        df_sol = df_sol[["time", "SRAD"]]
        df = df_sol if df is None else pd.merge(df, df_sol, on="time", how="inner")

    # Expose the request metadata without changing the DataFrame return type.
    df.attrs.update(meta)
    return df

In [6]:
df = await get_power_s3_daily(
        latitude=42.0,
        longitude=-93.5,
        start_date=date(2020, 1, 1),
        end_date=date(2020, 3, 31),
        include_srad=True,
        include_met=True
    )

In [8]:
# Add a compact YYYYMMDD string column derived from the "time" column.
df["date"] = df["time"].pipe(pd.to_datetime).dt.strftime("%Y%m%d")

In [10]:
# Column order with "date" first, followed by every column except time/lat/lon.
cols = ["date", *(name for name in df.columns if name not in {"time", "lat", "lon", "date"})]

In [11]:
# Round every data column to one decimal place; "date" is a string key and is skipped.
for c in cols:
    if c == "date":
        continue
    try:
        df[c] = df[c].astype(float).round(1)
    except (TypeError, ValueError):
        # The column cannot be cast to float (non-numeric content): leave it
        # unchanged. Any other error is unexpected and is allowed to surface
        # instead of being silently swallowed.
        continue

In [12]:
# Print the final frame as plain text.
# NOTE(review): `cols` (date first, without time/lat/lon) is never applied to
# `df`, so the time/lat/lon columns are still printed and "date" comes last --
# confirm whether `df[cols]` was intended here.
print(df)

         time   T2M  TMAX  TMIN  RAIN  T2MDEW  WS2M  RH2M   lat    lon  SRAD  \
0  2020-01-01  -1.2   4.9  -7.1   0.4    -2.4   4.1  91.7  42.0 -93.75   4.6   
1  2020-01-02   0.3   5.0  -3.6   5.1    -0.5   2.7  94.5  42.0 -93.75   5.6   
2  2020-01-03  -1.2   2.2  -3.8   2.3    -2.2   3.3  93.5  42.0 -93.75   2.7   
3  2020-01-04  -3.6   0.7  -7.5   0.0    -6.1   3.5  84.8  42.0 -93.75   2.7   
4  2020-01-05   0.2   5.0  -3.7   0.0    -2.5   6.0  83.5  42.0 -93.75   7.4   
..        ...   ...   ...   ...   ...     ...   ...   ...   ...    ...   ...   
86 2020-03-27   7.0   9.3   4.8   2.3     6.0   3.4  92.9  42.0 -93.75   6.6   
87 2020-03-28  10.1  20.7   5.2   3.8     7.3   5.1  84.7  42.0 -93.75   5.6   
88 2020-03-29   7.1  13.7   1.8   0.7     0.8   7.8  66.7  42.0 -93.75  15.4   
89 2020-03-30   7.4  16.7  -2.0   0.0    -0.3   2.1  63.2  42.0 -93.75  23.7   
90 2020-03-31   7.2  14.1   1.0   0.0     2.0   2.9  72.9  42.0 -93.75  22.8   

        date  
0   20200101  
1   20200