In [None]:
# GHCN Analysis

In [None]:
import os
from glob import glob

import dask
import dask.bag as db
import dask.dataframe as dd

import pandas as pd

from distributed import Client
from dask_jobqueue import SLURMCluster

from IPython.display import display
import matplotlib.pyplot as plt

from ghcn import load_daily

In [None]:
# Set LOCAL to True for single-machine execution while developing
# Set LOCAL to False for cluster execution
LOCAL = True

if LOCAL:
    # This line creates a single-machine dask client
    client = Client()
else:    
    # This line creates a SLURM cluster dask and dask client
    # Logging outputs will be stored in /scratch/{your-netid}
    
    cluster = SLURMCluster(
                           # Memory and core limits should be sufficient here
                           memory='4GB', cores=2,

                           # Ensure that Dask uses the correct version of Python on the cluster
                           python='/scratch/work/public/dask/{}/bin/python'.format(dask.__version__),                           
                           
                           # Place the output logs in an accessible location
                           job_extra=['--output=/scratch/{}/slurm-%j.out'.format(os.environ['SLURM_JOB_USER'])]
    )

    cluster.submit_command = 'slurm'
    cluster.scale(50)

    display(cluster)
    client = Client(cluster)

display(client)

In [None]:
# Get a list of all input files
# We'll sort them alphabetically to ensure reproducibility

#files = sorted(glob('/scratch/work/courses/DSGA1004-2021/ghcnd_tiny/*.dly'))
#files = sorted(glob('/scratch/work/courses/DSGA1004-2021/ghcnd_small/*.dly'))
files = sorted(glob('/scratch/work/courses/DSGA1004-2021/ghcnd_all/*.dly'))

# Load in a single file to demonstrate the parser
# Just print out the first few records to illustrate the structure
load_daily(files[0])[:3]

In [None]:
# Load daily files into bags

# Create a Dask Bag from the files
bag = db.from_sequence(files).map(load_daily).flatten()

# Filter for valid TMAX and TMIN observations
bag = bag.filter(lambda x: x['value'] != -9999 and x['quality'] == ' ' and x['element'] in ('TMAX', 'TMIN'))

In [None]:
# Compute aggregated statistics

# Simplify each dictionary for DataFrame conversion
def extract_core_fields(d):
    return {
        'station_id': d['station_id'],
        'year': d['year'],
        'month': d['month'],
        'day': d['day'],
        'element': d['element'],
        'value': d['value'],
    }

# Transform bag → DataFrame
bag_reduced = bag.map(extract_core_fields)
df = bag_reduced.to_dataframe(meta={
    'station_id': str,
    'year': int,
    'month': int,
    'day': int,
    'element': str,
    'value': int
})

# Split TMAX and TMIN
df_tmax = df[df['element'] == 'TMAX'].rename(columns={'value': 'TMAX'}).drop(columns='element')
df_tmin = df[df['element'] == 'TMIN'].rename(columns={'value': 'TMIN'}).drop(columns='element')

# Join on date
df_merged = dd.merge(df_tmax, df_tmin, on=['station_id', 'year', 'month', 'day'], how='inner')

# Compute t_range
df_merged['t_range'] = df_merged['TMAX'] - df_merged['TMIN']

# Get the max t_range per station
result = df_merged.groupby('station_id')[['t_range']].max().reset_index()

In [None]:
# Debugging

#!ls -lh tdiff.parquet

In [None]:
# Resolving issue in Dask; 1st try & approach

from dask.distributed import wait

# choose which size you’re writing
# 'tiny', 'small', 'all'
size = 'all' 
out = f'tdiff-{size}.parquet'

# Cast station_id to string everywhere so all partitions match
result = result.assign(station_id=result.station_id.astype(str))

# Persist the already-computed `result` DataFrame in memory
result = result.persist()
wait(result)  # block until it’s fully in RAM

# Repartition to control chunk size / number of output files "Optional"
# Tune n partitions based on your worker RAM (e.g. 2 for "tiny", 5 for "small", 10 for “all”)
result = result.repartition(npartitions=7)

# Time the parquet write with Dask’s own writer
%time result.to_parquet(out, engine='pyarrow', overwrite=True)

In [None]:
# Two-stage reduction; 3rd try & approach - no use
# Pre-partition max (no heavy shuffle)
#def max_by_station(pdf):
#    return pdf.groupby('station_id', as_index=False)['t_range'].max()

#partial = df_merged.map_partitions(max_by_station, meta={'station_id': str, 't_range': df_merged['t_range'].dtype})

# Final global max (small shuffle)
#result = (partial.groupby('station_id')['t_range'].max().reset_index())

In [None]:
# Resolving issue using pandas-based write; 2nd try & approach - no use 

# choose which size you’re writing: 'tiny', 'small', or 'all'
#size = 'tiny'
#out  = f'tdiff-{size}.parquet'

# Compute into a single pandas DataFrame on the driver
#df_pd = result.compute()

# Time the pandas-based parquet write
#%time df_pd.to_parquet(out, engine='pyarrow', overwrite=True)

In [None]:
# Read from the saved Parquet file
df_plot = dd.read_parquet(f'tdiff-{size}.parquet').compute()

# Convert tenths of °C → °C
df_plot['t_range_C'] = df_plot['t_range'] / 10

# Sort and get top 10 stations
Top_N = 10
df_top = df_plot.sort_values('t_range_C', ascending=False).head(Top_N)

# Plot
plt.figure(figsize=(12, 6))
bars = plt.bar(df_plot['station_id'], df_plot['t_range_C'])

# Add value labels just above each bar
for bar in bars:
    height = bar.get_height()
    plt.text(
        bar.get_x() + bar.get_width() / 2,   # center X
        height + 0.5,                        # slightly above top
        f'{height:.1f}',                     # formatted label
        ha='center', va='bottom', fontsize=9
    )
plt.xticks(rotation=90)
plt.ylabel('Max Daily Temp Range (°C x10)')
plt.title('Top 10 Stations by Max Daily Temp Range')
plt.tight_layout()
plt.show()