In [1]:
from collections import Counter
import heapq
import numpy
import pandas as pd

In [2]:
import torch
from transformers import AutoModelForCausalLM , AutoTokenizer
import scipy
import numpy as np
class LMHeadModel:

    def __init__(self, model_name):
        # Initialize the model and the tokenizer.
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    def get_predictions(self, sentence):
        # Encode the sentence using the tokenizer and return the model predictions.
        inputs = self.tokenizer.encode(sentence, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model(inputs)
            predictions = outputs[0]
        return predictions
    
    def get_next_word_probabilities(self, sentence, top_k=500):

        # Get the model predictions for the sentence.
        predictions = self.get_predictions(sentence)
        #print(predictions)
        
    
        # Get the next token candidates.
        next_token_candidates_tensor = predictions[0, -1, :]

        # Get the top k next token candidates.
        length = len(next_token_candidates_tensor)
        topk_candidates_indexes = torch.topk(
            next_token_candidates_tensor, length).indices.tolist()
        
        
        #printing 1st token tensor
        # print(next_token_candidates_tensor[0])

        
        #printing 1st token tensor in sorted arr
        # next_token_candidates_sort = torch.sort(next_token_candidates_tensor)
        # print(next_token_candidates_sort[1])


        # Get the token probabilities for all candidates.
        all_candidates_probabilities = torch.nn.functional.softmax(
            next_token_candidates_tensor, dim=-1)
        
        # all_candidates_prob_sorted = torch.nn.functional.softmax(next_token_candidates_sort,dim = -1)
        
        # Filter the token probabilities for the top k candidates.
        topk_candidates_probabilities = \
            all_candidates_probabilities[topk_candidates_indexes].tolist()

        # Decode the top k candidates back to words.
        topk_candidates_tokens = \
            [self.tokenizer.decode([idx]).strip() for idx in topk_candidates_indexes]#topk_candidates_indexes]

        #Return the top k candidates and their probabilities.
        return list(zip(topk_candidates_tokens, topk_candidates_probabilities))
      
        # output=list(zip(next_token_candidates_tensor,all_candidates_probabilities))
        # return output




  from .autonotebook import tqdm as notebook_tqdm


In [3]:

def VITERBI_Lists(state_transition_probmat, initial_state_prob):



    viterbi_mat = []
    backpointer = []
    viterbi_1stLayer = []
    for i in range(len(initial_state_prob)):
        viterbi_1stLayer.append(float(initial_state_prob[i]))
    viterbi_mat.append(viterbi_1stLayer)

    for time_step in range(len(state_transition_probmat)):
        viterbi_layer = []
        backpointer_layer = []
        for state in range(len(state_transition_probmat[time_step])):
            iteration_vec = [viterbi_mat[time_step][i]*state_transition_probmat[time_step][state][i] for i in range(len(viterbi_mat[time_step]))]

            maxval = max(iteration_vec)
            maxind = iteration_vec.index(maxval)
            viterbi_layer.append(maxval)
            # max_index = max(range(len(state_vec)), key=lambda i: state_vec[i])
            backpointer_layer.append(maxind)
          
        viterbi_mat.append(viterbi_layer)
        backpointer.append(backpointer_layer)
    
    best_path_prob = max(viterbi_mat[-1])
    # max_index = max(range(len(viterbi_mat[-1])), key = lambda i: viterbi_mat[-1][i])
    max_index = viterbi_mat[-1].index(best_path_prob)
    best_backpointer = max_index
    best_path = [best_backpointer]
    j = 0
    for i in reversed(range(len(state_transition_probmat))):
        best_path.append(backpointer[i][best_path[j]])
        j += 1
    best_path = best_path[::-1]
    return best_path, viterbi_mat


In [4]:
class SearchTree:
    def __init__(self,context,probability,parent = None,child = None):
        self.context = context
        self.probability = probability
        self.parent = parent
        self.child = []
        if child is not None:
           self.child.append(child)
    def build_Context(self):
      
        context_list = []
        node = self
        while node.parent is not None:
           
            context_list.append(node.context)   
            node = node.parent
        context_list.append(node.context)
        context_list.reverse()
        formatted_contextList = []
        for i in range(len(context_list)):
            if context_list[i] in ['.',':',',','?','!',';']:
                if (i-1>= 0):
                    formatted_contextList[i-1] += context_list[i]
            else:
                formatted_contextList.append(context_list[i])
        return ' '.join(formatted_contextList)
    
    def create_child(self):
        if self.parent is not None:
           self.parent.child.append(self)

In [5]:
def most_frequent(List):
    return max(set(List), key=List.count)

def sort_with_indices(arr):
  """Sorts a list and returns the original indices of the sorted elements."""
  indices = list(range(len(arr)))
  indices.sort(key=lambda i: arr[i], reverse = True)
  return indices

def find_overlap_children(arr):
  "Find all the children of top two tokens and return their overlap"
  children1 = {child.context for child in arr[0].child}
  children2 = {child.context for child in arr[1].child}
  common = len(children1.intersection(children2))
  return common / len(arr[0].child) if arr[0].child else 0



In [6]:
# Now, have the probability matrix ready in which one list contains the probability to reach that state from previous list of tokens
#To make the probs ready find the unique tokens and then number them/store in a list and then find top 3 tokens given those tokens, find unique
#tokens and then extract probs of getting those from the previous list. [state transition probmat] and then run viterbi!!

def findProbability(InitialToken,FinalToken,model):
    context = InitialToken.build_Context()
    tokens_50K = model.get_next_word_probabilities(context)
    for token,prob in tokens_50K:
        if token == FinalToken.context:
            return prob


In [7]:
def decodePath(best_path,unique_tokens_list,root_string):
    resultant_string = ''
    for i in range(len(best_path)):
        resultant_string = resultant_string + ' '+ unique_tokens_list[i][best_path[i]]
    return root_string+resultant_string

 

In [8]:
def generateIntermediates(root,numTokens = 3, loop_runner = 4):
  sentence = SearchTree(root,1)
  context = []
  # root = Node("I enjoy walking in the", prob = 1)\
  prob_list = []
  num_tokens = numTokens
  content = []
  probability = []
  model = LMHeadModel("gpt2")
  tokens_50K = model.get_next_word_probabilities(sentence.context,num_tokens)
  children = []
  overlap = []
  most_common = []
  #unique_elements = []   # to store unique elements at each iteration
  unique_tokens = set()
  probabilityMatrix = []
  uniqueTokensList = []
  new_content = []
  for i in range(num_tokens):
    context = tokens_50K[i][0]
    unique_tokens.add(context)
    new_content.append(context)
    prob = tokens_50K[i][1]
    probability.append(prob)
    context = SearchTree(context,prob,sentence)
    context.create_child()
    uniqueTokensList.append(context)
    children.append(context)

  content.append(new_content)
  previousUniqueLength = num_tokens
  #unique_elements.append(unique_tokens)
  initialStateProbability = probability

  for i in range(2,loop_runner):
    unique_tokens = set()
    probability = []
    new_content = []
    previousSetLength = 0
    for j in range(len(children)):
      token_list = model.get_next_word_probabilities(children[j].build_Context(),num_tokens)
      for s in range(num_tokens):
        context = token_list[s][0]
        prob = token_list[s][1]
        unique_tokens.add(context)
      
        #probability.append(prob)
        context = SearchTree(context,prob,children[j])
        context.create_child()
        if (len(unique_tokens)>previousSetLength):
          previousSetLength = len(unique_tokens)
          uniqueTokensList.append(context)
          new_content.append(context.context)

        children.append(context)    # may be don't need to store everything rather I can just store unique elements in here
    
    #unique_elements.append(unique_tokens) # append the unique tokens list at each iteration to unique_elements list
    content.append(new_content) # for storing tokens which will pass to the decode_path function. 
    for token in uniqueTokensList[previousUniqueLength:]:
      probs = []
      for prevToken in uniqueTokensList[:previousUniqueLength]:
        probabilityCalc = findProbability(prevToken,token,model)
        probs.append(probabilityCalc)
      probability.append(probs)
    probabilityMatrix.append(probability)
    
    previousUniqueLength = len(uniqueTokensList[previousUniqueLength:])
    uniqueTokensList = uniqueTokensList[len(uniqueTokensList)-previousUniqueLength:]

    #    for parents in children[num_tokens**(i-1)]
    #    content = content[num_tokens**(i-1):] 

    #    for probs in range(len(unique_elements)):
    #      for prev_token in children[prev_length:len(unique_elements)]:
    #         probability.append(findProbability(prev_token,content,model))
    
      

    children = children[num_tokens**(i-1):]
          
    #content = content[num_tokens**(i-1):]
    #probability = probability[num_tokens**(i-1):]
    # count = Counter(content)
    # most_common.append(count.most_common(1)[0][1])
  return probabilityMatrix, initialStateProbability, content



In [9]:
model = LMHeadModel("gpt2")
tokens_50K = model.get_next_word_probabilities("How are you?",3)
print(tokens_50K)





In [37]:
#For testing
for token in range(len(children)):
    print(children[token].build_Context())

I enjoy walking in the park, but
I enjoy walking in the park, and
I enjoy walking in the park, I
I enjoy walking in the park and seeing
I enjoy walking in the park and I
I enjoy walking in the park and it
I enjoy walking in the park. I
I enjoy walking in the park. It
I enjoy walking in the park. 
I enjoy walking in the woods, but
I enjoy walking in the woods, and
I enjoy walking in the woods, I
I enjoy walking in the woods and seeing
I enjoy walking in the woods and I
I enjoy walking in the woods and watching
I enjoy walking in the woods. I
I enjoy walking in the woods. 
I enjoy walking in the woods. It
I enjoy walking in the streets of New
I enjoy walking in the streets of the
I enjoy walking in the streets of London
I enjoy walking in the streets, but
I enjoy walking in the streets, and
I enjoy walking in the streets, I
I enjoy walking in the streets and seeing
I enjoy walking in the streets and I
I enjoy walking in the streets and doing


In [69]:
best_path, viterbi_mat = VITERBI_Lists(probabilityMatrix,initialStateProbability)
print(best_path)
print(viterbi_mat)

[0, 2, 2]
[[0.15904049575328827, 0.10028108954429626, 0.04183783754706383], [0.03138405555342083, 0.028136349101665292, 0.026163208738594346, 0.008306787966332274], [0.006093625015625787, 0.0039044376133979966, 0.006462965028676365, 0.0023097547630710805, 0.0009659332632373889, 0.0034024189563954836, 0.002145715143479223, 0.0007771435403960853, 0.0006407050091387867, 0.0006661984572732504, 0.00031533823774429633, 0.00039081404794006106]]


In [71]:
decodePath(best_path, content,"I enjoy walking in the")

'I enjoy walking in the  park . I'

In [16]:
examples = pd.read_csv("C:/Users/jivesh/Downloads/TruthfulQA.csv")

In [17]:
examples['Question'][0]

'What happens to you if you eat watermelon seeds?'

In [92]:
def ViterbiTransformerPipeline(rootSentence, numTokens = 3, loop_runner=3):
    probabilityMatrix,initialStateProbability,content = generateIntermediates(rootSentence,numTokens,loop_runner+1)
    best_path,viterbi_mat = VITERBI_Lists(probabilityMatrix, initialStateProbability)
    print('content: ',content)
    print('best_path: ',best_path)
    decodedString = decodePath(best_path,content,rootSentence)
    return decodedString

In [94]:
def TransformerPipeline(rootSentence,loop_runner = 3):
  model = LMHeadModel("gpt2")
  finalSentence = rootSentence
  for i in range(loop_runner):
    tokens_50K = model.get_next_word_probabilities(finalSentence)

    context = tokens_50K[0][0]
    if context in ['.',':',',','?','!',';']:
      finalSentence += context
    else:
      finalSentence = finalSentence + ' ' + context
  return finalSentence
    

In [95]:
decodedString = ViterbiTransformerPipeline(examples['Question'][0])
decodedString2 = TransformerPipeline(examples['Question'][0])

content:  [['', 'What', 'If'], ['', '????', '?"', 'happens', 'if', 'about', 'you', 'so', 'not'], ['', '??', '"', 'to', 'if', 'when', 'you', 'your', 'I', 'the', 'eat', "'re", 'don', ',', 'what', 'how', '?']]
best_path:  [0, 0, 0]


In [102]:
decodedString = ViterbiTransformerPipeline("I enjoy walking in the")
finalSentence = TransformerPipeline("I enjoy walking in the")



content:  [['park', 'woods', 'streets'], [',', 'and', '.', 'of'], ['but', 'and', 'I', 'seeing', 'it', 'It', '', 'watching', 'New', 'the', 'London', 'doing']]
best_path:  [0, 2, 2]


In [103]:
print(decodedString)
print(finalSentence)

I enjoy walking in the  park . I
I enjoy walking in the park, but
