<a href="https://colab.research.google.com/github/sujith-kumara/CNN-for-Pavia/blob/master/cnn_pavia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Source and destination paths
source_path = '/content/drive/MyDrive/Pavia'
destination_path = '/content/PaviaU'

# Use the !cp command to copy the folder
!cp -r "$source_path" "$destination_path"

# Verify the copied folder
print(f"The folder '{source_path}' has been copied to '{destination_path}'.")


The folder '/content/drive/MyDrive/Pavia' has been copied to '/content/PaviaU'.


In [3]:
!pip install tensorboardX

Collecting tensorboardX
  Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl (101 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.7/101.7 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tensorboardX
Successfully installed tensorboardX-2.6.2.2


In [4]:
import os
import scipy.io
import scipy.ndimage
import numpy as np
from random import shuffle

import torch

import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from sklearn.decomposition import PCA
from sklearn import preprocessing

import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim
from datetime import datetime
from tensorboardX import SummaryWriter


In [5]:
# Define global variables
PATCH_SIZE = 17  # Patch size
OUTPUT_CLASSES = 9  # Output classes (representing different land cover types)
TEST_FRAC = 0.50  # Percentage of data used for testing
NEW_DATA_PATH = os.path.join(os.getcwd(), "patch")  # Data storage path; "patch" is the folder name


In [6]:
!mkdir patch

In [7]:
def pca(X, k):  # k is the number of features that should be retained
    newX = np.reshape(X, (-1, X.shape[2]))
    pca = PCA(n_components=k, whiten=True)
    newX = pca.fit_transform(newX)
    newX = np.reshape(newX, (k, X.shape[0], X.shape[1]))
    return newX
def pad(X, margin):
    newX = np.zeros((X.shape[0], X.shape[1]+margin*2, X.shape[2]+margin*2))
    newX[:, margin:X.shape[1]+margin, margin:X.shape[2]+margin] = X
    return newX
# Generate a patch and centerize it
def patch(X, patch_size, height_index, width_index):
    # The slice function is used for slicing
    height_slice = slice(height_index, height_index + patch_size)
    width_slice = slice(width_index, width_index + patch_size)
    patch = X[:, height_slice, width_slice]  # The patch includes all bands
    for i in range(X.shape[0]):
        mean = np.mean(patch[i, :, :])
        patch[i] = patch[i]-mean
    return patch
def standartize(X):
    newX = np.reshape(X, (-1,1))
    scaler = preprocessing.StandardScaler().fit(newX)
    newX = scaler.transform(newX)
    newX = np.reshape(newX, (X.shape[0], X.shape[1], X.shape[2]))
    return newX

In [8]:
# Load Data
def load_data(file_name, data_name, label_name):
    # Original data path
    DATA_PATH = os.path.join(os.getcwd(), file_name)
    data = scipy.io.loadmat(os.path.join(DATA_PATH, data_name))
    data = data[list(data.keys())[-1]]
    label = scipy.io.loadmat(os.path.join(DATA_PATH, label_name))
    label = np.int32(label[list(label.keys())[-1]])
    data = np.transpose(data, (2, 0, 1))  # Move the channel dimension to the front for easier array manipulation
    return data, label


# Generate sliced data and store it
def create_data(X, label):
    for c in range(OUTPUT_CLASSES):
        PATCH, LABEL, TEST_PATCH, TRAIN_PATCH, TEST_LABEL, TRAIN_LABEL = [], [], [], [], [], []
        for h in range(X.shape[1] - PATCH_SIZE + 1):
            for w in range(X.shape[2] - PATCH_SIZE + 1):
                gt = label[h, w]
                if(gt == c + 1):
                    img = patch(X, PATCH_SIZE, h, w)
                    PATCH.append(img)
                    LABEL.append(gt - 1)
        # Shuffle slices
        shuffle(PATCH)
        # Split into test set and training set
        split_size = int(len(PATCH) * TEST_FRAC)
        TEST_PATCH.extend(PATCH[:split_size])  # 0 ~ split_size
        TRAIN_PATCH.extend(PATCH[split_size:])  # split_size ~ len(class)
        TEST_LABEL.extend(LABEL[:split_size])
        TRAIN_LABEL.extend(LABEL[split_size:])
        # Write to folders
        train_dict, test_dict = {}, {}
        train_dict["train_patches"] = TRAIN_PATCH
        train_dict["train_labels"] = TRAIN_LABEL
        file_name = "Training_class(%d).mat" % c
        scipy.io.savemat(os.path.join(NEW_DATA_PATH, file_name), train_dict)
        test_dict["testing_patches"] = TEST_PATCH
        test_dict["testing_labels"] = TEST_LABEL
        file_name = "Testing_class(%d).mat" % c
        scipy.io.savemat(os.path.join(NEW_DATA_PATH, file_name), test_dict)


data, label = load_data("PaviaU", "PaviaU.mat", "PaviaU_gt.mat")
data = standartize(data)
data = pad(data, int((PATCH_SIZE - 1) / 2))
create_data(data, label)


In [9]:
def xZero(X):
    X = X.cpu()
    newX = np.zeros(
        (X.shape[0], X.shape[1], X.shape[2], X.shape[3]))
    newX[:, :, int((X.shape[2]+1)/2), int((X.shape[3]+1)/2)] = X[:,
                                                                 :, int((X.shape[2]+1)/2), int((X.shape[3]+1)/2)]
    newX = torch.from_numpy(newX).double()
    if torch.cuda.is_available():
        newX = newX.cuda()
    return newX

In [10]:
class SeparableConv2d(nn.Module): # Depth wise separable conv
    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1, padding=0, dilation=1, bias=False):
        # Each input channel undergoes its own filter convolution operation
        super(SeparableConv2d, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, in_channels, kernel_size,
                               stride, padding, dilation, groups=in_channels, bias=bias)
        self.pointwise = nn.Conv2d(
            in_channels, out_channels, 1, 1, 0, 1, 1, bias=bias)

    def forward(self, x):
        x = self.conv1(x)
        x = self.pointwise(x)
        return x


In [11]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # in_channels, out_channels, kernel_size, stride (default: 1), padding (default: 0)
        self.conv1 = torch.nn.Sequential(
            SeparableConv2d(103, 16, 1, 1, 0),  # 1*1 convolutional kernel
            nn.ReLU(inplace=True),
            nn.GroupNorm(16, 16)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(103, 256, 1, 1, 0),
            nn.GroupNorm(256, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            SeparableConv2d(256, 256, 3, 1, 1),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            SeparableConv2d(256, 128, 3, 1, 1),
            nn.ReLU(inplace=True),
            nn.Dropout(0.4),
            SeparableConv2d(128, 64, 3, 1, 1),
            nn.ReLU(inplace=True),
            nn.GroupNorm(64, 64)
        )

        self.classifier = nn.Sequential(
            nn.Linear(80*17*17, 2048),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(2048, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
            nn.Linear(256, 16),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        x0 = xZero(x)
        x1 = self.conv1(x0)
        x2 = self.conv2(x)
        x12 = torch.cat((x1, x2), 1)
        xCat = x12.view(-1, self.numFeatures(x12))  # Flatten the feature map into 1D
        output = self.classifier(xCat)
        return output

    def numFeatures(self, x):
        size = x.size()[1:]  # Get the height, width, and depth of the convolutional image
        num = 1
        for s in size:
            num *= s
        return num

    def init_weights(self):  # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                torch.nn.init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.GroupNorm):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                torch.nn.init.normal_(m.weight.data, 0, 0.01)
                m.bias.data.zero_()


In [12]:
from torch.utils.data import Dataset

class DataSet(Dataset):
    def __init__(self, path, train, transform=None):
        if train:
            select = "Training"
            patch_type = "train"
        else:
            select = "Testing"
            patch_type = "testing"
        self.tensors = []
        self.labels = []
        self.transform = transform
        # Iterate through each class of tensors and add patches and labels
        # Corresponding to lists
        for file in os.listdir(path):
            if os.path.isfile(os.path.join(path, file)) and select in file:
                temp = scipy.io.loadmat(os.path.join(
                    path, file))  # Load mat dictionary
                # Filter dictionary to keep items without an underscore "_"
                temp = {k: v for k, v in temp.items() if k[0] != '_'}

                for i in range(len(temp[patch_type+"_patches"])):
                    self.tensors.append(temp[patch_type+"_patches"][i])
                    self.labels.append(temp[patch_type+"_labels"][0][i])
        self.tensors = np.array(self.tensors)
        self.labels = np.array(self.labels)

    def __len__(self):
        try:
            if len(self.tensors) != len(self.labels):
                raise Exception(
                    "Lengths of the tensor and labels list are not the same")
        except Exception as e:
            print(e.args[0])
        return len(self.tensors)

    # Return a patch and label
    def __getitem__(self, idx):
        sample = (self.tensors[idx], self.labels[idx])
        sample = (torch.from_numpy(self.tensors[idx]), torch.from_numpy(
            np.array(self.labels[idx])).long())
        return sample
    # Tuple containing a patch image and its corresponding label


In [13]:
def validate(net, data_loader, set_name, classes_name):
    """
    Validate a batch of data, return the confusion matrix and accuracy
    :param net:
    :param data_loader:
    :param set_name:  eg: 'valid' 'train' 'test'
    :param classes_name:
    :return:
    """
    net.eval()
    cls_num = len(classes_name)
    conf_mat = np.zeros([cls_num, cls_num])

    for data in data_loader:
        images, labels = data
        if torch.cuda.is_available():
            images, labels = images.cuda(), labels.cuda()
        outputs = net(images)
        outputs.detach_()
        _, predicted = torch.max(outputs.data, 1)

        # Update the confusion matrix
        for i in range(len(labels)):
            cate_i = labels[i]
            pre_i = predicted[i]
            conf_mat[cate_i, pre_i] += 1.0

    for i in range(cls_num):
        print('class:{:<10}, total num:{:<6}, correct num:{:<5}  Recall: {:.2%} Precision: {:.2%}'.format(
            classes_name[i], np.sum(
                conf_mat[i, :]), conf_mat[i, i], conf_mat[i, i] / (1 + np.sum(conf_mat[i, :])),
            conf_mat[i, i] / (1 + np.sum(conf_mat[:, i]))))

    print('{} set Accuracy:{:.2%}'.format(
        set_name, np.trace(conf_mat) / np.sum(conf_mat)))

    return conf_mat, '{:.2}'.format(np.trace(conf_mat) / np.sum(conf_mat))


# Generate an image
def show_confMat(confusion_mat, classes, set_name, out_dir):

    # Normalize
    confusion_mat_N = confusion_mat.copy()
    for i in range(len(classes)):
        confusion_mat_N[i, :] = confusion_mat[i, :] / confusion_mat[i, :].sum()

    # Get the colormap
    # More colormaps: http://matplotlib.org/examples/color/colormaps_reference.html
    cmap = plt.cm.get_cmap('Greys')
    plt.imshow(confusion_mat_N, cmap=cmap)
    plt.colorbar()

    # Set labels
    xlocations = np.array(range(len(classes)))
    plt.xticks(xlocations, list(classes), rotation=60)
    plt.yticks(xlocations, list(classes))
    plt.xlabel('Predicted label')
    plt.ylabel('True label')
    plt.title('Confusion Matrix ' + set_name)

    # Display numbers
    for i in range(confusion_mat_N.shape[0]):
        for j in range(confusion_mat_N.shape[1]):
            plt.text(x=j, y=i, s=int(
                confusion_mat[i, j]), va='center', ha='center', color='red', fontsize=10)
    # Save the image
    plt.savefig(os.path.join(out_dir, 'Confusion_Matrix' + set_name + '.png'))
    plt.close()


In [14]:
# # Define hyperparameters
# EPOCH = 5
# BATCH_SIZE = 24
# classes_name = [str(c) for c in range(9)]  # Number of land cover classes

# # --------------------Load Data---------------------
# # Indian Pines .mat file path (each file is a separate class)
# path = os.path.join(os.getcwd(), "patch")
# training_dataset = DataSet(path=path, train=True)
# testing_dataset = DataSet(path=path, train=False)

# # Data Loaders
# train_loader = torch.utils.data.DataLoader(
#     dataset=training_dataset,
#     batch_size=BATCH_SIZE,
#     shuffle=True
# )

# test_loader = torch.utils.data.DataLoader(
#     dataset=testing_dataset,
#     batch_size=BATCH_SIZE,
#     shuffle=True
# )


# Define hyperparameters
EPOCH = 5
BATCH_SIZE = 8
classes_name = [str(c) for c in range(9)]  # Number of land cover classes

# --------------------Load Data---------------------
# Indian Pines .mat file path (each file is a separate class)
path = os.path.join(os.getcwd(), "patch")
training_dataset = DataSet(path=path, train=True)


# Data Loaders
train_loader = torch.utils.data.DataLoader(
    dataset=training_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True
)



In [15]:
# testing_dataset = DataSet(path=path, train=False)
# test_loader = torch.utils.data.DataLoader(
#     dataset=testing_dataset,
#     batch_size=BATCH_SIZE,
#     shuffle=True
# )

In [16]:
# # Source and destination paths
# source_path = '/content/patch' #'/content/drive/MyDrive/Pavia'
# destination_path ='/content/drive/MyDrive/Pavia' #'/content/PaviaU'

# # Use the !cp command to copy the folder
# !cp -r "$source_path" "$destination_path"

# # Verify the copied folder
# print(f"The folder '{source_path}' has been copied to '{destination_path}'.")

In [17]:
# Check if CUDA is available
use_cuda = torch.cuda.is_available()

# Generate log
now_time = datetime.now()
time_str = datetime.strftime(now_time, '%m-%d_%H-%M-%S')
log_path = os.path.join(os.getcwd(), "log")
log_dir = os.path.join(log_path, time_str)

# Create the log directory if it doesn't exist
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

# Initialize the SummaryWriter for logging
writer = SummaryWriter(log_dir)


In [18]:
# ---------------------Build the Network--------------------------
cnn = Net()  # Create CNN
cnn.init_weights()  # Initialize weights
cnn = cnn.double()

# --------------------Set Loss Function and Optimizer----------------------
optimizer = optim.Adam(cnn.parameters())  # Optimizer with default learning rate (1e-3)
criterion = nn.CrossEntropyLoss()  # Loss function
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer, step_size=EPOCH/2, gamma=0.5)  # Set the learning rate decay strategy


In [19]:
# testing_dataset = DataSet(path=path, train=False)
# test_loader = torch.utils.data.DataLoader(
#     dataset=testing_dataset,
#     batch_size=BATCH_SIZE,
#     shuffle=True
# )

In [20]:
# --------------------Training------------------------------
if use_cuda:  # Use GPU
    cnn = cnn.cuda()

for epoch in range(EPOCH):
    loss_sigma = 0.0    # Record the sum of losses for one epoch
    correct = 0.0
    total = 0.0
    scheduler.step()  # Update learning rate

    for batch_idx, data in enumerate(train_loader):
        # Get images and labels
        inputs, labels = data
        if use_cuda:
            inputs, labels = inputs.cuda(), labels.cuda()
        optimizer.zero_grad()  # Clear gradients
        cnn = cnn.train()
        outputs = cnn(inputs)
        loss = criterion(outputs, labels)
        loss.backward()  # Backward propagation
        optimizer.step()  # Update weights

        # Statistics for predictions
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += ((predicted == labels).squeeze().sum()).item()
        loss_sigma += loss.item()

        # Print training information every BATCH_SIZE iterations; loss is the average over BATCH_SIZE iterations
        if batch_idx % BATCH_SIZE == BATCH_SIZE-1:
            loss_avg = loss_sigma / BATCH_SIZE
            loss_sigma = 0.0
            print("Training: Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                epoch + 1, EPOCH, batch_idx + 1, len(train_loader), loss_avg, correct / total))
            # Record training loss
            writer.add_scalars(
                'Loss_group', {'train_loss': loss_avg}, epoch)
            # Record learning rate
            writer.add_scalar(
                'learning rate', scheduler.get_lr()[0], epoch)
            # Record accuracy
            writer.add_scalars('Accuracy_group', {
                               'train_acc': correct / total}, epoch)

    # Record gradients and weights for each epoch
    for name, layer in cnn.named_parameters():
        writer.add_histogram(
            name + '_grad', layer.grad.cpu().data.numpy(), epoch)
        writer.add_histogram(name + '_data', layer.cpu().data.numpy(), epoch)

    # ------------------------------------ Evaluate model on the validation set ------------------------------------
    if epoch % 1 == 0:
        loss_sigma = 0.0
        cls_num = len(classes_name)
        conf_mat = np.zeros([cls_num, cls_num])  # Confusion matrix
        cnn.eval()
        # for batch_idx, data in enumerate(test_loader):
        #     images, labels = data
        #     if use_cuda:
        #         images, labels = images.cuda(), labels.cuda()
        #     cnn = cnn.train()
        #     outputs = cnn(images)  # Forward pass
        #     outputs.detach_()  # Detach from gradients
        #     loss = criterion(outputs, labels)  # Compute loss
        #     loss_sigma += loss.item()

        #     _, predicted = torch.max(outputs.data, 1)  # Statistics
        #     # labels = labels.data    # Variable --> tensor
        #     # Update confusion matrix
        #     for j in range(len(labels)):
        #         cate_i = labels[j]
        #         pre_i = predicted[j]
        #         conf_mat[cate_i, pre_i] += 1.0

        # print('{} set Accuracy:{:.2%}'.format(
        #     'Valid', conf_mat.trace() / conf_mat.sum()))
        # Record loss and accuracy
        # writer.add_scalars(
        #     'Loss_group', {'valid_loss': loss_sigma / len(test_loader)}, epoch)
        # writer.add_scalars('Accuracy_group', {
        #                    'valid_acc': conf_mat.trace() / conf_mat.sum()}, epoch)

print('Finished Training')




Training: Epoch[001/005] Iteration[008/2674] Loss: 2.4350 Acc:26.56%




Training: Epoch[001/005] Iteration[016/2674] Loss: 1.6516 Acc:34.38%
Training: Epoch[001/005] Iteration[024/2674] Loss: 1.6652 Acc:40.62%
Training: Epoch[001/005] Iteration[032/2674] Loss: 1.4401 Acc:43.75%
Training: Epoch[001/005] Iteration[040/2674] Loss: 1.2662 Acc:46.88%
Training: Epoch[001/005] Iteration[048/2674] Loss: 1.3708 Acc:48.44%
Training: Epoch[001/005] Iteration[056/2674] Loss: 1.2923 Acc:49.33%
Training: Epoch[001/005] Iteration[064/2674] Loss: 1.3279 Acc:49.80%
Training: Epoch[001/005] Iteration[072/2674] Loss: 1.5630 Acc:49.83%
Training: Epoch[001/005] Iteration[080/2674] Loss: 1.3280 Acc:49.53%
Training: Epoch[001/005] Iteration[088/2674] Loss: 1.5177 Acc:49.57%
Training: Epoch[001/005] Iteration[096/2674] Loss: 1.4831 Acc:49.09%
Training: Epoch[001/005] Iteration[104/2674] Loss: 1.0550 Acc:50.00%
Training: Epoch[001/005] Iteration[112/2674] Loss: 1.0440 Acc:50.78%
Training: Epoch[001/005] Iteration[120/2674] Loss: 1.0974 Acc:51.15%
Training: Epoch[001/005] Iteration

In [21]:
# ----------------------- Save Model and Plot Confusion Matrix -------------------------
# Save the trained model
cnn_save_path = os.path.join(log_dir, 'net_params.pkl')
torch.save(cnn.state_dict(), cnn_save_path)

# Validate on training and testing sets and obtain confusion matrices
conf_mat_train, train_acc = validate(cnn, train_loader, 'train', classes_name)
#conf_mat_valid, valid_acc = validate(cnn, test_loader, 'test', classes_name)

# Plot and save confusion matrices
show_confMat(conf_mat_train, classes_name, 'train', log_dir)
#show_confMat(conf_mat_valid, classes_name, 'valid', log_dir)


class:0         , total num:3316.0, correct num:3308.0  Recall: 99.73% Precision: 99.76%
class:1         , total num:9325.0, correct num:9323.0  Recall: 99.97% Precision: 99.79%
class:2         , total num:1050.0, correct num:1031.0  Recall: 98.10% Precision: 98.47%
class:3         , total num:1532.0, correct num:1524.0  Recall: 99.41% Precision: 99.74%
class:4         , total num:673.0 , correct num:673.0  Recall: 99.85% Precision: 99.85%
class:5         , total num:2515.0, correct num:2505.0  Recall: 99.56% Precision: 99.84%
class:6         , total num:665.0 , correct num:648.0  Recall: 97.30% Precision: 98.93%
class:7         , total num:1841.0, correct num:1836.0  Recall: 99.67% Precision: 99.73%
class:8         , total num:474.0 , correct num:474.0  Recall: 99.79% Precision: 97.33%
train set Accuracy:99.68%


  cmap = plt.cm.get_cmap('Greys')
