In [2]:
#
#  notebooks/01_ETL.ipynb
#

################################################################################
#
# Notebook 1: Data ETL (Extraction, Transformation, Loading)
#
# --- PURPOSE ---
# This notebook performs the foundational data preparation for the entire study.
# Its sole purpose is to:
#   1. Load the raw, un-aggregated NetFlow data for a specific set of routers.
#   2. Perform essential cleaning (parse timestamps; drop rows whose timestamp is missing or unparseable).
#   3. Consolidate the data from all selected routers into a single DataFrame.
#   4. Save this single, clean "golden source" file back to NRDstor.
#
# --- WORKFLOW ---
# This notebook should be run ONLY ONCE for the entire project. Its output is the
# primary input for all subsequent notebooks.
#
################################################################################


#######################################################################
# CELL 1: ENVIRONMENT SETUP & CONFIGURATION (KNOBS)
#######################################################################

# --- Imports ---
import os
import warnings
from pathlib import Path
import gc # For memory management

import pandas as pd
import dask
import dask.dataframe as dd
from tqdm.auto import tqdm # For progress bars

print("--- Environment Setup ---")
# Report library versions up front so saved outputs record the exact toolchain used.
print("\n".join(
    f"{label} version: {module.__version__}"
    for label, module in (("Pandas", pd), ("Dask", dask))
))

# --- Suppress Warnings for Cleaner Output ---
# NOTE: this silences *all* warnings (including pandas/dask deprecation and
# dtype warnings) for the rest of the kernel session.
warnings.filterwarnings("ignore")

# ---------------------------------------------------------------------------------
# >>> CONFIGURATION KNOBS <<<
# Adjust these parameters to control the ETL process.
# ---------------------------------------------------------------------------------

# --- Input Path ---
# Directory on NRDstor containing the original raw Parquet files (one per router).
INPUT_RAW_DATA_DIR = Path("/mnt/nrdstor/ramamurthy/mhnarfth/internet2/parquet")

# --- Output Paths ---
# Directory on NRDstor where the final processed "golden source" file will be saved.
OUTPUT_PROCESSED_DIR = Path("/mnt/nrdstor/ramamurthy/mhnarfth/internet2/journal_extension_data/")
OUTPUT_FILENAME = "journal_study_4r_57d_unaggregated.parquet"

# --- Router Selection ---
# The specific set of routers to process for this study. Each name is matched
# against a '<name>.parquet' file in INPUT_RAW_DATA_DIR (e.g., 'dallas.parquet').
ROUTERS_TO_PROCESS = [
    'dallas',
    'atlanta',
    'elpaso',
    'boston'
]
# Enforce the filename convention instead of relying on a comment:
# - stray whitespace or capitals would make a router's file "not found";
# - a duplicated entry would load that router twice and double its rows in the
#   golden source.
# dict.fromkeys de-duplicates while keeping the first-seen order.
ROUTERS_TO_PROCESS = list(dict.fromkeys(name.strip().lower() for name in ROUTERS_TO_PROCESS))

# --- Column Selection ---
# The essential columns to keep from the raw data.
COLUMNS_TO_KEEP = ['t_first', 'in_packets']

# ---------------------------------------------------------------------------------
# --- Configuration Verification ---
# ---------------------------------------------------------------------------------
PROCESSED_DATA_PATH = OUTPUT_PROCESSED_DIR / OUTPUT_FILENAME

print("\n--- Configuration Loaded ---")
print(f"Input Raw Data Directory: {INPUT_RAW_DATA_DIR}")
print(f"Output Processed File Path: {PROCESSED_DATA_PATH}")
print(f"Routers to be Processed ({len(ROUTERS_TO_PROCESS)}): {ROUTERS_TO_PROCESS}")
print("--------------------------------\n")


#######################################################################
# CELL 2: ETL PROCESSING LOOP
#######################################################################

print("--- Starting ETL Process ---")

# One cleaned Pandas DataFrame per successfully processed router (consolidated in Cell 3).
list_of_processed_dfs = []
# Routers that contributed no data: missing file, no valid rows, or an error.
skipped_routers = []

# Use tqdm for a progress bar over the router list
for router_name in tqdm(ROUTERS_TO_PROCESS, desc="Processing Routers"):
    print(f"\n[INFO] Processing router: '{router_name}'...")

    # Each router's raw data lives in '<router>.parquet' under the input directory.
    input_file_path = INPUT_RAW_DATA_DIR / f"{router_name}.parquet"

    if not input_file_path.exists():
        print(f"  [WARNING] File not found for router '{router_name}' at: {input_file_path}. Skipping.")
        skipped_routers.append(router_name)
        continue

    # Pre-bind both names so the `finally` cleanup below is always safe. The
    # earlier unconditional `del ddf` after the try block raised NameError
    # whenever read_parquet failed (ddf was never assigned in that iteration),
    # which aborted the entire loop instead of skipping just this router.
    ddf = None
    pdf = None
    try:
        # Step 1: Lazily open the raw file with Dask, reading only the needed columns.
        print(f"  Step 1/4: Loading raw data from {input_file_path.name}...")
        ddf = dd.read_parquet(input_file_path, columns=COLUMNS_TO_KEEP)

        # Step 2: Perform essential cleaning and transformation
        print("  Step 2/4: Cleaning data and adding 'router' column...")
        # Unparseable timestamps become NaT (errors='coerce') and are then dropped.
        ddf['t_first'] = dd.to_datetime(ddf['t_first'], errors='coerce')
        ddf = ddf.dropna(subset=['t_first'])
        # Tag every row with its source router so the consolidated frame stays separable.
        ddf['router'] = router_name
        # Rename 't_first' to 'timestamp' for clarity in subsequent notebooks
        ddf = ddf.rename(columns={'t_first': 'timestamp'})

        # Step 3: Materialize this router's data in memory as a Pandas DataFrame.
        print("  Step 3/4: Computing Dask DataFrame to Pandas...")
        pdf = ddf.compute()

        # Step 4: Keep only non-empty results for consolidation.
        if not pdf.empty:
            list_of_processed_dfs.append(pdf)
            print(f"  Step 4/4: Success! Processed {len(pdf):,} rows for '{router_name}'.")
        else:
            print(f"  [WARNING] No valid data found for router '{router_name}' after processing. Skipping.")
            skipped_routers.append(router_name)

    except Exception as e:
        # Deliberate best-effort: report the failure and continue with the next router.
        print(f"  [ERROR] An unexpected error occurred while processing router '{router_name}': {e}. Skipping.")
        skipped_routers.append(router_name)

    finally:
        # Drop this iteration's references before the next (potentially large) load.
        # A successfully processed frame stays alive through list_of_processed_dfs.
        del ddf, pdf
        gc.collect()

print("\n--- All routers processed. ---")
if skipped_routers:
    print(f"[WARNING] {len(skipped_routers)} router(s) contributed no data: {skipped_routers}")


#######################################################################
# CELL 3: CONSOLIDATION AND SAVING
#######################################################################

print("\n--- Consolidating data from all processed routers... ---")

if not list_of_processed_dfs:
    print("[ERROR] No data was successfully processed. The 'list_of_processed_dfs' is empty. Exiting.")
else:
    # Step 1: Concatenate all per-router DataFrames into one master DataFrame.
    # ignore_index=True replaces the overlapping per-router indexes with a
    # single 0..N-1 RangeIndex.
    master_df = pd.concat(list_of_processed_dfs, ignore_index=True)

    # The per-router frames are now redundant; release them before the
    # memory-heavy summary and write steps below.
    del list_of_processed_dfs
    gc.collect()

    print(f"  Successfully concatenated data. Final DataFrame contains {len(master_df):,} total rows.")
    # deep=True also counts the string payloads of object-dtype columns such as
    # 'router'; info() below reports only a shallow lower bound ("+").
    print(f"  Final DataFrame memory usage: {master_df.memory_usage(deep=True).sum() / 1e9:.2f} GB")

    # Display summary and a preview of the final dataset
    print("\n--- Final Consolidated DataFrame Info ---")
    master_df.info()
    print("\n--- Data Preview ---")
    print(master_df.head())

    # Step 2: Ensure the output directory exists.
    # Step 3: Save the master DataFrame to a single Parquet file. The file is
    # written to a temporary sibling first and then atomically moved into place
    # with os.replace, so a failed or interrupted write never leaves a truncated
    # "golden source" for downstream notebooks to read.
    print("\n--- Saving final 'golden source' file... ---")
    tmp_output_path = PROCESSED_DATA_PATH.with_name(PROCESSED_DATA_PATH.name + ".tmp")
    try:
        OUTPUT_PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
        master_df.to_parquet(tmp_output_path, index=False, engine='pyarrow', compression='snappy')
        os.replace(tmp_output_path, PROCESSED_DATA_PATH)
    except Exception as e:
        print(f"\n[FATAL ERROR] Failed to save the final Parquet file: {e}")
        print("Please check your permissions and the path for the NRDstor directory.")
        # Best-effort removal of any partial temp file; never mask the original error.
        try:
            tmp_output_path.unlink(missing_ok=True)
        except OSError:
            pass
        # Re-raise so "Run All" stops here instead of continuing as though the
        # golden source had been written (every later notebook depends on it).
        raise

    print("\n✅ SUCCESS! ✅")
    print("Golden source file was saved successfully to:")
    print(f"{PROCESSED_DATA_PATH}")

print("\n--- Notebook 01_ETL.ipynb Complete ---")

--- Environment Setup ---
Pandas version: 2.2.3
Dask version: 2025.5.1

--- Configuration Loaded ---
Input Raw Data Directory: /mnt/nrdstor/ramamurthy/mhnarfth/internet2/parquet
Output Processed File Path: /mnt/nrdstor/ramamurthy/mhnarfth/internet2/journal_extension_data/journal_study_4r_57d_unaggregated.parquet
Routers to be Processed (4): ['dallas', 'atlanta', 'elpaso', 'boston']
--------------------------------

--- Starting ETL Process ---


Processing Routers:   0%|          | 0/4 [00:00<?, ?it/s]


[INFO] Processing router: 'dallas'...
  Step 1/4: Loading raw data from dallas.parquet...
  Step 2/4: Cleaning data and adding 'router' column...
  Step 3/4: Computing Dask DataFrame to Pandas...


Processing Routers:  25%|██▌       | 1/4 [00:40<02:01, 40.35s/it]

  Step 4/4: Success! Processed 149,784,626 rows for 'dallas'.

[INFO] Processing router: 'atlanta'...
  Step 1/4: Loading raw data from atlanta.parquet...
  Step 2/4: Cleaning data and adding 'router' column...
  Step 3/4: Computing Dask DataFrame to Pandas...


Processing Routers:  50%|█████     | 2/4 [00:48<00:42, 21.40s/it]

  Step 4/4: Success! Processed 28,032,240 rows for 'atlanta'.

[INFO] Processing router: 'elpaso'...
  Step 1/4: Loading raw data from elpaso.parquet...
  Step 2/4: Cleaning data and adding 'router' column...
  Step 3/4: Computing Dask DataFrame to Pandas...


Processing Routers:  75%|███████▌  | 3/4 [00:51<00:13, 13.06s/it]

  Step 4/4: Success! Processed 9,357,137 rows for 'elpaso'.

[INFO] Processing router: 'boston'...
  Step 1/4: Loading raw data from boston.parquet...
  Step 2/4: Cleaning data and adding 'router' column...
  Step 3/4: Computing Dask DataFrame to Pandas...


Processing Routers: 100%|██████████| 4/4 [00:51<00:00, 12.99s/it]

  Step 4/4: Success! Processed 392,891 rows for 'boston'.

--- All routers processed. ---

--- Consolidating data from all processed routers... ---





  Successfully concatenated data. Final DataFrame contains 187,566,894 total rows.
  Final DataFrame memory usage: 14.85 GB

--- Final Consolidated DataFrame Info ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 187566894 entries, 0 to 187566893
Data columns (total 3 columns):
 #   Column      Dtype         
---  ------      -----         
 0   timestamp   datetime64[ns]
 1   in_packets  int64         
 2   router      object        
dtypes: datetime64[ns](1), int64(1), object(1)
memory usage: 4.2+ GB

--- Data Preview ---
                timestamp  in_packets  router
0 2021-10-08 00:03:46.560        5000  dallas
1 2021-10-08 00:00:56.640      170000  dallas
2 2021-10-08 00:02:55.616        5000  dallas
3 2021-10-08 00:01:39.840        5000  dallas
4 2021-10-08 00:03:04.576       10000  dallas

--- Saving final 'golden source' file... ---

✅ SUCCESS! ✅
Golden source file was saved successfully to:
/mnt/nrdstor/ramamurthy/mhnarfth/internet2/journal_extension_data/journal_study_4r_5

In [3]:
#######################################################################
# CELL 4: DATA VERIFICATION
#######################################################################

print("\n--- Verifying Data Distribution in Final DataFrame ---")

# `master_df` only exists if the consolidation step produced data.
if 'master_df' in globals() and not master_df.empty:
    # Use value_counts() to get the number of rows for each unique router.
    router_counts = master_df['router'].value_counts()

    print("Row count per router in the final consolidated DataFrame:")
    print(router_counts)

    # Compare router *identities*, not just how many there are: a bare count
    # comparison still passes when an expected router is missing and an
    # unexpected label has taken its place.
    expected_routers = set(ROUTERS_TO_PROCESS)
    found_routers = set(router_counts.index)
    missing_routers = sorted(expected_routers - found_routers)
    unexpected_routers = sorted(found_routers - expected_routers)
    # A name listed twice in the config would have been loaded twice, duplicating its rows.
    duplicate_routers = sorted({name for name in ROUTERS_TO_PROCESS if ROUTERS_TO_PROCESS.count(name) > 1})

    if not (missing_routers or unexpected_routers or duplicate_routers):
        print("\n[SUCCESS] Verification passed: The routers in the DataFrame match the input list exactly.")
    else:
        print("\n[WARNING] Verification mismatch: The routers in the DataFrame do not match the input list.")
        print(f"  Expected: {len(ROUTERS_TO_PROCESS)} routers")
        print(f"  Found: {len(router_counts)} routers")
        if missing_routers:
            print(f"  Missing from DataFrame: {missing_routers}")
        if unexpected_routers:
            print(f"  Not in input list: {unexpected_routers}")
        if duplicate_routers:
            print(f"  Listed more than once in ROUTERS_TO_PROCESS: {duplicate_routers}")
else:
    print("[ERROR] 'master_df' not found or is empty. Cannot perform verification.")


--- Verifying Data Distribution in Final DataFrame ---
Row count per router in the final consolidated DataFrame:
router
dallas     149784626
atlanta     28032240
elpaso       9357137
boston        392891
Name: count, dtype: int64

[SUCCESS] Verification passed: The number of unique routers in the DataFrame matches the input list.
