# Audio Processing Pipeline

Пайплайн для работы с аудио:
- Классификация аудио (музыкальные жанры, речь, эмоции)
- MFCC и спектрограммы
- Аугментация аудио
- CNN/RNN для аудио
- Transfer Learning (YAMNet, wav2vec2)

In [None]:
!pip install librosa soundfile torch torchaudio transformers pandas numpy scikit-learn matplotlib -q

In [None]:
import librosa
import librosa.display
import soundfile as sf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torchaudio
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, classification_report
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification
import warnings
# Suppress every warning category for the rest of the session so the notebook
# output stays readable.
# NOTE(review): this also hides deprecation warnings emitted by the audio/ML
# libraries — consider narrowing the filter while debugging.
warnings.filterwarnings('ignore')

# Report that the imports succeeded and whether a CUDA device is visible to torch
print("✓ Библиотеки загружены!")
print(f"CUDA available: {torch.cuda.is_available()}")

## 1. Загрузка данных

In [None]:
# === YOUR DATA ===
# Format: CSV with the columns 'file_path' and 'label'
train_df, test_df = (pd.read_csv(csv_name) for csv_name in ('train.csv', 'test.csv'))

# Feature-extraction settings shared by the cells below
SAMPLE_RATE = 22050  # sampling rate the audio is loaded at
DURATION = 3  # clip length in seconds
N_MFCC = 40  # number of MFCC coefficients
N_MELS = 128  # number of mel filter banks

# Encode the string labels as integer class ids (fitted on the training set)
le = LabelEncoder()
train_df['label_encoded'] = le.fit_transform(train_df['label'])
num_classes = len(le.classes_)

# Summary of the label space and the dataset sizes
print(
    f"Количество классов: {num_classes}",
    f"Классы: {le.classes_}",
    f"\nTrain samples: {len(train_df)}",
    f"Test samples: {len(test_df)}",
    sep="\n",
)

## 2. Feature extraction - MFCC

In [None]:
def extract_mfcc(file_path, sr=SAMPLE_RATE, duration=DURATION, n_mfcc=N_MFCC):
    """
    Extract a fixed-size MFCC feature vector from an audio file.

    The clip is loaded at ``sr`` Hz and limited to ``duration`` seconds; clips
    shorter than that are zero-padded at the end. The MFCC matrix is then
    averaged over time so that every file yields a vector of the same length.

    Args:
        file_path: Path of the audio file to load.
        sr: Sampling rate the audio is loaded at.
        duration: Clip length in seconds (may be fractional). ``None`` loads
            the whole file and skips padding.
        n_mfcc: Number of MFCC coefficients to compute.

    Returns:
        1-D array with ``n_mfcc`` time-averaged coefficients, or an all-zero
        vector of the same length when the file cannot be processed.
    """
    try:
        # Load the audio
        audio, sr = librosa.load(file_path, sr=sr, duration=duration)

        # Pad clips that are shorter than the requested duration. The target
        # length is converted to whole samples because np.pad rejects float
        # widths; previously a fractional duration raised inside this try
        # block and was silently turned into a zero vector.
        if duration is not None:
            target_len = int(sr * duration)
            if len(audio) < target_len:
                audio = np.pad(audio, (0, target_len - len(audio)), mode='constant')

        # Compute the MFCC matrix
        mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc)

        # Average over time
        mfcc_mean = np.mean(mfcc.T, axis=0)

        return mfcc_mean
    except Exception as e:
        # Deliberate best effort: report the failure and keep the output
        # shape stable so one bad file does not abort a whole extraction run.
        print(f"Ошибка при обработке {file_path}: {e}")
        return np.zeros(n_mfcc)

# Example: extract MFCC features for the first file listed in the training set.
# `sample_file` is reused by the later demo cells.
sample_file = train_df.iloc[0]['file_path']
mfcc_features = extract_mfcc(sample_file)
print(f"MFCC shape: {mfcc_features.shape}")

## 3. Feature extraction - Mel Spectrogram

In [None]:
def extract_mel_spectrogram(file_path, sr=SAMPLE_RATE, duration=DURATION, n_mels=N_MELS):
    """
    Compute a log-scaled (dB) mel spectrogram for an audio file.

    The clip is loaded at ``sr`` Hz and limited to ``duration`` seconds; clips
    shorter than that are zero-padded at the end before the transform.

    Args:
        file_path: Path of the audio file to load.
        sr: Sampling rate the audio is loaded at.
        duration: Clip length in seconds (may be fractional).
        n_mels: Number of mel filter banks.

    Returns:
        2-D array with ``n_mels`` rows holding the spectrogram in dB relative
        to its maximum, or an all-zero array of the expected shape when the
        file cannot be processed.
    """
    # librosa's default hop length, passed explicitly so that the fallback
    # shape below is tied to the value the transform actually uses.
    hop_length = 512

    try:
        audio, sr = librosa.load(file_path, sr=sr, duration=duration)

        # Whole-sample target length: np.pad rejects float widths, so a
        # fractional duration used to raise here and end up as a silent
        # all-zero result.
        target_len = int(sr * duration)
        if len(audio) < target_len:
            audio = np.pad(audio, (0, target_len - len(audio)), mode='constant')

        # Mel spectrogram, converted from power to decibels
        mel_spec = librosa.feature.melspectrogram(
            y=audio, sr=sr, n_mels=n_mels, hop_length=hop_length
        )
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

        return mel_spec_db
    except Exception as e:
        # Deliberate best effort: report the failure and return zeros with
        # the frame count a full-length clip would have produced.
        print(f"Ошибка: {e}")
        return np.zeros((n_mels, int(sr * duration) // hop_length + 1))

# Visualize the spectrogram of the sample file.
# The pyplot calls below act on the implicit "current figure", so their order
# matters: the figure is created first, specshow draws into it, and colorbar
# attaches to what was just drawn.
mel_spec = extract_mel_spectrogram(sample_file)
plt.figure(figsize=(10, 4))
librosa.display.specshow(mel_spec, sr=SAMPLE_RATE, x_axis='time', y_axis='mel')
plt.colorbar(format='%+2.0f dB')
plt.title('Mel Spectrogram')
plt.tight_layout()
plt.show()

print(f"Mel Spectrogram shape: {mel_spec.shape}")

## 4. Audio Augmentation

In [None]:
def augment_audio(audio, sr):
    """
    Apply one randomly chosen augmentation to a waveform.

    One of four options is drawn on every call:
      * 'pitch' - pitch shift by an integer number of steps in [-3, 3]
      * 'speed' - time stretch by a rate drawn from [0.8, 1.2)
      * 'noise' - additive Gaussian noise scaled by 0.005
      * 'none'  - the waveform is returned unchanged

    Args:
        audio: 1-D waveform array.
        sr: Sampling rate of ``audio`` (only used by the pitch shift).

    Returns:
        The augmented waveform. Time stretching changes its length.
    """
    chosen = np.random.choice(['pitch', 'speed', 'noise', 'none'])

    if chosen == 'noise':
        # Add low-amplitude white noise
        return audio + 0.005 * np.random.randn(len(audio))

    if chosen == 'speed':
        # Change the playback speed
        stretch_rate = np.random.uniform(0.8, 1.2)
        return librosa.effects.time_stretch(audio, rate=stretch_rate)

    if chosen == 'pitch':
        # Shift the pitch up or down
        shift_steps = np.random.randint(-3, 4)
        return librosa.effects.pitch_shift(audio, sr=sr, n_steps=shift_steps)

    # 'none': leave the signal untouched
    return audio

# Augmentation example: load the whole sample file (no duration limit here)
# and run one random augmentation on a copy so the original stays untouched.
audio, sr = librosa.load(sample_file, sr=SAMPLE_RATE)
audio_aug = augment_audio(audio.copy(), sr)
# The two shapes differ only when the time-stretch augmentation was drawn
print(f"Original audio shape: {audio.shape}")
print(f"Augmented audio shape: {audio_aug.shape}")

## 5. PyTorch Dataset для аудио

In [None]:
class AudioDataset(Dataset):
    """
    Dataset that turns audio files listed in a DataFrame into normalized
    log-mel spectrogram tensors.

    Args:
        df: DataFrame with a 'file_path' column and, optionally, a
            'label_encoded' column holding integer class ids.
        sr: Sampling rate the audio is loaded at.
        duration: Clip length in seconds; every item is forced to this length.
        n_mels: Number of mel filter banks.
        augment: When True, ``augment_audio`` is applied to each waveform.
    """

    def __init__(self, df, sr=SAMPLE_RATE, duration=DURATION, n_mels=N_MELS, augment=False):
        self.df = df
        self.sr = sr
        self.duration = duration
        self.n_mels = n_mels
        self.augment = augment

    def __len__(self):
        """Return the number of rows (audio files) in the DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """
        Load one file and return ``(spectrogram, label)``.

        Returns:
            spectrogram: float32 tensor of shape (1, n_mels, frames).
            label: 0-dim int64 tensor with the encoded class id, or 0 when
                the DataFrame has no 'label_encoded' column.
        """
        row = self.df.iloc[idx]
        file_path = row['file_path']

        # Load the audio
        audio, sr = librosa.load(file_path, sr=self.sr, duration=self.duration)

        # Augmentation
        if self.augment:
            audio = augment_audio(audio, sr)

        # Force a fixed number of samples. Truncation is required as well as
        # padding: the time-stretch augmentation can make the waveform longer
        # than `duration`, which would give spectrograms of different widths
        # and break batch collation in the DataLoader.
        target_len = int(sr * self.duration)
        audio = audio[:target_len]
        if len(audio) < target_len:
            audio = np.pad(audio, (0, target_len - len(audio)), mode='constant')

        # Mel spectrogram in decibels
        mel_spec = librosa.feature.melspectrogram(y=audio, sr=sr, n_mels=self.n_mels)
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)

        # Per-sample standardization; the epsilon guards against a zero std
        mel_spec_db = (mel_spec_db - mel_spec_db.mean()) / (mel_spec_db.std() + 1e-8)

        # Convert to a tensor and add the channel dimension
        mel_spec_tensor = torch.tensor(mel_spec_db, dtype=torch.float32).unsqueeze(0)

        # Always return the label as a long tensor so both branches agree
        label_value = int(row['label_encoded']) if 'label_encoded' in row else 0
        label = torch.tensor(label_value, dtype=torch.long)

        return mel_spec_tensor, label

# Create the DataLoader over the full training DataFrame with augmentation on.
# Batches of 32 are shuffled and loaded by 2 worker processes.
train_dataset = AudioDataset(train_df, augment=True)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)

print(f"✓ Dataset создан! Размер: {len(train_dataset)}")

# Sanity check: pull one batch and print the input and label tensor shapes
sample_batch = next(iter(train_loader))
print(f"Batch shape: {sample_batch[0].shape}")
print(f"Labels shape: {sample_batch[1].shape}")

## 6. CNN для классификации аудио

In [None]:
class AudioCNN(nn.Module):
    """
    Convolutional classifier for single-channel spectrogram input.

    Three Conv -> BatchNorm -> ReLU -> MaxPool -> Dropout stages with 32, 64
    and 128 channels are followed by adaptive average pooling to a 4x4 grid
    and a two-layer fully connected head that outputs ``num_classes`` scores.

    Args:
        num_classes: Size of the output layer.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Assemble the conv stages in order. Layer positions inside the
        # Sequential are unchanged, so state_dict keys stay the same.
        stage_layers = []
        channels_in = 1
        for channels_out in (32, 64, 128):
            stage_layers += [
                nn.Conv2d(channels_in, channels_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(channels_out),
                nn.ReLU(),
                nn.MaxPool2d(2),
                nn.Dropout(0.25),
            ]
            channels_in = channels_out
        self.conv_layers = nn.Sequential(*stage_layers)

        # Adaptive pooling gives a fixed 4x4 map whatever the input size
        self.adaptive_pool = nn.AdaptiveAvgPool2d((4, 4))

        # Fully connected classification head
        flat_features = 128 * 4 * 4
        self.fc_layers = nn.Sequential(
            nn.Linear(flat_features, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Map a batch of (N, 1, H, W) inputs to (N, num_classes) scores."""
        pooled = self.adaptive_pool(self.conv_layers(x))
        flattened = pooled.view(pooled.size(0), -1)
        return self.fc_layers(flattened)

# Create the model on the GPU when CUDA is available, otherwise on the CPU.
# `device` is reused by the training and inference cells.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = AudioCNN(num_classes=num_classes).to(device)

print(f"✓ Модель создана!")
# Total number of elements across all model parameters
print(f"Параметры модели: {sum(p.numel() for p in model.parameters())}")

## 7. Обучение CNN

In [None]:
# Train/validation split: 80/20, stratified on the label column, fixed seed
train_df_split, val_df_split = train_test_split(train_df, test_size=0.2, random_state=42, stratify=train_df['label'])

# Augmentation is enabled for training only; validation uses the raw clips.
# These rebind train_dataset/train_loader, replacing any earlier definitions.
train_dataset = AudioDataset(train_df_split, augment=True)
val_dataset = AudioDataset(val_df_split, augment=False)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=2)

# Loss & Optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# mode='max' because the scheduler is meant to track a metric that should grow
# (validation accuracy); the learning rate is multiplied by 0.5 once that
# metric stops improving (patience=3).
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=3, factor=0.5)

# Training loop
def train_epoch(model, loader, criterion, optimizer, device):
    """
    Run one training pass over ``loader`` and update the model weights.

    Args:
        model: Network to train; switched to training mode.
        loader: Iterable of ``(inputs, labels)`` batches that supports ``len``.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy)``: the mean of the per-batch losses and the
        share of correctly classified samples in percent.
    """
    model.train()
    loss_sum, n_correct, n_seen = 0, 0, 0

    for batch_inputs, batch_labels in loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        # Standard step: clear gradients, forward, backward, update
        optimizer.zero_grad()
        logits = model(batch_inputs)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        # Running statistics for the epoch summary
        loss_sum += batch_loss.item()
        predicted = logits.max(1).indices
        n_seen += batch_labels.size(0)
        n_correct += (predicted == batch_labels).sum().item()

    mean_loss = loss_sum / len(loader)
    accuracy_pct = 100. * n_correct / n_seen
    return mean_loss, accuracy_pct

def validate(model, loader, criterion, device):
    """
    Evaluate the model on ``loader`` without updating any weights.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        loader: Iterable of ``(inputs, labels)`` batches that supports ``len``.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(loss, accuracy)``: the mean of the per-batch losses and the
        share of correctly classified samples in percent.
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0, 0, 0

    # Gradient tracking is disabled for the whole evaluation pass
    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_labels).item()

            predicted = logits.max(1).indices
            n_seen += batch_labels.size(0)
            n_correct += (predicted == batch_labels).sum().item()

    mean_loss = loss_sum / len(loader)
    accuracy_pct = 100. * n_correct / n_seen
    return mean_loss, accuracy_pct

# Training: run a fixed number of epochs and keep the checkpoint with the best
# validation accuracy seen so far.
n_epochs = 20
best_val_acc = 0

print("\nНачало обучения...\n")
for epoch in range(n_epochs):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = validate(model, val_loader, criterion, device)
    
    # The scheduler monitors validation accuracy
    scheduler.step(val_acc)
    
    print(f"Epoch {epoch+1}/{n_epochs}")
    print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
    print("-" * 60)
    
    # Save the best model: the file is overwritten only on a strict
    # improvement, so nothing is written if accuracy never rises above 0.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'best_audio_model.pth')
        print(f"✓ Модель сохранена! Val Acc: {val_acc:.2f}%\n")

print(f"\n✓ Обучение завершено! Лучшая Val Accuracy: {best_val_acc:.2f}%")

## 8. Transfer Learning - wav2vec2

In [None]:
# Load the pretrained wav2vec2 checkpoint together with its feature extractor
model_name = "facebook/wav2vec2-base"
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)
# num_labels sizes the classification head to this task's number of classes;
# the model is moved to the same device as the CNN.
pretrained_model = AutoModelForAudioClassification.from_pretrained(
    model_name, 
    num_labels=num_classes,
    ignore_mismatched_sizes=True
).to(device)

print("✓ Pretrained модель загружена!")

# Dataset for wav2vec2
class Wav2VecDataset(Dataset):
    """
    Dataset that feeds raw waveforms through a Hugging Face feature extractor.

    Args:
        df: DataFrame with a 'file_path' column and, optionally, a
            'label_encoded' column holding integer class ids.
        feature_extractor: Callable feature extractor, as returned by
            ``AutoFeatureExtractor.from_pretrained``.
        sr: Sampling rate the audio is loaded at (wav2vec2 works at 16 kHz).
        duration: Clip length in seconds; every item is forced to this length.
    """

    def __init__(self, df, feature_extractor, sr=16000, duration=3):
        self.df = df
        self.feature_extractor = feature_extractor
        self.sr = sr
        self.duration = duration

    def __len__(self):
        """Return the number of rows (audio files) in the DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """
        Load one file and return ``(input_values, label)``.

        Returns:
            input_values: 1-D tensor with ``int(sr * duration)`` samples.
            label: 0-dim int64 tensor with the encoded class id, or 0 when
                the DataFrame has no 'label_encoded' column.
        """
        row = self.df.iloc[idx]

        # Load the clip (wav2vec2 works at 16 kHz)
        audio, _ = librosa.load(row['file_path'], sr=self.sr, duration=self.duration)

        # Pad or truncate every clip to the same number of samples.
        # padding=True pads to the longest item of the call, which for a
        # single clip pads nothing, so short files used to produce tensors of
        # different lengths that the default collate function cannot stack.
        inputs = self.feature_extractor(
            audio,
            sampling_rate=self.sr,
            return_tensors="pt",
            padding="max_length",
            max_length=int(self.sr * self.duration),
            truncation=True,
        )

        label = int(row['label_encoded']) if 'label_encoded' in row else 0

        # squeeze(0) removes only the batch axis added by the extractor
        return inputs.input_values.squeeze(0), torch.tensor(label, dtype=torch.long)

# Fine-tuning can be carried out in the same way as the CNN training.
# NOTE(review): AutoModelForAudioClassification models return an output object
# rather than a bare tensor (the scores are under `.logits`) — confirm whether
# train_epoch/validate need adapting before they are reused unchanged.
print("\nДля fine-tuning используйте тот же training loop с pretrained_model")

## 9. Предсказания на тестовом наборе

In [None]:
# Load the best checkpoint. map_location remaps the saved tensors onto the
# current device, so weights saved during GPU training can still be loaded on
# a CPU-only machine.
model.load_state_dict(torch.load('best_audio_model.pth', map_location=device))
model.eval()

# Build the test dataset. The test CSV has no labels, so a dummy column is
# added to a copy to keep the dataset interface uniform; the values are ignored.
test_df_copy = test_df.copy()
test_df_copy['label_encoded'] = 0  # Dummy label
test_dataset = AudioDataset(test_df_copy, augment=False)
# shuffle=False keeps predictions aligned with the row order of test_df
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=2)

# Predict the class index for every test sample
predictions = []
with torch.no_grad():
    for inputs, _ in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, predicted = outputs.max(1)
        predictions.extend(predicted.cpu().tolist())

# Decode class indices back to the original label strings
predicted_labels = le.inverse_transform(predictions)

print(f"\n✓ Предсказания готовы! Всего: {len(predictions)}")

## 10. Submission

In [None]:
# Assemble the submission table: one row per test sample, in test_df order
submission = pd.DataFrame({
    'id': test_df.index,  # or test_df['id'] if that column exists
    'prediction': predicted_labels
})

# Write the file without the DataFrame index column
submission.to_csv('audio_submission.csv', index=False)
print("\n✓ Submission сохранен!")
print(submission.head())
# Class balance of the predictions, as a quick sanity check
print(f"\nРаспределение предсказаний:")
print(submission['prediction'].value_counts())