# **MemoTag AI/ML Task: Cognitive Stress Detection Pipeline**

## Install and import required libraries

In [None]:
!pip install SpeechRecognition

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.stats
import seaborn as sns
import librosa
import librosa.display
import speech_recognition as sr
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.ensemble import IsolationForest
from sklearn.metrics import silhouette_score, adjusted_rand_score
import re
import os

## Directory for audio files

In [None]:
# Directory that run_analysis() scans for .wav/.mp3 files when it is called
# without an explicit file list. The path is relative, so it is resolved
# against the current working directory.
AUDIO_DIR = "audio_samples"

## Global Variables

In [None]:
# Per-sample feature dicts keyed by the file_id passed to
# extract_audio_features(); analyze_text_content() adds text features to them.
audio_features = {}
# Raw feature values as an array; assigned by run_analysis() and possibly
# replaced by an imputed copy in clean_and_normalize_data().
feature_matrix = None
# Column names matching feature_matrix; assigned by run_analysis().
feature_names = []
# Standardized feature_matrix; assigned by clean_and_normalize_data().
normalized_features = None
# Never assigned anywhere in the visible code.
cluster_results = None
# Shared recognizer instance used by speech_to_text().
speech_recognizer = sr.Recognizer()

### Load audio files

In [None]:
def load_audio_file(file_path):
    """Read an audio file with librosa, without resampling (``sr=None``).

    Args:
        file_path: Path of the audio file to read.

    Returns:
        ``(audio_data, sample_rate)`` on success, or ``(None, None)`` when
        anything goes wrong. Failures are printed, never raised.
    """
    try:
        audio_data, sample_rate = librosa.load(file_path, sr=None)
    except Exception as err:
        # Best-effort loader: report the problem and let the caller skip the file.
        print(f"Error loading {file_path}: {str(err)}")
        return None, None
    else:
        return audio_data, sample_rate

### Function to convert speech to text

In [None]:
def speech_to_text(audio_file):
    """Transcribe an audio file through the recognizer's Google backend.

    Args:
        audio_file: Path of the audio file handed to ``sr.AudioFile``.

    Returns:
        The recognized text, or an empty string when reading or recognition
        fails for any reason (the error is printed, not raised).
    """
    try:
        with sr.AudioFile(audio_file) as source:
            # Read the whole file into memory, then send it for recognition.
            recording = speech_recognizer.record(source)
            return speech_recognizer.recognize_google(recording)
    except Exception as err:
        print(f"Speech recognition failed: {str(err)}")
        return ""

### Feature extraction functions

In [None]:
def extract_audio_features(audio, sample_rate, file_id):
    """Extract summary acoustic features from one audio signal.

    The resulting dict is also stored in the module-level ``audio_features``
    mapping under ``file_id``, replacing any previous entry for that key.

    Args:
        audio: Audio samples, as returned by ``load_audio_file``.
        sample_rate: Sampling rate that belongs to ``audio``.
        file_id: Key under which the features are stored in ``audio_features``.

    Returns:
        Dict with the keys ``duration``, ``pitch_mean``, ``pitch_std``,
        ``speech_rate``, ``energy_mean``, ``spectral_centroid`` and
        ``mfcc_1`` .. ``mfcc_13``.
    """
    features = {}

    # Basic audio properties
    features['duration'] = librosa.get_duration(y=audio, sr=sample_rate)

    # Pitch analysis: keep only the pitch matrix from piptrack and drop the
    # zero entries before summarizing.
    pitches = librosa.piptrack(y=audio, sr=sample_rate)[0]
    pitches = pitches[pitches > 0]
    if len(pitches) > 0:
        features['pitch_mean'] = np.mean(pitches)
        features['pitch_std'] = np.std(pitches)
    else:
        features['pitch_mean'] = features['pitch_std'] = 0

    # NOTE: despite the key name, 'speech_rate' holds the fraction of samples
    # flagged as zero crossings, not a words-per-second measure.
    # np.count_nonzero counts the flags in native code; the builtin sum()
    # would walk the per-sample boolean array one element at a time.
    zero_crossings = librosa.zero_crossings(audio)
    n_samples = len(audio)
    features['speech_rate'] = (
        np.count_nonzero(zero_crossings) / n_samples if n_samples else 0.0
    )

    # Mean RMS energy over all frames
    rms_energy = librosa.feature.rms(y=audio)[0]
    features['energy_mean'] = np.mean(rms_energy)

    # Mean spectral centroid over all frames
    spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=sample_rate)[0]
    features['spectral_centroid'] = np.mean(spectral_centroid)

    # MFCCs (commonly used in speech analysis): one mean per coefficient row
    mfccs = librosa.feature.mfcc(y=audio, sr=sample_rate, n_mfcc=13)
    for number, coefficients in enumerate(mfccs, start=1):
        features[f'mfcc_{number}'] = np.mean(coefficients)

    # Store features
    audio_features[file_id] = features
    return features

### Text content analysis

In [None]:
def analyze_text_content(text, file_id):
    """Analyze linguistic features of transcribed text.

    The same four keys are always produced, so a sample whose transcription
    failed contributes explicit zeros instead of missing values. When
    ``file_id`` already has an entry in the module-level ``audio_features``
    mapping, the text features are merged into it.

    Args:
        text: Transcribed text; may be empty when recognition failed.
        file_id: Key of the sample in ``audio_features``.

    Returns:
        Dict with ``word_count``, ``sentence_count``, ``unique_word_ratio``
        and ``hesitation_count``.
    """
    # Defaults describe "nothing was recognized".
    text_features = {
        'word_count': 0,
        'sentence_count': 0,
        'unique_word_ratio': 0,
        'hesitation_count': 0,
    }

    if text:
        words = re.findall(r'\b\w+\b', text.lower())
        sentences = [s for s in re.split(r'[.!?]+', text) if s.strip()]

        # Basic text statistics
        text_features['word_count'] = len(words)
        text_features['sentence_count'] = len(sentences)

        # Vocabulary richness
        text_features['unique_word_ratio'] = len(set(words)) / len(words) if words else 0

        # Speech disfluencies. Multi-word markers such as 'you know' can never
        # equal a single token, so they are matched as consecutive-word runs.
        hesitation_words = ['um', 'uh', 'ah', 'like', 'you know']
        single_markers = {marker for marker in hesitation_words if ' ' not in marker}
        phrase_markers = [tuple(marker.split()) for marker in hesitation_words if ' ' in marker]

        hesitation_count = sum(1 for word in words if word in single_markers)
        for phrase in phrase_markers:
            span = len(phrase)
            hesitation_count += sum(
                1
                for start in range(len(words) - span + 1)
                if tuple(words[start:start + span]) == phrase
            )
        text_features['hesitation_count'] = hesitation_count

    # Store combined features (also for empty transcripts, so every sample
    # carries the same set of columns)
    if file_id in audio_features:
        audio_features[file_id].update(text_features)

    return text_features

### Processing pipeline

In [None]:
def process_audio_file(file_path):
    """Run loading, acoustic extraction, transcription and text analysis
    for one audio file.

    Args:
        file_path: Path of the audio file to process.

    Returns:
        A dict combining the acoustic and linguistic features, or ``None``
        when the file could not be loaded.
    """
    filename = os.path.basename(file_path)
    print(f"\nProcessing {filename}...")

    # Load the waveform; give up on this file if that fails.
    audio, sample_rate = load_audio_file(file_path)
    if audio is None:
        return None

    # Acoustic features are stored under the bare file name.
    acoustic = extract_audio_features(audio, sample_rate, filename)

    # Transcribe and echo the text, truncated to 100 characters when longer.
    text = speech_to_text(file_path)
    if len(text) > 100:
        print(f"Transcribed text: {text[:100]}...")
    else:
        print(f"Transcribed text: {text}")

    # Linguistic features derived from the transcript.
    linguistic = analyze_text_content(text, filename)

    combined = dict(acoustic)
    combined.update(linguistic)
    return combined

### Run the full analysis

In [None]:
def run_analysis(audio_files=None):
    """Main function to run the complete analysis.

    Processes every audio file, then publishes the collected features through
    the module-level ``feature_names`` and ``feature_matrix`` globals.

    Note that ``audio_features`` is not cleared here, so features collected by
    earlier calls are included in the returned table as well.

    Args:
        audio_files: Optional iterable of audio file paths. When omitted, all
            ``.wav``/``.mp3`` files in ``AUDIO_DIR`` are used, in sorted order.

    Returns:
        A DataFrame with one row per entry in ``audio_features``, or ``None``
        when the audio directory cannot be read or no features were extracted.
    """
    global feature_matrix, feature_names

    if audio_files is None:
        try:
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # row order (and every index derived from it) reproducible.
            entries = sorted(os.listdir(AUDIO_DIR))
        except OSError as err:
            print(f"Cannot read audio directory {AUDIO_DIR!r}: {err}")
            return None
        audio_files = [
            os.path.join(AUDIO_DIR, entry)
            for entry in entries
            if entry.lower().endswith(('.wav', '.mp3'))
        ]

    print(f"Found {len(audio_files)} audio files to analyze")

    # Process each file; results accumulate in audio_features
    for file_path in audio_files:
        process_audio_file(file_path)

    if not audio_features:
        print("No features were extracted")
        return None

    # Convert features to DataFrame (one row per sample)
    df = pd.DataFrame.from_dict(audio_features, orient='index')
    feature_names = df.columns.tolist()
    feature_matrix = df.values
    return df


### Data preprocessing

In [None]:
def clean_and_normalize_data():
    """Impute missing values, then standardize the feature matrix.

    Reads the module-level ``feature_matrix``. If it contains NaNs it is
    replaced by a mean-imputed copy. The scaled result is stored in the
    module-level ``normalized_features``.

    Returns:
        The standardized feature array.
    """
    global feature_matrix, normalized_features

    # Fill gaps with the column mean only when there is something to fill.
    has_missing = np.isnan(feature_matrix).any()
    if has_missing:
        from sklearn.impute import SimpleImputer
        feature_matrix = SimpleImputer(strategy='mean').fit_transform(feature_matrix)

    normalized_features = StandardScaler().fit_transform(feature_matrix)
    return normalized_features

### Visualization functions

In [None]:
def plot_feature_clusters(pca_results, labels):
    """Scatter-plot the samples in 2D PCA space, colored by cluster label.

    Args:
        pca_results: 2-D array; the first two columns are used as x and y.
        labels: Cluster label per sample, used for the point colors.
    """
    plt.figure(figsize=(10, 6))

    component_1 = pca_results[:, 0]
    component_2 = pca_results[:, 1]
    points = plt.scatter(component_1, component_2, c=labels, cmap='viridis')
    plt.colorbar(points, label='Cluster')

    plt.title("Speech Feature Clusters")
    plt.xlabel("Principal Component 1")
    plt.ylabel("Principal Component 2")
    plt.grid(True)
    plt.show()

### Identify abnormal speech samples using ML/NLP

In [None]:
def run_risk_analysis():
    """
    Perform risk analysis on speech samples to identify abnormal patterns
    that might indicate cognitive or speech disorders.

    Works on the module-level ``normalized_features``, ``feature_matrix``,
    ``feature_names`` and ``audio_features``. A sample is flagged when
    Isolation Forest marks it as an anomaly or when any of its features has
    an absolute z-score above 3. Results are printed and, when samples are
    flagged, visualized with ``plot_risk_visualization``.

    Returns:
        Sorted list of row positions (plain ints) of the flagged samples, or
        ``None`` when fewer than 3 samples are available.
    """
    global normalized_features, feature_names

    print("\n--- Risk Analysis Results ---")

    if normalized_features is None or len(normalized_features) < 3:
        print("Insufficient data for meaningful risk analysis")
        return

    # Method 1: Isolation Forest for anomaly detection (-1 marks an anomaly)
    print("Performing anomaly detection...")
    iso_forest = IsolationForest(contamination=0.1, random_state=42)
    anomaly_scores = iso_forest.fit_predict(normalized_features)
    anomalies = np.where(anomaly_scores == -1)[0]

    # Method 2: Statistical outlier detection using z-scores
    z_scores = np.abs(scipy.stats.zscore(normalized_features))
    outliers = np.where(np.any(z_scores > 3, axis=1))[0]

    # Sorted, de-duplicated union so output order is stable between runs.
    potential_risks = np.union1d(anomalies, outliers).tolist()

    feature_df = pd.DataFrame(feature_matrix, columns=feature_names)
    feature_df.index = list(audio_features.keys())

    # Display risk results
    if potential_risks:
        print(f"\nFound {len(potential_risks)} potentially abnormal speech samples:")

        # Per-column statistics do not change between samples; compute once.
        column_means = feature_df.mean()
        column_stds = feature_df.std()
        normalized_mean = np.mean(normalized_features, axis=0)

        for idx in potential_risks:
            sample_id = feature_df.index[idx]
            print(f"- Sample {sample_id}")

            # Find most deviant features for this sample (top 3, ascending)
            deviations = np.abs(normalized_features[idx] - normalized_mean)
            most_deviant = np.argsort(deviations)[-3:]

            print("  Notable deviations:")
            for feat_idx in most_deviant:
                feat_name = feature_names[feat_idx]
                raw_value = feature_df.iloc[idx, feat_idx]
                spread = column_stds.iloc[feat_idx]
                # A constant column has no spread; report a z-score of 0.
                z_score = (raw_value - column_means.iloc[feat_idx]) / spread if spread != 0 else 0
                direction = "high" if z_score > 0 else "low"

                print(f"  * {feat_name}: {raw_value:.2f} ({abs(z_score):.2f} std. {direction})")

        # Visualize the potentially risky samples
        plot_risk_visualization(normalized_features, potential_risks)
    else:
        print("No abnormal speech patterns detected in the samples")

    return potential_risks

def plot_risk_visualization(features, risk_indices):
    """
    Create visualizations to highlight potentially risky samples.

    Draws a 2D PCA scatter plot separating normal from flagged samples and,
    when there are flagged samples, a heatmap of their per-feature z-scores.
    Sample labels come from the keys of the module-level ``audio_features``,
    matched to rows by position.

    Args:
        features: 2-D feature array, one row per sample.
        risk_indices: Row positions of the samples to highlight.
    """
    # 1. Perform PCA for dimensionality reduction
    pca = PCA(n_components=2)
    pca_results = pca.fit_transform(features)

    # Built once: row position -> sample id, and a set for O(1) membership.
    sample_ids = list(audio_features.keys())
    flagged = set(risk_indices)

    # 2. Create a risk visualization plot
    plt.figure(figsize=(12, 8))

    # Plot normal samples
    normal_indices = [i for i in range(len(features)) if i not in flagged]
    plt.scatter(
        pca_results[normal_indices, 0],
        pca_results[normal_indices, 1],
        c='blue',
        label='Normal',
        alpha=0.7
    )

    # Plot risky samples
    plt.scatter(
        pca_results[risk_indices, 0],
        pca_results[risk_indices, 1],
        c='red',
        marker='X',
        s=100,
        label='Potential Risk',
        alpha=0.9
    )

    # Add sample labels for risky samples
    for idx in risk_indices:
        plt.annotate(
            sample_ids[idx],
            (pca_results[idx, 0], pca_results[idx, 1]),
            xytext=(5, 5),
            textcoords='offset points',
            fontsize=9
        )

    plt.xlabel(f"Principal Component 1 ({pca.explained_variance_ratio_[0]:.2%} variance)")
    plt.ylabel(f"Principal Component 2 ({pca.explained_variance_ratio_[1]:.2%} variance)")
    plt.title("Speech Analysis Risk Assessment")
    plt.legend()
    plt.grid(True, alpha=0.3)

    # 3. Create heatmap of feature deviations for risky samples
    if len(risk_indices) > 0:
        # Height grows with the number of rows so labels stay readable.
        plt.figure(figsize=(14, len(risk_indices) * 0.8 + 3))

        # Calculate z-scores over all samples, then keep the flagged rows
        z_data = scipy.stats.zscore(features)[risk_indices]

        # Heatmap
        sample_labels = [sample_ids[idx] for idx in risk_indices]
        sns.heatmap(
            z_data,
            cmap='coolwarm',
            yticklabels=sample_labels,
            xticklabels=feature_names,
            center=0,
            vmin=-3,
            vmax=3,
            annot=False,
            fmt='.1f'
        )
        plt.title("Feature Deviation Heatmap for Potential Risk Samples")
        plt.xlabel("Features")
        plt.ylabel("Samples")
        plt.xticks(rotation=45, ha="right")
        plt.tight_layout()

    plt.show()

In [None]:
def get_speech_insights(features, clusters, risk_samples=None):
    """
    Analyzes speech data to identify patterns and potential concerns.

    Args:
        features: DataFrame with speech features for each sample (one row per
            sample, numeric feature columns)
        clusters: Group assignments for each speech sample, one label per row
            of ``features``. Any array-like works and the labels do not have
            to be the contiguous range 0..k-1.
        risk_samples: List of row positions of samples flagged as potential
            risks, or None

    Returns:
        A dictionary with organized insights about the speech patterns, with
        the keys "summary", "clusters", "risk_patterns", "key_features" and
        "recommendations". Cluster entries are keyed ``cluster_<label>``.
    """
    results = {
        "summary": {},
        "clusters": {},
        "risk_patterns": {},
        "key_features": {},
        "recommendations": []
    }

    # Work with the labels that actually occur instead of assuming 0..k-1.
    clusters = np.asarray(clusters)
    cluster_labels = np.unique(clusters).tolist()

    # Statistics
    total_samples = len(features)
    unique_clusters = len(cluster_labels)
    high_risk_count = len(risk_samples) if risk_samples is not None else 0
    risk_percentage = round(high_risk_count / total_samples * 100, 1) if total_samples > 0 else 0

    # Stats summary
    results["summary"] = {
        "total_samples": total_samples,
        "clusters_found": unique_clusters,
        "risk_samples": high_risk_count,
        "risk_percentage": risk_percentage
    }

    # Dataset-wide statistics are the same for every cluster and sample.
    overall_average = features.mean()
    feature_variation = features.std()
    # A constant feature has zero spread; dividing by it would produce
    # NaN/inf, so such features are reported as "no deviation" (0).
    safe_variation = feature_variation.replace(0, np.nan)

    def standardize(values):
        """Express values as standard deviations from the overall mean."""
        return ((values - overall_average) / safe_variation).fillna(0)

    # Cohesion is best-effort: computed once for all samples, and skipped
    # when scikit-learn is unavailable or rejects the input.
    sample_silhouettes = None
    if unique_clusters > 1:
        try:
            from sklearn.metrics import silhouette_samples
            sample_silhouettes = silhouette_samples(features.values, clusters)
        except (ImportError, ValueError):
            sample_silhouettes = None

    for cluster_id in cluster_labels:
        members = clusters == cluster_id
        cluster_samples = features.iloc[members]
        cluster_size = len(cluster_samples)

        cluster_average = cluster_samples.mean()
        standardized_differences = standardize(cluster_average)

        standout_features = standardized_differences.abs().sort_values(ascending=False).head(5)

        cluster_cohesion = "N/A"
        if sample_silhouettes is not None and cluster_size > 1:
            cluster_cohesion = round(float(sample_silhouettes[members].mean()), 3)

        results["clusters"][f"cluster_{cluster_id}"] = {
            "size": cluster_size,
            "percentage": round(cluster_size / total_samples * 100, 1),
            "silhouette_score": cluster_cohesion,
            "distinctive_features": {
                feature: {
                    "value": round(cluster_average[feature], 2),
                    "diff_from_mean": round(cluster_average[feature] - overall_average[feature], 2),
                    "std_diff": round(standardized_differences[feature], 2),
                    "direction": "higher" if standardized_differences[feature] > 0 else "lower"
                }
                for feature in standout_features.index
            }
        }

        # Short description built from the three strongest deviations.
        description = "This cluster is characterized by "
        feature_descriptions = []

        for feature in standout_features.index[:3]:
            direction = "higher" if standardized_differences[feature] > 0 else "lower"
            strength = "significantly " if abs(standardized_differences[feature]) > 2 else ""
            feature_descriptions.append(f"{strength}{direction} {feature}")

        results["clusters"][f"cluster_{cluster_id}"]["interpretation"] = description + ", ".join(feature_descriptions)

    if risk_samples is not None and len(risk_samples) > 0:
        risk_data = features.iloc[risk_samples]

        # How many flagged samples fall into each cluster
        risk_by_cluster = {}
        for idx in risk_samples:
            cluster = clusters[idx]
            risk_by_cluster[cluster] = risk_by_cluster.get(cluster, 0) + 1

        results["risk_patterns"]["cluster_distribution"] = risk_by_cluster

        # Features that deviate most on average across the flagged samples
        risk_z_scores = standardize(risk_data)
        mean_deviations = risk_z_scores.abs().mean().sort_values(ascending=False)

        results["risk_patterns"]["common_deviations"] = {
            feature: round(score, 2) for feature, score in mean_deviations.head(5).items()
        }

        results["risk_patterns"]["individual_samples"] = {}
        for idx in risk_samples:
            sample_id = features.index[idx]
            sample = features.iloc[idx]
            sample_z = standardize(sample)
            extreme_features = sample_z.abs().sort_values(ascending=False).head(3)

            results["risk_patterns"]["individual_samples"][sample_id] = {
                "cluster": int(clusters[idx]),
                "extreme_features": {
                    feature: {
                        "value": round(sample[feature], 2),
                        "z_score": round(sample_z[feature], 2)
                    }
                    for feature in extreme_features.index
                }
            }

    # Identify which features are most important overall
    feature_importance = {}
    for feature in features.columns:
        # Importance based on variation: overall spread, weighted by how much
        # the cluster means differ when there is more than one cluster.
        feature_std = feature_variation[feature]
        between_cluster_variance = np.var([
            features.loc[clusters == label, feature].mean()
            for label in cluster_labels
        ]) if unique_clusters > 1 else 0

        importance = (feature_std * between_cluster_variance) if between_cluster_variance > 0 else feature_std
        feature_importance[feature] = importance

    # Scale to [0, 1]; skipped when there is nothing positive to scale by.
    max_importance = max(feature_importance.values()) if feature_importance else 1
    if max_importance > 0:
        for feature in feature_importance:
            feature_importance[feature] /= max_importance

    sorted_features = sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)
    results["key_features"] = {
        feature: round(score, 3) for feature, score in sorted_features[:10]
    }

    if high_risk_count > 0:
        top_risk_features = list(results["risk_patterns"]["common_deviations"].keys())[:3]
        results["recommendations"].append(
            f"Review {high_risk_count} samples identified as potential risks, particularly focusing on "
            f"abnormal patterns in {', '.join(top_risk_features)}"
        )

    if unique_clusters > 1:
        for cluster_id in cluster_labels:
            cluster_info = results["clusters"][f"cluster_{cluster_id}"]
            if cluster_info["size"] < total_samples * 0.2:  # If cluster is small (< 20%)
                top_feature = list(cluster_info["distinctive_features"].keys())[0]
                results["recommendations"].append(
                    f"Investigate Cluster {cluster_id} as a minority group ({cluster_info['percentage']}% of samples) "
                    f"with distinctive {top_feature} patterns"
                )

    top_features = list(results["key_features"].keys())[:3]
    results["recommendations"].append(
        f"Focus future analysis on the top identified features: {', '.join(top_features)}"
    )

    return results


def print_speech_insights(insights):
    """
    Pretty-prints the insights produced by get_speech_insights().

    The report has an overview, one section per cluster, an optional risk
    section, the top 5 key features and the numbered recommendations.

    Args:
        insights: Dictionary of insights from get_speech_insights()
    """
    banner = "=" * 80
    print("\n" + banner)
    print(" " * 30 + "SPEECH ANALYSIS INSIGHTS")
    print(banner)

    overview = insights["summary"]
    print("\nANALYSIS OVERVIEW:")
    print(f"- {overview['total_samples']} speech samples analyzed")
    print(f"- {overview['clusters_found']} distinct speech pattern clusters identified")
    print(f"- {overview['risk_samples']} samples ({overview['risk_percentage']}%) flagged for potential risk")

    print("\nCLUSTER ANALYSIS:")
    for name, info in insights["clusters"].items():
        print(f"\n  {name.upper()} ({info['percentage']}% of samples):")
        print(f"  - {info['interpretation']}")
        print(f"  - Cohesion score: {info['silhouette_score']}")
        print("  - Key features:")

        for feature, stats in info["distinctive_features"].items():
            magnitude = abs(stats['std_diff'])
            print(f"    * {feature}: {stats['value']} ({stats['direction']} by {magnitude:.1f} std)")

    # Risk analysis section (only when there is something to report)
    risk = insights.get("risk_patterns")
    if risk:
        print("\nRISK PATTERN ANALYSIS:")

        if "cluster_distribution" in risk:
            print("  Distribution of risk samples across clusters:")
            for cluster, count in risk["cluster_distribution"].items():
                print(f"  - Cluster {cluster}: {count} samples")

        # Features that deviate most across the flagged samples
        if "common_deviations" in risk:
            print("\n  Common feature deviations in risk samples:")
            for feature, score in risk["common_deviations"].items():
                print(f"  - {feature}: {score:.2f} std deviation (average)")

        # Per-sample details, limited to the first three entries
        if "individual_samples" in risk:
            flagged = list(risk["individual_samples"].items())
            if flagged:
                print("\n  Highest risk samples:")
                for sample_id, details in flagged[:3]:
                    print(f"  - {sample_id} (Cluster {details['cluster']}):")
                    for feature, values in details["extreme_features"].items():
                        print(f"    * {feature}: {values['value']} (z-score: {values['z_score']:.2f})")

    # Feature importance: only the first five entries are shown
    print("\nKEY FEATURES IMPORTANCE:")
    for feature, importance in list(insights["key_features"].items())[:5]:
        print(f"  - {feature}: {importance:.3f}")

    # Recommendations, numbered from 1
    print("\nRECOMMENDATIONS:")
    for number, rec in enumerate(insights["recommendations"], start=1):
        print(f"  {number}. {rec}")

    print("\n" + banner)

### Main execution block

In [None]:
if __name__ == "__main__":
    # End-to-end run: feature extraction -> scaling -> PCA -> clustering ->
    # risk analysis -> printed insights.
    print("Starting cognitive speech analysis...")

    # Step 1: Process all audio files
    features_df = run_analysis()

    if features_df is None:
        print("Analysis failed - no features were extracted")
    elif len(features_df) < 2:
        # Both the 2-component PCA and the 2-cluster KMeans below need at
        # least two samples; stop with a message instead of a traceback.
        print("Analysis failed - at least 2 samples are required for PCA and clustering")
    else:
        # Step 2: Preprocess data (fills the normalized_features global)
        clean_and_normalize_data()

        # Step 3: Dimensionality reduction
        pca = PCA(n_components=2)
        pca_results = pca.fit_transform(normalized_features)
        print(f"PCA explained variance: {pca.explained_variance_ratio_}")

        # Step 4: Clustering. A fixed seed and explicit n_init keep the
        # cluster assignment reproducible between runs, matching the seeded
        # IsolationForest used in the risk analysis.
        kmeans = KMeans(n_clusters=2, n_init=10, random_state=42)
        clusters = kmeans.fit_predict(normalized_features)

        # Step 5: Visualization of clusters
        plot_feature_clusters(pca_results, clusters)

        # Step 6: Risk analysis to identify abnormal patterns
        risk_indices = run_risk_analysis()

        # Step 7: Generate and display insights
        insights = get_speech_insights(features_df, clusters, risk_indices)
        print_speech_insights(insights)

        print("\nAnalysis complete!")