In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import  Dataset, DataLoader

In [2]:
import os
import string

## Keep your training documents in a folder named 'data'
input_data_dir = "data"

# String of punctuation without the full stop
punctuation = string.punctuation.replace('.', '')  # Retain the full stop

def is_hidden(filepath):
    return os.path.basename(filepath).startswith('.')

text_data=""

howManyDocuments = 50
used=0

for filename in os.listdir(input_data_dir):
    filepath = os.path.join(input_data_dir, filename)
    if not is_hidden(filepath):
        with open(filepath, "r", encoding="utf-8") as infile:
            for line in infile:
                if line.strip():  # Check if line is not just whitespace
                    # Remove all punctuation except full stops
                    for char in punctuation:
                        line = line.replace(char, '')
                    text_data += line
    used=used+1
    if (used==howManyDocuments):
        print(howManyDocuments, "documents used.")
        break

50 documents used.


In [3]:
from nltk.tokenize import RegexpTokenizer

tokenizer = RegexpTokenizer(r'\w+')
words = tokenizer.tokenize(text_data.lower())
# import nltk
# nltk.download("punkt")

# # Tokenize the text into words
# # Lowercasing for consistency
# words = nltk.word_tokenize(text_data.lower()) 

In [4]:
words

['asian',
 'exporters',
 'fear',
 'damage',
 'from',
 'u',
 's',
 'japan',
 'rift',
 'mounting',
 'trade',
 'friction',
 'between',
 'the',
 'u',
 's',
 'and',
 'japan',
 'has',
 'raised',
 'fears',
 'among',
 'many',
 'of',
 'asia',
 's',
 'exporting',
 'nations',
 'that',
 'the',
 'row',
 'could',
 'inflict',
 'far',
 'reaching',
 'economic',
 'damage',
 'businessmen',
 'and',
 'officials',
 'said',
 'they',
 'told',
 'reuter',
 'correspondents',
 'in',
 'asian',
 'capitals',
 'a',
 'u',
 's',
 'move',
 'against',
 'japan',
 'might',
 'boost',
 'protectionist',
 'sentiment',
 'in',
 'the',
 'u',
 's',
 'and',
 'lead',
 'to',
 'curbs',
 'on',
 'american',
 'imports',
 'of',
 'their',
 'products',
 'but',
 'some',
 'exporters',
 'said',
 'that',
 'while',
 'the',
 'conflict',
 'would',
 'hurt',
 'them',
 'in',
 'the',
 'long',
 'run',
 'in',
 'the',
 'short',
 'term',
 'tokyo',
 's',
 'loss',
 'might',
 'be',
 'their',
 'gain',
 'the',
 'u',
 's',
 'has',
 'said',
 'it',
 'will',
 'imp

In [5]:
# Create a set of all unique words and create a dictionary to convert words to integers
word_to_int = {w: i for i, w in enumerate(set(words))}
int_to_word = {i: w for w, i in word_to_int.items()}


In [6]:
len(word_to_int)

2456

In [7]:
# Convert the entire text to integers
encoded_text = np.array([word_to_int[word] for word in words])


In [8]:
# Prepare dataset
def create_sequences(input_data, seq_length):
    sequences = []
    for i in range(0, len(input_data) - seq_length):
        sequence_in = input_data[i:i + seq_length]
        sequence_out = input_data[i + seq_length]
        sequences.append((sequence_in, sequence_out))
    return sequences


In [9]:
seq_length = 20  # Length of input sequences
sequences = create_sequences(encoded_text, seq_length)

In [10]:
sequences

[(array([1157, 2092, 1034, 1061, 1834, 2366, 1677, 1885, 1335,  194,  837,
         2342,  898,  592, 2366, 1677,  104, 1885,  308, 1216]),
  np.int64(2003)),
 (array([2092, 1034, 1061, 1834, 2366, 1677, 1885, 1335,  194,  837, 2342,
          898,  592, 2366, 1677,  104, 1885,  308, 1216, 2003]),
  np.int64(1473)),
 (array([1034, 1061, 1834, 2366, 1677, 1885, 1335,  194,  837, 2342,  898,
          592, 2366, 1677,  104, 1885,  308, 1216, 2003, 1473]),
  np.int64(1291)),
 (array([1061, 1834, 2366, 1677, 1885, 1335,  194,  837, 2342,  898,  592,
         2366, 1677,  104, 1885,  308, 1216, 2003, 1473, 1291]),
  np.int64(1445)),
 (array([1834, 2366, 1677, 1885, 1335,  194,  837, 2342,  898,  592, 2366,
         1677,  104, 1885,  308, 1216, 2003, 1473, 1291, 1445]),
  np.int64(2415)),
 (array([2366, 1677, 1885, 1335,  194,  837, 2342,  898,  592, 2366, 1677,
          104, 1885,  308, 1216, 2003, 1473, 1291, 1445, 2415]),
  np.int64(1677)),
 (array([1677, 1885, 1335,  194,  837, 2342,  

In [11]:
class RNNModel(nn.Module):
    def __init__(self, 
                 input_size, 
                 output_size, 
                 hidden_dim, 
                 n_layers):
        super(RNNModel, self).__init__()
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(
            input_size, hidden_dim)
        self.rnn = nn.RNN(
            hidden_dim, hidden_dim, 
            n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_size)

    def forward(self, x, hidden):
        x = self.embedding(x)
        out, hidden = self.rnn(x, hidden)
        out = out.contiguous().view(-1, self.hidden_dim)
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size):
        return torch.zeros(self.n_layers, 
                           batch_size, 
                           self.hidden_dim).to(device)


In [12]:
# Check for GPU
device = torch.device(
    "cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
input_size = len(word_to_int)
output_size = len(word_to_int)
hidden_dim = 256
n_layers = 3
batch_size = 256
epochs = 5

# Initialize model, loss function, and optimizer
model = RNNModel(
    input_size, 
    output_size, 
    hidden_dim, 
    n_layers).to(device)

loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    model.parameters(), lr=0.001)

# Convert sequences to PyTorch tensors
def batchify(data, batch_size):
    # Prepare inputs and targets
    inputs = [item[0] for item in data]
    targets = [item[1] for item in data]

    # Split data into batches
    n_batches = len(inputs) // batch_size
    inputs = inputs[:n_batches * batch_size]
    targets = targets[:n_batches * batch_size]

    # Batchify
    batched_inputs = [torch.tensor(
        inputs[i:i + batch_size], 
        dtype=torch.long) 
                      for i in range(0, 
                                     len(inputs), 
                                     batch_size)]
    batched_targets = [torch.tensor(
        targets[i:i + batch_size], 
        dtype=torch.long) 
                       for i in range(0, 
                                      len(targets), 
                                      batch_size)]

    return batched_inputs, batched_targets

batched_inputs, batched_targets = batchify(sequences, batch_size)


# Training loop
for epoch in range(epochs):
    for batch_idx in range(len(batched_inputs)):
        inputs, targets = batched_inputs[batch_idx], \
        batched_targets[batch_idx]
        inputs, targets = inputs.to(device), \
        targets.to(device)

        hidden = model.init_hidden(batch_size)
        optimizer.zero_grad()
        output, hidden = model(inputs, hidden)


        # Reshape output to [batch_size, seq_length, output_size]
        output = output.view(batch_size, seq_length, -1)

        # Use only the last output of each sequence
        output_last = output[:, -1, :]

        # Flatten the last outputs
        output_flat = output_last.view(-1, output_size)

        # Flatten the targets
        targets_flat = targets.view(-1)

        # Compute the loss
        loss = loss_function(output_flat, targets_flat)

        loss.backward()
        optimizer.step()

    print(f'Epoch {epoch+1}, Loss: {loss.item()}')




  batched_inputs = [torch.tensor(


Epoch 1, Loss: 6.955472946166992
Epoch 2, Loss: 6.512889862060547
Epoch 3, Loss: 6.287113666534424
Epoch 4, Loss: 5.840775966644287
Epoch 5, Loss: 5.3891777992248535


In [13]:
import torch.nn.functional as F

start_seq = "Some exporters said"
model.eval()
input_seq = [word_to_int[word] 
             for word in start_seq.lower().split()]
input_tensor = torch.tensor([input_seq], 
                            dtype=torch.long).to(device)
hidden = model.init_hidden(1)
output, _ = model(input_tensor, hidden)
probabilities = F.softmax(output[-1], dim=0).detach().cpu()

# Choose from top k probabilities
top_prob, top_idx = torch.topk(probabilities, k=1)
next_word = int_to_word[top_idx.numpy()[0]]

In [14]:
next_word

'it'

In [16]:
import torch.nn.functional as F
import random

def generate_text(model, 
                  start_seq, 
                  word_to_int, 
                  int_to_word, 
                  gen_length=50, 
                  top_k=1):
    """
    Generate text using a trained RNN model.

    Parameters:
    model: Trained RNN model.
    start_seq: Starting sequence for text generation.
    word_to_int: Dictionary mapping words to integers.
    int_to_word: Dictionary mapping integers to words.
    gen_length: Number of words to generate.
    top_k: from top_k next words, randomly select one.
    This ensures that results are different for the
    same prompt.

    Returns:
    Generated text.
    """
    model.eval()  # Set the model to evaluation mode

    input_seq = [word_to_int[word] for word in start_seq.lower().split()]
    generated_text = start_seq

    for _ in range(gen_length):
        input_tensor = torch.tensor([input_seq], dtype=torch.long).to(device)
        hidden = model.init_hidden(1)

        output, _ = model(input_tensor, hidden)
        probabilities = F.softmax(output[-1], dim=0).detach().cpu()

        # Choose from top k probabilities
        top_prob, top_idx = torch.topk(probabilities, k=top_k)
        chosen_idx = random.choice(top_idx.numpy())

        next_word = int_to_word[chosen_idx]
        generated_text += ' ' + next_word

        input_seq.append(chosen_idx)
        input_seq = input_seq[1:]

    return generated_text

def generate_text_temperature(model, start_seq, 
                              word_to_int, 
                              int_to_word, 
                              gen_length=50, 
                              temperature=1.0):
    # In this function, the temperature parameter 
    # is used to scale the logits before applying 
    # softmax. A higher temperature (>1) produces 
    # more randomness, while a lower temperature (<1) 
    # makes the model more confident (but potentially 
    # more repetitive).
    
    model.eval()
    input_seq = [word_to_int[word] for word in start_seq.lower().split()]
    generated_text = start_seq

    for _ in range(gen_length):
        input_tensor = torch.tensor([input_seq], dtype=torch.long).to(device)
        hidden = model.init_hidden(1)

        output, _ = model(input_tensor, hidden)
        output = output / temperature  # Adjusting output with temperature
        probabilities = F.softmax(output[-1], dim=0).detach().cpu()

        next_word_idx = torch.multinomial(probabilities, 1).item()
        next_word = int_to_word[next_word_idx]
        generated_text += ' ' + next_word

        input_seq.append(next_word_idx)
        input_seq = input_seq[1:]

    return generated_text

def beam_search(model, 
                start_seq, 
                word_to_int, 
                int_to_word, 
                beam_width=5, 
                gen_length=50):
    # Initialize beams as a list of tuples (sequence, probability)
    initial_seq = [word_to_int[word] for word in start_seq.lower().split()]
    beams = [(initial_seq, 1.0)]

    for _ in range(gen_length):
        new_beams = []
        for seq, prob in beams:
            input_tensor = torch.tensor([seq], dtype=torch.long).to(device)
            hidden = model.init_hidden(1)

            output, _ = model(input_tensor, hidden)
            probabilities = F.softmax(output[-1], dim=0).detach().cpu().numpy()

            # Consider top beam_width choices for this beam
            top_indices = np.argsort(probabilities)[-beam_width:]
            for idx in top_indices:
                new_seq = seq + [idx]
                new_prob = prob * probabilities[idx]
                new_beams.append((new_seq, new_prob))

        # Select top beam_width beams
        beams = sorted(new_beams, key=lambda x: x[1], reverse=True)[:beam_width]

    # Choose the beam with the highest probability
    best_seq, _ = max(beams, key=lambda x: x[1])
    return ' '.join([int_to_word[idx] for idx in best_seq])



In [1]:
start_sequence = "INDONESIA SEES CPO PRICE RISING SHARPLY "
generated_sequence_length = 50  # Number of words to generate

'''
generated_text = generate_text(model, 
                               start_sequence, 
                               word_to_int, 
                               int_to_word, 
                               generated_sequence_length)

generated_text = generate_text_temperature(
    model, start_sequence, 
    word_to_int, int_to_word, generated_sequence_length)
'''
generated_text = beam_search(model, 
                             start_sequence, 
                             word_to_int, int_to_word, 
                             gen_length=generated_sequence_length)

print(generated_text)

NameError: name 'beam_search' is not defined