## Импорт библиотек

In [None]:
!pip install pymorphy2

In [None]:
import os.path
import pandas as pd
import numpy as np
import librosa
import matplotlib.pyplot as plt
import re
import pymorphy2
import nltk
from nltk.stem import WordNetLemmatizer
nltk.download("wordnet")
nltk.download("omw-1.4")
nltk.download('stopwords')
nltk.download('punkt')
from nltk.tokenize import word_tokenize
from string import punctuation
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn.functional as F

In [3]:
# Select the compute device once: CUDA when it is available, otherwise the CPU.
# The fallback guarantees that `device` is always defined.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## Импорт данных

In [4]:
# Load the dataset from a CSV file given by a path relative to the working directory.
csv_data = pd.read_csv('Data_ASR_2.csv')

In [5]:
def calc_sentiment_label(row):
    """Map the 'sentiment' value of a row to a non-negative class index.

    Returns 1 for a positive value, 2 for a negative value (used instead of
    -1 so that labels stay non-negative) and 0 otherwise.
    """
    score = row['sentiment']
    if score > 0:
        return 1
    if score < 0:
        return 2
    return 0

# Apply calc_sentiment_label to every row (axis=1) and store the result
# in a new 'sentiment_label' column.
csv_data['sentiment_label'] = csv_data.apply(calc_sentiment_label, axis=1)

In [6]:
# Class labels as a plain Python list; passed as the target to train_test_split below.
y = csv_data['sentiment_label'].tolist()

In [None]:
# Display three randomly sampled rows of the dataset.
csv_data.sample(3)

In [None]:
# Display the (rows, columns) shape of the dataset.
csv_data.shape

## Текстовые данные

### Препроцессинг

In [None]:
# Raw contents of the 'text' column as a Python list.
text_data = csv_data['text'].tolist()
# Display the first entry.
text_data[0]

In [10]:
def preprocess(string):
    """Normalise a raw text into a space-separated string of lemmas.

    Processing steps, applied to every token produced by ``word_tokenize``:

    1. drop tokens that are a single character from ``string.punctuation``;
    2. lowercase the token;
    3. drop tokens found in NLTK's English stop-word list;
    4. lemmatise the token as a verb (``pos="v"``) with ``WordNetLemmatizer``.

    Args:
        string: the text to preprocess.

    Returns:
        The surviving lemmas joined by single spaces.
    """
    # Sets give constant-time membership tests; the lists used previously
    # were scanned linearly for every token.
    punctuation_marks = set(punctuation)
    stop_words = set(nltk.corpus.stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()

    lemmas = []
    for token in word_tokenize(string):
        # Punctuation is checked on the raw token, before lowercasing.
        if token in punctuation_marks:
            continue
        token = token.lower()
        if token in stop_words:
            continue
        lemmas.append(lemmatizer.lemmatize(token, pos="v"))

    return ' '.join(lemmas)

In [None]:
# Run preprocess over every raw text.
preprocessed_text_data = [preprocess(string) for string in text_data]
# Display the first preprocessed text.
preprocessed_text_data[0]

### Извлечение признаков

In [12]:
# Count-based vectoriser with the vocabulary capped via max_features=10000.
vectorizer = CountVectorizer(max_features=10000)
# Fit on the preprocessed texts and convert the result to a dense array.
bow_X = vectorizer.fit_transform(np.asarray(preprocessed_text_data)).toarray()

### Подготовка к обучению

In [20]:
# Split the text features and labels with test_size=0.2; random_state=42 keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(bow_X, y, test_size=0.2, random_state=42)

### Нейронная сеть

In [23]:
class TFIDFClassifier(nn.Module):
    """Convolutional classifier over integer-encoded inputs.

    The input is embedded, passed through a bank of parallel 1-D convolutions
    (one per entry of ``filter_sizes``), max-pooled over the whole sequence
    axis, concatenated and projected to ``num_classes`` scores.

    Args:
        num_classes: number of output classes.
        num_tokens: size of the embedding table.
        embedding_dim: dimensionality of each embedding vector.
        num_filters: number of output channels of every convolution.
        filter_sizes: iterable of convolution kernel sizes.
    """

    def __init__(self, num_classes, num_tokens, embedding_dim, num_filters, filter_sizes):
        super().__init__()

        self.embedding = nn.Embedding(num_tokens, embedding_dim)
        self.conv_layers = nn.ModuleList([
            nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=fs)
            for fs in filter_sizes
        ])
        self.fc = nn.Linear(num_filters * len(filter_sizes), num_classes)

    def forward(self, x):
        """Return raw class scores (logits) of shape (batch, num_classes).

        No softmax is applied here: ``nn.CrossEntropyLoss`` expects
        un-normalised logits and normalises them itself. Use
        ``predict_proba`` to obtain probabilities.
        """
        x = self.embedding(x)
        # Conv1d expects channels first: (batch, embedding_dim, sequence).
        x = x.permute(0, 2, 1)
        conv_outputs = [F.relu(conv(x)) for conv in self.conv_layers]
        # Max-pool over the full remaining length, leaving one value per filter.
        pooled_outputs = [
            F.max_pool1d(feature_map, feature_map.size(2)).squeeze(2)
            for feature_map in conv_outputs
        ]
        x = torch.cat(pooled_outputs, dim=1)
        return self.fc(x)

    def predict_proba(self, x):
        """Return class probabilities, i.e. softmax of ``forward(x)`` over the last axis."""
        return F.softmax(self(x), dim=-1)

    def _run_epoch(self, loader, criterion, optimizer=None):
        """Run one pass over ``loader`` and return (mean batch loss, weighted F1).

        With an ``optimizer`` the model is put in training mode and updated
        after every batch; without one it is put in evaluation mode and
        gradients are disabled.
        """
        is_training = optimizer is not None
        self.train(is_training)

        running_loss = 0.0
        epoch_labels = []
        epoch_predictions = []

        with torch.set_grad_enabled(is_training):
            for inputs, labels in loader:
                if is_training:
                    optimizer.zero_grad()
                logits = self(inputs)
                loss = criterion(logits, labels)
                if is_training:
                    loss.backward()
                    optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(logits, 1)
                epoch_labels.extend(labels.tolist())
                epoch_predictions.extend(predicted.tolist())

        mean_loss = running_loss / len(loader)
        epoch_f1 = f1_score(epoch_labels, epoch_predictions, average='weighted')
        return mean_loss, epoch_f1

    def train_model(self, train_loader, val_loader, num_epochs, learning_rate=0.001):
        """Train with Adam and cross-entropy, validating after every epoch.

        Args:
            train_loader: iterable of (inputs, labels) training batches.
            val_loader: iterable of (inputs, labels) validation batches.
            num_epochs: number of passes over ``train_loader``.
            learning_rate: Adam learning rate.

        Returns:
            Four lists with one entry per epoch: training losses, validation
            losses, training weighted-F1 scores and validation weighted-F1
            scores. Losses are means of the per-batch losses.
        """
        optimizer = optim.Adam(self.parameters(), lr=learning_rate, weight_decay=0.001)
        criterion = nn.CrossEntropyLoss()

        train_losses = []
        val_losses = []
        train_f1s = []
        val_f1s = []

        for epoch in range(num_epochs):
            # Metrics are collected per epoch, so each F1 value describes
            # that epoch only rather than everything seen so far.
            train_loss, train_f1 = self._run_epoch(train_loader, criterion, optimizer)
            train_losses.append(train_loss)
            train_f1s.append(train_f1)

            val_loss, val_f1 = self._run_epoch(val_loader, criterion)
            val_losses.append(val_loss)
            val_f1s.append(val_f1)

            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        return train_losses, val_losses, train_f1s, val_f1s


In [21]:
# Use the GPU when one is available and fall back to the CPU otherwise,
# instead of hard-coding CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): X_train/X_test come from the CountVectorizer matrix, so the
# long tensors below hold term counts, which the model then uses as embedding
# indices - confirm this is intended.
X_train = torch.tensor(X_train, dtype=torch.long, device=device)
y_train = torch.tensor(y_train, device=device)

X_test = torch.tensor(X_test, dtype=torch.long, device=device)
y_test = torch.tensor(y_test, device=device)

train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

batch_size = 256
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Validation data is only evaluated, so it does not need to be shuffled.
val_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Hyper-parameters of the text classifier.
num_classes = 3  # labels produced by calc_sentiment_label are 0, 1 and 2
num_tokens = X_train.shape[1]  # number of feature columns
embedding_dim = 100
num_filters = 128
filter_sizes = [3, 4, 5]
num_epochs = 10

In [None]:
model = TFIDFClassifier(num_classes, num_tokens, embedding_dim, num_filters, filter_sizes)
# Put the model on whichever device the training tensors live on,
# rather than assuming CUDA.
model.to(X_train.device)
train_losses, val_losses, train_f1s, val_f1s = model.train_model(train_loader, val_loader, num_epochs)

### Результаты

In [None]:
# Plot loss and F1 curves side by side, one point per epoch.
epochs = range(1, len(train_losses) + 1)

# Each panel: (title, y-axis label, ((values, legend label), ...)).
panels = (
    ('Кривая функции потерь', 'Функция потерь', (
        (train_losses, 'Функция потерь на обучающей выборке'),
        (val_losses, 'Функция потерь на валидационной выборке'),
    )),
    ('Кривая F1', 'F1', (
        (train_f1s, 'F1 на обучающей выборке'),
        (val_f1s, 'F1 на валидационной выборке'),
    )),
)

plt.figure(figsize=(15, 5))
for position, (title, y_label, curves) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    for values, legend_label in curves:
        plt.plot(epochs, values, label=legend_label)
    plt.title(title)
    plt.xlabel('Эпоха')
    plt.ylabel(y_label)
    plt.legend(loc="upper left")

plt.tight_layout()
plt.show()

## Аудио данные

In [None]:
!unzip '/content/drive/MyDrive/masters/ML/Audio.zip'

In [None]:
# Columns read by calculate_mfcc: the audio file name and the segment boundaries.
filenames = csv_data[['video', 'start_time', 'end_time']]
# Display the first three rows.
filenames.head(3)

### Извлечение признаков

#### MFCC

In [11]:
def calculate_mfcc(row, n_mfcc=20, hop_length=512, maxlen=300,
                   audio_dir='/content/Audio/WAV_16000/'):
    """Compute MFCC features for the audio segment described by ``row``.

    Args:
        row: mapping with the keys 'video' (audio file name without the
            '.wav' extension), 'start_time' and 'end_time'.
        n_mfcc: forwarded to ``librosa.feature.mfcc``.
        hop_length: forwarded to ``librosa.feature.mfcc``.
        maxlen: target size of axis 1 of the result; longer results are
            truncated and shorter ones are padded by ``np.pad`` at the end.
            A falsy value disables the length adjustment.
        audio_dir: directory that contains the '.wav' files.

    Returns:
        The MFCC array returned by librosa, with axis 1 adjusted to
        ``maxlen`` when ``maxlen`` is truthy.
    """
    audio_path = os.path.join(audio_dir, f"{row['video']}.wav")

    start_time = row['start_time']
    duration = row['end_time'] - start_time
    # Only the [start_time, end_time] segment of the file is loaded.
    audio, sr = librosa.load(audio_path, offset=start_time, duration=duration, sr=None)

    mfcc_features = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc, hop_length=hop_length)

    # Bring every result to the same length along axis 1.
    if maxlen:
        n_frames = mfcc_features.shape[1]
        if n_frames > maxlen:
            mfcc_features = mfcc_features[:, :maxlen]
        elif n_frames < maxlen:
            mfcc_features = np.pad(mfcc_features, ((0, 0), (0, maxlen - n_frames)))
    return mfcc_features

In [12]:
# One MFCC array per dataset row, in row order.
mfcc_features = [calculate_mfcc(row) for _, row in filenames.iterrows()]

### Подготовка к обучению

In [22]:
# Split the MFCC features and labels with the same test_size and random_state as the text split.
X_train, X_test, y_train, y_test = train_test_split(mfcc_features, y, test_size=0.2, random_state=42)

### Нейронная сеть

In [49]:
class MFCC_Classifier(nn.Module):
    """Fully connected classifier.

    The network is a stack of Linear -> ReLU -> Dropout blocks (one per entry
    of ``hidden_layers``) followed by a final Linear layer with ``output_dim``
    outputs.

    NOTE(review): ``nn.Linear`` acts on the last axis only, so a 3-D input
    yields a 3-D output and ``nn.CrossEntropyLoss`` then treats axis 1 as the
    class axis. If one prediction per sample is intended, flatten each sample
    and set ``output_dim`` to the number of classes - confirm against the
    caller.

    Args:
        input_dim: size of the last axis of the input.
        output_dim: number of outputs of the final linear layer.
        hidden_layers: sizes of the hidden linear layers.
        dropout_rate: dropout probability applied after every hidden layer.
    """

    def __init__(self, input_dim, output_dim, hidden_layers=(128, 128, 64), dropout_rate=0.2):
        super().__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.hidden_layers = hidden_layers
        self.dropout_rate = dropout_rate

        layers = []
        prev_dim = input_dim
        for units in hidden_layers:
            layers.append(nn.Linear(prev_dim, units))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            prev_dim = units
        layers.append(nn.Linear(prev_dim, output_dim))

        self.model = nn.Sequential(*layers)
        # Kept for predict_proba; deliberately not applied in forward.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return the raw (un-normalised) outputs of the final linear layer.

        No softmax is applied here: ``nn.CrossEntropyLoss`` expects
        un-normalised logits and normalises them itself. Use
        ``predict_proba`` to obtain softmax-normalised outputs.
        """
        return self.model(x)

    def predict_proba(self, x):
        """Return ``forward(x)`` normalised with softmax over axis 1."""
        return self.softmax(self(x))

    def _run_epoch(self, loader, criterion, optimizer=None):
        """Run one pass over ``loader`` and return (mean batch loss, weighted F1).

        With an ``optimizer`` the model is put in training mode and updated
        after every batch; without one it is put in evaluation mode and
        gradients are disabled.
        """
        is_training = optimizer is not None
        self.train(is_training)

        running_loss = 0.0
        epoch_labels = []
        epoch_predictions = []

        with torch.set_grad_enabled(is_training):
            for inputs, labels in loader:
                if is_training:
                    optimizer.zero_grad()
                logits = self(inputs)
                # 2-D logits (batch, classes) take the labels as they are;
                # logits with an extra trailing axis need a matching axis on
                # the targets, as in the original call path.
                targets = labels if logits.dim() == 2 else labels.unsqueeze(1)
                loss = criterion(logits, targets)
                if is_training:
                    loss.backward()
                    optimizer.step()

                running_loss += loss.item()
                _, predicted = torch.max(logits, 1)
                epoch_labels.extend(labels.tolist())
                # Flatten so that f1_score always receives a flat list.
                epoch_predictions.extend(predicted.reshape(-1).tolist())

        mean_loss = running_loss / len(loader)
        epoch_f1 = f1_score(epoch_labels, epoch_predictions, average='weighted')
        return mean_loss, epoch_f1

    def train_model(self, train_loader, val_loader, num_epochs, learning_rate=0.001):
        """Train with Adam and cross-entropy, validating after every epoch.

        Args:
            train_loader: iterable of (inputs, labels) training batches.
            val_loader: iterable of (inputs, labels) validation batches.
            num_epochs: number of passes over ``train_loader``.
            learning_rate: Adam learning rate.

        Returns:
            Four lists with one entry per epoch: training losses, validation
            losses, training weighted-F1 scores and validation weighted-F1
            scores. Losses are means of the per-batch losses.
        """
        optimizer = optim.Adam(self.parameters(), lr=learning_rate, weight_decay=0.001)
        criterion = nn.CrossEntropyLoss()

        train_losses = []
        val_losses = []
        train_f1s = []
        val_f1s = []

        for epoch in range(num_epochs):
            # Metrics are collected per epoch, so each F1 value describes
            # that epoch only rather than everything seen so far.
            train_loss, train_f1 = self._run_epoch(train_loader, criterion, optimizer)
            train_losses.append(train_loss)
            train_f1s.append(train_f1)

            val_loss, val_f1 = self._run_epoch(val_loader, criterion)
            val_losses.append(val_loss)
            val_f1s.append(val_f1)

            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        return train_losses, val_losses, train_f1s, val_f1s


In [24]:
# Use the GPU when one is available and fall back to the CPU otherwise,
# instead of hard-coding CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# The per-sample MFCC arrays are stacked into a single float tensor.
X_train = torch.tensor(np.array(X_train), dtype=torch.float32, device=device)
y_train = torch.tensor(y_train, device=device)

X_test = torch.tensor(np.array(X_test), dtype=torch.float32, device=device)
y_test = torch.tensor(y_test, device=device)

train_dataset = TensorDataset(X_train, y_train)
test_dataset = TensorDataset(X_test, y_test)

batch_size = 256
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Validation data is only evaluated, so it does not need to be shuffled.
val_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [52]:
# Number of training epochs passed to train_model for the audio model.
num_epochs=50

In [None]:
# NOTE(review): the labels take the values 0, 1 and 2 while output_dim is 1,
# and input_dim is the size of the last axis of a 3-D tensor - confirm that
# this is the intended configuration.
model = MFCC_Classifier(input_dim=X_train.shape[2], output_dim=1)
# Put the model on whichever device the training tensors live on,
# rather than assuming CUDA.
model.to(X_train.device)
train_losses, val_losses, train_f1s, val_f1s = model.train_model(train_loader, val_loader, num_epochs)

### Результаты

In [None]:
# Plot loss and F1 curves side by side, one point per epoch.
epochs = range(1, len(train_losses) + 1)

# Each panel: (title, y-axis label, ((values, legend label), ...)).
panels = (
    ('Кривая функции потерь', 'Функция потерь', (
        (train_losses, 'Функция потерь на обучающей выборке'),
        (val_losses, 'Функция потерь на валидационной выборке'),
    )),
    ('Кривая F1', 'F1', (
        (train_f1s, 'F1 на обучающей выборке'),
        (val_f1s, 'F1 на валидационной выборке'),
    )),
)

plt.figure(figsize=(15, 5))
for position, (title, y_label, curves) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    for values, legend_label in curves:
        plt.plot(epochs, values, label=legend_label)
    plt.title(title)
    plt.xlabel('Эпоха')
    plt.ylabel(y_label)
    plt.legend(loc="upper left")

plt.tight_layout()
plt.show()