***IMPORTS***

In [1]:
import numpy as np
import matplotlib.pyplot as plt

from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.data import DataLoader, random_split, Dataset



***GENERATE 6 DIMENSIONAL DATA***

In [2]:
def rotation_matrix_6D(theta, dim1, dim2):
    """
    Create a 6D rotation matrix to rotate by angle theta in the plane defined by dim1 and dim2.
    
    Parameters:
    - theta: Rotation angle in radians.
    - dim1, dim2: Dimensions (0-5) in which rotation should occur.
    
    Returns:
    - 6x6 rotation matrix.
    """
    R2 = np.array([[np.cos(theta), -np.sin(theta)],
                   [np.sin(theta), np.cos(theta)]])
    
    R6 = np.eye(6) # create identity matrix of 6*6
    R6[dim1, dim1], R6[dim1, dim2], R6[dim2, dim1], R6[dim2, dim2] = R2.flatten()
    
    return R6

In [3]:
def generate_data_6D(n_samples_per_class=100, n_classes=5):
    """
    Generate data for n_classes in a 6D space.

    Parameters:
    - n_samples_per_class: Number of samples per class.
    - n_classes: Number of classes.

    Returns:
    - X: Data points.
    - y: Class targets.
    """
    
    # Define means for the classes in 6D space
    means = [tuple(i*2 for _ in range(6)) for i in range(n_classes)]

    # Define a covariance matrix for 6D
    # Base covariance defines how spread out the data is in that dimension
    base_covariance = np.array([[0.2, 0, 0, 0, 0, 0], 
                                [0, 5, 0, 0, 0, 0],
                                [0, 0, 0.2, 0, 0, 0],
                                [0, 0, 0, 5, 0, 0],
                                [0, 0, 0, 0, 0.2, 0],
                                [0, 0, 0, 0, 0, 5]])
    
    theta = np.pi / 4  # or any other desired angle
    # R = rotation_matrix_6D(theta, 0, 1)  # for rotation in the plane of the first two dimensions
    
    R1 = rotation_matrix_6D(theta, 0, 1)
    R2 = rotation_matrix_6D(theta, 2, 3)
    R3 = rotation_matrix_6D(theta, 4, 5)
    covariance = R1 @ R2 @ R3 @ base_covariance @ R3.T @ R2.T @ R1.T
    
    X = np.empty((0, 6))
    y = np.empty((0,))

    for idx, mean in enumerate(means):
        class_data = np.random.multivariate_normal(mean, covariance, n_samples_per_class)
        X = np.vstack([X, class_data])
        y = np.hstack([y, [idx]*n_samples_per_class])

    return X, y

In [4]:
# Generate and plot the data
n_classes = 5
X, y = generate_data_6D(n_samples_per_class = 400, n_classes=n_classes)

In [5]:
class CustomDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [6]:
# Split the data into primary (even classes) and transfer (odd classes) datasets
X_primary = X[np.isin(y, [0, 2, 4])]
y_primary = y[np.isin(y, [0, 2, 4])]


# Assuming X_primary and y_primary are numpy arrays
X_primary = torch.tensor(X_primary, dtype=torch.float32)
y_primary = torch.tensor(y_primary, dtype=torch.int64)

primary_dataset = CustomDataset(X_primary, y_primary)

X_transfer = X[np.isin(y, [1, 3])]
y_transfer = y[np.isin(y, [1, 3])]

transfer_dataset = CustomDataset(X_transfer, y_transfer)

# Assuming X_primary and y_primary are numpy arrays
X_transfer = torch.tensor(X_transfer, dtype=torch.float32)
y_transfer = torch.tensor(y_transfer, dtype=torch.int64)


# Splitting ratios
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

# Calculate sizes for primary dataset split
primary_train_size = int(train_ratio * len(primary_dataset))
primary_val_size = int(val_ratio * len(primary_dataset))
primary_test_size = len(primary_dataset) - primary_train_size - primary_val_size

# Split the primary dataset into training, validation, and test sets
primary_train_dataset, primary_val_dataset, primary_test_dataset = random_split(primary_dataset, [primary_train_size, primary_val_size, primary_test_size])

# Calculate sizes for transfer dataset split
transfer_train_size = int(train_ratio * len(transfer_dataset))
transfer_val_size = int(val_ratio * len(transfer_dataset))
transfer_test_size = len(transfer_dataset) - transfer_train_size - transfer_val_size

# Split the transfer dataset into training, validation, and test sets
transfer_train_dataset, transfer_val_dataset, transfer_test_dataset = random_split(transfer_dataset, [transfer_train_size, transfer_val_size, transfer_test_size])

# Create DataLoader objects
batch_size = 32

primary_train_loader = DataLoader(primary_train_dataset, batch_size=batch_size, shuffle=True)
primary_val_loader = DataLoader(primary_val_dataset, batch_size=batch_size, shuffle=False)
primary_test_loader = DataLoader(primary_test_dataset, batch_size=batch_size, shuffle=False)

transfer_train_loader = DataLoader(transfer_train_dataset, batch_size=batch_size, shuffle=True)
transfer_val_loader = DataLoader(transfer_val_dataset, batch_size=batch_size, shuffle=False)
transfer_test_loader = DataLoader(transfer_test_dataset, batch_size=batch_size, shuffle=False)

In [7]:
# Define the network architecture
class DenseNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, dropout_prob = 0.15):
        super(DenseNet, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc4 = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc5 = nn.Linear(hidden_dim, hidden_dim)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc6 = nn.Linear(hidden_dim, output_dim)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.dropout(x)
        x = F.relu(self.fc4(x))
        x = self.dropout(x)
        x = F.relu(self.fc5(x))
        x = self.dropout(x)
        x = self.fc6(x)
        return x

In [8]:
# Hyperparameters
input_dim = 6
hidden_dim = 128
output_dim = 5
output_dim_trnsfr = 5

In [9]:
# Initialize the network and optimizer
model = DenseNet(input_dim, hidden_dim, output_dim)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [10]:
# Training loop
epochs = 50
patience = 5  # Number of epochs to wait before stopping
wait = 0
best_loss = float('inf')

for epoch in range(epochs):
    # Training phase
    model.train()
    train_loss = 0.0
    for data, target in primary_train_loader:
        optimizer.zero_grad()
        output = model(data.to(device))
        loss = criterion(output.to(device), target.to(device))
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    
    train_loss /= len(primary_train_loader)
    
    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for data, target in primary_val_loader:
            output = model(data.to(device))
            loss = criterion(output.to(device), target.to(device))
            val_loss += loss.item()
    
    val_loss /= len(primary_val_loader)
    print(f"Epoch {epoch+1}/{epochs} - Training Loss: {train_loss}, Validation Loss: {val_loss}")
    
    # Check early stopping condition
    if val_loss < best_loss:
        best_loss = val_loss
        wait = 0
    else:
        wait += 1
        if wait >= patience:
            print("Early stopping!")
            break

Epoch 1/50 - Training Loss: 1.0212677739284657, Validation Loss: 0.4999668796857198
Epoch 2/50 - Training Loss: 0.462572470859245, Validation Loss: 0.30311032632986706
Epoch 3/50 - Training Loss: 0.3233858992656072, Validation Loss: 0.16625078146656355
Epoch 4/50 - Training Loss: 0.23936856344894128, Validation Loss: 0.10227527096867561
Epoch 5/50 - Training Loss: 0.1772726975657322, Validation Loss: 0.08754480071365833
Epoch 6/50 - Training Loss: 0.13768492031980445, Validation Loss: 0.08644431084394455
Epoch 7/50 - Training Loss: 0.1399601939375754, Validation Loss: 0.07298677352567513
Epoch 8/50 - Training Loss: 0.08649039068431766, Validation Loss: 0.030614261360218126
Epoch 9/50 - Training Loss: 0.0679895911814162, Validation Loss: 0.029137626057490706
Epoch 10/50 - Training Loss: 0.057420522802405886, Validation Loss: 0.015215820089603463
Epoch 11/50 - Training Loss: 0.03354192271621691, Validation Loss: 0.011264036254336437
Epoch 12/50 - Training Loss: 0.07394281973096507, Valid

In [11]:
class TransferModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim_transfer, dropout_prob=0.15):
        super(TransferModel, self).__init__()
        # Copy the layers from the pretrained model
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, hidden_dim)
        self.fc5 = nn.Linear(hidden_dim, hidden_dim)
        
        # Add the new final layer
        self.fc_final = nn.Linear(hidden_dim, output_dim_transfer)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = F.relu(self.fc4(x))
        x = F.relu(self.fc5(x))
        x = self.fc_final(x)
        return x

In [12]:
# Assuming pretrained_model is an instance of DenseNet that has been trained
pretrained_model = DenseNet(input_dim, hidden_dim, output_dim)

In [13]:
# Create the transfer model
transfer_model = TransferModel(input_dim, hidden_dim, output_dim_trnsfr)
transfer_model.to(device)

# Copy weights from the pretrained model
with torch.no_grad():
    transfer_model.fc1.weight.copy_(pretrained_model.fc1.weight)
    transfer_model.fc1.bias.copy_(pretrained_model.fc1.bias)
    transfer_model.fc2.weight.copy_(pretrained_model.fc2.weight)
    transfer_model.fc2.bias.copy_(pretrained_model.fc2.bias)
    transfer_model.fc3.weight.copy_(pretrained_model.fc3.weight)
    transfer_model.fc3.bias.copy_(pretrained_model.fc3.bias)
    transfer_model.fc4.weight.copy_(pretrained_model.fc4.weight)
    transfer_model.fc4.bias.copy_(pretrained_model.fc4.bias)
    transfer_model.fc5.weight.copy_(pretrained_model.fc5.weight)
    transfer_model.fc5.bias.copy_(pretrained_model.fc5.bias)

# The final layer (fc_final) will be randomly initialized by default

In [14]:
# Initialize the network and optimize

optimizer = optim.Adam(transfer_model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [15]:
# Training loop
epochs = 200
patience = 5  # Number of epochs to wait before stopping
wait = 0
best_loss = float('inf')

for epoch in range(epochs):
    # Training phase
    transfer_model.train()
    train_loss = 0.0
    for data, target in transfer_train_loader:
        optimizer.zero_grad()
        output = transfer_model(data.float().to(device))
        loss = criterion(output.float().to(device), target.long().to(device))
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    
    train_loss /= len(transfer_train_loader)
    
    # Validation phase
    transfer_model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for data, target in transfer_val_loader:
            output = transfer_model(data.float().to(device))
            loss = criterion(output.float().to(device), target.long().to(device))
            val_loss += loss.item()
    
    val_loss /= len(transfer_val_loader)
    print(f"Epoch {epoch+1}/{epochs} - Training Loss: {train_loss}, Validation Loss: {val_loss}")
    
    # Check early stopping condition
    if val_loss < best_loss:
        best_loss = val_loss
        wait = 0
    else:
        wait += 1
        if wait >= patience:
            print("Early stopping!")
            break

Epoch 1/200 - Training Loss: 1.0878548986381955, Validation Loss: 0.6431633681058884
Epoch 2/200 - Training Loss: 0.5192563136418661, Validation Loss: 0.3546052426099777
Epoch 3/200 - Training Loss: 0.21002500131726265, Validation Loss: 0.09400339983403683
Epoch 4/200 - Training Loss: 0.07645487904341684, Validation Loss: 0.06693738931789994
Epoch 5/200 - Training Loss: 0.08184411134829538, Validation Loss: 0.022570077562704682
Epoch 6/200 - Training Loss: 0.024185269573030785, Validation Loss: 0.009710656013339758
Epoch 7/200 - Training Loss: 0.006573067850291004, Validation Loss: 0.002504124218830839
Epoch 8/200 - Training Loss: 0.006757996282734287, Validation Loss: 0.0013563911925302818
Epoch 9/200 - Training Loss: 0.004063306699713899, Validation Loss: 0.0010105907203978859
Epoch 10/200 - Training Loss: 0.0011167156654765778, Validation Loss: 0.0007625091056979727
Epoch 11/200 - Training Loss: 0.0006338393072332514, Validation Loss: 0.00029867679586459417
Epoch 12/200 - Training L

In [18]:
# Test loop
transfer_model.eval()  # Set the model to evaluation mode
test_loss = 0.0
correct = 0
total = 0

with torch.no_grad():  # No gradients required during testing
    for data, target in transfer_test_loader:
        data, target = data.float().to(device), target.long().to(device)
        output = transfer_model(data)
        loss = criterion(output, target)
        
        test_loss += loss.item()
        
        _, predicted = output.max(1)  # Get index of the max value for each sample
        total += target.size(0)  # Increment the total count
        correct += predicted.eq(target).sum().item()  # Increment the correct count

test_loss /= len(transfer_test_loader)
accuracy = 100. * correct / total

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {accuracy:.2f}%")

Test Loss: 0.0001
Test Accuracy: 100.00%
