# Pronunciation Model Training
## Based on ChaosLingua System Architecture - Panelist 2
### Implements Acoustic Analyzer for Romanian Phonological Assessment

In [None]:
# Install PyTorch with CUDA support (Kaggle GPU)
%pip install torch torchvision torchaudio
%pip install transformers datasets evaluate accelerate
%pip install librosa soundfile --quiet
%pip install protobuf sentencepiece tiktoken --quiet
%pip install phonemetransformers --quiet

In [None]:
# Setup HuggingFace API access
import os
from getpass import getpass

from huggingface_hub import login

# SECURITY: an access token must never be stored in the notebook source.
# Any token that was ever hard-coded in this cell has to be treated as leaked
# and revoked in the HuggingFace account settings.
# The token is taken from the HF_TOKEN environment variable; when that is not
# set, fall back to an interactive prompt that does not echo the input.
hf_api_key = os.environ.get("HF_TOKEN") or getpass("HuggingFace access token: ")
login(token=hf_api_key)

In [None]:
import pandas as pd
import requests
import librosa
import soundfile as sf
import numpy as np
from datasets import Dataset, Audio
import torch
import torch.nn as nn
from transformers import AutoModel, AutoTokenizer

def load_pronunciation_dataset_simple(dataset_name, split='train', config='default'):
    """Load one split of a Hub dataset into a single pandas DataFrame.

    Asks the HuggingFace Hub parquet API for the shard URLs of
    ``dataset_name`` / ``config`` / ``split``, reads every shard with
    ``pd.read_parquet`` and concatenates the shards that could be read.

    Args:
        dataset_name: Hub repository id, e.g. ``'owner/name'``.
        split: Name of the split to fetch (``'train'``, ``'validation'``, ...).
        config: Dataset configuration (subset) name used in the API URL.
            Defaults to ``'default'``, the value that used to be hard-coded.

    Returns:
        The concatenated DataFrame, or ``None`` when the API call fails, the
        response is not a list of shard URLs, or no shard could be read.
        This is a best-effort loader: failures are printed, never raised.
    """
    api_url = f"https://huggingface.co/api/datasets/{dataset_name}/parquet/{config}/{split}"
    print(f"üîó Loading: {dataset_name} ({split})")

    try:
        # Get parquet URLs from API
        response = requests.get(api_url, timeout=30)
        if response.status_code != 200:
            print(f"‚ùå API failed: {response.status_code}")
            return None

        parquet_urls = response.json()
        # Anything other than a list (e.g. an error object) has no shard URLs
        # to iterate over; report it instead of failing once per dict key.
        if not isinstance(parquet_urls, list):
            print(f"Unexpected API response type: {type(parquet_urls).__name__}")
            return None
        print(f"üìÅ Found {len(parquet_urls)} parquet file(s)")

        # Load each parquet file; a shard that fails is skipped, not fatal
        dfs = []
        for i, parquet_url in enumerate(parquet_urls):
            print(f"  Loading file {i+1}: {parquet_url}")
            try:
                df_chunk = pd.read_parquet(parquet_url)
            except Exception as e:
                print(f"    ‚ùå Failed: {e}")
            else:
                dfs.append(df_chunk)
                print(f"    ‚úÖ {len(df_chunk)} rows")

        if not dfs:
            print("‚ùå No files loaded successfully")
            return None

        # Combine all chunks
        final_df = pd.concat(dfs, ignore_index=True)
        print(f"üéâ SUCCESS: {len(final_df)} rows, {len(final_df.columns)} columns")
        return final_df

    except Exception as e:
        # Boundary of the best-effort loader: network/JSON errors end up here
        print(f"‚ùå Error: {e}")
        return None

# Hub datasets to fetch, each paired with the splits requested from it
pronunciation_datasets_config = [
    # Primary phonological dataset
    ('phonemetransformers/IPA-CHILDES', ['train', 'validation']),
    # Audio-phoneme alignment
    ('espnet/yodas2', ['train', 'validation']),
    # Voice characteristics
    ('qmeeus/vp-er-10l', ['train', 'test']),
]

# dataset name -> {split name -> DataFrame}; a dataset is only recorded
# when at least one of its splits could be loaded
loaded_pronunciation_datasets = {}

for dataset_name, splits in pronunciation_datasets_config:
    print("\n" + "=" * 60)
    print(f"üì¶ Dataset: {dataset_name}")

    dataset_splits = {}
    for split in splits:
        df = load_pronunciation_dataset_simple(dataset_name, split)
        if df is None:
            # The loader already reported why this split is missing
            continue

        dataset_splits[split] = df
        print(f"\nüìä {split.upper()} split:")
        print(f"   Shape: {df.shape}")
        print(f"   Columns: {df.columns.tolist()}")
        print(f"   Memory: {df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

        print("\nüìã Sample Data:")
        print(df.head(2))

    if dataset_splits:
        loaded_pronunciation_datasets[dataset_name] = dataset_splits

    print("=" * 60)

print("\nüèÜ RESULTS:")
print(f"   Successfully loaded: {len(loaded_pronunciation_datasets)} pronunciation datasets")

# Per-split row counts, accumulating the grand total along the way
total_rows = 0
for name, splits_dict in loaded_pronunciation_datasets.items():
    print(f"\n   {name}:")
    for split, df in splits_dict.items():
        print(f"      {split}: {len(df):,} rows")
        total_rows += len(df)

print(f"\n   TOTAL: {total_rows:,} pronunciation examples! üî•")

In [None]:
# Custom Pronunciation Analysis Model
class PronunciationAnalyzer(nn.Module):
    """CNN audio encoder with a phoneme-classification head and a quality head.

    The encoder stacks three Conv1d -> ReLU -> BatchNorm1d stages with 128,
    256 and 512 output channels. The first two stages end in ``MaxPool1d(2)``,
    the last one in ``AdaptiveAvgPool1d(1)``, so every input is reduced to a
    single 512-dimensional vector. Both heads are two-layer MLPs with dropout
    0.3; the quality head ends in a sigmoid.
    """

    def __init__(self, audio_dim=80, hidden_dim=256, num_phonemes=50):
        super().__init__()

        # Audio feature extractor (CNN), built stage by stage in the same
        # layer order as a hand-written Sequential would list them
        stage_widths = (128, 256, 512)
        encoder_layers = []
        in_channels = audio_dim
        for stage_index, out_channels in enumerate(stage_widths):
            final_stage = stage_index == len(stage_widths) - 1
            encoder_layers.append(
                nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1)
            )
            encoder_layers.append(nn.ReLU())
            encoder_layers.append(nn.BatchNorm1d(out_channels))
            # Only the last stage collapses the time axis completely
            encoder_layers.append(
                nn.AdaptiveAvgPool1d(1) if final_stage else nn.MaxPool1d(2)
            )
            in_channels = out_channels
        self.audio_encoder = nn.Sequential(*encoder_layers)

        # Phoneme classifier: one logit per phoneme class
        self.phoneme_classifier = nn.Sequential(
            nn.Linear(512, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, num_phonemes),
        )

        # Pronunciation quality scorer: a single value squashed by a sigmoid
        self.quality_scorer = nn.Sequential(
            nn.Linear(512, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid(),
        )

    def forward(self, audio_features):
        """Encode *audio_features* and run both heads on the pooled encoding.

        Returns a dict with ``'phoneme_logits'``, ``'quality_score'`` and the
        flattened encoder output under ``'encoded_features'``.
        """
        pooled = self.audio_encoder(audio_features)
        # Drop the length-1 time axis left by the adaptive pooling
        encoded = pooled.view(pooled.size(0), -1)

        return {
            'phoneme_logits': self.phoneme_classifier(encoded),
            'quality_score': self.quality_scorer(encoded),
            'encoded_features': encoded,
        }

# Pick the GPU when PyTorch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Build the analyzer with its default sizes and move it to that device
model = PronunciationAnalyzer()
model = model.to(device)
print(f"‚úÖ Pronunciation model initialized on {device}")

In [None]:
# Audio preprocessing utilities
def extract_audio_features(audio_path, target_sr=16000, n_mfcc=80):
    """Extract normalized MFCC features from one audio clip.

    Args:
        audio_path: Anything ``librosa.load`` accepts (a path or a file-like
            object), or a dict with ``'bytes'`` and/or ``'path'`` keys.
            NOTE(review): audio columns read straight from Hub parquet files
            are expected to arrive in that dict form - confirm per dataset.
        target_sr: Sample rate the audio is resampled to while loading.
        n_mfcc: Number of MFCC coefficients, i.e. the feature dimension.
            Defaults to 80, the value that used to be hard-coded.

    Returns:
        Array of shape ``(time, n_mfcc)``, normalized with the mean and
        standard deviation of the whole clip, or ``None`` when loading or
        feature extraction fails (the error is printed, not raised).
    """
    import io  # local import: only needed for in-memory audio payloads

    source = audio_path
    label = audio_path  # what gets shown in the error message
    if isinstance(audio_path, dict):
        raw_bytes = audio_path.get('bytes')
        stored_path = audio_path.get('path')
        # Never echo the raw bytes in an error message
        label = stored_path or '<in-memory audio>'
        # Prefer the embedded bytes; fall back to the stored path
        source = io.BytesIO(raw_bytes) if raw_bytes is not None else stored_path

    try:
        # Load audio
        audio, sr = librosa.load(source, sr=target_sr)

        # Extract MFCC features
        mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc)

        # Normalize; the epsilon avoids dividing by zero on constant input
        mfccs = (mfccs - mfccs.mean()) / (mfccs.std() + 1e-8)

        return mfccs.T  # Transpose for (time, features)
    except Exception as e:
        # Best-effort: callers treat None as "could not process this clip"
        print(f"Error processing {label}: {e}")
        return None

def prepare_pronunciation_dataset(df, audio_column='audio', phoneme_column='phoneme'):
    """Reduce *df* to a two-column ``audio`` / ``phoneme`` frame.

    When a requested column is missing, the first match from a fixed list of
    alternative names is used instead. The chosen columns are copied and
    renamed to ``'audio'`` and ``'phoneme'``.

    Returns:
        The two-column copy, or ``None`` when either column cannot be found.
    """
    available = df.columns
    print(f"Available columns: {available.tolist()}")

    # Different datasets name these columns differently; keep the requested
    # name when no alternative is present so the check below reports failure
    if audio_column not in available:
        audio_fallbacks = ('path', 'file', 'audio_path', 'file_path')
        audio_column = next(
            (name for name in audio_fallbacks if name in available),
            audio_column,
        )

    if phoneme_column not in available:
        phoneme_fallbacks = ('phoneme', 'ipa', 'transcription', 'label')
        phoneme_column = next(
            (name for name in phoneme_fallbacks if name in available),
            phoneme_column,
        )

    print(f"Using audio column: {audio_column}")
    print(f"Using phoneme column: {phoneme_column}")

    # Guard clause: both columns must exist to build the simplified frame
    if audio_column not in available or phoneme_column not in available:
        print("‚ùå Could not find proper audio/phoneme columns")
        return None

    simplified = df[[audio_column, phoneme_column]].copy()
    simplified.columns = ['audio', 'phoneme']
    return simplified

# Run every loaded split through prepare_pronunciation_dataset and keep the
# ones that yielded a usable frame, keyed as "<dataset_name>_<split>"
all_pronunciation_data = {}

for dataset_name, splits_dict in loaded_pronunciation_datasets.items():
    print("\n" + "=" * 60)
    print(f"üì¶ Processing Pronunciation: {dataset_name}")

    for split, df in splits_dict.items():
        prepared_df = prepare_pronunciation_dataset(df)
        if prepared_df is None:
            # No audio/phoneme columns found for this split; nothing to keep
            continue

        key = f"{dataset_name}_{split}"
        all_pronunciation_data[key] = prepared_df
        print(f"   {split}: {len(df)} rows ‚Üí {len(prepared_df)} prepared rows")

    print("=" * 60)

# Keys of all_pronunciation_data have the form "<dataset_name>_<split>".
# The split is therefore matched as a key suffix: a plain substring test
# would also pick up any dataset whose *name* happens to contain the word
# "train", "validation" or "test".

# Combine train splits for training
train_dfs = [df for key, df in all_pronunciation_data.items() if key.endswith('_train')]
if train_dfs:
    combined_train = pd.concat(train_dfs, ignore_index=True)
    print(f"\nüìä Combined training data: {len(combined_train)}")
else:
    combined_train = None
    print(f"\n‚ö†Ô∏è  No training data available")

# Combine validation splits for validation
val_dfs = [df for key, df in all_pronunciation_data.items() if key.endswith('_validation')]
if val_dfs:
    combined_val = pd.concat(val_dfs, ignore_index=True)
    print(f"üìä Combined validation data: {len(combined_val)}")
else:
    combined_val = None
    print(f"‚ö†Ô∏è  No validation data available")

# Combine test splits for testing
test_dfs = [df for key, df in all_pronunciation_data.items() if key.endswith('_test')]
if test_dfs:
    combined_test = pd.concat(test_dfs, ignore_index=True)
    print(f"üìä Combined test data: {len(combined_test)}")
else:
    combined_test = None
    print(f"‚ö†Ô∏è  No test data available")

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

class PronunciationDataset(Dataset):
    """Torch dataset pairing MFCC features with character-level phoneme labels.

    Every character of the ``'phoneme'`` column is treated as one phoneme
    symbol. The classification target of a sample is the index of the first
    symbol of its phoneme string.

    Args:
        dataframe: Frame with an ``'audio'`` and a ``'phoneme'`` column.
        phoneme_to_idx: Optional existing symbol -> index mapping to reuse.
            Pass the training set's mapping when building validation/test
            sets so all splits share one label space; when omitted, a
            vocabulary is built from *dataframe* itself.
    """

    def __init__(self, dataframe, phoneme_to_idx=None):
        self.data = dataframe

        if phoneme_to_idx is None:
            # Create phoneme vocabulary from the symbols seen in this frame
            symbols = set()
            for phonemes in self.data['phoneme'].astype(str):
                symbols.update(phonemes)
            unique_phonemes = sorted(symbols)
            self.phoneme_to_idx = {
                phoneme: idx for idx, phoneme in enumerate(unique_phonemes)
            }
            print(f"Created phoneme vocabulary with {len(unique_phonemes)} unique phonemes")
        else:
            # Copy so later changes to the caller's dict do not leak in
            self.phoneme_to_idx = dict(phoneme_to_idx)
            print(f"Reusing phoneme vocabulary with {len(self.phoneme_to_idx)} unique phonemes")

        self.idx_to_phoneme = {
            idx: phoneme for phoneme, idx in self.phoneme_to_idx.items()
        }

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return features, first-symbol target and full index sequence."""
        row = self.data.iloc[idx]

        # Extract audio features
        audio_features = extract_audio_features(row['audio'])
        if audio_features is None:
            # Best-effort fallback kept from the original design: random
            # (100, 80) features stand in for clips that cannot be processed.
            # NOTE(review): these samples are pure noise with a real label.
            audio_features = np.random.randn(100, 80)

        # Convert phoneme to indices; unknown symbols fall back to index 0
        phoneme_str = str(row['phoneme'])
        phoneme_indices = [self.phoneme_to_idx.get(p, 0) for p in phoneme_str]

        # Create target (first phoneme for simplicity)
        target = phoneme_indices[0] if phoneme_indices else 0

        return {
            'audio_features': torch.FloatTensor(audio_features),
            'phoneme_target': torch.LongTensor([target]),
            'phoneme_sequence': torch.LongTensor(phoneme_indices)
        }

# Create datasets
def _share_vocabulary(dataset, reference):
    """Make *dataset* encode phonemes with *reference*'s vocabulary.

    Each PronunciationDataset otherwise builds a vocabulary from its own
    rows, so the same label index would mean different phonemes in the
    training and evaluation splits.
    """
    dataset.phoneme_to_idx = reference.phoneme_to_idx
    dataset.idx_to_phoneme = reference.idx_to_phoneme


def collate_pronunciation_batch(samples):
    """Pad variable-length samples so they can be stacked into one batch.

    Feature matrices have one row per audio frame and phoneme sequences one
    entry per symbol, so their lengths differ between samples and the
    default collation cannot stack them. Features are zero-padded along the
    time axis; phoneme sequences are padded with -1, which is never a valid
    vocabulary index.
    """
    from torch.nn.utils.rnn import pad_sequence

    return {
        'audio_features': pad_sequence(
            [sample['audio_features'] for sample in samples], batch_first=True
        ),
        'phoneme_target': torch.stack(
            [sample['phoneme_target'] for sample in samples]
        ),
        'phoneme_sequence': pad_sequence(
            [sample['phoneme_sequence'] for sample in samples],
            batch_first=True,
            padding_value=-1,
        ),
    }


if combined_train is not None:
    train_dataset = PronunciationDataset(combined_train)
    print(f"‚úÖ Training dataset: {len(train_dataset)} examples")
else:
    train_dataset = None

if combined_val is not None:
    val_dataset = PronunciationDataset(combined_val)
    if train_dataset is not None:
        # Evaluation labels must live in the model's (training) label space
        _share_vocabulary(val_dataset, train_dataset)
    print(f"‚úÖ Validation dataset: {len(val_dataset)} examples")
else:
    val_dataset = None

if combined_test is not None:
    test_dataset = PronunciationDataset(combined_test)
    if train_dataset is not None:
        _share_vocabulary(test_dataset, train_dataset)
    print(f"‚úÖ Test dataset: {len(test_dataset)} examples")
else:
    test_dataset = None

# Create data loaders
if train_dataset:
    train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2,
                              collate_fn=collate_pronunciation_batch)
else:
    train_loader = None

if val_dataset:
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2,
                            collate_fn=collate_pronunciation_batch)
else:
    val_loader = None

if test_dataset:
    test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2,
                             collate_fn=collate_pronunciation_batch)
else:
    test_loader = None

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

# Report the accelerator PyTorch will use
if not torch.cuda.is_available():
    print("‚ö†Ô∏è No GPU detected - check your Kaggle accelerator settings!")
else:
    print(f"‚úÖ GPU available: {torch.cuda.get_device_name(0)}")
    print(f"   CUDA version: {torch.version.cuda}")
    print(f"   GPU memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f} GB")

print(f"PyTorch version: {torch.__version__}")

# Rebuild the model so its output layer matches the training vocabulary size
if train_dataset:
    num_phonemes = len(train_dataset.phoneme_to_idx)
    model = PronunciationAnalyzer(num_phonemes=num_phonemes)
    model = model.to(device)
    print(f"‚úÖ Model updated with {num_phonemes} phoneme classes")

# Losses for the two heads, and one optimizer over all model parameters
criterion_phoneme = nn.CrossEntropyLoss()
criterion_quality = nn.MSELoss()
optimizer = optim.Adam(
    model.parameters(),
    lr=1e-4,
    weight_decay=1e-5,
)

# Training function
def train_epoch(model, train_loader, optimizer, epoch):
    """Run one optimisation pass over *train_loader*.

    Uses the module-level ``device``, ``criterion_phoneme`` and
    ``criterion_quality``. The loss is the phoneme loss plus half the
    quality loss, where the quality target is a constant 0.8 because no
    quality labels are available.

    Args:
        model: Module returning ``'phoneme_logits'`` and ``'quality_score'``.
        train_loader: Iterable of batches with ``'audio_features'`` (moved to
            (batch, features, time) before the forward pass) and
            ``'phoneme_target'``.
        optimizer: Optimizer stepping the model parameters.
        epoch: Epoch number, only used in the progress-bar label.

    Returns:
        ``(average loss per batch, phoneme accuracy)``; both are 0 when the
        loader yields no samples.
    """
    model.train()
    total_loss = 0
    phoneme_correct = 0
    total_samples = 0

    pbar = tqdm(train_loader, desc=f'Epoch {epoch}')
    for batch in pbar:
        audio_features = batch['audio_features'].to(device)
        # reshape(-1) keeps the batch dimension even for a batch of one; a
        # bare squeeze() would collapse it to a 0-dim tensor, which breaks
        # both the cross-entropy call and size(0) below.
        phoneme_targets = batch['phoneme_target'].reshape(-1).to(device)

        # Forward pass
        outputs = model(audio_features.transpose(1, 2))  # (batch, features, time)

        # Calculate losses
        phoneme_loss = criterion_phoneme(outputs['phoneme_logits'], phoneme_targets)

        # Dummy quality targets (since we don't have explicit quality labels)
        quality_targets = torch.ones_like(outputs['quality_score']) * 0.8  # Assume decent quality
        quality_loss = criterion_quality(outputs['quality_score'], quality_targets)

        total_loss_batch = phoneme_loss + 0.5 * quality_loss

        # Backward pass
        optimizer.zero_grad()
        total_loss_batch.backward()
        optimizer.step()

        # Statistics
        total_loss += total_loss_batch.item()
        _, predicted = torch.max(outputs['phoneme_logits'], 1)
        phoneme_correct += (predicted == phoneme_targets).sum().item()
        total_samples += phoneme_targets.size(0)

        # Update progress bar
        pbar.set_postfix({
            'loss': total_loss_batch.item(),
            'acc': phoneme_correct / total_samples
        })

    # Guard against an empty loader instead of dividing by zero
    avg_loss = total_loss / max(len(train_loader), 1)
    accuracy = phoneme_correct / total_samples if total_samples else 0.0

    return avg_loss, accuracy

# Evaluation function
def evaluate(model, eval_loader):
    """Compute average loss and phoneme accuracy over *eval_loader*.

    Runs in eval mode without gradients and uses the module-level
    ``device``, ``criterion_phoneme`` and ``criterion_quality``. The loss is
    the phoneme loss plus half the quality loss against a constant 0.8
    quality target.

    Returns:
        ``(average loss per batch, phoneme accuracy)``; both are 0 when the
        loader yields no samples.
    """
    model.eval()
    total_loss = 0
    phoneme_correct = 0
    total_samples = 0

    with torch.no_grad():
        for batch in eval_loader:
            audio_features = batch['audio_features'].to(device)
            # reshape(-1) keeps the batch dimension for a batch of one; a
            # bare squeeze() would yield a 0-dim tensor and break the loss.
            phoneme_targets = batch['phoneme_target'].reshape(-1).to(device)

            outputs = model(audio_features.transpose(1, 2))

            phoneme_loss = criterion_phoneme(outputs['phoneme_logits'], phoneme_targets)
            quality_targets = torch.ones_like(outputs['quality_score']) * 0.8
            quality_loss = criterion_quality(outputs['quality_score'], quality_targets)

            total_loss_batch = phoneme_loss + 0.5 * quality_loss
            total_loss += total_loss_batch.item()

            _, predicted = torch.max(outputs['phoneme_logits'], 1)
            phoneme_correct += (predicted == phoneme_targets).sum().item()
            total_samples += phoneme_targets.size(0)

    # Guard against an empty loader instead of dividing by zero
    avg_loss = total_loss / max(len(eval_loader), 1)
    accuracy = phoneme_correct / total_samples if total_samples else 0.0

    return avg_loss, accuracy

In [None]:
# Training loop
num_epochs = 10
best_val_acc = 0

if train_loader is None:
    # Without training data there is no loader to iterate; skip the loop
    # instead of crashing inside train_epoch.
    print("No training data available - skipping training")
else:
    print("üöÄ Starting pronunciation model training...")

    for epoch in range(1, num_epochs + 1):
        # Train
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, epoch)

        # Evaluate
        if val_loader:
            val_loss, val_acc = evaluate(model, val_loader)
            print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

            # Save best model (only written once validation accuracy
            # exceeds the best value seen so far)
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                torch.save(model.state_dict(), 'best_pronunciation_model.pth')
                print(f"‚úÖ New best model saved with accuracy: {val_acc:.4f}")
        else:
            print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}")

    print("üéâ Training completed!")

In [None]:
# Test evaluation
import os

if test_loader:
    print("üîç Evaluating on test set...")
    test_loss, test_acc = evaluate(model, test_loader)
    print(f"\nüìä Test Results:")
    print(f"   Test Loss: {test_loss:.4f}")
    print(f"   Test Accuracy: {test_acc:.4f}")

    # Load best model for final evaluation. The checkpoint only exists when
    # a validation pass improved on the initial best accuracy, so check for
    # it first; map_location keeps a GPU-saved file loadable on CPU.
    best_checkpoint_path = 'best_pronunciation_model.pth'
    if os.path.exists(best_checkpoint_path):
        model.load_state_dict(torch.load(best_checkpoint_path, map_location=device))
        best_test_loss, best_test_acc = evaluate(model, test_loader)
        print(f"   Best Model Test Accuracy: {best_test_acc:.4f}")
    else:
        print("   No best-model checkpoint found - skipping best-model evaluation")
else:
    print("‚ö†Ô∏è No test dataset available for evaluation")

# Show some predictions
if test_dataset:
    print(f"\nüìù Sample Predictions:")
    model.eval()

    # Predicted indices live in the label space the model was trained on,
    # i.e. the training vocabulary; the stored target of a test sample is
    # encoded with the test dataset's own mapping.
    prediction_vocab = (
        train_dataset.idx_to_phoneme if train_dataset else test_dataset.idx_to_phoneme
    )

    with torch.no_grad():
        for i in range(min(5, len(test_dataset))):
            sample = test_dataset[i]
            audio_features = sample['audio_features'].unsqueeze(0).to(device)

            outputs = model(audio_features.transpose(1, 2))
            _, predicted = torch.max(outputs['phoneme_logits'], 1)

            # .get avoids a KeyError for an index missing from a vocabulary
            predicted_phoneme = prediction_vocab.get(predicted.item(), '<unk>')
            actual_phoneme = test_dataset.idx_to_phoneme.get(
                sample['phoneme_target'].item(), '<unk>'
            )
            quality_score = outputs['quality_score'].item()

            print(f"   Sample {i+1}:")
            print(f"     Actual: {actual_phoneme}")
            print(f"     Predicted: {predicted_phoneme}")
            print(f"     Quality Score: {quality_score:.3f}")
            print()

In [None]:
# Persist the final weights together with the training vocabulary and the
# constructor arguments needed to rebuild the model; the vocabulary entries
# are empty and num_phonemes falls back to 50 when there is no training set
torch.save(
    dict(
        model_state_dict=model.state_dict(),
        phoneme_to_idx=train_dataset.phoneme_to_idx if train_dataset else {},
        idx_to_phoneme=train_dataset.idx_to_phoneme if train_dataset else {},
        model_config=dict(
            audio_dim=80,
            hidden_dim=256,
            num_phonemes=len(train_dataset.phoneme_to_idx) if train_dataset else 50,
        ),
    ),
    'pronunciation_model.pth',
)

print("‚úÖ Pronunciation model saved successfully!")
print("üìÅ Model saved to: pronunciation_model.pth")
print(f"üéØ Phoneme vocabulary size: {len(train_dataset.phoneme_to_idx) if train_dataset else 'N/A'}")