In [1]:
import time
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from sklearn.metrics import r2_score
import os

In [2]:
%%time
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)

cpu
CPU times: user 3.71 ms, sys: 20 ms, total: 23.7 ms
Wall time: 23 ms


In [14]:
# Text corpus that the sampling helpers read from.
input_file = "shakespear.txt"
with open(input_file, mode="r", encoding="utf-8") as file:
    text = file.read()

# Number of distinct characters in the corpus; later used to scale
# character indices before they are fed to the model.
vocab_size = len(set(text))
print(len(text))

139773


In [38]:
def get_data(filename, nmbr_of_samples, lenght_of_samples, for_training, len_for_test):
    """Draw random fixed-length character windows from a text file.

    The distinct characters of the file are sorted and numbered ``0..V-1``.
    Each sample consists of ``lenght_of_samples`` consecutive character
    indices (the input) and the index of the character that immediately
    follows them (the target). The last ``len_for_test`` characters of the
    file form a held-out tail: training windows (targets included) never
    reach into it, and evaluation windows lie entirely inside it.

    Args:
        filename: Path of a UTF-8 encoded text file.
        nmbr_of_samples: Number of windows to draw; start positions are drawn
            independently with ``random.randint``, so windows may repeat.
        lenght_of_samples: Number of input characters per window.
        for_training: ``1``/``True`` samples from the region before the
            held-out tail; any other value samples from the tail itself.
        len_for_test: Length, in characters, of the held-out tail.

    Returns:
        A tuple ``(x, y, dictionary_set)`` where ``x`` is a float64 array of
        shape ``(nmbr_of_samples, lenght_of_samples)``, ``y`` is a float64
        array of shape ``(nmbr_of_samples, 1)`` and ``dictionary_set`` maps
        each character to its integer index.

    Raises:
        ValueError: If samples are requested but the selected region cannot
            hold a single window plus its target character.
    """
    with open(filename, "r", encoding="utf-8") as file:
        text = file.read()

    # Stable character -> index mapping (sorted so it is reproducible).
    dictionary_set = {char: index for index, char in enumerate(sorted(set(text)))}

    x = np.zeros((nmbr_of_samples, lenght_of_samples), dtype=float)
    y = np.zeros((nmbr_of_samples, 1))

    # Inclusive bounds for the window start position.
    if for_training == 1:
        minimum = 0
        maximum = len(text) - len_for_test - 1 - lenght_of_samples
    else:
        minimum = len(text) - len_for_test
        maximum = len(text) - 1 - lenght_of_samples

    # A negative start would make the slice wrap around to the end of the
    # text, and an empty range would only fail deep inside random.randint;
    # report both up front with the numbers needed to diagnose them.
    if nmbr_of_samples > 0 and (minimum < 0 or maximum < minimum):
        raise ValueError(
            "cannot sample windows of length %d (+1 target) from a text of "
            "%d characters with len_for_test=%d (for_training=%r)"
            % (lenght_of_samples, len(text), len_for_test, for_training)
        )

    for i in range(nmbr_of_samples):
        r = random.randint(minimum, maximum)
        # One extra character: the last index is the prediction target.
        window = [dictionary_set[char] for char in text[r:r + lenght_of_samples + 1]]
        x[i, :] = window[:lenght_of_samples]
        y[i, 0] = window[lenght_of_samples]
    return x, y, dictionary_set

In [39]:
class SimpleModel(nn.Module):
    """Fully connected network mapping ``input_size`` features to one scalar.

    Layer widths are ``input_size -> 512 -> 128 -> 64 -> 1``. ReLU follows the
    first two hidden layers, a sigmoid follows the third, and the output
    layer is left linear.
    """

    def __init__(self, input_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, 512)
        self.linear2 = nn.Linear(512, 128)
        self.linear3 = nn.Linear(128, 64)
        self.output = nn.Linear(64, 1)
        self.relu = nn.ReLU()
        self.sigm = nn.Sigmoid()

    def forward(self, x):
        """Run the forward pass and return the output of the final linear layer."""
        hidden = self.relu(self.linear1(x))
        hidden = self.relu(self.linear2(hidden))
        hidden = self.sigm(self.linear3(hidden))
        return self.output(hidden)

class Trainer:
    """Bundle a ``SimpleModel`` with an MSE loss and an Adam optimizer.

    Args:
        input_size: Number of input features passed to ``SimpleModel``.
        learning_rate: Learning rate for the Adam optimizer.
        num_epochs: Number of full-batch optimisation steps per ``train`` call.
        device: Device the model lives on (string or ``torch.device``).
            Defaults to CUDA when available, otherwise CPU.
    """

    def __init__(self, input_size, learning_rate, num_epochs, device=None):
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = torch.device(device)
        # Move the model first so the optimizer is built over parameters that
        # already sit on their final device.
        self.model = SimpleModel(input_size).to(self.device)
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.num_epochs = num_epochs

    def train(self, inputs, targets):
        """Fit the model on the whole ``inputs``/``targets`` batch.

        Every epoch is a single forward/backward pass over the full batch.
        Both tensors are moved to the trainer's device first, so callers may
        pass tensors that live on a different device than the model.
        """
        inputs = inputs.to(self.device)
        targets = targets.to(self.device)
        for epoch in range(self.num_epochs):
            # Forward pass
            outputs = self.model(inputs)
            loss = self.criterion(outputs, targets)

            # Backward pass and optimization
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

            # Print progress: a loss line every 100 epochs, a counter every epoch.
            if (epoch + 1) % 100 == 0:
                print(f"Epoch [{epoch+1}/{self.num_epochs}], Loss: {loss.item():.4f}")
            print(epoch+1, end=" ")

    def predict(self, inputs):
        """Return the model output for ``inputs`` without tracking gradients.

        ``inputs`` is moved to the trainer's device; the result stays on that
        device.
        """
        with torch.no_grad():
            return self.model(inputs.to(self.device))


In [42]:
%%time
# Hyperparameters shared by the dataset builder and the trainer.
learning_rate = 0.01  # step size handed to the optimizer
num_epochs = 10  # full-batch optimisation steps per train() call
input_size = 20  # number of context characters fed to the model

def get_dataset(nmbr_of_samples=10000, for_training=True, len_for_test=10000):
    """Sample character windows and return them as scaled float32 tensors.

    Reads the notebook-level ``input_file``, ``input_size``, ``vocab_size``
    and ``device``. With the default arguments it draws 10000 training
    windows while reserving the last 10000 characters of the corpus.

    Args:
        nmbr_of_samples: Number of windows to draw.
        for_training: ``True`` samples before the held-out tail, ``False``
            samples inside it (useful for building an evaluation set).
        len_for_test: Length, in characters, of the held-out tail.

    Returns:
        ``(x, y, dictionary_set)``: ``x`` has shape
        ``(nmbr_of_samples, input_size)``, ``y`` has shape
        ``(nmbr_of_samples, 1)``; both are float32 tensors on ``device``
        holding character indices divided by ``vocab_size``.
        ``dictionary_set`` maps characters to their integer indices.
    """
    x, y, dictionary_set = get_data(
        input_file, nmbr_of_samples, input_size, for_training, len_for_test
    )
    # Divide in float64 first, then cast, so values match the historical output.
    x = (torch.tensor(x) / vocab_size).type(torch.FloatTensor).to(device)
    y = (torch.tensor(y) / vocab_size).type(torch.FloatTensor).to(device)
    return x, y, dictionary_set

# Build the trainer (model + loss + optimizer) from the hyperparameters above.
trainer = Trainer(input_size, learning_rate, num_epochs)

num_cores = torch.get_num_threads()  # Threads PyTorch uses for CPU intra-op parallelism; not read again in this cell
# One round: draw a fresh random batch of training windows and fit on it.
# Raising the range resamples and keeps training the same trainer.
for i in range(1):
    x, y, dictionary_set = get_dataset()
    trainer.train(x, y)

1 2 3 4 5 6 7 8 9 10 CPU times: user 13.1 s, sys: 78.5 ms, total: 13.2 s
Wall time: 1.23 s


In [43]:
def get_key_by_value(dictionary, value):
    """Return the first key, in iteration order, whose value equals ``value``.

    Raises ``StopIteration`` when no entry matches.
    """
    for key, val in dictionary.items():
        if val == value:
            return key
    raise StopIteration

input_string = "And a speak anything against me"
dictionary_set_reversed = {value: key for key, value in dictionary_set.items()}

# Encode the prompt the same way the training data was scaled, then keep only
# the trailing input_size characters as the model's context window.
input_vector = [dictionary_set[char] / vocab_size for char in input_string]
input_vector = input_vector[-input_size:]

# Generate 50 characters, feeding each prediction back into the window.
output_string = []
for i in range(50):
    predicted = trainer.predict(torch.tensor(input_vector).type(torch.FloatTensor)) * vocab_size
    # Undo the scaling, round to the nearest index and clamp into the vocabulary.
    predicted = round(float(predicted[0].float()))
    predicted = min(vocab_size - 1, max(0, predicted))
    output_string.append(dictionary_set_reversed[predicted])
    # Slide the window: drop the oldest character, append the new one.
    input_vector = input_vector[1:] + [predicted / vocab_size]

print("".join(output_string))
print(output_string)
print(dictionary_set)
print(dictionary_set_reversed)

bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
['b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b', 'b']
{'\n': 0, ' ': 1, '!': 2, '"': 3, '&': 4, "'": 5, ',': 6, '-': 7, '.': 8, ':': 9, ';': 10, '?': 11, 'A': 12, 'B': 13, 'C': 14, 'D': 15, 'E': 16, 'F': 17, 'G': 18, 'H': 19, 'I': 20, 'J': 21, 'K': 22, 'L': 23, 'M': 24, 'N': 25, 'O': 26, 'P': 27, 'Q': 28, 'R': 29, 'S': 30, 'T': 31, 'U': 32, 'V': 33, 'W': 34, 'Y': 35, 'Z': 36, '[': 37, ']': 38, '_': 39, 'a': 40, 'b': 41, 'c': 42, 'd': 43, 'e': 44, 'f': 45, 'g': 46, 'h': 47, 'i': 48, 'j': 49, 'k': 50, 'l': 51, 'm': 52, 'n': 53, 'o': 54, 'p': 55, 'q': 56, 'r': 57, 's': 58, 't': 59, 'u': 60, 'v': 61, 'w': 62, 'x': 63, 'y': 64, 'z': 65}
{0: '\n', 1: ' ', 2: '!', 3: '"', 4: '&', 5: "'", 6: ',', 7: '-', 8: '.', 9: ':', 10: ';', 11: '?', 12: 'A', 13: