TFG part 3 - Sofía Valle López

This notebook has two main parts. In the first, several filters are tested on one randomly chosen fragment from each of the 20 patients, producing 20 HTML files for evaluating their effect on the signal. Based on these results, the 0.5–15 Hz filter was initially selected for offering the best balance between spectral cleanliness and waveform preservation; a follow-up comparison then lowered the lower cutoff to 0.2 Hz to better preserve the original signal morphology. In the second part, the resulting 0.2–15 Hz filter is applied to all 56 available fragments, and the filtered signals are saved in a new folder for further analysis.

An FIR filter was used for the 0.015–15 Hz and 0.3–2.5 Hz bands, because IIR designs were numerically unstable at these very low cutoff frequencies and narrow bandwidths. FIR filters are inherently stable and, thanks to their linear phase response, preserve the temporal shape of the signal, although they need longer filter lengths (1001 taps here) and more computation. For the 0.5–5 Hz and 0.5–15 Hz bands, a fourth-order IIR Butterworth filter was used, as it is computationally cheaper and gives smooth, clean results, well suited to isolating the cardiac component of the intracranial pressure (PIC) signal. Because this type of filter becomes unstable at very low cutoff frequencies, it was not applied to the two FIR bands. Both filter types are applied forward and backward with filtfilt, so the filtered signals have no net phase shift.
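
Numerical problems of this kind are commonly associated with the transfer-function (b, a) representation at very low normalized cutoffs. As a hedged illustration, and not the method used in this notebook, the sketch below designs the same fourth-order Butterworth band-pass as second-order sections (SOS) and checks that every pole stays inside the unit circle. The helper name `design_bandpass_sos`, the 200 Hz sampling rate and the synthetic test signal are assumptions made for the example.

```python
import numpy as np
from scipy.signal import butter, sosfiltfilt, sos2zpk

def design_bandpass_sos(lowcut, highcut, fs, order=4):
    """Butterworth band-pass as second-order sections (hypothetical helper)."""
    nyq = 0.5 * fs
    return butter(order, [lowcut / nyq, highcut / nyq], btype="band", output="sos")

fs = 200.0  # assumed sampling rate for this illustration
sos = design_bandpass_sos(0.015, 15.0, fs)

# A digital filter is stable when every pole lies strictly inside the unit circle.
_, poles, _ = sos2zpk(sos)
max_pole_radius = np.max(np.abs(poles))
print(f"Largest pole radius: {max_pole_radius:.6f}")

# Zero-phase filtering of a synthetic signal: constant offset plus a 1.2 Hz tone.
t = np.arange(0, 120, 1 / fs)
test_signal = 5.0 + np.sin(2 * np.pi * 1.2 * t)
filtered = sosfiltfilt(sos, test_signal)
print(f"Output is finite: {np.all(np.isfinite(filtered))}")
```

Whether an SOS design would change the conclusions drawn later in this notebook has not been tested here; the sketch only shows that the low-cutoff band can be designed with stable poles in this form.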

In the following cell, a random fragment is selected and the four candidate filters are applied: two FIR filters (0.015–15 Hz and 0.3–2.5 Hz) and two IIR filters (0.5–5 Hz and 0.5–15 Hz). The code generates three interactive plots: one with all filtered signals overlaid, one comparing each filter with the original signal, and one showing their respective frequency spectra. These plots are exported to a single HTML file for visual inspection and comparison.

In [9]:
import os
import numpy as np
from scipy.signal import butter, filtfilt, firwin
from datetime import datetime, timedelta
import plotly.graph_objects as go
from plotly.subplots import make_subplots

file_path = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\all_patients_fragments_no_artifacts\\paciente1_fragmento2.txt"
output_folder = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\testing_filters_results"
os.makedirs(output_folder, exist_ok=True)

# Helper functions
def apply_iir_filter(signal, lowcut, highcut, fs, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, signal)

def apply_fir_filter(signal, lowcut, highcut, fs, numtaps=1001):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    coeff = firwin(numtaps, [low, high], pass_zero=False)
    return filtfilt(coeff, [1.0], signal)

def calculate_peak_to_peak(signal):
    return np.max(signal) - np.min(signal)

def calculate_fft(signal, fs):
    n = len(signal)
    yf = np.fft.fft(signal)
    xf = np.fft.fftfreq(n, 1/fs)[:n//2]
    spectrum = (2 / n) * np.abs(yf[:n//2])
    return xf, spectrum

# Read data and metadata
with open(file_path, "r", encoding="utf-8") as f:
    lines = f.readlines()

Fs = None
start_time = None
header_end_index = None

for i, line in enumerate(lines):
    if line.startswith("Interval="):
        interval_str = line.split("=")[1].strip().split()[0].replace(",", ".")
        Fs = 1 / float(interval_str)
    elif line.startswith("ExcelDateTime="):
        datetime_str = line.split("\t")[-1].split(",")[0].strip()
        for fmt in ["%d/%m/%Y %H:%M:%S", "%d/%m/%Y %H:%M:%S.%f"]:
            try:
                start_time = datetime.strptime(datetime_str, fmt)
                break
            except ValueError:
                continue
    elif line.startswith("BottomValue="):
        header_end_index = i + 1
        break

# Extract PIC values
pic_values = np.array([
    float(line.strip().split()[2].replace(",", ".")) 
    for line in lines[header_end_index:] 
    if len(line.strip().split()) >= 3
])

# Create time vector and limit to 1 minute
time_vector = np.array([
    start_time + timedelta(seconds=i / Fs) for i in range(len(pic_values))
])
max_samples = int(60 * Fs)
pic_values = pic_values[:max_samples]
time_vector = time_vector[:max_samples]

# Define filters
filters = [
    {"type": "FIR", "low": 0.015, "high": 15,  "color": "red",    "name": "FIR 0.015–15 Hz"},
    {"type": "FIR", "low": 0.3,   "high": 2.5, "color": "blue",   "name": "FIR 0.3–2.5 Hz"},
    {"type": "IIR", "low": 0.5,   "high": 5,   "color": "green",  "name": "IIR 0.5–5 Hz"},
    {"type": "IIR", "low": 0.5,   "high": 15,  "color": "purple", "name": "IIR 0.5–15 Hz"}
]

# Apply filters and compute FFT
for filt in filters:
    if filt["type"] == "FIR":
        filt["signal"] = apply_fir_filter(pic_values, filt["low"], filt["high"], Fs)
    else:
        filt["signal"] = apply_iir_filter(pic_values, filt["low"], filt["high"], Fs)
    filt["pp"] = calculate_peak_to_peak(filt["signal"])
    filt["xf"], filt["spectrum"] = calculate_fft(filt["signal"], Fs)
    filt["xf"] = filt["xf"][filt["xf"] <= 6]
    filt["spectrum"] = filt["spectrum"][:len(filt["xf"])]

xf_orig, yf_orig = calculate_fft(pic_values, Fs)
xf_orig = xf_orig[xf_orig <= 6]
yf_orig = yf_orig[:len(xf_orig)]

# PLOT 1: All filters + original (time domain)
fig_all = go.Figure()
fig_all.add_trace(go.Scattergl(x=time_vector, y=pic_values, mode='lines', name="Original PIC", line=dict(color='black', width=1)))
for filt in filters:
    fig_all.add_trace(go.Scattergl(
        x=time_vector, y=filt["signal"],
        mode='lines',
        name=f"{filt['name']} (PP: {filt['pp']:.2f} mmHg)",
        line=dict(color=filt["color"], width=1)
    ))

fig_all.update_layout(
    title="Filtered PIC Signal (Time Domain, 1 minute)",
    xaxis_title="Time",
    yaxis_title="Pressure (mmHg)",
    width=1000,
    height=600,
    xaxis=dict(rangeslider=dict(visible=True, thickness=0.05), type="date"),
    yaxis=dict(range=[-15, 15]),
    hovermode="x unified",
    legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
)

# PLOT 2: One subplot per filter (original + filtered)
fig_each = make_subplots(
    rows=4, cols=1,
    shared_xaxes=True,
    vertical_spacing=0.02,
    subplot_titles=[f["name"] for f in filters]
)

for i, filt in enumerate(filters):
    fig_each.add_trace(go.Scattergl(x=time_vector, y=pic_values, name="Original", line=dict(color="black", width=1)), row=i+1, col=1)
    fig_each.add_trace(go.Scattergl(x=time_vector, y=filt["signal"], name=filt["name"], line=dict(color=filt["color"], width=1)), row=i+1, col=1)

fig_each.update_layout(
    title="Original vs Filtered Signal for Each Filter (Time Domain, 1 minute)",
    width=1000,
    height=1200,
    hovermode="x unified",
    showlegend=False
)

for i in range(1, 5):
    fig_each.update_yaxes(range=[-15, 15], row=i, col=1)
    fig_each.update_xaxes(type="date", row=i, col=1)

# PLOT 3: Frequency domain (FFT)
fig_fft = make_subplots(
    rows=5, cols=1, shared_xaxes=True, vertical_spacing=0.02,
    subplot_titles=[
        "Original",
        "Original + FIR 0.015–15 Hz",
        "Original + FIR 0.3–2.5 Hz",
        "Original + IIR 0.5–5 Hz",
        "Original + IIR 0.5–15 Hz"
    ]
)

fig_fft.add_trace(go.Scatter(x=xf_orig, y=yf_orig, name="Original", line=dict(color="black", width=2)), row=1, col=1)
fig_fft.add_vrect(x0=1, x1=2.16, fillcolor="green", opacity=0.2, line_width=0, row=1, col=1, annotation_text="Cardiac band", annotation_position="top left")

for i, filt in enumerate(filters):
    fig_fft.add_trace(go.Scatter(x=xf_orig, y=yf_orig, line=dict(color="black", width=2), name="Original" if i == 0 else None, showlegend=(i == 0)), row=i+2, col=1)
    fig_fft.add_trace(go.Scatter(x=filt["xf"], y=filt["spectrum"], name=filt["name"], line=dict(color=filt["color"], width=2), showlegend=False), row=i+2, col=1)
    fig_fft.add_vrect(x0=1, x1=2.16, fillcolor="green", opacity=0.2, line_width=0, row=i+2, col=1)

fig_fft.update_layout(
    height=1600,
    width=1000,
    title="FFT Comparison: Original vs Filtered Signals (1 minute)",
    xaxis_title="Frequency (Hz)",
    yaxis_title="Amplitude",
    hovermode="x unified",
    template="plotly_white"
)

fig_fft.update_xaxes(range=[0, 6])
for annotation in fig_fft['layout']['annotations']:
    annotation['y'] -= 0.03

# COMBINE PLOTS INTO ONE HTML
# Create output filename based on the input file
fragment_name = os.path.splitext(os.path.basename(file_path))[0]
output_filename = f"{fragment_name}_analisis_1min_tiempo_y_frecuencia.html"
output_path = os.path.join(output_folder, output_filename)

with open(output_path, "w", encoding="utf-8") as f:
    f.write(fig_all.to_html(full_html=False, include_plotlyjs='cdn'))
    f.write("<hr style='margin:60px 0'>")
    f.write(fig_each.to_html(full_html=False, include_plotlyjs=False))
    f.write("<hr style='margin:60px 0'>")
    f.write(fig_fft.to_html(full_html=False, include_plotlyjs=False))

print(f"Analysis saved to:\n{output_path}")


Analysis saved to:
C:\Users\sofia\OneDrive\Escritorio\TFGPython\testing_filters_results\paciente1_fragmento2_analisis_1min_tiempo_y_frecuencia.html


Filter characterization

In [64]:
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.signal import butter, firwin, freqz
from pandas.plotting import table

# Output path
output_dir = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\testing_filters_results\\filter_characterization"
os.makedirs(output_dir, exist_ok=True)

# Sampling frequencies
fs_list = [200, 400]

# Filter definitions
filter_defs = [
    {"type": "FIR", "low": 0.015, "high": 15.0, "name": "FIR 0.015–15 Hz", "numtaps": 1001},
    {"type": "FIR", "low": 0.3,   "high": 2.5,  "name": "FIR 0.3–2.5 Hz",  "numtaps": 1001},
    {"type": "IIR", "low": 0.5,   "high": 5.0,  "name": "IIR 0.5–5 Hz",    "order": 4},
    {"type": "IIR", "low": 0.5,   "high": 15.0, "name": "IIR 0.5–15 Hz",   "order": 4},
]

# Function to render the transfer function as an image
def render_transfer_function(b, a, filter_name, fs, output_dir, max_terms=6):
    def to_latex_poly(coefs, var='z', max_terms=6):
        terms = []
        for i, coef in enumerate(coefs[:max_terms]):
            coef_str = f"{coef:.4f}"
            if i == 0:
                terms.append(f"{coef_str}")
            else:
                terms.append(f"{coef_str}{var}^{{-{i}}}")
        if len(coefs) > max_terms:
            terms.append(r"\cdots")
        return " + ".join(terms)

    b_poly = to_latex_poly(b, 'z', max_terms)
    is_fir = (len(a) == 1 and a[0] == 1.0)
    if is_fir:
        latex_str = r"$H(z) = " + b_poly + "$"
    else:
        a_poly = to_latex_poly(a, 'z', max_terms)
        latex_str = r"$H(z) = \frac{" + b_poly + "}{" + a_poly + "}$"

    fig, ax = plt.subplots(figsize=(8, 1.5))
    ax.axis('off')
    ax.text(0.5, 0.5, latex_str, fontsize=16, ha='center', va='center')
    filename = f"TF_{filter_name.replace(' ', '_').replace('–', '-')}_fs{fs}.png"
    fig.savefig(os.path.join(output_dir, filename), bbox_inches='tight')
    plt.close()

# High resolution frequency axis
w = np.linspace(0, np.pi, 8192)

summary_data = []

for fs in fs_list:
    freqs_hz = w * fs / (2 * np.pi)
    nyq = fs / 2
    fig, axs = plt.subplots(len(filter_defs), 2, figsize=(12, 4 * len(filter_defs)))
    fig.suptitle(f"Bode Diagrams at fs = {fs} Hz", fontsize=16)

    for i, filt in enumerate(filter_defs):
        low = filt["low"] / nyq
        high = filt["high"] / nyq

        if filt["type"] == "FIR":
            b = firwin(filt["numtaps"], [low, high], pass_zero=False)
            a = [1.0]
            _, h = freqz(b, worN=w)
            order = filt["numtaps"] - 1
        else:
            b, a = butter(filt["order"], [low, high], btype='band')
            _, h = freqz(b, a, worN=w)
            order = filt["order"]

        render_transfer_function(b, a, filt["name"], fs, output_dir)
        tf_str = f"H(z) image → TF_{filt['name'].replace(' ', '_').replace('–', '-')}_fs{fs}.png"

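        # |H| can be exactly zero (e.g. Butterworth band-pass at DC and Nyquist); silence the log10(0) warning
        with np.errstate(divide="ignore"):
            magnitude_db = 20 * np.log10(np.abs(h))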
        phase_deg = np.angle(h, deg=True)

        # Real passband calculation (-3 dB)
        if np.max(magnitude_db) < -3:
            band_start, band_end = None, None
        else:
            passband = magnitude_db >= -3
            if np.any(passband):
                band_start = freqs_hz[passband][0]
                band_end = freqs_hz[passband][-1]
            else:
                band_start, band_end = None, None

        # Ripple calculation using the central 60% of the passband
        margin = 0.2  # 20% margin on each side
        band_width = filt["high"] - filt["low"]
        low_ripple = filt["low"] + margin * band_width
        high_ripple = filt["high"] - margin * band_width
        idx_pass = np.where((freqs_hz >= low_ripple) & (freqs_hz <= high_ripple))[0]

        if len(idx_pass) > 0:
            ripple = np.max(magnitude_db[idx_pass]) - np.min(magnitude_db[idx_pass])
        else:
            ripple = None

        # Slope after highcut frequency
        slope = None
        try:
            idx_start = np.argmin(np.abs(freqs_hz - filt["high"]))
            target_drop_db = 20
            idx_end = None
            for j in range(idx_start + 1, len(magnitude_db)):
                if magnitude_db[idx_start] - magnitude_db[j] >= target_drop_db:
                    idx_end = j
                    break
            if idx_end is not None:
                slope = (magnitude_db[idx_end] - magnitude_db[idx_start]) / (freqs_hz[idx_end] - freqs_hz[idx_start])
                axs[i, 0].plot(
                    [freqs_hz[idx_start], freqs_hz[idx_end]],
                    [magnitude_db[idx_start], magnitude_db[idx_end]],
                    color='green', linestyle='--', label='Slope'
                )
                mid_x = (freqs_hz[idx_start] + freqs_hz[idx_end]) / 2
                mid_y = (magnitude_db[idx_start] + magnitude_db[idx_end]) / 2
                axs[i, 0].annotate(
                    f"{slope:.2f} dB/Hz",
                    xy=(mid_x, mid_y - 10),
                    color='green',
                    fontsize=9,
                    ha='center',
                    va='top'
                )
        except Exception:
            slope = None

        # Magnitude plot
        axs[i, 0].plot(freqs_hz, magnitude_db, label='Magnitude (dB)')
        axs[i, 0].axhline(-3, color='red', linestyle='--', label='-3 dB')
        if band_start is not None and band_end is not None:
            axs[i, 0].axvline(band_start, color='purple', linestyle=':', label='-3 dB Start')
            axs[i, 0].axvline(band_end, color='orange', linestyle=':', label='-3 dB End')

            ymin = axs[i, 0].get_ylim()[0]

            axs[i, 0].annotate(f"{band_start:.2f} Hz",
                            xy=(band_start, ymin),
                            xytext=(band_start, ymin - 33),  # label placed below the lower y-limit (data units)
                            textcoords='data',
                            ha='center',
                            fontsize=8,
                            color='purple',
                            arrowprops=dict(arrowstyle='->', color='purple', lw=0.5))

            axs[i, 0].annotate(f"{band_end:.2f} Hz",
                            xy=(band_end, ymin),
                            xytext=(band_end, ymin - 33),
                            textcoords='data',
                            ha='center',
                            fontsize=8,
                            color='orange',
                            arrowprops=dict(arrowstyle='->', color='orange', lw=0.5))




        axs[i, 0].set_title(f"{filt['name']} – Magnitude")
        axs[i, 0].set_ylabel("Magnitude (dB)")
        axs[i, 0].grid(True)
        axs[i, 0].set_xlim(0, filt["high"] * 3)

        # Phase plot
        axs[i, 1].plot(freqs_hz, phase_deg)
        axs[i, 1].set_title(f"{filt['name']} – Phase")
        axs[i, 1].set_ylabel("Phase (°)")
        axs[i, 1].grid(True)
        axs[i, 1].set_xlim(0, filt["high"] * 3)

        if i == len(filter_defs) - 1:
            axs[i, 0].set_xlabel("Frequency (Hz)")
            axs[i, 1].set_xlabel("Frequency (Hz)")

        summary_data.append({
            "Filter": filt["name"],
            "Type": filt["type"],
            "Order": order,
            "Fs (Hz)": fs,
            "Declared Band (Hz)": f"{filt['low']}–{filt['high']}",
            "Real Band (-3 dB)": f"{band_start:.3f}–{band_end:.3f}" if band_start is not None else "N/A",
            "Ripple (dB)": f"{ripple:.3f}" if ripple is not None else "N/A",
            "Slope after highcut (dB/Hz)": f"{slope:.2f}" if slope is not None else "N/A",
            "Transfer Function": tf_str
        })

    handles, labels = axs[0, 0].get_legend_handles_labels()
    fig.legend(handles, labels, loc='lower center', ncol=5, bbox_to_anchor=(0.5, 0.01))
    fig.tight_layout(rect=[0, 0.04, 1, 0.96])
    fig.savefig(os.path.join(output_dir, f"Bode_Diagrams_fs{fs}.png"))
    plt.close()

# Summary table
df = pd.DataFrame(summary_data)
fig, ax = plt.subplots(figsize=(28, 2 + 0.5 * len(df)))
ax.axis('off')
tbl = table(ax, df, loc='center', cellLoc='center', colWidths=[0.12]*len(df.columns))
tbl.auto_set_font_size(False)
tbl.set_fontsize(9)
tbl.scale(1.2, 1.5)
plt.savefig(os.path.join(output_dir, "filter_summary_table.png"), bbox_inches='tight')
plt.close()

print("Analysis completed with improved Ripple calculation.")


Analysis completed with improved Ripple calculation.


After reviewing the 20 HTML reports generated (one random 1-minute fragment per patient, total of 20 patients; available on GitHub), the following conclusions can be drawn:

- The 0.015–15 Hz filter fails to remove slow components, which compromises the frequency-domain analysis.

- The 0.3–2.5 Hz and 0.5–5 Hz filters successfully eliminate slow waves but distort the pulsatile waveform, making P1, P2, and P3 identification unreliable.

- The 0.5–15 Hz filter achieves the best balance: it removes slow and high-frequency noise while preserving the waveform morphology, enabling consistent analysis in both time and frequency domains.

Therefore, the 0.5–15 Hz filter is selected as the outcome of this comparison.
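
One way to see why the narrower bands distort the pulse shape is to count how many harmonics of the heart rate fall inside each passband. The sketch below assumes a heart rate of 72 beats per minute (1.2 Hz), a value chosen for illustration and not taken from the recordings, and treats each filter as an ideal brick-wall band. `harmonics_in_band` is a hypothetical helper.

```python
def harmonics_in_band(fundamental_hz, low_hz, high_hz, max_harmonic=50):
    """Return the harmonic numbers k with low_hz <= k * fundamental_hz <= high_hz."""
    return [k for k in range(1, max_harmonic + 1)
            if low_hz <= k * fundamental_hz <= high_hz]

heart_rate_hz = 1.2  # assumed: 72 beats per minute
bands = {
    "0.3-2.5 Hz": (0.3, 2.5),
    "0.5-5 Hz": (0.5, 5.0),
    "0.5-15 Hz": (0.5, 15.0),
}

for name, (low, high) in bands.items():
    kept = harmonics_in_band(heart_rate_hz, low, high)
    print(f"{name}: {len(kept)} harmonics kept (up to {kept[-1] * heart_rate_hz:.1f} Hz)")
```

Under these assumptions the three bands retain 2, 4 and 12 harmonics respectively. With only a few harmonics left, fine detail within a single pulse is unlikely to be represented, which is consistent with the distortion observed in the reports.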

The analysis in the following code cell was carried out after observing that the peak-to-peak amplitude of the signal changed significantly when the initially selected 0.5–15 Hz filter was applied. To better preserve the original signal morphology, several band-pass filters with lower cutoff frequencies from 0.2 Hz to 0.5 Hz (in 0.05 Hz steps), all with an upper cutoff of 15 Hz, were applied to the same randomly selected fragments used in the first analysis. Each filtered signal was evaluated in the time domain (comparing peak-to-peak values measured at the peak positions detected in the raw signal) and in the frequency domain (FFT).
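
To get a feel for how strongly each lower cutoff attenuates components near the band edge, the sketch below evaluates the zero-phase gain of the fourth-order Butterworth band-pass at two probe frequencies. The 200 Hz sampling rate and the probe frequencies (0.3 Hz and 1.0 Hz) are assumptions for illustration, and the filter is designed as second-order sections for numerical accuracy, whereas the notebook itself uses the (b, a) form.

```python
import numpy as np
from scipy.signal import butter, sosfreqz

fs = 200.0                          # assumed sampling rate (Hz)
probe_hz = np.array([0.3, 1.0])     # assumed probe frequencies (Hz)
low_cutoffs = [0.2, 0.3, 0.4, 0.5]  # subset of the lower cutoffs compared in this notebook

zero_phase_gain = {}
for low in low_cutoffs:
    sos = butter(4, [low, 15.0], btype="band", fs=fs, output="sos")
    # Evaluate the frequency response only at the probe frequencies (given in Hz).
    _, h = sosfreqz(sos, worN=probe_hz, fs=fs)
    # filtfilt applies the filter forward and backward, so the net gain is |H|^2.
    zero_phase_gain[low] = np.abs(h) ** 2
    g = zero_phase_gain[low]
    print(f"low cutoff {low:.2f} Hz: gain at 0.3 Hz = {g[0]:.3f}, at 1.0 Hz = {g[1]:.3f}")
```

Because of the forward-backward pass, a component lying exactly at the nominal cutoff is halved in amplitude (-6 dB) rather than reduced by 3 dB, so slow components just above the lower cutoff are attenuated more than the nominal band suggests.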

In [54]:
import os
import numpy as np
from scipy.signal import butter, filtfilt, find_peaks
from datetime import datetime, timedelta
import plotly.graph_objects as go

file_path = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\all_patients_fragments_no_artifacts\\paciente20_fragmento2.txt"
output_folder = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\testing_filters_results"
os.makedirs(output_folder, exist_ok=True)

# SIGNAL PROCESSING FUNCTIONS
def apply_iir_filter(signal, lowcut, highcut, fs, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, signal)

def get_synced_minima(signal, peak_indices, fs):
    minima = []
    pp_values = []
    for p in peak_indices:
        win_start = max(p - int(0.25*fs), 0)
        win_end = min(p + int(0.25*fs), len(signal))
        local_min_idx = np.argmin(signal[win_start:win_end]) + win_start
        minima.append(local_min_idx)
        pp_values.append(signal[p] - signal[local_min_idx])
    mean_pp = np.mean(pp_values) if pp_values else 0.0
    return minima, mean_pp

def calculate_time_vector(start_time, n_samples, fs):
    return np.array([start_time + timedelta(seconds=i/fs) for i in range(n_samples)])

def compute_fft(signal, fs):
    N = len(signal)
    fft_vals = np.fft.rfft(signal)
    fft_freq = np.fft.rfftfreq(N, d=1/fs)
    fft_amp = np.abs(fft_vals) * 2 / N  # Manual normalization
    return fft_freq, fft_amp

# FILE READING AND HEADER PARSING
with open(file_path, "r", encoding="utf-8") as f:
    lines = f.readlines()

Fs = None
start_time = None
header_end_index = None

for i, line in enumerate(lines):
    if line.startswith("Interval="):
        interval_str = line.split("=")[1].strip().split()[0].replace(",", ".")
        Fs = 1 / float(interval_str)
    elif line.startswith("ExcelDateTime="):
        datetime_str = line.split("\t")[-1].split(",")[0].strip()
        for fmt in ["%d/%m/%Y %H:%M:%S", "%d/%m/%Y %H:%M:%S.%f"]:
            try:
                start_time = datetime.strptime(datetime_str, fmt)
                break
            except ValueError:
                continue
    elif line.startswith("BottomValue="):
        header_end_index = i + 1
        break

# LOAD RAW PIC SIGNAL
pic_values = np.array([
    float(line.strip().split()[2].replace(",", "."))
    for line in lines[header_end_index:]
    if len(line.strip().split()) >= 3
])
time_vector = calculate_time_vector(start_time, len(pic_values), Fs)

# LIMIT TO FIRST 10 SECONDS
max_samples = int(10 * Fs)
pic_values = pic_values[:max_samples]
time_vector = time_vector[:max_samples]

# FILTER SETTINGS
filters = [
    {"low": 0.2,  "high": 15, "color": "teal",    "name": "F 0.2–15"},
    {"low": 0.25, "high": 15, "color": "brown",   "name": "F 0.25–15"},
    {"low": 0.3,  "high": 15, "color": "blue",    "name": "F 0.3–15"},
    {"low": 0.35, "high": 15, "color": "green",   "name": "F 0.35–15"},
    {"low": 0.4,  "high": 15, "color": "orange",  "name": "F 0.4–15"},
    {"low": 0.45, "high": 15, "color": "purple",  "name": "F 0.45–15"},
    {"low": 0.5,  "high": 15, "color": "red",     "name": "F 0.5–15"}  
]

# PEAK DETECTION
peak_indices, _ = find_peaks(pic_values, distance=0.4*Fs)

# GRAPH 1: FULL-SCALE SIGNAL VIEW
fig_main = go.Figure()
minima_raw, mean_pp_raw = get_synced_minima(pic_values, peak_indices, Fs)

fig_main.add_trace(go.Scatter(x=time_vector, y=pic_values, mode="lines", name="Raw", line=dict(color="black", width=1)))
fig_main.add_trace(go.Scatter(x=time_vector[peak_indices], y=pic_values[peak_indices], mode="markers", name=f"Raw peaks (PP: {mean_pp_raw:.3f})", marker=dict(color="black", symbol="x", size=7)))
fig_main.add_trace(go.Scatter(x=time_vector[minima_raw], y=pic_values[minima_raw], mode="markers", name="Raw minima", marker=dict(color="gray", symbol="circle", size=5)))
for p, m in zip(peak_indices, minima_raw):
    fig_main.add_trace(go.Scatter(x=[time_vector[p], time_vector[p]], y=[pic_values[m], pic_values[p]], mode="lines", line=dict(color="gray", width=1, dash="dot"), showlegend=False))

# Store FFT results and generate filtered plots
fft_results = []
for filt in filters:
    filtered = apply_iir_filter(pic_values, filt["low"], filt["high"], Fs)
    filt["signal"] = filtered
    minima_filt, mean_pp_filt = get_synced_minima(filtered, peak_indices, Fs)

    fig_main.add_trace(go.Scatter(x=time_vector, y=filtered, mode="lines", name=f"{filt['name']} (PP: {mean_pp_filt:.3f})", line=dict(color=filt["color"], width=1)))
    fig_main.add_trace(go.Scatter(x=time_vector[peak_indices], y=filtered[peak_indices], mode="markers", name=f"{filt['name']} peaks", marker=dict(color=filt["color"], symbol="x", size=7)))
    fig_main.add_trace(go.Scatter(x=time_vector[minima_filt], y=filtered[minima_filt], mode="markers", name=f"{filt['name']} minima", marker=dict(color=filt["color"], symbol="circle", size=5)))
    for p, m in zip(peak_indices, minima_filt):
        fig_main.add_trace(go.Scatter(x=[time_vector[p], time_vector[p]], y=[filtered[m], filtered[p]], mode="lines", line=dict(color=filt["color"], width=1, dash="dot"), showlegend=False))

    fft_freq, fft_amp = compute_fft(filtered, Fs)
    fft_results.append({"freq": fft_freq, "amp": fft_amp, "name": filt["name"], "color": filt["color"]})

# GRAPH 2: ZOOMED VIEW
fig_zoom = go.Figure()
fig_zoom.add_trace(go.Scatter(x=time_vector, y=pic_values, mode="lines", name="Raw", line=dict(color="black", width=1)))
for filt in filters:
    fig_zoom.add_trace(go.Scatter(x=time_vector, y=filt["signal"], mode="lines", name=filt["name"], line=dict(color=filt["color"], width=1)))

fig_zoom.update_layout(
    title="Zoomed View: Raw and Filtered Signals",
    xaxis_title="Time",
    yaxis_title="Pressure (mmHg)",
    width=1150,
    height=600,
    yaxis=dict(range=[-0.2, 1.1]),
    hovermode="x unified"
)

# GRAPH 3: FFT COMPARISON
fft_freq_raw, fft_amp_raw = compute_fft(pic_values, Fs)
fig_fft = go.Figure()
fig_fft.add_trace(go.Scatter(x=fft_freq_raw, y=fft_amp_raw, mode="lines", name="Raw", line=dict(color="black", width=2)))
for res in fft_results:
    fig_fft.add_trace(go.Scatter(x=res["freq"], y=res["amp"], mode="lines", name=res["name"], line=dict(color=res["color"], width=1)))

fig_fft.update_layout(
    title="FFT: Raw and Filtered Signals",
    xaxis_title="Frequency (Hz)",
    yaxis_title="Amplitude",
    width=1150,
    height=500,
    xaxis=dict(range=[0, 20]),
    hovermode="x"
)

# LAYOUT UPDATE FOR MAIN FIGURE
fig_main.update_layout(
    title="10-second Segment: Raw and Filtered PIC Signals (Synchronized Peak-to-Peak)",
    xaxis_title="Time",
    yaxis_title="Pressure (mmHg)",
    width=1150,
    height=800,
    margin=dict(t=60, b=80),
    xaxis=dict(rangeslider=dict(visible=True, thickness=0.05), type="date"),
    yaxis=dict(range=[-15, 15]),
    hovermode="x unified"
)

# EXPORT TO HTML
fragment_name = os.path.splitext(os.path.basename(file_path))[0]
output_filename = f"{fragment_name}_second_analysis.html"
output_path = os.path.join(output_folder, output_filename)

with open(output_path, "w", encoding="utf-8") as f:
    f.write(f"<html><head><title>Analysis of {fragment_name}</title></head><body>")
    f.write(f"<h1 style='text-align:center'>Analysis of {fragment_name}</h1><hr>")
    f.write(fig_main.to_html(full_html=False, include_plotlyjs="cdn"))
    f.write("<hr><h2 style='text-align:center'>Zoomed Signal View</h2>")
    f.write(fig_zoom.to_html(full_html=False, include_plotlyjs=False))
    f.write("<hr><h2 style='text-align:center'>Frequency Domain (FFT)</h2>")
    f.write(fig_fft.to_html(full_html=False, include_plotlyjs=False))
    f.write("</body></html>")

print(f"Analysis saved to:\n{output_path}")

Analysis saved to:
C:\Users\sofia\OneDrive\Escritorio\TFGPython\testing_filters_results\paciente20_fragmento2_second_analysis.html


Filter characterization

In [67]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, freqz
import pandas as pd
from pandas.plotting import table
import os

# Output directory
output_dir = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\testing_filters_results\\second_analysis_characterization"
os.makedirs(output_dir, exist_ok=True)

# Define filter configurations (low and high cutoff frequencies)
filters = [
    {"low": 0.2,  "high": 15, "color": "teal",   "name": "IIR 0.2–15"},
    {"low": 0.25, "high": 15, "color": "brown",  "name": "IIR 0.25–15"},
    {"low": 0.3,  "high": 15, "color": "blue",   "name": "IIR 0.3–15"},
    {"low": 0.35, "high": 15, "color": "green",  "name": "IIR 0.35–15"},
    {"low": 0.4,  "high": 15, "color": "orange", "name": "IIR 0.4–15"},
    {"low": 0.45, "high": 15, "color": "purple", "name": "IIR 0.45–15"},
    {"low": 0.5,  "high": 15, "color": "red",    "name": "IIR 0.5–15"}
]

# Store all rows in this list
combined_summary_data = []

# Loop over both sampling frequencies
for fs in [200, 400]:
    nyq = fs / 2  # Nyquist frequency
    w = np.linspace(0, np.pi, 8192)  # Normalized frequency values
    freqs_hz = w * fs / (2 * np.pi)  # Convert normalized to Hz

    # Create Bode plot (magnitude and phase)
    fig, axs = plt.subplots(2, 1, figsize=(10, 10), sharex=True)
    axs[0].set_title(f"Magnitude Response (Bode Diagram) – fs = {fs} Hz", fontsize=14)
    axs[1].set_title("Phase Response", fontsize=14)
    axs[1].set_xlabel("Frequency (Hz)")
    axs[0].set_ylabel("Magnitude (dB)")
    axs[1].set_ylabel("Phase (°)")

    # Analyze each filter configuration
    for filt in filters:
        low = filt["low"] / nyq
        high = filt["high"] / nyq

        # Design 4th-order Butterworth bandpass filter
        b, a = butter(4, [low, high], btype='band')
        _, h = freqz(b, a, worN=w)

        # Compute magnitude in dB and phase in degrees
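        # |H| is zero at DC and Nyquist for a band-pass design; silence the log10(0) warning
        with np.errstate(divide="ignore"):
            magnitude_db = 20 * np.log10(np.abs(h))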
        phase_deg = np.angle(h, deg=True)

        # Plot magnitude and phase
        axs[0].plot(freqs_hz, magnitude_db, label=filt["name"], color=filt["color"])
        axs[1].plot(freqs_hz, phase_deg, label=filt["name"], color=filt["color"])

        # Real passband calculation (-3 dB absolute)
        if np.max(magnitude_db) < -3:
            band_start, band_end = None, None
        else:
            passband = magnitude_db >= -3
            if np.any(passband):
                band_start = freqs_hz[passband][0]
                band_end = freqs_hz[passband][-1]
            else:
                band_start, band_end = None, None

        # Slope after highcut frequency (in dB/Hz)
        slope = None
        try:
            idx_start = np.argmin(np.abs(freqs_hz - filt["high"]))
            idx_end = None
            # First sample at least 20 dB below the response at the highcut
            for j in range(idx_start + 1, len(magnitude_db)):
                if magnitude_db[idx_start] - magnitude_db[j] >= 20:
                    idx_end = j
                    break
            if idx_end is not None:
                slope = (magnitude_db[idx_end] - magnitude_db[idx_start]) / (freqs_hz[idx_end] - freqs_hz[idx_start])
        except Exception:
            slope = None

        # Attenuation at 25 Hz (dB), read at the nearest frequency sample
        idx_25hz = np.argmin(np.abs(freqs_hz - 25))
        attenuation_25hz = magnitude_db[idx_25hz]

        # Distance between the declared highcut and the measured -3 dB upper edge
        transition_width = abs(filt["high"] - band_end) if band_end is not None else None

        # Store summary information
        combined_summary_data.append({
            "Filter": filt["name"],
            "Type": "IIR Butterworth",
            "Order": 4,
            "Fs (Hz)": fs,
            "Declared Band (Hz)": f"{filt['low']}–{filt['high']}",
            # Compare against None so that a legitimate value of 0.0 is not reported as "N/A"
            "Real Band (-3 dB)": f"{band_start:.3f}–{band_end:.3f}" if band_start is not None else "N/A",
            "Slope after highcut (dB/Hz)": f"{slope:.2f}" if slope is not None else "N/A",
            "Attenuation at 25 Hz (dB)": f"{attenuation_25hz:.2f}" if attenuation_25hz is not None else "N/A",
            "Transition Width (Hz)": f"{transition_width:.3f}" if transition_width is not None else "N/A",
        })

    # Configure axes and save the Bode diagram
    for ax in axs:
        ax.grid(True)
        ax.set_xlim(0, 30)
    axs[0].legend()
    axs[1].legend()

    plt.tight_layout()
    bode_path = os.path.join(output_dir, f"bode_comparison_fs{fs}.png")
    plt.savefig(bode_path)
    plt.close()
    print(f"✓ fs = {fs} Hz: Bode diagram saved to:\n{bode_path}")

# Create combined summary DataFrame
df = pd.DataFrame(combined_summary_data)

# Create and save the combined summary table
fig_table, ax_table = plt.subplots(figsize=(24, 1 + 0.5 * len(df)))
ax_table.axis('off')
tbl = table(ax_table, df, loc='center', cellLoc='center', colWidths=[0.13] * len(df.columns))
tbl.auto_set_font_size(False)
tbl.set_fontsize(9)
tbl.scale(1.2, 1.5)
table_path = os.path.join(output_dir, "summary_table_combined.png")
plt.savefig(table_path, bbox_inches='tight')
plt.close()

print(f"✓ Combined summary table saved to:\n{table_path}")



✓ fs = 200 Hz: Bode diagram saved to:
C:\Users\sofia\OneDrive\Escritorio\TFGPython\testing_filters_results\second_analysis_characterization\bode_comparison_fs200.png
✓ fs = 400 Hz: Bode diagram saved to:
C:\Users\sofia\OneDrive\Escritorio\TFGPython\testing_filters_results\second_analysis_characterization\bode_comparison_fs400.png
✓ Combined summary table saved to:
C:\Users\sofia\OneDrive\Escritorio\TFGPython\testing_filters_results\second_analysis_characterization\summary_table_combined.png
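The band measurements reported in the summary table can be reproduced in a few lines. The following is only a minimal sketch, not the notebook's own measurement code: it assumes the -3 dB band is taken relative to the peak gain, and the function name `measure_passband` and its return values are hypothetical. Because a band-pass filter has zero gain at 0 Hz, the magnitude is clipped before taking the logarithm so that `np.log10` never receives zero.

```python
import numpy as np
from scipy.signal import butter, sosfreqz


def measure_passband(lowcut, highcut, fs, order=4, n_points=1 << 16):
    """Return (band_start, band_end, gain_25hz_db) for a Butterworth band-pass.

    Hypothetical helper: the -3 dB band is measured relative to the peak gain.
    """
    sos = butter(order, [lowcut, highcut], btype="band", fs=fs, output="sos")
    freqs, h = sosfreqz(sos, worN=n_points, fs=fs)

    # Clip the magnitude away from zero so log10 is always finite
    magnitude_db = 20 * np.log10(np.maximum(np.abs(h), 1e-12))

    # Frequencies whose gain lies within 3 dB of the peak
    in_band = freqs[magnitude_db >= magnitude_db.max() - 3.0]

    # Gain at 25 Hz, interpolated on the frequency grid (negative = attenuation)
    gain_25hz_db = float(np.interp(25.0, freqs, magnitude_db))
    return float(in_band[0]), float(in_band[-1]), gain_25hz_db


band_start, band_end, gain_25hz = measure_passband(0.5, 15.0, fs=200.0)
print(f"-3 dB band: {band_start:.3f}-{band_end:.3f} Hz, gain at 25 Hz: {gain_25hz:.2f} dB")
```

This describes a single pass of the filter. When the filter is applied with filtfilt, the magnitude response is squared, so the attenuation in dB doubles at every frequency.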


The following cell processes all intracranial pressure (PIC) signal fragments stored in the folder all_patients_fragments_no_artifacts. For each .txt file, it reads the sampling interval from the Interval= line of the header, computes the sampling frequency (Fs) as its reciprocal, and extracts the signal values from the third column. It then applies a fourth-order Butterworth band-pass IIR filter from 0.2 to 15 Hz using filtfilt, which runs the filter forward and backward so that the result has no phase shift. This band preserves the cardiac component of the signal while removing slow baseline trends and high-frequency noise. After filtering, the script rebuilds the file, keeping the original header intact and replacing the third column with the filtered values; data rows with fewer than three columns are skipped. The result is saved in the folder all_patients_fragments_filtered under the original file name plus the suffix _filtered. The output files keep the structure of the originals, including decimal commas, so they remain compatible with the previously defined loading and analysis functions.

In [55]:
import os
import numpy as np
from scipy.signal import butter, filtfilt

input_folder = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\all_patients_fragments_no_artifacts"
output_folder = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\all_patients_fragments_filtered"
os.makedirs(output_folder, exist_ok=True)

# Band-pass IIR filter, 0.2–15 Hz (Butterworth, applied forward and backward with filtfilt)
def apply_iir_filter(signal, lowcut=0.2, highcut=15.0, fs=200.0, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, signal)

# Scan all .txt files
for filename in os.listdir(input_folder):
    if filename.endswith(".txt"):
        input_path = os.path.join(input_folder, filename)
        with open(input_path, "r", encoding="utf-8") as f:
            lines = f.readlines()

        # Detect end of header
        Fs = None
        header_end_index = None
        for i, line in enumerate(lines):
            if line.startswith("Interval="):
                interval_str = line.split("=")[1].strip().split()[0].replace(",", ".")
                Fs = 1 / float(interval_str)
            elif line.startswith("BottomValue="):
                header_end_index = i + 1
                break

        if Fs is None or header_end_index is None:
            print(f"Skipping file (invalid header): {filename}")
            continue

        # Separate header and data
        header = lines[:header_end_index]
        data_lines = lines[header_end_index:]

        # Extract third column (PIC)
        try:
            pic_values = np.array([
                float(line.strip().split()[2].replace(",", ".")) 
                for line in data_lines if len(line.strip().split()) >= 3
            ])
        except Exception as e:
            print(f"Error reading data in {filename}: {e}")
            continue

        # Filter
        filtered_pic = apply_iir_filter(pic_values, lowcut=0.2, highcut=15, fs=Fs)

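        # Rewrite the file with modified third column.
        # A separate counter indexes filtered_pic, so rows skipped during
        # extraction (fewer than 3 columns) do not shift the values.
        filtered_lines = []
        k = 0
        for line in data_lines:
            parts = line.strip().split()
            if len(parts) >= 3:
                parts[2] = f"{filtered_pic[k]:.6f}".replace(".", ",")
                filtered_lines.append("\t".join(parts) + "\n")
                k += 1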

        # Create new name
        base_name = os.path.splitext(filename)[0]
        output_name = base_name + "_filtered.txt"
        output_path = os.path.join(output_folder, output_name)

        # Save file
        with open(output_path, "w", encoding="utf-8") as f:
            f.writelines(header + filtered_lines)

        print(f"Filtered and saved: {output_name}")

Filtered and saved: paciente10_fragmento1_filtered.txt
Filtered and saved: paciente10_fragmento2_filtered.txt
Filtered and saved: paciente10_fragmento3_filtered.txt
Filtered and saved: paciente11_fragmento1_filtered.txt
Filtered and saved: paciente11_fragmento2_filtered.txt
Filtered and saved: paciente11_fragmento3_filtered.txt
Filtered and saved: paciente12_fragmento1_filtered.txt
Filtered and saved: paciente12_fragmento2_filtered.txt
Filtered and saved: paciente13_fragmento1_filtered.txt
Filtered and saved: paciente13_fragmento2_filtered.txt
Filtered and saved: paciente14_fragmento1_filtered.txt
Filtered and saved: paciente14_fragmento2_filtered.txt
Filtered and saved: paciente14_fragmento3_filtered.txt
Filtered and saved: paciente15_fragmento1_filtered.txt
Filtered and saved: paciente15_fragmento2_filtered.txt
Filtered and saved: paciente15_fragmento3_filtered.txt
Filtered and saved: paciente16_fragmento1_filtered.txt
Filtered and saved: paciente16_fragmento2_filtered.txt
Filtered a
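The cell above designs the filter in transfer-function form (`b, a`). SciPy's documentation recommends second-order sections (SOS) when numerical precision is a concern, which is relevant for low cutoff frequencies such as 0.2 Hz. The following is a minimal sketch of an equivalent SOS version, offered as an alternative and not as the method used in this notebook. The function name `apply_iir_filter_sos` and the synthetic test signal are assumptions made for illustration.

```python
import numpy as np
from scipy.signal import butter, sosfiltfilt


def apply_iir_filter_sos(signal, lowcut=0.2, highcut=15.0, fs=200.0, order=4):
    """Zero-phase Butterworth band-pass built from second-order sections."""
    sos = butter(order, [lowcut, highcut], btype="band", fs=fs, output="sos")
    return sosfiltfilt(sos, signal)


# Synthetic check (hypothetical values): offset + slow drift + 1.2 Hz tone + 40 Hz noise
fs = 200.0
t = np.arange(0, 60, 1 / fs)
cardiac = np.sin(2 * np.pi * 1.2 * t)
raw = (10.0
       + 2.0 * np.sin(2 * np.pi * 0.01 * t)
       + cardiac
       + 0.5 * np.sin(2 * np.pi * 40 * t))
filtered = apply_iir_filter_sos(raw, fs=fs)

# Compare away from the edges, where start-up transients of the filter remain
core = slice(int(10 * fs), int(50 * fs))
rms_error = np.sqrt(np.mean((filtered[core] - cardiac[core]) ** 2))
print(f"RMS error against the 1.2 Hz tone: {rms_error:.4f}")
```

In this sketch the offset, the drift and the 40 Hz component are suppressed while the 1.2 Hz tone passes without phase shift, so away from the edges the filtered signal should closely follow the tone.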

The following cell loads one of the filtered fragments, prints its time range, and plots it.

In [7]:
from functions_tfg import load_pic_and_comments
from functions_tfg import plot_raw_pic_signal
from datetime import datetime, timedelta
import os
import re
import datetime as dt


file_path2 = "C:\\Users\\sofia\\OneDrive\\Escritorio\\TFGPython\\all_patients_fragments_filtered\\paciente1_fragmento1_filtered.txt"
df_pic, df_comments, pic_values, Fs, start_time = load_pic_and_comments(file_path2)

# Compute the end time
duration_seconds = len(pic_values) / Fs
end_time = start_time + timedelta(seconds=duration_seconds)

# Print to console
print(f"Fragment time range: {start_time.strftime('%H:%M:%S')} – {end_time.strftime('%H:%M:%S')}")

# Plot
plot_raw_pic_signal(pic_values, df_comments, file_path2, Fs, start_time)

Detected PIC value column: 2 (0-based index)
First 10 PIC values:
   Line  PIC_Direct
0     9    0.171104
1    10    0.183496
2    11    0.196939
3    12    0.212333
4    13    0.230281
5    14    0.251012
6    15    0.274375
7    16    0.299885
8    17    0.326800
9    18    0.354215

First comments:
Empty DataFrame
Columns: [Line, Comment]
Index: []

Total comments: 0
Ignored values: 0
Mean PIC: 0.00 mmHg
Total duration: 0h 8min
Fragment time range: 15:35:00 – 15:43:00
No comments to display for this signal.
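The Mean PIC of 0.00 mmHg reported above is expected for a band-pass filtered fragment: the filter has zero gain at 0 Hz, so the absolute pressure level is removed and only the oscillating component remains. The following is a minimal sketch with synthetic data. The baseline of 12 mmHg, the 1.1 Hz pulse and the 200 Hz sampling rate are hypothetical values, not taken from any patient file.

```python
import numpy as np
from scipy.signal import butter, sosfiltfilt

fs = 200.0                                   # assumed sampling rate (Hz)
t = np.arange(0, 120, 1 / fs)                # two minutes of synthetic data
baseline = 12.0                              # hypothetical mean pressure (mmHg)
pulse = 1.5 * np.sin(2 * np.pi * 1.1 * t)    # hypothetical cardiac component
raw = baseline + pulse

# Same band as the processing cell: 4th-order Butterworth, 0.2-15 Hz, zero phase
sos = butter(4, [0.2, 15.0], btype="band", fs=fs, output="sos")
filtered = sosfiltfilt(sos, raw)

print(f"Mean before filtering: {raw.mean():.2f} mmHg")
print(f"Mean after filtering:  {filtered.mean():.2f} mmHg")
```

Any later analysis that needs absolute pressure values would therefore have to read them from the unfiltered fragments.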
