In [1]:
! wget http://www.ehu.eus/ccwintco/uploads/6/67/Indian_pines_corrected.mat
! wget http://www.ehu.eus/ccwintco/uploads/c/c4/Indian_pines_gt.mat
! pip install spectral

URL transformed to HTTPS due to an HSTS policy
--2023-02-24 07:43:53--  https://www.ehu.eus/ccwintco/uploads/6/67/Indian_pines_corrected.mat
Resolving www.ehu.eus (www.ehu.eus)... 158.227.0.65, 2001:720:1410::65
Connecting to www.ehu.eus (www.ehu.eus)|158.227.0.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5953527 (5.7M)
Saving to: ‘Indian_pines_corrected.mat.3’


2023-02-24 07:43:57 (1.58 MB/s) - ‘Indian_pines_corrected.mat.3’ saved [5953527/5953527]

URL transformed to HTTPS due to an HSTS policy
--2023-02-24 07:43:57--  https://www.ehu.eus/ccwintco/uploads/c/c4/Indian_pines_gt.mat
Resolving www.ehu.eus (www.ehu.eus)... 158.227.0.65, 2001:720:1410::65
Connecting to www.ehu.eus (www.ehu.eus)|158.227.0.65|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1125 (1.1K)
Saving to: ‘Indian_pines_gt.mat.3’


2023-02-24 07:43:57 (67.9 MB/s) - ‘Indian_pines_gt.mat.3’ saved [1125/1125]

Looking in indexes: https://pypi.org/simple, https://u

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.io as sio
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report, cohen_kappa_score
import spectral
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import copy

In [3]:
# 对高光谱数据 X 应用 PCA 变换
def applyPCA(X, numComponents):
    newX = np.reshape(X, (-1, X.shape[2]))
    pca = PCA(n_components=numComponents, whiten=True)
    newX = pca.fit_transform(newX)
    newX = np.reshape(newX, (X.shape[0], X.shape[1], numComponents))
    return newX

# 对单个像素周围提取 patch 时，边缘像素就无法取了，因此，给这部分像素进行 padding 操作
def padWithZeros(X, margin=2):
    newX = np.zeros((X.shape[0] + 2 * margin, X.shape[1] + 2* margin, X.shape[2]))
    x_offset = margin
    y_offset = margin
    newX[x_offset:X.shape[0] + x_offset, y_offset:X.shape[1] + y_offset, :] = X
    return newX

# 在每个像素周围提取 patch ，然后创建成符合 keras 处理的格式
def createImageCubes(X, y, windowSize=5, removeZeroLabels = True):
    # 给 X 做 padding
    margin = int((windowSize - 1) / 2)
    zeroPaddedX = padWithZeros(X, margin=margin)
    # split patches
    patchesData = np.zeros((X.shape[0] * X.shape[1], windowSize, windowSize, X.shape[2]))
    patchesLabels = np.zeros((X.shape[0] * X.shape[1]))
    patchIndex = 0
    for r in range(margin, zeroPaddedX.shape[0] - margin):
        for c in range(margin, zeroPaddedX.shape[1] - margin):
            patch = zeroPaddedX[r - margin:r + margin + 1, c - margin:c + margin + 1]   
            patchesData[patchIndex, :, :, :] = patch
            patchesLabels[patchIndex] = y[r-margin, c-margin]
            patchIndex = patchIndex + 1
    if removeZeroLabels:
        patchesData = patchesData[patchesLabels>0,:,:,:]
        patchesLabels = patchesLabels[patchesLabels>0]
        patchesLabels -= 1
    return patchesData, patchesLabels

def splitTrainTestSet(X, y, testRatio, randomState=345):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=testRatio, random_state=randomState, stratify=y)
    return X_train, X_test, y_train, y_test

In [4]:
# 地物类别
class_num = 16
X = sio.loadmat('Indian_pines_corrected.mat')['indian_pines_corrected']
y = sio.loadmat('Indian_pines_gt.mat')['indian_pines_gt']

# 用于测试样本的比例
test_ratio = 0.90
# 每个像素周围提取 patch 的尺寸
patch_size = 15
# 使用 PCA 降维，得到主成分的数量
pca_components = 30

print('Hyperspectral data shape: ', X.shape)
print('Label shape: ', y.shape)

print('\n... ... PCA tranformation ... ...')
X_pca = applyPCA(X, numComponents=pca_components)
print('Data shape after PCA: ', X_pca.shape)

print('\n... ... create data cubes ... ...')
X_pca, y = createImageCubes(X_pca, y, windowSize=patch_size)
print('Data cube X shape: ', X_pca.shape)
print('Data cube y shape: ', y.shape)

print('\n... ... create train & test data ... ...')
Xtrain, Xtest, ytrain, ytest = splitTrainTestSet(X_pca, y, test_ratio)
print('Xtrain shape: ', Xtrain.shape)
print('Xtest  shape: ', Xtest.shape)

# 改变 Xtrain, Ytrain 的形状，以符合 keras 的要求
Xtrain = Xtrain.reshape(-1, patch_size, patch_size, pca_components, 1)
Xtest  = Xtest.reshape(-1, patch_size, patch_size, pca_components, 1)
print('before transpose: Xtrain shape: ', Xtrain.shape) 
print('before transpose: Xtest  shape: ', Xtest.shape) 

# 为了适应 pytorch 结构，数据要做 transpose
Xtrain = Xtrain.transpose(0, 4, 3, 1, 2)
Xtest  = Xtest.transpose(0, 4, 3, 1, 2)
print('after transpose: Xtrain shape: ', Xtrain.shape) 
print('after transpose: Xtest  shape: ', Xtest.shape) 

Hyperspectral data shape:  (145, 145, 200)
Label shape:  (145, 145)

... ... PCA tranformation ... ...
Data shape after PCA:  (145, 145, 30)

... ... create data cubes ... ...
Data cube X shape:  (10249, 15, 15, 30)
Data cube y shape:  (10249,)

... ... create train & test data ... ...
Xtrain shape:  (1024, 15, 15, 30)
Xtest  shape:  (9225, 15, 15, 30)
before transpose: Xtrain shape:  (1024, 15, 15, 30, 1)
before transpose: Xtest  shape:  (9225, 15, 15, 30, 1)
after transpose: Xtrain shape:  (1024, 1, 30, 15, 15)
after transpose: Xtest  shape:  (9225, 1, 30, 15, 15)


In [5]:
""" Training dataset"""
class TrainDS(torch.utils.data.Dataset): 
    def __init__(self):
        self.len = Xtrain.shape[0]
        self.x_data = torch.FloatTensor(Xtrain)
        self.y_data = torch.LongTensor(ytrain)        
    def __getitem__(self, index):
        # 根据索引返回数据和对应的标签
        return self.x_data[index], self.y_data[index]
    def __len__(self): 
        # 返回文件数据的数目
        return self.len

""" Testing dataset"""
class TestDS(torch.utils.data.Dataset): 
    def __init__(self):
        self.len = Xtest.shape[0]
        self.x_data = torch.FloatTensor(Xtest)
        self.y_data = torch.LongTensor(ytest)
    def __getitem__(self, index):
        # 根据索引返回数据和对应的标签
        return self.x_data[index], self.y_data[index]
    def __len__(self): 
        # 返回文件数据的数目
        return self.len

# 创建 trainloader 和 testloader
trainset = TrainDS()
testset  = TestDS()
train_loader = torch.utils.data.DataLoader(dataset=trainset, batch_size=128, shuffle=True, num_workers=2)
test_loader  = torch.utils.data.DataLoader(dataset=testset,  batch_size=128, shuffle=False, num_workers=2)

In [6]:
!pip install einops

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
import random
import numpy as np
import torch
import cv2
import math
import torch.nn.functional as F
from torch import nn
from skimage import transform
from einops import rearrange, repeat
from einops.layers.torch import Rearrange


# helpers
def pair(t):
    return t if isinstance(t, tuple) else (t, t)


class OurFE(nn.Module):
    def __init__(self, channel, dim):
        super(OurFE, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(channel, channel, kernel_size=1),
            nn.BatchNorm2d(channel),
            nn.ReLU()
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(channel, channel, kernel_size=1),
            nn.BatchNorm2d(channel),
            nn.ReLU()
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(channel, channel, kernel_size=1),
            nn.BatchNorm2d(channel),
            nn.ReLU()
        )
        self.out_conv = nn.Sequential(
            nn.Conv2d(3 * channel, channel, kernel_size=3, padding=1),
            nn.BatchNorm2d(channel),
            nn.ReLU()
        )

    def forward(self, x):
        out1 = self.conv1(x)
        out2 = self.conv2(out1)
        out3 = self.conv3(out2)
        out = self.out_conv(torch.cat((out1, out2, out3), dim=1))
        return out


class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn

    def forward(self, x):
        return self.fn(self.norm(x))


class FeedForward(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.net = nn.Sequential(
            DEPTHWISECONV(dim, 256, kernel_size=3, padding=1, stride=1),
            nn.BatchNorm2d(256),
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=1),
            nn.GELU(),
            nn.Conv2d(in_channels=512, out_channels=dim, kernel_size=1),
            nn.GELU(),
        )

    def forward(self, x):
        b, d, c = x.shape
        w = int(math.sqrt(d))
        x1 = rearrange(x, 'b (w h) c -> b c w h', w=w, h=w)
        x1 = self.net(x1)
        x1 = rearrange(x1, 'b c w h -> b (w h) c')
        x = x + x1
        return x


class DEPTHWISECONV(nn.Module):
    def __init__(self, in_ch, out_ch, kernel_size=1, padding=0, stride=1, is_fe=False):
        super(DEPTHWISECONV, self).__init__()
        self.is_fe = is_fe
        self.depth_conv = nn.Conv2d(in_channels=in_ch,
                                    out_channels=in_ch,
                                    kernel_size=kernel_size,
                                    stride=stride,
                                    padding=padding,
                                    groups=in_ch)
        self.point_conv = nn.Conv2d(in_channels=in_ch,
                                    out_channels=out_ch,
                                    kernel_size=1,
                                    stride=1,
                                    padding=0,
                                    groups=1)

    def forward(self, input):
        out = self.depth_conv(input)
        if self.is_fe:
            return out
        out = self.point_conv(out)
        return out


class Attention(nn.Module):
    def __init__(self, dim, heads=4, dim_head=64, dropout=0., num_patches=10):
        super().__init__()
        inner_dim = dim_head * heads
        project_out = not (heads == 1 and dim_head == dim)

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.attend = nn.Softmax(dim=-1)
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias=False)
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout)
        ) if project_out else nn.Identity()

        self.spatial_norm = nn.BatchNorm2d(heads)
        self.spatial_conv = nn.Conv2d(heads, heads, kernel_size=3, padding=1)

        self.spectral_norm = nn.BatchNorm2d(1)
        self.spectral_conv = nn.Conv2d(1, 1, kernel_size=3, padding=1)
        self.to_qkv_spec = nn.Linear(num_patches, num_patches*3, bias=False)
        self.attend_spec = nn.Softmax(dim=-1)

    def forward(self, x):
        qkv = self.to_qkv(x).chunk(3, dim=-1)
        q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=self.heads), qkv)
        dots = torch.matmul(q, k.transpose(-1, -2)) * self.scale
        attn = self.attend(dots)
        attn = self.spatial_conv(attn)
        out = torch.matmul(attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        output = self.to_out(out)

        x = x.transpose(-2, -1)
        qkv_spec = self.to_qkv_spec(x).chunk(3, dim=-1)
        q_spec, k_spec, v_spec = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h=1), qkv_spec)
        dots_spec = torch.matmul(q_spec, k_spec.transpose(-1, -2)) * self.scale
        attn = self.attend_spec(dots_spec)  # .squeeze(dim=1)
        attn = self.spectral_conv(attn).squeeze(dim=1)

        return torch.matmul(output, attn)


class Transformer(nn.Module):
    def __init__(self, dim, depth, heads, dim_head, dropout=0., num_patches=25):
        super().__init__()
        self.layers = nn.ModuleList([])
        self.index = 0
        for i in range(depth):
            self.layers.append(nn.ModuleList([
                PreNorm(dim, Attention(dim, heads=heads, dim_head=dim_head, dropout=dropout, num_patches=num_patches)),
                PreNorm(dim, FeedForward(dim)),
            ]))

    def forward(self, x):
        output = []
        for attn, ff in self.layers:
            x = attn(x) + x
            x = ff(x) + x
            output.append(x)

        return x, output


class SubNet(nn.Module):
    def __init__(self, patch_size, num_patches, dim, emb_dropout, depth, heads, dim_head, mlp_dim, dropout):
        super(SubNet, self).__init__()
        self.to_patch_embedding = nn.Sequential(
            DEPTHWISECONV(in_ch=dim, out_ch=dim, kernel_size = patch_size, stride = patch_size, padding=0, is_fe=True),
            Rearrange('b c w h -> b (h w) c '),
        )
        self.cls_token = nn.Parameter(torch.zeros(1, 1, dim))
        self.pos_embedding = nn.Parameter(torch.zeros(1, num_patches+1, dim))
        self.dropout = nn.Dropout(emb_dropout)
        self.transformer = Transformer(dim, depth, heads, dim_head, dropout=dropout, num_patches=num_patches)


def get_num_patches(ps, ks):
    return int((ps - ks)/ks)+1


class ViT(nn.Module):
    def __init__(self, *, image_size, patch_size, num_classes, dim, depth, heads, mlp_dim, channels=3, dim_head=64, dropout=0., emb_dropout=0.):
        super(ViT, self).__init__()
        self.ournet = OurFE(channels, dim)
        self.pool = nn.AvgPool2d(kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(in_channels=channels, out_channels=dim, kernel_size=1)
        self.net = nn.Sequential()
        self.mlp_head = nn.ModuleList()
        for ps in patch_size:
            num_patches = get_num_patches(image_size, ps) ** 2
            patch_dim = dim * num_patches
            sub_net = SubNet(ps, num_patches, dim, emb_dropout, depth, heads, dim_head, mlp_dim, dropout)
            self.net.append(sub_net)
            self.mlp_head.append(nn.Sequential(
                nn.LayerNorm(patch_dim),
                nn.Linear(patch_dim, num_classes)
            ))

        self.weight = torch.ones(len(patch_size))

    def forward(self, img):
        if len(img.shape) == 5: img = img.squeeze()
        img = self.ournet(img)
        img = self.pool(img)
        img = self.conv4(img)

        all_branch = []
        for sub_branch in self.net:
            spatial = sub_branch.to_patch_embedding(img)
            b, n, c = spatial.shape
            spatial = spatial + sub_branch.pos_embedding[:, :n]
            spatial = sub_branch.dropout(spatial)
            _, outputs = sub_branch.transformer(spatial)
            res = outputs[-1]
            all_branch.append(res)

        self.weight = F.softmax(self.weight, 0)
        res = 0
        for i, mlp_head in enumerate(self.mlp_head):
            out1 = all_branch[i].flatten(start_dim=1)
            cls1 = mlp_head(out1)
            res = res + cls1 * self.weight[i]
        return res

In [8]:
ps=[3, 5]
d_h=[4,2]#深度为四，将 transformer encoder循环四次
#model = ViT(image_size=15, patch_size=ps, num_classes=16, dim=100, depth=d_h[0], heads=d_h[1],mlp_dim=2048, channels=3, dropout=0.2, emb_dropout=0.2)
# 随机输入，测试网络结构是否通
x = torch.randn(2, 1, 30, 15, 15)
net = ViT(image_size=15, patch_size=ps, num_classes=16, dim=100, depth=d_h[0], heads=d_h[1],mlp_dim=2048, channels=30, dropout=0.2, emb_dropout=0.2)
y = net(x)
print(y.shape)

torch.Size([2, 16])


In [9]:
def train(net):


  current_loss_his = []
  current_Acc_his = []

  best_net_wts = copy.deepcopy(net.state_dict())
  best_acc = 0.0

  criterion = nn.CrossEntropyLoss()
  optimizer = optim.Adam(net.parameters(), lr=0.001)

  # 开始训练
  total_loss = 0
  for epoch in range(100):
      net.train()  # 将模型设置为训练模式
      for i, (inputs, labels) in enumerate(train_loader):
          inputs = inputs.to(device)
          labels = labels.to(device)
          # 优化器梯度归零
          optimizer.zero_grad()
          # 正向传播 +　反向传播 + 优化 
          outputs = net(inputs)
          loss = criterion(outputs, labels)
          loss.backward()
          optimizer.step()
          total_loss += loss.item()

      if epoch % 10 == 0:
        net.eval()   # 将模型设置为验证模式
        current_acc = test_acc(net)
        current_Acc_his.append(current_acc)
        print('[current acc: %.4f]' %(current_acc))

      if current_acc > best_acc:
        best_acc = current_acc
        best_net_wts = copy.deepcopy(net.state_dict())

      print('[Epoch: %d]   [loss avg: %.4f]   [current loss: %.4f]' %(epoch + 1, total_loss/(epoch+1), loss.item()))
      current_loss_his.append(loss.item())

  print('Finished Training')
  print("Best Acc:%.4f" %(best_acc))

  # load best model weights
  net.load_state_dict(best_net_wts)

  return net,current_loss_his,current_Acc_his

In [10]:
def test_acc(net):
  count = 0
  # 模型测试
  for inputs, _ in test_loader:
      inputs = inputs.to(device)
      outputs = net(inputs)
      outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1)
      if count == 0:
          y_pred_test =  outputs
          count = 1
      else:
          y_pred_test = np.concatenate( (y_pred_test, outputs) )

  # 生成分类报告
  classification = classification_report(ytest, y_pred_test, digits=4)
  index_acc = classification.find('weighted avg')
  accuracy = classification[index_acc+17:index_acc+23]
  return float(accuracy)

In [11]:
import warnings
warnings.filterwarnings("ignore")
# 使用GPU训练，可以在菜单 "代码执行工具" -> "更改运行时类型" 里进行设置
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# 网络放到GPU上
net = ViT(image_size=15, patch_size=ps, num_classes=16, dim=100, depth=d_h[0], heads=d_h[1],mlp_dim=2048, channels=30, dropout=0.2, emb_dropout=0.2).to(device)
# 训练
net,current_loss_his,current_Acc_his = train(net)

[current acc: 0.2135]
[Epoch: 1]   [loss avg: 16.4578]   [current loss: 1.5024]
[Epoch: 2]   [loss avg: 12.3076]   [current loss: 0.8449]
[Epoch: 3]   [loss avg: 9.6352]   [current loss: 0.3861]
[Epoch: 4]   [loss avg: 7.8047]   [current loss: 0.2571]
[Epoch: 5]   [loss avg: 6.4688]   [current loss: 0.1087]
[Epoch: 6]   [loss avg: 5.5065]   [current loss: 0.0464]
[Epoch: 7]   [loss avg: 4.7802]   [current loss: 0.0360]
[Epoch: 8]   [loss avg: 4.2180]   [current loss: 0.0267]
[Epoch: 9]   [loss avg: 3.7687]   [current loss: 0.0204]
[Epoch: 10]   [loss avg: 3.4052]   [current loss: 0.0179]
[current acc: 0.9818]
[Epoch: 11]   [loss avg: 3.1048]   [current loss: 0.0074]
[Epoch: 12]   [loss avg: 2.8509]   [current loss: 0.0084]
[Epoch: 13]   [loss avg: 2.6404]   [current loss: 0.0553]
[Epoch: 14]   [loss avg: 2.4636]   [current loss: 0.0047]
[Epoch: 15]   [loss avg: 2.3048]   [current loss: 0.0064]
[Epoch: 16]   [loss avg: 2.1647]   [current loss: 0.0096]
[Epoch: 17]   [loss avg: 2.0411]   

In [12]:
net.eval()   # 将模型设置为验证模式
# 测试最好的模型的结果
count = 0
# 模型测试
for inputs, _ in test_loader:
    inputs = inputs.to(device)
    outputs = net(inputs)
    outputs = np.argmax(outputs.detach().cpu().numpy(), axis=1)
    if count == 0:
        y_pred_test =  outputs
        count = 1
    else:
        y_pred_test = np.concatenate( (y_pred_test, outputs) )

# 生成分类报告
classification = classification_report(ytest, y_pred_test, digits=4)
print(classification)

              precision    recall  f1-score   support

         0.0     1.0000    1.0000    1.0000        41
         1.0     0.9984    0.9533    0.9753      1285
         2.0     0.9868    1.0000    0.9934       747
         3.0     0.9953    1.0000    0.9977       213
         4.0     0.9931    0.9954    0.9943       435
         5.0     0.9939    0.9939    0.9939       657
         6.0     1.0000    1.0000    1.0000        25
         7.0     1.0000    1.0000    1.0000       430
         8.0     0.8889    0.8889    0.8889        18
         9.0     0.9897    0.9897    0.9897       875
        10.0     0.9747    0.9932    0.9839      2210
        11.0     0.9943    0.9719    0.9830       534
        12.0     1.0000    1.0000    1.0000       185
        13.0     0.9939    1.0000    0.9969      1139
        14.0     0.9830    1.0000    0.9914       347
        15.0     0.9310    0.9643    0.9474        84

    accuracy                         0.9881      9225
   macro avg     0.9827   