In [12]:
import numpy as np
import scipy.signal as signal
import librosa
import soundfile as sf
from scipy.linalg import toeplitz
import os
import time
import numpy as np

def segsnr(reference_signal, synthesized_signal, fs, frame_size=0.03, overlap=0.5):
    """
    Calculate Segmental Signal-to-Noise Ratio (SNR) in dB.

    Parameters:
        reference_signal (numpy array): Original clean signal.
        synthesized_signal (numpy array): Processed signal.
        fs (int): Sampling frequency.
        frame_size (float): Frame size in seconds (default 30 ms).
        overlap (float): Overlap fraction between consecutive frames (default 50%).

    Returns:
        float: Segmental SNR in dB.
    """
    frame_length = int(frame_size * fs)
    hop_length = int(frame_length * (1 - overlap))
    num_frames = int(np.ceil((len(reference_signal) - frame_length) / hop_length)) + 1

    padded_reference = np.append(reference_signal, np.zeros(num_frames * hop_length + frame_length - len(reference_signal)))
    padded_synthesized = np.append(synthesized_signal, np.zeros(num_frames * hop_length + frame_length - len(synthesized_signal)))

    window = np.hamming(frame_length)
    seg_snr_values = []

    for i in range(num_frames):
        start_idx = i * hop_length
        end_idx = start_idx + frame_length
        ref_frame = padded_reference[start_idx:end_idx] * window
        synth_frame = padded_synthesized[start_idx:end_idx] * window

        signal_energy = np.sum(ref_frame ** 2)
        noise_energy = np.sum((ref_frame - synth_frame) ** 2)

        if signal_energy > 0 and noise_energy > 0:
            seg_snr = 10 * np.log10(signal_energy / noise_energy)
            seg_snr_values.append(seg_snr)

    if len(seg_snr_values) > 0:
        return np.mean(seg_snr_values)
    else:
        return float('-inf')  # If no valid frames, return -inf as a placeholder

# Function to compute LPC coefficients with stability check
def lpc_analysis(frame, order):
    autocorr = np.correlate(frame, frame, mode='full')
    autocorr = autocorr[len(autocorr) // 2:]
    R = autocorr[:order + 1]
    try:
        A = np.linalg.solve(toeplitz(R[:-1]), -R[1:])
    except np.linalg.LinAlgError:
        A = np.zeros(order)
    A = np.concatenate(([1], A))
    if not is_stable(A):
        A = np.array([1] + [0] * order)
    G = np.sqrt(R[0] + np.dot(R[1:], A[1:]))
    return A, G

# Function to check LPC filter stability
def is_stable(A):
    roots = np.roots(A)
    return np.all(np.abs(roots) < 1)

# Function to generate excitation signal
def generate_excitation(frame_length, voiced, pitch_period=None):
    if voiced and pitch_period is not None and pitch_period > 0:
        excitation = np.zeros(frame_length)
        excitation[::pitch_period] = 1.0
    else:
        excitation = np.random.randn(frame_length)
    return excitation

# Function to determine if a frame is voiced
def is_voiced(frame, fs):
    energy = np.sum(frame ** 2) / len(frame)
    zcr = ((frame[:-1] * frame[1:]) < 0).sum() / len(frame)
    threshold_energy = 0.02
    threshold_zcr = 0.1
    return energy > threshold_energy and zcr < threshold_zcr

# Function to estimate pitch
def estimate_pitch(frame, fs, min_freq=60, max_freq=400):
    min_lag = int(fs / max_freq)
    max_lag = int(fs / min_freq)
    autocorr = np.correlate(frame, frame, mode='full')
    autocorr = autocorr[len(autocorr) // 2:]
    autocorr[:min_lag] = 0
    autocorr[max_lag + 1:] = 0
    peaks, _ = signal.find_peaks(autocorr)
    if len(peaks) > 0:
        return peaks[np.argmax(autocorr[peaks])]
    return None

# Main LPC Vocoder function
import time

def lpc_vocoder(input_file, output_file, lpc_order=12, frame_size=0.03, fs=None, overlap=0.5, mode="plain"):
    start_time = time.time()
    speech, sr = librosa.load(input_file, sr=fs)
    if fs is None:
        fs = sr

    pre_emphasis = 0.97
    emphasized_speech = np.append(speech[0], speech[1:] - pre_emphasis * speech[:-1])
    frame_length = int(frame_size * fs)
    hop_length = int(frame_length * (1 - overlap))
    num_frames = int(np.ceil((len(emphasized_speech) - frame_length) / hop_length)) + 1
    padded_speech = np.append(emphasized_speech, np.zeros(num_frames * hop_length + frame_length - len(emphasized_speech)))
    synthesized_speech = np.zeros(len(padded_speech))
    window = np.hamming(frame_length)

    for i in range(num_frames):
        start_idx = i * hop_length
        end_idx = start_idx + frame_length
        frame = padded_speech[start_idx:end_idx] * window
        voiced = is_voiced(frame, fs)
        A, G = lpc_analysis(frame, lpc_order)
        pitch_period = estimate_pitch(frame, fs) if voiced else None
        excitation = generate_excitation(frame_length, voiced, pitch_period)
        if mode == "voice-excited":
            excitation = signal.lfilter([1, -0.8], [1], excitation)
        synthesized_frame = signal.lfilter([1], A, excitation) * G
        synthesized_speech[start_idx:end_idx] += synthesized_frame * window

    synthesized_speech = signal.lfilter([1], [1, -pre_emphasis], synthesized_speech)
    max_val = np.max(np.abs(synthesized_speech))
    if max_val > 0:
        synthesized_speech = synthesized_speech / max_val * 0.99

    sf.write(output_file, synthesized_speech[:len(speech)], fs)

    end_time = time.time()
    runtime = end_time - start_time

    # Calculate bit rate
    bits_per_lpc_coeff = 16  # bits per LPC coefficient
    bits_per_pitch_period = 16  # bits per pitch period
    bits_per_frame = lpc_order * bits_per_lpc_coeff
    total_bits = num_frames * bits_per_frame

    # Add bits for pitch periods in voiced frames
    for i in range(num_frames):
        start_idx = i * hop_length
        end_idx = start_idx + frame_length
        frame = padded_speech[start_idx:end_idx] * window
        if is_voiced(frame, fs):
            total_bits += bits_per_pitch_period

    total_seconds = len(speech) / fs
    bit_rate = total_bits / total_seconds

    return bit_rate, runtime

if __name__ == "__main__":
    fs = 16000
    lpc_order = 10
    frame_size = 0.03

    speech_files = [
        "/kaggle/input/speechdata/female1.wav",
        "/kaggle/input/speechdata/male1.wav",
        "/kaggle/input/speechdata/female2.wav",
        "/kaggle/input/speechdata/male2.wav"
    ]

    results = []
    seg_snr_results = []
    for input_file in speech_files:
        if not os.path.isfile(input_file):
            print(f"File {input_file} not found.")
            continue
        
        speech, _ = librosa.load(input_file, sr=fs)
        
        # Process Plain LPC Vocoder
        output_file_plain = os.path.splitext(os.path.basename(input_file))[0] + "_plain_lpc.wav"
        bit_rate, runtime = lpc_vocoder(input_file, output_file_plain, lpc_order, frame_size, fs, mode="plain")
        synthesized_plain, _ = librosa.load(output_file_plain, sr=fs)
        plain_segsnr = segsnr(speech, synthesized_plain, fs, frame_size)
        results.append((input_file, "plain", bit_rate, runtime, plain_segsnr))
        print(f"Processed Plain LPC Vocoder: {input_file}, Bit rate: {bit_rate:.2f}, Runtime: {runtime:.2f} seconds, SegSNR: {plain_segsnr:.2f} dB")

        # Process Voice-excited LPC Vocoder
        output_file_voice_excited = os.path.splitext(os.path.basename(input_file))[0] + "_voice_excited_lpc.wav"
        bit_rate, runtime = lpc_vocoder(input_file, output_file_voice_excited, lpc_order, frame_size, fs, mode="voice-excited")
        synthesized_voice_excited, _ = librosa.load(output_file_voice_excited, sr=fs)
        voice_excited_segsnr = segsnr(speech, synthesized_voice_excited, fs, frame_size)
        results.append((input_file, "voice-excited", bit_rate, runtime, voice_excited_segsnr))
        print(f"Processed Voice-excited LPC Vocoder: {input_file}, Bit rate: {bit_rate:.2f}, Runtime: {runtime:.2f} seconds, SegSNR: {voice_excited_segsnr:.2f} dB")



Processed Plain LPC Vocoder: /kaggle/input/speechdata/female1.wav, Bit rate: 10640.00, Runtime: 0.14 seconds, SegSNR: -5.40 dB
Processed Voice-excited LPC Vocoder: /kaggle/input/speechdata/female1.wav, Bit rate: 10640.00, Runtime: 0.18 seconds, SegSNR: -3.20 dB
Processed Plain LPC Vocoder: /kaggle/input/speechdata/male1.wav, Bit rate: 10639.67, Runtime: 0.05 seconds, SegSNR: -4.54 dB
Processed Voice-excited LPC Vocoder: /kaggle/input/speechdata/male1.wav, Bit rate: 10639.67, Runtime: 0.07 seconds, SegSNR: -3.52 dB
Processed Plain LPC Vocoder: /kaggle/input/speechdata/female2.wav, Bit rate: 10639.83, Runtime: 0.10 seconds, SegSNR: -2.66 dB
Processed Voice-excited LPC Vocoder: /kaggle/input/speechdata/female2.wav, Bit rate: 10639.83, Runtime: 0.13 seconds, SegSNR: -5.40 dB
Processed Plain LPC Vocoder: /kaggle/input/speechdata/male2.wav, Bit rate: 10666.56, Runtime: 0.15 seconds, SegSNR: -3.66 dB
Processed Voice-excited LPC Vocoder: /kaggle/input/speechdata/male2.wav, Bit rate: 10666.56, 

In [13]:
!pip install pesq
import numpy as np
import scipy.signal as signal
import librosa
import soundfile as sf
from sklearn.cluster import KMeans
from scipy.linalg import toeplitz
import os

# Function to compute LPC coefficients with stability check
def lpc_analysis(frame, order):
    autocorr = np.correlate(frame, frame, mode='full')
    autocorr = autocorr[len(autocorr) // 2:]
    R = autocorr[:order + 1]
    try:
        A = np.linalg.solve(toeplitz(R[:-1]), -R[1:])
    except np.linalg.LinAlgError:
        A = np.zeros(order)
    A = np.concatenate(([1], A))
    if not is_stable(A):
        A = np.array([1] + [0] * order)
    G = np.sqrt(R[0] + np.dot(R[1:], A[1:]))
    return A, G

# Function to check LPC filter stability
def is_stable(A):
    roots = np.roots(A)
    return np.all(np.abs(roots) < 1)

# Function to determine if a frame is voiced
def is_voiced(frame, fs):
    energy = np.sum(frame ** 2) / len(frame)
    zcr = ((frame[:-1] * frame[1:]) < 0).sum() / len(frame)
    threshold_energy = 0.01
    threshold_zcr = 0.15
    return energy > threshold_energy and zcr < threshold_zcr

# Refined function to estimate pitch
def estimate_pitch(frame, fs, min_freq=60, max_freq=400):
    min_lag = int(fs / max_freq)
    max_lag = int(fs / min_freq)
    autocorr = np.correlate(frame, frame, mode='full')
    autocorr = autocorr[len(autocorr) // 2:]
    autocorr[:min_lag] = 0
    autocorr[max_lag + 1:] = 0
    peaks, _ = signal.find_peaks(autocorr)
    if len(peaks) > 0:
        pitch_period = peaks[np.argmax(autocorr[peaks])]
        return fs / pitch_period
    return None

# Improved gain computation to ensure energy consistency
def compute_gain(residual, subframe):
    return np.sqrt(np.sum(subframe**2) / (np.sum(residual**2) + 1e-8))

# Function to build the codebook
def build_codebook(training_files, fs, lpc_order, frame_size, subframe_size, num_codebook_entries=128):
    residuals, frame_length, subframe_length = [], int(frame_size * fs), int(subframe_size * fs)
    for file in training_files:
        speech, _ = librosa.load(file, sr=fs)
        emphasized = np.append(speech[0], speech[1:] - 0.97 * speech[:-1])
        for i in range(len(emphasized) // frame_length):
            frame = emphasized[i*frame_length:(i+1)*frame_length]
            A, _ = lpc_analysis(frame, lpc_order)
            residual = signal.lfilter(A, [1], frame)
            for j in range(0, frame_length, subframe_length):
                subframe = residual[j:j+subframe_length]
                if len(subframe) == subframe_length:
                    residuals.append(subframe)
    residuals = np.array(residuals).reshape(len(residuals), -1)
    kmeans = KMeans(n_clusters=num_codebook_entries, random_state=0)
    kmeans.fit(residuals)
    return kmeans.cluster_centers_

# CELP Encoder
def celp_encoder(speech, fs, lpc_order, frame_size, subframe_size, codebook):
    frame_length, subframe_length = int(frame_size * fs), int(subframe_size * fs)
    num_frames = len(speech) // frame_length
    lpc_coeffs, codebook_indices, gains = [], [], []
    for i in range(num_frames):
        frame = speech[i*frame_length:(i+1)*frame_length]
        A, G = lpc_analysis(frame, lpc_order)
        lpc_coeffs.append(A)
        for j in range(0, frame_length, subframe_length):
            subframe = frame[j:j+subframe_length]
            residual = signal.lfilter(A, [1], subframe)
            distances = np.linalg.norm(codebook - residual, axis=1)
            best_match = np.argmin(distances)
            codebook_indices.append(best_match)
            gains.append(compute_gain(codebook[best_match], subframe))
    return lpc_coeffs, codebook_indices, gains

# CELP Decoder
def celp_decoder(lpc_coeffs, codebook_indices, gains, fs, frame_size, subframe_size, codebook):
    frame_length, subframe_length = int(frame_size * fs), int(subframe_size * fs)
    synthesized_speech = []
    idx = 0
    for A in lpc_coeffs:
        frame_synth = []
        for j in range(0, frame_length, subframe_length):
            excitation = codebook[codebook_indices[idx]] * gains[idx]
            frame_synth.extend(signal.lfilter([1], A, excitation))
            idx += 1
        synthesized_speech.extend(frame_synth)
    return np.array(synthesized_speech)

# CELP Codec
def celp_codec(input_file, output_file, codebook, lpc_order=14, frame_size=0.03, subframe_size=0.01, fs=None):
    speech, sr = librosa.load(input_file, sr=fs)
    fs = fs or sr
    emphasized = np.append(speech[0], speech[1:] - 0.97 * speech[:-1])
    lpc_coeffs, codebook_indices, gains = celp_encoder(emphasized, fs, lpc_order, frame_size, subframe_size, codebook)
    synthesized = celp_decoder(lpc_coeffs, codebook_indices, gains, fs, frame_size, subframe_size, codebook)
    synthesized = signal.lfilter([1], [1, -0.97], synthesized)
    sf.write(output_file, synthesized, fs)
    
    # Calculate bitrate
    total_bits = len(codebook_indices) * np.log2(len(codebook)) + len(lpc_coeffs) * lpc_order * 32
    duration = len(speech) / fs
    bitrate = total_bits / duration
    print(f"Bitrate: {bitrate:.2f} bits/sec")
    
    # Calculate SNR
    signal_energy = np.sum(speech ** 2)
    # noise_energy = np.sum((speech - synthesized[:len(speech)]) ** 2)
    # Ensure both signals are the same length for calculation
    min_length = min(len(speech), len(synthesized))
    noise_energy = np.sum((speech[:min_length] - synthesized[:min_length]) ** 2)
    snr = 10 * np.log10(signal_energy / noise_energy)
    print(f"Signal-to-Noise Ratio (SNR): {snr:.2f} dB")
    
    return synthesized

import time

def calculate_segmental_snr(original, synthesized, frame_length):
    num_frames = len(original) // frame_length
    segmental_snrs = []
    for i in range(num_frames):
        orig_frame = original[i * frame_length:(i + 1) * frame_length]
        synth_frame = synthesized[i * frame_length:(i + 1) * frame_length]
        noise = orig_frame - synth_frame
        signal_energy = np.sum(orig_frame ** 2)
        noise_energy = np.sum(noise ** 2)
        if noise_energy > 0:
            segmental_snrs.append(10 * np.log10(signal_energy / noise_energy))
    return np.mean(segmental_snrs)

# Updated main function to include runtime and segmental SNR
if __name__ == "__main__":
    fs = 16000
    lpc_order = 12
    frame_size = 0.04
    subframe_size = 0.005

    training_files = ["/kaggle/input/speechdata/female1.wav", "/kaggle/input/speechdata/female2.wav",
                      "/kaggle/input/speechdata/male1.wav", "/kaggle/input/speechdata/male2.wav"]
    print("Building codebook...")
    codebook = build_codebook(training_files, fs, lpc_order, frame_size, subframe_size)
    print("Codebook built successfully.")

    results = []

    for input_file in training_files:
        original_speech, _ = librosa.load(input_file, sr=fs)
        output_file = os.path.splitext(os.path.basename(input_file))[0] + "_celp.wav"

        print(f"Processing: {input_file}")
        start_time = time.time()

        synthesized_speech_celp = celp_codec(input_file, output_file, codebook, lpc_order, frame_size, subframe_size, fs=fs)

        end_time = time.time()
        runtime = end_time - start_time

        # Calculate Segmental SNR
        frame_length = int(frame_size * fs)
        segmental_snr = calculate_segmental_snr(original_speech, synthesized_speech_celp, frame_length)

        # Calculate PESQ Score
        try:
            from pesq import pesq
            pesq_score = pesq(fs, original_speech[:len(synthesized_speech_celp)], synthesized_speech_celp, 'nb')
        except Exception as e:
            pesq_score = None
            print(f"Error calculating PESQ score: {e}")

        results.append({
            "file": input_file,
            "runtime (s)": runtime,
            "segmental SNR (dB)": segmental_snr,
            "PESQ Score": pesq_score
        })

    # Print results
    print("\nSummary of Results:")
    for result in results:
        print(f"File: {result['file']}")
        print(f"Runtime: {result['runtime (s)']:.2f} seconds")
        print(f"Segmental SNR: {result['segmental SNR (dB)']:.2f} dB")
        print(f"PESQ Score: {result['PESQ Score']}\n")


Building codebook...




Codebook built successfully.
Processing: /kaggle/input/speechdata/female1.wav
Bitrate: 11000.00 bits/sec
Signal-to-Noise Ratio (SNR): -16.56 dB
Processing: /kaggle/input/speechdata/female2.wav
Bitrate: 10999.83 bits/sec
Signal-to-Noise Ratio (SNR): -8.98 dB
Processing: /kaggle/input/speechdata/male1.wav
Bitrate: 10999.66 bits/sec
Signal-to-Noise Ratio (SNR): -11.67 dB
Processing: /kaggle/input/speechdata/male2.wav
Bitrate: 10999.89 bits/sec
Signal-to-Noise Ratio (SNR): -10.14 dB

Summary of Results:
File: /kaggle/input/speechdata/female1.wav
Runtime: 0.24 seconds
Segmental SNR: -14.64 dB
PESQ Score: 1.2623679637908936

File: /kaggle/input/speechdata/female2.wav
Runtime: 0.17 seconds
Segmental SNR: -6.10 dB
PESQ Score: 1.3645005226135254

File: /kaggle/input/speechdata/male1.wav
Runtime: 0.09 seconds
Segmental SNR: -9.06 dB
PESQ Score: 1.3076508045196533

File: /kaggle/input/speechdata/male2.wav
Runtime: 0.25 seconds
Segmental SNR: -7.97 dB
PESQ Score: 1.2292536497116089

