<a href="https://colab.research.google.com/github/lukuenya/Bispectrum_Analysis/blob/master/bispec_data_features.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!unzip "/content/drive/MyDrive/Colab Notebooks/audio_lanzhou_2015_org.zip" -d "/content/"

In [None]:
!unzip "/content/data.zip" -d "/content/data"

In [2]:
import os
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import wavfile
from scipy.stats import entropy, skew, kurtosis
import pandas as pd
import cupy as cp
import cupyx.scipy.signal as cusignal
import cupyx.scipy.fft as cufft
import matplotlib.pyplot as plt
from scipy.io import wavfile  # Reading audio files can remain on CPU

In [9]:
fs = 44100  # Sampling frequency in Hz
nfft = 512  # Adjust as needed

# # Compute frequency bins once since fs is constant
# freqs = np.fft.fftfreq(nfft, d=1/fs)


def compute_bispectrum_gpu(audio_data, nfft=256, noverlap=50):
    """
    Compute the bispectrum of an audio signal using GPU acceleration.

    Parameters:
    - audio_data: 1D numpy array of audio samples.
    - nfft: FFT length.
    - noverlap: Number of points to overlap between segments.

    Returns:
    - bispec: Bispectrum array on GPU.
    """
    if noverlap is None:
        noverlap = nfft // 2

    # Generate window function on CPU
    window_cpu = np.hanning(nfft)

    # Segment the data on CPU
    step = nfft - noverlap
    shape = ((audio_data.size - noverlap) // step, nfft)
    strides = (audio_data.strides[0] * step, audio_data.strides[0])
    segments_cpu = np.lib.stride_tricks.as_strided(audio_data, shape=shape, strides=strides)

    # Apply window function on CPU
    segments_cpu = segments_cpu * window_cpu

    # Transfer windowed segments to GPU
    segments_gpu = cp.asarray(segments_cpu)

    # Compute FFT on GPU
    fft_segments = cufft.fft(segments_gpu, n=nfft, axis=1)

    # Initialize bispectrum accumulator on GPU
    bispec_accum = cp.zeros((nfft, nfft), dtype=cp.complex128)

    # Compute bispectrum on GPU
    num_segments = fft_segments.shape[0]
    for i in range(num_segments):
        X = fft_segments[i]
        X_conj = cp.conj(X)
        # Compute the triple product
        outer_prod = X[:, None] * X[None, :]  # Outer product X(f1) * X(f2)
        sum_indices = (cp.arange(nfft)[:, None] + cp.arange(nfft)) % nfft  # Indices for X*(f1 + f2)
        X_sum_conj = X_conj[sum_indices]
        bispec_accum += outer_prod * X_sum_conj

    # Average over segments
    bispec = bispec_accum / num_segments

    return bispec


In [10]:
def process_audio_files_gpu(
    dataset_path,
    participant_types,
    emotions,
    session,
    nfft=512,  # or 256
    output_dir='/content/bispec_data_RV'
):
    fs = 44100  # Sampling frequency in Hz
    f_max = fs / 2  # Nyquist frequency
    freqs = np.fft.fftfreq(nfft, d=1/fs)

    # Ensure the output directory exists
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for participant_type in participant_types:
        participant_type_path = os.path.join(dataset_path, participant_type)
        if not os.path.exists(participant_type_path):
            continue

        # Create subdirectory for participant type in output directory
        output_dir_participant = os.path.join(output_dir, participant_type)
        if not os.path.exists(output_dir_participant):
            os.makedirs(output_dir_participant)

        participant_dirs = sorted(os.listdir(participant_type_path))

        for participant in participant_dirs:
            participant_path = os.path.join(participant_type_path, participant)
            session_path = os.path.join(participant_path, session)
            if not os.path.exists(session_path):
                continue

            for emotion in emotions:
                emotion_path = os.path.join(session_path, emotion)
                if not os.path.exists(emotion_path):
                    continue

                audio_files = [f for f in os.listdir(emotion_path) if f.endswith('.wav')]

                # Initialize variables for accumulating bispectra
                total_bispec_gpu = None
                file_count = 0

                for audio_file in audio_files:
                    file_path = os.path.join(emotion_path, audio_file)
                    try:
                        fs_read, audio_data = wavfile.read(file_path)
                        # Ensure audio_data is mono
                        if audio_data.ndim > 1:
                            audio_data = audio_data[:, 0]  # Use the first channel
                        audio_data = audio_data.astype(np.float64)

                        # Compute bispectrum using GPU
                        bispec_gpu = compute_bispectrum_gpu(audio_data, nfft=nfft)

                        # Accumulate bispectra
                        if total_bispec_gpu is None:
                            total_bispec_gpu = bispec_gpu
                        else:
                            total_bispec_gpu += bispec_gpu

                        file_count += 1

                        # Free GPU memory for this iteration
                        del bispec_gpu
                        cp._default_memory_pool.free_all_blocks()

                    except Exception as e:
                        print(f"Error processing {file_path}: {e}")
                        continue

                # After processing all files for this participant, session, and emotion
                if file_count > 0:
                    # Compute the average bispectrum
                    avg_bispec_gpu = total_bispec_gpu / file_count
                    avg_bispec = avg_bispec_gpu.get()  # Transfer to CPU

                    # Create meshgrid for f1 and f2 (in Hz)
                    f_indices = np.arange(nfft)
                    f1_indices, f2_indices = np.meshgrid(f_indices, f_indices, indexing='ij')

                    f1_freqs = freqs[f1_indices]
                    f2_freqs = freqs[f2_indices]

                    # Create mask for the primary region based on the new conditions
                    primary_region_mask = (
                        (f1_freqs >= 0) &
                        (f2_freqs >= 0) &
                        (f1_freqs + f2_freqs <= f_max) &
                        (f1_freqs <= f2_freqs)
                    )

                    # Initialize the primary region bispectrum with NaNs or zeros to maintain 2D shape
                    avg_bispec_primary = np.full_like(avg_bispec, np.nan)  # Use np.nan or 0 if preferred
                    avg_bispec_primary[primary_region_mask] = avg_bispec[primary_region_mask]

                    # Save the bispectrum data for the primary region (2D)
                    data_filename = f'{participant}_{session}_{emotion}_primary_bispectrum.npy'
                    data_save_path = os.path.join(output_dir_participant, data_filename)
                    np.save(data_save_path, avg_bispec_primary)
                    print(f"Primary region bispectrum data saved to {data_save_path}")

                    # Free GPU memory
                    del avg_bispec_gpu, total_bispec_gpu
                    cp._default_memory_pool.free_all_blocks()
                else:
                    print(f"No valid audio files processed for {participant} - {session} - {emotion}")


In [None]:
# Define the dataset path and parameters
dataset_path = '/content/audio_lanzhou_2015_org'  # Update this path
participant_types = ['MDD', 'HC']
emotions = ['Positive', 'Neutral', 'Negative']
session = 'Read_Vocabulary' # or 'Interview'
nfft = 512 # or 256 Adjust as needed

# Define the output directories
output_dir = '/content/bispec_data_RV'  # Update this path
output_plot_dir = '/content/plots_RV'  # Update this path

# Call the function to process audio files and save averaged bispectrum data and plots
process_audio_files_gpu(
    dataset_path,
    participant_types,
    emotions,
    session,
    nfft=nfft,
    output_dir=output_dir,
    #output_plot_dir=output_plot_dir
)

In [12]:
# def extract_bispectrum_features(bispec, fs):
#     import numpy as np
#     from scipy.stats import entropy, skew, kurtosis

#     # Compute magnitude and phase
#     bispec_magnitude = np.abs(bispec)
#     bispec_phase = np.angle(bispec)

#     # Flatten the bispectrum magnitude and phase, removing NaNs
#     bispec_mag_flat = bispec_magnitude.flatten()
#     bispec_phase_flat = bispec_phase.flatten()

#     # Remove NaNs for calculations
#     bispec_mag_flat = bispec_mag_flat[~np.isnan(bispec_mag_flat)]
#     bispec_phase_flat = bispec_phase_flat[~np.isnan(bispec_phase_flat)]

#     # Check if the flattened array is empty after removing NaNs
#     if bispec_mag_flat.size == 0 or bispec_phase_flat.size == 0:
#         print("Bispectrum is empty after removing NaNs.")
#         return {}

#     # Avoid log(0) by adding a small epsilon
#     eps = 1e-12

#     # Bispectral Magnitude Features
#     mean_mag = np.nanmean(bispec_mag_flat)
#     max_mag = np.nanmax(bispec_mag_flat)
#     sum_mag = np.nansum(bispec_mag_flat)
#     spectral_flatness = np.exp(np.nanmean(np.log(bispec_mag_flat + eps))) / (mean_mag + eps)
#     entropy_mag = entropy(bispec_mag_flat + eps)

#     # Bispectral Phase Features
#     mean_phase = np.nanmean(bispec_phase_flat)
#     std_phase = np.nanstd(bispec_phase_flat)
#     skew_phase = skew(bispec_phase_flat)
#     kurt_phase = kurtosis(bispec_phase_flat)

#     # Quadratic Phase Coupling Features
#     N = bispec.shape[0]
#     if bispec_magnitude.size > 0:
#         indices_qpc = np.unravel_index(np.nanargmax(bispec_magnitude), bispec_magnitude.shape)
#         qpc_strength = bispec_magnitude[indices_qpc]
#         qpc_freqs = (indices_qpc[0] * fs / N, indices_qpc[1] * fs / N)
#     else:
#         qpc_strength = 0
#         qpc_freqs = (0, 0)

#     # Frequency Band Features
#     freq_bins = np.fft.fftfreq(N, d=1/fs)
#     freq_bins = freq_bins[:N//2]
#     bispec_magnitude_half = bispec_magnitude[:N//2, :N//2]

#     bands = {
#         'low': (0, 1000),
#         'mid': (1000, 5000),
#         'high': (5000, fs/2)
#     }

#     band_energies = {}
#     for band_name, (fmin, fmax) in bands.items():
#         idx = np.where((freq_bins >= fmin) & (freq_bins < fmax))[0]
#         if len(idx) > 0:
#             band_energy = np.nansum(bispec_magnitude_half[np.ix_(idx, idx)])
#         else:
#             band_energy = 0
#         band_energies[f'band_energy_{band_name}'] = band_energy

#     # Bispectrum Peaks
#     threshold = mean_mag + 2 * np.nanstd(bispec_mag_flat)
#     num_peaks = np.sum(bispec_mag_flat > threshold)

#     # Statistical Features
#     std_mag = np.nanstd(bispec_mag_flat)
#     skew_mag = skew(bispec_mag_flat)
#     kurt_mag = kurtosis(bispec_mag_flat)

#     # Collect all features into a dictionary
#     features = {
#         'mean_mag': mean_mag,
#         'max_mag': max_mag,
#         'sum_mag': sum_mag,
#         'spectral_flatness': spectral_flatness,
#         'entropy_mag': entropy_mag,
#         'mean_phase': mean_phase,
#         'std_phase': std_phase,
#         'skew_phase': skew_phase,
#         'kurt_phase': kurt_phase,
#         'qpc_strength': qpc_strength,
#         'qpc_freq1': qpc_freqs[0],
#         'qpc_freq2': qpc_freqs[1],
#         **band_energies,
#         'num_peaks': num_peaks,
#         'std_mag': std_mag,
#         'skew_mag': skew_mag,
#         'kurt_mag': kurt_mag,
#     }

#     return features


#-------------------------------------------------------------------------------

def extract_bispectrum_features(bispec, fs):
    import numpy as np
    from scipy.stats import entropy, skew, kurtosis

    # Compute magnitude and phase
    bispec_magnitude = np.abs(bispec)
    bispec_phase = np.angle(bispec)

    # Flatten the bispectrum magnitude and phase, removing NaNs
    bispec_mag_flat = bispec_magnitude.flatten()
    bispec_phase_flat = bispec_phase.flatten()

    # Remove NaNs for calculations
    bispec_mag_flat = bispec_mag_flat[~np.isnan(bispec_mag_flat)]
    bispec_phase_flat = bispec_phase_flat[~np.isnan(bispec_phase_flat)]

    # Check if the flattened array is empty after removing NaNs
    if bispec_mag_flat.size == 0 or bispec_phase_flat.size == 0:
        print("Bispectrum is empty after removing NaNs.")
        return {}

    # Avoid log(0) by adding a small epsilon
    eps = 1e-12

    # Bispectral Magnitude Features
    mean_mag = np.nanmean(bispec_mag_flat)
    max_mag = np.nanmax(bispec_mag_flat)
    sum_mag = np.nansum(bispec_mag_flat)
    std_mag = np.nanstd(bispec_mag_flat)
    skew_mag = skew(bispec_mag_flat)
    kurt_mag = kurtosis(bispec_mag_flat)
    spectral_flatness = np.exp(np.nanmean(np.log(bispec_mag_flat + eps))) / (mean_mag + eps)
    entropy_mag = entropy(bispec_mag_flat + eps)

    # Bispectral Phase Features
    mean_phase = np.nanmean(bispec_phase_flat)
    std_phase = np.nanstd(bispec_phase_flat)
    skew_phase = skew(bispec_phase_flat)
    kurt_phase = kurtosis(bispec_phase_flat)

    # Quadratic Phase Coupling Features
    N = bispec.shape[0]
    if bispec_magnitude.size > 0:
        indices_qpc = np.unravel_index(np.nanargmax(bispec_magnitude), bispec_magnitude.shape)
        qpc_strength = bispec_magnitude[indices_qpc]
        qpc_freqs = (indices_qpc[0] * fs / (N * 2), indices_qpc[1] * fs / (N * 2))
    else:
        qpc_strength = 0
        qpc_freqs = (0, 0)
    qpc_freq1, qpc_freq2 = qpc_freqs

    # Frequency Band Features
    freq_bins = np.fft.fftfreq(N * 2, d=1/fs)[:N]  # Assuming bispec is half-sized, N frequencies
    bispec_magnitude_half = bispec_magnitude  # Since bispec is already half-sized

    bands = {
        'low': (0, 1000),
        'mid': (1000, 5000),
        'high': (5000, fs/2)
    }

    band_energies = {}
    for band_name, (fmin, fmax) in bands.items():
        idx = np.where((freq_bins >= fmin) & (freq_bins < fmax))[0]
        if len(idx) > 0:
            band_energy = np.nansum(bispec_magnitude_half[np.ix_(idx, idx)])
        else:
            band_energy = 0
        band_energies[f'band_energy_{band_name}'] = band_energy

    # Bispectrum Peaks
    threshold = mean_mag + 2 * std_mag
    peaks_indices = np.where(bispec_mag_flat > threshold)[0]
    num_peaks = len(peaks_indices)

    # If peaks are found, compute additional features
    if num_peaks > 0:
        peak_magnitudes = bispec_mag_flat[peaks_indices]
        max_peak_intensity = np.nanmax(peak_magnitudes)
        mean_peak_intensity = np.nanmean(peak_magnitudes)
        median_peak_intensity = np.nanmedian(peak_magnitudes)
        std_peak_intensity = np.nanstd(peak_magnitudes)
        skew_peak_intensity = skew(peak_magnitudes)
        kurt_peak_intensity = kurtosis(peak_magnitudes)
    else:
        max_peak_intensity = 0
        mean_peak_intensity = 0
        median_peak_intensity = 0
        std_peak_intensity = 0
        skew_peak_intensity = 0
        kurt_peak_intensity = 0

    # Reshape bispectrum magnitude to 2D for peak location analysis
    bispec_magnitude_2d = bispec_magnitude  # Assuming it's already 2D

    # Get the coordinates of the peaks
    peak_coords = np.argwhere(bispec_magnitude_2d > threshold)

    # Frequencies corresponding to the bispectrum axes
    freqs = freq_bins  # frequencies corresponding to the axes

    # Extract frequencies of the top N peaks
    N_peaks = 5  # Number of top peaks to consider
    if num_peaks > 0:
        # Sort peaks by magnitude
        sorted_peak_indices = np.argsort(-peak_magnitudes)
        top_n_indices = peaks_indices[sorted_peak_indices[:N_peaks]]
        # Convert flat indices back to 2D coordinates
        top_n_coords = np.unravel_index(top_n_indices, bispec_magnitude_2d.shape)
        top_n_freqs = [(freqs[i], freqs[j]) for i, j in zip(*top_n_coords)]
    else:
        top_n_freqs = []

    # Collect all features into a dictionary
    features = {
        'mean_mag': mean_mag,
        'max_mag': max_mag,
        'sum_mag': sum_mag,
        'std_mag': std_mag,
        'skew_mag': skew_mag,
        'kurt_mag': kurt_mag,
        'spectral_flatness': spectral_flatness,
        'entropy_mag': entropy_mag,
        'mean_phase': mean_phase,
        'std_phase': std_phase,
        'skew_phase': skew_phase,
        'kurt_phase': kurt_phase,
        'qpc_strength': qpc_strength,
        'qpc_freq1': qpc_freq1,
        'qpc_freq2': qpc_freq2,
        **band_energies,
        'num_peaks': num_peaks,
        'max_peak_intensity': max_peak_intensity,
        'mean_peak_intensity': mean_peak_intensity,
        'median_peak_intensity': median_peak_intensity,
        'std_peak_intensity': std_peak_intensity,
        'skew_peak_intensity': skew_peak_intensity,
        'kurt_peak_intensity': kurt_peak_intensity,
    }

    # Add frequencies of top N peaks to the features
    for idx, (f1, f2) in enumerate(top_n_freqs):
        features[f'top_peak_{idx+1}_freq1'] = f1
        features[f'top_peak_{idx+1}_freq2'] = f2

    return features


In [13]:
# Define paths and parameters
output_dir = '/content/bispec_data_RV'  # Update with your output directory
session = 'Read_Vocabulary' # or 'Interview'
participant_types = ['MDD', 'HC']
emotions = ['Positive', 'Neutral', 'Negative']

# Initialize a list to store features
features_list = []

# Load bispectrum data and extract features
for participant_type in participant_types:
    participant_output_dir = os.path.join(output_dir, participant_type)
    if not os.path.exists(participant_output_dir):
        continue

    # Assign label based on participant type
    label = 1 if participant_type == 'MDD' else 0

    # List participants in the directory
    participant_files = os.listdir(participant_output_dir)
    for bispec_file in participant_files:
        bispec_path = os.path.join(participant_output_dir, bispec_file)

        # Ensure we're only processing .npy files
        if not bispec_file.endswith('_primary_bispectrum.npy'):
            continue

        # Load bispectrum data
        bispec = np.load(bispec_path)

        # Extract participant ID and emotion from filename
        filename_parts = bispec_file.split('_')
        participant = filename_parts[0]
        emotion = filename_parts[3] if session == 'Read_Vocabulary' else filename_parts[2]

        # Sampling frequency
        fs = 44100  # Update if necessary

        # Extract features
        features = extract_bispectrum_features(bispec, fs)

        # Add additional information to features
        features['participant'] = participant
        features['participant_type'] = participant_type
        features['session'] = session
        features['emotion'] = emotion
        features['label'] = label

        # Append features to the list
        features_list.append(features)

# Convert the list of features to a DataFrame
df_features = pd.DataFrame(features_list)

# Save the DataFrame to an Excel or CSV file
excel_file = '/content/features_RV.xlsx'  # Update the path
df_features.to_excel(excel_file, index=False)
print(f"Features saved to {excel_file}")

# # Optionally, save to CSV
# csv_file = '/path/to/features.csv'  # Update the path
# df_features.to_csv(csv_file, index=False)
# print(f"Features saved to {csv_file}")

Features saved to /content/features_RV.xlsx


In [15]:
import shutil
shutil.make_archive('/content/bispec_data_Interview', 'zip', '/content/bispec_data_Interview')

'/content/bispec_data_Interview.zip'