## Decoding Strategies
Author: Victoria Pedlar

This notebook explores open-ended text generation for isiZulu.

### Transformer Language Model

In [256]:
import os

# Get the current working directory
cwd = os.getcwd()
print(cwd)

# Change the current working directory
os.chdir('/Users/victoriapedlar/repos/isizulu-text-generation/')

# Get the current working directory again
cwd = os.getcwd()
print(cwd)

/Users/victoriapedlar/repos/isizulu-text-generation
/Users/victoriapedlar/repos/isizulu-text-generation


In [257]:
# %pip install -r transformer_requirements.txt
# %cd src/transformers
# %pip install .
# %cd /Users/victoriapedlar/repos/isizulu-text-generation/

In [258]:
import torch
import nltk
import transformers
from transformers import GPT2TokenizerFast
import sys
sys.path.insert(0,'/Users/victoriapedlar/repos/isizulu-text-generation/scripts')
import layer_switching_gpt2
from layer_switching_gpt2 import LayerSwitchingGPT2Config, GPT2LayerSwitchingLMHeadModel

In [259]:
tokenizer = GPT2TokenizerFast.from_pretrained('experiments/trained_models/transformer/tokenizers')

In [260]:
# Load the model configuration
config = LayerSwitchingGPT2Config.from_pretrained("experiments/trained_models/transformer/config.json")
# Load the model from the checkpoint
model = GPT2LayerSwitchingLMHeadModel.from_pretrained("experiments/trained_models/transformer", config=config)

In [261]:
# Set the tokenizer attribute of the model
model.tokenizer = tokenizer

In [262]:
# Define a list of decoding strategies to try
decoding_strategies = ['argmax', 'beam_search', 'nucleus_sampling', 'top_k_sampling', 'temperature_sampling', 'entmax_sampling']

Several decoding strategies are available for open-ended language generation with transformer models:

1. Argmax decoding: A greedy search in which the model picks the single most likely next token at each step. It is deterministic and fast, but tends to produce repetitive text.

2. Beam search: The model keeps a fixed number (`beam_size`) of the highest-scoring partial sequences at each step, extends each of them with candidate next tokens, and keeps the best `beam_size` extensions. This usually finds a higher-probability sequence than argmax decoding, but it is slower and the output is still prone to repetition, which is why `no_repeat_ngram_size` is set below.

3. Nucleus (top-p) sampling: The model samples from the smallest set of most likely tokens whose cumulative probability reaches `top_p`, with the probabilities renormalized over that set. The size of the set adapts to how peaked the distribution is at each step.

4. Top-k sampling: The model samples from the `k` most likely tokens at each step, with the probabilities renormalized over those `k` tokens. This produces more diverse output than argmax decoding, but a fixed `k` can be too large for peaked distributions and too small for flat ones.

5. Temperature sampling: The logits are divided by a temperature before the softmax, and the model samples from the resulting distribution. A temperature above 1 flattens the distribution and gives more diverse but more error-prone output. A temperature below 1 sharpens it and gives less diverse output that stays closer to the most likely tokens.

6. Entmax sampling: The model samples from an α-entmax transformation of the logits instead of a softmax. For α > 1 the transformation can assign exactly zero probability to unlikely tokens, so the truncation is learned by the model rather than imposed at decoding time. α = 1 recovers softmax and α = 2 gives sparsemax.
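
To make the differences concrete, here is a minimal sketch of temperature scaling, top-k filtering and nucleus filtering on a made-up four-token distribution. It uses only the standard library, and the logits and helper names (`softmax`, `top_k_filter`, `nucleus_filter`) are illustrative assumptions, not part of this notebook's models.

```python
import math

def softmax(logits, temperature=1.0):
    """Turn logits into probabilities; the temperature rescales the logits first."""
    scaled = [x / temperature for x in logits]
    peak = max(scaled)  # subtract the max for numerical stability
    exps = [math.exp(x - peak) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

def top_k_filter(probs, k):
    """Keep the k most likely tokens and renormalize over them."""
    keep = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)[:k]
    total = sum(probs[i] for i in keep)
    return {i: probs[i] / total for i in keep}

def nucleus_filter(probs, top_p):
    """Keep the smallest set of most likely tokens whose mass reaches top_p."""
    order = sorted(range(len(probs)), key=lambda i: probs[i], reverse=True)
    keep, mass = [], 0.0
    for i in order:
        keep.append(i)
        mass += probs[i]
        if mass >= top_p:
            break
    return {i: probs[i] / mass for i in keep}

toy_logits = [2.0, 1.0, 0.5, -1.0]  # made-up scores for a four-token vocabulary
probs = softmax(toy_logits)

greedy_token = max(range(len(probs)), key=lambda i: probs[i])  # argmax decoding
top2 = top_k_filter(probs, k=2)
nucleus = nucleus_filter(probs, top_p=0.8)
sharp = softmax(toy_logits, temperature=0.5)  # more peaked than probs
flat = softmax(toy_logits, temperature=1.5)   # flatter than probs
```

Sampling then draws a token from the filtered dictionary, for example with `random.choices(list(top2), weights=list(top2.values()))`.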

In [263]:
def extract_prompts_and_references(file_path):
    prompts = []
    references = []

    with open(file_path, 'r') as f:
        for line in f:
            line = line.strip()
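            words = line.split()
            # Split on words so that repeated whitespace cannot shift the prompt/reference boundary
            prompt = ' '.join(words[:5])
            reference = ' '.join(words[5:])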

            prompts.append(prompt)
            references.append(reference)

    return prompts, references
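
A small sketch of the same prompt/reference split on placeholder words, to show what happens with short lines. The helper name `split_prompt` and the sample strings are hypothetical; the five-word prompt length matches the cell above.

```python
def split_prompt(line, prompt_words=5):
    """Return (prompt, reference): the first prompt_words words and the rest."""
    words = line.split()
    return " ".join(words[:prompt_words]), " ".join(words[prompt_words:])

prompt, reference = split_prompt("w1 w2 w3 w4 w5 w6 w7")
short_prompt, short_reference = split_prompt("w1 w2 w3")
```

Lines with five words or fewer yield an empty reference, so it may be worth filtering them out before computing reference-based scores.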

In [264]:
prompts, references = extract_prompts_and_references("data/test/isolezwe.txt")

In [265]:
# Calculate the average number of tokens in each reference
total_tokens = sum(len(tokenizer.encode(ref)) for ref in references)
average_tokens = total_tokens / len(references)

print(f"The average number of tokens in the references is {average_tokens:.2f} tokens.")

The average number of tokens in the references is 29.10 tokens.


In [266]:
def generate_text(prompt, strategy, hyperparameters, max_length=round(average_tokens)):
    input_ids = tokenizer.encode(prompt, return_tensors='pt')
    prompt_length = input_ids.shape[1]

    if strategy == 'argmax':
        output_ids = model.generate(input_ids, max_length=max_length+prompt_length)
    elif strategy == 'beam_search':
        beam_size = hyperparameters.get('beam_size', 1)
        no_repeat_ngram_size = hyperparameters.get('no_repeat_ngram_size', 2)
        output_ids = model.generate(input_ids, num_beams=beam_size, no_repeat_ngram_size=no_repeat_ngram_size, max_length=max_length+prompt_length)
    elif strategy == 'nucleus_sampling':
        top_p = hyperparameters.get('top_p', 0.9)
        output_ids = model.generate(input_ids, do_sample=True, max_length=max_length+prompt_length, top_p=top_p)
    elif strategy == 'top_k_sampling':
        k = hyperparameters.get('k', 10)
        output_ids = model.generate(input_ids, do_sample=True, max_length=max_length+prompt_length, top_k=k)
    elif strategy == 'temperature_sampling':
        temperature = hyperparameters.get('temperature', 1.0)
        output_ids = model.generate(input_ids, do_sample=True, max_length=max_length+prompt_length, temperature=temperature)
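    elif strategy == 'entmax_sampling':
        alpha = hyperparameters.get('alpha', 1.5)
        output_ids = model.generate(input_ids, do_sample=True, max_length=max_length+prompt_length, entmax=True, alpha=alpha)
    else:
        # Fail early instead of hitting an undefined output_ids below
        raise ValueError(f"Invalid strategy: {strategy}")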

    # Remove the prompt tokens from the generated text
    generated_text = model.tokenizer.decode(output_ids[0][prompt_length:].tolist(), skip_special_tokens=True)

    return generated_text
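
One possible way to keep the strategy-to-argument mapping in a single place is a small helper that returns keyword arguments for `model.generate`. This is a sketch, not the notebook's method: `build_generate_kwargs` is a hypothetical name, `entmax` and `alpha` are arguments of this repository's modified transformers rather than the upstream library, and `top_k=0` assumes the upstream behaviour where sampling otherwise applies a default top-k of 50.

```python
def build_generate_kwargs(strategy, hyperparameters):
    """Map a strategy name to keyword arguments for model.generate (hypothetical helper)."""
    if strategy == 'argmax':
        return {'do_sample': False}
    if strategy == 'beam_search':
        return {
            'do_sample': False,
            'num_beams': hyperparameters.get('beam_size', 1),
            'no_repeat_ngram_size': hyperparameters.get('no_repeat_ngram_size', 2),
        }
    if strategy == 'nucleus_sampling':
        # Assumption: top_k=0 switches off the library's default top-k filter
        return {'do_sample': True, 'top_p': hyperparameters.get('top_p', 0.9), 'top_k': 0}
    if strategy == 'top_k_sampling':
        return {'do_sample': True, 'top_k': hyperparameters.get('k', 10)}
    if strategy == 'temperature_sampling':
        return {'do_sample': True, 'temperature': hyperparameters.get('temperature', 1.0), 'top_k': 0}
    if strategy == 'entmax_sampling':
        # Assumption: entmax/alpha are only understood by the modified transformers in this repo
        return {'do_sample': True, 'entmax': True, 'alpha': hyperparameters.get('alpha', 1.5)}
    raise ValueError(f"Invalid strategy: {strategy}")

beam_kwargs = build_generate_kwargs('beam_search', {'beam_size': 5})
nucleus_kwargs = build_generate_kwargs('nucleus_sampling', {})
```

It would be used as `model.generate(input_ids, max_length=max_length + prompt_length, **build_generate_kwargs(strategy, hyperparameters))`.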

In [267]:
# ROUGE scores for a reference/generated sentence pair
# Source: Google seq2seq source code

import itertools

#supporting function
# def _split_into_words(sentences):
#   """Splits multiple sentences into words and flattens the result"""
#   return list(itertools.chain(*[_.split(" ") for _ in sentences]))

def _split_into_words(sentences):
    """Splits multiple sentences into tokens and flattens the result"""
    tokenized_sentences = [tokenizer.tokenize(sentence) for sentence in sentences]
    return list(itertools.chain.from_iterable(tokenized_sentences))

#supporting function
def _get_word_ngrams(n, sentences):
  """Calculates word n-grams for multiple sentences.
  """
  assert len(sentences) > 0
  assert n > 0

  words = _split_into_words(sentences)
  return _get_ngrams(n, words)

#supporting function
def _get_ngrams(n, text):
  """Calculates n-grams.
  Args:
    n: which n-grams to calculate
    text: An array of tokens
  Returns:
    A set of n-grams
  """
  ngram_set = set()
  text_length = len(text)
  max_index_ngram_start = text_length - n
  for i in range(max_index_ngram_start + 1):
    ngram_set.add(tuple(text[i:i + n]))
  return ngram_set

def rouge_n(reference_sentences, evaluated_sentences, n=2):
  """
  Computes ROUGE-N of two text collections of sentences.
  Source: http://research.microsoft.com/en-us/um/people/cyl/download/
  papers/rouge-working-note-v1.3.1.pdf
  Args:
    evaluated_sentences: The sentences that have been picked by the summarizer
    reference_sentences: The sentences from the reference set
    n: Size of ngram.  Defaults to 2.
  Returns:
    recall rouge score(float)
  Raises:
    ValueError: raises exception if a param has len <= 0
  """
  if len(evaluated_sentences) <= 0 or len(reference_sentences) <= 0:
    raise ValueError("Collections must contain at least 1 sentence.")

  evaluated_ngrams = _get_word_ngrams(n, evaluated_sentences)
  reference_ngrams = _get_word_ngrams(n, reference_sentences)
  reference_count = len(reference_ngrams)
  evaluated_count = len(evaluated_ngrams)

  # Gets the overlapping ngrams between evaluated and reference
  overlapping_ngrams = evaluated_ngrams.intersection(reference_ngrams)
  overlapping_count = len(overlapping_ngrams)

  # Handle edge case. This isn't mathematically correct, but it's good enough
  if evaluated_count == 0:
    precision = 0.0
  else:
    precision = overlapping_count / evaluated_count

  if reference_count == 0:
    recall = 0.0
  else:
    recall = overlapping_count / reference_count

  f1_score = 2.0 * ((precision * recall) / (precision + recall + 1e-8))

  #just returning recall count in rouge, useful for our purpose
  return recall
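
As a worked example of the recall that `rouge_n` returns, the sketch below scores a toy candidate against a toy reference. It is self-contained and splits on whitespace instead of using the notebook's tokenizer, so `ngram_set` and `rouge_n_recall` are simplified stand-ins for the functions above, and the strings are placeholders.

```python
def ngram_set(tokens, n):
    """Set of n-grams (as tuples) in a token list."""
    return {tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)}

def rouge_n_recall(reference, candidate, n=2):
    """Share of distinct reference n-grams that also occur in the candidate."""
    reference_ngrams = ngram_set(reference.split(), n)
    candidate_ngrams = ngram_set(candidate.split(), n)
    if not reference_ngrams:
        return 0.0
    return len(reference_ngrams & candidate_ngrams) / len(reference_ngrams)

reference = "a b c d"
candidate = "a b x d"
rouge_1 = rouge_n_recall(reference, candidate, n=1)  # 3 of 4 reference unigrams matched
rouge_2 = rouge_n_recall(reference, candidate, n=2)  # 1 of 3 reference bigrams matched
```

Because n-grams are collected in a set, repeated n-grams count once, which is the same simplification the implementation above makes.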

In [268]:
import torch
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def bleu_i(weights, all_sentences, smoothing_function, i):
    return sentence_bleu(
        references=all_sentences[:i] + all_sentences[i + 1:],
        hypothesis=all_sentences[i],
        weights=weights,
        smoothing_function=smoothing_function)

def compute_rouge_scores(reference_sentences, generated_sentences, n=2):
    return rouge_n(reference_sentences, generated_sentences, n)

def evaluate(generated_texts, reference_texts):
    smoothing_function = SmoothingFunction().method1
    n_sample = len(generated_texts)
    
    perplexities = []
    bleu_scores = [[] for _ in range(5)]
    
    for idx, (generated_text, reference_text) in enumerate(zip(generated_texts, reference_texts)):
        # Compute perplexity
        input_ids = tokenizer.encode(generated_text, return_tensors="pt")
        with torch.no_grad():
            outputs = model(input_ids, labels=input_ids, return_dict=True)
            loss = outputs.loss
        perplexity = torch.exp(loss).item()
        perplexities.append(perplexity)
        
        # Compute BLEU score
        # Tokenize first: list() on a string would split it into single characters
        all_sentences = [tokenizer.tokenize(reference_text)] + [tokenizer.tokenize(text) for text in generated_texts]

        for n_gram in range(1, 6):
            if n_gram == 1:
                weights = (1.0, 0, 0, 0)
            elif n_gram == 2:
                weights = (0.5, 0.5, 0, 0)
            elif n_gram == 3:
                weights = (1.0 / 3, 1.0 / 3, 1.0 / 3, 0)
            elif n_gram == 4:
                weights = (0.25, 0.25, 0.25, 0.25)
            elif n_gram == 5:
                weights = (0.2, 0.2, 0.2, 0.2, 0.2)
            else:
                raise ValueError
            # all_sentences[0] is the reference, so generated text idx sits at position idx + 1
            bleu_score = bleu_i(weights, all_sentences, smoothing_function, idx + 1)
            bleu_scores[n_gram - 1].append(bleu_score)
        
    # Compute ROUGE-1 and ROUGE-2 once, outside the loop: rouge_n scores the whole
    # collection, so the result does not depend on idx and must not be divided by n_sample
    avg_rouge_scores = [compute_rouge_scores(reference_texts, generated_texts, i) for i in range(1, 3)]

    avg_perplexity = sum(perplexities) / n_sample
    avg_bleu_scores = [sum(scores) / n_sample for scores in bleu_scores]

    return avg_perplexity, avg_bleu_scores, avg_rouge_scores
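
The perplexity in `evaluate` is the exponential of the model's mean per-token cross-entropy loss. A minimal sketch of that relationship on made-up token probabilities (the `perplexity` helper is illustrative and takes probabilities directly rather than a model loss):

```python
import math

def perplexity(token_probs):
    """exp of the mean negative log-likelihood of the observed tokens."""
    negative_log_likelihoods = [-math.log(p) for p in token_probs]
    return math.exp(sum(negative_log_likelihoods) / len(negative_log_likelihoods))

# A model that spreads its mass evenly over four tokens is as uncertain
# as a uniform choice among four options.
uniform = perplexity([0.25, 0.25, 0.25, 0.25])

# A model that assigns high probability to every observed token scores lower.
confident = perplexity([0.9, 0.8, 0.95])
```

Lower is better, with 1.0 as the floor. Since the texts scored here are the model's own generations, low perplexity mainly indicates that a strategy stays in high-probability regions, not that the text matches the reference.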

In [269]:
import json
from typing import Dict, Any

def save_results(results: Dict[str, Any], file_path: str):
    def convert_keys_to_strings(obj):
        if isinstance(obj, dict):
            return {str(k): convert_keys_to_strings(v) for k, v in obj.items()}
        elif isinstance(obj, (list, tuple)):
            return [convert_keys_to_strings(elem) for elem in obj]
        else:
            return obj

    with open(file_path, 'w') as f:
        json.dump(convert_keys_to_strings(results), f, indent=4)


In [270]:
def run_pipeline(file_path, strategies, hyperparameters):
    prompts, references = extract_prompts_and_references(file_path)

    folder_path = 'experiments/results'
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

    # Create a results dictionary to store the evaluation scores
    results = {}
    for strategy in strategies:
        results[strategy] = {}
        for hyperparameter in hyperparameters[strategy]:
            key = tuple(hyperparameter.items())  # Convert the dictionary to a tuple of tuples
            results[strategy][key] = {'perplexity': [], 'bleu_scores': [], 'rouge_scores': []}
    
    output_file_path = "experiments/results/transformer_output_samples.txt"
    with open(output_file_path, "w") as output_file:

        # Loop through each prompt and generate text for each decoding strategy with the specified hyperparameters
        generated_texts_list = []
        for i in range(len(prompts)):
            prompt = prompts[i]
            reference = references[i]
            generated_texts = []
            for strategy in strategies:
                for hyperparameter in hyperparameters[strategy]:
                    key = tuple(hyperparameter.items())  # Convert the dictionary to a tuple of tuples
                    generated_text = generate_text(prompt, strategy, hyperparameter)
                    generated_texts.append(generated_text)
                    generated_texts_list.append((generated_text, reference))
            avg_perplexity, avg_bleu_scores, avg_rouge_scores = evaluate(*zip(*generated_texts_list))

            # generated_texts holds one entry per (strategy, hyperparameter) pair, in generation order,
            # so walk it with its own counter rather than the strategy index
            sample_idx = 0
            for strategy in strategies:
                for hyperparameter in hyperparameters[strategy]:
                    key = tuple(hyperparameter.items())
                    results[strategy][key]['perplexity'].append(avg_perplexity)
                    results[strategy][key]['bleu_scores'].append(avg_bleu_scores)
                    results[strategy][key]['rouge_scores'].append(avg_rouge_scores)
                    
                    # Save the results to the output file
                    generated_text = generated_texts[sample_idx]
                    sample_idx += 1
                    output_file.write(f"Strategy: {strategy}\n")
                    output_file.write(f"Hyperparameters: {hyperparameter}\n")
                    output_file.write(f"Prompt: {prompt}\n")
                    output_file.write(f"Generated text: {generated_text}\n")
                    output_file.write(f"Reference text: {reference}\n")
                    output_file.write(f"Perplexity: {avg_perplexity}\n")
                    output_file.write(f"BLEU scores: {avg_bleu_scores}\n")
                    output_file.write(f"ROUGE scores: {avg_rouge_scores}\n\n")

    results_file_path = 'experiments/results/transformer_results.json'
    save_results(results, results_file_path)

    return results

In [271]:
strategies = ['argmax', 'beam_search', 'nucleus_sampling', 'top_k_sampling', 'temperature_sampling', 'entmax_sampling']
hyperparameters = {
    'argmax': [{}],  # one empty setting so that argmax is actually run (an empty list skips it)
    'beam_search': [
        {'beam_size': 2},
        {'beam_size': 5},
        {'beam_size': 10},
    ],
    'nucleus_sampling': [
        {'top_p': 0.5},
        {'top_p': 0.8},
        {'top_p': 0.9},
    ],
    'top_k_sampling': [
        {'k': 5},
        {'k': 10},
        {'k': 20},
    ],
    'temperature_sampling': [
        {'temperature': 0.5},
        {'temperature': 1.0},
        {'temperature': 1.5},
    ],
    'entmax_sampling': [
        {'alpha': 0.5},
        {'alpha': 1.0},
        {'alpha': 1.5},
    ],
}

file_path = 'data/test/isolezwe.txt'

results = run_pipeline(file_path, strategies, hyperparameters)

### AWD-LSTM Language Model

In [272]:
import sys
sys.path.insert(0,'/Users/victoriapedlar/repos/isizulu-text-generation/awd_lstm')

In [273]:
import torch.nn as nn
from model import LSTMModel
from drop_connect import WeightDrop
from locked_dropout import LockedDropout
from embedding_dropout import embedded_dropout

In [274]:
# Load the model from the .pt file
model_path = "experiments/trained_models/awd_lstm/model.pt"
model, _, _ = torch.load(model_path, map_location=torch.device('cpu'))

In [275]:
# Load the saved Corpus object
corpus = torch.load("experiments/trained_models/awd_lstm/corpus.data")

# Access the tokenizer (dictionary) from the loaded Corpus object
dictionary = corpus.dictionary

dictionary.unk_index = 0

In [276]:
def tokenize_text(text, dictionary):
    tokens = []
    for word in text.split():
        if word in dictionary.word2idx:
            tokens.append(dictionary.word2idx[word])
        else:
            tokens.append(dictionary.unk_index)
    return tokens

In [277]:
def sample_with_temperature(logits, temperature):
    word_weights = logits.squeeze().div(temperature).exp().cpu()
    word_idx = torch.multinomial(word_weights, 1)[0]
    return word_idx.item()

In [278]:
def beam_search(prompt_tokens, initial_hidden, k=3, temperature=1.0, max_length=round(average_tokens)):
    # Start from a single beam that holds the whole prompt: (tokens, hidden_state, log_prob)
    beams = [(list(prompt_tokens), initial_hidden, 0.0)]

    for _ in range(max_length):
        new_beams = []

        for tokens, hidden, log_prob in beams:
            # Create a new input tensor by taking the last token
            token_input = torch.tensor([[tokens[-1]]], dtype=torch.long)
            
            logits, new_hidden = model(token_input, hidden)
            last_token_logits = logits.squeeze()

            # Score continuations with log-probabilities over the full vocabulary,
            # so that summing them along a beam gives the sequence log-probability
            log_probs = torch.log_softmax(last_token_logits / temperature, dim=-1)
            top_k_log_probs, top_k_indices = torch.topk(log_probs, k)

            for token_log_prob, idx in zip(top_k_log_probs, top_k_indices):
                new_tokens = tokens + [idx.item()]
                new_log_prob = log_prob + token_log_prob.item()
                new_beams.append((new_tokens, new_hidden, new_log_prob))

        # Keep the k highest-scoring beams
        beams = sorted(new_beams, key=lambda x: x[2], reverse=True)[:k]

    best_beam = torch.tensor(beams[0][0], dtype=torch.long).unsqueeze(0)
    return best_beam
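
To illustrate why beam search can beat greedy decoding, here is a self-contained sketch on a toy next-token table. `TOY_MODEL` and `toy_beam_search` are made up for this example: the "model" conditions only on the last token, and beams are scored by summed log-probabilities without length normalization.

```python
import math

# Hypothetical next-token distributions, keyed by the last token only
TOY_MODEL = {
    "<s>": {"a": 0.6, "b": 0.4},
    "a": {"a": 0.1, "b": 0.5, "</s>": 0.4},
    "b": {"a": 0.2, "b": 0.1, "</s>": 0.7},
}

def toy_beam_search(start, beam_size, max_steps):
    """Return the final beams as (tokens, log_prob), best first."""
    beams = [([start], 0.0)]
    for _ in range(max_steps):
        candidates = []
        for tokens, score in beams:
            if tokens[-1] == "</s>":
                # Finished sequences are carried over unchanged
                candidates.append((tokens, score))
                continue
            for token, prob in TOY_MODEL[tokens[-1]].items():
                candidates.append((tokens + [token], score + math.log(prob)))
        # Keep only the beam_size highest-scoring candidates
        beams = sorted(candidates, key=lambda beam: beam[1], reverse=True)[:beam_size]
    return beams

best_tokens, best_score = toy_beam_search("<s>", beam_size=2, max_steps=3)[0]
greedy_tokens, greedy_score = toy_beam_search("<s>", beam_size=1, max_steps=3)[0]
```

With a beam of 2 the search keeps the initially less likely token `b` alive and ends on a sequence with probability 0.28, while greedy decoding (a beam of 1) commits to `a` and ends at 0.21.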

In [279]:
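from entmax import entmax15

def entmax_sampling_step(logits, alpha=1.5):
    # entmax15 is the alpha = 1.5 member of the entmax family, so the alpha
    # argument is accepted for a uniform interface but does not change the result
    probabilities = entmax15(logits, dim=-1)
    word_idx = torch.multinomial(probabilities, 1)[0]
    return word_idx.item()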
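
The property that makes entmax useful for sampling is that it can return exact zeros. The cell above relies on `entmax15` (α = 1.5); as an illustration only, the sketch below implements sparsemax, the α = 2 member of the same family, because it has a short closed-form solution. The function and the example logits are not part of the notebook.

```python
def sparsemax(logits):
    """Euclidean projection of the logits onto the probability simplex."""
    ordered = sorted(logits, reverse=True)
    cumulative = 0.0
    threshold = 0.0
    for j, z in enumerate(ordered, start=1):
        cumulative += z
        # The support is the longest prefix of sorted logits satisfying this condition
        if 1 + j * z > cumulative:
            threshold = (cumulative - 1) / j
    # Everything at or below the threshold gets exactly zero probability
    return [max(z - threshold, 0.0) for z in logits]

sparse_probs = sparsemax([1.0, 0.8, 0.1])   # the last token is cut off entirely
peaked_probs = sparsemax([2.0, 1.0, 0.5, -1.0])  # all mass on the top token
```

A softmax over the same logits would give every token a nonzero probability, so sampling could still pick the tail; here the tail is removed by the transformation itself.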

In [280]:
import numpy as np
from entmax import entmax15
from torch.autograd import Variable

def generate_text_awd_lstm(prompt, strategy, hyperparameters, max_length=round(average_tokens)):

    model.eval()
    with torch.no_grad():
        input_prompt_tokens = tokenize_text(prompt, dictionary)
        
        # Initialize hidden state for a single token
        hidden = model.init_hidden(1)

        # Feed every prompt token except the last to build up the hidden state;
        # the generation loop below feeds the last one itself, so it is not seen twice
        for token in input_prompt_tokens[:-1]:
            input_token = torch.tensor([[token]], dtype=torch.long)
            _, hidden = model(input_token, hidden)

        generated_tokens = []
        last_token = input_prompt_tokens[-1] if input_prompt_tokens else None

    for _ in range(max_length):
        if strategy == 'beam_search':
            # Beam search runs its own decoding loop, so call it before stepping the model here;
            # the pipeline passes the beam width as 'beam_size'
            k = hyperparameters.get('beam_size', hyperparameters.get('k', 3))
            temperature = hyperparameters.get('temperature', 1.0)
            best_beam = beam_search(input_prompt_tokens, hidden, k=k, temperature=temperature, max_length=max_length)
            generated_tokens = best_beam.squeeze().tolist()[len(input_prompt_tokens):]
            break
        else:
            if last_token is not None:
                input_token = torch.tensor([[last_token]], dtype=torch.long)
                logits, hidden = model(input_token, hidden)
                last_token_logits = logits[:, -1, :]
            else:
                # Placeholder for cases when input_prompt_tokens is empty
                last_token_logits = torch.zeros(1, len(dictionary.word2idx))

            # Single-step strategies: choose one token from last_token_logits
            if strategy == 'argmax':
                token_index = torch.argmax(last_token_logits, dim=-1).item()
            elif strategy == 'nucleus_sampling':
                top_p = hyperparameters.get('top_p', 0.9)
                sorted_logits, sorted_indices = torch.sort(last_token_logits, descending=True)
                cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
                indices_to_remove = cumulative_probs > top_p
                # Shift right so that the first token crossing top_p is still kept
                indices_to_remove[..., 1:] = indices_to_remove[..., :-1].clone()
                indices_to_remove[..., 0] = 0
                sorted_logits[indices_to_remove] = float('-inf')
                probabilities = torch.softmax(sorted_logits, dim=-1)
                # Sample a position in the sorted order, then map it back to a vocabulary index
                sorted_position = torch.multinomial(probabilities.reshape(-1), 1).item()
                token_index = sorted_indices.reshape(-1)[sorted_position].item()
            elif strategy == 'top_k_sampling':
                k = hyperparameters.get('k', 10)
                top_k_logits, top_k_indices = torch.topk(last_token_logits, k)
                # Renormalize over the k kept tokens and sample with torch; np.random.choice
                # can reject float32 probabilities that do not sum to exactly 1
                probabilities = torch.softmax(top_k_logits, dim=-1)
                top_k_position = torch.multinomial(probabilities.reshape(-1), 1).item()
                token_index = top_k_indices.reshape(-1)[top_k_position].item()
            elif strategy == 'temperature_sampling':
                temperature = hyperparameters.get('temperature', 1.0)
                token_index = sample_with_temperature(last_token_logits, temperature)
            elif strategy == 'entmax_sampling':
                alpha = hyperparameters.get('alpha', 1.5)
                token_index = entmax_sampling_step(last_token_logits, alpha)
            else:
                raise ValueError(f"Invalid strategy: {strategy}")

            generated_tokens.append(token_index)
            last_token = token_index
            
    generated_text = [dictionary.idx2word[token_idx] for token_idx in generated_tokens]
    return " ".join(generated_text)

In [281]:
import torch
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def _split_into_words(sentences):
    """Splits multiple sentences into tokens and flattens the result"""
    tokenized_sentences = [tokenize_text(sentence, dictionary) for sentence in sentences]
    return list(itertools.chain.from_iterable(tokenized_sentences))

def evaluate_awd_lstm(generated_texts, reference_texts):
    smoothing_function = SmoothingFunction().method1
    n_sample = len(generated_texts)
    
    perplexities = []
    bleu_scores = [[] for _ in range(5)]
    
    for idx, (generated_text, reference_text) in enumerate(zip(generated_texts, reference_texts)):
        # Compute perplexity
        input_ids = torch.tensor([tokenize_text(generated_text, dictionary)], dtype=torch.long)
        hidden = model.init_hidden(input_ids.size(1))
        with torch.no_grad():
            outputs, _ = model(input_ids, hidden)
            logits = outputs[:, :-1, :]  # Remove the last token (there is no next token to predict)
            labels = input_ids[:, 1:]  # Remove the first token (there is no previous token)
            loss = torch.nn.functional.cross_entropy(logits.reshape(-1, logits.size(-1)), labels.reshape(-1))
        perplexity = torch.exp(loss).item()
        perplexities.append(perplexity)
        
        # Compute BLEU score
        # Split into words first: list() on a string would split it into single characters
        all_sentences = [reference_text.split()] + [text.split() for text in generated_texts]

        for n_gram in range(1, 6):
            if n_gram == 1:
                weights = (1.0, 0, 0, 0)
            elif n_gram == 2:
                weights = (0.5, 0.5, 0, 0)
            elif n_gram == 3:
                weights = (1.0 / 3, 1.0 / 3, 1.0 / 3, 0)
            elif n_gram == 4:
                weights = (0.25, 0.25, 0.25, 0.25)
            elif n_gram == 5:
                weights = (0.2, 0.2, 0.2, 0.2, 0.2)
            else:
                raise ValueError
            # all_sentences[0] is the reference, so generated text idx sits at position idx + 1
            bleu_score = bleu_i(weights, all_sentences, smoothing_function, idx + 1)
            bleu_scores[n_gram - 1].append(bleu_score)
        
    # Compute ROUGE-1 and ROUGE-2 once, outside the loop: rouge_n scores the whole
    # collection, so the result does not depend on idx and must not be divided by n_sample
    avg_rouge_scores = [compute_rouge_scores(reference_texts, generated_texts, i) for i in range(1, 3)]

    avg_perplexity = sum(perplexities) / n_sample
    avg_bleu_scores = [sum(scores) / n_sample for scores in bleu_scores]

    return avg_perplexity, avg_bleu_scores, avg_rouge_scores


In [282]:
import os

def run_pipeline(file_path, strategies, hyperparameters):
    
    prompts, references = extract_prompts_and_references(file_path)

    folder_path = 'experiments/results'
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

    # Create a results dictionary to store the evaluation scores
    results = {}
    for strategy in strategies:
        results[strategy] = {}
        for hyperparameter in hyperparameters[strategy]:
            key = tuple(hyperparameter.items())  # Convert the dictionary to a tuple of tuples
            results[strategy][key] = {'perplexity': [], 'bleu_scores': [], 'rouge_scores': []}
    
    output_file_path = "experiments/results/awd_lstm_output_samples.txt"
    with open(output_file_path, "w") as output_file:

        # Loop through each prompt and generate text for each decoding strategy with the specified hyperparameters
        generated_texts_list = []
        for i in range(len(prompts)):
            prompt = prompts[i]
            reference = references[i]
            generated_texts = []
            for strategy in strategies:
                for hyperparameter in hyperparameters[strategy]:
                    key = tuple(hyperparameter.items())  # Convert the dictionary to a tuple of tuples
                    generated_text = generate_text_awd_lstm(prompt, strategy, hyperparameter)
                    generated_texts.append(generated_text)
                    generated_texts_list.append((generated_text, reference))
            avg_perplexity, avg_bleu_scores, avg_rouge_scores = evaluate_awd_lstm(*zip(*generated_texts_list))

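            # generated_texts holds one entry per (strategy, hyperparameter) pair, in generation order,
            # so walk it with its own counter rather than the strategy index
            sample_idx = 0
            for strategy in strategies:
                for hyperparameter in hyperparameters[strategy]:
                    key = tuple(hyperparameter.items())
                    results[strategy][key]['perplexity'].append(avg_perplexity)
                    results[strategy][key]['bleu_scores'].append(avg_bleu_scores)
                    # Store ROUGE under its own key (it was being appended to 'bleu_scores')
                    results[strategy][key]['rouge_scores'].append(avg_rouge_scores)

                    # Save the results to the output file
                    generated_text = generated_texts[sample_idx]
                    sample_idx += 1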
                    output_file.write(f"Strategy: {strategy}\n")
                    output_file.write(f"Hyperparameters: {hyperparameter}\n")
                    output_file.write(f"Prompt: {prompt}\n")
                    output_file.write(f"Generated text: {generated_text}\n")
                    output_file.write(f"Reference text: {reference}\n")
                    output_file.write(f"Perplexity: {avg_perplexity}\n")
                    output_file.write(f"BLEU scores: {avg_bleu_scores}\n")
                    output_file.write(f"ROUGE scores: {avg_rouge_scores}\n\n")

    results_file_path = 'experiments/results/awd_lstm_results.json'
    save_results(results, results_file_path)

    return results

In [283]:
strategies = ['argmax', 'beam_search', 'top_k_sampling', 'temperature_sampling', 'entmax_sampling']
hyperparameters = {
    'argmax': [{}],  # one empty setting so that argmax is actually run (an empty list skips it)
    'beam_search': [
        {'beam_size': 2},
        {'beam_size': 5},
        {'beam_size': 10},
    ],
    'nucleus_sampling': [
        {'top_p': 0.5},
        {'top_p': 0.8},
        {'top_p': 0.9},
    ],
    'top_k_sampling': [
        {'k': 5},
        {'k': 10},
        {'k': 20},
    ],
    'temperature_sampling': [
        {'temperature': 0.5},
        {'temperature': 1.0},
        {'temperature': 1.5},
    ],
    'entmax_sampling': [
        {'alpha': 0.5},
        {'alpha': 1.0},
        {'alpha': 1.5},
    ],
}

file_path = 'data/test/isolezwe.txt'

results = run_pipeline(file_path, strategies, hyperparameters)

### Sparse Language Model

In [284]:
import sys
sys.path.insert(0,'/Users/victoriapedlar/repos/isizulu-text-generation/sparse_text_generation/language_modeling/pytorch_transformers')
from transformers import (
    GPT2LMHeadModel,
    GPT2TokenizerFast
)

In [285]:
tokenizer = GPT2TokenizerFast.from_pretrained('experiments/trained_models/sparse_model/tokenizers')
# Load the model
model = GPT2LMHeadModel.from_pretrained("experiments/trained_models/sparse_model")
# Set the tokenizer attribute of the model
model.tokenizer = tokenizer

In [286]:
def _split_into_words(sentences):
    """Splits multiple sentences into tokens and flattens the result"""
    tokenized_sentences = [tokenizer.tokenize(sentence) for sentence in sentences]
    return list(itertools.chain.from_iterable(tokenized_sentences))

In [287]:
def run_pipeline(file_path, strategies, hyperparameters):
    prompts, references = extract_prompts_and_references(file_path)

    folder_path = 'experiments/results'
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

    # Create a results dictionary to store the evaluation scores
    results = {}
    for strategy in strategies:
        results[strategy] = {}
        for hyperparameter in hyperparameters[strategy]:
            key = tuple(hyperparameter.items())  # Convert the dictionary to a tuple of tuples
            results[strategy][key] = {'perplexity': [], 'bleu_scores': [], 'rouge_scores': []}
    
    output_file_path = "experiments/results/sparse_output_samples.txt"
    with open(output_file_path, "w") as output_file:

        # Loop through each prompt and generate text for each decoding strategy with the specified hyperparameters
        generated_texts_list = []
        for i in range(len(prompts)):
            prompt = prompts[i]
            reference = references[i]
            generated_texts = []
            for strategy in strategies:
                for hyperparameter in hyperparameters[strategy]:
                    key = tuple(hyperparameter.items())  # Convert the dictionary to a tuple of tuples
                    generated_text = generate_text(prompt, strategy, hyperparameter)
                    generated_texts.append(generated_text)
                    generated_texts_list.append((generated_text, reference))
            avg_perplexity, avg_bleu_scores, avg_rouge_scores = evaluate(*zip(*generated_texts_list))

            # generated_texts holds one entry per (strategy, hyperparameter) pair, in generation order,
            # so walk it with its own counter rather than the strategy index
            sample_idx = 0
            for strategy in strategies:
                for hyperparameter in hyperparameters[strategy]:
                    key = tuple(hyperparameter.items())
                    results[strategy][key]['perplexity'].append(avg_perplexity)
                    results[strategy][key]['bleu_scores'].append(avg_bleu_scores)
                    results[strategy][key]['rouge_scores'].append(avg_rouge_scores)
                    
                    # Save the results to the output file
                    generated_text = generated_texts[sample_idx]
                    sample_idx += 1
                    output_file.write(f"Strategy: {strategy}\n")
                    output_file.write(f"Hyperparameters: {hyperparameter}\n")
                    output_file.write(f"Prompt: {prompt}\n")
                    output_file.write(f"Generated text: {generated_text}\n")
                    output_file.write(f"Reference text: {reference}\n")
                    output_file.write(f"Perplexity: {avg_perplexity}\n")
                    output_file.write(f"BLEU scores: {avg_bleu_scores}\n")
                    output_file.write(f"ROUGE scores: {avg_rouge_scores}\n\n")

    results_file_path = 'experiments/results/sparse_results.json'
    save_results(results, results_file_path)

    return results

In [288]:
strategies = ['argmax', 'beam_search', 'nucleus_sampling', 'top_k_sampling', 'temperature_sampling', 'entmax_sampling']
hyperparameters = {
    'argmax': [{}],  # one empty setting so that argmax is actually run (an empty list skips it)
    'beam_search': [
        {'beam_size': 2},
        {'beam_size': 5},
        {'beam_size': 10},
    ],
    'nucleus_sampling': [
        {'top_p': 0.5},
        {'top_p': 0.8},
        {'top_p': 0.9},
    ],
    'top_k_sampling': [
        {'k': 5},
        {'k': 10},
        {'k': 20},
    ],
    'temperature_sampling': [
        {'temperature': 0.5},
        {'temperature': 1.0},
        {'temperature': 1.5},
    ],
    'entmax_sampling': [
        {'alpha': 0.5},
        {'alpha': 1.0},
        {'alpha': 1.5},
    ],
}

file_path = 'data/test/isolezwe.txt'

results = run_pipeline(file_path, strategies, hyperparameters)

Setting `pad_token_id` to 50256 (first `eos_token_id`) to generate sequence

KeyboardInterrupt: 