In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import re
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torch.nn as nn
from torch.nn.utils import clip_grad_norm_
import torch.optim as optim

In [2]:
# remove special characters (& \n) from category/sub category/type
def splitcategory(x):
    if isinstance(x, list):
        return x
    else:
        if isinstance(x, str):
            res = re.split('& |, |\*|\n', x)
            return res
        else:
            return ''

# function to change characters to lower case
def cleaner(x):
    if isinstance(x, list):
        return [str.lower(i.replace(" ", "")) for i in x]
    else:
        if isinstance(x, str):
            return str.lower(x.replace(" ", ""))
        else:
            return ''

# change text to lower case
def changeCase(x):
    return str.lower(x)

# merge all text columns
def couple(x):
    return x['zzibrnd'] + ' ' + ' '.join(x['TherapeuticClass_new']) + ' '.join( x['PrincipalName_new']) + ' ' + x['Description'] 

def preprocess_text(question):
    question = question.replace("S08_", "")
    question = question.replace("NOTTTT  FOUND", "")
    question = re.sub("([\(\[]).*?([\)\]])", "", question) # remove text between brackets
    question = re.sub("[^\w\s]", "", question) # remove punctuations
    re.sub("\s+", " ", question) # remove multiple white spaces
    re.sub("[\t\n]", "", question) # remove tabs and newline characters
    re.sub("[_]", "", question) # remove tabs and newline characters
    question = question.lower().strip()
    return question

In [3]:
df = pd.read_csv('https://raw.githubusercontent.com/rkumar-bengaluru/data-science/main/16-Projects/zuel/data/2001_all_materials.csv')
df['zzibrnd'] = df['zzibrnd'].astype(str)
df['TherapeuticClass'] = df['TherapeuticClass'].astype(str)
df['PrincipalName'] = df['PrincipalName'].astype(str)
df['Description'] = df['Description'].astype(str)
# remove special characters
df['zzibrnd_new'] = df['zzibrnd'].apply(splitcategory)
df['TherapeuticClass_new'] = df['TherapeuticClass'].apply(splitcategory)
df['PrincipalName_new'] = df['PrincipalName'].apply(splitcategory)


# mix all text columns
df['soup'] = df.apply(couple, axis=1)
df['soup'] = df['soup'].apply(changeCase)
df['soup'] = df['soup'].apply(preprocess_text)
text = df['soup']

vocab = sorted(set("".join(text)))
sos_token = '['
eos_token = ']'
BATCH_FIRST=True
BATCH_SIZE=2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [4]:
class QuestionsDataset(Dataset):
    def __init__(self, questions, vocab, sos_token, eos_token, batch_first=False):
        
        # initialize parameters
        self.sos_idx = 0
        self.eos_idx = 1
        self.int2char = {self.sos_idx: sos_token, self.eos_idx: eos_token} # insert start of sentence and end of sentence tokens
        self.int2char.update({idx: char for idx, char in enumerate(vocab, start=self.eos_idx+1)})
        self.char2int = {char: idx for idx, char in self.int2char.items()}
        self.n_chars = len(self.int2char)
        
        # encode and pad questions
        self.questions_encoded = pad_sequence([self.encode_question(q) for q in questions], \
                                              batch_first=batch_first)
                
    def __len__(self):
        return len(self.questions_encoded)
    
    def __getitem__(self, idx):
        return self.questions_encoded[idx]
        
    def encode_question(self, question):
        '''
        encode question as char indices and perform one-hot encoding
        '''
        question_encoded = [self.sos_idx] # append sos
        for char in question:
            question_encoded.append(self.char2int[char])
        question_encoded.append(self.eos_idx) # append eos
        return F.one_hot(torch.tensor(question_encoded, dtype=torch.long), self.n_chars).float()

In [5]:
class charRNN(nn.Module):
    
    def __init__(self, VOCAB_SIZE, HIDDEN_SIZE, N_LAYERS=2, P_DROPOUT=0.5, batch_first=False):
        super().__init__()
        self.HIDDEN_SIZE = HIDDEN_SIZE
        self.N_LAYERS = N_LAYERS
        self.lstm = nn.LSTM(VOCAB_SIZE, HIDDEN_SIZE, batch_first=batch_first, 
                            dropout=P_DROPOUT, num_layers=N_LAYERS)
        self.dropout = nn.Dropout(P_DROPOUT)
        self.fc = nn.Linear(HIDDEN_SIZE, VOCAB_SIZE)
        
    def forward(self, inputs, hidden):
        lstm_out, hidden = self.lstm(inputs, hidden)
        
        # flatten the lstm output
        lstm_out = torch.flatten(lstm_out, start_dim=0, end_dim=1)
        
        out = self.dropout(lstm_out)
        out = self.fc(out)
        
        return out, hidden
    
    def init_hidden(self, BATCH_SIZE, device):
        hidden = (torch.zeros((self.N_LAYERS, BATCH_SIZE, self.HIDDEN_SIZE), dtype=torch.float32).to(device),
                  torch.zeros((self.N_LAYERS, BATCH_SIZE, self.HIDDEN_SIZE), dtype=torch.float32).to(device))
        return hidden

In [6]:
vocab = sorted(set("".join(text)))
sos_token = '['
eos_token = ']'
BATCH_FIRST=True
BATCH_SIZE=2
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [7]:
questions_train =text[:]
train_dataset = QuestionsDataset(questions_train, vocab, sos_token, eos_token, batch_first=BATCH_FIRST)

In [8]:
VOCAB_SIZE=train_dataset.n_chars
HIDDEN_SIZE=512
N_LAYERS=3
P_DROPOUT = 0.4
model = charRNN(VOCAB_SIZE, HIDDEN_SIZE, N_LAYERS, P_DROPOUT, BATCH_FIRST)
model.to(device=device)

charRNN(
  (lstm): LSTM(44, 512, num_layers=3, batch_first=True, dropout=0.4)
  (dropout): Dropout(p=0.4, inplace=False)
  (fc): Linear(in_features=512, out_features=44, bias=True)
)

In [9]:
model.load_state_dict(torch.load('charRNN_questions_epoch_100.pt',map_location=torch.device('cpu')))
model.eval()

charRNN(
  (lstm): LSTM(44, 512, num_layers=3, batch_first=True, dropout=0.4)
  (dropout): Dropout(p=0.4, inplace=False)
  (fc): Linear(in_features=512, out_features=44, bias=True)
)

In [10]:
class GenerateText:
    def __init__(self, model, k, int2char, char2int, device):
        self.int2char = int2char
        self.char2int = char2int
        self.n_chars = len(int2char)
        self.model = model
        self.device = device
        self.k = k
        self.sos_token = self.int2char[0]
        self.eos_token = self.int2char[1]
        
    def predict_next_char(self, hidden, input_char):
        
        # encode char
        char_one_hot = self.encode_char(input_char)

        # get the predictions
        with torch.no_grad():
            out, hidden = self.model(char_one_hot, hidden)
            
            # convert the output to a character probability distribution
            p = F.softmax(out, dim=1)

            # move to cpu as numpy doesn't support gpu
            p = p.cpu()

            # get top k characters from the distribution
            values, indices = p.topk(self.k)

        indices = indices.squeeze().numpy()
        values = values.squeeze().numpy()

        # sample any char from the top k chars using the output softmax distribution
        char_pred = np.random.choice(indices, size=1, p=values/values.sum())

        return self.int2char[char_pred[0]], hidden
    
    def generate_text(self, prime, max_chars=20):
        
        prime = self.sos_token + prime

        all_chars = [char for char in prime]
        print(all_chars)
        hidden = model.init_hidden(1, self.device)

        # build up the hidden state using the initial prime
        for char in prime:
            char_pred, hidden = self.predict_next_char(hidden, char)

        all_chars.append(char_pred)
        print(all_chars)

        # generate n chars
        c = len(all_chars)
        print(c)
        while char_pred != self.eos_token:
            if c == max_chars:
                break
            char_pred, hidden = self.predict_next_char(hidden, all_chars[-1])
            all_chars.append(char_pred)
            c += 1

        return "".join(all_chars)
        
    def encode_char(self, char):
        char_int = self.char2int[char]
        char_one_hot = F.one_hot(torch.tensor(char_int), self.n_chars).float()
        return char_one_hot.unsqueeze(0).unsqueeze(0).to(self.device)

In [12]:
k = 4
text_generator = GenerateText(model, k, train_dataset.int2char, train_dataset.char2int, device)
response = text_generator.generate_text('so', max_chars=100)
response

['[', 's', 'o']
['[', 's', 'o', 'l']
4


'[solaray chromiacin t other mineral supplementsgroway sdn bhd  solaray par iliancitacsplus  20s2tabs'