In [None]:
import pandas as pd
import numpy as np
from scipy.fft import fft, fftfreq
from scipy.signal import welch
import librosa
from scipy.signal import find_peaks
!pip install PyWavelets
import pywt
import matplotlib.pyplot as plt
from scipy.signal import cwt, morlet


data = pd.read_csv("/content/data_trial01_bioz.csv")

# Display the first few rows of the DataFrame to understand its structure
print(data.head())

# Define a function for calculating statistical features
def calculate_statistical_features(signal):
    "Calculate statistical features of a signal"
    statistical_features = {
        'mean': signal.mean(),
        'std_dev': signal.std(),
        'skewness': signal.skew(),
        'kurtosis': signal.kurtosis(),
        'max': signal.max(),
        'min': signal.min(),
    }
    return statistical_features

# Extract time and FinapresBP columns for statistical feature calculations
finapresbp_column = data['FinapresBP']

# Calculate statistical features for FinapresBP column
statistical_results = calculate_statistical_features(finapresbp_column)

# Print or use the statistical_results
print(statistical_results)

# Define functions for calculating temporal features
def calculate_zero_crossing_rate(signal):
    "Calculate zero crossing rate of a signal"
    return ((signal[:-1] * signal[1:]) < 0).sum()

def calculate_signal_energy(signal):
    "Calculate signal energy"
    return (signal ** 2).sum()

def calculate_temporal_parameters(signal):
    "Calculate signal length, rate of change, etc."
    signal_length = len(signal)
    rate_of_change = signal.diff().mean()
    return signal_length, rate_of_change

# Extract time and FinapresBP columns for temporal feature calculations
time_column = data['time']
finapresbp_column = data['FinapresBP']

# Calculate temporal features for FinapresBP
zcr = calculate_zero_crossing_rate(finapresbp_column)
energy = calculate_signal_energy(finapresbp_column)
length, rate_change = calculate_temporal_parameters(finapresbp_column)

# Print the calculated zcr, energy, length, and rate_change features
print("Zero Crossing Rate:", zcr)
print("Signal Energy:", energy)
print("Signal Length:", length)
print("Rate of Change:", rate_change)

print(data.columns)
print(finapresbp_column.head())

def calculate_spectral_features(signal):
    "Calculate spectral features"
    f, Pxx = welch(signal)
    spectral_centroid = np.sum(f * Pxx) / np.sum(Pxx)
    spectral_spread = np.sqrt(np.sum(((f - spectral_centroid) ** 2) * Pxx) / np.sum(Pxx))
    return spectral_spread, spectral_centroid

spectral_spread, spectral_centroid = calculate_spectral_features(finapresbp_column.values)

# Print the calculated spectral_spread and spectral_centroid features
print("Spectral Spread:", spectral_spread)
print("Spectral Centroid:", spectral_centroid)

# Function to compute MFCCs
def calculate_mfccs(signal, sampling_rate):
    """
    Calculate Mel-Frequency Cepstrum Coefficients (MFCCs) from a given signal.

    Args:
    - signal (ndarray): Input signal data.
    - sampling_rate (int): Sampling rate of the signal.

    Returns:
    - ndarray: Matrix of MFCCs.
    """
    # Compute MFCCs
    mfccs = librosa.feature.mfcc(y=signal, sr=sampling_rate, n_mfcc=13)
    return mfccs

sampling_rate = 1250
mfccs_result = calculate_mfccs(finapresbp_column.values, sampling_rate)

# Print the calculated MFCCs
print("MFCCs shape:", mfccs_result.shape)
print("MFCCs:", mfccs_result)

def calculate_bandpowers(signal, sampling_rate, freq_bands):
    """
    Calculate bandpowers of a signal within specified frequency bands using Welch's method.

    Args:
    - signal (ndarray): Input signal data.
    - sampling_rate (int): Sampling rate of the signal.
    - freq_bands (list of tuples): List of tuples defining frequency bands (e.g., [(low1, high1), (low2, high2), ...]).

    Returns:
    - list: Bandpowers for each specified frequency band.
    """
    freqs, Pxx = welch(signal, fs=sampling_rate)

    bandpowers = []
    for band in freq_bands:
        low, high = band
        band_indices = np.where((freqs >= low) & (freqs <= high))
        power_in_band = np.sum(Pxx[band_indices])
        bandpowers.append(power_in_band)

    return bandpowers

# Calculate bandpowers
freq_bands = [(0, 10), (10, 20), (20, 30)]
bandpowers = calculate_bandpowers(finapresbp_column.values, actual_sampling_rate, freq_bands)
print("Bandpowers:", bandpowers)

def calculate_harmonics(signal):
    """
    Calculate the harmonics of a signal.

    Args:
    - signal (ndarray): Input signal data.

    Returns:
    - list: Frequencies of identified harmonics.
    """
    # Compute FFT to find peaks (harmonics)
    fft_vals = np.fft.fft(signal)
    spectrum = np.abs(fft_vals)

    # Find peaks in the spectrum
    peaks, _ = find_peaks(spectrum, distance=10)
    harmonic_frequencies = np.fft.fftfreq(len(signal))[peaks]

    return harmonic_frequencies

# Calculate harmonics
harmonic_frequencies = calculate_harmonics(finapresbp_column.values)
print("Harmonic Frequencies:", harmonic_frequencies)

# Define your wavelet parameters
widths = np.arange(1, 31)
cwt_matrix = cwt(finapresbp_column.values, morlet, widths)

# Calculate the magnitude and phase of the complex CWT coefficients
cwt_magnitude = np.abs(cwt_matrix)
cwt_phase = np.angle(cwt_matrix)

# Visualize the magnitude of the CWT coefficients
plt.imshow(cwt_magnitude, extent=[time_column.min(), time_column.max(), 1, 31], cmap='jet', aspect='auto')
plt.colorbar(label='Magnitude')
plt.xlabel('Time')
plt.ylabel('Width')
plt.title('Continuous Wavelet Transform Magnitude')
plt.show()

# Visualize the phase of the CWT coefficients
plt.imshow(cwt_phase, extent=[time_column.min(), time_column.max(), 1, 31], cmap='jet', aspect='auto')
plt.colorbar(label='Phase')
plt.xlabel('Time')
plt.ylabel('Width')
plt.title('Continuous Wavelet Transform Phase')
plt.show()


peaks, _ = find_peaks(finapresbp_column.values, distance=50)

# Plotting the peaks on the signal
plt.figure(figsize=(10, 6))
plt.plot(time_column, finapresbp_column.values, label='Signal')
plt.plot(time_column[peaks], finapresbp_column.values[peaks], 'r.', markersize=10, label='Peaks')
plt.xlabel('Time')
plt.ylabel('FinapresBP')
plt.title('Peak Detection')
plt.legend()
plt.show()

# Accessing peak values and times
peak_values = finapresbp_column.values[peaks]
peak_times = time_column[peaks]

# Display detected peaks
print("Detected peaks:", len(peaks))
print("Peak values:", peak_values)
print("Peak times:", peak_times)

def calculate_heart_rate_from_peaks(peak_times, sampling_rate):
    "Calculate heart rate from peak times and sampling rate"
    time_diff_between_peaks = np.diff(peak_times)
    heart_rate = 60 * sampling_rate / np.mean(time_diff_between_peaks)
    return heart_rate


def identify_characteristic_points(signal):
    "Identify characteristic points in a signal"

    # Find peaks and valleys
    peaks, _ = find_peaks(signal, distance=50)
    valleys, _ = find_peaks(-signal, distance=50)

    # Find zero crossings and their derivatives
    zero_crossings = np.where(np.diff(np.sign(signal)))[0]
    derivative = np.diff(signal)

    # Define characteristic points based on peak, valley, zero crossing, and derivative calculations
    characteristic_points = {
        'DIA': peaks[0] if len(peaks) > 0 else None,  # Diastolic Peak
        'MS': zero_crossings[np.argmax(np.abs(derivative))] if len(zero_crossings) > 0 else None,  # Maximum Slope
        'SYS': valleys[0] if len(valleys) > 0 else None,  # Systolic Foot
        'IP': zero_crossings[np.argmin(derivative)] if len(zero_crossings) > 0 else None  # Inflection Point
    }

    return characteristic_points

signal_data = finapresbp_column.values

characteristic_points = identify_characteristic_points(signal_data)

# Print the identified characteristic points
print("Identified Characteristic Points:", characteristic_points)

def window_based_averaging(signal, window_size=20, overlap=0.5):
    "Perform window-based averaging on a signal"
    averaged_signal = []

    # Calculate the overlap offset
    overlap_offset = int(window_size * overlap)

    # Iterate through the signal with the specified window size and overlap
    start = 0
    end = window_size
    while end <= len(signal):
        window = signal[start:end]
        average_value = np.mean(window)
        averaged_signal.append(average_value)

        start += overlap_offset
        end += overlap_offset

    return np.array(averaged_signal)

# Apply window-based averaging
smoothed_signal = window_based_averaging(signal_data)

print(smoothed_signal)


def window_based_averaging(signal, window_size=20, overlap=0.5):
    "Perform window-based averaging on a signal"
    averaged_signal = []

    # Calculate the overlap offset
    overlap_offset = int(window_size * overlap)

    # Iterate through the signal with the specified window size and overlap
    start = 0
    end = window_size
    while end <= len(signal):
        window = signal[start:end]
        average_value = np.mean(window)
        averaged_signal.append(average_value)

        start += overlap_offset
        end += overlap_offset

    return np.array(averaged_signal)

# Apply window-based averaging
smoothed_signal = window_based_averaging(signal_data)

print(smoothed_signal)

def calculate_ptt(signal1, signal2, sampling_rate):
    # Example PTT calculation (replace this with your actual calculation)
    ptt_result = np.abs(len(signal1) - len(signal2)) / sampling_rate
    return ptt_result

def calculate_mean_slope(signal):
    # Example mean slope calculation (replace this with your actual calculation)
    mean_slope_result = np.mean(np.diff(signal))
    return mean_slope_result

In [None]:
def compute_PTT(point1, point2, sampling_rate):
    "Compute Pulse Transit Time (PTT)"
    return (point2 - point1) / sampling_rate

def compute_ratio_amplitudes(signal, characteristic_points):
    "Compute Ratio of Amplitudes"
    dia = characteristic_points.get('DIA')
    sys = characteristic_points.get('SYS')
    ip = characteristic_points.get('IP')

    dia_amp = signal[dia]
    sys_amp = signal[sys]
    ip_amp = signal[ip]

    sys_to_dia_ratio = sys_amp / dia_amp if dia_amp != 0 else 0
    ip_to_dia_ratio = ip_amp / dia_amp if dia_amp != 0 else 0

    return sys_to_dia_ratio, ip_to_dia_ratio

# Function to calculate area under the curve
def compute_area_under_curve(signal_segment, sampling_rate):
    "Compute area under the curve of a signal segment"
    # Calculate the area under the curve using numerical integration
    area = np.trapz(signal_segment, dx=1 / sampling_rate)
    return area

# Calculate PTT
PTT = compute_PTT(point1_index, point2_index, sampling_rate)

# Calculate Ratio of Amplitudes
sys_to_dia_ratio, ip_to_dia_ratio = compute_ratio_amplitudes(signal_data, characteristic_points)

# Calculate Area under the Curve for a specific segment
segment_start = 1000
segment_end = 2000
signal_segment = signal_data[segment_start:segment_end]
area_under_curve = compute_area_under_curve(signal_segment, sampling_rate)

In [None]:
def categorize_features(extracted_features):
    """
    Categorize extracted features into sets.

    Args:
    - extracted_features (dict): Dictionary containing the extracted features.

    Returns:
    - dict: Categorized sets of features.
    """
    ptt_features = {
        'pulse_transit_time': extracted_features['pulse_transit_time']
    }

    timepoint_features = {
        'sys_ip_interval': extracted_features['sys_ip_interval'],
        'dia_sys_interval': extracted_features['dia_sys_interval'],
    }

    amplitude_features = {
        'ratio_amplitudes': extracted_features['ratio_amplitudes'],
    }

    area_features = {
        'area_under_curve': extracted_features['area_under_curve'],
    }

    categorized_features = {
        'PTT': ptt_features,
        'Timepoint': timepoint_features,
        'Amplitude': amplitude_features,
        'Area': area_features
    }

    return categorized_features

categorized_features = categorize_features(extracted_features)

In [None]:
# Simulated extracted features (random numbers for demonstration)
extracted_features = {
    'PTT': np.random.rand(),
    'Ratio_of_Amplitudes': np.random.rand(),
    'Time_Intervals': [np.random.rand(), np.random.rand()],
    'Area_under_the_Curve': np.random.rand(),
}

# Function to categorize features
def categorize_features(features):
    categorized_features = {
        'PTT_features': features['PTT'],
        'Amplitude_features': features['Ratio_of_Amplitudes'],
        'Timepoint_features': features['Time_Intervals'],
        'Area_features': features['Area_under_the_Curve'],
    }
    return categorized_features

# Categorize the extracted features
categorized_features = categorize_features(extracted_features)

# Print the categorized features
print(categorized_features)