In [None]:
# Importy podstawowe
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import signal
from scipy.signal import find_peaks, butter, filtfilt
import os
import warnings
warnings.filterwarnings('ignore')

# Importy specjalistyczne
from ecgdetectors import Detectors
from hrvanalysis import (
    get_time_domain_features, 
    get_frequency_domain_features, 
    get_poincare_plot_features,
    plot_poincare_plot
)

# Konfiguracja wykresów
plt.style.use('seaborn-v0_8')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 10


In [None]:
# Funkcje pomocnicze do wczytywania danych
def load_mechanocardiogram(filepath):
    """Wczytanie danych mechanokardiogramów z pliku txt"""
    data = {}
    signals = []
    
    with open(filepath, 'r') as file:
        lines = file.readlines()
    
    header_end = 0
    for i, line in enumerate(lines):
        if line.startswith('[DATA]'):
            header_end = i + 1
            break
        elif 'Sample rate' in line:
            data['fs'] = int(line.split('=')[1].strip().replace('Hz', ''))
        elif 'Measurement length' in line:
            data['duration'] = line.split(':')[1].strip()
        elif line.startswith('Signal'):
            signal_info = line.strip().split(': ')
            signal_name = signal_info[1].split(',')[0]
            signals.append(signal_name)
    
    # Wczytaj dane numeryczne
    if header_end > 0:
        numeric_data = []
        for line in lines[header_end:]:
            if line.strip():
                values = [float(x) for x in line.strip().split()]
                numeric_data.append(values)
        
        data['signals'] = np.array(numeric_data).T
        data['signal_names'] = signals
        data['time'] = np.arange(len(data['signals'][0])) / data['fs']

    for signal_name in signals:
        data[signal_name] = data['signals'][signals.index(signal_name)]
    
    data.pop('signals', None)
    data.pop('signal_names', None)
    if 'EKG' in data:
        data['ecg'] = data.pop('EKG', None) 
    elif 'ECG' in data:
        data['ecg'] = data.pop('ECG', None)
    
    return data

def load_cardiac_patient_data(filepath):
    """Wczytanie danych pacjentów kardiologicznych z pliku CSV Shimmer"""
    with open(filepath, 'r') as f:
        first_line = f.readline().strip()
        first_line = first_line.replace('"', '')
        if first_line.startswith('sep='):
            skiprows = 1
        else:
            skiprows = 0
        sep = first_line.split('=')[1] if '=' in first_line else ','
    
    df = pd.read_csv(filepath, skiprows=skiprows, sep=sep)
    data = {}
    
    timestamp_raw = df.iloc[:, 0].values
    data['timestamp'] = pd.to_numeric(timestamp_raw, errors='coerce')
    
    valid_mask = ~np.isnan(data['timestamp'])
    data['timestamp'] = data['timestamp'][valid_mask]
    
    ecg_cols = [col for col in df.columns if 'ECG' in col and 'CAL' in col]
    acc_cols = [col for col in df.columns if 'Accel' in col and 'CAL' in col]
    gyro_cols = [col for col in df.columns if 'Gyro' in col and 'CAL' in col]
    
    if ecg_cols:
        ecg_raw = df[ecg_cols].values[valid_mask]
        data['ecg'] = pd.DataFrame(ecg_raw).apply(pd.to_numeric, errors='coerce').values[:, 0]
    
    if acc_cols:
        acc_raw = df[acc_cols].values[valid_mask]
        data['accelerometer'] = pd.DataFrame(acc_raw).apply(pd.to_numeric, errors='coerce').values
        data['accX'] = data['accelerometer'][:, 0]
        data['accY'] = data['accelerometer'][:, 1]
        data['accZ'] = data['accelerometer'][:, 2]
        
    if gyro_cols:
        gyro_raw = df[gyro_cols].values[valid_mask]
        data['gyroscope'] = pd.DataFrame(gyro_raw).apply(pd.to_numeric, errors='coerce').values
        data['gyroX'] = data['gyroscope'][:, 0]
        data['gyroY'] = data['gyroscope'][:, 1]
        data['gyroZ'] = data['gyroscope'][:, 2]
    
    if 'ecg' in data:
        ecg_valid = ~np.isnan(data['ecg']).any()
        for key in data.keys():
            if isinstance(data[key], np.ndarray):
                data[key] = data[key][ecg_valid]
    
    if len(data['timestamp']) > 1:
        dt = np.median(np.diff(data['timestamp']))
        data['fs'] = 1.0 / (dt / 1000.0)
        if data['fs'] == np.inf:
            data['fs'] = 256.0
    
    data['time'] = (data['timestamp'] - data['timestamp'][0]) / 1000.0
    
    data.pop('accelerometer', None)
    data.pop('gyroscope', None)
    
    return data


In [None]:
# Klasa do detekcji uderzeń serca z różnych modalności
class MultimodalHeartbeatDetector:
    """Klasa do detekcji uderzeń serca z różnych modalności wykorzystująca bibliotekę ecgdetectors"""
    
    def __init__(self, fs):
        """
        Args:
            fs: częstotliwość próbkowania w Hz
        """
        self.fs = fs
        self.ecg_detector = Detectors(fs)
    
    def detect_ecg_peaks(self, ecg_signal, method='pan_tompkins'):
        """Detekcja zespołów QRS w sygnale EKG"""
        detector_methods = {
            'hamilton': self.ecg_detector.hamilton_detector,
            'christov': self.ecg_detector.christov_detector,
            'engzee': self.ecg_detector.engzee_detector,
            'pan_tompkins': self.ecg_detector.pan_tompkins_detector,
            'swt': self.ecg_detector.swt_detector,
            'two_average': self.ecg_detector.two_average_detector,
            'matched_filter': self.ecg_detector.matched_filter_detector,
            'wqrs': self.ecg_detector.wqrs_detector
        }
        
        if method not in detector_methods:
            raise ValueError(f"Nieznana metoda detekcji: {method}")
        
        return detector_methods[method](ecg_signal)
    
    def detect_scg_peaks(self, acc_signal, axis=2):
        """Detekcja uderzeń serca z sygnału akcelerometru (SCG)"""
        if acc_signal.ndim > 1:
            if axis is None:
                signal = np.sqrt(np.sum(acc_signal**2, axis=1))
            else:
                signal = acc_signal[:, axis]
        else:
            signal = acc_signal
            
        # Filtracja pasmowo-przepustowa (1-40 Hz)
        b, a = butter(4, [1/(self.fs/2), 40/(self.fs/2)], btype='band')
        filtered = filtfilt(b, a, signal)
        
        # Detekcja pików
        peaks, _ = find_peaks(filtered,
                            distance=int(0.4 * self.fs),  # Min 150 BPM
                            height=0.3 * np.max(filtered))
        
        return peaks
    
    def detect_gcg_peaks(self, gyro_signal, axis=2):
        """Detekcja uderzeń serca z sygnału żyroskopu (GCG)"""
        if gyro_signal.ndim > 1:
            if axis is None:
                signal = np.sqrt(np.sum(gyro_signal**2, axis=1))
            else:
                signal = gyro_signal[:, axis]
        else:
            signal = gyro_signal
            
        # Filtracja pasmowo-przepustowa (1-20 Hz)
        b, a = butter(4, [1/(self.fs/2), 20/(self.fs/2)], btype='band')
        filtered = filtfilt(b, a, signal)
        
        # Detekcja pików
        peaks, _ = find_peaks(filtered,
                            distance=int(0.4 * self.fs),  # Min 150 BPM
                            height=0.4 * np.max(filtered))
        
        return peaks


In [None]:
# Klasa do analizy HRV wykorzystująca bibliotekę hrvanalysis
class HRVAnalyzer:
    """Klasa do analizy zmienności rytmu serca wykorzystująca bibliotekę hrvanalysis"""
    
    def __init__(self, peaks, fs):
        """
        Args:
            peaks: indeksy wykrytych uderzeń serca
            fs: częstotliwość próbkowania w Hz
        """
        self.peaks = peaks
        self.fs = fs
        self.rr_intervals = self._calculate_rr_intervals()
    
    def _calculate_rr_intervals(self):
        """Obliczenie odstępów RR w milisekundach"""
        rr = np.diff(self.peaks) / self.fs * 1000
        return rr.tolist()
    
    def analyze_time_domain(self):
        """Analiza w dziedzinie czasu"""
        return get_time_domain_features(self.rr_intervals)
    
    def analyze_frequency_domain(self):
        """Analiza w dziedzinie częstotliwości"""
        return get_frequency_domain_features(self.rr_intervals, sampling_frequency=self.fs)
    
    def analyze_nonlinear(self):
        """Analiza nieliniowa"""
        return get_poincare_plot_features(self.rr_intervals)
    
    def plot_poincare(self, title="Wykres Poincaré"):
        """Wizualizacja wykresu Poincaré"""
        plot_poincare_plot(self.rr_intervals, title=title)
        plt.show()
    
    def get_all_features(self):
        """Pobranie wszystkich wskaźników HRV"""
        time_features = self.analyze_time_domain()
        freq_features = self.analyze_frequency_domain()
        nonlinear_features = self.analyze_nonlinear()
        
        all_features = {
            **time_features,
            **freq_features,
            **nonlinear_features
        }
        
        return pd.DataFrame([all_features])


In [None]:
# Przykład użycia - analiza danych z mechanokardiogramu
def analyze_mechanocardiogram(filepath, window_start=0, window_length=30):
    """Analiza danych z mechanokardiogramu"""
    # Wczytanie danych
    data = load_mechanocardiogram(filepath)
    
    # Wybór okna czasowego do analizy
    start_idx = int(window_start * data['fs'])
    end_idx = int((window_start + window_length) * data['fs'])
    
    # Inicjalizacja detektora
    detector = MultimodalHeartbeatDetector(data['fs'])
    
    # Detekcja pików dla różnych modalności
    ecg_peaks = detector.detect_ecg_peaks(data['ecg'][start_idx:end_idx])
    scg_peaks = detector.detect_scg_peaks(data['accZ'][start_idx:end_idx])
    gcg_peaks = detector.detect_gcg_peaks(data['gyroZ'][start_idx:end_idx])
    
    # Analiza HRV dla każdej modalności
    analyzers = {
        'ECG': HRVAnalyzer(ecg_peaks, data['fs']),
        'SCG': HRVAnalyzer(scg_peaks, data['fs']),
        'GCG': HRVAnalyzer(gcg_peaks, data['fs'])
    }
    
    # Zebranie wyników
    results = {}
    for modality, analyzer in analyzers.items():
        results[modality] = analyzer.get_all_features()
        
    # Wizualizacja wykresów Poincaré
    plt.figure(figsize=(15, 5))
    for i, (modality, analyzer) in enumerate(analyzers.items(), 1):
        plt.subplot(1, 3, i)
        analyzer.plot_poincare(title=f"Wykres Poincaré - {modality}")
    plt.tight_layout()
    
    return pd.concat(results, axis=0)


In [None]:
# Przykład użycia - analiza danych pacjentów kardiologicznych
def analyze_cardiac_patient(filepath, window_start=0, window_length=30):
    """Analiza danych pacjentów kardiologicznych"""
    # Wczytanie danych
    data = load_cardiac_patient_data(filepath)
    
    # Wybór okna czasowego do analizy
    start_idx = int(window_start * data['fs'])
    end_idx = int((window_start + window_length) * data['fs'])
    
    # Inicjalizacja detektora
    detector = MultimodalHeartbeatDetector(data['fs'])
    
    # Detekcja pików dla różnych modalności
    ecg_peaks = detector.detect_ecg_peaks(data['ecg'][start_idx:end_idx])
    scg_peaks = detector.detect_scg_peaks(np.column_stack([data['accX'], data['accY'], data['accZ']])[start_idx:end_idx])
    gcg_peaks = detector.detect_gcg_peaks(np.column_stack([data['gyroX'], data['gyroY'], data['gyroZ']])[start_idx:end_idx])
    
    # Analiza HRV dla każdej modalności
    analyzers = {
        'ECG': HRVAnalyzer(ecg_peaks, data['fs']),
        'SCG': HRVAnalyzer(scg_peaks, data['fs']),
        'GCG': HRVAnalyzer(gcg_peaks, data['fs'])
    }
    
    # Zebranie wyników
    results = {}
    for modality, analyzer in analyzers.items():
        results[modality] = analyzer.get_all_features()
        
    # Wizualizacja wykresów Poincaré
    plt.figure(figsize=(15, 5))
    for i, (modality, analyzer) in enumerate(analyzers.items(), 1):
        plt.subplot(1, 3, i)
        analyzer.plot_poincare(title=f"Wykres Poincaré - {modality}")
    plt.tight_layout()
    
    return pd.concat(results, axis=0)


In [None]:
# Przykład użycia - wizualizacja wyników
def plot_multimodal_comparison(results):
    """Wizualizacja porównania wyników między modalnościami"""
    # Wybór najważniejszych wskaźników do porównania
    key_metrics = {
        'mean_nni': 'Średni odstęp NN [ms]',
        'sdnn': 'SDNN [ms]',
        'rmssd': 'RMSSD [ms]',
        'pnni_50': 'pNN50 [%]',
        'lf_hf_ratio': 'LF/HF',
        'sd1': 'SD1 [ms]',
        'sd2': 'SD2 [ms]'
    }
    
    # Przygotowanie danych do wizualizacji
    plot_data = results[list(key_metrics.keys())]
    plot_data.columns = list(key_metrics.values())
    
    # Tworzenie wykresu
    plt.figure(figsize=(15, 10))
    
    # Wykres słupkowy dla każdego wskaźnika
    plot_data.plot(kind='bar', ax=plt.gca())
    plt.title('Porównanie wskaźników HRV między modalnościami')
    plt.xlabel('Modalność')
    plt.ylabel('Wartość')
    plt.xticks(rotation=45)
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    
    return plt.gcf()


In [None]:
# Przykład analizy danych z mechanokardiogramu
mechano_results = analyze_mechanocardiogram('Surowe/sub_1.txt', window_start=0, window_length=30)
print("\nWyniki analizy mechanokardiogramu:")
print(mechano_results)

# Wizualizacja porównania wyników
plot_multimodal_comparison(mechano_results)
plt.show()


In [None]:
# Przykład analizy danych pacjenta kardiologicznego
cardiac_results = analyze_cardiac_patient('Raw_Recordings/UP-25-Raw.csv', window_start=0, window_length=30)
print("\nWyniki analizy danych pacjenta kardiologicznego:")
print(cardiac_results)

# Wizualizacja porównania wyników
plot_multimodal_comparison(cardiac_results)
plt.show()
