In [None]:
%load_ext autoreload
%autoreload 2
%cd ..

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [None]:
import pickle
from typing import Dict, Tuple, List
from collections import Counter
import functools
from pathlib import Path

import matplotlib.pyplot as plt
from matplotlib.dates import date2num
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import scipy.signal
import scipy.fftpack

from util.train_test_split import read_train_test_split_yaml
from util.paths import DATA_PATH, TRAIN_TEST_SPLIT_YAML
from util.datasets import SlidingWindowDataset, read_physionet_dataset, RespiratoryEventType, RespiratoryEvent
from util.filter import apply_butterworth_bandpass_filter, apply_butterworth_lowpass_filter
from util.mathutil import get_peaks, PeakType, cluster_1d, IntRange
from rule_based import detect_respiratory_events, detect_respiratory_events_multicore
from util.event_based_metrics import EventBasedConfusionMatrix, get_overlaps, get_n_detected_annotations

# Makes numpy raise FloatingPointError for all floating-point error categories instead of outputting warnings
np.seterr(all='raise')

# Some preparations to pretty-print tensors & ndarrays.
# The line width is set through the public print-options API; writing to the private
# `np.core.arrayprint._line_width` attribute is not a supported way of configuring it.
np.set_printoptions(edgeitems=10, linewidth=400)

### Loading the dataset

In [None]:
# The single recording that is inspected in the first part of this notebook
dataset_folder = DATA_PATH / "training" / "tr03-0005"

# Configuration: downsample frequency of 5 Hz, time window size of 5 minutes
config = SlidingWindowDataset.Config(downsample_frequency_hz=5, time_window_size=pd.Timedelta("5 minutes"))
sliding_window_dataset = SlidingWindowDataset(
    config=config,
    dataset_folder=dataset_folder,
    allow_caching=True,
)

# Short overview of what has just been loaded
print(
    f"#Physionet dataset samples: {len(sliding_window_dataset.signals)}",
    f"#Sliding window positions: {len(sliding_window_dataset)}",
    f"Timeframe of sliding window positions: {sliding_window_dataset.valid_center_points[-1] - sliding_window_dataset.valid_center_points[0]}",
    f"Respiratory events list present: {sliding_window_dataset.respiratory_events is not None}",
    sep="\n",
)

### Outputting some statistics on the annotated respiratory events

In [None]:
annotated_respiratory_events = sliding_window_dataset.respiratory_events

# Count the annotated events per event type and list every single annotated event
respiratory_event_type_counter = Counter(e.event_type for e in annotated_respiratory_events)
print("Respiratory event types as per annotations:")
print(" - " + "\n - ".join(f"{klass.name}: {cnt}" for klass, cnt in respiratory_event_type_counter.items()))
print()
print(f"{len(annotated_respiratory_events)} annotated respiratory events:")
print(" - " + "\n - ".join([f"#{i}: {evt}" for i, evt in enumerate(annotated_respiratory_events)]))

# Enrich whole sliding window dataset by "is awake" row
awake_series = sliding_window_dataset.awake_series
sliding_window_dataset.signals[awake_series.name] = awake_series
del awake_series

# Enrich whole sliding window dataset by an events outline: 1 within annotated events, 0 elsewhere
signals_index_ = sliding_window_dataset.signals.index
annotated_events_outline_mat = np.zeros(shape=(len(signals_index_),))
for event in annotated_respiratory_events:
    # `Index.get_loc(..., method="nearest")` is deprecated since pandas 1.4 and removed in pandas 2.0;
    # `Index.get_indexer([...], method="nearest")` is the replacement named by the deprecation notice.
    start_idx, end_idx = signals_index_.get_indexer([event.start, event.end], method="nearest")
    # The sample at 'end_idx' itself is excluded by the slice
    annotated_events_outline_mat[start_idx:end_idx] = 1
annotated_events_outline_series = pd.Series(data=annotated_events_outline_mat, index=signals_index_)
sliding_window_dataset.signals["Annotated respiratory events"] = annotated_events_outline_series

del annotated_events_outline_series, annotated_events_outline_mat, signals_index_

### Detection run over a single dataset

Perform the detection. Also generate an outline for the detected events,
which is nice for plotting.

In [None]:
# Run the rule-based detector over the signals of the single dataset; no awake series is passed here
detected_respiratory_events = detect_respiratory_events(
    sliding_window_dataset.signals,
    sample_frequency_hz=sliding_window_dataset.config.downsample_frequency_hz,
    awake_series=None
)
# Split the detections into hypopneas and everything else
detected_hypopnea_events_ = [d_ for d_ in detected_respiratory_events if d_.event_type == RespiratoryEventType.Hypopnea]
detected_apnea_events_ = [d_ for d_ in detected_respiratory_events if d_.event_type != RespiratoryEventType.Hypopnea]

print()
print(f"Detected {len(detected_respiratory_events)} respiratory events")
print(f" ..of which are {len(detected_hypopnea_events_)} hypopneas")

# Enrich whole sliding window dataset by an events outline: 1 within detected events, 0 elsewhere
signals_index_ = sliding_window_dataset.signals.index
detected_events_outline_mat = np.zeros(shape=(len(signals_index_),))
for event in detected_respiratory_events:
    # `Index.get_loc(..., method="nearest")` is deprecated since pandas 1.4 and removed in pandas 2.0;
    # `Index.get_indexer([...], method="nearest")` is the replacement named by the deprecation notice.
    start_idx, end_idx = signals_index_.get_indexer([event.start, event.end], method="nearest")
    # The sample at 'end_idx' itself is excluded by the slice
    detected_events_outline_mat[start_idx:end_idx] = 1
detected_events_outline_series = pd.Series(data=detected_events_outline_mat, index=signals_index_)
sliding_window_dataset.signals["Detected respiratory events"] = detected_events_outline_series

del detected_events_outline_series, detected_events_outline_mat, signals_index_

Generate and output some statistics on the detection performance. These are:
- Overlaps of detected & annotated respiratory events
- Confusion matrix based metrics
- Confusion matrix plot

In [None]:
# Get overlapping annotated/detected events & derive some statistics
overlapping_events = get_overlaps(annotated_events=annotated_respiratory_events, detected_events=detected_respiratory_events)
detected_but_not_annotated = [d_ for d_ in detected_respiratory_events if not any(a_.overlaps(d_) for a_ in annotated_respiratory_events)]
annotated_but_not_detected = [a_ for a_ in annotated_respiratory_events if not any(d_.overlaps(a_) for d_ in detected_respiratory_events)]

# Guard both ratios: a recording without any annotated (or without any detected) events
# would otherwise abort this cell with a ZeroDivisionError.
if len(annotated_respiratory_events) > 0:
    annotation_coverage_ = f"{len(overlapping_events)/len(annotated_respiratory_events)*100:.1f}%"
else:
    annotation_coverage_ = "n/a"
if len(detected_respiratory_events) > 0:
    detection_hit_rate_ = f"{len(overlapping_events)/len(detected_respiratory_events)*100:.1f}%"
else:
    detection_hit_rate_ = "n/a"

print(f"Number of annotated events: {len(annotated_respiratory_events)}")
print(f"Number of detected events: {len(detected_respiratory_events)}")
print()
print(f"Number of OVERLAPPING events: {len(overlapping_events)}")
print(f"- Coverage of annotated respiratory events {annotation_coverage_}")
print(f"- Detected events that also appear in annotations: {detection_hit_rate_}")
print()

# Obtain confusion-matrix based metrics
confusion_matrix = EventBasedConfusionMatrix(annotated_events=annotated_respiratory_events, detected_events=detected_respiratory_events)
macro_scores = confusion_matrix.get_macro_scores()
print("Confusion-matrix based macro scores:")
print(f" -> {macro_scores}")

confusion_matrix.plot(title="Confusion matrix for classification confidence over a single dataset")


The following lines allow plotting annotated & detected respiratory events

In [None]:
# Choose the event the plotted window is centered on. Swap the list below to inspect another group of events:
#   detected_respiratory_events[event_num]
#   detected_but_not_annotated[event_num]
#   annotated_but_not_detected[event_num]
#   detected_hypopnea_events_[event_num]
event_num = 29
event = annotated_respiratory_events[event_num]

# The window spans one 'time_window_size', centered on the middle of the chosen event
window_center_point = event.start + (event.end - event.start) / 2
window_start = window_center_point - sliding_window_dataset.config.time_window_size / 2
window_end = window_center_point + sliding_window_dataset.config.time_window_size / 2

# Collect every event that reaches into the window
annotated_in_window = [a_ for a_ in annotated_respiratory_events if a_.end > window_start and a_.start < window_end]
detected_in_window = [d_ for d_ in detected_respiratory_events if d_.end > window_start and d_.start < window_end]

# List type and duration of the events in the window, annotated ones first
for heading_, events_in_window_ in (
    ("Annotated respiratory events in window:", annotated_in_window),
    ("Detected respiratory events in window:", detected_in_window),
):
    print()
    print(heading_)
    print(" - " + "\n - ".join([f"{e.event_type.name}: {(e.end-e.start).total_seconds():.1f}s" for e in events_in_window_]))

# One subplot per signal column of the window
window_data = sliding_window_dataset.get(center_point=window_center_point)
_ = window_data.signals.plot(figsize=(25, 12), subplots=True)


### Detection run over multiple datasets

Run multicore detector on a number of datasets.

In [None]:
data_folder = DATA_PATH / "training"

# Variant 1: the training dataset folders as per train-test-split
dataset_folders = read_train_test_split_yaml(
    input_yaml=TRAIN_TEST_SPLIT_YAML,
    prefix_base_folder=data_folder,
).train

# Variant 2: a given, small set of dataset folders.
# This assignment replaces the selection of variant 1 above.
dataset_names = ("tr03-0005", "tr03-0289", "tr03-0921", "tr04-1078", "tr07-0168")
dataset_folders = [data_folder / dataset_name_ for dataset_name_ in dataset_names]

In [None]:
config = SlidingWindowDataset.Config(
    downsample_frequency_hz=5,
    # The remaining fields have no effect on rule-based detection. They still need to be provided
    # and -indeed- have an effect on cache-misses.
    time_window_size=pd.Timedelta("5 minutes"),
    time_window_stride=5,
    ground_truth_vector_width=11,
)

# Load our datasets & prepare the data first. That -if no cache available- might take a while
sliding_window_datasets: List[SlidingWindowDataset] = []
signals: List[pd.DataFrame] = []
awake_series: List[pd.Series] = []
for dataset_folder in tqdm(dataset_folders, desc="Load & pre-process datasets"):
    sliding_window_dataset = SlidingWindowDataset(
        config=config,
        dataset_folder=dataset_folder,
        allow_caching=True,
    )
    sliding_window_datasets.append(sliding_window_dataset)
    signals.append(sliding_window_dataset.signals)
    awake_series.append(sliding_window_dataset.awake_series)

# Progress-bar factory handed over to the multicore detector
progress_fn_ = functools.partial(
    tqdm,
    total=len(sliding_window_datasets),
    desc="Detecting respiratory events",
)

# Do the multicore detection work.
# Pass 'awake_series' from above instead of None, such that respiratory events during wake times are neglected.
detected_event_lists: List[List[RespiratoryEvent]] = detect_respiratory_events_multicore(
    signals=signals,
    sample_frequency_hz=config.downsample_frequency_hz,
    awake_series=None,
    progress_fn=progress_fn_,
)

Run a few metrics on the freshly-detected respiratory events. Also, directly print out a few statistics on overlaps
of annotated & detected respiratory events, which results in the __annotation recall__ score.

In [None]:
# Run metrics on our detections: accumulate the confusion matrix and the event counts over all datasets
overall_confusion_matrix = EventBasedConfusionMatrix.empty()
n_annotated_events: int = 0
n_detected_events: int = 0
n_detected_annotations: int = 0
for sliding_window_dataset, detected_events in zip(sliding_window_datasets, detected_event_lists):
    n_detected_events += len(detected_events)
    n_annotated_events += len(sliding_window_dataset.respiratory_events)

    cm_ = EventBasedConfusionMatrix(annotated_events=sliding_window_dataset.respiratory_events, detected_events=detected_events)
    overall_confusion_matrix += cm_
    o_ = get_n_detected_annotations(annotated_events=sliding_window_dataset.respiratory_events, detected_events=detected_events)
    n_detected_annotations += o_

print(f"Number of annotated respiratory events: {n_annotated_events}")
print(f"Number of detected respiratory events: {n_detected_events}")
print()
print(f"Number of detected annotations (overlaps): {n_detected_annotations} out of {n_annotated_events}")
# Guard the ratio: without any annotated event the division would raise a ZeroDivisionError
if n_annotated_events > 0:
    annotation_recall_ = f"{n_detected_annotations/n_annotated_events:.3f}"
else:
    annotation_recall_ = "n/a"
print(f" -> Annotation recall: {annotation_recall_}")

Print the confusion matrix and the derived scores

In [None]:
# Macro scores derived from the confusion matrix accumulated over all datasets
macro_scores = overall_confusion_matrix.get_macro_scores()
print("Confusion-matrix based macro scores:", f" -> {macro_scores}", sep="\n")

# Open a 7x7 figure, then plot the accumulated confusion matrix
plt.figure(figsize=(7, 7))
overall_confusion_matrix.plot(
    title=f"Confusion matrix for classification confidence over {len(dataset_folders)} datasets"
)

