In [1]:
# ref: https://www.analyticsvidhya.com/blog/2020/01/first-text-classification-in-pytorch/

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
#%cd /content/drive/My Drive/Case Presentation 1
%cd /content/drive/Shared drives/數位醫學/Case Presentation 1

/content/drive/.shortcut-targets-by-id/1403v7Kv9G42MNoj5JOfYxijc9VTDmStt/Case Presentation 1


In [4]:
import torch
#!pip install torchtext
from torchtext import data

In [5]:
torch.manual_seed(2020);
torch.backends.cudnn.deterministic = True

In [6]:
#!pip install spacy
# !python -m spacy download en
import spacy
TEXT = data.Field(tokenize='spacy',batch_first=True,include_lengths=True)
LABEL = data.LabelField(dtype = torch.float,batch_first=True)

In [7]:
fields = [(None, None), ('text',TEXT),('label', LABEL)]

In [8]:
train_data=data.TabularDataset(path = 'train_form.csv',format = 'csv',fields = fields,skip_header = True)
valid_data=data.TabularDataset(path = 'test_form.csv',format = 'csv',fields = fields,skip_header = True)
print(vars(train_data.examples[17]))
print(vars(valid_data.examples[17]))

{'text': ['The', 'patient', 'denies', 'tobacco', 'and', '/', 'or', 'alcohol', 'use', '.'], 'label': '1'}
{'text': ['She', 'does', 'not', 'smoke', ',', 'does', 'not', 'use', 'birth', 'control', 'pills', ',', 'does', 'not', 'use', 'drugs', '.'], 'label': '1'}


In [9]:
#import random
#train_data, valid_data = training_data.split(split_ratio=0.7, random_state = random.seed(2023))

In [10]:
#initialize glove embeddings
TEXT.build_vocab(train_data,min_freq=3,vectors = "glove.6B.100d")  
LABEL.build_vocab(train_data)

#No. of unique tokens in text
print("Size of TEXT vocabulary:",len(TEXT.vocab))

#No. of unique tokens in label
print("Size of LABEL vocabulary:",len(LABEL.vocab))

#Commonly used words
print(TEXT.vocab.freqs.most_common(150))  

#Word dictionary
print(TEXT.vocab.stoi) 
#print(LABEL.vocab.stoi)
inv_label_map = {v: k for k, v in LABEL.vocab.stoi.items()}
#print(inv_label_map)

Size of TEXT vocabulary: 30
Size of LABEL vocabulary: 4
[(',', 26), ('.', 22), ('unknown', 13), ('use', 11), ('tobacco', 10), ('and', 9), ('alcohol', 8), ('of', 7), ('/', 7), ('The', 6), ('patient', 6), ('history', 6), ('smoking', 6), ('She', 5), ('a', 4), ('-', 4), ('or', 4), ('denies', 4), ('quit', 4), ('in', 4), ('yo', 3), ('smoker', 3), (';', 3), ('does', 3), ('not', 3), ('smoke', 3), ('no', 3), ('drug', 3), ('smokes', 2), ('two', 2), ('per', 2), ('day', 2), ('has', 2), ('an', 2), ('approximately', 2), ('year', 2), ('pack', 2), ('years', 2), ('Conditions', 2), ('Infections', 2), ('Complications', 2), ('affecting', 2), ('Treatment', 2), ('Stay', 2), ('o', 2), ('with', 2), ('HTN', 2), ('chest', 2), ('hx', 2), ('hyperlipidemia', 2), ('Patient', 2), ('any', 2), ('as', 2), ('the', 2), ('past', 2), ('former', 2), ('for', 2), ('packs', 1), ('75-pack', 1), ('one', 1), ('x45', 1), ('crack', 1), ('schizoaffective', 1), ('d', 1), ('53', 1), ('M', 1), ('h', 1), ('hyperchol', 1), ('FH', 1), ('h

In [11]:
#check whether cuda is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  

#set batch size
BATCH_SIZE = 8

#Load an iterator
train_iterator, valid_iterator = data.BucketIterator.splits(
    (train_data, valid_data), 
    batch_size = BATCH_SIZE,
    sort_key = lambda x: len(x.text),
    sort_within_batch=True,
    device = device)
device

device(type='cuda')

In [12]:
import torch.nn as nn

class classifier(nn.Module):
    
    #define all the layers used in model
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, 
                 bidirectional, dropout):
        
        #Constructor
        super().__init__()          
        
        #embedding layer
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        #lstm layer
        self.lstm = nn.LSTM(embedding_dim, 
                           hidden_dim, 
                           num_layers=n_layers, 
                           bidirectional=bidirectional, 
                           dropout=dropout,
                           batch_first=True)
        
        #dense layer
        self.fc = nn.Linear(hidden_dim * 2, output_dim)

        #activation function
        self.act = nn.Sigmoid()
        
    def forward(self, text, text_lengths):
        
        #text = [batch size,sent_length]
        self.embedding = self.embedding
        embedded = self.embedding(text)
        #embedded = [batch size, sent_len, emb dim]
      
        #packed sequence
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths,batch_first=True)
        
        packed_output, (hidden, cell) = self.lstm(packed_embedded)
        #hidden = [batch size, num layers * num directions,hid dim]
        #cell = [batch size, num layers * num directions,hid dim]
        
        #concat the final forward and backward hidden state
        hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)
                
        #hidden = [batch size, hid dim * num directions]
        dense_outputs=self.fc(hidden)

        #Final activation function
        outputs=self.act(dense_outputs)
        
        return outputs

In [13]:
#define hyperparameters
size_of_vocab = len(TEXT.vocab)
embedding_dim = 100
num_hidden_nodes = 32
num_output_nodes = 4
num_layers = 2
bidirection = True
dropout = 0.2

#instantiate the model
model = classifier(size_of_vocab, embedding_dim, num_hidden_nodes,num_output_nodes, num_layers, 
                   bidirectional = True, dropout = dropout)

In [14]:
#architecture
print(model)

#No. of trianable parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)
    
print(f'The model has {count_parameters(model):,} trainable parameters')

#Initialize the pretrained embedding
pretrained_embeddings = TEXT.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)

print(pretrained_embeddings.shape)

classifier(
  (embedding): Embedding(30, 100)
  (lstm): LSTM(100, 32, num_layers=2, batch_first=True, dropout=0.2, bidirectional=True)
  (fc): Linear(in_features=64, out_features=4, bias=True)
  (act): Sigmoid()
)
The model has 62,652 trainable parameters
torch.Size([30, 100])


In [15]:
import torch.optim as optim

#define optimizer and loss
optimizer = optim.Adam(model.parameters())
#optimizer = optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
#criterion = nn.BCELoss()
criterion = nn.CrossEntropyLoss()

#define metric
def binary_accuracy(preds, y):
    #round predictions to the closest integer
    #rounded_preds = torch.round(preds)
    
    #correct = (rounded_preds == y).float()
    correct = (preds == y).float() 
    acc = correct.sum() / len(correct)
    return acc
    
#push to cuda if available
model = model.to(device)
criterion = criterion.to(device)

In [16]:
def train(model, iterator, optimizer, criterion):
    
    #initialize every epoch 
    epoch_loss = 0
    epoch_acc = 0
    
    #set the model in training phase
    model.train()  
    
    for batch in iterator:
        
        #resets the gradients after every batch
        optimizer.zero_grad()   
        
        #retrieve text and no. of words
        text, text_lengths = batch.text   
        text_lengths = text_lengths.cuda()
            
        #convert to 1D tensor
        #predictions = model(text, text_lengths).squeeze()
        
        predictions = model(text, text_lengths).squeeze() 

        #predictions = model(text, text_lengths)
        #predictions = torch.max(predictions, 1)[1]
        #predictions = predictions.type(torch.FloatTensor)
        #predictions = predictions.cuda()
        #predictions = predictions.requires_grad_()

        #compute the loss
        loss = criterion(predictions, batch.label.type(torch.LongTensor).cuda())
        
        #compute the binary accuracy
        acc = binary_accuracy(torch.max(predictions, 1)[1].type(torch.FloatTensor), batch.label.cpu())   
        
        #backpropage the loss and compute the gradients
        loss.backward()       
        
        #update the weights
        optimizer.step()      
        
        #loss and accuracy
        epoch_loss += loss.item()  
        epoch_acc += acc.item()    
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [17]:
def evaluate(model, iterator, criterion):
    
    #initialize every epoch
    epoch_loss = 0
    epoch_acc = 0

    #deactivating dropout layers
    model.eval()
    
    #deactivates autograd
    with torch.no_grad():
    
        for batch in iterator:
        
            #retrieve text and no. of words
            text, text_lengths = batch.text
            text = text.cuda()
            text_lengths = text_lengths.cuda()

            #convert to 1d tensor
            #predictions = model(text, text_lengths).squeeze()

            predictions = model(text, text_lengths).squeeze()  

            #predictions = model(text, text_lengths)
            #predictions = torch.max(predictions, 1)[1]
            #predictions = predictions.type(torch.FloatTensor) 
            #predictions = predictions.cuda()
            #predictions = predictions.requires_grad_()

            #compute loss and accuracy
            loss = criterion(predictions, batch.label.type(torch.LongTensor).cuda())
            print(torch.max(predictions, 1)[1].type(torch.FloatTensor), batch.label.cpu())
            acc = binary_accuracy(torch.max(predictions, 1)[1].type(torch.FloatTensor), batch.label.cpu())

            #keep track of loss and accuracy
            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [18]:
N_EPOCHS = 10
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
     
    #train the model
    train_loss, train_acc = train(model, train_iterator, optimizer, criterion)
    
    #evaluate the model
    valid_loss, valid_acc = evaluate(model, valid_iterator, criterion)
    
    #save the best model
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'saved_weights.pt')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')

tensor([3., 3., 3., 3., 3., 3., 3., 3.]) tensor([0., 0., 0., 0., 0., 0., 0., 0.])
tensor([3., 3., 3., 3., 3., 3., 3., 3.]) tensor([3., 0., 0., 0., 0., 0., 0., 0.])
tensor([3., 1., 3., 2., 3., 2., 2., 3.]) tensor([1., 1., 3., 2., 1., 2., 2., 1.])
tensor([2., 3., 2., 2., 3., 3., 3., 3.]) tensor([2., 3., 1., 2., 1., 3., 1., 1.])
tensor([3., 3., 3., 3., 3., 3., 3., 3.]) tensor([1., 1., 1., 1., 1., 1., 1., 1.])
0	Train Loss: 1.389 | Train Acc: 17.86%
	 Val. Loss: 1.385 |  Val. Acc: 25.00%
tensor([1., 1., 1., 1., 1., 1., 1., 1.]) tensor([0., 0., 0., 0., 0., 0., 0., 0.])
tensor([3., 1., 1., 1., 1., 1., 1., 1.]) tensor([3., 0., 0., 0., 0., 0., 0., 0.])
tensor([1., 1., 1., 2., 1., 2., 2., 1.]) tensor([1., 1., 3., 2., 1., 2., 2., 1.])
tensor([2., 1., 2., 2., 1., 2., 2., 2.]) tensor([2., 3., 1., 2., 1., 3., 1., 1.])
tensor([1., 1., 1., 1., 3., 1., 3., 2.]) tensor([1., 1., 1., 1., 1., 1., 1., 1.])
1	Train Loss: 1.379 | Train Acc: 35.71%
	 Val. Loss: 1.378 |  Val. Acc: 40.00%
tensor([1., 1., 1., 1.

In [19]:
#load weights
path='./saved_weights.pt'
model.load_state_dict(torch.load(path));
model.eval();

#inference 
import spacy
nlp = spacy.load('en')

def predict(model, inv_label_map, sentence):
    tokenized = [tok.text for tok in nlp.tokenizer(sentence)]  #tokenize the sentence 
    indexed = [TEXT.vocab.stoi[t] for t in tokenized]          #convert to integer sequence
    length = [len(indexed)]                                    #compute no. of words
    tensor = torch.LongTensor(indexed).to(device)              #convert to tensor
    tensor = tensor.unsqueeze(1).T                             #reshape in form of batch,no. of words
    length_tensor = torch.LongTensor(length)                   #convert to tensor
    prediction = model(tensor, length_tensor)                  #prediction 
    pred_label = torch.max(prediction, 1)[1].type(torch.FloatTensor)
    real_label = inv_label_map[pred_label[0].item()]
    return real_label

In [37]:

'''
0:cur
1:non
2:past
3:unknown
'''
real_label_to_word = {0 : "current_smoker", 1 : "non_smoker", 2 : "past_smoker", 3 : "unknown"}
import csv

result_label=open("./result_label.txt",'w+') 
ans=open("./test_data_answer.txt",'w+') 
#count = 0 
#open csv file 
with open('./test_form.csv', newline='') as csvfile:
#reading csv file
  rows = csv.reader(csvfile)
  for row in rows:
    #print(row[1])
    if row[1]!="plain_txt":
      print(predict(model, inv_label_map, row[1])+"  real label:" + str(row[2])+" text:"+row[1], file=result_label)
      print(predict(model, inv_label_map, row[1]), file=ans)
      #count=count+1

print(count)

csvfile.close()
result_label.close()
ans.close()
  
# print(predict(model, inv_label_map, "41 yo man with CRFs of DM Type II , high cholesterol , smoking history , family hx , HTN p / w episodes of atypical CP x 1 week , with rest and exertion ."))

# print(predict(model, inv_label_map, "She denies smoking or alcohol ."))

# print(predict(model, inv_label_map, "She is a former smoker of one pack per day of cigarettes for 63 years ."))

# print(predict(model, inv_label_map, "unknown"))

40
