In [None]:
# Датасет для классификации текста
class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        return self.texts[idx], self.labels[idx]

In [None]:
# Нейронная сеть для классификации текста
class TextClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(TextClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        return x

In [None]:
def prepare_word2vec(texts):
    tokenized_texts = [text.split() for text in texts]
    model = Word2Vec(tokenized_texts, vector_size=100, window=5, min_count=1, workers=4)
    return model

In [None]:
def encode_with_bert(text):
    inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
    outputs = bert_model(**inputs)
    return outputs.last_hidden_state.mean(dim=1)  # Усреднение по всем токенам

In [None]:
# Аугментация аудио
def augment_audio(file_path):
    audio, sr = librosa.load(file_path, sr=None)

    # Добавление белого шума
    noise = np.random.randn(len(audio))
    augmented_audio = audio + 0.005 * noise

    # Изменение скорости
    augmented_audio_speed = librosa.effects.time_stretch(audio, rate=1.2)

    # Сохранение аугментированного аудио
    augmented_file_path = file_path.replace('.wav', '_augmented.wav')
    librosa.output.write_wav(augmented_file_path, augmented_audio, sr)

    augmented_file_path_speed = file_path.replace('.wav', '_augmented_speed.wav')
    librosa.output.write_wav(augmented_file_path_speed, augmented_audio_speed, sr)

    return [augmented_file_path, augmented_file_path_speed]

In [None]:
def prepare_word2vec(texts):
    tokenized_texts = [text.split() for text in texts]
    model = Word2Vec(tokenized_texts, vector_size=100, window=5, min_count=1, workers=4)
    return model

In [None]:
def encode_with_bert(text):
    inputs = tokenizer(text, return_tensors='pt', padding=True, truncation=True)
    outputs = bert_model(**inputs)
    return outputs.last_hidden_state.mean(dim=1)  # Усреднение по всем токенам

In [None]:
# Аугментация аудио
def augment_audio(file_path):
    audio, sr = librosa.load(file_path, sr=None)

    # Добавление белого шума
    noise = np.random.randn(len(audio))
    augmented_audio = audio + 0.005 * noise

    # Изменение скорости
    augmented_audio_speed = librosa.effects.time_stretch(audio, rate=1.2)

    # Сохранение аугментированного аудио
    augmented_file_path = file_path.replace('.wav', '_augmented.wav')
    librosa.output.write_wav(augmented_file_path, augmented_audio, sr)

    augmented_file_path_speed = file_path.replace('.wav', '_augmented_speed.wav')
    librosa.output.write_wav(augmented_file_path_speed, augmented_audio_speed, sr)

    return [augmented_file_path, augmented_file_path_speed]

In [None]:
def correct_transcription(transcribed_text):
    # Используем fuzzywuzzy для нахождения ближайшей команды
    corrected_text, score = process.extractOne(transcribed_text, label.values())
    if score >= 80:  # Порог для принятия исправления
        return corrected_text
    return transcribed_text  # Если нет похожей команды, возвращаем оригинал

In [None]:
def correct_transcription(transcribed_text):
    # Используем fuzzywuzzy для нахождения ближайшей команды
    corrected_text, score = process.extractOne(transcribed_text, label.values())
    if score >= 80:  # Порог для принятия исправления
        return corrected_text
    return transcribed_text  # Если нет похожей команды, возвращаем оригинал

In [None]:
# Функция для обработки аудиофайла
def transcribe_audio(audio_file, dir):
    wf = wave.open(f"{dir}/{audio_file}", "rb")
    rec = KaldiRecognizer(model, wf.getframerate())

    result_text = ""
    while True:
        data = wf.readframes(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            result = json.loads(rec.Result())
            result_text += result.get("text", "") + " "

    final_result = json.loads(rec.FinalResult())
    result_text += final_result.get("text", "")
    print(f"Распознанный текст для {audio_file}: {result_text}")
    return result_text.strip()

In [None]:
# Функция классификации текста
def classify_text(text, classifier, tokenizer):
    # Преобразование текста в вектор
    text_vector = tokenizer.transform([text]).toarray()
    text_tensor = torch.tensor(text_vector, dtype=torch.float32)

    # Классификация текста
    with torch.no_grad():
        outputs = classifier(text_tensor)

    _, predicted_class = torch.max(outputs, 1)

    return predicted_class.item()

In [None]:
def process_audio_files(file_list, classifier, tokenizer, result_dir):
    results = []

    for file_info in file_list:
        audio_file = file_info['file']
        audio_id = file_info['id']
        print(f"Обработка файла: {audio_file}")

        # Транскрибируем аудиофайл
        transcribed_text = transcribe_audio(audio_file)
        if transcribed_text is None:
            print(f"Ошибка транскрипции для {audio_file}. Пропуск файла.")
            continue  # Пропускаем файл в случае ошибки

        # Коррекция текста
        corrected_text = correct_transcription(transcribed_text)
        print(f"Исправленный текст для {audio_file}: {corrected_text}")

        # Классификация текста
        predicted_class = classify_text(corrected_text, classifier, tokenizer)

        # Собираем атрибуты
        attributes = {
            "length": len(corrected_text.split()),
            "contains_numbers": any(char.isdigit() for char in corrected_text)
        }

        result = {
            "file_name": os.path.basename(audio_file),
            "file_id": audio_id,
            "transcription": corrected_text,
            "category": predicted_class,
            "attributes": attributes
        }

        results.append(result)
        print(f"Готово: {audio_file}")

    return results

In [None]:
# Пример классификатора и токенизатора
input_dim = 2  # Зависит от метода векторизации текста
hidden_dim = 55
output_dim = len(_label)  # Количество классов
classifier = TextClassifier(input_dim, hidden_dim, output_dim)

In [None]:
# Функция загрузки аннотаций
def load_annotations(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

In [None]:
def load_dataset():
    audio_files = []
    texts = []
    labels = []

    for annotation_file, data_dir in zip(ANNOTATION_FILES, DATA_DIR_FILES):
        annotation_path = os.path.join(ANNOTATION_DIR, annotation_file)
        print(f"Загрузка аннотаций из: {annotation_path}")
        training_annotations = load_annotations(annotation_path)

        for annotation in training_annotations:
            audio_filepath = os.path.join(DATA_DIR, data_dir, annotation['audio_filepath'])
            # Проверка существования файла перед конвертацией
            if os.path.exists(audio_filepath):
                if audio_filepath.endswith('.mp3'):
                    audio_filepath = convert_mp3_to_wav(audio_filepath)

                augmented_files = augment_audio(audio_filepath)  # Аугментация аудио
                audio_files.extend(augmented_files)

            else:
                print(f"Файл {audio_filepath} не найден.")
                continue  # Пропустить, если файл не найден


            text = annotation['text']
            label = annotation['label']
            texts.extend([text] * len(augmented_files))  # Повторяем текст для каждого аугментированного файла
            labels.extend([label] * len(augmented_files))

    print("Загрузка датасета завершена.")
    return audio_files, texts, labels

    print("Загрузка датасета завершена.")
    return audio_files, texts, labels

In [None]:
print("Начало загрузки данных...")
audio_files, texts, labels = load_dataset()

In [None]:
# Использование BERT для контекстуальной векторизации
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert_model = BertModel.from_pretrained('bert-base-uncased')

In [None]:
# Подготовка данных для Word2Vec
word2vec_model = prepare_word2vec(texts)
word2vec_vectors = np.array([word2vec_model.wv[text.split()].mean(axis=0) for text in texts])

In [None]:
# Подготовка TF-IDF векторов
tfidf_vectorizer = TfidfVectorizer(max_features=100)  # Максимальное количество признаков
tfidf_vectors = tfidf_vectorizer.fit_transform(texts).toarray()

In [None]:
# Пример использования BERT
bert_vectors = np.array([encode_with_bert(text).detach().numpy() for text in texts])

In [None]:
print("Разделение данных на тренировочные и тестовые...")
train_texts, test_texts, train_labels, test_labels = train_test_split(texts, labels, test_size=0.2)

In [None]:
tokenizer = TfidfVectorizer(max_features=input_dim)

In [None]:
# Подготовка текстовых данных для классификации
train_vectors = tokenizer.fit_transform(train_texts).toarray()
test_vectors = tokenizer.transform(test_texts).toarray()

train_dataset = TextDataset(train_vectors, train_labels)
train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)

In [1]:
def train_classifier(classifier, train_vectors, train_labels, epochs=30, batch_size=4):
    train_dataset = TextDataset(train_vectors, train_labels)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)

    for epoch in range(epochs):
        for texts_batch, labels_batch in train_loader:
            texts_batch = texts_batch.float()
            optimizer.zero_grad()
            outputs = classifier(texts_batch)
            loss = criterion(outputs, labels_batch)
            loss.backward()
            optimizer.step()

        if (epoch + 1) % 5 == 0:
            print(f"Эпоха {epoch + 1}: Потеря = {loss.item():.4f}")

# Обучение с использованием TF-IDF
print("Обучение модели с использованием TF-IDF...")
classifier_tfidf = TextClassifier(input_dim=tfidf_vectors.shape[1], hidden_dim=100, output_dim=len(set(labels)))
train_classifier(classifier_tfidf, tfidf_vectors, train_labels)

# Обучение с использованием Word2Vec
print("Обучение модели с использованием Word2Vec...")
classifier_word2vec = TextClassifier(input_dim=100, hidden_dim=100, output_dim=len(set(labels)))  # 100 - размерность Word2Vec
train_classifier(classifier_word2vec, word2vec_vectors, train_labels)

# Обучение с использованием BERT
print("Обучение модели с использованием BERT...")
classifier_bert = TextClassifier(input_dim=768, hidden_dim=100, output_dim=len(set(labels)))  # 768 - размерность BERT
train_classifier(classifier_bert, bert_vectors, train_labels)

print("Обучение завершено.")

# Вычисление WER
def calculate_wer(reference, hypothesis):
    reference_words = reference.split()
    hypothesis_words = hypothesis.split()

    S = sum(1 for r, h in zip(reference_words, hypothesis_words) if r != h)
    D = len(reference_words) - len(hypothesis_words) if len(reference_words) > len(hypothesis_words) else 0
    I = len(hypothesis_words) - len(reference_words) if len(hypothesis_words) > len(reference_words) else 0
    N = len(reference_words)

    return (S + D + I) / N if N > 0 else 0

# Вычисление Mq
def calculate_mq(wer, f1_weighted):
    WERnorm = wer
    return 0.25 * (1 - WERnorm) + 0.75 * f1_weighted

# Оптимизация гиперпараметров с помощью Optuna
def objective(trial):
    hidden_dim = trial.suggest_int('hidden_dim', 32, 128)
    learning_rate = trial.suggest_loguniform('learning_rate', 1e-5, 1e-1)
    batch_size = trial.suggest_int('batch_size', 4, 16)

    # Загрузка и подготовка данных
    audio_files, texts, labels = load_dataset()
    train_texts, test_texts, train_labels, test_labels = train_test_split(texts, labels, test_size=0.2)

    # Генерация векторов BERT
    bert_vectors = np.array([encode_with_bert(text).detach().numpy() for text in train_texts])

    # Создание датасета и загрузчика
    train_dataset = TextDataset(bert_vectors, train_labels)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    # Обучение модели
    classifier = TextClassifier(input_dim=768, hidden_dim=hidden_dim, output_dim=len(set(labels)))
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(classifier.parameters(), lr=learning_rate)

    for epoch in range(3):  # Можно увеличить количество эпох
        for texts_batch, labels_batch in train_loader:
            texts_batch = texts_batch.float()
            optimizer.zero_grad()
            outputs = classifier(texts_batch)
            loss = criterion(outputs, labels_batch)
            loss.backward()
            optimizer.step()

    # Оценка модели на тестовом наборе
    test_bert_vectors = np.array([encode_with_bert(text).detach().numpy() for text in test_texts])
    test_dataset = TextDataset(test_bert_vectors, test_labels)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for texts_batch, labels_batch in test_loader:
            texts_batch = texts_batch.float()
            outputs = classifier(texts_batch)
            _, predicted = torch.max(outputs, 1)

            all_predictions.extend(predicted.numpy())
            all_labels.extend(labels_batch.numpy())

    # Вычисление метрик
    f1_weighted = f1_score(all_labels, all_predictions, average='weighted')
    precision = precision_score(all_labels, all_predictions, average='weighted')
    recall = recall_score(all_labels, all_predictions, average='weighted')

    # Расчет WER и Mq
    wer = calculate_wer(" ".join(test_texts), " ".join([tokenizer.decode(pred) for pred in all_predictions]))
    mq = calculate_mq(wer, f1_weighted)

    print(f'F1-Weighted: {f1_weighted}, Precision: {precision}, Recall: {recall}, WER: {wer}, Mq: {mq}')

    return f1_weighted

# Запуск оптимизации гиперпараметров
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=10)

print("Лучшие гиперпараметры: ", study.best_params)

def process_validation_files(val_dir, classifier, tokenizer):
    """Обрабатывает валидационные файлы и сохраняет результаты."""
    annotations = load_annotations(ANNOTATION_VAL_FILE)
    print(f"Загружено аннотаций: {len(annotations)}")

    result_files = []
    for annotation in annotations:
        audio_filepath = os.path.join(val_dir, annotation['audio_filepath'])
        if os.path.exists(audio_filepath):
            result_files.append({"file": audio_filepath, "id": annotation['id']})
        else:
            print(f"Файл {audio_filepath} не найден.")

    if result_files:
        # Обрабатываем аудиофайлы
        transcription_results = process_audio_files(result_files, classifier, tokenizer, val_dir)

        # Сохранение результатов в JSON
        output_file = "transcriptions_validation.json"
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(transcription_results, f, ensure_ascii=False, indent=4)

        print(f"Результаты транскрипции сохранены в {output_file}")
    else:
        print("Нет доступных аудиофайлов для обработки.")

# Вызов функции для обработки валидационных файлов
print("Обработка валидационных файлов...")
process_validation_files(VAL_DIR, classifier, tokenizer)




KeyboardInterrupt

