In [None]:
from functools import partial
from netrc import netrc
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import Sequential, Linear, ReLU, GRU

from torch.optim import Adam
from torch.nn import CrossEntropyLoss
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import numpy as np
import pandas as pd
import pickle
import time


In [None]:
# Report which accelerator environment PyTorch can see.
if not torch.cuda.is_available():
    print("CUDA is not available.")
else:
    print("CUDA is available.")
    # Library / toolkit versions first, then the visible hardware.
    print("PyTorch version:", torch.__version__)
    print("CUDA version:", torch.version.cuda)
    print("Number of available GPUs:", torch.cuda.device_count())
    print("GPU name:", torch.cuda.get_device_name(0))

In [None]:
# Load the training table into `Data`. Later cells index it with `.iloc`, taking
# column 0 as the label and the remaining columns as features.
# NOTE(review): pickle.load can execute arbitrary code while unpickling — only
# open this file if it comes from a trusted source.
with open('FTIR_train_data_class3761_aug50.pkl', 'rb') as f:
    Data = pickle.load(f)

In [None]:
# Check unique labels in dataset.
# The labels are read straight from column 0 of `Data` (the same column a later
# cell assigns to `y`), so this cell also works on a fresh kernel where `y` has
# not been created yet.
unique_arr = np.unique(Data.iloc[:, 0].values)

print("Unique values:", unique_arr)
print("Number of unique values:", len(unique_arr))

In [None]:
# Reference: https://github.com/chensaian/TransG-Net

In [None]:
class DropPath(nn.Module):
    """Stochastic depth: drop a whole residual branch per sample during training.

    EncoderBlock instantiates this whenever ``drop_path_ratio > 0``.

    Args:
        drop_prob: probability of zeroing the branch output for a given sample.
    """

    def __init__(self, drop_prob=0.):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        # Identity at inference time or when nothing is dropped.
        if self.drop_prob == 0. or not self.training:
            return x
        keep_prob = 1.0 - self.drop_prob
        if keep_prob <= 0.:
            # Every sample is dropped; avoid dividing by zero below.
            return torch.zeros_like(x)
        # One Bernoulli draw per sample, broadcast over all remaining dimensions.
        mask_shape = (x.shape[0],) + (1,) * (x.ndim - 1)
        keep_mask = torch.rand(mask_shape, dtype=x.dtype, device=x.device) < keep_prob
        # Rescale the surviving samples so the expected value is unchanged.
        return x * keep_mask.to(x.dtype) / keep_prob


class EncoderBlock(nn.Module):
    """Pre-norm transformer encoder block.

    Computes ``x + drop_path(attn(norm1(x)))`` followed by
    ``x + drop_path(mlp(norm2(x)))``.

    Args:
        dim: embedding width of every token.
        num_heads: number of attention heads.
        mlp_ratio: hidden width of the MLP as a multiple of ``dim``.
        qkv_bias: whether the fused q/k/v projection has a bias.
        qk_scale: optional override of the attention scale factor.
        drop_ratio: dropout applied after the attention projection and in the MLP.
        attn_drop_ratio: dropout applied to the attention weights.
        drop_path_ratio: stochastic-depth probability for both residual branches.
        act_layer: activation class used inside the MLP.
        norm_layer: normalisation class (called as ``norm_layer(dim)``).
    """

    def __init__(self,
                 dim,
                 num_heads,
                 mlp_ratio=4.,
                 qkv_bias=False,
                 qk_scale=None,
                 drop_ratio=0.,
                 attn_drop_ratio=0.,
                 drop_path_ratio=0.,
                 act_layer=nn.GELU,
                 norm_layer=nn.LayerNorm):
        super(EncoderBlock, self).__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(dim, num_heads=num_heads, qkv_bias=qkv_bias, qk_scale=qk_scale,
                              attn_drop_ratio=attn_drop_ratio, proj_drop_ratio=drop_ratio)
        # A single DropPath instance is shared by both residual branches.
        self.drop_path = DropPath(
            drop_path_ratio) if drop_path_ratio > 0. else nn.Identity()
        self.norm2 = norm_layer(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim,
                       act_layer=act_layer, drop=drop_ratio)

    def forward(self, x):
        """Apply the attention and MLP residual branches; the shape of ``x`` is kept."""
        x = x + self.drop_path(self.attn(self.norm1(x)))
        x = x + self.drop_path(self.mlp(self.norm2(x)))
        return x

In [None]:
class Attention(nn.Module):
    """Multi-head self-attention with a fused q/k/v projection.

    Args:
        dim: total embedding width; must be divisible by ``num_heads``.
        num_heads: number of attention heads.
        qkv_bias: whether the fused q/k/v projection has a bias.
        qk_scale: optional scale for the attention logits; a falsy value falls
            back to ``head_dim ** -0.5``.
        attn_drop_ratio: dropout applied to the attention weights.
        proj_drop_ratio: dropout applied after the output projection.

    Raises:
        ValueError: if ``dim`` is not divisible by ``num_heads``.

    After each forward pass ``attn_weights`` holds the detached softmax weights
    of that pass; it is ``None`` until the first forward call.
    """

    def __init__(self,
                 dim,  
                 num_heads=2,
                 qkv_bias=False,
                 qk_scale=None,
                 attn_drop_ratio=0.,
                 proj_drop_ratio=0.):
        super(Attention, self).__init__()
        # Fail early: otherwise the reshape in forward() fails with an opaque size error.
        if dim % num_heads != 0:
            raise ValueError(
                f"dim ({dim}) must be divisible by num_heads ({num_heads})")
        self.num_heads = num_heads
        head_dim = dim // num_heads
        self.scale = qk_scale or head_dim ** -0.5
        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop_ratio)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop_ratio)
        # Defined up front so readers of the attribute never hit AttributeError.
        self.attn_weights = None

    def forward(self, x):
        """Return the attended tokens; input and output are ``[B, N, C]``."""
        # [batch_size, num_patches + 1, total_embed_dim]
        B, N, C = x.shape  
        # qkv(): -> [batch_size, num_patches + 1, 3 * total_embed_dim]
        # reshape: -> [batch_size, num_patches + 1, 3, num_heads, embed_dim_per_head]
        # permute: -> [3, batch_size, num_heads, num_patches + 1, embed_dim_per_head]
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        # [batch_size, num_heads, num_patches + 1, embed_dim_per_head]
        # make torchscript happy (cannot use tensor as tuple)
        q, k, v = qkv[0], qkv[1], qkv[2]
        # transpose: -> [batch_size, num_heads, embed_dim_per_head, num_patches + 1]
        # @: multiply -> [batch_size, num_heads, num_patches + 1, num_patches + 1]
        attn = (q @ k.transpose(-2, -1)) * self.scale
        attn = attn.softmax(dim=-1)
        # Snapshot taken before dropout and detached from the autograd graph.
        self.attn_weights = attn.detach() 
        attn = self.attn_drop(attn)
        # @: multiply -> [batch_size, num_heads, num_patches + 1, embed_dim_per_head]
        # transpose: -> [batch_size, num_patches + 1, num_heads, embed_dim_per_head]
        # reshape: -> [batch_size, num_patches + 1, total_embed_dim]
        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x


In [None]:
class Mlp(nn.Module):
    """Feed-forward block: Linear -> activation -> dropout -> Linear -> dropout.

    ``hidden_features`` and ``out_features`` fall back to ``in_features`` when
    they are not given (or are otherwise falsy). The same dropout module is
    applied after both linear layers.
    """

    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.):
        super().__init__()
        hidden_width = hidden_features if hidden_features else in_features
        output_width = out_features if out_features else in_features
        self.fc1 = nn.Linear(in_features, hidden_width)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_width, output_width)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        # Expand, activate, regularise ...
        hidden = self.drop(self.act(self.fc1(x)))
        # ... then project back and regularise again.
        return self.drop(self.fc2(hidden))

In [None]:
import torch.nn.functional as F

def Spectra_Embedding(x, spec_length, embed_dim):
    """Cut each spectrum in a batch into consecutive, non-overlapping tokens.

    Only the first ``(spec_length // embed_dim) * embed_dim`` values of every
    row are kept; anything after that is dropped.

    Args:
        x: 2-D tensor, one spectrum per row.
        spec_length: nominal number of points per spectrum.
        embed_dim: number of points per token.

    Returns:
        Tensor of shape ``(batch, spec_length // embed_dim, embed_dim)``.
    """
    num_tokens = spec_length // embed_dim
    usable_length = num_tokens * embed_dim
    trimmed = x[:, :usable_length]
    return torch.reshape(trimmed, (x.shape[0], num_tokens, embed_dim))
# The triple-quoted string below is a plain string literal, so nothing inside it
# is executed. It keeps two alternative embeddings for reference:
#   - Spectra_Embedding_old reshapes without first trimming the input;
#   - Spectra_Embedding_enlong right-pads the input up to a multiple of embed_dim.
'''
def Spectra_Embedding_old(x, spec_length, embed_dim):

    batch_size = x.shape[0]
    x = torch.reshape(x, (batch_size, spec_length // embed_dim, embed_dim))  
    return x

def Spectra_Embedding_enlong(x, spec_length, embed_dim):
    batch_size = x.shape[0]
    remainder = spec_length % embed_dim
    if remainder != 0:
        pad = embed_dim - remainder
        x = F.pad(x, (0, pad))
    x = torch.reshape(x, (batch_size, -1, embed_dim))
    return x
'''

In [None]:
class VIT(nn.Module):
    """Transformer encoder over a 1-D spectrum split into fixed-width tokens.

    The spectrum is cut into ``spec_length // embed_dim`` tokens of width
    ``embed_dim``, a learnable class token is prepended, a learnable position
    embedding is added, and the sequence goes through ``depth`` EncoderBlocks.
    The final prediction is read from the class token.

    Args:
        spec_length: nominal number of points per input spectrum.
        num_output: width of the output layer.
        embed_dim: token width.
        depth: number of encoder blocks.
        num_heads: attention heads per block.
        mlp_ratio: MLP hidden width as a multiple of ``embed_dim``.
        qkv_bias: whether the q/k/v projections have a bias.
        qk_scale: optional override of the attention scale.
        drop_ratio: dropout after the position embedding and inside the blocks.
        attn_drop_ratio: dropout on the attention weights.
        drop_path_ratio: largest stochastic-depth rate; rates increase linearly
            from 0 across the blocks.
        norm_layer: normalisation factory; defaults to LayerNorm with eps=1e-6.
        act_layer: activation class; defaults to GELU.
    """

    def __init__(self, spec_length=2000, num_output=1,
                 embed_dim=40, depth=12, num_heads=2, mlp_ratio=4.0, qkv_bias=True,
                 qk_scale=None, drop_ratio=0.,
                 attn_drop_ratio=0., drop_path_ratio=0., norm_layer=None,
                 act_layer=None):
        
        super(VIT, self).__init__()
        self.num_classes = num_output
        self.spec_length = spec_length
        self.num_features = self.embed_dim = embed_dim
        norm_layer = norm_layer or partial(nn.LayerNorm, eps=1e-6)
        act_layer = act_layer or nn.GELU
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        # One position per spectrum token plus one for the class token.
        self.pos_embed = nn.Parameter(torch.zeros(1, (spec_length//embed_dim) + 1, embed_dim))
        self.pos_drop = nn.Dropout(p=drop_ratio)
        # Per-block stochastic-depth rates, linearly spaced from 0 to drop_path_ratio.
        dpr = [x.item() for x in torch.linspace(0, drop_path_ratio, depth)]
        self.blocks = nn.Sequential(*[
            EncoderBlock(dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, qk_scale=qk_scale,
                        drop_ratio=drop_ratio, attn_drop_ratio=attn_drop_ratio, drop_path_ratio=dpr[i],
                        norm_layer=norm_layer, act_layer=act_layer)
            for i in range(depth)
        ])
        self.norm = norm_layer(embed_dim)
        
        nn.init.trunc_normal_(self.pos_embed, std=0.02)
        nn.init.trunc_normal_(self.cls_token, std=0.02)
        self.apply(_init_vit_weights)

        # Created after apply(), so the head keeps nn.Linear's default initialisation.
        self.head = nn.Linear(embed_dim, num_output)

    def get_attention_maps(self):
        """Return one entry per block: the attention weights of the last forward pass.

        An entry is ``None`` if that block's attention has not stored any weights yet.
        """
        return [getattr(block.attn, "attn_weights", None) for block in self.blocks]

    def forward(self, x):
        """Map a batch of spectra ``[B, spec_length]`` to outputs ``[B, num_output]``."""
        # [B , spec_length] --> [B , spec_length // embed_dim , embed_dim]
        x = Spectra_Embedding(x, self.spec_length, self.embed_dim)
        cls_token = self.cls_token.expand(x.shape[0], -1, -1)
        x = torch.cat((cls_token, x), dim=1)
        x = self.pos_drop(x + self.pos_embed)
        x = self.blocks(x)
        x = self.norm(x)
        # Only the class token (position 0) is returned, so run the head on that
        # token alone instead of on every token and discarding the rest.
        return self.head(x[:, 0])
    
def _init_vit_weights(m):
    if isinstance(m, nn.Linear):
        nn.init.trunc_normal_(m.weight, std=.01)
        if m.bias is not None:
            nn.init.zeros_(m.bias)
    elif isinstance(m, nn.LayerNorm):
        nn.init.zeros_(m.bias)
        nn.init.ones_(m.weight)

def VIT_model(spec_length=2251,num_output: int = 1,embed_dim=80, depth=6,num_heads=4):
    """Build a VIT from the five size arguments; every other VIT option keeps its default."""
    vit_kwargs = {
        "spec_length": spec_length,
        "embed_dim": embed_dim,
        "depth": depth,
        "num_heads": num_heads,
        "num_output": num_output,
    }
    return VIT(**vit_kwargs)


In [None]:
# Column 0 of the loaded table is the label; every other column is a feature.
y = Data.iloc[:, 0].values
X = Data.iloc[:, 1:].values

# Keep the raw label values: a later cell compares the real-test labels against them.
unique_labels = np.unique(y)
print(f'There are {len(unique_labels)} unique labels.')

# Map raw labels to integer ids and move everything into tensors.
label_encoder = LabelEncoder()
y = torch.tensor(label_encoder.fit_transform(y), dtype=torch.long)
X = torch.tensor(X, dtype=torch.float)

# Stratified 80/20 split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# Only the training loader shuffles.
batch_size = 1024
train_data = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
test_data = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_data, shuffle=False, batch_size=batch_size)

In [None]:
# Ask PyTorch to release cached GPU memory it is not currently using, before the
# model is built in the next cell.
torch.cuda.empty_cache()


In [None]:
import datetime
import os
import time
import csv
import warnings
# NOTE(review): this silences every warning for the rest of the session.
warnings.filterwarnings("ignore")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Size the classifier from the fitted label encoder so the output layer always
# matches the encoded targets (previously a hard-coded 3753).
num_classes = len(label_encoder.classes_)
embed_dim=480
depth=10
num_heads=10

pretrained_model_path = "model_embed_dim=480_depth=10_early_step_setting=0.88_num_head=1_test_top1_accuracy=73.33333333333333_test_top3_accuracy=100.0_test_top5_accuracy=100.0_test_current_time=2023-05-28 01:57:38.860881.pt"
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine too.
pretrained_dict = torch.load(pretrained_model_path, map_location=device)

model = VIT_model(spec_length=2251, num_output=num_classes, embed_dim=embed_dim, depth=depth,num_heads=num_heads)
model_dict = model.state_dict()

# Reuse only backbone tensors: the name must exist in the new model, must not
# belong to the head, and the shape must match so load_state_dict cannot fail
# on a size mismatch.
pretrained_dict = {
    k: v for k, v in pretrained_dict.items()
    if k in model_dict and 'head' not in k and v.shape == model_dict[k].shape
}
print(f'Reusing {len(pretrained_dict)} of {len(model_dict)} tensors from the checkpoint.')

model_dict.update(pretrained_dict)
model.load_state_dict(model_dict)

# Fresh output layer for the current label set.
model.head = nn.Linear(embed_dim, num_classes)


model = model.to(device)

optimizer = Adam(model.parameters())
loss_func = CrossEntropyLoss()

import matplotlib.pyplot as plt

num_epochs = 100
losses = []
# An epoch counts as "no improvement" when its loss is above 90% of the previous one.
early_stop_loss = 0.90 
no_improve_epoch = 0 
patience = 3 
epoch_times = []
total_time=0

for epoch in range(num_epochs):
    start_time = time.time()  
    model.train()
    running_loss = 0.0
    for x_batch, y_batch in train_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(x_batch)
        loss = loss_func(outputs, y_batch)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    end_time = time.time()  
    epoch_duration = end_time - start_time 
    epoch_times.append(epoch_duration)
    total_time+=epoch_duration

    # Mean of the per-batch losses of this epoch.
    epoch_loss = running_loss / len(train_loader)
    print(f'Epoch: {epoch+1}, Loss: {epoch_loss:.10f}, Time: {epoch_duration:.2f}s, Total time:{total_time:.2f}s')
    
    losses.append(epoch_loss)

    # Early stopping is only considered from the sixth epoch on. losses[epoch-1]
    # is the previous epoch because the current loss was appended just above.
    if epoch > 4 and epoch_loss > losses[epoch-1]*early_stop_loss:
        no_improve_epoch += 1
        print(no_improve_epoch)
    else:
        no_improve_epoch = 0

    # Stops once more than `patience` consecutive epochs failed to improve.
    if no_improve_epoch > patience:
        print("Early stopping!")
        break


plt.plot(losses)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss over Time")
plt.show()




In [None]:
#test with Validation set
# Evaluate on the held-out validation split: top-1 / top-5 accuracy, then print
# the decoded true and predicted labels for every sample.
model.eval()
all_predicted_top1 = []
all_predicted_top5 = []
all_labels = []
with torch.no_grad():
    total = 0
    correct_top1 = 0
    correct_top5 = 0
    for x_batch, y_batch in test_loader:
        
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        outputs = model(x_batch)
        _, predicted_top1 = torch.max(outputs, 1)
        _, predicted_top5 = outputs.topk(5, 1, True, True)
        
        total += y_batch.size(0)
        correct_top1 += (predicted_top1 == y_batch).sum().item()
        # A sample is a top-5 hit when its label appears anywhere in its 5 predictions.
        correct_top5 += predicted_top5.eq(y_batch.view(-1, 1).expand_as(predicted_top5)).sum().item()

        
        all_predicted_top1.extend(predicted_top1.cpu().numpy())
        all_predicted_top5.extend(predicted_top5.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

    print(f'Top-1 Accuracy: {correct_top1 / total * 100}%')
    print(f'Top-5 Accuracy: {correct_top5 / total * 100}%')

# Decode all ids back to the original labels in three vectorised calls instead of
# three LabelEncoder calls per sample; the printed output is unchanged.
top5_ids = np.asarray(all_predicted_top5)
true_label_names = label_encoder.inverse_transform(np.asarray(all_labels))
top1_label_names = label_encoder.inverse_transform(np.asarray(all_predicted_top1))
# inverse_transform expects a 1-D input, so flatten and restore the (n, 5) layout.
top5_label_names = label_encoder.inverse_transform(top5_ids.ravel()).reshape(top5_ids.shape)

for true_label, predicted_label_top1, predicted_label_top5 in zip(
        true_label_names, top1_label_names, top5_label_names):
    print("True labels: ", true_label)
    print("Predicted labels (Top-1): ", predicted_label_top1)
    print("Predicted labels (Top-5): ", predicted_label_top5)

In [None]:
#test model with test set
# Load the separate test table into `data_real`; later cells read column 0 as the
# label and the remaining columns as features.
# NOTE(review): pickle.load can execute arbitrary code while unpickling — only
# open this file if it comes from a trusted source.
with open('FTIR_test_data.pkl', 'rb') as f:
    data_real = pickle.load(f)

In [None]:
# Take the feature columns (everything after column 0) of the real test table and
# keep only the columns selected by `new_indices`.
# NOTE(review): `new_indices` is not defined in any cell visible in this notebook —
# confirm where it is created; on a fresh kernel this line raises NameError otherwise.
# NOTE(review): this rebinds `X_test`, which an earlier cell used for the validation split.
X_test = data_real.iloc[:, 1:].values
X_test_resampled = X_test[:, new_indices]

In [None]:
# Rebuild a table laid out like the loaded one: column 0 of `data_real` followed by
# the resampled feature columns. `df_resampled` reuses `data_real`'s index so the
# column-wise concat lines the rows up.
df_resampled = pd.DataFrame(X_test_resampled, index=data_real.index)
data_real_resampled = pd.concat([data_real.iloc[:, :1], df_resampled], axis=1)

In [None]:
# Split the resampled real-test table: features are every column after the first,
# labels are column 0.
X1 = data_real_resampled.iloc[:, 1:].values
y1 = data_real_resampled.iloc[:, 0].values

In [None]:
# Drop real-test samples whose label never occurred in the training labels;
# the label encoder fitted on the training labels could not encode them.
unique_labels_test = np.unique(y1)
missing_labels = set(unique_labels_test) - set(unique_labels)

# Boolean row mask: True where the label is known from training.
y1_series = pd.Series(y1)
mask = ~y1_series.isin(missing_labels)

y1 = y1_series[mask].values
X1 = X1[mask.values]

In [None]:
# Encode the remaining real-test labels with the encoder fitted on the training
# labels, then wrap features and labels in an unshuffled DataLoader.
# `batch_size` is the value set in the data-preparation cell (the former
# `batch_size = batch_size` self-assignment did nothing and was removed).
y_encoded = label_encoder.transform(y1)
X1 = torch.tensor(X1, dtype=torch.float)
y1 = torch.tensor(y_encoded, dtype=torch.long)
test_data1 = TensorDataset(X1, y1)
test_loader_real = DataLoader(test_data1, batch_size=batch_size, shuffle=False)

In [None]:
# Evaluate on the real test set: top-1 / top-3 / top-5 accuracy.
# Switch to inference mode here instead of relying on an earlier cell having
# done so; otherwise re-running this cell after training would evaluate in
# training mode.
model.eval()
all_predicted_top1 = []
all_predicted_top3 = []
all_predicted_top5 = []
all_labels = []
with torch.no_grad():
    total = 0
    correct_top1 = 0
    correct_top3 = 0
    correct_top5 = 0
    for x_batch, y_batch in test_loader_real:
        
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)

        outputs = model(x_batch)
        _, predicted_top1 = torch.max(outputs, 1)
        _, predicted_top3 = outputs.topk(3, 1, True, True)
        _, predicted_top5 = outputs.topk(5, 1, True, True)
        
        total += y_batch.size(0)
        correct_top1 += (predicted_top1 == y_batch).sum().item()
        # A sample is a top-k hit when its label appears anywhere in its k predictions.
        correct_top3 += predicted_top3.eq(y_batch.view(-1, 1).expand_as(predicted_top3)).sum().item()
        correct_top5 += predicted_top5.eq(y_batch.view(-1, 1).expand_as(predicted_top5)).sum().item()

    
        all_predicted_top1.extend(predicted_top1.cpu().numpy())
        all_predicted_top3.extend(predicted_top3.cpu().numpy())
        all_predicted_top5.extend(predicted_top5.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

    print(f'Top-1 Accuracy: {correct_top1 / total * 100}%')
    print(f'Top-3 Accuracy: {correct_top3 / total * 100}%')
    print(f'Top-5 Accuracy: {correct_top5 / total * 100}%')
