This notebook contains code for fine-tuning a 2D convolutional neural network (CNN) designed for hyperspectral image classification. 

- The core functionality includes data preprocessing, a dataset class for extracting 2D spatial patches from hyperspectral cubes, and utilities for visualization and evaluation.
- The dataset is constructed using the PatchDataset2D class, which extracts square patches centered at valid locations within the image, based on provided label and mask arrays. These patches are used as input samples for the 2D CNN.
- The code also sets up device configuration (choosing between CPU and GPU), figure output directories for result visualizations, and loads necessary libraries including PyTorch, NumPy, scikit-learn, and SciPy.
- Further cells in the notebook (not shown here) likely define the CNN architecture, the training and validation loops, and performance metrics for model evaluation.



In [8]:
from pathlib import Path                                                   # import from pathlibimport numpy as np                                                         # import numpy libraryimport matplotlib.pyplot as plt                                            # import matplotlib.pyplot library# empty lineimport torch                                                               # import torch libraryimport torch.nn as nn                                                      # import torch.nn libraryfrom torch.utils.data import Dataset, DataLoader                           # import from torch.utils.data# empty linefrom sklearn.metrics import accuracy_score, precision_recall_fscore_support  # import from sklearn.metricsimport scipy.io as sio                                                     # import scipy.io library# empty lineDEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")      # set computation device (GPU/CPU)print("Device", DEVICE)# empty linePATCH = 15                                                                 # assign value to PATCHFIGS = Path("outputs/figs")                                                # define path for FIGSFIGS.mkdir(parents=True, exist_ok=True)                                    # create directory FIGS.mkdir(parents# empty line                                                                           # ------------------------------------------                                                                           # dataset class                                                                           # ------------------------------------------class PatchDataset2D(Dataset):                                             # define class PatchDataset2D    def __init__(self, cube, labels, mask, patch):                         # define function __init__        assert patch % 2 == 1                                              # assign value to assert patch % 2        self.patch = patch                                                 # assign value to self.patch        self.rad = patch // 2                                              # assign value to self.rad        H, W, C = cube.shape                                               # assign value to H, W, C# empty line        self.img = np.pad(                                                 # assign value to self.img            cube.astype(np.float32),            ((self.rad, self.rad), (self.rad, self.rad), (0, 0)),            mode="reflect"                                                 # assign value to mode        )# empty line        yy, xx = np.where(mask & (labels > 0))                             # assign value to yy, xx        self.centers = np.stack([yy, xx], axis=1)                          # assign value to self.centers        self.labels = labels[mask & (labels > 0)].astype(np.int64) - 1     # assign value to self.labels# empty line    def __len__(self):                                                     # define function __len__        return len(self.centers)                                           # return value from function# empty line    def __getitem__(self, i):                                              # define function __getitem__        y, x = self.centers[i]                                             # assign value to y, x        yp, xp = y + self.rad, x + self.rad                                # assign value to yp, xp# empty line        patch = self.img[                                                  # assign value to patch            yp-self.rad: yp+self.rad+1,            xp-self.rad: xp+self.rad+1,            :        ]        patch = np.transpose(patch, (2,0,1))                               # assign value to patch        return torch.from_numpy(patch), torch.tensor(self.labels[i], dtype=torch.long)  # assign value to return torch.from_numpy(patch), torch.tensor(self.labels[i], dtype# empty line                                                                           # ------------------------------------------                                                                           # original CNN                                                                           # ------------------------------------------class CNN2D_NoDrop(nn.Module):                                             # define class CNN2D_NoDrop    def __init__(self, in_ch, num_classes, reduce_to=32):                  # assign value to def __init__(self, in_ch, num_classes, reduce_to        super().__init__()        c_in = in_ch                                                       # assign value to c_in        layers = []                                                        # assign value to layers# empty line        if reduce_to is not None:                                          # conditional statement            layers += [                                                    # assign value to layers +                nn.Conv2d(c_in, reduce_to, 1),                nn.ReLU(True),                nn.BatchNorm2d(reduce_to),            ]            c_in = reduce_to                                               # assign value to c_in# empty line        layers += [                                                        # assign value to layers +            nn.Conv2d(c_in, 32, 3, padding=1), nn.ReLU(True), nn.BatchNorm2d(32),  # assign value to nn.Conv2d(c_in, 32, 3, padding            nn.Conv2d(32, 32, 3, padding=1), nn.ReLU(True), nn.BatchNorm2d(32),  # assign value to nn.Conv2d(32, 32, 3, padding            nn.MaxPool2d(2),            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(True), nn.BatchNorm2d(64),  # assign value to nn.Conv2d(32, 64, 3, padding            nn.Conv2d(64, 64, 3, padding=1), nn.ReLU(True), nn.BatchNorm2d(64),  # assign value to nn.Conv2d(64, 64, 3, padding            nn.MaxPool2d(2),            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(True), nn.BatchNorm2d(128),  # assign value to nn.Conv2d(64, 128, 3, padding            nn.Conv2d(128, 128, 3, padding=1), nn.ReLU(True), nn.BatchNorm2d(128),  # assign value to nn.Conv2d(128, 128, 3, padding            nn.AdaptiveAvgPool2d(1),        ]# empty line        self.feat = nn.Sequential(*layers)                                 # assign value to self.feat        self.head = nn.Sequential(                                         # assign value to self.head            nn.Flatten(),            nn.Linear(128, 256),            nn.ReLU(True),            nn.Linear(256, num_classes)        )# empty line    def forward(self, x):                                                  # define function forward        z = self.feat(x)                                                   # assign value to z        return self.head(z)                                                # return value from function# empty line                                                                           # ------------------------------------------                                                                           # transfer learning wrapper                                                                           # ------------------------------------------class CNN2D_Transfer(nn.Module):                                           # define class CNN2D_Transfer    def __init__(self, new_bands, ckpt):                                   # define function __init__        super().__init__()        old_bands = ckpt["B"]                                              # assign value to old_bands        num_classes = ckpt["num_classes"]                                  # assign value to num_classes        reduce_to = ckpt["REDUCE"]                                         # assign value to reduce_to# empty line        self.adapter = nn.Conv2d(new_bands, old_bands, kernel_size=1)      # assign value to self.adapter# empty line        self.backbone = CNN2D_NoDrop(                                      # assign value to self.backbone            in_ch=old_bands,                                               # assign value to in_ch            num_classes=num_classes,                                       # assign value to num_classes            reduce_to=reduce_to                                            # assign value to reduce_to        )# empty line        self.backbone.load_state_dict(ckpt["state_dict"])                  # access instance attribute or method# empty line                                                                           # freeze backbone        for p in self.backbone.parameters():                               # loop iteration            p.requires_grad = False                                        # assign value to p.requires_grad# empty line    def forward(self, x):                                                  # define function forward        x = self.adapter(x)                                                # assign value to x        return self.backbone(x)                                            # return value from function# empty line                                                                           # ------------------------------------------                                                                           # helpers                                                                           # ------------------------------------------def load_mat(path):                                                        # define function load_mat    m = sio.loadmat(path)                                                  # assign value to m    key = [k for k in m.keys() if not k.startswith("__")][0]               # assign value to key    return m[key]                                                          # return value from function# empty linedef normalize_cube(cube):                                                  # define function normalize_cube    cube = cube.astype(np.float32)                                         # assign value to cube    mn = cube.min(axis=(0,1), keepdims=True)                               # assign value to mn    mx = cube.max(axis=(0,1), keepdims=True)                               # assign value to mx    return (cube - mn) / (mx - mn + 1e-6)                                  # return value from function# empty linedef predict_map(model, ds, mask, shape):                                   # define function predict_map    model.eval()    pred = np.zeros(shape, np.int64)                                       # assign value to pred    loader = DataLoader(ds, batch_size=256, shuffle=False)                 # assign value to loader# empty line    idx = 0                                                                # assign value to idx    with torch.no_grad():        for xb, _ in loader:                                               # loop iteration            out = model(xb.to(DEVICE)).argmax(1).cpu().numpy() + 1         # assign value to out            for j in range(out.shape[0]):                                  # loop iteration                y, x = ds.centers[idx + j]                                 # assign value to y, x                pred[y,x] = out[j]                                         # assign value to pred[y,x]            idx += out.shape[0]                                            # assign value to idx +# empty line    pred[~mask] = 0                                                        # assign value to pred[~mask]    return pred                                                            # return value from function# empty line                                                                           # ------------------------------------------                                                                           # training for transfer learning                                                                           # ------------------------------------------def train_adapter(model, dl_train, dl_val, epochs=20):                     # assign value to def train_adapter(model, dl_train, dl_val, epochs    opt = torch.optim.Adam(model.adapter.parameters(), lr=1e-3)            # assign value to opt    loss_fn = nn.CrossEntropyLoss()                                        # assign value to loss_fn# empty line    for ep in range(epochs):                                               # loop iteration        model.train()        for xb, yb in dl_train:                                            # loop iteration            xb, yb = xb.to(DEVICE), yb.to(DEVICE)                          # assign value to xb, yb            opt.zero_grad()            loss = loss_fn(model(xb), yb)                                  # assign value to loss            loss.backward()            opt.step()# empty line        model.eval()        preds, true = [], []                                               # assign value to preds, true        with torch.no_grad():            for xb, yb in dl_val:                                          # loop iteration                out = model(xb.to(DEVICE)).argmax(1).cpu().numpy()         # assign value to out                preds.append(out)                true.append(yb.numpy())# empty line        preds = np.concatenate(preds)                                      # assign value to preds        true = np.concatenate(true)                                        # assign value to true        acc = accuracy_score(true, preds)                                  # assign value to acc        print("epoch", ep+1, "acc", acc)# empty line                                                                           # ------------------------------------------                                                                           # run for both datasets                                                                           # ------------------------------------------CKPT_PATH = "outputs/runs_cnn2d_full_nodrop/cnn2d_fullbands_best_nodrop.pth"  # assign value to CKPT_PATHckpt = torch.load(CKPT_PATH, map_location=DEVICE)                          # assign value to ckpt# empty line                                                                           # pathsSAL_CUBE = "data/Salinas_corrected.mat"                                    # assign value to SAL_CUBESAL_GT =   "data/Salinas_gt.mat"                                           # assign value to SAL_GT# empty linePAV_CUBE = "data/Pavia.mat"                                                # assign value to PAV_CUBEPAV_GT =   "data/Pavia_gt.mat"                                             # assign value to PAV_GT# empty line# empty linedef run_dataset(cube_path, gt_path, name):                                 # define function run_dataset    print("\nRunning", name)    cube = normalize_cube(load_mat(cube_path))                             # assign value to cube    labels = load_mat(gt_path)                                             # assign value to labels    mask = labels > 0                                                      # assign value to mask    B_new = cube.shape[2]                                                  # assign value to B_new# empty line    ds = PatchDataset2D(cube, labels, mask, PATCH)                         # assign value to ds# empty line    dl = DataLoader(ds, batch_size=64, shuffle=True)                       # assign value to dl# empty line                                                                           # split small validation    n = len(ds)                                                            # assign value to n    val_n = max(2000, n // 10)                                             # assign value to val_n    train_idx = np.arange(n - val_n)                                       # assign value to train_idx    val_idx = np.arange(n - val_n, n)                                      # assign value to val_idx# empty line    ds_train = torch.utils.data.Subset(ds, train_idx)                      # assign value to ds_train    ds_val = torch.utils.data.Subset(ds, val_idx)                          # assign value to ds_val# empty line    dl_train = DataLoader(ds_train, batch_size=64, shuffle=True)           # assign value to dl_train    dl_val = DataLoader(ds_val, batch_size=64, shuffle=False)              # assign value to dl_val# empty line    model = CNN2D_Transfer(B_new, ckpt).to(DEVICE)                         # assign value to model# empty line    print("training adapter")    train_adapter(model, dl_train, dl_val)# empty line    print("predicting full map")    pred = predict_map(model, ds, mask, labels.shape)                      # assign value to pred# empty line    true = labels[mask] - 1                                                # assign value to true    pred_flat = pred[mask] - 1                                             # assign value to pred_flat    acc = accuracy_score(true, pred_flat)                                  # assign value to acc    f1 = precision_recall_fscore_support(true, pred_flat, average="macro")[2]  # assign value to f1# empty line    print(name, "acc", acc, "f1", f1)# empty line# empty linerun_dataset(SAL_CUBE, SAL_GT, "Salinas")run_dataset(PAV_CUBE, PAV_GT, "PaviaU")

Device cuda

Running Salinas


  ckpt = torch.load(CKPT_PATH, map_location=DEVICE)


training adapter
epoch 1 acc 0.004619364375461937
epoch 2 acc 0.16444937176644495
epoch 3 acc 0.0
epoch 4 acc 0.009238728750923873
epoch 5 acc 0.0007390983000739098
epoch 6 acc 0.0
epoch 7 acc 0.007945306725794531
epoch 8 acc 0.0
epoch 9 acc 0.0
epoch 10 acc 0.0
epoch 11 acc 0.0
epoch 12 acc 0.0
epoch 13 acc 0.0
epoch 14 acc 0.3082039911308204
epoch 15 acc 0.23725055432372505
epoch 16 acc 0.0
epoch 17 acc 0.0
epoch 18 acc 0.6365484109386549
epoch 19 acc 0.021618625277161862
epoch 20 acc 0.5299334811529933
predicting full map
Salinas acc 0.3938184706903878 f1 0.2329712579535212

Running PaviaU
training adapter
epoch 1 acc 0.00850489368882889
epoch 2 acc 0.41248734390820113
epoch 3 acc 0.28619642254471817
epoch 4 acc 0.7115761052986838
epoch 5 acc 0.3440431994600068
epoch 6 acc 0.2971987850151873
epoch 7 acc 0.36773540330745863
epoch 8 acc 0.3190010124873439
epoch 9 acc 0.3721903476206547
epoch 10 acc 0.5221734728315897
epoch 11 acc 0.6041174485318933
epoch 12 acc 0.6158623017212285
epoc

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
