In [55]:
from torch.utils.data import DataLoader, TensorDataset
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

In [56]:
def add_gaussian_noise(data_tensor, std=0.1):
    """Adds Gaussian noise to a PyTorch tensor."""
    noise = torch.randn_like(data_tensor) * std
    return data_tensor + noise

In [57]:
class SimpleNN(nn.Module):
    def __init__(self, input_size, output_size):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(input_size, output_size)

    def forward(self, x):
        x = self.fc1(x)
        return x

In [23]:
iris = load_iris()
# sufle dataset before split
np.random.seed(42)
indices = np.random.permutation(len(iris.data))
iris.data = iris.data[indices]
iris.target = iris.target[indices]
X = iris.data
y = iris.target

X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)

full_dataset = TensorDataset(X_tensor, y_tensor)

# Split data into training and testing sets
train_data, test_data = train_test_split(full_dataset, test_size=0.2, random_state=42)

train_data, validation_data = train_test_split(train_data, test_size=0.2, random_state=42)
print(X_tensor.shape, y_tensor.shape)

batch_size_train = 64
batch_size = 1024

train_loader = DataLoader(train_data, batch_size=batch_size_train, shuffle=True)
val_loader = DataLoader(validation_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

torch.Size([150, 4]) torch.Size([150])


In [28]:
input_size = 4
output_size = 3
model = SimpleNN(input_size, output_size)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [29]:
num_epochs = 1500
best_val_loss = float('inf')
for epoch in range(num_epochs):
    # Training
    model.train()
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
        # Evaluate on validation set
        model.eval()
        with torch.no_grad():
            val_losses = []
            for val_inputs, val_labels in val_loader:
                val_outputs = model(val_inputs)
                val_loss = loss_function(val_outputs, val_labels)
                val_losses.append(val_loss.item())
            avg_val_loss = np.mean(val_losses)
            print(f'Validation Loss: {avg_val_loss:.4f}')
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                torch.save(model.state_dict(), 'best_model.pth')
                print('Best model saved!')


Epoch [10/1500], Loss: 1.1172
Validation Loss: 1.0355
Best model saved!
Epoch [20/1500], Loss: 1.0169
Validation Loss: 1.0044
Best model saved!
Epoch [30/1500], Loss: 0.8882
Validation Loss: 0.9597
Best model saved!
Epoch [40/1500], Loss: 0.7929
Validation Loss: 0.9178
Best model saved!
Epoch [50/1500], Loss: 0.8094
Validation Loss: 0.8965
Best model saved!
Epoch [60/1500], Loss: 0.7888
Validation Loss: 0.8818
Best model saved!
Epoch [70/1500], Loss: 0.6870
Validation Loss: 0.8675
Best model saved!
Epoch [80/1500], Loss: 0.6844
Validation Loss: 0.8736
Epoch [90/1500], Loss: 0.6467
Validation Loss: 0.8644
Best model saved!
Epoch [100/1500], Loss: 0.6725
Validation Loss: 0.8553
Best model saved!
Epoch [110/1500], Loss: 0.6672
Validation Loss: 0.8475
Best model saved!
Epoch [120/1500], Loss: 0.6257
Validation Loss: 0.8262
Best model saved!
Epoch [130/1500], Loss: 0.6366
Validation Loss: 0.8159
Best model saved!
Epoch [140/1500], Loss: 0.6313
Validation Loss: 0.8030
Best model saved!
Epoch

In [30]:
# Assuming you have loaded the best_model.pth after training is complete
best_model_path = 'best_model.pth'
model.load_state_dict(torch.load(best_model_path))
model.eval()

correct = 0
total = 0
with torch.no_grad():
    for test_inputs, test_labels in test_loader:
        test_outputs = model(test_inputs)
        _, predicted = torch.max(test_outputs.data, 1)
        total += test_labels.size(0)
        correct += (predicted == test_labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy of the model on the test data: {accuracy:.2f}%')

Accuracy of the model on the test data: 100.00%


test now with half of the dataset of size

In [31]:
iris = load_iris()
# shuffle dataset before split
np.random.seed(42)
indices = np.random.permutation(len(iris.data))
iris.data = iris.data[indices]
iris.target = iris.target[indices]
X = iris.data
y = iris.target

X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)

full_dataset = TensorDataset(X_tensor, y_tensor)

# split with label and unlabel data. Assume 20% labeled data
label_data, unlabel_data = train_test_split(full_dataset, test_size=0.8, random_state=42)

# Split data into training and testing sets
train_data, test_data = train_test_split(label_data, test_size=0.2, random_state=42)

train_data, validation_data = train_test_split(train_data, test_size=0.2, random_state=42)
print(X_tensor.shape, y_tensor.shape)

batch_size_train = 64
batch_size = 1024

train_loader = DataLoader(train_data, batch_size=batch_size_train, shuffle=True)
val_loader = DataLoader(validation_data, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)
unlabeled_loader = DataLoader(unlabel_data, batch_size=batch_size, shuffle=True)

torch.Size([150, 4]) torch.Size([150])


In [32]:
input_size = 4
output_size = 3
model = SimpleNN(input_size, output_size)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [44]:
num_epochs = 1500
best_val_loss = float('inf')
for epoch in range(num_epochs):
    # Training
    model.train()
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
        # Evaluate on validation set
        model.eval()
        with torch.no_grad():
            val_losses = []
            for val_inputs, val_labels in val_loader:
                val_outputs = model(val_inputs)
                val_loss = loss_function(val_outputs, val_labels)
                val_losses.append(val_loss.item())
            avg_val_loss = np.mean(val_losses)
            print(f'Validation Loss: {avg_val_loss:.4f}')
            if avg_val_loss < best_val_loss:
                best_val_loss = avg_val_loss
                torch.save(model.state_dict(), 'best_model_label.pth')
                print('Best model saved!')


Epoch [10/1500], Loss: 0.4211
Validation Loss: 0.5308
Best model saved!
Epoch [20/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [30/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [40/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [50/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [60/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [70/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [80/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [90/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [100/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [110/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [120/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [130/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [140/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [150/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [160/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [170/1500], Loss: 0.4211
Validation Loss: 0.5308
Epoch [180/1500], Loss: 0.4211
Validation Loss: 0.5308
E

In [34]:
# Assuming you have loaded the best_model.pth after training is complete
best_model_path = 'best_model_label.pth'
model.load_state_dict(torch.load(best_model_path))
model.eval()

correct = 0
total = 0
with torch.no_grad():
    for test_inputs, test_labels in test_loader:
        test_outputs = model(test_inputs)
        _, predicted = torch.max(test_outputs.data, 1)
        total += test_labels.size(0)
        correct += (predicted == test_labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy of the model on the test data: {accuracy:.2f}%')

Accuracy of the model on the test data: 66.67%


In [63]:
# Assuming you have loaded the best_model.pth after training is complete
best_model_path = 'best_model_label.pth'
model.load_state_dict(torch.load(best_model_path))
model.eval()

correct = 0
total = 0
with torch.no_grad():
    for unlabel_inputs, unlabel_labels in unlabeled_loader:
        unlabel_outputs = model(unlabel_inputs)
        _, predicted = torch.max(unlabel_outputs.data, 1)
        total += unlabel_labels.size(0)
        correct += (predicted == unlabel_labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy of the model on the unlabel data: {accuracy:.2f}%')

Accuracy of the model on the unlabel data: 79.17%


try using mean teacher

In [37]:
import copy

In [58]:
def update_teacher_weights(student_model, teacher_model, alpha=0.99):
    for teacher_param, student_param in zip(teacher_model.parameters(), student_model.parameters()):
        teacher_param.data.mul_(alpha).add_(student_param.data, alpha=1 - alpha)

In [64]:
input_size = 4
output_size = 3
student_model = SimpleNN(input_size, output_size)
# Instantiate the teacher model with the same architecture
teacher_model = copy.deepcopy(student_model)

# The teacher model should not have its gradients calculated
for param in teacher_model.parameters():
    param.requires_grad = False
supervised_loss_fn = nn.CrossEntropyLoss()
consistency_loss_fn = nn.MSELoss()
optimizer = optim.Adam(student_model.parameters(), lr=0.001)
alpha = 0.99  # for exponential moving average
lambda_u = 0.3  # weight for unlabeled loss

In [66]:
from itertools import cycle

In [None]:
# Training loop
num_epochs = 1000
best_val_loss = float('inf')
for epoch in range(num_epochs):
    student_model.train()
    for (labeled_data, labeled_labels), (unlabeled_data,_) in zip(cycle(train_loader), unlabeled_loader):
        # Supervised loss
        labeled_outputs = student_model(labeled_data)
        supervised_loss = supervised_loss_fn(labeled_outputs, labeled_labels)
        
        # 1. Weakly augmented view for the TEACHER
        unlabeled_data_teacher = add_gaussian_noise(unlabeled_data, std=0.05)
        
        # 2. Strongly augmented view for the STUDENT
        unlabeled_data_student = add_gaussian_noise(unlabeled_data, std=0.15)
        
        # Consistency loss
        with torch.no_grad():
            teacher_outputs = teacher_model(unlabeled_data_teacher)
        student_outputs_unlabeled = student_model(unlabeled_data_student)
        consistency_loss = consistency_loss_fn(student_outputs_unlabeled, teacher_outputs)

        total_loss = supervised_loss + lambda_u * consistency_loss
        
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

        # Update teacher model's weights using EMA
        update_teacher_weights(student_model, teacher_model)
        
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss .item():.4f}')
            # Evaluate on validation set
            student_model.eval()
            with torch.no_grad():
                val_losses = []
                for val_inputs, val_labels in val_loader:
                    val_outputs = student_model(val_inputs)
                    val_loss = loss_function(val_outputs, val_labels)
                    val_losses.append(val_loss.item())
                avg_val_loss = np.mean(val_losses)
                print(f'Validation Loss: {avg_val_loss:.4f}')
                if avg_val_loss < best_val_loss:
                    best_val_loss = avg_val_loss
                    torch.save(student_model.state_dict(), 'best_model_label_and_unlabel.pth')
                    print('Best model saved!')


Epoch [10/1000], Loss: 1.2190
Validation Loss: 1.5891
Best model saved!
Epoch [20/1000], Loss: 1.1657
Validation Loss: 1.5130
Best model saved!
Epoch [30/1000], Loss: 1.1193
Validation Loss: 1.4467
Best model saved!
Epoch [40/1000], Loss: 1.0793
Validation Loss: 1.3888
Best model saved!
Epoch [50/1000], Loss: 1.0418
Validation Loss: 1.3376
Best model saved!
Epoch [60/1000], Loss: 1.0052
Validation Loss: 1.2921
Best model saved!
Epoch [70/1000], Loss: 0.9700
Validation Loss: 1.2502
Best model saved!
Epoch [80/1000], Loss: 0.9366
Validation Loss: 1.2105
Best model saved!
Epoch [90/1000], Loss: 0.9044
Validation Loss: 1.1729
Best model saved!
Epoch [100/1000], Loss: 0.8738
Validation Loss: 1.1370
Best model saved!
Epoch [110/1000], Loss: 0.8433
Validation Loss: 1.1026
Best model saved!
Epoch [120/1000], Loss: 0.8161
Validation Loss: 1.0700
Best model saved!
Epoch [130/1000], Loss: 0.7900
Validation Loss: 1.0390
Best model saved!
Epoch [140/1000], Loss: 0.7647
Validation Loss: 1.0096
Best 

In [68]:
# Assuming you have loaded the best_model.pth after training is complete
best_model_path = 'best_model_label_and_unlabel.pth'
model.load_state_dict(torch.load(best_model_path))
model.eval()

correct = 0
total = 0
with torch.no_grad():
    for test_inputs, test_labels in test_loader:
        test_outputs = model(test_inputs)
        _, predicted = torch.max(test_outputs.data, 1)
        total += test_labels.size(0)
        correct += (predicted == test_labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy of the model on the test data: {accuracy:.2f}%')

Accuracy of the model on the test data: 100.00%


In [69]:
# Assuming you have loaded the best_model.pth after training is complete
best_model_path = 'best_model_label_and_unlabel.pth'
model.load_state_dict(torch.load(best_model_path))
model.eval()

correct = 0
total = 0
with torch.no_grad():
    for unlabel_inputs, unlabel_labels in unlabeled_loader:
        unlabel_outputs = model(unlabel_inputs)
        _, predicted = torch.max(unlabel_outputs.data, 1)
        total += unlabel_labels.size(0)
        correct += (predicted == unlabel_labels).sum().item()

accuracy = 100 * correct / total
print(f'Accuracy of the model on the unlabel data: {accuracy:.2f}%')

Accuracy of the model on the unlabel data: 95.00%
