In [1]:
%matplotlib inline

In [2]:
from pathlib import Path

import dask.dataframe as dd
import pandas as pd

In [3]:
# Fixed-width layout of ghcnd-stations.txt, given as
# {column name: (start, stop)} character offsets in the form pd.read_fwf expects.
columns = {
    "ID": (0, 11),
    "LATITUDE": (12, 20),
    "LONGITUDE": (21, 30),
    "ELEVATION": (31, 37),
    "STATE": (38, 40),
    "NAME": (41, 71),
    "GSN FLAG": (72, 75),
    "HCN/CRN FLAG": (76, 79),
    "WMO ID": (80, 85),
}

# Read the station table and keep only the rows that carry a STATE value.
df = pd.read_fwf(
    "http://noaa-ghcn-pds.s3.amazonaws.com/ghcnd-stations.txt",
    colspecs=[*columns.values()],
    names=[*columns],
).dropna(subset=["STATE"])

In [4]:
# Keep only the New York stations.
# Compare the state code for exact equality: a regex prefix match
# (`str.match("NY")`) would also accept any value that merely starts with "NY"
# and produces NA (hence an indexing error) for rows with a missing STATE.
nydf = df[df["STATE"] == "NY"]

In [5]:
# Preview the first rows of the New York station subset.
nydf.head()

Unnamed: 0,ID,LATITUDE,LONGITUDE,ELEVATION,STATE,NAME,GSN FLAG,HCN/CRN FLAG,WMO ID
83276,US1NYAB0001,42.667,-74.0509,445.0,NY,ALTAMONT 2.7 SSW,,,
83277,US1NYAB0006,42.7198,-73.9304,88.7,NY,SCHENECTADY 5.6 SSW,,,
83278,US1NYAB0010,42.5455,-74.1475,488.3,NY,RENSSELAERVILLE 2.1 NNW,,,
83279,US1NYAB0016,42.759,-73.737,104.2,NY,LATHAM 1.1 NNE,,,
83280,US1NYAB0017,42.6678,-73.7856,65.5,NY,ALBANY 0.7 E,,,


In [30]:
# Year and GHCN elements to pull from the by-year parquet layout.
YEAR = 2022
elements = ["TAVG", "PRCP"]

# One Dask DataFrame per element, read from the NOAA bucket with anonymous access.
dfs = {
    var: dd.read_parquet(
        f"s3://noaa-ghcn-pds/parquet/by_year/YEAR={YEAR}/ELEMENT={var}/",
        storage_options={"anon": True},
    )
    for var in elements
}

In [31]:
# Show the dict of Dask DataFrames (one per element) to check their column layout.
dfs

{'TAVG': Dask DataFrame Structure:
                    ID    DATE DATA_VALUE  M_FLAG  Q_FLAG  S_FLAG OBS_TIME             YEAR          ELEMENT
 npartitions=5                                                                                              
                string  string      int64  string  string  string   string  category[known]  category[known]
                   ...     ...        ...     ...     ...     ...      ...              ...              ...
 ...               ...     ...        ...     ...     ...     ...      ...              ...              ...
                   ...     ...        ...     ...     ...     ...      ...              ...              ...
                   ...     ...        ...     ...     ...     ...      ...              ...              ...
 Dask Name: read_parquet, 1 expression
 Expr=ReadParquetFSSpec(64d3f6b),
 'PRCP': Dask DataFrame Structure:
                     ID    DATE DATA_VALUE  M_FLAG  Q_FLAG  S_FLAG OBS_TIME             YEAR  

In [32]:
# Restrict each element's observations to the New York station IDs.
# `dfs` is keyed by the entries of `elements`, so iterating its items visits
# the same keys in the same order.
nyds = {
    var: frame[frame["ID"].isin(nydf["ID"])]
    for var, frame in dfs.items()
}

In [33]:
# Pair each TAVG observation with the PRCP observation for the same station
# and day, then attach the station metadata. Both joins are inner joins, and
# the overlapping observation columns get the default suffixes:
# "_x" for TAVG (left) and "_y" for PRCP (right).
data = (
    nyds["TAVG"]
    .merge(
        nyds["PRCP"],
        how="inner",
        on=["ID", "DATE", "YEAR"],
        suffixes=("_x", "_y"),
    )
    .merge(nydf, how="inner", on=["ID"])
)

In [34]:
# List the merged frame's columns; overlapping observation columns carry _x / _y suffixes.
data.columns

Index(['ID', 'DATE', 'DATA_VALUE_x', 'M_FLAG_x', 'Q_FLAG_x', 'S_FLAG_x',
       'OBS_TIME_x', 'YEAR', 'ELEMENT_x', 'DATA_VALUE_y', 'M_FLAG_y',
       'Q_FLAG_y', 'S_FLAG_y', 'OBS_TIME_y', 'ELEMENT_y', 'LATITUDE',
       'LONGITUDE', 'ELEVATION', 'STATE', 'NAME', 'GSN FLAG', 'HCN/CRN FLAG',
       'WMO ID'],
      dtype='object')

In [35]:
# Keep the observation values, their element labels and the station metadata,
# then run the Dask computation to get an in-memory result.
# NOTE: this rebinds `df`, which earlier held the full station table.
df = data[
    [
        "ID",
        "DATE",
        "DATA_VALUE_x",
        "ELEMENT_x",
        "DATA_VALUE_y",
        "ELEMENT_y",
        "LATITUDE",
        "LONGITUDE",
        "ELEVATION",
        "STATE",
        "NAME",
    ]
].compute()

In [36]:
# Check which columns the computed frame holds.
df.columns

Index(['ID', 'DATE', 'DATA_VALUE_x', 'ELEMENT_x', 'DATA_VALUE_y', 'ELEMENT_y',
       'LATITUDE', 'LONGITUDE', 'ELEVATION', 'STATE', 'NAME'],
      dtype='object')

In [37]:
# DATA_VALUE_x is the TAVG reading (left side of the merge).
# Divide by 10, then convert to Fahrenheit: value * 9/5 + 32.
df["TAVG"] = df["DATA_VALUE_x"].astype(float).div(10).mul(9 / 5).add(32)

In [38]:
# DATA_VALUE_y is the PRCP reading (right side of the merge).
# Divide by 10, then convert to inches with the factor 0.039370.
df["PRCP"] = df["DATA_VALUE_y"].astype(float).div(10).mul(0.039370)

In [39]:
# Write the station identity, location, date and converted values to a
# parquet file named after the selected year.
df[
    ["ID", "NAME", "LATITUDE", "LONGITUDE", "DATE", "TAVG", "PRCP"]
].to_parquet(f"nydata_{YEAR}.parquet")

In [40]:
# Distinct station names present in the merged TAVG/PRCP data.
df['NAME'].unique()

<ArrowStringArray>
[              'BINGHAMTON',             'LAGUARDIA AP',
       'ROCHESTER GTR INTL',    'ISLIP-LI MACARTHUR AP',
           'GLENS FALLS AP', 'SYRACUSE HANCOCK INTL AP',
              'JFK INTL AP',           'ALBANY INTL AP',
                  'BUFFALO']
Length: 9, dtype: string