In [1]:
import os
from collections import Counter

import assemblyai as aai
import gradio as gr
import librosa  # For audio processing and feature extraction
import librosa.display
import nltk
import numpy as np
from fpdf import FPDF
from langdetect import detect  # Language detection library
from nltk.corpus import stopwords
from scipy.signal import butter, lfilter

# Download NLTK stop words if not already available
nltk.download('stopwords')

# Set your AssemblyAI API key.
# The ASSEMBLYAI_API_KEY environment variable takes precedence so the
# credential can be supplied without editing this file; the literal is kept
# only as a fallback so existing setups keep working.
# NOTE(review): a key committed to source should be rotated and then removed.
aai.settings.api_key = os.environ.get(
    "ASSEMBLYAI_API_KEY", "285022177ce545adb643b3711f5d062d"
)

# Hand-maintained Hindi stop words, used when the detected language is 'hi'
hindi_stopwords = {
    'और', 'भी', 'का', 'के', 'कि', 'को', 'में', 'से',
    'है', 'यह', 'था', 'तक', 'तो', 'नहीं', 'पर', 'ही',
    'जैसे', 'क्या', 'किस', 'कौन', 'वे', 'हम', 'आप', 'जो',
}

def noise_reduction(audio, sr):
    """Reduce stationary noise by subtracting a per-frequency median magnitude.

    Args:
        audio: Audio samples as a NumPy array (passed straight to
            ``librosa.stft``).
        sr: Sample rate. Not used by the computation; kept so the signature
            matches the other preprocessing steps.

    Returns:
        The denoised signal, with the same number of samples as ``audio``.
    """
    # Compute the short-time Fourier transform
    stft = librosa.stft(audio)
    spectrogram = np.abs(stft)

    # Estimate the noise profile as the median magnitude along axis 1
    noise_profile = np.median(spectrogram, axis=1, keepdims=True)

    # Subtract the noise profile, clamping negative magnitudes to zero
    spectrogram_denoised = np.maximum(spectrogram - noise_profile, 0)

    # Re-attach the original phase and invert the STFT. Passing ``length``
    # pads/trims the result to the input length; without it the output can be
    # shorter than the input, shifting where sample-index slices fall.
    stft_denoised = spectrogram_denoised * np.exp(1j * np.angle(stft))
    denoised_audio = librosa.istft(stft_denoised, length=len(audio))
    return denoised_audio

def bandpass_filter(audio, sr, lowcut=300, highcut=3000):
    """Run ``audio`` through a first-order Butterworth band-pass filter.

    Args:
        audio: Samples to filter.
        sr: Sample rate in Hz, used to normalise the cut-offs.
        lowcut: Lower band edge in Hz.
        highcut: Upper band edge in Hz.

    Returns:
        The filtered samples, as returned by ``scipy.signal.lfilter``.
    """
    # butter() expects band edges as fractions of the Nyquist frequency
    half_rate = sr * 0.5
    band_edges = [lowcut / half_rate, highcut / half_rate]
    numerator, denominator = butter(1, band_edges, btype='band')
    return lfilter(numerator, denominator, audio)

def preprocess_audio(audio_path):
    """Load an audio file and clean it for feature extraction.

    The file is loaded at its native sample rate (``sr=None``), then passed
    through ``noise_reduction`` followed by ``bandpass_filter``.

    Args:
        audio_path: Path of the audio file to load.

    Returns:
        ``(samples, sample_rate)`` on success, or ``(None, None)`` if any step
        raises; the error is printed rather than propagated.
    """
    try:
        signal, sample_rate = librosa.load(audio_path, sr=None)
        # Denoise first, then keep only the band-pass range
        cleaned = bandpass_filter(noise_reduction(signal, sample_rate), sample_rate)
    except Exception as e:
        print(f"Error in audio preprocessing: {e}")
        return None, None
    return cleaned, sample_rate

def transcribe_audio(audio_path):
    """Transcribe an audio file with AssemblyAI.

    Language detection is enabled with a confidence threshold of 0.4.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        ``(text, words)`` where ``words`` is a list of dicts with keys
        ``'text'``, ``'start_time'`` and ``'end_time'`` copied from each
        transcript word. Returns ``(None, [])`` when the transcript has no
        words or when transcription raises.
    """
    config = aai.TranscriptionConfig(
        language_detection=True,
        language_confidence_threshold=0.4,
    )
    transcriber = aai.Transcriber(config=config)
    try:
        transcript = transcriber.transcribe(audio_path)
        if transcript and transcript.words:
            words = [
                {'text': word.text, 'start_time': word.start, 'end_time': word.end}
                for word in transcript.words
            ]
            return transcript.text, words
        return None, []
    except Exception as e:
        # Still best-effort (callers get the "no transcript" result), but the
        # cause is reported instead of being discarded silently.
        print(f"Error in audio transcription: {e}")
        return None, []

def detect_language(text):
    """Return the language code that ``langdetect.detect`` reports for ``text``."""
    language_code = detect(text)
    return language_code

def remove_stopwords(words, language):
    """Drop stop words from a list of word dicts.

    Args:
        words: List of dicts, each with a ``'text'`` key.
        language: Language code; only ``'en'`` and ``'hi'`` are filtered.

    Returns:
        A new list without entries whose lower-cased text is a stop word, or
        ``words`` itself (unfiltered) for any other language.
    """
    # Unsupported language: hand the input back untouched
    if language not in ('en', 'hi'):
        return words

    if language == 'hi':
        excluded = hindi_stopwords
    else:
        excluded = set(stopwords.words('english'))

    kept = []
    for entry in words:
        if entry['text'].lower() in excluded:
            continue
        kept.append(entry)
    return kept

def get_common_words(words1, words2):
    """Find the words shared by two transcripts, as index-aligned lists.

    The k-th occurrence of a text in ``words1`` is matched with the k-th
    occurrence of the same text in ``words2``; occurrences without a
    counterpart are dropped. Both returned lists therefore have the same
    length and ``common_words_1[i]['text'] == common_words_2[i]['text']``,
    so they can safely be iterated together with ``zip``.

    Args:
        words1: Word dicts (each with a ``'text'`` key) from the first sample.
        words2: Word dicts (each with a ``'text'`` key) from the second sample.

    Returns:
        ``(common_words_1, common_words_2)``: the matched word dicts from
        each input, ordered by their position in ``words1``.
    """
    # Group the second sample's words by text, keeping their original order
    occurrences_2 = {}
    for word in words2:
        occurrences_2.setdefault(word['text'], []).append(word)

    # How many occurrences of each text have already been paired up
    used = Counter()
    common_words_1 = []
    common_words_2 = []
    for word in words1:
        text = word['text']
        candidates = occurrences_2.get(text, ())
        next_index = used[text]
        if next_index < len(candidates):
            common_words_1.append(word)
            common_words_2.append(candidates[next_index])
            used[text] += 1
    return common_words_1, common_words_2

def extract_audio_features(audio_path, word_start_time, word_end_time):
    """Compute acoustic features for one word's time span in an audio file.

    Args:
        audio_path: Path of the audio file; it is preprocessed on every call.
        word_start_time: Start of the word; divided by 1000 to get seconds.
        word_end_time: End of the word, in the same unit as the start.

    Returns:
        A dict with keys ``'mfcc'`` (13 time-averaged coefficients),
        ``'spectral_centroid'``, ``'spectral_bandwidth'``, ``'rms'``,
        ``'zero_crossing_rate'`` and ``'pitch'`` (each a mean over the
        segment), or an empty dict if preprocessing failed.
    """
    signal, sample_rate = preprocess_audio(audio_path)
    if signal is None:
        return {}

    # Convert the word's time span into a sample-index slice
    start_sec = word_start_time / 1000
    length_sec = (word_end_time - word_start_time) / 1000
    first_sample = int(start_sec * sample_rate)
    last_sample = int((start_sec + length_sec) * sample_rate)
    clip = signal[first_sample:last_sample]

    features = {}
    features['mfcc'] = librosa.feature.mfcc(y=clip, sr=sample_rate, n_mfcc=13).mean(axis=1)
    features['spectral_centroid'] = librosa.feature.spectral_centroid(y=clip, sr=sample_rate).mean()
    features['spectral_bandwidth'] = librosa.feature.spectral_bandwidth(y=clip, sr=sample_rate).mean()
    features['rms'] = librosa.feature.rms(y=clip).mean()
    features['zero_crossing_rate'] = librosa.feature.zero_crossing_rate(y=clip).mean()

    # Average only the positive pitch estimates; fall back to 0 if there are none
    pitches, _ = librosa.core.piptrack(y=clip, sr=sample_rate)
    voiced = pitches[pitches > 0]
    features['pitch'] = voiced.mean() if len(voiced) > 0 else 0
    return features

def compare_features(features1, features2):
    """Measure how far apart two feature dicts are and describe the gaps.

    Args:
        features1: Feature dict for the first sample's word.
        features2: Feature dict for the second sample's word.

    Returns:
        ``(analysis, total_diff)``: a list of formatted report lines (one per
        feature plus a total line) and the unweighted sum of all differences.
    """
    def _gap(key):
        # Absolute difference of one scalar feature
        return abs(features1[key] - features2[key])

    # MFCCs are vectors, so their distance is the Euclidean norm
    mfcc_gap = np.linalg.norm(np.array(features1['mfcc']) - np.array(features2['mfcc']))
    centroid_gap = _gap('spectral_centroid')
    bandwidth_gap = _gap('spectral_bandwidth')
    rms_gap = _gap('rms')
    zcr_gap = _gap('zero_crossing_rate')
    pitch_gap = _gap('pitch')

    overall = mfcc_gap + centroid_gap + bandwidth_gap + rms_gap + zcr_gap + pitch_gap

    lines = [
        f"MFCC Difference: {mfcc_gap:.2f}",
        f"Spectral Centroid Difference: {centroid_gap:.2f} Hz",
        f"Spectral Bandwidth Difference: {bandwidth_gap:.2f} Hz",
        f"RMS Energy Difference: {rms_gap:.2f}",
        f"Zero Crossing Rate Difference: {zcr_gap:.4f}",
        f"Pitch Difference: {pitch_gap:.2f} Hz",
        f"Total Feature Difference for this Word: {overall:.2f}",
    ]
    return lines, overall

def generate_pdf(report, pdf_path="Analysis_Report.pdf"):
    """Write the analysis text to a single-font PDF file.

    Args:
        report: Report text to render.
        pdf_path: Destination path of the PDF.

    Returns:
        ``pdf_path``, after the file has been written.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    # The built-in "Arial" core font only covers Latin-1. The report embeds
    # transcript words (which may be Hindi), so characters outside Latin-1
    # are replaced with "?" instead of letting PDF generation fail.
    printable_report = report.encode("latin-1", "replace").decode("latin-1")
    pdf.multi_cell(0, 10, printable_report)
    pdf.output(pdf_path)
    return pdf_path

def process_audio(audio_path_1, audio_path_2):
    """Compare two audio samples and report whether the speakers match.

    Both files are transcribed, checked to be in the same language, stripped
    of stop words, and reduced to their common words. Acoustic features of
    the common words are then compared pairwise (by position in the two
    lists) and the average difference is turned into a conclusion.

    Args:
        audio_path_1: Path of the first audio sample.
        audio_path_2: Path of the second audio sample.

    Returns:
        ``(report_text, pdf_path)`` on success, or ``(message, None)`` when
        the comparison cannot be carried out. Exceptions are caught and
        returned as an ``"Error: ..."`` message.
    """
    try:
        # Transcribe audio
        transcript_text_1, words_1 = transcribe_audio(audio_path_1)
        transcript_text_2, words_2 = transcribe_audio(audio_path_2)

        if not transcript_text_1 or not transcript_text_2:
            return "Error: One or both audio files could not be transcribed.", None

        # Detect language
        language_1 = detect_language(transcript_text_1)
        language_2 = detect_language(transcript_text_2)

        if language_1 != language_2:
            return f"Error: Different languages detected (Sample 1: {language_1}, Sample 2: {language_2}).", None

        # Remove stopwords and find common words
        words_1_filtered = remove_stopwords(words_1, language_1)
        words_2_filtered = remove_stopwords(words_2, language_2)
        common_words_1, common_words_2 = get_common_words(words_1_filtered, words_2_filtered)

        if not common_words_1 or not common_words_2:
            return "No common words found after removing stop words.", None

        # Compare features for common words
        analysis_report = []
        total_diff_sum = 0
        # Count the pairs actually compared: zip() stops at the shorter list
        # and pairs without features are skipped, so len(common_words_1)
        # would be the wrong divisor for the average.
        compared_count = 0

        for i, (word_1, word_2) in enumerate(zip(common_words_1, common_words_2)):
            features_1 = extract_audio_features(audio_path_1, word_1['start_time'], word_1['end_time'])
            features_2 = extract_audio_features(audio_path_2, word_2['start_time'], word_2['end_time'])
            if not features_1 or not features_2:
                # Feature extraction returns {} when preprocessing fails
                continue
            analysis, total_diff = compare_features(features_1, features_2)
            analysis_report.append(f"\n--- Word {i+1}: {word_1['text']} ---")
            analysis_report.extend(analysis)
            total_diff_sum += total_diff
            compared_count += 1

        if compared_count == 0:
            return "Error: Audio features could not be extracted for any common word.", None

        average_diff = total_diff_sum / compared_count
        analysis_report.append(f"\nAverage Feature Difference: {average_diff:.2f}")

        # Conclusion
        # NOTE(review): the 1300.0 cut-off is not explained anywhere in this
        # file; confirm how it was chosen.
        if average_diff < 1300.0:
            analysis_report.append("\nConclusion: The speakers are likely the same.")
        else:
            analysis_report.append("\nConclusion: The speakers are likely different.")

        final_report = "\n".join(analysis_report)
        pdf_path = generate_pdf(final_report)
        return final_report, pdf_path

    except Exception as e:
        return f"Error: {e}", None

# Gradio Interface
def interface(audio1, audio2):
    """Gradio callback: run the comparison and return (report text, PDF path)."""
    return process_audio(audio1, audio2)

# The two uploads are handed to the callback as file paths
inputs = [
    gr.Audio(type="filepath", label="Upload Audio File 1"),
    gr.Audio(type="filepath", label="Upload Audio File 2"),
]

# Text report plus a downloadable PDF of the same report
outputs = [
    gr.Textbox(label="Analysis Report"),
    gr.File(label="Download PDF Report"),
]

# Build the UI, then start the server
demo = gr.Interface(
    title="Forensic Speaker Recognition",
    description="Upload two audio samples to check if they are spoken by the same individual. Download the report as a PDF.",
    fn=interface,
    inputs=inputs,
    outputs=outputs,
)
demo.launch()

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\rohan\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.


