In [None]:
# Torch
import torch
from torch import nn, optim

# Data
from torch.utils.data import DataLoader,Dataset
from torchvision import transforms,models
from PIL import Image
import pandas as pd

from torch import optim
import torch.optim.lr_scheduler as lr_scheduler

# Plots
from sklearn.metrics import accuracy_score
import matplotlib.pyplot as plt
import numpy as np
import time, os
%matplotlib inline

In [None]:
# Sanity check: displays whether PyTorch can see a CUDA device in this kernel.
torch.cuda.is_available()

In [None]:
# Identifiers used to tag results.
# NOTE(review): both names still say "resnet" although the notebook builds a
# ViT -- confirm whether they should be renamed.
RESULT_NAME = 'resnet_pre_trained_fine_tuning.csv'
NET_NAME = "resnet_pre_trained_fine_tuning_"

# Switch read by the "Testing" cell: when True, the per-fold checkpoints are
# evaluated on their held-out test folds. It was referenced there but never
# defined, which raised NameError on a fresh kernel.
TEST_TRAIN = True

# hyperparameters
args = {
    'epoch_num': 100,     # Epochs
    'lr': 0.01,           # Lr
    'weight_decay': 1e-3, # L2
    'batch_size': 32,     # batch size
    'num_classes': 9
}

# hardware: use the GPU when one is visible, otherwise fall back to the CPU
args['device'] = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(args['device'])

In [None]:
# root_dir: prefix for everything the notebook writes (checkpoints, loss plots).
# base_dir: prefix for everything the notebook reads (dataset.csv, img/).
root_dir = "./"
base_dir = "./"

In [None]:
class LampDataset(Dataset):
    """Image dataset pairing every ``.jpg`` file of a folder with a label.

    The folder is listed once, at construction time, and the file names are
    sorted. Item ``idx`` is the ``idx``-th file in that sorted order together
    with ``labels[idx]``.

    NOTE(review): this assumes ``labels`` is ordered exactly like the sorted
    file names; nothing here verifies it -- confirm against the code that
    builds the labels.

    Args:
        main_dir: Directory containing the images.
        transform: Callable applied to each PIL image; its result is returned
            as the sample.
        labels: Indexable collection of labels, addressed by sample position.
    """

    def __init__(self, main_dir, transform, labels):
        self.main_dir = main_dir
        self.transform = transform
        # Only .jpg files count as samples; sorting makes the order deterministic.
        self.total_imgs = sorted(
            name for name in os.listdir(main_dir) if name.endswith('.jpg')
        )
        self.labels = labels

    def __len__(self):
        """Return the number of ``.jpg`` files found in ``main_dir``."""
        return len(self.total_imgs)

    def __getitem__(self, idx):
        """Load image ``idx``, transform it and return ``(sample, label)``."""
        img_loc = os.path.join(self.main_dir, self.total_imgs[idx])
        # The context manager closes the file handle deterministically.
        # convert('RGB') guarantees three channels, so greyscale / RGBA / CMYK
        # files cannot break batching or the 3-channel ViT input.
        with Image.open(img_loc, 'r') as image:
            tensor_image = self.transform(image.convert('RGB'))

        return tensor_image, self.labels[idx]

In [None]:
# Read the semicolon-separated metadata table and index it by sample id.
df = pd.read_csv(base_dir+'dataset.csv', sep=";").set_index('id')

# Class name = lamp type with spaces stripped, followed by the power value.
df['y'] = df['tipo_lampada'].str.replace(" ", "") + df['potencia'].astype(str)

# Integer code -> class name, in pandas' category order.
labels_mapping = dict(enumerate(df['y'].astype('category').cat.categories))

# One integer class code per row, as a 64-bit tensor.
labels = torch.tensor(df['y'].astype('category').cat.codes.values, dtype=torch.long)

labels

In [None]:
# Column groups of the hand-crafted descriptors stored in dataset.csv.
FOURIER = ['df01', 'df02', 'df03', 'df04','df05', 'df06', 'df07', 'df08', 'df09', 'df10']
HU = ['i1', 'i2', 'i3', 'i4','i5', 'i6', 'i7']
HARALICK = ['probmax', 'energia', 'entropia', 'contraste','homogeneidade', 'correlacao']

all_features = FOURIER + HU + HARALICK

# The values use a decimal comma. Convert column by column, casting to str
# first: a column pandas already parsed as numeric would otherwise come out of
# `.str.replace` as NaN without any warning.
X = df[all_features].apply(
    lambda column: column.astype(str).str.replace(',', '.', regex=False).astype(float)
)

In [None]:
# torchvision's ViT-B/16 asserts that its input is exactly 224x224, so the
# images must be resized to that shape; the previous 144x216 target would fail
# on the first forward pass (216 is not even a multiple of the 16-pixel patch).
# NOTE(review): the pretrained weights were presumably trained on normalised
# inputs -- consider adding the matching transforms.Normalize; confirm.
transform = transforms.Compose([
    transforms.Resize((224, 224), antialias=True),
    transforms.ToTensor(),
])

In [None]:
# All .jpg images under <base_dir>img/, transformed and paired with `labels`.
# NOTE(review): LampDataset pairs labels by position in the sorted file list;
# verify that the row order of dataset.csv matches the sorted file names.
dataset = LampDataset(base_dir+'img/',transform,labels)

In [None]:
# Loader that yields the whole dataset as one un-shuffled batch
# (batch_size == len(dataset)), loaded in the main process (num_workers=0).
image_data_loader = DataLoader(
  dataset,
  batch_size=len(dataset),
  shuffle=False,
  num_workers=0
)

In [None]:
import matplotlib.pyplot as plt

# Fetch the single full-dataset batch produced by image_data_loader.
images, labels = next(iter(image_data_loader))

def display_image(images, index=4):
  """Show one image of a batch with matplotlib.

  Args:
    images: 4-D tensor laid out channel-first (batch, channel, height, width).
    index: Position in the batch of the image to show; defaults to 4, i.e.
      the 5th image.
  """
  # imshow expects channel-last data, so move the channel axis to the end.
  img_plt = images.numpy().transpose(0,2,3,1)
  plt.imshow(img_plt[index])

display_image(images)

In [None]:
def new_adapted_vit():
    """Build a pretrained ViT-B/16 with a new classification head.

    The final linear layer is swapped for a fresh one that outputs
    ``args['num_classes']`` logits, and the whole model is placed on
    ``args['device']``.

    Returns:
        The adapted model, on ``args['device']``.
    """
    device = args['device']
    vit = models.vit_b_16(weights=models.ViT_B_16_Weights.DEFAULT)

    # Same input width as the original head; only the number of outputs changes.
    head_inputs = vit.heads.head.in_features
    vit.heads.head = nn.Linear(in_features=head_inputs, out_features=args['num_classes'])

    return vit.to(device)

In [None]:
def train(train_loader, net, epoch):
  """Run one training epoch and step the learning-rate scheduler once.

  Relies on the notebook-level globals ``criterion``, ``optimizer``,
  ``scheduler`` and ``args['device']``.

  Args:
    train_loader: Iterable yielding ``(data, label)`` batches.
    net: Model to train; it is switched to training mode.
    epoch: Epoch number; currently unused, kept for the callers' signature.

  Returns:
    Tuple ``(mean batch loss, accuracy, elapsed seconds)``.
  """
  # Training mode
  net.train()

  start = time.time()

  epoch_loss = []
  pred_list, rotulo_list = [], []
  for dado, rotulo in train_loader:

    # Cast data to GPU
    dado = dado.to(args['device'])
    rotulo = rotulo.to(args['device'])

    # Forward
    ypred = net(dado)
    loss = criterion(ypred, rotulo)
    # .item() yields a plain Python float, detached from the autograd graph.
    epoch_loss.append(loss.item())

    # Predicted class = index of the largest logit.
    _, pred = torch.max(ypred, dim=1)
    pred_list.append(pred.cpu().numpy())
    rotulo_list.append(rotulo.cpu().numpy())

    # Backpropagation
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

  # The scheduler advances once per epoch, after every batch has been seen.
  scheduler.step()

  epoch_loss = np.asarray(epoch_loss)
  pred_list = np.concatenate(pred_list).ravel()
  rotulo_list = np.concatenate(rotulo_list).ravel()

  # scikit-learn's argument order is (y_true, y_pred).
  acc = accuracy_score(rotulo_list, pred_list)

  end = time.time()

  return epoch_loss.mean(),acc, end-start

In [None]:
def validate(test_loader, net, epoch):
  """Evaluate ``net`` on a loader without computing gradients.

  Relies on the notebook-level globals ``criterion`` and ``args['device']``.

  Args:
    test_loader: Iterable yielding ``(data, label)`` batches.
    net: Model to evaluate; it is switched to evaluation mode.
    epoch: Epoch number; currently unused, kept for the callers' signature.

  Returns:
    Tuple ``(mean batch loss, accuracy, elapsed seconds)``.
  """
  # Evaluation mode
  net.eval()

  start = time.time()

  epoch_loss = []
  pred_list, rotulo_list = [], []
  with torch.no_grad():
    for dado, rotulo in test_loader:

      # Move the batch to the configured device
      dado = dado.to(args['device'])
      rotulo = rotulo.to(args['device'])

      # Forward
      ypred = net(dado)
      loss = criterion(ypred, rotulo)
      # .item() yields a plain Python float.
      epoch_loss.append(loss.item())

      # Predicted class = index of the largest logit.
      _, pred = torch.max(ypred, dim=1)
      pred_list.append(pred.cpu().numpy())
      rotulo_list.append(rotulo.cpu().numpy())

  epoch_loss = np.asarray(epoch_loss)

  pred_list = np.concatenate(pred_list).ravel()
  rotulo_list = np.concatenate(rotulo_list).ravel()

  # scikit-learn's argument order is (y_true, y_pred).
  acc = accuracy_score(rotulo_list, pred_list)

  end = time.time()

  return epoch_loss.mean(),acc, end-start

In [None]:
from sklearn.model_selection import RepeatedStratifiedKFold

# Outer evaluation protocol: stratified 10-fold CV repeated 3 times (30 splits);
# the fixed random_state makes the splits identical every time .split is called.
REPEATED_CV = RepeatedStratifiedKFold(n_splits=10, n_repeats=3,random_state=18062001)

In [None]:
from sklearn.model_selection import train_test_split

def validation_index(labels,train_index):
  """Split training indices into a reduced training part and a validation part.

  A stratified 10% of ``train_index`` is held out, using a fixed seed.

  Args:
    labels: Labels of the samples in ``train_index``, in the same order.
    train_index: Indices to split.

  Returns:
    Tuple ``(remaining training indices, validation indices)``.
  """
  kept_index, held_out_index, _, _ = train_test_split(
      train_index,
      labels,
      test_size=0.1,
      stratify=labels,
      random_state=18062001,
  )
  return kept_index, held_out_index

In [None]:
# Create the output folders up front so the first checkpoint / plot write
# cannot fail because a directory is missing.
os.makedirs(root_dir + 'tcc/models/', exist_ok=True)
os.makedirs(root_dir + 'tcc/loss/', exist_ok=True)

for fold, (train_index, test_index) in enumerate(REPEATED_CV.split(X,labels)):

    # Print
    print(f'FOLD {fold}')
    print('--------------------------------')
    print('| Epoch | Train Loss | Train Acc | Validation Loss | Validation Acc | Time |')

    # Hold out a stratified validation part of this fold's training indices;
    # test_index is not used here.
    train_index2, valid_index = validation_index(labels[train_index],train_index)

    # Sample elements randomly from a given list of ids, no replacement.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_index2)
    valid_subsampler = torch.utils.data.SubsetRandomSampler(valid_index)

    # Define data loaders for training and validation data in this fold
    train_loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=args['batch_size'],
        sampler=train_subsampler)

    valid_loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=args['batch_size'],
        sampler=valid_subsampler)

    # Fresh pretrained model per fold: freeze everything, then unfreeze the head.
    net = new_adapted_vit()
    for param in net.parameters():
        param.requires_grad = False

    for param in net.heads.parameters():
        param.requires_grad = True

    # train() and validate() read criterion / optimizer / scheduler as globals.
    criterion = nn.CrossEntropyLoss().to(args['device'])
    # Only the trainable (head) parameters are handed to the optimizer.
    trainable_params = [param for param in net.parameters() if param.requires_grad]
    optimizer = optim.Adam(trainable_params, lr=args['lr'], weight_decay=args['weight_decay'])
    scheduler = lr_scheduler.ExponentialLR(optimizer, gamma=0.95)

    train_losses, test_losses = [], []

    best_acc = 0
    for epoch in range(args['epoch_num']):

        # Train
        train_loss, train_acc, train_time = train(train_loader, net, epoch)
        train_losses.append(train_loss)

        # Validate
        test_loss, test_acc, test_time = validate(valid_loader, net, epoch)
        test_losses.append(test_loss)

        print(f'|  {epoch:03.0f}  |   {train_loss:.5f}  |    {train_acc*100:02.0f}%    |     {test_loss:.5f}     |       {test_acc*100:02.0f}%      | {train_time + test_time:.2f} |')

        # Checkpoint only when validation accuracy matches or beats the best so
        # far. best_acc must be updated here: it used to stay at 0, so every
        # epoch overwrote the "best" checkpoint.
        if test_acc >= best_acc:
            best_acc = test_acc
            torch.save(net.state_dict(), root_dir + 'tcc/models/' + f'best-model-parameters-fold{fold}.pt')

    #plot fold losses
    plt.close()
    plt.plot(train_losses,label= 'train loss')
    plt.plot(test_losses, label= 'test loss')
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend(loc='best')
    plt.savefig(root_dir + 'tcc/loss/' + f"fold{fold}_loss_plot.png")

In [None]:
# Testing: evaluate each fold's saved checkpoint on that fold's held-out indices.
if TEST_TRAIN:
    scores = []

    # Build the evaluation model here rather than reusing `net` left over from
    # the training cell, so this cell also works on a kernel where training was
    # not run. Its weights are replaced by each checkpoint below.
    test_net = new_adapted_vit()

    # REPEATED_CV has a fixed random_state, so these are the training splits.
    for fold, (train_index, test_index) in enumerate(REPEATED_CV.split(X,labels)):

        PATH = root_dir + 'tcc/models/' + f'best-model-parameters-fold{fold}.pt'

        # Load the trained weights; map_location lets a checkpoint saved on a
        # GPU be loaded on a CPU-only machine.
        test_net.load_state_dict(torch.load(PATH, map_location=args['device']))

        print(f'FOLD {fold}')
        print('--------------------------------')
        test_subsampler = torch.utils.data.SubsetRandomSampler(test_index)

        test_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=args['batch_size'],
            sampler=test_subsampler)

        # Test
        test_loss, test_acc, _ = validate(test_loader, test_net, 0)
        print(f"{test_acc}")
        scores.append(test_acc)


In [None]:
from scipy import stats

# Per-fold test accuracies collected by the testing cell.
scores_np = np.array(scores)
methods_results = {NET_NAME: scores_np.copy()}

# 95% normal-approximation interval around the mean accuracy.
mean_acc = scores_np.mean()
std_acc = scores_np.std()
inf, sup = stats.norm.interval(0.95, loc=mean_acc, scale=std_acc/np.sqrt(len(scores_np)))

print("Mean: {} +/- {} \n Inf: {} Sup : {}".format(mean_acc, std_acc, inf, sup))

# Persist the per-fold accuracies as a single "ViT" column.
result_comp = {"ViT": scores_np}
df_resultsA = pd.DataFrame.from_dict(result_comp)
df_resultsA.to_csv("vit_results_final.csv",index=False)

# Extract features with ViT

In [None]:
def extrai_caracteristicas_vit(net, loader):
  """Extract the class-token representation of every sample with a ViT.

  The classification head is bypassed: each batch goes through
  ``net._process_input``, gets ``net.class_token`` prepended, is run through
  ``net.encoder``, and the output at position 0 is kept as the feature vector.

  Args:
    net: Model exposing ``_process_input``, ``class_token`` and ``encoder``.
      It is switched to evaluation mode.
    loader: Iterable with a length, yielding ``(data, label)`` batches.

  Returns:
    Tuple ``(features, labels)`` of NumPy arrays: one feature row and one
    label per sample, in loader order.
  """
  # Evaluation mode
  net.eval()
  vit = net
  feat_list, rotulo_list = [], []

  with torch.no_grad():
    for k, batch in enumerate(loader):
      print('\r--{0}/{1}--'.format(k, len(loader)), end='', flush=True)

      dado, rotulo = batch

      # Move the images to the configured device (labels can stay where they are)
      dado = dado.to(args['device'])

      # Extraction: turn the images into a sequence of patch embeddings
      feats = vit._process_input(dado)

      # Expand the class token to the full batch
      batch_class_token = vit.class_token.expand(dado.shape[0], -1, -1)
      feats = torch.cat([batch_class_token, feats], dim=1)

      feats = vit.encoder(feats)

      # We're only interested in the representation of the classifier token
      # that we prepended at position 0
      feats = feats[:, 0]
      feat_list.append(feats.cpu().numpy())
      rotulo_list.append(rotulo.cpu().numpy())

  # Concatenate along the sample axis. Stacking with np.asarray + reshape only
  # works when every batch has the same size, which fails as soon as the last
  # batch is smaller.
  feat_list = np.concatenate(feat_list, axis=0)
  rotulo_list = np.concatenate(rotulo_list, axis=0).ravel()

  return feat_list, rotulo_list

In [None]:
def new_adapted_vit_to_extract():
    """Return a pretrained ViT-B/16 on ``args['device']`` for feature extraction.

    extrai_caracteristicas_vit never calls the classification head, so the
    stock head is left in place.

    NOTE(review): this helper was called but not defined anywhere in the
    notebook; using the pretrained (not fine-tuned) weights is an assumption --
    confirm the intended model.
    """
    return models.vit_b_16(weights=models.ViT_B_16_Weights.DEFAULT).to(args['device'])

extract_net = new_adapted_vit_to_extract()

with torch.no_grad():
    vit_X, vit_Y = extrai_caracteristicas_vit(extract_net, image_data_loader)

# One row per image: the ViT feature columns followed by the class code.
df_feats = pd.DataFrame(vit_X)
df_feats['class'] = vit_Y
df_feats

In [None]:
# Persist the extracted features (plus the 'class' column) for the SVM experiment.
df_feats.to_csv("vit_features.csv",index=False)

In [None]:
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

In [None]:
def nested_cross_validation(model,model_name,model_parameters,X, y,standardize = True):
    """Estimate accuracy with nested cross-validation.

    Outer loop: stratified 10-fold CV repeated 3 times with a fixed seed.
    Inner loop: a 4-fold ``GridSearchCV`` over ``model_parameters``, fitted on
    each outer training split and scored on the matching outer test split.

    Args:
        model: Estimator to tune.
        model_name: Step name given to ``model`` inside the pipeline;
            ``model_parameters`` keys must use it as prefix
            (``<model_name>__<param>``).
        model_parameters: Parameter grid passed to ``GridSearchCV``.
        X: Feature array, indexable by integer index arrays.
        y: Label array, indexable by integer index arrays.
        standardize: When truthy, a ``StandardScaler`` step precedes the model.

    Returns:
        Dict with ``'accuracy'`` (one value per outer split) and
        ``'confusion_matrix'`` (currently always an empty list).
    """
    outer_cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3,random_state=18062001)

    scores = {
        'accuracy':[],
        'confusion_matrix': []
    }

    # The pipeline layout is the same for every split; GridSearchCV clones the
    # estimator it is given before fitting.
    steps = [(model_name, model)]
    if standardize:
        steps.insert(0, ('scaler', StandardScaler()))

    # outer loop
    for train_index, test_index in outer_cv.split(X, y):

        # Split data in train and test
        x_train, x_test = X[train_index], X[test_index]
        y_train, y_test = y[train_index], y[test_index]

        # inner loop: hyper-parameter search on the training split only
        clf = GridSearchCV(Pipeline(steps), model_parameters,cv=4)
        clf.fit(x_train, y_train)

        # test with the refitted best estimator
        y_pred = clf.predict(x_test)
        scores['accuracy'].append(accuracy_score(y_test, y_pred))

    return scores

In [None]:
def test_features(features_name):
    """Run the nested-CV SVM experiment on a saved feature table.

    Args:
        features_name: Path of a CSV file holding one ``'class'`` column and
            any number of feature columns.

    Returns:
        Tuple ``(accuracies, scores)``: the per-split accuracies as a NumPy
        array and the raw dict returned by ``nested_cross_validation``.
    """
    # Imported locally so the function does not depend on the results cell,
    # which is where `stats` is otherwise imported.
    from scipy import stats

    parameters_svm = {'svm__C': [0.1,1, 10, 100],
                'svm__gamma': [1,0.1,0.01,0.001],
                'svm__kernel': ['rbf', 'poly', 'sigmoid']}

    clf = SVC()

    df_data = pd.read_csv(features_name)

    # Every column except 'class' is a feature.
    X = df_data.loc[:, df_data.columns != 'class'].to_numpy()
    y = df_data['class'].to_numpy()

    s = nested_cross_validation(clf,'svm',parameters_svm,X,y)

    # 95% normal-approximation interval around the mean accuracy.
    scores_np = np.array(s['accuracy'])
    inf, sup = stats.norm.interval(0.95, loc=scores_np.mean(),
                                  scale=scores_np.std()/np.sqrt(len(scores_np)))

    print("Mean: {} +/- {} \n Inf: {} Sup : {}".format(scores_np.mean(),scores_np.std(),inf,sup))

    return scores_np, s

In [None]:
# Run the SVM experiment on the features written to vit_features.csv; the
# returned (accuracies, raw scores) tuple is displayed as the cell output.
test_features('vit_features.csv')