# Create Weather DataFrame 1950-2025 with Dask

Creates a wide-format dataframe of NOAA GHCN-Daily weather data (public `noaa-ghcn-pds` S3 bucket) for 1950-2025, covering the TOBS, TMAX, TMIN, PRCP, SNOW and SNWD measurements.
Each row represents one station-year-measurement combination, with 365 daily value columns numbered by day of year.


In [1]:
import s3fs
import dask.dataframe as dd
import pandas as pd
import numpy as np
from datetime import datetime

# Import cluster management utilities
from cluster_utils import setup_dask_cluster

# Start a local Dask cluster through the project helper: 20 workers with a
# 4GB memory budget each, dashboard on port 8790.
import dask

cluster, client, cleanup_summary = setup_dask_cluster(
    n_workers=20,
    memory_per_worker='4GB',
    dashboard_port=8790,
    worker_port=8791,
)

# Send every dask computation through the distributed scheduler.
dask.config.set(scheduler='distributed')

# Workload: 76 years (1950-2025) x 6 measurements = 456 year/element collections.
# Expect roughly 10-30 minutes, depending on how much data each year holds.
start_stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Starting weather data processing at {start_stamp}")


Checking for existing Dask clusters...
No current client found


2025-10-17 01:14:40,041 - distributed.scheduler - INFO - State start
2025-10-17 01:14:40,043 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:40247
2025-10-17 01:14:40,043 - distributed.scheduler - INFO -   dashboard at:            127.0.0.1:8790


All existing clusters closed.
Setting up new cluster with 20 workers...


2025-10-17 01:14:40,109 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:38221'
2025-10-17 01:14:40,111 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:39437'
2025-10-17 01:14:40,113 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:42715'
2025-10-17 01:14:40,115 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:33549'
2025-10-17 01:14:40,117 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:39073'
2025-10-17 01:14:40,119 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:37293'
2025-10-17 01:14:40,121 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:39867'
2025-10-17 01:14:40,124 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:34067'
2025-10-17 01:14:40,126 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:44761'
2025-10-17 01:14:40,129 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0

✓ Dask cluster created with 20 workers
✓ Dashboard available at: http://127.0.0.1:8790/status
✓ Total workers: 20
✓ Cores per worker: {'tcp://127.0.0.1:32935': 1, 'tcp://127.0.0.1:33023': 1, 'tcp://127.0.0.1:33083': 1, 'tcp://127.0.0.1:34133': 1, 'tcp://127.0.0.1:34233': 1, 'tcp://127.0.0.1:35403': 1, 'tcp://127.0.0.1:35709': 1, 'tcp://127.0.0.1:36101': 1, 'tcp://127.0.0.1:37337': 1, 'tcp://127.0.0.1:38009': 1, 'tcp://127.0.0.1:38439': 1, 'tcp://127.0.0.1:39319': 1, 'tcp://127.0.0.1:40031': 1, 'tcp://127.0.0.1:40867': 1, 'tcp://127.0.0.1:41071': 1, 'tcp://127.0.0.1:41255': 1, 'tcp://127.0.0.1:42241': 1, 'tcp://127.0.0.1:43071': 1, 'tcp://127.0.0.1:44135': 1, 'tcp://127.0.0.1:44139': 1}
✓ Total memory available: 80.0 GB
✓ Cluster test computation successful: 4981.52
Starting weather data processing at 2025-10-17 01:14:41


In [None]:
# Load weather data for all years 1950-2025 using Dask
import dask

s3 = s3fs.S3FileSystem(anon=True)
bucket_path = 's3://noaa-ghcn-pds/parquet/by_year/'
measurements = ['TOBS', 'TMAX', 'TMIN', 'PRCP', 'SNOW', 'SNWD']

# Collect all data files (this may take a few minutes).
# A year/element pair with no data just yields no files. A listing that
# *fails* (network, permissions, ...) is reported and skipped rather than
# silently ignored. The previous bare ``except:`` also swallowed
# KeyboardInterrupt, so this loop could not be stopped from the notebook.
all_files = []
failed_listings = []
print("Collecting file paths...")
for year in range(1950, 2026):
    for measurement in measurements:
        pattern = f"{bucket_path}YEAR={year}/ELEMENT={measurement}/*.parquet"
        try:
            files = s3.glob(pattern)
        except Exception as err:  # best-effort: skip this pair, but say so
            failed_listings.append((year, measurement))
            print(f"  WARNING: could not list {pattern}: {err!r}")
            continue
        all_files.extend(f"s3://{f}" for f in files)
    if year % 10 == 0:  # Progress indicator
        print(f"  Processed years up to {year}...")

print(f"Found {len(all_files)} parquet files to process")
if failed_listings:
    print(f"  {len(failed_listings)} year/measurement listings failed and were skipped")
if not all_files:
    raise RuntimeError(f"No parquet files found under {bucket_path}")

# Load all data using Dask (lazy loading)
df_long = dd.read_parquet(all_files, storage_options={'anon': True})

# Both statistics need a full scan of the data. Computing them in one
# dask.compute call lets Dask read the files once instead of twice.
n_rows, n_stations = dask.compute(df_long.shape[0], df_long['ID'].nunique())
print(f"Loaded {n_rows:,} rows from {n_stations:,} stations")


Collecting file paths...
  Processed years up to 1950...
  Processed years up to 1960...
  Processed years up to 1970...
  Processed years up to 1980...
  Processed years up to 1990...
  Processed years up to 2000...
  Processed years up to 2010...
  Processed years up to 2020...
Found 4751 parquet files to process


In [None]:
# Preview the first five rows of the long-format data.
df_long.head(n=5)


In [None]:
# Derive calendar fields from the YYYYMMDD-formatted DATE column; they are
# stored back on df_long.
parsed_dates = dd.to_datetime(df_long['DATE'], format='%Y%m%d')
df_long['date_dt'] = parsed_dates
df_long['day_of_year'] = parsed_dates.dt.dayofyear
df_long['year'] = parsed_dates.dt.year

# Keep only the columns the pivot needs.
pivot_columns = ['ID', 'year', 'ELEMENT', 'day_of_year', 'DATA_VALUE']
df_pivot = df_long[pivot_columns].copy()

# Combine station, year and element into one "<ID>_<year>_<ELEMENT>" key, so
# every wide row has a single scalar label.
key_parts = [
    df_pivot['ID'].astype(str),
    df_pivot['year'].astype(str),
    df_pivot['ELEMENT'],
]
df_pivot['group_id'] = key_parts[0] + '_' + key_parts[1] + '_' + key_parts[2]


In [None]:

# Pivot long -> wide: one row per group_id, one column per day of year.
# Dask Series do not implement ``unstack`` (groupby().first().unstack() only
# works in plain pandas), so use ``dd.pivot_table`` instead. It requires the
# ``columns`` field to be categorical with *known* categories. Declaring every
# possible day (1-366) up front gives all partitions the same output columns.
day_of_year_dtype = pd.CategoricalDtype(categories=list(range(1, 367)))
df_pivot['day_of_year'] = df_pivot['day_of_year'].astype(day_of_year_dtype)

# aggfunc='first' keeps the first value when a group has several rows for the
# same day, like groupby().first() would.
# NOTE(review): pivot_table appears to reduce to a single output partition, so
# the whole wide table may need to fit in one worker's memory. Confirm this for
# the full 1950-2025 range, or pivot year by year.
df_wide = dd.pivot_table(
    df_pivot,
    index='group_id',
    columns='day_of_year',
    values='DATA_VALUE',
    aggfunc='first',
)


In [None]:

# Reset index to get group_id as a column
df_wide = df_wide.reset_index()

# Split the group_id back into separate columns
df_wide[['ID', 'year', 'ELEMENT']] = df_wide['group_id'].str.split('_', expand=True)
df_wide['ID'] = df_wide['ID'].astype(int)
df_wide['year'] = df_wide['year'].astype(int)

# Drop the temporary group_id column
df_wide = df_wide.drop('group_id', axis=1)

# Ensure all days 1-365 exist as columns
day_columns = list(range(1, 366))
for day in day_columns:
    if day not in df_wide.columns:
        df_wide[day] = np.nan

# Reorder columns to have ID, year, ELEMENT first, then days 1-365
columns_order = ['ID', 'year', 'ELEMENT'] + day_columns
df_wide = df_wide[columns_order]

# Sort and rename columns
day_cols = sorted([col for col in df_wide.columns if isinstance(col, int)])
df_wide = df_wide[['ID', 'year', 'ELEMENT'] + day_cols]
df_wide.columns = ['station_id', 'year', 'measurement'] + [f'day_{i}' for i in range(1, 366)]

print(f"Created wide format: {df_wide.shape[0]:,} station-year-measurement combinations × {df_wide.shape[1]} columns")


In [None]:
# Preview the first five rows of the wide-format result.
df_wide.head(n=5)


In [None]:
# Save to file. Dask writes a *directory* of part files at this path.
# The index is only the default index created by reset_index(), so it is not
# stored. Dask's keyword for this is ``write_index``, not pandas' ``index``.
output_file = 'weather_1950_2025_wide.parquet'
df_wide.to_parquet(output_file, write_index=False)
print(f"Saved to {output_file}")

# df_wide is lazy: every reduction on it would re-run the whole S3 download
# and pivot, and printing the raw reductions shows placeholders, not values.
# Compute the summary from the saved files instead, in a single pass.
import dask

saved = dd.read_parquet(output_file, columns=['station_id', 'year', 'measurement'])
found_measurements, first_year, last_year, n_stations, n_rows = dask.compute(
    saved['measurement'].unique(),
    saved['year'].min(),
    saved['year'].max(),
    saved['station_id'].nunique(),
    saved.shape[0],
)

# Show summary of measurements and years
print("\nSummary:")
print(f"Measurements included: {sorted(found_measurements)}")
print(f"Years covered: {first_year} to {last_year}")
print(f"Total unique stations: {n_stations:,}")
print(f"Total combinations: {n_rows:,}")
print(f"Processing completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


## Summary

Dataframe saved as `weather_1950_2025_wide.parquet` with:
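- **Key columns**: `station_id`, `year`, `measurement` (where measurement is TOBS, TMAX, TMIN, PRCP, SNOW, or SNWD). These are ordinary columns, not a stored index.
- **Columns**: `day_1` through `day_365`, numbered by day of year. In leap years `day_60` is 29 February, and day 366 (31 December) is not included.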
- **Values**: Weather observations (units vary by measurement type)
- **Time Range**: 1950-2025 (76 years)
- **Processed using Dask DataFrames** for efficient handling of large datasets

### Measurement Types:
- **TOBS**: Temperature at observation time (tenths of degrees C)
- **TMAX**: Maximum temperature (tenths of degrees C)  
- **TMIN**: Minimum temperature (tenths of degrees C)
- **PRCP**: Precipitation (tenths of mm)
- **SNOW**: Snowfall (mm)
- **SNWD**: Snow depth (mm)


In [None]:
# Release the Dask cluster and client via the project's cleanup helper.
import cluster_utils

cluster_utils.close_dask_cluster(cluster, client)
