In [1]:
import numpy as np
import os
import pandas as pd
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import train_test_split

pd.set_option('display.max_colwidth', None)

In [2]:
project_folder = "diygenomics-projects"
sub_category = "math"
input_file = 'combined_extracted_annotated_math.csv'
combined_math_file = 'combined_extracted_annotated_math.csv'
index_col = 'uuid'

model_path = 'lstm_character_level_v2'

In [3]:
data_path = os.getenv('DATA_PATH')
file_path = lambda *args: os.path.join(data_path, project_folder, sub_category, *args)

if not os.path.exists(file_path(model_path)):
    os.makedirs(file_path(model_path))

In [4]:
df = pd.read_csv(file_path(input_file), index_col=index_col)

In [5]:
latex_corpus = df['math'].astype(str).str.cat(sep=' ')

In [6]:
class LSTMTextGenerator:
    def __init__(self, corpus, seq_length=100, hidden_size=256, lr=0.001, epochs=50, batch_size=64):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.corpus = corpus
        self.seq_length = seq_length
        self.hidden_size = hidden_size
        self.lr = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.dataX, self.dataY, self.char_to_int, self.int_to_char = self.prepare_data()
        self.model = self.build_model().to(self.device)
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=self.lr)
        
    class CharPredictor(nn.Module):
        def __init__(self, input_size, hidden_size, output_size):
            super().__init__()
            self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
            self.fc = nn.Linear(hidden_size, output_size)

        def forward(self, x):
            out, _ = self.lstm(x)
            out = self.fc(out[:, -1, :])
            return out

        def get_embeddings(self, x):
            out, _ = self.lstm(x)
            return out  # return LSTM outputs as embeddings
        
    def get_embeddings(self, text):
        text_int = [self.char_to_int[char] for char in text]
        text_int = np.reshape(text_int, (1, len(text_int), 1))
        text_tensor = torch.from_numpy(text_int).float()

        # Get the current device (CPU or GPU)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Move the input tensor to the current device
        text_tensor = text_tensor.to(device)

        with torch.no_grad():
            embeddings = self.model.get_embeddings(text_tensor)

        return embeddings.cpu().numpy()

    def prepare_data(self):
        chars = sorted(list(set(self.corpus)))
        char_to_int = {ch: i for i, ch in enumerate(chars)}
        int_to_char = {i: ch for i, ch in enumerate(chars)}

        dataX, dataY = [], []
        for i in range(0, len(self.corpus) - self.seq_length, 1):
            seq_in = self.corpus[i:i + self.seq_length]
            seq_out = self.corpus[i + self.seq_length]
            dataX.append([char_to_int[char] for char in seq_in])
            dataY.append(char_to_int[seq_out])

        return np.array(dataX), np.array(dataY), char_to_int, int_to_char

    def build_model(self):
        return self.CharPredictor(1, self.hidden_size, len(self.char_to_int))

    def create_data_loader(self, X, y):
        dataset = TensorDataset(X, y)
        dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)
        return dataloader
    
    def predict(self, input_sequence, predict_length=100):
        self.model.eval()  # Set the model to evaluation mode

        # Convert char to int for input sequence
        input_sequence = [self.char_to_int[ch] for ch in input_sequence]
        input_sequence = np.reshape(input_sequence, (1, len(input_sequence), 1))
        input_sequence = torch.from_numpy(input_sequence).float().to(self.device)

        predicted_output = ''
        with torch.no_grad():
            for _ in range(predict_length):
                output = self.model(input_sequence)
                _, predicted = torch.max(output.data, 1)
                predicted_output += self.int_to_char[predicted.item()]

                # Use the predicted character to generate the next character
                input_sequence = torch.cat((input_sequence[:, 1:, :], predicted.float().unsqueeze(0).unsqueeze(2)), dim=1)

        return predicted_output

    def train(self):
        X_train, X_test, y_train, y_test = train_test_split(self.dataX, self.dataY, test_size=0.2, random_state=42)
        X_train = np.reshape(X_train, (len(X_train), self.seq_length, 1))
        X_test = np.reshape(X_test, (len(X_test), self.seq_length, 1))
        X_train = torch.from_numpy(X_train).float().to(self.device)
        y_train = torch.from_numpy(y_train).long().to(self.device)
        X_test = torch.from_numpy(X_test).float().to(self.device)
        y_test = torch.from_numpy(y_test).long().to(self.device)

        train_loader = self.create_data_loader(X_train, y_train)
        test_loader = self.create_data_loader(X_test, y_test)

        for epoch in range(self.epochs):
            for batch in train_loader:
                x, y = batch
                self.optimizer.zero_grad()
                output = self.model(x)
                loss = self.criterion(output, y)
                loss.backward()
                self.optimizer.step()

            print(f'Epoch: {epoch+1}, Loss: {loss.item()}')
            torch.save(self.model.state_dict(), file_path(model_path, f'model_epoch_{epoch+1}.pth'))

        self.model.eval()
        with torch.no_grad():
            correct = 0
            total = 0
            for x, y in test_loader:
                output = self.model(x)
                _, predicted = torch.max(output.data, 1)
                total += y.size(0)
                correct += (predicted == y).sum().item()
            print('Test Accuracy: %d %%' % (100 * correct / total)) 

In [7]:
corpus = latex_corpus
text_generator = LSTMTextGenerator(corpus) # , seq_length=200, epochs=100, hidden_size=512
text_generator.train()

Epoch: 1, Loss: 2.5626513957977295
Epoch: 2, Loss: 1.9624334573745728
Epoch: 3, Loss: 2.0024614334106445
Epoch: 4, Loss: 1.5447893142700195
Epoch: 5, Loss: 1.5110098123550415
Epoch: 6, Loss: 1.7050663232803345
Epoch: 7, Loss: 1.2821760177612305
Epoch: 8, Loss: 0.9019022583961487
Epoch: 9, Loss: 1.1504005193710327
Epoch: 10, Loss: 0.6830762624740601
Epoch: 11, Loss: 1.0580450296401978
Epoch: 12, Loss: 1.083831787109375
Epoch: 13, Loss: 1.0346381664276123
Epoch: 14, Loss: 1.6023166179656982
Epoch: 15, Loss: 0.5750702619552612
Epoch: 16, Loss: 0.968996524810791
Epoch: 17, Loss: 0.7857094407081604
Epoch: 18, Loss: 1.0009915828704834
Epoch: 19, Loss: 0.8866775631904602
Epoch: 20, Loss: 0.8769830465316772
Epoch: 21, Loss: 0.489242285490036
Epoch: 22, Loss: 0.9786921739578247
Epoch: 23, Loss: 0.6967050433158875
Epoch: 24, Loss: 0.9609808325767517
Epoch: 25, Loss: 0.7508860230445862
Epoch: 26, Loss: 0.741387128829956
Epoch: 27, Loss: 1.4323986768722534
Epoch: 28, Loss: 0.8514877557754517
Epoch

In [8]:
print(text_generator.predict("mathrm{d}", 20))

=__n)\}- \ee iovd_{t


In [9]:
embedding = text_generator.get_embeddings(df.iloc[0]['math'])

In [10]:
# model_path = "model_epoch_50.pth"

# text_generator.model.load_state_dict(torch.load(model_path))

# text_generator.model.eval()