In [None]:
# Libraries
import matplotlib.pyplot as plt
import pandas as pd
import torch
# Preliminaries
from torchtext.legacy import data
# Models
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
# Training
import torch.optim as optim
# Evaluation
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!nvidia-smi

Tue Oct 19 12:44:19 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 470.74       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla K80           Off  | 00000000:00:04.0 Off |                    0 |
| N/A   52C    P8    31W / 149W |      0MiB / 11441MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

# Set Random Seed

In [None]:
#Reproducing same results
SEED = 2021
#Torch
torch.manual_seed(SEED)
#Cuda algorithms
torch.backends.cudnn.deterministic = True  

# Change Your Path Here

In [None]:
root_dir = 'drive/MyDrive/your/root/path/'
data_dir = root_dir + 'Case Presentation 1 Data/'
#check whether cuda is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#set batch size
BATCH_SIZE = 32

In [None]:
# Fields
text_field = data.Field(tokenize='spacy', lower=True, include_lengths=True, batch_first=True)
label_field = data.Field(sequential=False, use_vocab=False, batch_first=True, dtype=torch.float)
fields = [('text', text_field), ('label', label_field)]
# TabularDataset
train, valid, test = data.TabularDataset.splits(path=data_dir, train='train.csv', validation='test.csv', test='valid.csv', format='CSV', fields=fields, skip_header=True)
# Iterators
train_iterator = data.BucketIterator(train, batch_size=BATCH_SIZE, sort_key=lambda x: len(x.text),
                            device=device, sort=True, sort_within_batch=True)
valid_iterator = data.BucketIterator(valid, batch_size=BATCH_SIZE, sort_key=lambda x: len(x.text),
                            device=device, sort=True, sort_within_batch=True)
test_iterator = data.BucketIterator(test, batch_size=BATCH_SIZE, sort_key=lambda x: len(x.text),
                            device=device, sort=True, sort_within_batch=True)
# Vocabulary
text_field.build_vocab(train, min_freq=3)

# Model

In [None]:
class classifier(nn.Module):
    
  #define all the layers used in model
  def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers,
               bidirectional, dropout):
    #Constructor
    super().__init__()          
    
    #embedding layer
    self.embedding = nn.Embedding(vocab_size, embedding_dim)
    
    #lstm layer
    self.lstm = nn.LSTM(embedding_dim,
                        hidden_dim, 
                        num_layers=n_layers, 
                        bidirectional=bidirectional, 
                        dropout=dropout,
                        batch_first=True)
    
    #dense layer
    self.fc = nn.Linear(hidden_dim * 2, output_dim)
    
    #activation function
    self.act = nn.Sigmoid()

  def forward(self, text, text_lengths):

    #text = [batch size,sent_length]
    embedded = self.embedding(text)
    #embedded = [batch size, sent_len, emb dim]
  
    #packed sequence
    packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.cpu(), batch_first=True)
    
    packed_output, (hidden, cell) = self.lstm(packed_embedded)
    #hidden = [batch size, num layers * num directions,hid dim]
    #cell = [batch size, num layers * num directions,hid dim]
    
    #concat the final forward and backward hidden state
    hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)
            
    #hidden = [batch size, hid dim * num directions]
    dense_outputs=self.fc(hidden)

    #Final activation function
    outputs=self.act(dense_outputs)
    
    return outputs

In [None]:
#define hyperparameters
size_of_vocab = len(text_field.vocab)
embedding_dim = 100
num_hidden_nodes = 32
num_output_nodes = 1
num_layers = 2
bidirection = True
dropout = 0.2

#instantiate the model
model = classifier(size_of_vocab, embedding_dim, num_hidden_nodes,num_output_nodes, num_layers,
                   bidirectional=True, dropout=dropout)

In [None]:
#architecture
print(model)

#No. of trianable parameters
def count_parameters(model):
  return sum(p.numel() for p in model.parameters() if p.requires_grad)
    
print('The model has {} trainable parameters'.format(count_parameters(model)))

# Loss & Optimizer

In [None]:
import torch.optim as optim

#define optimizer and loss
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.BCELoss()

#define metric
def binary_accuracy(preds, y):
    #round predictions to the closest integer
    rounded_preds = torch.round(preds)
    
    correct = (rounded_preds == y).float() 
    acc = correct.sum() / len(correct)
    return acc
    
#push to cuda if available
model = model.to(device)
criterion = criterion.to(device)

# TRAIN

In [None]:
def train(model, iterator, optimizer, criterion):
  #initialize every epoch
  epoch_loss = 0
  epoch_acc = 0

  #set the model in training phase
  model.train()

  for batch in iterator:
    #resets the gradients after every batch
    optimizer.zero_grad()

    #retrieve text and no. of words
    text, text_lengths = batch.text

    #convert to 1D tensor
    predictions = model(text, text_lengths).squeeze()

    #compute the loss
    loss = criterion(predictions, batch.label)

    #compute the binary accuracy
    acc = binary_accuracy(predictions, batch.label)

    #backpropage the loss and compute the gradients
    loss.backward()

    #update the weights
    optimizer.step()

    #loss and accuracy
    epoch_loss += loss.item()
    epoch_acc += acc.item()

  return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
def evaluate(model, iterator, criterion):
  #initialize every epoch
  epoch_loss = 0
  epoch_acc = 0

  #deactivating dropout layers
  model.eval()

  #deactivates autograd
  with torch.no_grad():
    for batch in iterator:
      #retrieve text and no. of words
      text, text_lengths = batch.text

      #convert to 1d tensor
      predictions = model(text, text_lengths).squeeze()

      #compute loss and accuracy
      loss = criterion(predictions, batch.label)
      acc = binary_accuracy(predictions, batch.label)

      #keep track of loss and accuracy
      epoch_loss += loss.item()
      epoch_acc += acc.item()

  return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
# Save and Load Functions
def save_checkpoint(save_path, model, optimizer, valid_loss):
  if save_path == None:
      return

  state_dict = {'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'valid_loss': valid_loss}

  torch.save(state_dict, save_path)
  print(f'Model saved to ==> {save_path}')

def load_checkpoint(load_path, model, optimizer):

  if load_path==None:
      return

  state_dict = torch.load(load_path, map_location=device)
  print(f'Model loaded from <== {load_path}')

  model.load_state_dict(state_dict['model_state_dict'])
  optimizer.load_state_dict(state_dict['optimizer_state_dict'])

  return state_dict['valid_loss']

In [None]:
N_EPOCHS = 50
ckpt_dir = root_dir + 'ckpt/'
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
  print('Epoch: {}'.format(epoch+1))
  #train the model
  train_loss, train_acc = train(model, train_iterator, optimizer, criterion)

  #evaluate the model
  valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)

  #save the best model
  if valid_loss < best_valid_loss:
    best_valid_loss = valid_loss
    #save_checkpoint(ckpt_dir + 'weights.pt', model, optimizer, best_valid_loss)
    save_checkpoint(ckpt_dir + 'weights.pt', model, optimizer, best_valid_loss)

  print('\tTrain Loss: {:.3f} | Train Acc: {:.2f}%'.format(train_loss*100, train_acc*100))
  print('\t Val. Loss: {:.3f} |  Val. Acc: {:.2f}%'.format(valid_loss*100, valid_acc*100))

# TEST

In [None]:
#instantiate the model
test_model = classifier(size_of_vocab, embedding_dim, num_hidden_nodes,num_output_nodes, num_layers,
                   bidirectional=True, dropout=dropout)
#load weights
optimizer = optim.Adam(model.parameters(), lr=0.001)
#load_checkpoint(ckpt_dir + 'weights.pt', test_model, optimizer)
load_checkpoint(ckpt_dir + 'weights.pt', test_model, optimizer)
test_model = test_model.to(device)

In [None]:
def test(model, iterator):
  model.eval();

  with torch.no_grad():
    ID, pred = [], []
    for batch in iterator:
      text, text_lengths = batch.text

      predictions = model(text, text_lengths).squeeze()
      predictions = torch.round(predictions)
      predictions = predictions.cpu().numpy()

      for i in batch.label:
        id = i.cpu().numpy()
        id = int(id)
        id = 'ID_' + str(id) +'.txt'
        ID.append(id)
      for i in predictions:
        i = int(i)
        if i == 1:
          pred.append(0)
        else:
          pred.append(1)
  return ID, pred

In [None]:
test_ID, test_pred = test(test_model, test_iterator)

temp = list(zip(test_ID, test_pred))
temp.sort()
test_ID, test_pred = zip(*temp)

In [None]:
def LISTtoCSV(id, pred, dir_path, filename):
  data_dict = {"Filename": id, "Obesity": pred}

  # 轉為dataframe再透過pandas轉成csv檔
  result_df = pd.DataFrame(data_dict)
  result_df.to_csv(dir_path+'{}.csv'.format(filename), index=False)
  print(result_df)

In [None]:
LISTtoCSV(test_ID, test_pred, root_dir, 'result')

# Draw Confusion Matrix

In [None]:
def draw_confusion_matrix(model, iterator):
  model.eval()

  with torch.no_grad():
    pred, label = [], []
    for batch in iterator:
      text, text_lengths = batch.text

      predictions = model(text, text_lengths).squeeze()
      predictions = torch.round(predictions)

      pred.extend(predictions.tolist())
      label.extend(batch.label.tolist())

    print('Classification Report:')
    print(classification_report(label, pred, labels=[1,0], digits=4))
    
    cm = confusion_matrix(label, pred, labels=[1,0])
    ax = plt.subplot()
    sns.heatmap(cm, annot=True, ax=ax, cmap='Blues', fmt="d")

    ax.set_title('Confusion Matrix')

    ax.set_xlabel('Predictions')
    ax.set_ylabel('Ground-Truth')

    ax.xaxis.set_ticklabels(['NO', 'YES'])
    ax.yaxis.set_ticklabels(['NO', 'YES'])

In [None]:
draw_confusion_matrix(test_model, valid_iterator)