This code serves as an illustrative example of how to implement a recurrent neural network, without regard for efficiency considerations.

## Settings and preprocessing

In [1]:
import numpy as np

# Example corpus
corpus = "hello world"
# Sort the unique characters so the vocabulary order (and therefore every
# index in the mappings below) is identical on every run; iterating a bare
# set of strings can yield a different order in each interpreter session.
chars = sorted(set(corpus))
data_size, vocab_size = len(corpus), len(chars)

# Character to index and index to character mappings
char_to_ix = {ch: i for i, ch in enumerate(chars)}
ix_to_char = dict(enumerate(chars))

# Hyperparameters
hidden_size = 100  # Size of the hidden layer of neurons
seq_length = 10  # Number of steps to unroll the RNN for
learning_rate = 1e-1  # Step size used for the gradient-descent parameter update

## Model implementation

In [2]:
# Model parameters: weight matrices start as small Gaussian noise (scaled by
# 0.01) and both bias vectors start at zero.
Wxh = 0.01 * np.random.randn(hidden_size, vocab_size)   # input -> hidden weights
Whh = 0.01 * np.random.randn(hidden_size, hidden_size)  # hidden -> hidden (recurrent) weights
Why = 0.01 * np.random.randn(vocab_size, hidden_size)   # hidden -> output weights
bh = np.zeros(shape=(hidden_size, 1))  # hidden bias (column vector)
by = np.zeros(shape=(vocab_size, 1))   # output bias (column vector)

def lossFun(inputs, targets, hprev):
    """
    run one forward and one backward pass of the RNN over a sequence.

    inputs, targets are both lists of integers (character indices);
    targets[t] is the index the model should predict at step t.
    hprev is Hx1 array of initial hidden state (it is copied, not modified).

    reads the module-level parameters Wxh, Whh, Why, bh, by and vocab_size.

    returns the loss (cross-entropy summed over all steps), the gradients
    on the model parameters (dWxh, dWhh, dWhy, dbh, dby; each clipped
    elementwise to [-5, 5]) and the last hidden state. For an empty sequence
    the loss is 0, the gradients are zero and the last hidden state is a
    copy of hprev.
    """
    xs, hs, ps = {}, {}, {}
    hs[-1] = np.copy(hprev)
    loss = 0
    # Forward pass
    for t in range(len(inputs)):
        xs[t] = np.zeros((vocab_size, 1))  # encode in 1-of-k representation
        xs[t][inputs[t]] = 1
        hs[t] = np.tanh(np.dot(Wxh, xs[t]) + np.dot(Whh, hs[t-1]) + bh)  # hidden state
        y = np.dot(Why, hs[t]) + by  # unnormalized log probabilities for next chars
        # Softmax is unchanged by subtracting a constant from every logit;
        # shifting by the max keeps np.exp from overflowing to inf (which
        # would turn the probabilities, loss and gradients into NaN).
        shifted = y - np.max(y)
        log_norm = np.log(np.sum(np.exp(shifted)))
        ps[t] = np.exp(shifted - log_norm)  # probabilities for next chars
        # Cross-entropy taken from the log-probability directly, so it stays
        # finite even when the target probability underflows to 0.
        loss += -(shifted[targets[t], 0] - log_norm)

    # Backward pass: compute gradients going backwards
    dWxh, dWhh, dWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
    dbh, dby = np.zeros_like(bh), np.zeros_like(by)
    # Gradient flowing into the hidden state from the following time step;
    # shaped like bh (Hx1) so this also works when there are no time steps.
    dhnext = np.zeros_like(bh)
    for t in reversed(range(len(inputs))):
        dy = np.copy(ps[t])
        dy[targets[t]] -= 1  # backprop into y
        dWhy += np.dot(dy, hs[t].T)
        dby += dy
        dh = np.dot(Why.T, dy) + dhnext  # backprop into h
        dhraw = (1 - hs[t] * hs[t]) * dh  # backprop through tanh nonlinearity
        dbh += dhraw
        dWxh += np.dot(dhraw, xs[t].T)
        dWhh += np.dot(dhraw, hs[t-1].T)
        dhnext = np.dot(Whh.T, dhraw)

    for dparam in [dWxh, dWhh, dWhy, dbh, dby]:
        np.clip(dparam, -5, 5, out=dparam)  # clip to mitigate exploding gradients

    return loss, dWxh, dWhh, dWhy, dbh, dby, hs[len(inputs)-1]

def sample(h, seed_ix, n):
    """
    sample a sequence of n integers (character indices) from the model.

    h is memory state (Hx1 hidden state to start from), seed_ix is seed
    letter for first time step. Each sampled index is fed back in as the
    next input.

    reads the module-level parameters Wxh, Whh, Why, bh, by and vocab_size,
    and draws from NumPy's global random state.

    returns the list of sampled indices (the seed itself is not included).
    """
    x = np.zeros((vocab_size, 1))
    x[seed_ix] = 1
    ixes = []
    for _ in range(n):
        h = np.tanh(np.dot(Wxh, x) + np.dot(Whh, h) + bh)
        y = np.dot(Why, h) + by
        # Shift by the max logit before exponentiating: the softmax value is
        # the same, but np.exp can no longer overflow and produce NaN
        # probabilities (which np.random.choice rejects).
        exp_y = np.exp(y - np.max(y))
        p = exp_y / np.sum(exp_y)
        ix = np.random.choice(vocab_size, p=p.ravel())
        # One-hot encode the sampled character as the next input.
        x = np.zeros((vocab_size, 1))
        x[ix] = 1
        ixes.append(ix)
    return ixes

## Model training

In [4]:
# Training loop: repeatedly take a seq_length-character window of the corpus,
# compute loss and gradients with lossFun, and apply a plain gradient-descent step.
# n counts iterations; p is the corpus index where the current window starts.
n, p = 0, 0
hprev = np.zeros((hidden_size, 1))  # reset RNN memory
while n < 100000:
    # Prepare inputs (we're sweeping from left to right in steps seq_length long)
    # Restart from the beginning with a zeroed hidden state on the first iteration
    # and whenever the end index of the target slice (p+seq_length+1) would reach
    # or pass the end of the corpus. If len(corpus) <= seq_length + 1 this is true
    # on every iteration, so every step trains on the window starting at 0.
    if p+seq_length+1 >= len(corpus) or n == 0: 
        hprev = np.zeros((hidden_size, 1))  # reset RNN memory
        p = 0  # go from start of data
    # targets are the inputs shifted one character to the right (next-character prediction)
    inputs = [char_to_ix[ch] for ch in corpus[p:p+seq_length]]
    targets = [char_to_ix[ch] for ch in corpus[p+1:p+seq_length+1]]

    # Sample from the model now and then
    # (done before this iteration's update, seeded with the window's first character)
    if n % 1000 == 0:
        sample_ix = sample(hprev, inputs[0], 200)
        txt = ''.join(ix_to_char[ix] for ix in sample_ix)
        print('----\n %s \n----' % (txt, ))

    # Forward seq_length characters through the net and fetch gradient
    # hprev is overwritten with the last hidden state so it carries into the next window
    loss, dWxh, dWhh, dWhy, dbh, dby, hprev = lossFun(inputs, targets, hprev)
    if n % 1000 == 0: print('iter %d, loss: %f' % (n, loss))  # print progress
    
    # Perform parameter update with gradient descent
    # (-= modifies each parameter array in place, so the module-level weights change)
    for param, dparam in zip([Wxh, Whh, Why, bh, by], 
                             [dWxh, dWhh, dWhy, dbh, dby]):
        param -= learning_rate * dparam

    p += seq_length  # move data pointer
    n += 1  # iteration counter

----
 ello worldro worldro worldro worldrodworldro worldro worldro worldro worldro worldro worldrodworldro worldrodworldro worldro worldrodworldro worldro worldro worldrodworldrodworldrodworldro worldrodwor 
----
iter 0, loss: 0.000014
----
 ello worldro worldro worldro worldro worldro worldro worldro worldrodworldro worldro worldro worldro worldrodworldro worldro worldro worldro worldrodworldro worldro worldro worldro worldrodworldro wor 
----
iter 1000, loss: 0.000014
----
 ello worldro worldro worldro worldro worldro worldro worldrodworldro worldrodworldro worldrodworldro worldrodworldro worldro worldrodworldrodworldrodworldrodworldro worldro worldro worldro worldro wor 
----
iter 2000, loss: 0.000014
----
 ello worldro worldro worldro worldro worldro worldro worldrodworldrodworldro worldro worldrodworldrodworldro worldrodworldro worldro worldrodworldro worldro worldro worldrodworldrodworldro worldro wor 
----
iter 3000, loss: 0.000013
----
 ello worldro worldro worldro worldro worl

----
 ello worldro worldro worldro worldrodworldrodworldro worldrodworldro worldrodworldro worldrodworldro worldro worldrodworldrodworldro worldrodworldrodworldro worldrodworldrodworldro worldro worldrodwor 
----
iter 35000, loss: 0.000010
----
 ello worldrodworldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldrodworldro worldrodworldro worldro worldro worldro worldro worldro worldro worldrodworldrodwor 
----
iter 36000, loss: 0.000010
----
 ello worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldrodworldro worldro worldro worldrodworldro worldro worldrodworldro worldro worldrodwor 
----
iter 37000, loss: 0.000010
----
 ello worldro worldrodworldro worldro worldro worldrodworldro worldrodworldrodworldro worldro worldro worldro worldro worldro worldro worldrodworldrodworldro worldrodworldrodworldro worldrodworldrodwor 
----
iter 38000, loss: 0.000010
----
 ello worldrodworldro worldrodworld

----
 ello worldro worldro worldro worldro worldrodworldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldro worldrodworldro worldro worldrodwor 
----
iter 70000, loss: 0.000008
----
 ello worldrodworldro worldrodworldrodworldro worldro worldro worldrodworldro worldrodworldrodworldro worldro worldrodworldrodworldro worldro worldrodworldrodworldro worldrodworldrodworldro worldro wor 
----
iter 71000, loss: 0.000008
----
 ello worldro worldro worldro worldrodworldro worldro worldrodworldrodworldro worldro worldrodworldrodworldro worldro worldro worldro worldro worldro worldro worldro worldrodworldrodworldro worldrodwor 
----
iter 72000, loss: 0.000008
----
 ello worldro worldro worldro worldro worldro worldro worldro worldro worldro worldrodworldro worldro worldro worldro worldro worldrodworldrodworldro worldro worldrodworldro worldrodworldro worldrodwor 
----
iter 73000, loss: 0.000008
----
 ello worldrodworldrodworldro world

## Making Predictions

Once the model is trained, you can use it to generate new text sequences based on a given seed input. The model predicts the next character at each step and uses this prediction as the input for the next step.

In [8]:
# Generate text from the trained model, starting from a zeroed hidden state
# and the seed letter 'h'.
h_prev = np.zeros(shape=(hidden_size, 1))
seed_ix, n = char_to_ix['h'], 10  # seed character index, number of characters to generate

# sample returns the n sampled character indices (the seed is not included)
generated_sequence = sample(h_prev, seed_ix, n)

# Map every sampled index back to its character and print the joined string
generated_text = ''.join([ix_to_char[ix] for ix in generated_sequence])
print(generated_text)

ello world


This function generates text by predicting one character at a time and using the predicted character as the input for the next prediction. The seed_ix parameter is the starting character index, and n is the number of characters to generate. The output is a string of generated characters based on learned patterns in the training data.

## Using language models to generate probability and embeddings

In [None]:
import torch
from transformers import *
import torch.nn.functional as F
import numpy as np
from scipy import spatial
from collections import defaultdict

# RoBERTa and GPT-2 use byte-level BPE
def init_model(model_name):
    """
    Load the tokenizer, the base model and the language-model head for a model key.

    input:
        model_name: one of "xlnet", "distillbert", "bert", "bertlarge",
            "roberta", "gpt", "gpt2", "gpt2large"
    output: (tokenizer, model, model_lm)
        tokenizer: tokenizer loaded from the matching pretrained checkpoint
        model: base model, loaded with output_hidden_states=True and
            output_attentions=True
        model_lm: model with a language-modelling head, switched to eval mode
    raises: ValueError if model_name is not one of the supported keys
    """
    # Each branch only selects the checkpoint name and the three classes;
    # the actual loading is shared below.
    if model_name == "xlnet":
        pretrained_name = 'xlnet-base-cased'
        tokenizer_cls, model_cls, lm_cls = XLNetTokenizer, XLNetModel, XLNetLMHeadModel
    elif model_name == "distillbert":
        pretrained_name = 'distilbert-base-cased'
        tokenizer_cls, model_cls, lm_cls = DistilBertTokenizer, DistilBertModel, DistilBertForMaskedLM
    elif model_name == "bert":
        pretrained_name = 'bert-base-uncased'
        tokenizer_cls, model_cls, lm_cls = BertTokenizer, BertModel, BertForMaskedLM
    elif model_name == "bertlarge":
        pretrained_name = 'bert-large-uncased'
        tokenizer_cls, model_cls, lm_cls = BertTokenizer, BertModel, BertForMaskedLM
    elif model_name == "roberta":
        pretrained_name = 'roberta-base'
        # The RoBERTa checkpoint must be loaded with RobertaModel; loading it
        # through BertModel does not match the checkpoint's architecture.
        tokenizer_cls, model_cls, lm_cls = RobertaTokenizer, RobertaModel, RobertaForMaskedLM
    elif model_name == "gpt":
        pretrained_name = 'openai-gpt'
        tokenizer_cls, model_cls, lm_cls = AutoTokenizer, OpenAIGPTModel, OpenAIGPTLMHeadModel
    elif model_name == "gpt2":
        pretrained_name = 'gpt2'
        tokenizer_cls, model_cls, lm_cls = GPT2Tokenizer, GPT2Model, GPT2LMHeadModel
    elif model_name == "gpt2large":
        pretrained_name = 'gpt2-large'
        tokenizer_cls, model_cls, lm_cls = GPT2Tokenizer, GPT2Model, GPT2LMHeadModel
    else:
        # Fail with a clear error instead of falling through to the return
        # statement with nothing assigned.
        raise ValueError("unsupported model: {}".format(model_name))

    tokenizer = tokenizer_cls.from_pretrained(pretrained_name)
    model = model_cls.from_pretrained(pretrained_name, output_hidden_states=True, output_attentions=True)
    model_lm = lm_cls.from_pretrained(pretrained_name).eval()

    return tokenizer, model, model_lm

def _piece_length(token):
    """Return the number of word characters a single tokenizer piece accounts for."""
    if token == '[UNK]':
        # An unknown piece is counted as one character.
        return 1
    # Remove the tokenizer markers ('Ġ', '▁', '#' and '</w>') before counting.
    # NOTE: str.strip removes the given characters from BOTH ends, so every
    # leading/trailing '#' is dropped, not just a leading '##' prefix.
    core = token.strip('Ġ').strip('▁').strip('##')
    return len(core.replace('</w>', ''))


def match_piece_to_word(piece, word):
    """
    Map each whitespace-separated word of a sentence to the pieces that spell it.

    input:
        piece: list of tokenizer pieces (strings) for the sentence, in order
        word: the sentence as a string; it is split on whitespace
    output: defaultdict(list) mapping word index -> list of piece indices

    Pieces are consumed left to right; pieces are added to the current word
    until their accumulated length reaches the word's length. If the
    accumulated length does not equal the word's length, a warning naming
    the sentence and the pieces is issued and the alignment continues.
    """
    import warnings

    words = word.split()  # split once instead of on every loop test
    mapping = defaultdict(list)
    word_index = 0
    piece_index = 0
    while word_index < len(words) and piece_index < len(piece):
        target_len = len(words[word_index])
        t = _piece_length(piece[piece_index])
        # Keep absorbing following pieces while the word is not fully covered.
        while piece_index + 1 < len(piece) and t < target_len:
            mapping[word_index].append(piece_index)
            piece_index += 1
            t += _piece_length(piece[piece_index])
        if t != target_len:
            # Report the mismatch without stopping in an interactive debugger,
            # so the function stays usable in non-interactive runs.
            warnings.warn(
                "pieces do not align with word {!r} in {!r}: {!r}".format(
                    words[word_index], word, piece
                )
            )
        mapping[word_index].append(piece_index)
        word_index += 1

        piece_index += 1
    return mapping

def convert_logits_to_probs(logits, input_ids):
    """
    Look up, for every position, the softmax probability of the token at that position.

    input:
        logits: (1, n_word, n_vocab), logits for each position
        input_ids: (1, n_word), the vocabulary id of the token at each position
    output: probs: (1, n_word) numpy array; entry i is the softmax (over the
        vocabulary) of logits[0][i], evaluated at input_ids[0][i]
    """
    # Normalise over the vocabulary axis of the single batch item.
    vocab_probs = F.softmax(logits[0], dim=1)
    token_ids = input_ids[0]
    n_word = input_ids.shape[1]
    picked = [vocab_probs[pos, token_ids[pos]].item() for pos in range(n_word)]
    return np.array(picked).reshape(1, n_word)


if __name__ == '__main__':
    '''
    parameters
    inputfile: sentences with target word
    
    '''
    ind1 = -2  # index for the target word

    inputfile = 'abc.txt'

    model_names = ["xlnet", "distillbert", "bert", "bertlarge", "roberta", "gpt", "gpt2", "gpt2large"]
    for model_name in model_names:
        print(model_name)

        out = [] 
        for input in open(inputfile):
            input_sent = input.strip()

            tokenizer, model, model_lm = init_model(model_name)
            input_ids = tokenizer.encode(input_sent, return_tensors = "pt")
            tok_input = tokenizer.convert_ids_to_tokens(input_ids[0])
            print(input_ids)
            print(tok_input)
            if model_name in ["xlnet"]:
                tok_input = tok_input[0:-2]
                input_ids = input_ids[:,0:-2]
            elif model_name in ["distillbert", "bert", "bertlarge", "roberta"]:
                tok_input = tok_input[1:-1]
                input_ids = input_ids[:,1:-1]
            
            tok_sent = ' '.join(tok_input).replace('Ġ', '').replace('▁', '').replace('##', '').replace('</w>', '')
            word_piece_mapping = match_piece_to_word(tok_input, input_sent)
            # print(word_piece_mapping)
            
            with torch.no_grad():
                outputs = model(input_ids)
                logits = model_lm(input_ids)[0]
            hidden_states = outputs['hidden_states']
            # print(len(hidden_states))
            prob = convert_logits_to_probs(logits, input_ids)[0]
            # print(len(prob), prob)
            prob1 = 1
  
            for i in word_piece_mapping[len(input_sent.split())+ind1]:
                prob1 *= prob[i]

            for i in range(len(outputs['hidden_states'])): #layers
                vec1 = outputs['hidden_states'][i][0, word_piece_mapping[len(input_sent.split())+ind1], :].detach().numpy().mean(axis=0)