In [1]:
import pandas as pd
from typing import Tuple, Optional

## Step 1: Load and clean tower data

In [None]:
GLOBAL_PATH="/home/jose/DATA_WEATHER_ORNL/data"

# Specify towers and variables of interest
towers_of_interest = ['TOWA', 'TOWB', 'TOWD', 'TOWF', 'TOWS', 'TOWY']
vars = ['TempC',
        'RelHum', 'AbsHum', 
        'WSpdMph', 'PkWSpdMph', 'VSSpdMph',
        'SolarRadWm2', 
        'BarPresMb',
        'Sigma', 'SigPhi',
        'WDir',
        'PrecipIn']

# Load final quality assessed data and format datetime
tower_dfs_15m_clean = []
for tower in towers_of_interest:

        df = pd.read_csv(f'{GLOBAL_PATH}/{tower}_2017-2022_final-qc.csv', header=0, 
                     skipfooter=1, na_values=[-999, '-999'], engine='python', 
                     parse_dates=True)
    
        df['timestampUTC'] = pd.to_datetime(df['timestampUTC'], format='%Y%m%d%H%M%S').dt.tz_localize('UTC')
        df = df.set_index('timestampUTC', drop=True)

        tower_dfs_15m_clean.append(df)

FileNotFoundError: [Errno 2] No such file or directory: '/home/jose/DATA_WEATHER_ORNL/data/met_towers_2017-2022_final-qc/TOWA_2017-2022_final-qc.csv'

## Step 2: Detect extreme events per tower

In [10]:
# Event thresholds and required minimum duration (hours)
EVENT_SPECS = {
    # Event name: (required variables, condition function, minimum duration in hours)
    "E1_TempMoistHaz": (
        ["TempC", "AbsHum"],
        lambda df, col: (df[col["TempC"]] > 24.0) & (df[col["AbsHum"]] > 20.0),
        2.0
    ),
    "E2_WindChill": (
        ["TempC", "PkWSpdMph"],
        lambda df, col: (df[col["TempC"]] <= 4.8) & (df[col["PkWSpdMph"]] >= 3.0),
        2.0
    ),
    "E3_LowTemp_lt0":   (["TempC"], lambda df, col: df[col["TempC"]] < 0.0,   2.0),
    "E3_LowTemp_lt-5":  (["TempC"], lambda df, col: df[col["TempC"]] < -5.0,  2.0),
    "E3_LowTemp_lt-10": (["TempC"], lambda df, col: df[col["TempC"]] < -10.0, 2.0),

    "E4_HighWind_Peak_gt25": (["PkWSpdMph"], lambda df, col: df[col["PkWSpdMph"]] > 25.0, 1.0),

    "E5_LowWind_lt2":   (["PkWSpdMph"], lambda df, col: df[col["PkWSpdMph"]] < 2.0,  3.0),
    "E5_LowWind_lt1":   (["PkWSpdMph"], lambda df, col: df[col["PkWSpdMph"]] < 1.0,  3.0),
    "E5_LowWind_lt0_5": (["PkWSpdMph"], lambda df, col: df[col["PkWSpdMph"]] < 0.5, 3.0),
}

# Sensor columns used for each tower 
colmap_per_tower = {
    "TOWA": {"TempC":"TempC_030m","AbsHum":"AbsHum_015m","PkWSpdMph":"PkWSpdMph_030m"},
    "TOWB": {"TempC":"TempC_030m","AbsHum":None,          "PkWSpdMph":"PkWSpdMph_030m"},
    "TOWD": {"TempC":"TempC_035m","AbsHum":"AbsHum_015m","PkWSpdMph":"PkWSpdMph_035m"},
    "TOWF": {"TempC":"TempC_010m","AbsHum":"AbsHum_010m","PkWSpdMph":"PkWSpdMph_010m"},
    "TOWS": {"TempC":"TempC_025m","AbsHum":None,          "PkWSpdMph":"PkWSpdMph_025m"},
    "TOWY": {"TempC":"TempC_033m","AbsHum":None,          "PkWSpdMph":"PkWSpdMph_033m"},
}

# infer the median step size of the time index (minutes)
def infer_step_minutes(index: pd.DatetimeIndex, fallback=15.0) -> float:
    if len(index) < 2:
        return float(fallback)
    diffs = pd.Series(index).diff().dropna().dt.total_seconds() / 60.0
    return float(diffs.median()) if not diffs.empty else float(fallback)

# convert a boolean time series into continuous event segments
def boolean_runs_to_segments(mask: pd.Series, min_duration_min: float) -> list:
    """
    mask: Boolean time series (True = threshold condition is satisfied)
    min_duration_min: required minimum segment length in minutes
    Returns: list of (start, end, duration_minutes)
    """
    if mask.empty:
        return []
    # Identify transition points (True blocks)
    m = mask.astype(bool).copy()
    change = m.ne(m.shift(1, fill_value=False))
    starts = m & change
    ends   = (~m) & change
    start_times = list(mask.index[starts])
    end_times   = list(mask.index[ends])

    # If the series ends with True, close the last block at the end
    if len(start_times) > len(end_times):
        end_times.append(mask.index[-1])

    segs = []
    for st, et in zip(start_times, end_times):
        dur = (et - st).total_seconds()/60.0
        if dur >= min_duration_min:
            segs.append((st, et, dur))
    return segs

#  Main function: classify events for one tower 
def classify_events_for_tower(df: pd.DataFrame, tower_name: str) -> pd.DataFrame:
    """
    Input: single tower dataframe (time index)
    Output: DataFrame with event segments:
        tower, event, start, end, duration_minutes
    """
    cmap = colmap_per_tower.get(tower_name, {})
    idx = pd.to_datetime(df.index)
    step_min = infer_step_minutes(idx, fallback=15.0)

    out = []
    for ev, (needed_keys, cond_fn, min_hours) in EVENT_SPECS.items():
        # Skip events if the tower is missing required variables
        ok = True
        for k in needed_keys:
            colname = cmap.get(k)
            if (colname is None) or (colname not in df.columns):
                ok = False
                break
        if not ok:
            continue

        # Evaluate the condition; any row with NaN is treated as False
        sub = df[[cmap[k] for k in needed_keys]].copy()
        mask = cond_fn(df, cmap) & (~sub.isna().any(axis=1))
        mask = pd.Series(mask.values, index=idx)

        # Convert True blocks to event segments
        segs = boolean_runs_to_segments(mask, min_duration_min=min_hours*60.0)
        for st, et, dur in segs:
            out.append({
                "tower": tower_name,
                "event": ev,
                "start": st,
                "end": et,
                "duration_minutes": float(dur)
            })

    return pd.DataFrame(out)

# Batch processing across all towers 
def classify_all_towers(tower_dfs_15m_clean: list, towers_of_interest: list) -> pd.DataFrame:
    """
    Input: list of tower DataFrames (same order as towers_of_interest)
    Output: combined DataFrame with event segments across towers
    """
    all_rows = []
    for tower, df in zip(towers_of_interest, tower_dfs_15m_clean):
        seg = classify_events_for_tower(df, tower)
        all_rows.append(seg)
    if not all_rows:
        return pd.DataFrame(columns=["tower","event","start","end","duration_minutes"])
    out = pd.concat(all_rows, ignore_index=True).sort_values(["tower","event","start"])
    return out


In [5]:
segments = classify_all_towers(tower_dfs_15m_clean, towers_of_interest)
segments.to_csv("event_segments_by_tower.csv", index=False)
print(segments.head())

  tower            event                     start                       end  \
0  TOWA  E1_TempMoistHaz 2017-07-08 00:30:00+00:00 2017-07-08 04:45:00+00:00   
1  TOWA  E1_TempMoistHaz 2017-07-21 00:45:00+00:00 2017-07-21 05:30:00+00:00   
2  TOWA  E1_TempMoistHaz 2017-07-22 00:00:00+00:00 2017-07-22 07:15:00+00:00   
3  TOWA  E1_TempMoistHaz 2017-07-23 00:15:00+00:00 2017-07-23 05:30:00+00:00   
4  TOWA  E1_TempMoistHaz 2017-07-23 05:45:00+00:00 2017-07-23 10:30:00+00:00   

   duration_minutes  
0             255.0  
1             285.0  
2             435.0  
3             315.0  
4             285.0  


## Step 3: Union events across towers


In [8]:
def _merge_intervals(ivals, max_gap_minutes: float = 0.0):
    
    # Merge overlapping or adjacent intervals, also bridge small gaps (e.g., ≤ max_gap_minutes).
    
    gap = pd.Timedelta(minutes=float(max_gap_minutes))
    merged = []
    for st, et in ivals:
        if not merged:
            merged.append([st, et])
            continue
        lst_st, lst_et = merged[-1]
        # If the new interval overlaps or nearly touches the last one, merge them
        if st <= lst_et + gap:
            if et > lst_et:
                merged[-1][1] = et
        else:
            # Otherwise start a new block
            merged.append([st, et])
    return merged


def union_segments_across_towers(
    segments: pd.DataFrame,
    max_gap_minutes: float = 0.0,
    default_step_min: float = 15.0,
    by_month: bool = True
) -> Tuple[pd.DataFrame, Optional[pd.DataFrame]]:
    """
    Combine event detections across all towers into a single “union” timeline.
    That means: if any tower detects the event, the union interval is counted.

    """
    if segments.empty:
        return (
            pd.DataFrame(columns=["event","start","end","duration_minutes"]),
            pd.DataFrame(columns=["year","month","event","union_count","union_total_minutes"]) if by_month else None
        )

    seg = segments.copy()
    seg["start"] = pd.to_datetime(seg["start"])
    seg["end"]   = pd.to_datetime(seg["end"])
    seg = seg.dropna(subset=["start","end"])

    # merge intervals tower-by-tower into one union per event
    out_rows = []
    for ev, grp in seg.groupby("event"):
        # Collect all time intervals across towers for this event
        ivals = grp[["start","end"]].sort_values("start").to_numpy().tolist()
        merged = _merge_intervals(ivals, max_gap_minutes=max_gap_minutes)

        # Calculate duration of each merged interval
        for st, et in merged:
            dur_min = (et - st).total_seconds()/60.0
            if dur_min <= 0:
                dur_min = float(default_step_min)
            out_rows.append({
                "event": ev,
                "start": st,
                "end": et,
                "duration_minutes": float(dur_min)
            })

    union_segments = (
        pd.DataFrame(out_rows)
        .sort_values(["event","start"])
        .reset_index(drop=True)
    )

    # monthly aggregation (count + total duration) 
    monthly_summary = None
    if by_month and not union_segments.empty:
        tmp = union_segments.copy()
        tmp["year"]  = tmp["start"].dt.year
        tmp["month"] = tmp["start"].dt.month
        monthly_summary = (
            tmp.groupby(["year","month","event"], as_index=False)
               .agg(union_count=("duration_minutes","size"),
                    union_total_minutes=("duration_minutes","sum"))
               .sort_values(["year","month","event"])
               .reset_index(drop=True)
        )

    return union_segments, monthly_summary


In [9]:
union_seg, monthly_union = union_segments_across_towers(segments, max_gap_minutes=0, by_month=True)
union_seg.to_csv("union_segments_per_event.csv", index=False)
