In [1]:
import neurokit2 as nk
import os 

# 参数设置
filename = "c277-gi"  # 可根据实际情况修改，例如 "mouse_emg_day1"
ext_priority = ["xlsx", "csv", "tsv"]
use_large_file_mode = True

# Sampling rate (Hz)
sampling_rate = 400

# Filtering options
apply_filter = False
lowcut = 10      # Hz
highcut = 400    # Hz

In [2]:
# -------------------------------------------
# Multi-segment amplitude & frequency analysis
# -------------------------------------------
import numpy as np
from scipy.signal import welch
import neurokit2 as nk
from scipy.signal import butter, filtfilt

# Segment configuration
split_analysis = True  # Set to False to process the entire signal
emg_segments = [
    ("phase 1", 0, 10),
    ("Phase 2", 10, 20),
    ("Phase 3", 20, 30),  # optional third segment
]

# Helper functions
def bandpass_filter(data, lowcut, highcut, fs, order=4):
        nyquist = 0.5 * fs
        low = lowcut / nyquist
        high = highcut / nyquist
        b, a = butter(order, [low, high], btype='band')
        return filtfilt(b, a, data)

def compute_mean_frequency(signal, fs, window=1):
    freqs, psd = welch(signal, fs=fs, nperseg=fs*window)
    if np.sum(psd) == 0:
        return np.nan
    mean_freq = np.sum(freqs * psd) / np.sum(psd)
    return mean_freq

def compute_stats(signal):
    return {
        "mean": np.mean(signal),
        "median": np.median(signal),
        "variance": np.var(signal),
        "std_dev": np.std(signal)
    }

def compute_freq_stats(signal, fs, window=1):
    freqs, psd = welch(signal, fs=fs, nperseg=fs*window)
    if np.sum(psd) == 0:
        return {"mean": np.nan, "median": np.nan, "variance": np.nan, "std_dev": np.nan}
    psd_norm = psd / np.sum(psd)
    mean_freq = np.sum(freqs * psd_norm)
    median_freq = freqs[np.searchsorted(np.cumsum(psd_norm), 0.5)]
    var_freq = np.sum(psd_norm * (freqs - mean_freq) ** 2)
    std_freq = np.sqrt(var_freq)
    return {
        "mean": mean_freq,
        "median": median_freq,
        "variance": var_freq,
        "std_dev": std_freq
    }
    
def compute_rms(signal):
    return np.sqrt(np.mean(np.square(signal)))

In [3]:
# Clean and convert EMG column to float values with stepwise logging and memory efficiency
import gc
import numpy as np
import pandas as pd

file_path = None
for ext in ext_priority:
    try_path = f"{filename}.{ext}"
    if os.path.exists(try_path):
        file_path = try_path
        break

if file_path is None:
    raise FileNotFoundError("No file found with the given base name and supported extensions.")

print(f"Reading from: {file_path}")

# 根据扩展名读取 E 列（第 8 行开始）作为 EMG 数据
if file_path.endswith(".xlsx"):
    df_emg = pd.read_excel(file_path, usecols="E", skiprows=7, engine="openpyxl")
elif file_path.endswith(".csv"):
    if use_large_file_mode:
        df_emg = pd.read_csv(file_path, usecols=[4], skiprows=7, low_memory=False)
    else:
        df_emg = pd.read_csv(file_path, usecols=[4], skiprows=7)
elif file_path.endswith(".tsv"):
    if use_large_file_mode:
        df_emg = pd.read_csv(file_path, sep="\t", usecols=[4], skiprows=7, low_memory=False)
    else:
        df_emg = pd.read_csv(file_path, sep="\t", usecols=[4], skiprows=7)
else:
    raise ValueError("Unsupported file extension.")

# 重命名列，统一命名为 'EMG'
df_emg.columns = ["EMG"]

# Step 1: Convert to string and strip whitespace
emg_series = df_emg["EMG"].astype(str).str.strip()
print(f"Total rows in raw EMG column (including blank/whitespace): {len(emg_series)}")

# Step 2: Remove empty strings
emg_series = emg_series[emg_series != ""]
print(f"Rows remaining after removing empty strings: {len(emg_series)}")

# Step 3: Convert to numeric, coercing invalid values to NaN
emg_series = pd.to_numeric(emg_series, errors='coerce')
print(f"Rows successfully parsed as numeric (non-NaN): {emg_series.notna().sum()}")

# Free memory from intermediate string series
gc.collect()

# Step 4: Convert to NumPy array and remove invalid values
emg_data = emg_series.values  # more memory-efficient than .to_numpy()
emg_data = emg_data[~np.isnan(emg_data)]
emg_data = emg_data[np.isfinite(emg_data)]
print(f"Final number of valid EMG data points: {len(emg_data)}")

# Clean up temporary variables to free memory
del emg_series
gc.collect()

# Optional bandpass filtering
if apply_filter:
    emg_data = bandpass_filter(emg_data, lowcut=lowcut, highcut=highcut, fs=sampling_rate)
    print(f"Bandpass filter applied: {lowcut}-{highcut} Hz")
else:
    print("Bandpass filter not applied.")
    

Reading from: c277-gi.xlsx
Total rows in raw EMG column (including blank/whitespace): 11999
Rows remaining after removing empty strings: 11999
Rows successfully parsed as numeric (non-NaN): 11999
Final number of valid EMG data points: 11999
Bandpass filter not applied.


In [4]:
# Main analysis
if split_analysis:
    segment_data = {}
    segment_results = {}
    segment_stats = {}

    for label, start_sec, end_sec in emg_segments:
        start_idx = int(start_sec * sampling_rate)
        end_idx = int(end_sec * sampling_rate)
        segment_emg = emg_data[start_idx:end_idx]

        print(f"\n{label.capitalize()} segment: {end_sec - start_sec} sec, Data points: {len(segment_emg)}")
        signals, info = nk.emg_process(segment_emg, sampling_rate=sampling_rate, method_cleaning=None)

        segment_data[label] = segment_emg
        segment_results[label] = {"signals": signals, "info": info}

        amp = signals["EMG_Amplitude"]
        amp_stats = compute_stats(amp)
        freq_stats = compute_freq_stats(segment_emg, fs=sampling_rate)

        segment_stats[label] = {
            "amplitude": amp_stats,
            "frequency": freq_stats
        }

        print(f"  Amplitude - Mean: {amp_stats['mean']:.4f}, Median: {amp_stats['median']:.4f}, "
              f"Var: {amp_stats['variance']:.4f}, Std: {amp_stats['std_dev']:.4f}")
        print(f"  Frequency - Mean: {freq_stats['mean']:.2f} Hz, Median: {freq_stats['median']:.2f} Hz, Var: {freq_stats['variance']:.2f}, Std: {freq_stats['std_dev']:.2f}")
else:
    signals, info = nk.emg_process(emg_data, sampling_rate=sampling_rate, method_cleaning=None)
    amp = signals["EMG_Amplitude"]
    amp_stats = compute_stats(amp)
    freq_stats = compute_freq_stats(emg_data, fs=sampling_rate)

    print(f"\nFull trace Amplitude - Mean: {amp_stats['mean']:.4f}, Median: {amp_stats['median']:.4f}, "
          f"Var: {amp_stats['variance']:.4f}, Std: {amp_stats['std_dev']:.4f}")
    print(f"Full trace Frequency - Mean: {freq_stats['mean']:.2f} Hz, Median: {freq_stats['median']:.2f} Hz, Var: {freq_stats['variance']:.2f}, Std: {freq_stats['std_dev']:.2f}")



Phase 1 segment: 10 sec, Data points: 4000
  Amplitude - Mean: 51.2592, Median: 48.8325, Var: 224.6684, Std: 14.9889
  Frequency - Mean: 64.48 Hz, Median: 61.00 Hz, Var: 593.51, Std: 24.36

Phase 2 segment: 10 sec, Data points: 4000
  Amplitude - Mean: 32.8887, Median: 31.1897, Var: 124.4554, Std: 11.1560
  Frequency - Mean: 65.23 Hz, Median: 72.00 Hz, Var: 838.12, Std: 28.95

Phase 3 segment: 10 sec, Data points: 3999
  Amplitude - Mean: 196.6846, Median: 102.8225, Var: 147637.0583, Std: 384.2357
  Frequency - Mean: 74.76 Hz, Median: 78.00 Hz, Var: 618.26, Std: 24.86


In [5]:
# -------------------------------------------
# Export amplitude and frequency stats to Excel
# -------------------------------------------
import pandas as pd

# Build a summary dataframe
rows = []
if split_analysis:
    for label in segment_stats:
        amp = segment_stats[label]["amplitude"]
        freq = segment_stats[label]["frequency"]
        rms_val = compute_rms(segment_data[label]) if split_analysis else compute_rms(emg_data)
        rows.append({
            "Segment": label,
            "Amp_Mean": amp["mean"],
            "Amp_Median": amp["median"],
            "Amp_Variance": amp["variance"],
            "Amp_StdDev": amp["std_dev"],
            "Freq_Mean": freq["mean"],
            "Freq_Median": freq["median"],
            "Freq_Variance": freq["variance"],
            "Freq_StdDev": freq["std_dev"],
            "RMS": rms_val
        })
else:
    amp = amp_stats
    freq = freq_stats
    rms_val = compute_rms(segment_data[label]) if split_analysis else compute_rms(emg_data)
    rows.append({
        "Segment": "Full",
        "Amp_Mean": amp["mean"],
        "Amp_Median": amp["median"],
        "Amp_Variance": amp["variance"],
        "Amp_StdDev": amp["std_dev"],
        "Freq_Mean": freq["mean"],
        "Freq_Median": freq["median"],
        "Freq_Variance": freq["variance"],
        "Freq_StdDev": freq["std_dev"],
            "RMS": rms_val
    })

df_summary = pd.DataFrame(rows)
output_excel_path = f"{filename}_EMG_Analysis_Results.xlsx"
df_summary.to_excel(output_excel_path, index=False)
print(f"Summary saved to: {output_excel_path}")


Summary saved to: c277-gi_EMG_Analysis_Results.xlsx
