In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
import h5py

In [2]:
# Load the .h5 file into memory once
h5_file_path_train = r"F:\DS_project\m1613658\random\training.h5"
h5_file_path_test = r"F:\DS_project\m1613658\random\testing.h5"

# Open the H5 files
h5_train = h5py.File(h5_file_path_train, 'r')
h5_test = h5py.File(h5_file_path_test, 'r')

# Extract datasets
train_sen1_data = h5_train['sen1']
train_sen2_data = h5_train['sen2']
train_labels = h5_train['label']

test_sen1_data = h5_test['sen1']
test_sen2_data = h5_test['sen2']
test_labels = h5_test['label']


In [3]:
class SatelliteDataset(Dataset):
    def __init__(self, sen1_data, sen2_data, labels):
        self.sen1_data = sen1_data
        self.sen2_data = sen2_data
        self.labels = labels

    def __len__(self):
        return self.labels.shape[0]

    def __getitem__(self, idx):
        sen1_image = self.sen1_data[idx]
        sen2_image = self.sen2_data[idx]
        label = self.labels[idx]

        # Convert to PyTorch tensors
        sen1_image = torch.tensor(sen1_image, dtype=torch.float32).permute(2, 0, 1)
        sen2_image = torch.tensor(sen2_image, dtype=torch.float32).permute(2, 0, 1)

        # Convert one-hot encoded label to class index
        label = torch.tensor(label, dtype=torch.float32)
        label = torch.argmax(label).long()

        return sen1_image, sen2_image, label


In [4]:
# Create datasets
train_dataset = SatelliteDataset(train_sen1_data, train_sen2_data, train_labels)
test_dataset = SatelliteDataset(test_sen1_data, test_sen2_data, test_labels)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)


In [5]:
class ConvNet(nn.Module):
    def __init__(self, num_classes=17):
        super(ConvNet, self).__init__()

        # Sentinel-1 branch
        self.sen1_conv1 = nn.Conv2d(8, 32, kernel_size=3, padding=1)
        self.sen1_dropout1 = nn.Dropout(p=0.25)
        self.sen1_conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.sen1_dropout2 = nn.Dropout(p=0.25)
        self.sen1_pool = nn.MaxPool2d(2, 2)

        # Sentinel-2 branch
        self.sen2_conv1 = nn.Conv2d(10, 32, kernel_size=3, padding=1)
        self.sen2_dropout1 = nn.Dropout(p=0.25)
        self.sen2_conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.sen2_dropout2 = nn.Dropout(p=0.25)
        self.sen2_pool = nn.MaxPool2d(2, 2)

        # Fully connected layers after concatenation
        self.fc1 = nn.Linear(64 * 16 * 16 * 2, 128)
        self.fc1_dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(128, 64)
        self.fc2_dropout = nn.Dropout(p=0.5)
        self.fc3 = nn.Linear(64, num_classes)

    def forward(self, sen1, sen2):
        # Sentinel-1 forward pass
        x1 = F.relu(self.sen1_conv1(sen1))
        x1 = self.sen1_dropout1(x1)
        x1 = self.sen1_pool(F.relu(self.sen1_conv2(x1)))
        x1 = self.sen1_dropout2(x1)
        x1 = x1.view(x1.size(0), -1)

        # Sentinel-2 forward pass
        x2 = F.relu(self.sen2_conv1(sen2))
        x2 = self.sen2_dropout1(x2)
        x2 = self.sen2_pool(F.relu(self.sen2_conv2(x2)))
        x2 = self.sen2_dropout2(x2)
        x2 = x2.view(x2.size(0), -1)

        # Concatenate both branches
        x = torch.cat((x1, x2), dim=1)

        # Fully connected layers
        x = F.relu(self.fc1(x))
        x = self.fc1_dropout(x)
        x = F.relu(self.fc2(x))
        x = self.fc2_dropout(x)
        x = self.fc3(x)

        return x


In [6]:
# Training function with visualization
def train_model(model, train_loader, criterion, optimizer, num_epochs, device='cpu'):
    model.to(device)
    model.train()

    train_losses = []  # List to store training loss for visualization

    for epoch in range(num_epochs):
        running_loss = 0.0
        epoch_loss = 0.0
        for i, (sen1, sen2, labels) in enumerate(train_loader):
            sen1, sen2, labels = sen1.to(device), sen2.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(sen1, sen2)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            epoch_loss += loss.item()

            if i % 100 == 99:  # Print every 100 mini-batches
                print(f'Epoch [{epoch + 1}/{num_epochs}], Step [{i + 1}/{len(train_loader)}], Loss: {running_loss / 100:.4f}')
                running_loss = 0.0

        # Store the average loss for the epoch
        epoch_loss /= len(train_loader)
        train_losses.append(epoch_loss)
        print(f'Epoch [{epoch + 1}/{num_epochs}] Average Loss: {epoch_loss:.4f}')

    print('Training complete')

    # Visualization of training loss
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, len(train_losses) + 1), train_losses, marker='o', label='Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training Loss Over Epochs')
    plt.legend()
    plt.grid()
    plt.show()


In [7]:
# Validation function with visualization
def evaluate_model(model, test_loader, criterion, num_epochs, device='cpu'):
    model.to(device)
    model.eval()

    val_losses = []  # List to store validation loss for visualization
    val_accuracies = []  # List to store validation accuracy for visualization

    
    for epoch in range(num_epochs):
        total_loss = 0.0
        correct_predictions = 0
        total_samples = 0

        with torch.no_grad():
            for sen1, sen2, labels in test_loader:
                sen1, sen2, labels = sen1.to(device), sen2.to(device), labels.to(device)
                outputs = model(sen1, sen2)

                # Calculate loss
                loss = criterion(outputs, labels)
                total_loss += loss.item()

                # Calculate accuracy
                predicted = torch.argmax(outputs, dim=1)
                correct_predictions += (predicted == labels).sum().item()
                total_samples += labels.size(0)

        avg_loss = total_loss / len(test_loader)
        accuracy = correct_predictions / total_samples

        val_losses.append(avg_loss)
        val_accuracies.append(accuracy)

        print(f'Epoch [{epoch + 1}/{num_epochs}] Average Test Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}')

    # Visualization of validation metrics
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, len(val_losses) + 1), val_losses, marker='o', label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Validation Loss Over Epochs')
    plt.legend()
    plt.grid()
    plt.show()

    plt.figure(figsize=(10, 6))
    plt.plot(range(1, len(val_accuracies) + 1), val_accuracies, marker='o', color='orange', label='Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Validation Accuracy Over Epochs')
    plt.legend()
    plt.grid()
    plt.show()


In [8]:
# Initialize model, loss function, and optimizer
model = ConvNet(num_classes=17)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [9]:
# Train the model
train_model(model, train_loader, criterion, optimizer, num_epochs=20, device='cpu')

Epoch [1/20], Step [100/2507], Loss: 2.0887


KeyboardInterrupt: 

In [None]:
# Evaluate the model
evaluate_model(model, test_loader, criterion, num_epochs=20, device='cpu')