# MIT-BIH Supraventricular Arrhythmia Database (_svdb_)

Part of the ECG Database Collection:

| Short Name | Long Name |
| :--- | :--- |
| _mitdb_ | MIT-BIH Arrhythmia Database |
| _svdb_ | MIT-BIH Supraventricular Arrhythmia Database |
| _ltdb_ | MIT-BIH Long-Term ECG Database |

[Docu](https://wfdb.readthedocs.io/en/latest) of the `wfdb`-package.

In [1]:
%matplotlib inline
import pandas as pd
import numpy as np
import wfdb
import os
from typing import Final
from collections.abc import Callable
from config import data_raw_folder, data_processed_folder
from timeeval import Datasets
import matplotlib.pyplot as plt
from IPython.display import display, Markdown, Latex

In [2]:
dataset_collection_name = "SVDB"
source_folder = os.path.join(data_raw_folder, "MIT-BIH Supraventricular Arrhythmia Database")
target_folder = data_processed_folder

from pathlib import Path
print(f"Looking for source datasets in {Path(source_folder).absolute()} and\nsaving processed datasets in {Path(target_folder).absolute()}")

Looking for source datasets in /home/projects/akita/data/benchmark-data/data-raw/MIT-BIH Supraventricular Arrhythmia Database and
saving processed datasets in /home/projects/akita/data/benchmark-data/data-processed


In [3]:
def load_dataset_names() -> list[str]:
    with open(os.path.join(source_folder, "RECORDS"), 'r') as f:
        records = [l.rstrip('\n') for l in f]
    return records

In [4]:
def transform_and_label(source_file: str, target: str) -> int:
    print(f"Transforming {os.path.basename(source_file)}")
    # load dataset
    record = wfdb.rdrecord(source_file)
    df_record = pd.DataFrame(record.p_signal, columns=record.sig_name)
    print(f"  record {record.file_name[0]} loaded")

    # load annotation file
    atr = wfdb.rdann(source_file, "atr")
    assert record.fs == atr.fs, "Sample frequency of records and annotations does not match!"
    df_annotation = pd.DataFrame(atr.symbol, index=atr.sample, columns=["Label"])
    df_annotation = df_annotation.reset_index()
    df_annotation.columns = ["position", "label"]
    print(f"  {atr.ann_len} beat annotations for {source_file} loaded")

    # calculate normal beat length
    print("  preparing windows for labeling...")
    df_normal_beat = df_annotation.copy()
    df_normal_beat["prev_position"] = df_annotation["position"].shift()
    df_normal_beat["prev_label"] = df_annotation["label"].shift()
    df_normal_beat = df_normal_beat[(df_normal_beat["label"] == "N") & (df_normal_beat["prev_label"] == "N")]
    s_normal_beat_lengths = df_normal_beat["position"] - df_normal_beat["prev_position"]
    print(f"    normal beat distance samples = {len(s_normal_beat_lengths)}")
    normal_beat_length = s_normal_beat_lengths.median()
    if (normal_beat_length % 2) == 0:
        normal_beat_length += 1
    beat_window_size = int(normal_beat_length)
    beat_window_margin = (beat_window_size - 1)//2
    print(f"    window size = {beat_window_size}")
    print(f"    window margins (left and right) = {beat_window_margin}")

    # calculate beat windows
    ## for external anomalies
    df_ext = df_annotation[(df_annotation["label"] == "|") | (df_annotation["label"] == "Q")].copy()
    df_ext["window_start"] = df_ext["position"]-beat_window_margin
    df_ext["window_end"] = df_ext["position"]+beat_window_margin
    df_ext = df_ext[["position", "window_start", "window_end"]]
    print(f"    {len(df_ext)} windows for external anomalies")
    ## for anomalous beats
    df_svf = df_annotation[(df_annotation["label"] != "|") & (df_annotation["label"] != "~") & (df_annotation["label"] != "+")].copy()
    df_svf["position_next"] = df_svf["position"].shift(-1)
    df_svf["position_prev"] = df_svf["position"].shift(1)
    #df_svf = df_svf[(df_svf["position_prev"].notnull()) & (df_svf["position_next"].notnull())]
    df_svf = df_svf[(df_svf["label"] != "Q") & (df_svf["label"] != "N")]
    df_svf["window_start"] = np.minimum(df_svf["position"].values-beat_window_margin, df_svf["position_prev"].values+beat_window_margin)
    df_svf["window_end"] = np.maximum(df_svf["position"].values+beat_window_margin, df_svf["position_next"].values-beat_window_margin)
    df_svf = df_svf[["position", "window_start", "window_end"]]
    print(f"    {len(df_svf)} windows for anomalous beats")
    ## merge
    df_windows = pd.concat([df_ext, df_svf])
    print(f"  ...done.")

    # add labels based on anomaly windows
    print("  labeling")
    df_record["is_anomaly"] = 0
    for _, (_, t1, t2) in df_windows.iterrows():
        tmp = df_record[df_record.index >= t1]
        tmp = tmp[tmp.index <= t2]
        df_record["is_anomaly"].values[tmp.index] = 1

    # reconstruct timestamps and set as index
    print("  reconstructing timestamps")
    df_record["timestamp"] = pd.to_datetime(df_record.index.values * 1e+9/record.fs, unit='ns')
    df_record = df_record.set_index("timestamp")
    df_record.to_csv(target)
    print(f"Dataset {os.path.basename(source_file)} transformed and saved!")
    
    # return dataset length
    return record.sig_len

In [5]:
# shared by all datasets
dataset_type = "real"
input_type = "multivariate"
datetime_index = True
train_type = "unsupervised"
train_is_normal = False

# create target directory
dataset_subfolder = os.path.join(input_type, dataset_collection_name)
target_subfolder = os.path.join(target_folder, dataset_subfolder)
try:
    os.makedirs(target_subfolder)
    print(f"Created directories {target_subfolder}")
except FileExistsError:
    print(f"Directories {target_subfolder} already exist")
    pass

dm = Datasets(target_folder)

Directories /home/projects/akita/data/benchmark-data/data-processed/multivariate/SVDB already exist


In [7]:
# dataset transformation
transform_file: Callable[[str, str], int] = transform_and_label

for dataset_name in load_dataset_names():
    # intentionally no file suffix (.dat)
    source_file = os.path.join(source_folder, dataset_name)
    filename = f"{dataset_name}.test.csv"
    path = os.path.join(dataset_subfolder, filename)
    target_filepath = os.path.join(target_subfolder, filename)
            
    # transform file and label it
    dataset_length = transform_file(source_file, target_filepath)
    print(f"Processed source dataset {source_file} -> {target_filepath}")

    # save metadata
    dm.add_dataset((dataset_collection_name, dataset_name),
        train_path = None,
        test_path = path,
        dataset_type = dataset_type,
        datetime_index = datetime_index,
        split_at = None,
        train_type = train_type,
        train_is_normal = train_is_normal,
        input_type = input_type,
        dataset_length = dataset_length
    )

# save metadata of benchmark
dm.save()

Transforming 800
  record 800.dat loaded
  1921 beat annotations for /home/projects/akita/data/benchmark-data/data-raw/MIT-BIH Supraventricular Arrhythmia Database/800 loaded
  preparing windows for labeling...
    normal beat distance samples = 1772
    window size = 119
    window margins (left and right) = 59
    10 windows for external anomalies
    37 windows for anomalous beats
  ...done.
  labeling
  reconstructing timestamps
Dataset 800 transformed and saved!
Processed source dataset /home/projects/akita/data/benchmark-data/data-raw/MIT-BIH Supraventricular Arrhythmia Database/800 -> /home/projects/akita/data/benchmark-data/data-processed/multivariate/SVDB/800.test.csv
Transforming 801
  record 801.dat loaded
  2577 beat annotations for /home/projects/akita/data/benchmark-data/data-raw/MIT-BIH Supraventricular Arrhythmia Database/801 loaded
  preparing windows for labeling...
    normal beat distance samples = 1830
    window size = 97
    window margins (left and right) = 48
 

In [9]:
dm.refresh()
dm.df().loc[(slice(dataset_collection_name,dataset_collection_name), slice(None))]

Unnamed: 0_level_0,Unnamed: 1_level_0,train_path,test_path,dataset_type,datetime_index,split_at,train_type,train_is_normal,input_type,length
collection_name,dataset_name,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
SVDB,800,,multivariate/SVDB/800.test.csv,real,True,,unsupervised,False,multivariate,230400
SVDB,801,,multivariate/SVDB/801.test.csv,real,True,,unsupervised,False,multivariate,230400
SVDB,802,,multivariate/SVDB/802.test.csv,real,True,,unsupervised,False,multivariate,230400
SVDB,803,,multivariate/SVDB/803.test.csv,real,True,,unsupervised,False,multivariate,230400
SVDB,804,,multivariate/SVDB/804.test.csv,real,True,,unsupervised,False,multivariate,230400
SVDB,...,...,...,...,...,...,...,...,...,...
SVDB,890,,multivariate/SVDB/890.test.csv,real,True,,unsupervised,False,multivariate,230400
SVDB,891,,multivariate/SVDB/891.test.csv,real,True,,unsupervised,False,multivariate,230400
SVDB,892,,multivariate/SVDB/892.test.csv,real,True,,unsupervised,False,multivariate,230400
SVDB,893,,multivariate/SVDB/893.test.csv,real,True,,unsupervised,False,multivariate,230400


## Dataset transformation walk-through

In [None]:
def print_obj_attr(obj, name="Object"):
    print(name)
    tmp = vars(obj)
    for key in tmp:
        print(key, tmp[key])
    print("")
records = load_dataset_names()

### Load and parse dataset

In [None]:
# dataset
record = wfdb.rdrecord(os.path.join(source_folder, records[51]))
#print_obj_attr(record, "Record object")

df_record = pd.DataFrame(record.p_signal, columns=record.sig_name)
df_record

Add timestamp information based on sample interval ($$[fs] = samples/second$$):

In [None]:
display(Latex(f"Samples per second: $$fs = {record.fs} \\frac{{1}}{{s}}$$"))
display(Markdown(f"This gives a sample interval of {1e+9/record.fs} nanoseconds"))
df_record["timestamp"] = pd.to_datetime(df_record.index.values * 1e+9/record.fs, unit='ns')
df_record

In [None]:
# find all annotations
records = load_dataset_names()
annotations = {}
for r in records:
    atr = wfdb.rdann(os.path.join(source_folder, r), "atr")
    df_annotation = pd.DataFrame(atr.symbol, index=atr.sample, columns=["Label"])
    for an in df_annotation["Label"].unique():
        if an not in annotations:
            annotations[an] = set()
        annotations[an].add(atr.record_name)

for an in annotations:
    annotations[an] = ", ".join(annotations[an])
annotations

Annotations

| Annotation | Description |
| :--------- | :---------- |
|| **Considered normal** |
| `N` | Normal beat |
|| **Anomalous beats** (use double-window labeling) |
| `F` | Fusion of ventricular and normal beat |
| `S` | Supraventricular premature or ectopic beat |
| `a` | Aberrated atrial premature beat |
| `V` | Premature ventricular contraction |
| `J` | Nodal (junctional) premature beat |
| `B` | Bundle branch block beat (unspecified) |
|| **External anomalies** (single window labeling) |
| `Q` | Unclassifiable beat |
| `\|` | Isolated QRS-like artifact |
|| **Ignored, bc hard to parse and to label** |
| `+` | Rythm change |
| `~` | Change in signal quality (usually noise level changes) |

### Load and parse annotation

In [None]:
atr = wfdb.rdann(os.path.join(source_folder, records[51]), "atr")
#print_obj_attr(atr, "Annotation object")
assert record.fs == atr.fs, "Sample frequency of records and annotations does not match!"

df_annotation = pd.DataFrame(atr.symbol, index=atr.sample, columns=["Label"])
df_annotation = df_annotation.reset_index()
df_annotation.columns = ["position", "label"]
df_annotation.groupby("label").count()

### Calculate beat window

We assume that the normal beats (annotated with `N`) occur in a regular interval and that the expert annotations (from the dataset) are directly in the middle of a beat window.
A beat window is a fixed length subsequence of the time series and shows a heart beat in its direct (local) context.

We calculate the beat window length for each dataset based on the median distance between normal beats (`N`).
The index (autoincrementing integers) serves as the measurement unit.

Create DataFrame containing all annotated beats:

In [None]:
df_beat = df_annotation[["position", "label"]]
df_beat

Shifted-by-one self-join and filter out all beat-pairs that contain anomalous beats.
We want to calculate the beat windows only based on the normal beats.
We then calculate the distance between two neighboring heart beats:

In [None]:
df_normal_beat = df_beat.copy()
df_normal_beat["prev_position"] = df_beat["position"].shift()
df_normal_beat["prev_label"] = df_beat["label"].shift()
df_normal_beat = df_normal_beat[(df_normal_beat["label"] == "N") & (df_normal_beat["prev_label"] == "N")]
df_normal_beat = df_normal_beat.drop(columns=["label", "prev_label"])
df_normal_beat["length"] = df_normal_beat["position"] - df_normal_beat["prev_position"]
df_normal_beat.describe()

The median of all normal beat lengths is the beat window size.
We require the beat window size to be odd.
This allows us to center the window at the beat annotation.

In [None]:
normal_beat_length = df_normal_beat["length"].median()
if (normal_beat_length%2) == 0:
    normal_beat_length += 1
beat_window_size = int(normal_beat_length)
beat_window_margin = (beat_window_size - 1)//2
print(f"window size = {beat_window_size}\nwindow margins (left and right) = {beat_window_margin}")

### Calculate anomalous windows

The experts from PhysioNet annotated only the beats itself with a label, but the actual anomaly is also comprised of the beat surroundings.

We assume that anomalous beats (such as `V` or `F`; see table above) require looking at a window around the actual beat as being anomalous.
External anomalies (such as `|`; see table above) also mark a window around it as anomalous, because those artefacts comprise multiple points.

We completely ignore `~` and `+`-annotations that indicate signal quality or rythm changes, because they are not relevant for our analysis.

We automatically label a variable-sized window around an annotated beat as an anomalous subsequence using the following technique:

1. For anomalous annotations (`S`, `V`, `a`, `J`, `B`, and `F` annotations):
   - Remove `~`, `+`, and `|` annotations
   - Calculate anomaly window using `beat_window_size` aligned with its center on the beat annotation.
   - Calculate end of previous beat window _e_ and beginning of next beat window _b_.
     Use _e_ as beginning and _b_ as end for a second anomaly window.
   - Mark the union of both anomaly windows' points as anomalous.
2. For `|` and `Q` annotations, mark all points of an anomaly window centered on the annotation as anomalous.
3. Mark all other points as normal.

> **Explain, why we used the combined windows for anomalous beats!!**
>
> - pattern/shape of signal may be ok
> - but we consider distance to other beats also
> - if too narrow or too far away, it's also anomalous

The figure shows an anomalous beat with its anomaly window (in red) and the windows of its previous and subsequent normal beats (in green).
We mark all points in the interval $$[min(W_{end}, X_{start}), max(X_{end}, Y_{start})]$$

In [None]:
# reverse lookup from timestamp to annotation index in df_beat
p = df_record[df_record["timestamp"] == "1970-01-01 00:11:03.000"].index.values[0]
df_beat[df_beat["position"] >= p].index[0]

In [None]:
def plot_window(pos, color="blue", **kvs):
    start = pos - beat_window_margin
    end = pos + beat_window_margin
    plt.axvspan(start, end, color=color, alpha=0.5, **kvs)


index = 798

beat_n = df_beat.loc[index, "position"]
print("Selected beat is annotated as", df_beat.loc[index, "label"])
print("with timestamp", df_record.loc[beat_n, "timestamp"])
ax = df_record.iloc[beat_n-500:beat_n+500].plot(kind='line', y=['ECG1', 'ECG2'], use_index=True, figsize=(20,10))
plot_window(df_beat.loc[index-1, "position"], label="$W$")
plot_window(beat_n, color="orange", label="$X$")
plot_window(df_beat.loc[index+1, "position"], label="$Y$")
plt.legend()
plt.show()

#### Windows for external anomalies

In [None]:
df_pipe = df_beat.copy()
df_pipe = df_pipe[(df_pipe["label"] == "|") | (df_pipe["label"] == "Q")]
df_pipe["window_start"] = df_pipe["position"]-beat_window_margin
df_pipe["window_end"] = df_pipe["position"]+beat_window_margin
df_pipe = df_pipe[["position", "window_start", "window_end"]]
df_pipe.head()

#### Windows for anomalous beats

In [None]:
df_tmp = df_beat.copy()
df_tmp = df_tmp[(df_tmp["label"] != "|") & (df_tmp["label"] != "~") & (df_tmp["label"] != "+")]
df_tmp["position_next"] = df_tmp["position"].shift(-1)
df_tmp["position_prev"] = df_tmp["position"].shift(1)
#df_tmp = df_tmp[(df_tmp["position_prev"].notnull()) & (df_tmp["position_next"].notnull())]
df_tmp = df_tmp[(df_tmp["label"] != "Q") & (df_tmp["label"] != "N")]
df_tmp["window_start"] = np.minimum(df_tmp["position"].values-beat_window_margin, df_tmp["position_prev"].values+beat_window_margin)
df_tmp["window_end"] = np.maximum(df_tmp["position"].values+beat_window_margin, df_tmp["position_next"].values-beat_window_margin)
df_svf = df_tmp[["position", "window_start", "window_end"]]
df_tmp.groupby("label").count()

#### Merge everything together

In [None]:
df_windows = pd.concat([df_pipe, df_svf])
df_windows.head()

In [None]:
index = 798

beat = df_windows.loc[index, "position"]
start = df_windows.loc[index, "window_start"]
end = df_windows.loc[index, "window_end"]
print("Selected beat is annotated as", df_beat.loc[index, "label"])
print("with timestamp", df_record.loc[beat, "timestamp"])
ax = df_record.iloc[beat-500:beat+500].plot(kind='line', y=['ECG1', 'ECG2'], use_index=True, figsize=(20,10))
plt.axvspan(beat-500, start-1, color="green", alpha=0.5, label="normal region 1", ymin=.5)
plt.axvspan(start, end, color="red", alpha=0.5, label="anomalous region", ymin=.5)
plt.axvspan(end+1, beat+500, color="green", alpha=0.5, label="normal region 2", ymin=.5)
plot_window(df_beat.loc[index-1, "position"], label="$W$", ymax=.5)
plot_window(beat_n, color="orange", label="$X$", ymax=.5)
plot_window(df_beat.loc[index+1, "position"], label="$Y$", ymax=.5)
plt.legend()
plt.show()

### Add labels

In [None]:
df = df_record.copy()
df["is_anomaly"] = 0

for _, (_, t1, t2) in df_windows.iterrows():
    tmp = df[df.index >= t1]
    tmp = tmp[tmp.index <= t2]
    df["is_anomaly"].values[tmp.index] = 1

#df = df.set_index("timestamp")
df[df["is_anomaly"] == 1]

In [None]:
df_beat[(df_beat["label"] == "|")]

In [None]:
start = 21700
end = 22500
df_show = df.loc[start:end]
df_show.plot(kind='line', y=['ECG1', 'ECG2', 'is_anomaly'], use_index=True, figsize=(20,10))

labels = df_beat[(df_beat["position"] > start) & (df_beat["position"] < end)]
for i, (position, label) in labels.iterrows():
    plt.text(position, -2.5, label)
plt.show()

## Experimentation

In [None]:
df = pd.merge(df_record, df_annotation, left_index=True, right_index=True, how="outer")
#df = df.fillna(value={"Label": ".", "is_anomaly": 0})
df.groupby(["is_anomaly"]).count()

In [None]:
df[df["Label"].notna()]

In [None]:
import matplotlib.pyplot as plt
df_show = df.loc[27000:28000]
df_show.plot(kind='line', y=['ECG1', 'ECG2', 'is_anomaly'], use_index=True, figsize=(20,10))
plt.show()

In [None]:
df = pd.read_csv(os.path.join(dataset_subfolder, "800.test.csv"), index_col="timestamp")
df.loc["1970-01-01 00:21:20":"1970-01-01 00:21:40"].plot(figsize=(20,10))
plt.show()