In [6]:
import os
import numpy as np
import pandas as pd
import librosa
from scipy import signal
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.decomposition import PCA
from imblearn.over_sampling import SMOTE
from abc import ABC, abstractmethod


class AudioFeatureExtractor(ABC):
    def __init__(self, n_mfcc=20, max_pad_length=1000):
        self.sr = None
        self.n_mfcc = n_mfcc
    
    def extract_mfcc(self, processed_audio):
        window_size = int(0.5 * self.sr)
        step_size = int(0.25 * self.sr)
        mfcc_list = []
    
        for start in range(0, len(processed_audio) - window_size + 1, step_size):
            end = start + window_size
            window = processed_audio[start:end]
            
            if len(window) != window_size:
                continue
    
            mfcc = librosa.feature.melspectrogram(y=window, sr=self.sr)
            mfcc_list.append(mfcc.mean(axis=1))
    
        return mfcc_list
        
    @abstractmethod
    def process_audio(self, audio):
        pass
    
    def extract_features(self, audio_path):
        try:
            audio, sr = librosa.load(audio_path)
            self.sr = sr
            processed_audio = self.process_audio(audio)
            return self.extract_mfcc(processed_audio)
        except Exception as e:
            return None

In [7]:
import os
import json
import torch
import numpy as np

from init_config import *

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class LogisticRegressionClassifier(torch.nn.Module):
    def __init__(self, feature_dim=129):
        super(LogisticRegressionClassifier, self).__init__()
        self.linear1 = torch.nn.Linear(feature_dim, 1)
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, xx):
        return self.sigmoid(self.linear1(torch.nn.functional.normalize(xx, p=1.0, dim=1)))


def load_kirigami_model():
    # Phoneme Model!
    my_phoneme_filter_model = LogisticRegressionClassifier(feature_dim=129)
    my_phoneme_filter_model.load_state_dict(
    torch.load(lr_phoneme_checkpoint_path, map_location=torch.device('cpu')))
    my_phoneme_filter_model.eval()

    return my_phoneme_filter_model


def load_background_filter_model():
    # Background Model!
    my_background_filter_model = LogisticRegressionClassifier(feature_dim=129)
    my_background_filter_model.load_state_dict(torch.load(bg_lr_checkpoint_path, map_location=torch.device('cpu')))
    my_background_filter_model.eval()

    return my_background_filter_model


def kirigami_filter_torch(s_full, threshold=0.5):
    lr_phoneme_filter_model = LogisticRegressionClassifier(feature_dim=129)
    # load the model if in kirigami_filters directory
    if os.path.exists("./kirigami_filters/scipy_phoneme_filter.ckpt"):
        lr_phoneme_filter_model.load_state_dict(
            torch.load("./kirigami_filters/scipy_phoneme_filter.ckpt", map_location=device))
    else:
        raise FileNotFoundError("Phoneme filter model not found")

    lr_phoneme_filter_model.eval()
    pred = (lr_phoneme_filter_model.forward(torch.Tensor(s_full)) >= threshold).long().numpy()
    masked = (1 - pred) * s_full
    return masked


def kirigami_filter(stft):
    output_sp = np.zeros_like(stft)
    for i, fft in enumerate(stft):

        sum = np.sum(fft)

        product = 0
        for iw, (vv, ww) in enumerate(zip(fft, weight)):
            product = product + vv * weight[iw]
        product = product / sum
        product = product + bias

        z = 1 / (1 + np.exp(-product))
        # print("LR filter probability", i, z)
        if z < LR_THRESHOLD:
            # add the value
            output_sp[i] = stft[i]
    return output_sp


def kirigami_filter_reverse_fft(stft, stft_original):
    output_sp = np.zeros_like(stft)
    for i, fft in enumerate(stft):
        sum = np.sum(fft)
        product = 0
        for iw, (vv, ww) in enumerate(zip(fft, weight)):
            product = product + vv * weight[iw]
        product = product / sum
        product = product + bias
        z = 1 / (1 + np.exp(-product))
        # print("LR filter probability", i, z)
        if z < LR_THRESHOLD:
            # add the value
            # output_sp[i] = stft[i]
            output_sp[i] = stft_original[i]
    return output_sp


def background_detection_filter(stft):
    output_sp = np.zeros_like(stft)
    for i, fft in enumerate(stft):
        sum = np.sum(fft)
        product = 0
        for iw, (vv, ww) in enumerate(zip(fft, weight_background)):
            product = product + vv * weight_background[iw]
        product = product // sum
        product = product + bias_background
        z = 1 / (1 + np.exp(-product))
        # print("Background probability: ", i, z)
        if z < BACKGROUND_LR_THRESHOLD:  # lower than threshold not background.
            # add the value
            output_sp[i] = stft[i]
    return output_sp


import os
import librosa
import numpy as np
import torch
import soundfile as sf
import matplotlib.pyplot as plt

# Load Kirigami models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def load_kirigami_model():
    """Load the phoneme filter model."""
    model = LogisticRegressionClassifier(feature_dim=129)
    model.load_state_dict(torch.load("kirigami_filters/model_checkpoints/scipy_phoneme_filter.ckpt", map_location=device))
    model.eval()
    return model

def load_background_filter_model():
    """Load the background noise filter model."""
    model = LogisticRegressionClassifier(feature_dim=129)
    model.load_state_dict(torch.load("kirigami_filters/model_checkpoints/noisy_background_scipy_detector.ckpt", map_location=device))
    model.eval()
    return model

# Load models
phoneme_filter_model = load_kirigami_model()
background_filter_model = load_background_filter_model()

# Extract model weights
weight_phoneme = phoneme_filter_model.linear1.weight.data[0].numpy()
bias_phoneme = phoneme_filter_model.linear1.bias.data[0].numpy()
weight_background = background_filter_model.linear1.weight.data[0].numpy()
bias_background = background_filter_model.linear1.bias.data[0].numpy()


class KirigamiExtractor(AudioFeatureExtractor):
    def apply_kirigami_filter(self, stft, weight, bias, threshold=0.3):
        """Apply Kirigami logistic regression filter on STFT features (ensuring 129 features per frame)."""
        output_stft = np.zeros_like(stft)

        for i, frame in enumerate(stft):
            frame = frame[:129]  # Ensure exactly 129 dimensions

            sum_val = np.sum(frame) + 1e-6  # Avoid division by zero
            product = np.dot(frame, weight) / sum_val + bias
            prob = 1 / (1 + np.exp(-product))  # Sigmoid activation

            if prob < threshold:  # If probability is low, keep frame
                output_stft[i, :129] = frame  # Apply only to the valid region

        return output_stft

    # Process Audio File
    def process_audio(self, audio, sr=16000, threshold=0.5):
        """Process an audio file through Kirigami models and save the filtered output."""
        # Load audio
        # audio, sr = librosa.load(input_audio_path, sr=16000)

        # Compute STFT (Ensure output has 129 feature bins)
        stft = np.abs(librosa.stft(audio, n_fft=256, hop_length=128))[:129, :].T  # Transpose for correct shape

        # Apply Kirigami phoneme & background filters
        filtered_stft_phoneme = self.apply_kirigami_filter(stft, weight_phoneme, bias_phoneme, threshold)
        # filtered_stft_background = self.apply_kirigami_filter(filtered_stft_phoneme, weight_background, bias_background, threshold)

        # Convert back to audio using inverse STFT
        filtered_audio = librosa.istft(filtered_stft_phoneme.T, hop_length=128)

        return filtered_audio





In [8]:
import os
import numpy as np
import pandas as pd
import librosa
from scipy import signal
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
from sklearn.decomposition import PCA
from imblearn.over_sampling import SMOTE
from abc import ABC, abstractmethod



class RawAudioExtractor(AudioFeatureExtractor):
    def process_audio(self, audio):
        return audio

class SyntheticSensorExtractor(AudioFeatureExtractor):
    def __init__(self, window_length=256, hop_length=128, samples_per_window=10):
        super().__init__()
        self.window_length = window_length
        self.hop_length = hop_length
        self.samples_per_window = samples_per_window

    def process_audio(self, audio):
        frames = [
            audio[i:i + self.window_length]
            for i in range(0, len(audio) - self.window_length + 1, self.hop_length)
        ]
        
        processed_audio = np.zeros(len(audio))
        
        for i, frame in enumerate(frames):
            fft = np.fft.fft(frame, n=self.window_length)
            bins = np.array_split(fft, self.samples_per_window)
            reduced_fft = np.array([np.mean(bin) for bin in bins])
            
            reconstructed_fft = np.zeros_like(fft, dtype=np.complex_)
            step = len(fft) // len(reduced_fft)
            
            for j, val in enumerate(reduced_fft):
                reconstructed_fft[j * step:(j + 1) * step] = val
            
            processed_frame = np.fft.ifft(reconstructed_fft).real
            start = i * self.hop_length
            end = min(start + self.window_length, len(processed_audio))
            processed_audio[start:end] += processed_frame[:end - start]
        
        if np.max(np.abs(processed_audio)) > 0:
            processed_audio = processed_audio / np.max(np.abs(processed_audio))
            
        return processed_audio

class PrivacyMicExtractor(AudioFeatureExtractor):
    def process_audio(self, audio):
        S = librosa.stft(audio, n_fft=256, hop_length=128)
        frequencies = librosa.fft_frequencies(sr=self.sr, n_fft=256)
        S_filtered = np.where(frequencies[:, None] <= 300, S, 0)
        return librosa.istft(S_filtered, hop_length=128)

class CoughSenseExtractor(AudioFeatureExtractor):
    def process_audio(self, audio):
        n_fft = int(0.15 * self.sr)
        if n_fft % 2 != 0:
            n_fft += 1
        hop_length = n_fft // 2

        S = np.abs(librosa.stft(audio.astype(np.float64), n_fft=n_fft, hop_length=hop_length, window='hamming'))
        S_flattened = S.T
        
        pca = PCA(n_components=10)
        S_reduced = pca.fit_transform(S_flattened)
        S_reconstructed = pca.inverse_transform(S_reduced).T

        filtered_audio = librosa.istft(S_reconstructed, 
                                     hop_length=hop_length,
                                     win_length=n_fft,
                                     window='hamming')
        return filtered_audio

class SamosaExtractor(AudioFeatureExtractor):
    def __init__(self, window_length=0.6, hop_length=0.03):
        super().__init__()
        self.window_length = window_length
        self.hop_length = hop_length

    def process_audio(self, audio):
        subsampled_audio = signal.resample(audio, int(len(audio) * 1000 / self.sr))
        upsampled_audio = signal.resample(subsampled_audio, len(audio))
        return upsampled_audio

class AudioClassifier:
    def __init__(self, feature_extractor: AudioFeatureExtractor, ds_path: str, extractor_name: str):
        self.feature_extractor = feature_extractor
        self.ds_path = ds_path
        self.extractor_name = extractor_name
        self.features_path = "extracted_features"
        os.makedirs(self.features_path, exist_ok=True)

    def create_features_df(self, data):
        features_filepath = os.path.join(self.features_path, f"{self.extractor_name}_features30000.csv")
        features_list = []
        
        for _, row in tqdm(data.iterrows(), total=len(data)):
            features = self.feature_extractor.extract_features(
                os.path.join(self.ds_path, row['filename'])
            )
           
            if features is not None:
                for i in features:
                    features_list.append([row['gender'], row['accent']] + i.tolist() + [row['age']])

        feature_columns = [f"feature_{i}" for i in range(len(features_list[0])-3)]
        features_df = pd.DataFrame(features_list, columns=['gender', 'accent'] + feature_columns + ['age'])
        features_df.to_csv(features_filepath, index=False)
        return features_df

    def run_classification(self, features_df, target_col, classes, title):
        
        df_filtered = features_df[features_df[target_col].isin(classes)]
        
        # Drop rows where all feature values are zero
        df_filtered = df_filtered.loc[~(df_filtered.drop(['gender', 'accent', 'age', target_col], axis=1) == 0).all(axis=1)]
        
        X = df_filtered.drop(['gender', 'accent', 'age'], axis=1)
        y = df_filtered[target_col]
        
        smote = SMOTE(random_state=42)
        X_resampled, y_resampled = smote.fit_resample(X, y)
        
        X_train, X_test, y_train, y_test = train_test_split(
            X_resampled, y_resampled, test_size=0.2, random_state=42
        )
        
        model = RandomForestClassifier(n_estimators=100, random_state=42)
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        
        accuracy = model.score(X_test, y_test)
        print(f"\nAccuracy for {title}: {accuracy:.4f}")
        print(f"\nClassification Report for {title}:")
        print(classification_report(y_test, y_pred, target_names=classes))
        
        return accuracy
    


def print_accuracy_summary(accuracies):
    print("\n=== ACCURACY SUMMARY ===")
    
    for extractor, scores in accuracies.items():
        print(f"\n{extractor.capitalize()}:")
        print(f"  Gender: {scores['gender']:.2%}")
        print(f"  Accent: {scores['accent']:.2%}")
        print(f"  Age: {scores['age']:.2%}")
        avg = sum(scores.values()) / len(scores)
        print(f"  Average: {avg:.2%}")

def main():
    ds_path = "/work/pi_shenoy_umass_edu/sgomasta_umass_edu/data/Common Voice/cv-valid-train-main/cv-valid-train"
    metadata_path = "/work/pi_shenoy_umass_edu/sgomasta_umass_edu/data/Common Voice/cv-valid-train.csv"
    
    extractors = { "kirigami":KirigamiExtractor()
        'coughsense': CoughSenseExtractor(),
        'raw': RawAudioExtractor(),
        'synthetic': SyntheticSensorExtractor(),
        'privacy': PrivacyMicExtractor(),
        'samosa': SamosaExtractor()
    }

    data = pd.read_csv(metadata_path)
    data = data[
        (data['gender'].isin(['male', 'female', 'other'])) |
        (data['age'].isin(['teens', 'seventies', 'fourties'])) |
        (data['accent'].isin(['us', 'ireland', 'australia']))
    ]
    
    data = data[['filename', 'age', 'gender', 'accent']].dropna().head(5000)
    data['filename'] = data['filename'].apply(os.path.basename)

    accuracies = {}
    for name, extractor in extractors.items():
        print(f"\nProcessing with {name} extractor...")
        classifier = AudioClassifier(extractor, ds_path, name)
        features_df = classifier.create_features_df(data)
        
        accuracies[name] = {
            'gender': classifier.run_classification(
                features_df, 'gender', ['male', 'female', 'other'], 
                f"{name} - Gender"
            ),
            'accent': classifier.run_classification(
                features_df, 'accent', ['us', 'ireland', 'australia'], 
                f"{name} - Accent"
            ),
            'age': classifier.run_classification(
                features_df, 'age', ['teens', 'fourties', 'seventies'], 
                f"{name} - Age"
            )
        }
    
    print_accuracy_summary(accuracies)

if __name__ == "__main__":
    main()


Processing with kirigami extractor...


100%|██████████| 5000/5000 [08:15<00:00, 10.10it/s]



Accuracy for kirigami - Gender: 0.9129

Classification Report for kirigami - Gender:
              precision    recall  f1-score   support

        male       0.85      0.91      0.88     10669
      female       0.91      0.85      0.88     10492
       other       0.99      0.97      0.98     10735

    accuracy                           0.91     31896
   macro avg       0.92      0.91      0.91     31896
weighted avg       0.92      0.91      0.91     31896


Accuracy for kirigami - Accent: 0.9267

Classification Report for kirigami - Accent:
              precision    recall  f1-score   support

          us       0.93      0.90      0.91      6940
     ireland       0.91      0.99      0.95      6801
   australia       0.94      0.89      0.91      6950

    accuracy                           0.93     20691
   macro avg       0.93      0.93      0.93     20691
weighted avg       0.93      0.93      0.93     20691


Accuracy for kirigami - Age: 0.8450

Classification Report for ki