In [4]:
import numpy as np
import pandas as pd
from biosppy.signals import ecg
from tqdm import tqdm
import json

In [5]:
# Load data
def load_data(train_path, test_path):
    """
    Read the train and test CSV files into DataFrames.

    Both files are parsed with their "id" column used as the index.

    Parameters:
        train_path (str): Path to the training CSV file.
        test_path (str): Path to the test CSV file.

    Returns:
        tuple: (train, test) DataFrames, in that order.
    """
    train_frame, test_frame = (
        pd.read_csv(csv_path, index_col="id") for csv_path in (train_path, test_path)
    )
    return train_frame, test_frame

# Locations of the train and test CSV files
train_path, test_path = "data/train.csv", "data/test.csv"
train, test = load_data(train_path, test_path)

# Sanity check: report the dimensions of both frames
print(f"Train shape: {train.shape}, Test shape: {test.shape}")


Train shape: (5117, 17808), Test shape: (3411, 17807)


In [8]:
# Plain-text preview of the training frame: label column `y` followed by the
# sample columns `x0`, `x1`, ...; the trailing sample columns print as NaN for
# the rows shown in the recorded output.
print(train)

      y   x0   x1   x2   x3   x4   x5   x6   x7   x8  ...  x17797  x17798  \
id                                                    ...                   
0     0  -13   -9   -6   -4    0    2    6   12   23  ...     NaN     NaN   
1     0  -34  110  249  390  527  639  721  777  823  ...     NaN     NaN   
2     0  -34  -36  -37  -39  -41  -42  -44  -46  -48  ...     NaN     NaN   
3     1  292  298  303  310  320  336  354  377  405  ...     NaN     NaN   
4     2  157  179  195  210  217  222  226  228  231  ...     NaN     NaN   
...  ..  ...  ...  ...  ...  ...  ...  ...  ...  ...  ...     ...     ...   
5112  3 -247 -271 -285 -303 -334 -376 -413 -432 -443  ...     NaN     NaN   
5113  0   62   62   61   61   61   61   61   61   61  ...     NaN     NaN   
5114  0  -95 -110 -124 -131 -126 -114  -95  -67  -42  ...     NaN     NaN   
5115  0  -50  -48  -45  -42  -38  -35  -32  -30  -28  ...     NaN     NaN   
5116  2   66   65   63   62   65   72   80   84   87  ...     NaN     NaN   

# DEPRECATED: NOT USING NEUROKIT2 INVERSION FUNCTION


In [None]:
def precompute_ecg_features(data, sampling_rate=300):
    """
    Precompute R-peaks and heartbeats for all signals.

    Each row's 'x*' columns are read as one signal. NaN samples are removed
    before segmentation so that the segmenter and the heartbeat extractor only
    see real samples; the reported R-peak indices therefore refer to positions
    in the NaN-free signal.

    Parameters:
        data (pd.DataFrame): DataFrame containing signal columns (prefixed with 'x').
        sampling_rate (int): Sampling rate of the signals.

    Returns:
        results (list of dict): One dictionary per row with the keys "id"
            (the row's index label), "r_peaks" (list of R-peak sample indices)
            and "heartbeats" (list of heartbeat templates). Both lists are
            empty when processing the row raised an exception; "heartbeats"
            is also empty when fewer than two R-peaks were detected.
    """
    signal_cols = [col for col in data.columns if col.startswith('x')]
    results = []

    for idx, row in tqdm(data.iterrows(), total=len(data), desc="Precomputing ECG Features"):
        signal = row[signal_cols].to_numpy(dtype="float32")
        # Rows are padded with NaN up to a common width (see the printed frame);
        # strip the padding so it does not reach the peak detector.
        # NOTE(review): assumes NaNs only occur as trailing padding — confirm.
        signal = signal[~np.isnan(signal)]
        result = {"id": idx}
        try:
            # Detect R-peaks
            r_peaks = ecg.engzee_segmenter(signal, sampling_rate=sampling_rate)['rpeaks']
            result["r_peaks"] = r_peaks.tolist()

            # Extract heartbeats (only attempted when at least two R-peaks exist)
            if len(r_peaks) >= 2:
                beats = ecg.extract_heartbeats(signal, r_peaks, sampling_rate=sampling_rate)['templates']
                result["heartbeats"] = beats.tolist()
            else:
                result["heartbeats"] = []
        except Exception as e:
            # Best-effort: log the failure and record empty features so one bad
            # signal does not abort the whole run.
            print(f"Error processing signal at index {idx}: {e}")
            result["r_peaks"] = []
            result["heartbeats"] = []

        results.append(result)

    return results


# Run the (slow) feature extraction once per split
train_precomputed = precompute_ecg_features(train)
test_precomputed = precompute_ecg_features(test)

# Persist each split's records as a JSON array of objects
for split_records, json_path in (
    (train_precomputed, "data/train_precomputed.json"),
    (test_precomputed, "data/test_precomputed.json"),
):
    pd.DataFrame(split_records).to_json(json_path, orient="records")

1. **Mean Method** : If the mean of the median heartbeat is negative, the signal is inverted.

2. **Median Method** : If the median of the median heartbeat is negative, the signal is inverted.

3. **Argmax - Argmin Method** : Check the difference between the positions of the maximum and minimum values of the median heartbeat. If `argmax - argmin > 0`, it's likely inverted.

4. **Skewness Method** : Compute the skewness (asymmetry of the value distribution) of the median heartbeat. If the skewness is negative, the signal is inverted.

5. **Min-Max Range Method** : Compare the magnitudes of the minimum and maximum values of the median heartbeat. If the range skews towards negatives (`|min| > |max|`), the signal is inverted.

In [None]:
from scipy.stats import skew
import numpy as np
import pandas as pd
from tqdm import tqdm


def detect_inversion_from_precomputed(precomputed_data):
    """
    Flag possibly inverted signals from precomputed heartbeats.

    For every record the element-wise median over its heartbeats is taken and
    five independent heuristics are evaluated on that median heartbeat. Each
    heuristic yields 1 (inverted) or 0 (not inverted). Records without any
    heartbeats get 0 for every heuristic.

    Parameters:
        precomputed_data (list of dict): Records holding at least the keys
            "id" and "heartbeats".

    Returns:
        mask (pd.DataFrame): One row per record with the columns "id",
            "inverted_mean", "inverted_median", "inverted_argmax_argmin",
            "inverted_skewness" and "inverted_min_max_range".
    """
    flag_columns = (
        "inverted_mean",
        "inverted_median",
        "inverted_argmax_argmin",
        "inverted_skewness",
        "inverted_min_max_range",
    )

    def as_flag(condition):
        # Encode a truth value as the integer 1 / 0 stored in the mask.
        return 1 if condition else 0

    rows = []
    for record in tqdm(precomputed_data, desc="Detecting Inversion"):
        row = {"id": record["id"]}
        beats = np.array(record["heartbeats"])

        # No heartbeats available: report "not inverted" for every method.
        if not beats.size > 0:
            row.update(dict.fromkeys(flag_columns, 0))
            rows.append(row)
            continue

        template = np.median(beats, axis=0)

        # Method 1: negative mean of the median heartbeat
        row["inverted_mean"] = as_flag(np.mean(template) < 0)

        # Method 2: negative median of the median heartbeat
        row["inverted_median"] = as_flag(np.median(template) < 0)

        # Method 3: the maximum is located after the minimum
        peak_pos = np.argmax(template)
        trough_pos = np.argmin(template)
        row["inverted_argmax_argmin"] = as_flag(peak_pos - trough_pos > 0)

        # Method 4: negative skewness of the median heartbeat
        row["inverted_skewness"] = as_flag(skew(template) < 0)

        # Method 5: the minimum is larger in magnitude than the maximum
        lowest, highest = np.min(template), np.max(template)
        row["inverted_min_max_range"] = as_flag(abs(lowest) > abs(highest))

        rows.append(row)

    # Assemble all per-record flags into a single table
    return pd.DataFrame(rows)


# Read back the records written by the precompute step
with open("data/train_precomputed.json", "r") as precomputed_file:
    train_precomputed = json.load(precomputed_file)

# Evaluate every inversion heuristic on the training records and store the mask
train_inversion_results = detect_inversion_from_precomputed(train_precomputed)
train_inversion_results.to_csv("data/train_inversion_mask.csv", index=False)
print("Inversion results saved!")
