In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
import torch.nn.functional as F
from collections import Counter
from itertools import chain

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
class SimpleCNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, num_classes):
        super(SimpleCNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.conv = nn.Conv1d(embedding_dim, 128, kernel_size=3, padding=1)
        self.pool = nn.AdaptiveMaxPool1d(1)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        embedded = self.embedding(x).permute(0, 2, 1)
        conv_out = F.relu(self.conv(embedded))
        pooled = self.pool(conv_out).squeeze(2)
        dropped = self.dropout(pooled)
        return self.fc(dropped)

class ConceptualTAM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, max_seq_len, num_topics, num_aspects, num_classes):
        super(ConceptualTAM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.dropout = nn.Dropout(0.3)
        
        self.topic_conv = nn.Conv1d(embedding_dim, 128, kernel_size=3, padding=1)
        self.topic_pool = nn.AdaptiveMaxPool1d(1)
        self.topic_linear = nn.Linear(128, num_topics)
        
        self.aspect_rnn = nn.GRU(embedding_dim, 64, batch_first=True, bidirectional=True)
        self.aspect_linear = nn.Linear(128, num_aspects)
        
        self.classifier = nn.Linear(num_topics + num_aspects, num_classes)

    def forward(self, text):
        embedded = self.embedding(text)
        embedded = self.dropout(embedded)
        
        embedded_permuted = embedded.permute(0, 2, 1)
        topic_features = F.relu(self.topic_conv(embedded_permuted))
        topic_pooled = self.topic_pool(topic_features).squeeze(2)
        topic_output = F.relu(self.topic_linear(topic_pooled))
        
        aspect_out, _ = self.aspect_rnn(embedded)
        aspect_pooled = torch.mean(aspect_out, dim=1)
        aspect_output = F.relu(self.aspect_linear(aspect_pooled))
        
        combined = torch.cat((topic_output, aspect_output), dim=1)
        combined = self.dropout(combined)
        logits = self.classifier(combined)
        
        return logits, topic_output, aspect_output

In [3]:
def prepare_data():
    dataset = load_dataset('imdb')
    
    train_data = dataset['train']
    test_data = dataset['test']
    
    train_indices = list(range(len(train_data)))
    test_indices = list(range(len(test_data)))
    
    import random
    random.seed(42)
    random.shuffle(train_indices)
    random.shuffle(test_indices)
    
    train_sampled_indices = train_indices[:4500]
    test_sampled_indices = test_indices[:1500]
    
    all_texts = []
    all_labels = []
    
    for idx in train_sampled_indices:
        all_texts.append(train_data[idx]['text'])
        all_labels.append(train_data[idx]['label'])
    
    for idx in test_sampled_indices:
        all_texts.append(test_data[idx]['text'])
        all_labels.append(test_data[idx]['label'])
    
    combined = list(zip(all_texts, all_labels))
    random.shuffle(combined)
    all_texts, all_labels = zip(*combined)
    
    print(f"Total dataset size: {len(all_texts)}")
    print(f"Class 0: {sum(1 for l in all_labels if l == 0)} samples")
    print(f"Class 1: {sum(1 for l in all_labels if l == 1)} samples")
    
    max_len = 100
    vocab_size = 2000
    
    from collections import Counter
    vocab = Counter()
    for text in all_texts:
        vocab.update(text.lower().split())
    
    word_to_idx = {word: i+1 for i, (word, _) in enumerate(vocab.most_common(vocab_size-1))}
    word_to_idx['<PAD>'] = 0
    
    encoded_texts = []
    for text in all_texts:
        tokens = text.lower().split()[:max_len]
        encoded = [word_to_idx.get(token, 0) for token in tokens]
        encoded += [0] * (max_len - len(encoded))
        encoded_texts.append(encoded)
    
    X_train, X_test, y_train, y_test = train_test_split(
        encoded_texts, all_labels, test_size=0.3, random_state=42, stratify=all_labels
    )
    
    print(f"\nTrain size: {len(X_train)}")
    print(f"Test size: {len(X_test)}")
    print(f"Train - Class 0: {sum(1 for l in y_train if l == 0)}, Class 1: {sum(1 for l in y_train if l == 1)}")
    print(f"Test - Class 0: {sum(1 for l in y_test if l == 0)}, Class 1: {sum(1 for l in y_test if l == 1)}")
    
    train_dataset = TensorDataset(
        torch.tensor(X_train, dtype=torch.long),
        torch.tensor(y_train, dtype=torch.long)
    )
    test_dataset = TensorDataset(
        torch.tensor(X_test, dtype=torch.long),
        torch.tensor(y_test, dtype=torch.long)
    )
    
    return train_dataset, test_dataset, vocab_size, max_len

In [4]:
def train_model(model, train_loader, test_loader, epochs=5, model_name="Model"):
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss()
    
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        
        for batch_texts, batch_labels in train_loader:
            optimizer.zero_grad()
            
            if model_name == "TAM":
                outputs, _, _ = model(batch_texts)
            else:
                outputs = model(batch_texts)
            
            loss = criterion(outputs, batch_labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        
        model.eval()
        all_preds, all_labels = [], []
        with torch.no_grad():
            for batch_texts, batch_labels in test_loader:
                if model_name == "TAM":
                    outputs, _, _ = model(batch_texts)
                else:
                    outputs = model(batch_texts)
                preds = torch.argmax(outputs, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(batch_labels.cpu().numpy())
        
        acc = accuracy_score(all_labels, all_preds)
        f1 = f1_score(all_labels, all_preds, average='weighted')
        print(f"{model_name} - Epoch {epoch+1}, Loss: {total_loss:.4f}, Accuracy: {acc:.4f}, F1: {f1:.4f}")
    
    return acc, f1

In [5]:
def evaluate_topic_aspect(model, test_loader):
    model.eval()
    topic_vectors, aspect_vectors = [], []
    with torch.no_grad():
        for batch_texts, _ in test_loader:
            _, topic, aspect = model(batch_texts)
            topic_vectors.append(topic.cpu().numpy())
            aspect_vectors.append(aspect.cpu().numpy())
    
    topic_np = np.vstack(topic_vectors)
    aspect_np = np.vstack(aspect_vectors)
    
    topic_var = topic_np.var(axis=0).mean()
    aspect_var = aspect_np.var(axis=0).mean()
    
    return topic_var, aspect_var

In [6]:
train_dataset, test_dataset, vocab_size, max_len = prepare_data()

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print("Training SimpleCNN...")
cnn_model = SimpleCNN(vocab_size, 100, 2)
cnn_acc, cnn_f1 = train_model(cnn_model, train_loader, test_loader, epochs=15, model_name="CNN")

print("\nTraining TAM...")
tam_model = ConceptualTAM(vocab_size, 100, max_len, num_topics=10, num_aspects=5, num_classes=2)
tam_acc, tam_f1 = train_model(tam_model, train_loader, test_loader, epochs=7, model_name="TAM")

print("\nAnalyzing TAM components...")
topic_var, aspect_var = evaluate_topic_aspect(tam_model, test_loader)

Total dataset size: 6000
Class 0: 3003 samples
Class 1: 2997 samples

Train size: 4200
Test size: 1800
Train - Class 0: 2102, Class 1: 2098
Test - Class 0: 901, Class 1: 899
Training SimpleCNN...
CNN - Epoch 1, Loss: 100.3684, Accuracy: 0.5478, F1: 0.4468
CNN - Epoch 2, Loss: 87.1948, Accuracy: 0.6378, F1: 0.6162
CNN - Epoch 3, Loss: 81.3784, Accuracy: 0.6944, F1: 0.6915
CNN - Epoch 4, Loss: 76.3186, Accuracy: 0.7000, F1: 0.6948
CNN - Epoch 5, Loss: 72.6119, Accuracy: 0.7217, F1: 0.7209
CNN - Epoch 6, Loss: 69.4396, Accuracy: 0.7222, F1: 0.7185
CNN - Epoch 7, Loss: 63.3638, Accuracy: 0.7333, F1: 0.7330
CNN - Epoch 8, Loss: 61.3682, Accuracy: 0.7433, F1: 0.7418
CNN - Epoch 9, Loss: 57.0598, Accuracy: 0.7456, F1: 0.7449
CNN - Epoch 10, Loss: 54.6970, Accuracy: 0.7450, F1: 0.7428
CNN - Epoch 11, Loss: 51.1898, Accuracy: 0.7461, F1: 0.7461
CNN - Epoch 12, Loss: 46.5939, Accuracy: 0.7439, F1: 0.7418
CNN - Epoch 13, Loss: 46.4821, Accuracy: 0.7506, F1: 0.7504
CNN - Epoch 14, Loss: 42.2829, A

In [7]:
print("="*60)
print("MODEL COMPARISON RESULTS")
print("="*60)

print(f"\nACCURACY:")
print(f"SimpleCNN: {cnn_acc:.4f}")
print(f"TAM:       {tam_acc:.4f}")
print(f"Difference: {abs(cnn_acc - tam_acc):.4f} ({'CNN better' if cnn_acc > tam_acc else 'TAM better'})")

print(f"\nF1:")
print(f"SimpleCNN: {cnn_f1:.4f}")
print(f"TAM:       {tam_f1:.4f}")
print(f"Difference: {abs(cnn_f1 - tam_f1):.4f} ({'CNN better' if cnn_f1 > tam_f1 else 'TAM better'})")

print(f"\nTAM:")
print(f"Topic vector variance:  {topic_var:.6f}")
print(f"Aspect vector variance: {aspect_var:.6f}")

print(f"\nPARAMETERS:")
cnn_params = sum(p.numel() for p in cnn_model.parameters())
tam_params = sum(p.numel() for p in tam_model.parameters())
print(f"SimpleCNN: {cnn_params:,} parameters")
print(f"TAM:       {tam_params:,} parameters")

MODEL COMPARISON RESULTS

ACCURACY:
SimpleCNN: 0.7417
TAM:       0.7633
Difference: 0.0217 (TAM better)

F1:
SimpleCNN: 0.7404
TAM:       0.7633
Difference: 0.0229 (TAM better)

TAM:
Topic vector variance:  0.000000
Aspect vector variance: 2.492475

PARAMETERS:
SimpleCNN: 238,786 parameters
TAM:       304,239 parameters


В целом, можно сказать, что TAM обучается заметно эффективнее по обеим классическим метрикам, чем базовая CNN, но обучается она в разы дольше, хоть и параметров всего в ~1.5 раза больше. 