# 02 — Fetch GHCN (Illinois) from NOAA S3 → Parquet
This notebook reads **GHCN-Daily** metadata and inventory from NOAA's public S3, finds 4 Illinois stations with ≥30 years of record that **actually** have `csv/by_station` files, downloads and cleans their daily data, and writes a **local Parquet**: `data/ghcn_il_top4_daily.parquet`.

In [None]:
import pandas as pd, numpy as np
from pathlib import Path
import fsspec

S3_STATIONS_TXT   = "s3://noaa-ghcn-pds/ghcnd-stations.txt"
S3_INVENTORY_TXT  = "s3://noaa-ghcn-pds/ghcnd-inventory.txt"
S3_BY_STATION     = "s3://noaa-ghcn-pds/csv/by_station/{id}.csv"
STOR = {"anon": True}

OUTDIR = Path('../data'); OUTDIR.mkdir(parents=True, exist_ok=True)
OUT_PARQUET = OUTDIR / 'ghcn_il_top4_daily.parquet'
OUT_CSV = OUTDIR / 'ghcn_il_top4_daily.csv'
print('Output:', OUT_PARQUET.resolve())

## 1) Load stations (fixed-width) and inventory

In [None]:
colspecs = [(0,11),(12,20),(21,30),(31,37),(38,40),(41,71),(72,75),(76,79),(80,85)]
names = ['ID','LATITUDE','LONGITUDE','ELEVATION','STATE','NAME','GSN_FLAG','HCN_CRN_FLAG','WMO_ID']

stations = pd.read_fwf(S3_STATIONS_TXT, colspecs=colspecs, names=names, dtype={'ID':str,'STATE':str,'WMO_ID':str}, storage_options=STOR)
stations['NAME'] = stations['NAME'].str.strip(); stations['STATE'] = stations['STATE'].fillna('').str.strip()

inventory = pd.read_csv(
    S3_INVENTORY_TXT, sep=r'\s+', names=['ID','LAT','LON','ELEMENT','FIRSTYEAR','LASTYEAR'],
    dtype={'ID':str,'ELEMENT':str,'FIRSTYEAR':int,'LASTYEAR':int}, engine='python', storage_options=STOR
)

stations.head(), inventory.head()

In [None]:
len(stations)

## 2) Compute coverage; pick IL stations with ≥30 years

In [None]:
coverage = (inventory.groupby('ID', as_index=False)
                    .agg(first=('FIRSTYEAR','min'), last=('LASTYEAR','max'))
                    .assign(years=lambda d: d['last'] - d['first'] + 1))

il = (stations.loc[stations['STATE']=='IL', ['ID','NAME','STATE','LATITUDE','LONGITUDE','ELEVATION']]
              .merge(coverage, on='ID', how='inner'))
il30 = il[il['years']>=30].copy()
il30.sort_values(['years','ID'], ascending=[False, True]).head(12)

In [None]:
il

## 3) Probe `csv/by_station` and pick 4 that exist

In [None]:
fs = fsspec.filesystem('s3', **STOR)
candidates = il30.sort_values(['years','ID'], ascending=[False, True])['ID'].tolist()
picked, url_map = [], {}
for sid in candidates:
    url = S3_BY_STATION.format(id=sid)
    if fs.exists(url):
        picked.append(sid); url_map[sid] = url
    if len(picked)>=4: break
print('Picked:', picked)
url_map

## 4) Load, clean, pivot to wide, convert units

In [None]:
def load_station_daily(url: str) -> pd.DataFrame:
    df = pd.read_csv(url, storage_options=STOR, dtype={'ID':str,'ELEMENT':str}, parse_dates=['DATE'])
    df['DATA_VALUE'] = df['DATA_VALUE'].replace(-9999, np.nan)
    wide = (df.pivot_table(index=['ID','DATE'], columns='ELEMENT', values='DATA_VALUE', aggfunc='first').reset_index())
    for c in ('TMAX','TMIN','TAVG'):
        if c in wide: wide[c] = wide[c]/10.0
    if 'PRCP' in wide: wide['PRCP'] = wide['PRCP']/10.0
    return wide.sort_values(['ID','DATE']).reset_index(drop=True)

frames = []
for sid in picked:
    w = load_station_daily(url_map[sid])
    frames.append(w); print(sid, w.shape)

daily = pd.concat(frames, ignore_index=True)
daily.head()

## 5) Write Parquet and quick verify

In [None]:
%time
first_cols = [c for c in ['ID','DATE','PRCP','TMAX','TMIN','TAVG','SNOW','SNWD'] if c in daily.columns]
daily = daily[first_cols + [c for c in daily.columns if c not in first_cols]]
daily.to_parquet(OUT_PARQUET, index=False)
print('Wrote:', OUT_PARQUET.resolve())
pd.read_parquet(OUT_PARQUET).groupby('ID').size()

In [None]:
%time
first_cols = [c for c in ['ID','DATE','PRCP','TMAX','TMIN','TAVG','SNOW','SNWD'] if c in daily.columns]
daily = daily[first_cols + [c for c in daily.columns if c not in first_cols]]
daily.to_csv(OUT_CSV, index=False)
print('Wrote:', OUT_CSV.resolve())
pd.read_csv(OUT_CSV).groupby('ID').size()