In [1]:
import numpy as np
import torch
import torch.nn as nn

In [2]:
class Preprocessing:
    def read_dataset(self,file):
        letters = ['a','b','c','d','e','f','g','h','i','j','k','l','m',
                   'n','o','p','q','r','s','t','u','v','w','x','y','z',' ']
        # Open raw file
        with open(file, 'r') as f:
            raw_text = f.readlines()

        # Convert into lowercase
        raw_text = [line.lower() for line in raw_text]

        # Create a string which contains the entire text
        text_string = ''
        for line in raw_text:
            text_string += line.strip()

        # Create an array by char
        text = list()
        for char in text_string:
            text.append(char)

        # Remove all symbosl and just keep letters
        text = [char for char in text if char in letters]

        return text

    def create_dictionary(self,text):
        char_to_idx = dict()
        idx_to_char = dict()

        idx = 0
        for char in text:
            if char not in char_to_idx.keys():

                # Build dictionaries
                char_to_idx[char] = idx
                idx_to_char[idx] = char
                idx += 1
        return char_to_idx, idx_to_char
    
    def build_sequences(self,text, char_to_idx, window):
        x = list()
        y = list()

        for i in range(len(text)):
            try:
                # Get window of chars from text
                # Then, transform it into its idx representation
                sequence = text[i:i+window]
                sequence = [char_to_idx[char] for char in sequence]

                # Get word target
                # Then, transfrom it into its idx representation
                target = text[i+window]
                target = char_to_idx[target]

                # Save sequences and targets
                x.append(sequence)
                y.append(target)
            except:
                pass

        x = np.array(x)
        y = np.array(y)

        return x,y

In [3]:
class TextGeneratorModel(nn.ModuleList):
    def __init__(self,args,vocab_size):
        super(TextGeneratorModel, self).__init__()
        
        self.batch_size = args.batch_size
        self.hidden_dim = args.hidden_dim
        self.input_size = vocab_size
        self.num_layers = vocab_size
        self.sequence_len = args.window
        
        # Dropout
        self.dropout = nn.Dropout(0.25)
        
        # Embedding layer
        self.embedding = nn.Embedding(self.input_size, self.hidden_dim, padding_idx=0)
        
        # Bi-LSTM
        # Forward and backward
        self.lstm_foward = nn.LSTMCell(self.hidden_dim, self.hidden_dim)        
        self.lstm_backward = nn.LSTMCell(self.hidden_dim, self.hidden_dim)
        
        # LSTM layer
        self.lstm = nn.LSTMCell(self.hidden_dim*2, self.hidden_dim*2)
        
        # Linear layer
        self.linear = nn.Linear(self.hidden_dim*2, self.num_layers)
        
    def forward(self,x):
        # Bi-LSTM
        # hs = [batch_size x hidden_size]
        # cs = [batch_size x hidden_size]
        hs_forward = torch.zeros(x.size(0), self.hidden_dim)
        cs_forward = torch.zeros(x.size(0), self.hidden_dim)
        hs_backward = torch.zeros(x.size(0), self.hidden_dim)
        cs_backward = torch.zeros(x.size(0), self.hidden_dim)
        
        # LSTM
        # hs = [batch_size x (hidden_size * 2)]
        # cs = [batch_size x (hidden_size * 2)]
        hs_lstm = torch.zeros(x.size(0), self.hidden_dim * 2)
        cs_lstm = torch.zeros(x.size(0), self.hidden_dim * 2)
        
        # Weights initialization
        nn.init.kaiming_normal_(hs_forward)
        nn.init.kaiming_normal_(cs_forward)
        nn.init.kaiming_normal_(hs_backward)
        nn.init.kaiming_normal_(cs_backward)
        nn.init.kaiming_normal_(hs_lstm)
        nn.init.kaiming_normal_(cs_lstm)
        
        # From idx to embedding
        out = self.embedding(x)
        
        # Prepare the shape for LSTM Cells
        out = out.view(self.sequence_len, x.size(0), -1)
        
        forward = []
        backward = []
        
        # Unfolding Bi-LSTM
        # Forward
        for i in range(self.sequence_len):
            hs_forward, cs_forward = self.lstm_foward(out[i], (hs_backward,cs_forward))
            hs_forward = self.dropout(hs_forward)
            cs_forward = self.dropout(cs_forward)
            forward.append(hs_forward)
            
        # Backward
        for i in reversed(range(self.sequence_len)):
            hs_backward, cs_backward = self.lstm_backward(out[i], (hs_backward, cs_backward))
            hs_backward = self.dropout(hs_backward)
            cs_backward = self.dropout(cs_backward)
            backward.append(hs_backward)
            
        # LSTM
        for fwd, bwd in zip(forward, backward):
            input_tensor = torch.cat((fwd,bwd), 1)
            hs_lstm , cs_lstm = self.lstm(input_tensor, (hs_lstm, cs_lstm))
            
        # Last hidden state is passed through a linear layer
        out = self.linear(hs_lstm)
        
        return out

In [46]:
class Execution:
    
    def __init__(self, args):
        self.file = 'data/test.txt'
        self.window = args.window
        self.batch_size = args.batch_size
        self.learning_rate = args.learning_rate
        self.num_epochs = args.num_epochs

        self.targets = None
        self.sequences = None
        self.vocab_size = None
        self.char_to_idx = None
        self.idx_to_char = None
        
    def prepare_data(self):

        # Initialize preprocessor object
        preprocessing = Preprocessing()

        # The 'file' is loaded and split by char
        text = preprocessing.read_dataset(self.file)

        # Given 'text', it is created two dictionaries
        # a dictiornary about: from char to index
                # a dictorionary about: from index to char
        self.char_to_idx, self.idx_to_char = preprocessing.create_dictionary(text)

        # Given the 'window', it is created the set of training sentences as well as
        # the set of target chars
        self.sequences, self.targets = preprocessing.build_sequences(text, self.char_to_idx, window=self.window)

        # Gets the vocabuly size
        self.vocab_size = len(self.char_to_idx)
        
    
    def train(self, args):
        
        # Model initialization
        model = TextGeneratorModel(args, self.vocab_size)
        
        # Optimizer initialization
        optimizer = torch.optim.RMSprop(model.parameters(), lr= self.learning_rate)
        
        # Defining number of batches
        num_batches = int(len(self.sequences)/self.batch_size)
        
        # Set model in training mode
        model.train()
        
        # Training phase
        for epoch in range(self.num_epochs):
            
            # Mini batches
            for i in range(num_batches):
                
                # Batch definition
                try:
                    x_batch = self.sequences[i * self.batch_size : (i + 1) * self.batch_size]
                    y_batch = self.targets[i * self.batch_size : (i + 1) * self.batch_size]
                except:
                    x_batch = self.sequences[i * self.batch_size :]
                    y_batch = self.targets[i * self.batch_size :]
                    
                # Convert numpy array into torch tensors
                x = torch.from_numpy(x_batch).type(torch.LongTensor)
                y = torch.from_numpy(y_batch).type(torch.LongTensor)
                
                # Forward pass
                y_pred = model(x)
                
                # Loss calc
                loss = nn.functional.cross_entropy(y_pred, y.squeeze())
                
                # Clean gradients
                optimizer.zero_grad()
                
                # Backpropagation
                loss.backward()
                
                # Update parameters through gradient descent
                optimizer.step()
                
            print("Epoch: %d ,  loss: %.5f " % (epoch, loss.item()))
                
        # Save weights
#         torch.save(model.state_dict(), 'weights/textGenerator_model.pt')
                
    def generator(self,model, sequences, idx_to_char, n_char):
        
        # Set model in evaluation model
        model.eval()
        
        # Softmax activation function
        softmax = nn.Softmax(dim=1)
        
        # Randomly is selected the index from the set of sequences
        start = np.random.randint(0, len(sequences)-1)
        
        # The pattern is defined given the random idx
        pattern = sequences[start]
        
        # By making use of the dictionaries, it is printed the pattern
        print("\nPattern: \n")
        print(''.join([idx_to_char[value] for value in pattern]), "\"")
        
        # In full_prediction we will save the complete prediction
        full_prediction = pattern.copy()
        
        # The prediction starts, it is going to be predicted a given
        # number of characters
        for i in range(n_char):
            
            # The numpy patterns is transformed into a tesor-type and reshaped
            pattern = torch.from_numpy(pattern).type(torch.LongTensor)
            pattern = pattern.view(1,-1)

            # Make a prediction given the pattern
            prediction = model(pattern)
            # It is applied the softmax function to the predicted tensor
            prediction = softmax(prediction)

            # The prediction tensor is transformed into a numpy array
            prediction = prediction.squeeze().detach().numpy()
            # It is taken the idx with the highest probability
            arg_max = np.argmax(prediction)

            # The current pattern tensor is transformed into numpy array
            pattern = pattern.squeeze().detach().numpy()
            # The window is sliced 1 character to the right
            pattern = pattern[1:]
            # The new pattern is composed by the "old" pattern + the predicted character
            pattern = np.append(pattern, arg_max)

            # The full prediction is saved
            full_prediction = np.append(full_prediction, arg_max)
            
        print("Prediction: \n")
        print(' '.join([idx_to_char[value] for value in full_prediction]), "\"")

In [47]:
class args:
    window = 5
    num_epochs = 50
    hidden_dim = 128
    batch_size = 6
    learning_rate = 0.001

In [48]:
## Execution

# Load and preprare the sequences
execution = Execution(args)
execution.prepare_data()

# Training the model
execution.train(args)

Epoch: 0 ,  loss: 2.96876 
Epoch: 1 ,  loss: 2.54420 
Epoch: 2 ,  loss: 1.94019 
Epoch: 3 ,  loss: 1.43155 
Epoch: 4 ,  loss: 1.00472 
Epoch: 5 ,  loss: 0.38241 
Epoch: 6 ,  loss: 0.29214 
Epoch: 7 ,  loss: 0.81126 
Epoch: 8 ,  loss: 0.96055 
Epoch: 9 ,  loss: 0.28479 
Epoch: 10 ,  loss: 0.12264 
Epoch: 11 ,  loss: 0.07859 
Epoch: 12 ,  loss: 0.05342 
Epoch: 13 ,  loss: 0.05423 
Epoch: 14 ,  loss: 0.04125 
Epoch: 15 ,  loss: 0.03350 
Epoch: 16 ,  loss: 0.06954 
Epoch: 17 ,  loss: 0.03093 
Epoch: 18 ,  loss: 0.02811 
Epoch: 19 ,  loss: 0.02445 
Epoch: 20 ,  loss: 0.02176 
Epoch: 21 ,  loss: 0.01909 
Epoch: 22 ,  loss: 0.01707 
Epoch: 23 ,  loss: 0.01412 
Epoch: 24 ,  loss: 0.01264 
Epoch: 25 ,  loss: 0.01401 
Epoch: 26 ,  loss: 0.01140 
Epoch: 27 ,  loss: 0.00915 
Epoch: 28 ,  loss: 0.01103 
Epoch: 29 ,  loss: 0.00881 
Epoch: 30 ,  loss: 0.00922 
Epoch: 31 ,  loss: 0.00760 
Epoch: 32 ,  loss: 0.00602 
Epoch: 33 ,  loss: 0.00572 
Epoch: 34 ,  loss: 0.00687 
Epoch: 35 ,  loss: 0.00462 
Ep

In [None]:
# pp = Preprocessing()
# pp.read_dataset('data/pg.txt')

In [6]:
### Testing
file = './data/test.txt'
pp = Preprocessing()

In [8]:
text = pp.read_dataset(file)

In [40]:
text

['t',
 'h',
 'i',
 's',
 ' ',
 'i',
 's',
 ' ',
 'a',
 ' ',
 't',
 'e',
 'x',
 't',
 ' ',
 'd',
 'o',
 'c',
 'u',
 'm',
 'e',
 'n',
 't',
 ' ',
 'h',
 'o',
 'p',
 'e',
 ' ',
 'y',
 'o',
 'u',
 ' ',
 'u',
 's',
 'e',
 ' ',
 't',
 'h',
 'i',
 's',
 ' ',
 'f',
 'o',
 'r',
 ' ',
 'e',
 'd',
 'u',
 'c',
 'a',
 't',
 'i',
 'o',
 'n',
 'a',
 'l',
 ' ',
 'p',
 'u',
 'r',
 'p',
 'o',
 's',
 'e',
 ' ',
 'o',
 'n',
 'l',
 'y']

In [10]:
type(text)

list

In [15]:
text[len(text)-1]

'y'

In [14]:
text[-1:]

['y']

In [16]:
dic = pp.create_dictionary(text)

In [27]:
seq = pp.build_sequences(text, dic[0], 5)

In [31]:
seq[0].shape

(65, 5)

In [33]:
seq[1].shape

(65,)

In [49]:
sequen = "This is a test."
ox = pp.create_dictionary(sequen)

In [50]:
model = TextGeneratorModel(args,len(sequen))

In [52]:
# execution.generator(model, sequen, ox[1], 3)