In [None]:
# --- Definitive Drift-Focused Preprocessing Script ---

# Step 1: Install necessary libraries
!pip install vitaldb boto3 pandas numpy tqdm pyarrow scikit-learn --quiet

import os
import pandas as pd
import numpy as np
import boto3
from botocore import UNSIGNED
from botocore.client import Config
from tqdm.notebook import tqdm
from google.colab import drive
import vitaldb
import glob
import warnings

# Quiet RuntimeWarning output and disable pandas' chained-assignment warning
pd.options.mode.chained_assignment = None
warnings.simplefilter("ignore", category=RuntimeWarning)

# --- Google Drive mount and working directories ---
drive.mount('/content/drive')
DRIVE_PROJECT_ROOT = '/content/drive/MyDrive/VitalDB_Drift_Focused_Dataset'
DRIVE_BATCH_OUTPUT_DIR = os.path.join(DRIVE_PROJECT_ROOT, 'preprocessed_batches')
VITAL_FILES_LOCAL_DIR = '/content/vital_files_temp'
# Create the project root first, then the batch output folder, then the local scratch folder
for _required_dir in (DRIVE_PROJECT_ROOT, DRIVE_BATCH_OUTPUT_DIR, VITAL_FILES_LOCAL_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# --- Configuration ---
BATCH_SIZE = 100
TOTAL_PATIENTS = 6388
# Raw tracks requested from each .vital file; the engineered features are derived from these
BASE_FEATURE_SET = (
    [f'BIS/{signal}' for signal in ('BIS', 'EMG', 'SEF', 'SR')]
    + ['Orchestra/PPF20_VOL', 'Orchestra/RFTN50_RATE']
    + [f'Solar8000/{signal}' for signal in ('HR', 'ART_MBP', 'RR_CO2')]
)

print("--- Starting DRIFT-FOCUSED Batched Preprocessing ---")

# --- S3 client and resume bookkeeping ---
S3_BUCKET_NAME = 'physionet-open'
S3_BASE_KEY = 'vitaldb/1.0.0/vital_files/'
# Unsigned requests: no AWS credentials are supplied to the client
s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# Rebuild the set of already-covered patient IDs from the names of the saved
# batch files ("batch_<start>-<end>.parquet").
batch_file_pattern = os.path.join(DRIVE_BATCH_OUTPUT_DIR, 'batch_*.parquet')
existing_batch_files = glob.glob(batch_file_pattern)
processed_ids = set()
for batch_path in existing_batch_files:
    try:
        stem = os.path.basename(batch_path).replace('.parquet', '')
        id_range = stem.split('_')[1].split('-')
        first_id = int(id_range[0])
        last_id = int(id_range[1])
        processed_ids.update(range(first_id, last_id + 1))
    except (IndexError, ValueError):
        # Name does not follow the expected pattern; report it and move on
        print(f"Could not parse filename {batch_path}, continuing...")
print(f"Found {len(existing_batch_files)} existing batch files, covering {len(processed_ids)} patients.")

# --- Main Processing Loop ---
# One pass per batch of patient IDs: download the .vital files, build the
# per-patient feature tables, save them as one Parquet file, then delete the
# local downloads.
for batch_start_id in range(1, TOTAL_PATIENTS + 1, BATCH_SIZE):
    batch_end_id = min(batch_start_id + BATCH_SIZE - 1, TOTAL_PATIENTS)
    # A batch is treated as done when both of its boundary IDs are in processed_ids
    if batch_start_id in processed_ids and batch_end_id in processed_ids:
        print(f"--- Batch {batch_start_id}-{batch_end_id} already processed. Skipping. ---")
        continue

    print(f"\n--- Starting new batch: Patients {batch_start_id} to {batch_end_id} ---")
    ids_to_process_this_batch = list(range(batch_start_id, batch_end_id + 1))

    # Download phase
    failed_downloads = []
    for patient_id in tqdm(ids_to_process_this_batch, desc=f"Downloading files for batch {batch_start_id}-{batch_end_id}"):
        file_name = f'{patient_id:04d}.vital'
        local_file_path = os.path.join(VITAL_FILES_LOCAL_DIR, file_name)
        if os.path.exists(local_file_path):
            continue
        # S3 object keys always use '/', so build the key explicitly instead of with os.path.join
        s3_key = f"{S3_BASE_KEY.rstrip('/')}/{file_name}"
        try:
            s3_client.download_file(S3_BUCKET_NAME, s3_key, local_file_path)
        except Exception as download_error:
            # Best effort: keep going, but remember the failure so it is not silent
            failed_downloads.append((patient_id, download_error))
    if failed_downloads:
        first_failed_id, first_error = failed_downloads[0]
        print(f"WARNING: {len(failed_downloads)} download(s) failed in batch {batch_start_id}-{batch_end_id} "
              f"(first: {first_failed_id:04d}: {first_error})")

    # Processing and Feature Engineering phase
    batch_dfs = []
    for patient_id in tqdm(ids_to_process_this_batch, desc=f"Processing & Engineering for batch {batch_start_id}-{batch_end_id}"):
        file_path = os.path.join(VITAL_FILES_LOCAL_DIR, f'{patient_id:04d}.vital')
        if not os.path.exists(file_path):
            continue
        try:
            vf = vitaldb.VitalFile(file_path)
            available_tracks = vf.get_track_names()
            tracks_to_load = [track for track in BASE_FEATURE_SET if track in available_tracks]
            # BIS/BIS is mandatory: every filter and drift feature below depends on it
            if 'BIS/BIS' not in tracks_to_load:
                continue
            df = vf.to_pandas(tracks_to_load, interval=1)

            # Keep only rows whose BIS value lies in [1, 100]
            df_clean = df[(df['BIS/BIS'] >= 1) & (df['BIS/BIS'] <= 100)].copy()
            if df_clean.empty:
                continue
            df_clean.ffill(inplace=True)
            df_clean.bfill(inplace=True)
            # Any NaN left after ffill + bfill means a column has no values at all in the kept rows
            if df_clean.isnull().values.any():
                continue

            # --- DRIFT-FOCUSED FEATURE ENGINEERING ---
            # 1. Standard Lag and Rolling Features
            # NOTE(review): shift()/rolling() count rows of the filtered frame; the "_<n>s" suffix
            # only matches elapsed seconds where no rows were removed by the BIS filter — confirm.
            for col in ['BIS/BIS', 'BIS/EMG', 'BIS/SEF', 'BIS/SR']:
                if col in df_clean.columns:
                    for lag in [10, 60, 300, 600]:
                        df_clean[f'{col}_lag_{lag}s'] = df_clean[col].shift(lag)
            for col in ['Solar8000/HR', 'BIS/EMG']:
                if col in df_clean.columns:
                    df_clean[f'{col}_std_300s'] = df_clean[col].rolling(window=300, min_periods=30).std()
                    df_clean[f'{col}_mean_300s'] = df_clean[col].rolling(window=300, min_periods=30).mean()
                    df_clean[f'{col}_delta_from_mean_300s'] = df_clean[col] - df_clean[f'{col}_mean_300s']

            # 2. NEW "Drift Alarm" Features
            df_clean['Reversion_Pressure'] = 45 - df_clean['BIS/BIS']
            if 'BIS/EMG_delta_from_mean_300s' in df_clean.columns:
                # The small epsilon keeps the division finite
                df_clean['Tension_Index'] = df_clean['BIS/EMG_delta_from_mean_300s'] / (df_clean['BIS/BIS'] + 1e-6)
            if 'Orchestra/RFTN50_RATE' in df_clean.columns and 'BIS/SR' in df_clean.columns:
                df_clean['Drug_Suppression_Index'] = df_clean['Orchestra/RFTN50_RATE'] * df_clean['BIS/SR']

            # Drop the leading rows where lag / rolling features are still undefined
            df_clean.dropna(inplace=True)
            if not df_clean.empty:
                df_clean['patient_id'] = patient_id
                df_clean.reset_index(inplace=True)
                df_clean.rename(columns={'index': 'Time'}, inplace=True)
                batch_dfs.append(df_clean)
        except Exception as e:
            print(f"\nERROR processing file {patient_id:04d}: {e}")

    if batch_dfs:
        final_df = pd.concat(batch_dfs, ignore_index=True)
        output_filename = f"batch_{batch_start_id:04d}-{batch_end_id:04d}.parquet"
        output_path = os.path.join(DRIVE_BATCH_OUTPUT_DIR, output_filename)
        # Write under a temporary name and rename afterwards, so an interrupted write never
        # leaves a partial 'batch_*.parquet' that a later run would treat as a finished batch.
        temp_output_path = output_path + '.tmp'
        final_df.to_parquet(temp_output_path, engine='pyarrow')
        os.replace(temp_output_path, output_path)
        print(f"--- Successfully saved batch to: {output_path} ---")
    else:
        print(f"--- No usable data for batch {batch_start_id}-{batch_end_id}; nothing was saved. ---")

    # Remove the downloaded .vital files for this batch
    for patient_id in ids_to_process_this_batch:
        try:
            os.remove(os.path.join(VITAL_FILES_LOCAL_DIR, f'{patient_id:04d}.vital'))
        except OSError:
            pass  # the file was never downloaded
    print("--- Cleaned up local files for this batch. ---")

print("\n--- All Batches Processed ---")

Mounted at /content/drive
--- Starting DRIFT-FOCUSED Batched Preprocessing ---
Found 51 existing batch files, covering 5100 patients.
--- Batch 1-100 already processed. Skipping. ---
--- Batch 101-200 already processed. Skipping. ---
--- Batch 201-300 already processed. Skipping. ---
--- Batch 301-400 already processed. Skipping. ---
--- Batch 401-500 already processed. Skipping. ---
--- Batch 501-600 already processed. Skipping. ---
--- Batch 601-700 already processed. Skipping. ---
--- Batch 701-800 already processed. Skipping. ---
--- Batch 801-900 already processed. Skipping. ---
--- Batch 901-1000 already processed. Skipping. ---
--- Batch 1001-1100 already processed. Skipping. ---
--- Batch 1101-1200 already processed. Skipping. ---
--- Batch 1201-1300 already processed. Skipping. ---
--- Batch 1301-1400 already processed. Skipping. ---
--- Batch 1401-1500 already processed. Skipping. ---
--- Batch 1501-1600 already processed. Skipping. ---
--- Batch 1601-1700 already processed. 

Downloading files for batch 5101-5200:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5101-5200:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5101-5200.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 5201 to 5300 ---


Downloading files for batch 5201-5300:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5201-5300:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5201-5300.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 5301 to 5400 ---


Downloading files for batch 5301-5400:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5301-5400:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5301-5400.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 5401 to 5500 ---


Downloading files for batch 5401-5500:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5401-5500:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5401-5500.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 5501 to 5600 ---


Downloading files for batch 5501-5600:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5501-5600:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5501-5600.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 5601 to 5700 ---


Downloading files for batch 5601-5700:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5601-5700:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5601-5700.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 5701 to 5800 ---


Downloading files for batch 5701-5800:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5701-5800:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5701-5800.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 5801 to 5900 ---


Downloading files for batch 5801-5900:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5801-5900:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5801-5900.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 5901 to 6000 ---


Downloading files for batch 5901-6000:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 5901-6000:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_5901-6000.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 6001 to 6100 ---


Downloading files for batch 6001-6100:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 6001-6100:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_6001-6100.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 6101 to 6200 ---


Downloading files for batch 6101-6200:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 6101-6200:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_6101-6200.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 6201 to 6300 ---


Downloading files for batch 6201-6300:   0%|          | 0/100 [00:00<?, ?it/s]

Processing & Engineering for batch 6201-6300:   0%|          | 0/100 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_6201-6300.parquet ---
--- Cleaned up local files for this batch. ---

--- Starting new batch: Patients 6301 to 6388 ---


Downloading files for batch 6301-6388:   0%|          | 0/88 [00:00<?, ?it/s]

Processing & Engineering for batch 6301-6388:   0%|          | 0/88 [00:00<?, ?it/s]

--- Successfully saved batch to: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches/batch_6301-6388.parquet ---
--- Cleaned up local files for this batch. ---

--- All Batches Processed ---


In [None]:
# --- The Ultimate Test: Finding Leading Indicators of Drift ---

import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from google.colab import drive
import os

# --- Setup and Load ---
drive.mount('/content/drive')

# Path to the preprocessed batch file analysed by this cell
DRIVE_PROJECT_ROOT = '/content/drive/MyDrive/VitalDB_Ultimate_Dataset'
BATCH_FILE_PATH = os.path.join(DRIVE_PROJECT_ROOT, 'preprocessed_batches', 'batch_0001-0100.parquet')

print(f"--- Loading ULTIMATE preprocessed batch from: {BATCH_FILE_PATH} ---")

if not os.path.exists(BATCH_FILE_PATH):
    print("\nERROR: Batch file not found. Please check the path.")
else:
    # Load the dataset
    df = pd.read_parquet(BATCH_FILE_PATH)
    print(f"Successfully loaded {len(df)} data points from the batch.")

    # --- Engineer the "Future Drift" target variables ---
    print("\n--- Engineering the 'Future Drift' target variable... ---")

    # Shift within each patient so one patient's values never leak into another's rows.
    # A negative shift brings later rows up to the current row.
    # NOTE(review): the shift counts rows; it equals 30 s / 180 s only if each patient's rows
    # are consecutive 1-second samples — confirm against the preprocessing step.
    df['BIS_future_30s'] = df.groupby('patient_id')['BIS/BIS'].shift(-30)
    df['BIS_future_180s'] = df.groupby('patient_id')['BIS/BIS'].shift(-180)

    # Drift = change from the current BIS value to the future one
    df['BIS_drift_30s'] = df['BIS_future_30s'] - df['BIS/BIS']
    df['BIS_drift_180s'] = df['BIS_future_180s'] - df['BIS/BIS']

    # Rows at the end of each patient's data have no future value; drop them
    df.dropna(subset=['BIS_drift_30s', 'BIS_drift_180s'], inplace=True)
    print(f"Remaining data points after creating drift targets: {len(df)}")

    # --- Final Correlation Analysis ---
    print("\n--- Correlating all features with FUTURE DRIFT... ---")

    # The targets and the future-shifted columns are built from future BIS values, so they
    # are not usable as predictors; leaving them in puts them at the top of the ranking.
    future_derived_columns = ['BIS_drift_30s', 'BIS_drift_180s', 'BIS_future_30s', 'BIS_future_180s']
    candidate_features = df.select_dtypes(include='number').drop(columns=future_derived_columns)

    # corrwith() correlates each feature with one target directly, instead of building the
    # full feature-by-feature correlation matrix twice just to read two of its columns.
    results_df = pd.DataFrame({
        'Corr_with_30s_Drift': candidate_features.corrwith(df['BIS_drift_30s']),
        'Corr_with_180s_Drift': candidate_features.corrwith(df['BIS_drift_180s']),
    }).dropna()

    # Rank by absolute correlation with the 30-second drift:
    # a strong negative correlation is as informative as a strong positive one.
    ranking = results_df['Corr_with_30s_Drift'].abs().sort_values(ascending=False).index
    sorted_results = results_df.reindex(ranking)

    print("\n--- Top 20 LEADING INDICATORS for BIS Drift ---")
    print(sorted_results.head(20))

    print("\n--- Analysis Complete ---")
    print("The features at the top of this list are the ones that best predict where the BIS score is GOING.")
    print("These are the features that capture the 'whispers' before the state changes.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
--- Loading ULTIMATE preprocessed batch from: /content/drive/MyDrive/VitalDB_Ultimate_Dataset/preprocessed_batches/batch_0001-0100.parquet ---
Successfully loaded 981492 data points from the batch.

--- Engineering the 'Future Drift' target variable... ---
Remaining data points after creating drift targets: 965652

--- Correlating all features with FUTURE DRIFT... ---

--- Top 20 LEADING INDICATORS for BIS Drift ---
                              Corr_with_30s_Drift  Corr_with_180s_Drift
BIS_drift_30s                            1.000000              0.389580
BIS_drift_180s                           0.389580              1.000000
BIS_future_30s                           0.248279             -0.095867
BIS/BIS                                 -0.215221             -0.278003
BIS/BIS_lag_10s                         -0.140543             -0.223961
BIS/BIS_lag_30s    

---


In [None]:
# --- One-Time Schema Patching Script (Version 2 - Fixes Duplicates) ---

import pandas as pd
import numpy as np
from google.colab import drive
import os
import glob
from tqdm.notebook import tqdm

# --- Setup and Configuration ---
drive.mount('/content/drive')

# Folder holding the existing preprocessed batches
DRIVE_PROJECT_ROOT = '/content/drive/MyDrive/VitalDB_Drift_Focused_Dataset'
BATCH_DIR_PATH = os.path.join(DRIVE_PROJECT_ROOT, 'preprocessed_batches')

# --- Define the MASTER SCHEMA (without Time and patient_id, which are handled separately) ---
MASTER_FEATURE_SCHEMA = [
    # Base Features
    'BIS/BIS', 'BIS/EMG', 'BIS/SEF', 'BIS/SR', 'Orchestra/PPF20_VOL',
    'Orchestra/RFTN50_RATE', 'Solar8000/HR', 'Solar8000/ART_MBP', 'Solar8000/RR_CO2',
    # Lag Features
    'BIS/BIS_lag_10s', 'BIS/BIS_lag_30s', 'BIS/BIS_lag_60s', 'BIS/BIS_lag_180s', 'BIS/BIS_lag_300s', 'BIS/BIS_lag_600s',
    'BIS/EMG_lag_10s', 'BIS/EMG_lag_30s', 'BIS/EMG_lag_60s', 'BIS/EMG_lag_180s', 'BIS/EMG_lag_300s', 'BIS/EMG_lag_600s',
    'BIS/SEF_lag_10s', 'BIS/SEF_lag_30s', 'BIS/SEF_lag_60s', 'BIS/SEF_lag_180s', 'BIS/SEF_lag_300s', 'BIS/SEF_lag_600s',
    'BIS/SR_lag_10s', 'BIS/SR_lag_30s', 'BIS/SR_lag_60s', 'BIS/SR_lag_180s', 'BIS/SR_lag_300s', 'BIS/SR_lag_600s',
    # Rolling Window Features
    'Solar8000/HR_std_300s', 'Solar8000/HR_mean_300s', 'Solar8000/HR_delta_from_mean_300s',
    'BIS/EMG_std_300s', 'BIS/EMG_mean_300s', 'BIS/EMG_delta_from_mean_300s',
    # Drift Alarm Features
    'Reversion_Pressure', 'Tension_Index', 'Drug_Suppression_Index'
]

print(f"--- Starting Schema Patching (v2) for files in: {BATCH_DIR_PATH} ---")

batch_files = glob.glob(os.path.join(BATCH_DIR_PATH, 'batch_*.parquet'))
print(f"Found {len(batch_files)} batch files to patch.")

if not batch_files:
    print("No files found. Exiting.")
else:
    failed_files = []
    for file_path in tqdm(batch_files, desc="Patching Files"):
        # The patched copy is written here first; the '.tmp' suffix keeps it out of the
        # 'batch_*.parquet' glob until it replaces the original.
        temp_path = file_path + '.tmp'
        try:
            # Read one batch file
            df = pd.read_parquet(file_path)

            # Keep Time and patient_id aside; they are re-attached after the feature columns
            original_time = df['Time']
            original_patient_id = df['patient_id']
            df_features = df.drop(columns=['Time', 'patient_id'], errors='ignore')

            # Select the master columns in schema order; columns missing from this file are
            # created and filled with NaN, columns outside the schema are dropped.
            final_df = df_features.reindex(columns=MASTER_FEATURE_SCHEMA)
            final_df['Time'] = original_time
            final_df['patient_id'] = original_patient_id

            # Replace the original only after the patched copy is fully written, so an
            # interrupted run cannot destroy the only copy of a batch.
            final_df.to_parquet(temp_path, engine='pyarrow')
            os.replace(temp_path, file_path)

        except Exception as e:
            failed_files.append(os.path.basename(file_path))
            print(f"\nERROR patching file {os.path.basename(file_path)}: {e}")
            # Discard a half-written temporary copy; the original file is untouched
            if os.path.exists(temp_path):
                os.remove(temp_path)

    print("\n--- Schema Patching Complete (v2) ---")
    if failed_files:
        print(f"WARNING: {len(failed_files)} file(s) could not be patched and were left unchanged: {sorted(failed_files)}")
    else:
        print("All existing batch files now have a consistent and unique set of columns.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
--- Starting Schema Patching (v2) for files in: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches ---
Found 31 batch files to patch.


Patching Files:   0%|          | 0/31 [00:00<?, ?it/s]


--- Schema Patching Complete (v2) ---
All existing batch files now have a consistent and unique set of columns.


In [None]:
# --- The "Leading Indicator" Diagnostic Script ---

# Install Dask
!pip install "dask[dataframe]" pandas --quiet
print("--- Libraries installed ---")

import dask.dataframe as dd
from google.colab import drive
import os
import glob
import pandas as pd

# --- Mount Drive and locate the batch folder ---
drive.mount('/content/drive')
DRIVE_PROJECT_ROOT = '/content/drive/MyDrive/VitalDB_Drift_Focused_Dataset'
BATCH_DIR_PATH = os.path.join(DRIVE_PROJECT_ROOT, 'preprocessed_batches')

print(f"--- Running Diagnostic on files in: {BATCH_DIR_PATH} ---")

batch_file_pattern = os.path.join(BATCH_DIR_PATH, 'batch_*.parquet')
batch_files = glob.glob(batch_file_pattern)

if batch_files:
    print(f"Found {len(batch_files)} batch files to inspect.")

    # Build a Dask DataFrame over all batch files; only its column names are inspected below
    df = dd.read_parquet(batch_files)

    # --- Compare the stored columns with the ones the analysis script adds ---
    print("\n--- Checking for pre-existing engineered columns ---")

    # Columns the drift-analysis script creates itself
    columns_we_will_create = {
        'BIS_future_30s',
        'BIS_future_180s',
        'BIS_drift_30s',
        'BIS_drift_180s'
    }
    existing_columns = set(df.columns)

    # Names present in both sets would collide when the script adds them again
    conflicting_columns = existing_columns & columns_we_will_create

    if not conflicting_columns:
        print("\n--- No conflicting columns found. The mystery continues. ---")
    else:
        print("\n--- !!! ROOT CAUSE FOUND !!! ---")
        print("The following columns, which our script tries to create, ALREADY EXIST in your Parquet files from a previous run:")
        for col in conflicting_columns:
            print(f"- {col}")
        print("\nThis is causing the duplicate column error. We must now run a script to remove these specific columns from all batch files.")
else:
    print("\nERROR: No batch files found.")

--- Libraries installed ---
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
--- Running Diagnostic on files in: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches ---
Found 31 batch files to inspect.

--- Checking for pre-existing engineered columns ---

--- No conflicting columns found. The mystery continues. ---


In [None]:
# --- The Definitive Diagnostic Script (File-by-File Inspection) ---

import pandas as pd
from google.colab import drive
import os
import glob
from tqdm.notebook import tqdm

# --- Setup ---
drive.mount('/content/drive')
DRIVE_PROJECT_ROOT = '/content/drive/MyDrive/VitalDB_Drift_Focused_Dataset'
BATCH_DIR_PATH = os.path.join(DRIVE_PROJECT_ROOT, 'preprocessed_batches')

print(f"--- Running Definitive Diagnostic on files in: {BATCH_DIR_PATH} ---")

batch_files = glob.glob(os.path.join(BATCH_DIR_PATH, 'batch_*.parquet'))

if not batch_files:
    print("\nERROR: No batch files found.")
else:
    print(f"Found {len(batch_files)} batch files to inspect individually.")

    found_corruption = False

    # --- THE EXHAUSTIVE SEARCH ---
    # Loop through every file and check its column names for duplicates.
    for file_path in tqdm(batch_files, desc="Inspecting Files"):
        try:
            # Read the file with all of its columns. Passing columns=[] here would select
            # zero columns, leaving .columns empty and the duplicate check unable to fire.
            # This loads the file's data, so it is slower than a metadata-only read.
            columns = pd.read_parquet(file_path).columns.tolist()

            # Check for duplicates within this single file
            counts = pd.Series(columns).value_counts()
            duplicates = counts[counts > 1]

            if not duplicates.empty:
                print(f"\n--- !!! CORRUPTION FOUND IN FILE: {os.path.basename(file_path)} !!! ---")
                print("This file contains the following duplicate columns:")
                print(duplicates)
                found_corruption = True

        except Exception as e:
            # A file that cannot be read is reported and counted as a problem
            print(f"\nERROR reading metadata from {os.path.basename(file_path)}: {e}")
            found_corruption = True

    if not found_corruption:
        print("\n--- Exhaustive search complete. No duplicate columns found in any individual file. ---")
        print("This is a truly deep and unexpected error. The problem may lie in Dask's interaction with the Parquet format itself.")
    else:
        print("\n--- Exhaustive search complete. Corrupted files identified. ---")
        print("We can now write a script to surgically fix these specific files.")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
--- Running Definitive Diagnostic on files in: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches ---
Found 31 batch files to inspect individually.


Inspecting Files:   0%|          | 0/31 [00:00<?, ?it/s]


--- Exhaustive search complete. No duplicate columns found in any individual file. ---
This is a truly deep and unexpected error. The problem may lie in Dask's interaction with the Parquet format itself.


In [None]:
# --- Definitive Schema-Enforced Preprocessing Script ---

# Step 1: Install necessary libraries
!pip install vitaldb boto3 pandas numpy tqdm pyarrow scikit-learn --quiet

import os
import pandas as pd
import numpy as np
import boto3
from botocore import UNSIGNED
from botocore.client import Config
from tqdm.notebook import tqdm
from google.colab import drive
import vitaldb
import glob
import warnings

# Quiet RuntimeWarning output and disable pandas' chained-assignment warning
pd.options.mode.chained_assignment = None
warnings.simplefilter("ignore", category=RuntimeWarning)

# --- Google Drive mount and working directories ---
drive.mount('/content/drive')
DRIVE_PROJECT_ROOT = '/content/drive/MyDrive/VitalDB_Ultimate_Dataset'
DRIVE_BATCH_OUTPUT_DIR = os.path.join(DRIVE_PROJECT_ROOT, 'preprocessed_batches')
VITAL_FILES_LOCAL_DIR = '/content/vital_files_temp'
# Create the project root first, then the batch output folder, then the local scratch folder
for _required_dir in (DRIVE_PROJECT_ROOT, DRIVE_BATCH_OUTPUT_DIR, VITAL_FILES_LOCAL_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# --- Configuration ---
BATCH_SIZE = 100
TOTAL_PATIENTS = 6388
# Master list of raw tracks; the processing loop forces every patient frame to carry all of them
ULTIMATE_FEATURE_SET = (
    [f'BIS/{signal}' for signal in ('BIS', 'EMG', 'SEF', 'SR')]
    + ['Orchestra/PPF20_VOL', 'Orchestra/RFTN50_RATE']
    + [f'Solar8000/{signal}' for signal in ('HR', 'ART_MBP', 'RR_CO2')]
)

print("--- Starting SCHEMA-ENFORCED Batched Preprocessing ---")

# --- S3 client and resume bookkeeping ---
S3_BUCKET_NAME = 'physionet-open'
S3_BASE_KEY = 'vitaldb/1.0.0/vital_files/'
# Unsigned requests: no AWS credentials are supplied to the client
s3_client = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# Rebuild the set of already-covered patient IDs from the names of the saved
# batch files ("batch_<start>-<end>.parquet").
batch_file_pattern = os.path.join(DRIVE_BATCH_OUTPUT_DIR, 'batch_*.parquet')
existing_batch_files = glob.glob(batch_file_pattern)
processed_ids = set()
for batch_path in existing_batch_files:
    try:
        stem = os.path.basename(batch_path).replace('.parquet', '')
        id_range = stem.split('_')[1].split('-')
        first_id = int(id_range[0])
        last_id = int(id_range[1])
        processed_ids.update(range(first_id, last_id + 1))
    except (IndexError, ValueError):
        # Name does not follow the expected pattern; report it and move on
        print(f"Could not parse filename {batch_path}, continuing...")
print(f"Found {len(existing_batch_files)} existing batch files, covering {len(processed_ids)} patients.")

# --- Main Processing Loop ---
# One pass per batch of patient IDs: download the .vital files, build the
# per-patient feature tables, save them as one Parquet file, then delete the
# local downloads.
for batch_start_id in range(1, TOTAL_PATIENTS + 1, BATCH_SIZE):
    batch_end_id = min(batch_start_id + BATCH_SIZE - 1, TOTAL_PATIENTS)
    # A batch is treated as done when both of its boundary IDs are in processed_ids
    if batch_start_id in processed_ids and batch_end_id in processed_ids:
        print(f"--- Batch {batch_start_id}-{batch_end_id} already processed. Skipping. ---")
        continue

    print(f"\n--- Starting new batch: Patients {batch_start_id} to {batch_end_id} ---")
    ids_to_process_this_batch = list(range(batch_start_id, batch_end_id + 1))

    # Download phase
    failed_downloads = []
    for patient_id in tqdm(ids_to_process_this_batch, desc=f"Downloading files for batch {batch_start_id}-{batch_end_id}"):
        file_name = f'{patient_id:04d}.vital'
        local_file_path = os.path.join(VITAL_FILES_LOCAL_DIR, file_name)
        if os.path.exists(local_file_path):
            continue
        # S3 object keys always use '/', so build the key explicitly instead of with os.path.join
        s3_key = f"{S3_BASE_KEY.rstrip('/')}/{file_name}"
        try:
            s3_client.download_file(S3_BUCKET_NAME, s3_key, local_file_path)
        except Exception as download_error:
            # Best effort: keep going, but remember the failure so it is not silent
            failed_downloads.append((patient_id, download_error))
    if failed_downloads:
        first_failed_id, first_error = failed_downloads[0]
        print(f"WARNING: {len(failed_downloads)} download(s) failed in batch {batch_start_id}-{batch_end_id} "
              f"(first: {first_failed_id:04d}: {first_error})")

    # Processing and Feature Engineering phase
    batch_dfs = []
    for patient_id in tqdm(ids_to_process_this_batch, desc=f"Processing & Engineering for batch {batch_start_id}-{batch_end_id}"):
        file_path = os.path.join(VITAL_FILES_LOCAL_DIR, f'{patient_id:04d}.vital')
        if not os.path.exists(file_path):
            continue
        try:
            vf = vitaldb.VitalFile(file_path)
            available_tracks = vf.get_track_names()
            tracks_to_load = [track for track in ULTIMATE_FEATURE_SET if track in available_tracks]
            # BIS/BIS is mandatory: the row filter below depends on it
            if 'BIS/BIS' not in tracks_to_load:
                continue
            df = vf.to_pandas(tracks_to_load, interval=1)

            # Enforce a consistent schema: add any track this patient lacks as an all-NaN column
            for col in ULTIMATE_FEATURE_SET:
                if col not in df.columns:
                    df[col] = np.nan

            # Keep only rows whose BIS value lies in [1, 100]
            df_clean = df[(df['BIS/BIS'] >= 1) & (df['BIS/BIS'] <= 100)].copy()
            if df_clean.empty:
                continue
            df_clean.ffill(inplace=True)
            df_clean.bfill(inplace=True)
            # NOTE(review): a column added above as all-NaN is still all-NaN after ffill/bfill,
            # so this check skips every patient that lacks any track in ULTIMATE_FEATURE_SET.
            # Confirm that dropping those patients is the intended effect of the schema fill.
            if df_clean.isnull().values.any():
                continue

            # Feature Engineering
            # NOTE(review): shift()/rolling() count rows of the filtered frame; the "_<n>s" suffix
            # only matches elapsed seconds where no rows were removed by the BIS filter — confirm.
            for col in ['BIS/BIS', 'BIS/EMG', 'BIS/SEF', 'BIS/SR']:
                if col in df_clean.columns:
                    for lag in [10, 30, 60, 180, 300, 600]:
                        df_clean[f'{col}_lag_{lag}s'] = df_clean[col].shift(lag)
            for col in ['Solar8000/HR', 'BIS/EMG']:
                if col in df_clean.columns:
                    df_clean[f'{col}_std_300s'] = df_clean[col].rolling(window=300, min_periods=30).std()
                    df_clean[f'{col}_mean_300s'] = df_clean[col].rolling(window=300, min_periods=30).mean()
                    df_clean[f'{col}_delta_from_mean_300s'] = df_clean[col] - df_clean[f'{col}_mean_300s']

            # Drop the leading rows where lag / rolling features are still undefined
            df_clean.dropna(inplace=True)
            if not df_clean.empty:
                df_clean['patient_id'] = patient_id
                df_clean.reset_index(inplace=True)
                df_clean.rename(columns={'index': 'Time'}, inplace=True)
                batch_dfs.append(df_clean)
        except Exception as e:
            print(f"\nERROR processing file {patient_id:04d}: {e}")

    if batch_dfs:
        final_df = pd.concat(batch_dfs, ignore_index=True)
        output_filename = f"batch_{batch_start_id:04d}-{batch_end_id:04d}.parquet"
        output_path = os.path.join(DRIVE_BATCH_OUTPUT_DIR, output_filename)
        # Write under a temporary name and rename afterwards, so an interrupted write never
        # leaves a partial 'batch_*.parquet' that a later run would treat as a finished batch.
        temp_output_path = output_path + '.tmp'
        final_df.to_parquet(temp_output_path, engine='pyarrow')
        os.replace(temp_output_path, output_path)
        print(f"--- Successfully saved batch to: {output_path} ---")
    else:
        print(f"--- No usable data for batch {batch_start_id}-{batch_end_id}; nothing was saved. ---")

    # Remove the downloaded .vital files for this batch
    for patient_id in ids_to_process_this_batch:
        try:
            os.remove(os.path.join(VITAL_FILES_LOCAL_DIR, f'{patient_id:04d}.vital'))
        except OSError:
            pass  # the file was never downloaded
    print("--- Cleaned up local files for this batch. ---")

print("\n--- All Batches Processed ---")

---


In [None]:
# --- The Final, Correct, It-Will-Work Dask Script ---

# Step 1: Install necessary libraries
!pip install "dask[dataframe]" pandas numpy --quiet
print("--- Libraries installed ---")

import dask.dataframe as dd
import pandas as pd
import numpy as np
from google.colab import drive
import os
import glob
from tqdm.notebook import tqdm

# --- Setup ---
drive.mount('/content/drive')
DRIVE_PROJECT_ROOT = '/content/drive/MyDrive/VitalDB_Drift_Focused_Dataset'
BATCH_DIR_PATH = os.path.join(DRIVE_PROJECT_ROOT, 'preprocessed_batches')

print(f"--- Pointing Dask to all preprocessed batches in: {BATCH_DIR_PATH} ---")

batch_files = glob.glob(os.path.join(BATCH_DIR_PATH, 'batch_*.parquet'))

if not batch_files:
    print("\nERROR: No batch files found.")
else:
    print(f"Found {len(batch_files)} batch files for Dask to process.")

    # Create the Dask DataFrame
    df = dd.read_parquet(batch_files)

    print("Dask DataFrame created.")

    # --- Set the index to patient_id ---
    # Intended to re-partition the data so that all rows of one patient sit in the same
    # partition before the per-patient groupby().shift() below. This is an expensive step.
    print("\n--- Re-partitioning data by patient_id (set_index). This is the key step and may take time... ---")
    df = df.set_index('patient_id')
    print("--- Data re-partitioned successfully. ---")

    # --- Engineer the "Future Drift" target variable ---
    print("\n--- Engineering the 'Future Drift' target variable... ---")

    # Pass `meta` explicitly so Dask does not have to infer the result type (it warns when
    # it must). The shifted series keeps the name and dtype of BIS/BIS.
    # NOTE(review): assumes BIS/BIS is a float column, so NaN-filled shifted rows keep its dtype.
    future_bis_meta = ('BIS/BIS', df['BIS/BIS'].dtype)
    df['BIS_future_30s'] = df.groupby('patient_id')['BIS/BIS'].shift(-30, meta=future_bis_meta)
    df['BIS_drift_30s'] = df['BIS_future_30s'] - df['BIS/BIS']

    # --- Calculate correlations with NumPy, one feature column at a time ---
    print("\n--- Calculating correlations... ---")

    feature_columns = [col for col in df.columns if col not in ['BIS/BIS', 'BIS_future_30s', 'BIS_drift_30s']]
    correlations = {}

    # NOTE(review): each .compute() call below evaluates the lazy pipeline again for one column
    # pair; this keeps memory use low but repeats the upstream work for every feature.
    for col in tqdm(feature_columns, desc="Calculating Correlations"):
        try:
            temp_df = df[[col, 'BIS_drift_30s']].dropna()
            values = temp_df.compute().values

            # Need at least two rows to compute a correlation
            if values.shape[0] > 1:
                corr_value = np.corrcoef(values[:, 0], values[:, 1])[0, 1]
                correlations[col] = corr_value
            else:
                correlations[col] = np.nan
        except Exception as e:
            # Record NaN so the column drops out of the ranking, but report why
            print(f"\nERROR computing correlation for {col}: {e}")
            correlations[col] = np.nan

    print("\n--- Correlation calculation complete. ---")

    # --- Final Analysis ---
    # Rank by absolute correlation; columns whose correlation is NaN are left out
    results_series = pd.Series(correlations).dropna()
    sorted_results = results_series.reindex(results_series.abs().sort_values(ascending=False).index)

    print("\n--- Top 20 LEADING INDICATORS for BIS Drift (FINAL, CORRECTED) ---")
    print(sorted_results.head(20))

    print("\n--- Analysis Complete ---")

--- Libraries installed ---
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
--- Pointing Dask to all preprocessed batches in: /content/drive/MyDrive/VitalDB_Drift_Focused_Dataset/preprocessed_batches ---
Found 31 batch files for Dask to process.
Dask DataFrame created.

--- Re-partitioning data by patient_id (set_index). This is the key step and may take time... ---
--- Data re-partitioned successfully. ---

--- Engineering the 'Future Drift' target variable... ---

--- Calculating correlations... ---


Please provide `meta` if the result is unexpected.
  Before: .shift(func)
  After:  .shift(func, meta={'x': 'f8', 'y': 'f8'}) for dataframe result
  or:     .shift(func, meta=('x', 'f8'))            for series result

  df['BIS_future_30s'] = df.groupby('patient_id')['BIS/BIS'].shift(-30)


Calculating Correlations:   0%|          | 0/42 [00:00<?, ?it/s]


--- Correlation calculation complete. ---

--- Top 20 LEADING INDICATORS for BIS Drift (FINAL, CORRECTED) ---
Reversion_Pressure              0.178311
BIS/BIS_lag_10s                -0.115842
BIS/EMG_delta_from_mean_300s    0.071240
Tension_Index                   0.062049
BIS/BIS_lag_60s                -0.040074
BIS/SEF                        -0.038164
BIS/EMG                         0.033723
BIS/BIS_lag_300s               -0.027175
BIS/SEF_lag_10s                -0.024287
BIS/BIS_lag_600s               -0.022895
BIS/EMG_lag_600s               -0.016926
Time                            0.016390
BIS/EMG_lag_300s               -0.014630
BIS/EMG_mean_300s              -0.014308
Orchestra/PPF20_VOL             0.014301
BIS/EMG_lag_60s                -0.013672
Orchestra/RFTN50_RATE          -0.013324
BIS/SEF_lag_300s               -0.012778
Solar8000/ART_MBP               0.011766
BIS/SEF_lag_600s               -0.011675
dtype: float64

--- Analysis Complete ---
