In [1]:
import os
import shutil
import subprocess
import datetime
import numpy as np
import pandas as pd
import wave
from praatio import textgrid
import speech_recognition as sr
import eng_to_ipa
import Levenshtein
import sounddevice as sd
from typing import Optional, Dict
from datetime import datetime
from IPython.display import display, HTML

In [2]:
class ImprovedPronunciationAnalyzer:
    """Record speech, transcribe it, force-align it with MFA and score each word.

    The pipeline driven by ``analyze_speech`` is:
    record (sounddevice) -> transcribe (SpeechRecognition / Google) ->
    forced alignment (``mfa align`` subprocess) -> per-word IPA comparison
    (eng_to_ipa + Levenshtein) -> HTML table of all accumulated results.

    Attributes:
        results: list of per-word analysis dicts; grows across calls.
        mfa_model: acoustic model name/path handed to ``mfa align``.
        recordings_dir: directory that receives the recorded WAV files.
        textgrid_dir: directory that MFA is told to write TextGrids into.
        dict_path: pronunciation dictionary handed to ``mfa align``.
        recognizer: ``speech_recognition.Recognizer`` used for transcription.
    """

    # Fallback dictionary used when the caller does not supply ``dict_path``.
    # Machine-specific: change to the actual dictionary path or pass ``dict_path``.
    DEFAULT_DICT_PATH = r"C:\Users\suraj\.conda\envs\mfa\Lib\site-packages\montreal_forced_aligner\tests\data\dictionaries\english_us_mfa_reduced.dict"

    def __init__(self, mfa_model: str = 'english_mfa', dict_path: Optional[str] = None):
        """Initialize the pronunciation analyzer with MFA integration.

        Args:
            mfa_model: acoustic model passed to ``mfa align``.
            dict_path: pronunciation dictionary passed to ``mfa align``;
                when omitted, ``DEFAULT_DICT_PATH`` is used.

        Side effects:
            Creates ``recordings/`` and ``recordings/textgrids/`` if missing.
        """
        self.results = []
        self.mfa_model = mfa_model
        self.recordings_dir = 'recordings'
        self.textgrid_dir = os.path.join(self.recordings_dir, 'textgrids')
        self.dict_path = dict_path or self.DEFAULT_DICT_PATH

        os.makedirs(self.recordings_dir, exist_ok=True)
        os.makedirs(self.textgrid_dir, exist_ok=True)

        self.recognizer = sr.Recognizer()

    @staticmethod
    def _peak_amplitude(audio) -> int:
        """Return the largest absolute sample value in ``audio`` (0 if empty).

        Samples are widened to int32 first: in int16, ``abs(-32768)`` wraps
        back to -32768, which would hide full-scale negative clipping.
        """
        samples = np.asarray(audio)
        if samples.size == 0:
            return 0
        return int(np.max(np.abs(samples.astype(np.int32))))

    def record_audio(self, duration: int = 5, filename: Optional[str] = None, samplerate: int = 16000) -> str:
        """Record mono 16-bit audio from the default input and save it as WAV.

        Args:
            duration: recording length in seconds.
            filename: WAV file name inside ``recordings_dir``; a timestamped
                name is generated when omitted.
            samplerate: sampling rate in Hz.

        Returns:
            Path of the written WAV file.

        Raises:
            Exception: any recording/writing error is printed and re-raised.
        """
        if not filename:
            filename = f"recording_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"

        filepath = os.path.join(self.recordings_dir, filename)

        try:
            print(f"🎙️ Recording for {duration} seconds...")
            # blocking=True already waits for the recording to finish, so no
            # separate sd.wait() call is needed.
            audio = sd.rec(int(samplerate * duration), samplerate=samplerate, channels=1, dtype=np.int16, blocking=True)

            # Volume checks
            max_amplitude = self._peak_amplitude(audio)
            if max_amplitude > 32000:
                print("⚠️ Warning: Audio may be clipping. Speak more quietly.")
            elif max_amplitude < 1000:
                print("⚠️ Warning: Audio volume is very low. Speak louder.")

            # Save as WAV (1 channel, 2 bytes per sample to match int16)
            with wave.open(filepath, "wb") as wf:
                wf.setnchannels(1)
                wf.setsampwidth(2)
                wf.setframerate(samplerate)
                wf.writeframes(audio.tobytes())

            return filepath

        except Exception as e:
            print(f"❌ Error recording audio: {e}")
            raise

    def transcribe_audio(self, audio_path: str) -> str:
        """Transcribe a WAV file using the SpeechRecognition library.

        Args:
            audio_path: path of the audio file to transcribe.

        Returns:
            The recognized text, or ``""`` when the audio could not be
            understood or the recognition service could not be reached.
        """
        try:
            with sr.AudioFile(audio_path) as source:
                audio = self.recognizer.record(source)
                transcript = self.recognizer.recognize_google(audio)
                print(f"Transcribed Text: {transcript}")
                return transcript
        except sr.UnknownValueError:
            print("❌ Could not understand the audio.")
            return ""
        except sr.RequestError as e:
            print(f"❌ Could not request results from Google Speech Recognition service; {e}")
            return ""

    def force_align_audio(self, audio_path: str, transcript: str) -> Optional[Dict]:
        """Perform forced alignment of one recording using the ``mfa`` CLI.

        The audio is copied into ``recordings/corpus`` next to a ``.lab`` file
        holding the transcript, then ``mfa align`` is run on that directory.

        Args:
            audio_path: path of the recorded WAV file.
            transcript: text spoken in the recording.

        Returns:
            The dict produced by ``_parse_textgrid`` on success, or ``None``
            when MFA fails, the TextGrid is missing, or any other error occurs
            (the error is printed, not raised).
        """
        # NOTE(review): files staged here are never removed, so every call
        # hands MFA all earlier recordings as well — consider a per-run corpus.
        corpus_dir = os.path.join(self.recordings_dir, 'corpus')
        os.makedirs(corpus_dir, exist_ok=True)

        # Copy audio file
        base_name = os.path.splitext(os.path.basename(audio_path))[0]
        corpus_audio = os.path.join(corpus_dir, f"{base_name}.wav")
        shutil.copy2(audio_path, corpus_audio)

        # Save transcript
        lab_path = os.path.join(corpus_dir, f"{base_name}.lab")
        with open(lab_path, 'w', encoding='utf-8') as f:
            f.write(transcript.strip())

        try:
            mfa_command = [
                "mfa", "align",
                corpus_dir,
                self.dict_path,
                self.mfa_model,
                self.textgrid_dir,
                "--clean"
            ]

            # No check=True: a non-zero exit is reported through returncode,
            # so CalledProcessError is never raised here.
            result = subprocess.run(mfa_command, capture_output=True, text=True)
            if result.returncode != 0:
                print(f"❌ MFA Error: {result.stderr}")
                return None

            # Check if alignment succeeded
            textgrid_path = os.path.join(self.textgrid_dir, f"{base_name}.TextGrid")
            if not os.path.exists(textgrid_path):
                raise FileNotFoundError(f"TextGrid file not found: {textgrid_path}")
            return self._parse_textgrid(textgrid_path)

        except Exception as e:
            # Covers a missing `mfa` executable, a missing TextGrid and
            # TextGrid parsing failures alike.
            print(f"❌ Alignment error: {e}")
            return None

    @staticmethod
    def _labelled_intervals(tier, label_key: str) -> list:
        """Convert the non-empty intervals of a TextGrid tier into dicts."""
        return [
            {
                label_key: interval.label,
                'start': interval.start,
                'end': interval.end,
                'duration': interval.end - interval.start
            }
            for interval in tier.entries
            if interval.label  # skip silence / unlabeled intervals
        ]

    def _parse_textgrid(self, textgrid_path: str) -> Dict:
        """Parse MFA's TextGrid output.

        Args:
            textgrid_path: path of the TextGrid file to read.

        Returns:
            ``{'words': [...], 'phones': [...]}`` where each entry holds the
            label (key ``'word'`` or ``'phone'``), ``'start'``, ``'end'`` and
            ``'duration'`` of one non-empty interval.
        """
        tg = textgrid.openTextgrid(textgrid_path, includeEmptyIntervals=True)

        words_tier = tg.getTier('words')
        phones_tier = tg.getTier('phones')

        print(f"Parsed Words Tier: {words_tier.entries}")
        print(f"Parsed Phones Tier: {phones_tier.entries}")

        return {
            'words': self._labelled_intervals(words_tier, 'word'),
            'phones': self._labelled_intervals(phones_tier, 'phone')
        }

    def analyze_speech(self, correct_text: str, duration: int = 5) -> None:
        """Record the speaker and analyze pronunciation against ``correct_text``.

        Args:
            correct_text: the text the speaker is expected to say.
            duration: recording length in seconds.

        Any failure is printed rather than raised; the method returns early
        when transcription or alignment produces nothing.
        """
        try:
            audio_path = self.record_audio(duration)

            # Step 1: Transcribe the audio
            transcript = self.transcribe_audio(audio_path)
            if not transcript:
                print("❌ Could not transcribe the audio. Analysis aborted.")
                return

            # Step 2: Perform forced alignment
            alignment_data = self.force_align_audio(audio_path, transcript)
            if not alignment_data:
                print("❌ Forced alignment failed. Please try again.")
                return

            # Step 3: Log and analyze the alignment
            print("Alignment Data:", alignment_data)
            print(f"Correct Text: {correct_text}, Transcript: {transcript}")
            self._analyze_pronunciation(correct_text, transcript, alignment_data)

        except Exception as e:
            print(f"❌ Analysis error: {e}")

    def _analyze_pronunciation(self, correct_text: str, transcript: str, alignment_data: Dict) -> None:
        """Compare expected and spoken words position by position.

        One result row is appended to ``self.results`` for every word of
        ``correct_text``. Expected words with no spoken counterpart (the
        transcript is shorter) are reported with an empty ``pronounced_as``
        and accuracy 0 instead of being silently dropped. Spoken words beyond
        the expected text are ignored. Finally the results table is shown.

        Args:
            correct_text: the expected text.
            transcript: the recognized text.
            alignment_data: dict with a ``'words'`` list as produced by
                ``_parse_textgrid``; entry ``i`` supplies timing for word ``i``.
        """
        correct_words = correct_text.lower().split()
        spoken_words = transcript.lower().split()
        aligned_words = alignment_data['words']

        for i, correct in enumerate(correct_words):
            spoken = spoken_words[i] if i < len(spoken_words) else ""
            correct_ipa = self.get_ipa_pronunciation(correct)

            if spoken:
                timing = aligned_words[i] if i < len(aligned_words) else None
                accuracy, phoneme_matches = self._calculate_pronunciation_accuracy(spoken, correct)
                spoken_ipa = self.get_ipa_pronunciation(spoken)
            else:
                # Word was never spoken: nothing to time or compare.
                timing = None
                accuracy, phoneme_matches = 0, {}
                spoken_ipa = ""

            print(f"Spoken Word IPA: {spoken_ipa}, Correct Word IPA: {correct_ipa}")
            print(f"Phoneme Matches: {phoneme_matches}")

            analysis = {
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'word': correct,
                'pronounced_as': spoken,
                'spoken_ipa': spoken_ipa,
                'correct_ipa': correct_ipa,
                'start_time': timing['start'] if timing else None,
                'end_time': timing['end'] if timing else None,
                'duration': timing['duration'] if timing else None,
                'accuracy': accuracy,
                'phoneme_matches': phoneme_matches,
                'status': self._get_accuracy_status(accuracy)
            }
            self.results.append(analysis)

        self.display_results()

    def get_ipa_pronunciation(self, text: str) -> str:
        """Convert text to IPA phonetic notation.

        Returns:
            The IPA string, ``"Conversion failed"`` when eng_to_ipa could not
            convert the text, or ``"IPA conversion failed"`` when the
            conversion raised.
        """
        try:
            ipa = eng_to_ipa.convert(text)
        except Exception:
            return "IPA conversion failed"

        # eng_to_ipa echoes words it does not know with a trailing "*", so an
        # unchanged string is not the only sign of a failed lookup.
        if ipa == text or "*" in ipa:
            return "Conversion failed"
        return ipa

    def _calculate_pronunciation_accuracy(self, spoken: str, correct: str) -> tuple:
        """Calculate pronunciation accuracy and per-position phoneme matches.

        Args:
            spoken: the word as recognized.
            correct: the expected word.

        Returns:
            ``(accuracy, phoneme_matches)``: ``accuracy`` is 0-100, derived
            from the Levenshtein distance between the two IPA strings
            relative to the longer one; ``phoneme_matches`` maps
            ``"phoneme_<n>"`` to 100 or 0 for each position both strings
            share. ``(0, {})`` is returned when either IPA conversion failed.
        """
        spoken_ipa = self.get_ipa_pronunciation(spoken)
        correct_ipa = self.get_ipa_pronunciation(correct)

        # get_ipa_pronunciation signals failure with a "... failed" sentinel.
        if "failed" in spoken_ipa or "failed" in correct_ipa:
            return 0, {}

        max_len = max(len(spoken_ipa), len(correct_ipa))
        if max_len == 0:
            # Nothing to compare; also avoids dividing by zero below.
            return 0, {}

        # Calculate overall accuracy
        distance = Levenshtein.distance(spoken_ipa, correct_ipa)
        accuracy = max(0, min(100, 100 * (1 - (distance / max_len))))

        # Position-wise comparison; zip stops at the shorter string.
        phoneme_matches = {
            f"phoneme_{i+1}": 100 if s == c else 0
            for i, (s, c) in enumerate(zip(spoken_ipa, correct_ipa))
        }

        return accuracy, phoneme_matches

    def _get_accuracy_status(self, accuracy: float) -> str:
        """Categorize pronunciation accuracy (>=90 excellent, >=70 good)."""
        if accuracy >= 90:
            return "✅ Excellent"
        elif accuracy >= 70:
            return "🟡 Good"
        else:
            return "❌ Needs Improvement"

    def display_results(self) -> None:
        """Show every accumulated result row as an HTML table."""
        df = pd.DataFrame(self.results)
        display(HTML(df.to_html(index=False)))

In [3]:
# Demo run: build an analyzer with its default MFA model, then record for
# 3 seconds and score the recording against the expected phrase.
# analyze_speech lower-cases and whitespace-splits the phrase before comparing.
analyzer = ImprovedPronunciationAnalyzer()
analyzer.analyze_speech(correct_text=" hello Good morning", duration=3)

🎙️ Recording for 3 seconds...
Transcribed Text: hello good morning
Parsed Words Tier: (Interval(start=0.0, end=0.5, label=''), Interval(start=0.5, end=1.57, label='hello'), Interval(start=1.57, end=1.75, label='good'), Interval(start=1.75, end=2.4, label=''), Interval(start=2.4, end=3.0, label='morning'))
Parsed Phones Tier: (Interval(start=0.0, end=0.5, label=''), Interval(start=0.5, end=1.57, label='spn'), Interval(start=1.57, end=1.6, label='ɡ'), Interval(start=1.6, end=1.62, label='ʊ'), Interval(start=1.62, end=1.75, label='ɾ'), Interval(start=1.75, end=2.4, label=''), Interval(start=2.4, end=3.0, label='spn'))
Alignment Data: {'words': [{'word': 'hello', 'start': 0.5, 'end': 1.57, 'duration': 1.07}, {'word': 'good', 'start': 1.57, 'end': 1.75, 'duration': 0.17999999999999994}, {'word': 'morning', 'start': 2.4, 'end': 3.0, 'duration': 0.6000000000000001}], 'phones': [{'phone': 'spn', 'start': 0.5, 'end': 1.57, 'duration': 1.07}, {'phone': 'ɡ', 'start': 1.57, 'end': 1.6, 'duration':

timestamp,word,pronounced_as,spoken_ipa,correct_ipa,start_time,end_time,duration,accuracy,phoneme_matches,status
2025-02-10 12:56:02,hello,hello,hɛˈloʊ,hɛˈloʊ,0.5,1.57,1.07,100,"{'phoneme_1': 100, 'phoneme_2': 100, 'phoneme_3': 100, 'phoneme_4': 100, 'phoneme_5': 100, 'phoneme_6': 100}",✅ Excellent
2025-02-10 12:56:02,good,good,gʊd,gʊd,1.57,1.75,0.18,100,"{'phoneme_1': 100, 'phoneme_2': 100, 'phoneme_3': 100}",✅ Excellent
2025-02-10 12:56:02,morning,morning,ˈmɔrnɪŋ,ˈmɔrnɪŋ,2.4,3.0,0.6,100,"{'phoneme_1': 100, 'phoneme_2': 100, 'phoneme_3': 100, 'phoneme_4': 100, 'phoneme_5': 100, 'phoneme_6': 100, 'phoneme_7': 100}",✅ Excellent
