In [2]:
"""
Notebook 1: Raw Data Ingestion, Preprocessing, and EDA (Multi-Router)

Purpose:
  - Load combined raw router data from the multi-part Parquet dataset (using Dask).
  - Iterate through each router, filter its data, and process it into a clean, hourly time series,
    performing hourly aggregation and initial cleaning within Dask.
  - Apply Isolation Forest anomaly detection per router.
  - Save processed hourly data with anomaly info for each router.
  - Generate global (cross-router) and illustrative (single-router) EDA plots.
  - Consolidate all per-router hourly processed files into one final combined hourly Parquet dataset.

Outputs:
  - outputs/ch4/hourly_<router>_processed_with_anomalies.parquet (10 individual files)
  - outputs/ch4/all_routers_hourly_processed_with_anomalies.parquet_dataset (final consolidated hourly data)
  - Figures 4-2a, 4-2b, 4-2c, 4-3a, 4-3b, 4-3c saved to outputs/ch4/figs
  - Printed Table 4-1 data (Router Inventory).
"""

#######################################################################
# 0. Environment set‑up                                               #
#######################################################################
# ☑️  Install all dependencies.
# !pip install --quiet pandas pyarrow dask[dataframe] matplotlib seaborn scikit-learn tqdm

import os
import glob # For listing files, but Dask glob will handle it
import warnings
import random
import gc
from pathlib import Path
from typing import List, Dict, Tuple, Optional

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm # For progress bars

# For Dask DataFrames (for memory-efficient loading of combined raw data)
import dask.dataframe as dd

# For Anomaly Detection
from sklearn.ensemble import IsolationForest

# Suppress minor warnings for cleaner output in Jupyter
warnings.filterwarnings("ignore")

# Ensure plots appear inline in Jupyter Notebook
%matplotlib inline

# Set plotting style
sns.set_style("whitegrid")
plt.rcParams['figure.dpi'] = 150 # Increase resolution for better quality plots
plt.rcParams['savefig.dpi'] = 300 # Save plots with higher resolution

# Seed for reproducibility
SEED = 42
random.seed(SEED); np.random.seed(SEED); 

print("Environment setup complete.")

#######################################################################
# 1. Configuration                                                    #
#######################################################################

# --- Paths & File Names ---
# Input: Path to your combined multi-part Parquet dataset (the directory)
# This is the output from the previous script that combined all raw router files.
INPUT_COMBINED_RAW_PATH = Path("/mnt/nrdstor/ramamurthy/mhnarfth/internet2/parquet/combined/all_routers_combined.parquet_dataset")

# Output root directory for processed data and figures
OUT_DIR           = Path("outputs/ch4").absolute(); OUT_DIR.mkdir(parents=True, exist_ok=True)
FIG_DIR           = OUT_DIR / "figs"; FIG_DIR.mkdir(exist_ok=True, parents=True)
# Directory to save individual processed hourly files (intermediate output)
PROCESSED_HOURLY_INDIVIDUAL_DIR = OUT_DIR / "processed_hourly_individual"; PROCESSED_HOURLY_INDIVIDUAL_DIR.mkdir(parents=True, exist_ok=True)
# Final consolidated hourly processed data (main output of this notebook)
FINAL_COMBINED_HOURLY_PATH = OUT_DIR / "all_routers_hourly_processed_with_anomalies.parquet_dataset"

# --- Data Specifics ---
TARGET_COL        = "in_packets"           # The volume metric (packets)
TIMESTAMP_COL     = "t_first"              # Earliest packet timestamp for flow
ROUTER_COL        = 'router'
# Load only relevant columns from raw data for memory efficiency in Dask
COLS_TO_LOAD_RAW = [TIMESTAMP_COL, TARGET_COL, 't_last', ROUTER_COL] # Keep t_last for initial cleaning


FREQ              = "h"                    # Hourly resampling frequency ('h' for hourly)

# --- Anomaly Detection Parameters ---
IF_CONTAMINATION  = 0.01 # Initial contamination estimate
ANOM_SCORE_COL    = "if_score"             # Column name for Isolation Forest anomaly scores
ANOM_FLAG_COL     = "if_flag"              # Column name for binary anomaly flags (1 = anomaly, 0 = normal)

# --- Time Series Parameters (for data splitting, but defined globally) ---
TEST_SIZE_HOURS   = 7 * 24                 # 7 days for testing
VAL_SIZE_HOURS    = 7 * 24                 # 7 days for validation

# --- Global EDA Parameters ---
# Fraction of total raw rows for the global flow-size histogram sample.
GLOBAL_HISTOGRAM_SAMPLE_FRAC = 0.0005 # A very small fraction (e.g., 0.05% of total raw flows)

# Router to generate detailed single-router EDA plots (Fig 4-2b, 4-3a/b/c) for
ILLUSTRATIVE_ROUTER = "elpaso" # Change to "atlanta" or "dallas" if preferred


print(f"Configuration loaded.")
print(f"Input combined raw data: {INPUT_COMBINED_RAW_PATH}")
print(f"Outputs will be saved to: {OUT_DIR} (figs: {FIG_DIR})")
print(f"Processed individual hourly files to: {PROCESSED_HOURLY_INDIVIDUAL_DIR}")
print(f"Final combined hourly dataset to: {FINAL_COMBINED_HOURLY_PATH}")
print(f"Anomaly Detection contamination (initial): {IF_CONTAMINATION}")

#######################################################################
# Helper Functions                                                    #
#######################################################################

def save_plot(fig_name: str, router_label: str = ""): 
    """Saves the current matplotlib figure in both PNG and PDF formats."""
    plt.tight_layout()
    if router_label:
        png_path = FIG_DIR / f"{fig_name}_{router_label}.png"
        pdf_path = FIG_DIR / f"{fig_name}_{router_label}.pdf"
    else: # For global plots, no router_label in filename
        png_path = FIG_DIR / f"{fig_name}.png"
        pdf_path = FIG_DIR / f"{fig_name}.pdf"

    plt.savefig(png_path)
    plt.savefig(pdf_path)
    plt.close()
    print(f"  Saved {png_path.name} and {pdf_path.name}")


def plot_eda_single_router(df_hourly: pd.DataFrame, router_label: str, target_col: str):
    """
    Generates single-router EDA plots (Fig 4-2b, 4-3a/b/c).
    """
    print(f"\n--- Generating Single-Router EDA Plots for {router_label} ---")

    df_hourly_copy = df_hourly.copy() # Work on a copy to avoid SettingWithCopyWarning
    df_hourly_copy['hour_of_day'] = df_hourly_copy.index.hour
    df_hourly_copy['day_of_week_num'] = df_hourly_copy.index.dayofweek
    df_hourly_copy['day_name'] = df_hourly_copy.index.day_name()
    df_hourly_copy['is_weekend'] = df_hourly_copy['day_of_week_num'].isin([5, 6])
    
    day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

    # Fig 4-2b: Week-of-day heat-map (hour × weekday) - Illustrative for one router
    print(f"  Generating Figure 4-2b (Week-of-Day Heatmap for {router_label})...")
    plt.figure(figsize=(12, 8))
    pivot_table = df_hourly_copy.pivot_table(index='hour_of_day', columns='day_name', values=target_col, aggfunc='mean')
    pivot_table = pivot_table[day_order]
    sns.heatmap(pivot_table, cmap='viridis', annot=False, fmt=".0f", linewidths=.5, linecolor='lightgray')
    plt.title(f"4-2b Average {target_col} Heatmap: Hour of Day vs. Day of Week ({router_label})")
    plt.xlabel("Day of Week")
    plt.ylabel("Hour of Day")
    save_plot(f"4_2b_weekday_heatmap", router_label)

    # Anomaly Detection Plots (Figures 4-3a/b/c)
    if ANOM_SCORE_COL in df_hourly_copy.columns and ANOM_FLAG_COL in df_hourly_copy.columns:
        print(f"\n--- Generating Anomaly Detection Plots for {router_label} ---")
        # Fig 4-3a: Isolation-Forest score histogram
        plt.figure(figsize=(10, 6))
        sorted_scores = np.sort(df_hourly_copy[ANOM_SCORE_COL].values)
        threshold_idx = int((1 - IF_CONTAMINATION) * len(sorted_scores))
        threshold_score = sorted_scores[threshold_idx]
        sns.histplot(df_hourly_copy[ANOM_SCORE_COL], bins=100, kde=True, color='teal', alpha=0.7)
        plt.axvline(x=threshold_score, color='red', linestyle='--', label=f'Threshold (Contamination={IF_CONTAMINATION})')
        plt.title(f"4-3a Isolation Forest Anomaly Score Histogram ({router_label})")
        plt.xlabel("Anomaly Score (Higher = More Anomalous)")
        plt.ylabel("Number of Observations")
        plt.legend()
        save_plot(f"4_3a_if_score_histogram", router_label)

        # Fig 4-3b: Time-series plot with anomalies marked
        plt.figure(figsize=(15, 6))
        plt.plot(df_hourly_copy.index, df_hourly_copy[target_col], label='Original Data', color='blue', alpha=0.7)
        anomalies = df_hourly_copy[df_hourly_copy[ANOM_FLAG_COL] == 1]
        plt.scatter(anomalies.index, anomalies[target_col], color='red', s=50, label='Detected Anomaly', zorder=5)
        plt.title(f"4-3b {target_col} Time Series with Detected Anomalies ({router_label})")
        plt.xlabel("Time")
        plt.ylabel(f"{target_col}")
        plt.legend()
        plt.grid(True, linestyle=':', alpha=0.7)
        save_plot(f"4_3b_timeseries_anomalies", router_label)

        # Fig 4-3c: Anomaly count bar-chart by day of week
        anomaly_counts_per_day = df_hourly_copy[df_hourly_copy[ANOM_FLAG_COL] == 1]['day_name'].value_counts().reindex(day_order).fillna(0)
        plt.figure(figsize=(10, 6))
        sns.barplot(x=anomaly_counts_per_day.index, y=anomaly_counts_per_day.values, palette='viridis')
        plt.title(f"4-3c Anomaly Counts by Day of Week ({router_label})")
        plt.xlabel("Day of Week")
        plt.ylabel("Number of Anomalies")
        plt.grid(axis='y', linestyle=':', alpha=0.7)
        save_plot(f"4_3c_anomaly_counts_by_day", router_label)
    else:
        print(f"  Anomaly columns not found in df_hourly for {router_label}. Skipping anomaly plots.")

def plot_global_eda(all_hourly_mean_per_hour: pd.DataFrame, all_raw_samples_target_col: pd.Series, target_col: str):
    """
    Generates global (cross-router) EDA plots (Fig 4-2a, 4-2c).
    """
    print("\n--- Generating Global EDA Plots (Figures 4-2a, 4-2c) ---")

    # Fig 4-2a: 10-router daily curves (mean traffic per hour)
    if not all_hourly_mean_per_hour.empty: # Only plot if data is provided
        print("  Generating Figure 4-2a (10-Router Daily Curves)...")
        plt.figure(figsize=(12, 7))
        sns.lineplot(x='hour_of_day', y='mean_packets', hue='router', data=all_hourly_mean_per_hour, palette='tab10')
        plt.title(f"4-2a Daily Utilization Curves (Mean {target_col} per Hour Across Routers)")
        plt.xlabel("Hour of Day (0-23)")
        plt.ylabel(f"Mean {target_col}")
        plt.xticks(range(24))
        plt.legend(title='Router', bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.grid(True, linestyle=':', alpha=0.7)
        save_plot(f"4_2a_daily_curves_all_routers", router_label="")
    else:
        print("  No hourly mean data collected. Skipping Fig 4-2a.")

    # Fig 4-2c: Flow-size distribution (log-histogram of raw packets)
    if not all_raw_samples_target_col.empty: # Only plot if data is provided
        print("  Generating Figure 4-2c (Flow Size Distribution - All Routers Sample)...")
        plt.figure(figsize=(10, 6))
        sns.histplot(np.log1p(all_raw_samples_target_col), bins=50, kde=True, color='purple', alpha=0.7)
        plt.title(f"4-2c Log-Histogram of Flow Sizes (All Routers Sample)")
        plt.xlabel(f"log(1 + {target_col})")
        plt.ylabel("Number of Flows (Count)")
        plt.grid(True, linestyle=':', alpha=0.7)
        save_plot(f"4_2c_flow_size_histogram_all_routers", router_label="")
    else:
        print(f"  No raw samples data for flow-size histogram. Skipping Fig 4-2c.")


#######################################################################
# Anomaly Detection (Isolation Forest)                                #
#######################################################################

def run_isolation_forest(series: pd.Series, contamination: float, router_label: str) -> pd.DataFrame:
    """
    Fits Isolation Forest on the time series and returns anomaly scores and flags.
    The input `series` should be the hourly aggregated data (e.g., in_packets).
    """
    print(f"  Running Isolation Forest for Anomaly Detection for {router_label}...")
    
    if series.empty:
        print(f"  Warning: Input series for Isolation Forest is empty for {router_label}. Skipping.")
        return pd.DataFrame(columns=[series.name, ANOM_SCORE_COL, ANOM_FLAG_COL])

    X = series.values.reshape(-1, 1)
    
    iforest = IsolationForest(n_estimators=200, contamination=contamination, random_state=SEED, n_jobs=-1) 
    iforest.fit(X)
    
    scores = -iforest.decision_function(X)
    flags = iforest.predict(X)
    flags = np.where(flags == -1, 1, 0)
    
    out_df = pd.DataFrame({
        series.name: series.values,
        ANOM_SCORE_COL: scores,
        ANOM_FLAG_COL: flags
    }, index=series.index)
    
    print(f"  Isolation Forest detection complete for {router_label}. Found {out_df[ANOM_FLAG_COL].sum()} anomalies.")
    return out_df


#######################################################################
# Main Execution for Notebook 1                                       #
#######################################################################

def main():
    print("\n--- Initiating Notebook 1: Data Ingestion, Preprocessing, and EDA (Multi-Router) ---")

    # 1. Load the combined raw router data (Dask DataFrame)
    print(f"\nLoading combined raw data from: {INPUT_COMBINED_RAW_PATH}")
    try:
        # Load only necessary columns for the first stage (timestamps, target, router_col)
        combined_ddf_raw = dd.read_parquet(INPUT_COMBINED_RAW_PATH, engine="pyarrow", columns=COLS_TO_LOAD_RAW)
        combined_ddf_raw[ROUTER_COL] = combined_ddf_raw[ROUTER_COL].astype(str)
        print(f"Loaded Dask DataFrame (lazy): {combined_ddf_raw.npartitions} partitions.")
    except Exception as e:
        print(f"Error loading combined raw Dask DataFrame: {e}. Please ensure the path is correct and files exist.")
        return

    # Get unique router IDs (this will trigger a small Dask compute)
    unique_routers = combined_ddf_raw[ROUTER_COL].unique().compute().tolist()
    unique_routers.sort() 
    print(f"Found {len(unique_routers)} unique routers: {unique_routers}")

    # Data structures to collect info for global EDA plots and Table 4-1
    router_inventory_data = [] 
    all_hourly_mean_per_hour_list = [] 
    
    # NEW FIX for Fig 4-2c (Flow-size Distribution): Collect GLOBAL_HISTOGRAM_SAMPLE_FRAC of total raw data once at the beginning.
    print(f"\nCollecting {GLOBAL_HISTOGRAM_SAMPLE_FRAC*100:.2f}% of total raw data for Fig 4-2c (Flow-size Distribution)...")
    try:
        # Select only the target column from the Dask DataFrame and compute the sample
        all_raw_samples_target_col_series = combined_ddf_raw[TARGET_COL].sample(frac=GLOBAL_HISTOGRAM_SAMPLE_FRAC, random_state=SEED).compute()
        print(f"  Collected {len(all_raw_samples_target_col_series)} raw flow samples for global histogram.")
    except Exception as e:
        print(f"  Error sampling raw data for flow-size histogram: {e}. Skipping Fig 4-2c. Error: {e}")
        all_raw_samples_target_col_series = pd.Series(dtype=float) # Create empty Series to avoid further errors


    # Process each router sequentially
    # The progress bar is around this loop
    for router_idx, router_label in enumerate(tqdm(unique_routers, desc="Processing Routers")):
        print(f"\n==== Starting Full Processing for Router: {router_label} ({router_idx + 1}/{len(unique_routers)}) ====")
        
        # Step 1: Filter raw data for current router (still Dask)
        ddf_single_router = combined_ddf_raw[combined_ddf_raw[ROUTER_COL] == router_label]
        
        # Collect raw data stats for Table 4-1
        num_flows_raw = ddf_single_router.shape[0].compute() 
        print(f"  Raw data for {router_label}: {num_flows_raw} records.")


        # Step 2: Dask-native Initial Clean and Hourly Resampling
        print(f"  Performing Dask-native initial cleaning and hourly resampling for {router_label}...")
        
        # Define the Dask computation graph for a single router's hourly aggregation
        # This graph will be executed when `.compute()` is called.
        ddf_hourly_router_pipeline = (
            ddf_single_router[[TIMESTAMP_COL, TARGET_COL, 't_last']] # Include t_last for accurate initial cleaning within Dask
            .assign(**{TIMESTAMP_COL: lambda df: dd.to_datetime(df[TIMESTAMP_COL], errors='coerce')})
            # NEW FIX: Also convert 't_last' to datetime if you intend to drop rows based on it
            .assign(t_last_dt=lambda df: dd.to_datetime(df['t_last'], errors='coerce'))
            .dropna(subset=[TIMESTAMP_COL]) # Drop rows where t_first is NaT
            .set_index(TIMESTAMP_COL, sorted=True) # Set index for resampling
            .resample(FREQ) # Resample to hourly
            .agg({TARGET_COL: 'sum'}) # Sum target column
        )
        
        # Add router column (still Dask)
        ddf_hourly_router_pipeline[ROUTER_COL] = router_label

        # NOW, compute this hourly aggregated Dask DataFrame into Pandas.
        # This is the crucial step: df_hourly_resampled should be small (max ~1368 rows).
        df_hourly_resampled = ddf_hourly_router_pipeline.compute() 
        print(f"  Computed hourly data for {router_label}: {len(df_hourly_resampled)} rows.")
        # Explicitly delete Dask DataFrames to free up graph memory
        del ddf_single_router, ddf_hourly_router_pipeline 
        gc.collect()

        # Pandas Post-Resampling (Interpolation/Fill)
        if df_hourly_resampled.empty:
            print(f"  Warning: Hourly resampled data for {router_label} is empty. Skipping further processing for this router.")
            num_hourly_rows = 0
            router_inventory_data.append({
                "Router ID": router_label,
                "Raw Flows (#)": num_flows_raw,
                "Hourly Rows (#)": num_hourly_rows
            })
            continue 
        
        missing_before_interp = df_hourly_resampled[TARGET_COL].isnull().sum()
        df_hourly_resampled[TARGET_COL] = df_hourly_resampled[TARGET_COL].interpolate(method='linear')
        df_hourly_resampled[TARGET_COL].fillna(0, inplace=True)
        if missing_before_interp > 0:
            print(f"  Interpolated {missing_before_interp} missing values linearly for hourly data.")
        

        # Collect hourly data stats for Table 4-1
        num_hourly_rows = len(df_hourly_resampled)
        router_inventory_data.append({
            "Router ID": router_label,
            "Raw Flows (#)": num_flows_raw,
            "Hourly Rows (#)": num_hourly_rows
        })


        # 5. Anomaly Detection (Isolation Forest) (on Pandas df_hourly_resampled)
        if df_hourly_resampled.empty or TARGET_COL not in df_hourly_resampled.columns:
            print(f"  Warning: Resampled data for {router_label} is empty or missing target column. Skipping anomaly detection.")
            df_hourly_with_anomalies = df_hourly_resampled.copy()
            df_hourly_with_anomalies[ANOM_SCORE_COL] = np.nan
            df_hourly_with_anomalies[ANOM_FLAG_COL] = 0
        else:
            df_hourly_with_anomalies = run_isolation_forest(
                df_hourly_resampled[TARGET_COL], IF_CONTAMINATION, router_label
            )
            df_hourly_with_anomalies[ROUTER_COL] = router_label
        del df_hourly_resampled 
        gc.collect()

        # Save the processed hourly data for this router for Notebook 2/3
        processed_hourly_parquet_path = PROCESSED_HOURLY_INDIVIDUAL_DIR / f"hourly_{router_label.lower().replace(' ', '_')}_processed_with_anomalies.parquet"
        print(f"  Saving processed hourly data for {router_label} to: {processed_hourly_parquet_path}")
        df_hourly_with_anomalies.to_parquet(processed_hourly_parquet_path, index=True) 
        print(f"  Processed hourly data for {router_label} saved successfully.")

        # 4. Illustrative EDA Plots (Single Router: 4-2b, 4-3a/b/c)
        if router_label == ILLUSTRATIVE_ROUTER: 
            plot_eda_single_router(df_hourly_with_anomalies.copy(), router_label, TARGET_COL)
        else:
            print(f"  Skipping single-router EDA plots for {router_label} (generating for {ILLUSTRATIVE_ROUTER} as representative).")

        del df_hourly_with_anomalies 
        gc.collect()

    # After processing all routers, plot global EDA
    print("\n==== All Routers Processed. Generating Global EDA Plots ====")
    
    # Fig 4-2a: 10-router daily curves
    if all_hourly_mean_per_hour_list:
        combined_hourly_mean_per_hour_df = pd.concat(all_hourly_mean_per_hour_list, ignore_index=True)
        # Pass the correctly collected df for plotting
        plot_global_eda(combined_hourly_mean_per_hour_df, pd.Series(dtype=float), TARGET_COL) 
    else:
        print("  No hourly mean data collected. Skipping Fig 4-2a.")
    
    # Fig 4-2c: Flow-size distribution (uses all_raw_samples_target_col_series computed earlier)
    if not all_raw_samples_target_col_series.empty:
        # Pass the correctly collected df for plotting
        plot_global_eda(pd.DataFrame(), all_raw_samples_target_col_series, TARGET_COL) 
    else:
        print("  No raw samples collected. Skipping Fig 4-2c.")

    # Print Table 4-1: Router Inventory
    print("\n--- Table 4-1: Router Inventory (Raw Data & Hourly Aggregates) ---")
    router_inventory_df = pd.DataFrame(router_inventory_data)
    inventory_cols = ["Router ID", "Raw Flows (#)", "Hourly Rows (#)"]
    router_inventory_df = router_inventory_df[inventory_cols] 
    print(router_inventory_df.to_string())

    # Final Consolidation of all hourly processed files
    print(f"\n--- Consolidating all individual hourly processed files to: {FINAL_COMBINED_HOURLY_PATH} ---")
    try:
        # Read all individual hourly processed Parquet files using Dask
        hourly_processed_files = list(PROCESSED_HOURLY_INDIVIDUAL_DIR.glob("*.parquet"))
        if not hourly_processed_files:
            print("  No individual hourly processed files found for final consolidation. Skipping.")
            return

        ddf_list_hourly = [dd.read_parquet(str(f)) for f in hourly_processed_files] # Use str(f) for Dask compatibility
        combined_ddf_hourly = dd.concat(ddf_list_hourly, axis=0, ignore_index=False) # Keep original index (timestamps)
        
        # Write to final combined hourly dataset
        combined_ddf_hourly.to_parquet(FINAL_COMBINED_HOURLY_PATH, engine="pyarrow", write_index=True)
        print(f"  Final consolidated hourly dataset saved successfully: {FINAL_COMBINED_HOURLY_PATH}")

        # Optional: Verify final combined hourly file
        print(f"\n--- Verifying final combined hourly dataset (first 5 rows) ---")
        verified_df_hourly = pd.read_parquet(FINAL_COMBINED_HOURLY_PATH)
        print(verified_df_hourly.head())
        print(f"Total rows in final combined hourly dataset: {len(verified_df_hourly)}")
        print(f"Unique routers in final combined hourly dataset: {verified_df_hourly[ROUTER_COL].unique()}")

    except Exception as e:
        print(f"Error during final hourly consolidation: {e}")
        print("Please check intermediary files and disk space.")

    print("\n--- Notebook 1 Complete: Data Ingestion, Preprocessing, and EDA ---")
    print(f"Check the '{OUT_DIR}' directory for processed hourly Parquet files and figures.")

# Execute the main pipeline for this notebook
if __name__ == "__main__":
    main()

Environment setup complete.
Configuration loaded.
Input combined raw data: /mnt/nrdstor/ramamurthy/mhnarfth/internet2/parquet/combined/all_routers_combined.parquet_dataset
Outputs will be saved to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4 (figs: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/figs)
Processed individual hourly files to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual
Final combined hourly dataset to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/all_routers_hourly_processed_with_anomalies.parquet_dataset
Anomaly Detection contamination (initial): 0.01

--- Initiating Notebook 1: Data Ingestion, Preprocessing, and EDA (Multi-Router) ---

Loading combined raw data from: /mnt/nrdstor/ramamurthy/mhnarfth/internet2/parquet/combined/all_routers_combined.parquet_dataset
Loaded Dask DataFrame (lazy): 55 partitions.
Found 10 unique routers: ['atlanta', 'batonrouge', 'boston', 'dallas', 'elpaso', 'jackson', 'jackson

Processing Routers:   0%|          | 0/10 [00:00<?, ?it/s]


==== Starting Full Processing for Router: atlanta (1/10) ====
  Raw data for atlanta: 28032240 records.
  Performing Dask-native initial cleaning and hourly resampling for atlanta...
  Computed hourly data for atlanta: 1369 rows.
  Running Isolation Forest for Anomaly Detection for atlanta...


Processing Routers:  10%|█         | 1/10 [03:35<32:16, 215.18s/it]

  Isolation Forest detection complete for atlanta. Found 13 anomalies.
  Saving processed hourly data for atlanta to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_atlanta_processed_with_anomalies.parquet
  Processed hourly data for atlanta saved successfully.
  Skipping single-router EDA plots for atlanta (generating for elpaso as representative).

==== Starting Full Processing for Router: batonrouge (2/10) ====
  Raw data for batonrouge: 80056602 records.
  Performing Dask-native initial cleaning and hourly resampling for batonrouge...
  Computed hourly data for batonrouge: 1343 rows.
  Running Isolation Forest for Anomaly Detection for batonrouge...


Processing Routers:  20%|██        | 2/10 [08:20<34:11, 256.45s/it]

  Isolation Forest detection complete for batonrouge. Found 14 anomalies.
  Saving processed hourly data for batonrouge to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_batonrouge_processed_with_anomalies.parquet
  Processed hourly data for batonrouge saved successfully.
  Skipping single-router EDA plots for batonrouge (generating for elpaso as representative).

==== Starting Full Processing for Router: boston (3/10) ====
  Raw data for boston: 392891 records.
  Performing Dask-native initial cleaning and hourly resampling for boston...
  Computed hourly data for boston: 789 rows.
  Running Isolation Forest for Anomaly Detection for boston...


Processing Routers:  30%|███       | 3/10 [11:27<26:14, 224.92s/it]

  Isolation Forest detection complete for boston. Found 8 anomalies.
  Saving processed hourly data for boston to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_boston_processed_with_anomalies.parquet
  Processed hourly data for boston saved successfully.
  Skipping single-router EDA plots for boston (generating for elpaso as representative).

==== Starting Full Processing for Router: dallas (4/10) ====
  Raw data for dallas: 149784626 records.
  Performing Dask-native initial cleaning and hourly resampling for dallas...
  Computed hourly data for dallas: 1322 rows.
  Running Isolation Forest for Anomaly Detection for dallas...


Processing Routers:  40%|████      | 4/10 [17:14<27:17, 272.98s/it]

  Isolation Forest detection complete for dallas. Found 14 anomalies.
  Saving processed hourly data for dallas to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_dallas_processed_with_anomalies.parquet
  Processed hourly data for dallas saved successfully.
  Skipping single-router EDA plots for dallas (generating for elpaso as representative).

==== Starting Full Processing for Router: elpaso (5/10) ====
  Raw data for elpaso: 9357137 records.
  Performing Dask-native initial cleaning and hourly resampling for elpaso...
  Computed hourly data for elpaso: 1378 rows.
  Running Isolation Forest for Anomaly Detection for elpaso...
  Isolation Forest detection complete for elpaso. Found 14 anomalies.
  Saving processed hourly data for elpaso to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_elpaso_processed_with_anomalies.parquet
  Processed hourly data for elpaso saved successfully.

--- Generating Sing

Processing Routers:  50%|█████     | 5/10 [20:26<20:19, 243.88s/it]

  Saved 4_3c_anomaly_counts_by_day_elpaso.png and 4_3c_anomaly_counts_by_day_elpaso.pdf

==== Starting Full Processing for Router: jackson (6/10) ====
  Raw data for jackson: 13401496 records.
  Performing Dask-native initial cleaning and hourly resampling for jackson...
  Computed hourly data for jackson: 1369 rows.
  Running Isolation Forest for Anomaly Detection for jackson...


Processing Routers:  60%|██████    | 6/10 [23:41<15:08, 227.07s/it]

  Isolation Forest detection complete for jackson. Found 14 anomalies.
  Saving processed hourly data for jackson to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_jackson_processed_with_anomalies.parquet
  Processed hourly data for jackson saved successfully.
  Skipping single-router EDA plots for jackson (generating for elpaso as representative).

==== Starting Full Processing for Router: jacksonville (7/10) ====
  Raw data for jacksonville: 19425447 records.
  Performing Dask-native initial cleaning and hourly resampling for jacksonville...
  Computed hourly data for jacksonville: 1338 rows.
  Running Isolation Forest for Anomaly Detection for jacksonville...


Processing Routers:  70%|███████   | 7/10 [27:03<10:56, 218.84s/it]

  Isolation Forest detection complete for jacksonville. Found 14 anomalies.
  Saving processed hourly data for jacksonville to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_jacksonville_processed_with_anomalies.parquet
  Processed hourly data for jacksonville saved successfully.
  Skipping single-router EDA plots for jacksonville (generating for elpaso as representative).

==== Starting Full Processing for Router: louisville (8/10) ====
  Raw data for louisville: 3150312 records.
  Performing Dask-native initial cleaning and hourly resampling for louisville...
  Computed hourly data for louisville: 1466 rows.
  Running Isolation Forest for Anomaly Detection for louisville...


Processing Routers:  80%|████████  | 8/10 [30:04<06:53, 206.91s/it]

  Isolation Forest detection complete for louisville. Found 15 anomalies.
  Saving processed hourly data for louisville to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_louisville_processed_with_anomalies.parquet
  Processed hourly data for louisville saved successfully.
  Skipping single-router EDA plots for louisville (generating for elpaso as representative).

==== Starting Full Processing for Router: phoenix (9/10) ====
  Raw data for phoenix: 8765751 records.
  Performing Dask-native initial cleaning and hourly resampling for phoenix...
  Computed hourly data for phoenix: 1369 rows.
  Running Isolation Forest for Anomaly Detection for phoenix...


Processing Routers:  90%|█████████ | 9/10 [33:21<03:23, 203.87s/it]

  Isolation Forest detection complete for phoenix. Found 14 anomalies.
  Saving processed hourly data for phoenix to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_phoenix_processed_with_anomalies.parquet
  Processed hourly data for phoenix saved successfully.
  Skipping single-router EDA plots for phoenix (generating for elpaso as representative).

==== Starting Full Processing for Router: reno (10/10) ====
  Raw data for reno: 35499401 records.
  Performing Dask-native initial cleaning and hourly resampling for reno...
  Computed hourly data for reno: 861 rows.
  Running Isolation Forest for Anomaly Detection for reno...


Processing Routers: 100%|██████████| 10/10 [37:07<00:00, 222.76s/it]

  Isolation Forest detection complete for reno. Found 9 anomalies.
  Saving processed hourly data for reno to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/processed_hourly_individual/hourly_reno_processed_with_anomalies.parquet
  Processed hourly data for reno saved successfully.
  Skipping single-router EDA plots for reno (generating for elpaso as representative).

==== All Routers Processed. Generating Global EDA Plots ====
  No hourly mean data collected. Skipping Fig 4-2a.

--- Generating Global EDA Plots (Figures 4-2a, 4-2c) ---
  No hourly mean data collected. Skipping Fig 4-2a.
  Generating Figure 4-2c (Flow Size Distribution - All Routers Sample)...





  Saved 4_2c_flow_size_histogram_all_routers.png and 4_2c_flow_size_histogram_all_routers.pdf

--- Table 4-1: Router Inventory (Raw Data & Hourly Aggregates) ---
      Router ID  Raw Flows (#)  Hourly Rows (#)
0       atlanta       28032240             1369
1    batonrouge       80056602             1343
2        boston         392891              789
3        dallas      149784626             1322
4        elpaso        9357137             1378
5       jackson       13401496             1369
6  jacksonville       19425447             1338
7    louisville        3150312             1466
8       phoenix        8765751             1369
9          reno       35499401              861

--- Consolidating all individual hourly processed files to: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/all_routers_hourly_processed_with_anomalies.parquet_dataset ---
  Final consolidated hourly dataset saved successfully: /home/ramamurthy/mhnarfth/network_analysis/outputs/ch4/all_routers_hourly_