<a href="https://colab.research.google.com/github/pogamar/NeuralNetworkProject_588/blob/prod/project_lstm_588.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from collections import Counter

In [3]:
#data collection
# Load the source CSV file
source_csv = '/content/gdrive/My Drive/project_lstm_588/1500_haskell.csv'
df = pd.read_csv(source_csv)

# Randomly select 50 rows
df_50 = df.sample(n=100, random_state=42)

# Save the new CSV file with 50 randomly selected rows
output_csv = '/content/gdrive/My Drive/project_lstm_588/100_random_rows.csv'
df_50.to_csv(output_csv, index=False)

model_path =  '/content/gdrive/My Drive/project_lstm_588/model.pt'

In [4]:
import re

def tokenize_haskell(code):
    # Remove single-line comments
    code = re.sub(r'--.*', '', code)

    # Remove multi-line comments
    code = re.sub(r'{-[\s\S]*?-}', '', code)

    # Replace string literals with a special token
    code = re.sub(r'\"(?:[^\"\\]|\\.)*\"', '<STR>', code)

    # Replace character literals with a special token
    code = re.sub(r'\'(?:[^\'\\]|\\.)*\'', '<CHAR>', code)

    # Define a regular expression pattern for Haskell tokens
    token_pattern = r'([(){}\[\]=.,;:!?|&+*\-/<>\^%\$@~#]|<STR>|<CHAR>|\b\w+\b)'

    # Find all tokens using the regex pattern
    tokens = re.findall(token_pattern, code)

    return tokens


In [15]:
class Model(nn.Module):
    def __init__(self, dataset):
        super(Model, self).__init__()
        self.lstm_size = 128
        self.hidden_size = 256
        self.embedding_dim = 128
        self.num_layers = 3

        n_vocab = len(dataset.uniq_words)
        self.embedding = nn.Embedding(
            num_embeddings=n_vocab,
            embedding_dim=self.embedding_dim,
        )
        self.lstm = nn.LSTM(
            input_size=self.lstm_size,
            hidden_size=self.hidden_size,
            num_layers=self.num_layers,
            dropout=0.2,
        )
        self.fc = nn.Linear(self.lstm_size, n_vocab)

    def forward(self, x, prev_state):
        embed = self.embedding(x)
        output, state = self.lstm(embed, prev_state)
        logits = self.fc(output)
        return logits, state

    def init_state(self, sequence_length):
        return (torch.zeros(self.num_layers, sequence_length, self.lstm_size),
                torch.zeros(self.num_layers, sequence_length, self.lstm_size))

In [6]:
class HaskellDatasetPre(torch.utils.data.Dataset):
    def __init__(self, args, vocab=None):
        self.args = args
        self.words = self.load_words()
        if vocab:
            self.uniq_words = vocab
        else:
            self.uniq_words = self.get_uniq_words()

        self.index_to_word = {index: word for index, word in enumerate(self.uniq_words)}
        self.word_to_index = {word: index for index, word in enumerate(self.uniq_words)}

        self.words_indexes = [self.word_to_index[w] for w in self.words]

    def load_words(self):
        #data collection
        df = pd.read_csv(source_csv)
        tokenized_sequences = [tokenize_haskell(snippet) for snippet in list(df.content.values)]

        words = []
        for tokens in tokenized_sequences:
            words.extend(tokens)
        return words

    def get_uniq_words(self):
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)

    def __len__(self):
        return len(self.words_indexes) - self.args.sequence_length

    def __getitem__(self, index):
        return (
            torch.tensor(self.words_indexes[index:index+self.args.sequence_length]),
            torch.tensor(self.words_indexes[index+1:index+self.args.sequence_length+1]),
        )

In [7]:
class HaskellDataset(torch.utils.data.Dataset):
    def __init__(self, args, vocab=None):
        self.args = args
        self.words = self.load_words()
        if vocab:
            self.uniq_words = vocab
        else:
            self.uniq_words = self.get_uniq_words()

        self.index_to_word = {index: word for index, word in enumerate(self.uniq_words)}
        self.word_to_index = {word: index for index, word in enumerate(self.uniq_words)}

        self.words_indexes = [self.word_to_index[w] for w in self.words]

    def load_words(self):
        #data collection
        df = pd.read_csv(output_csv)
        tokenized_sequences = [tokenize_haskell(snippet) for snippet in list(df.content.values)]

        words = []
        for tokens in tokenized_sequences:
            words.extend(tokens)
        return words

    def get_uniq_words(self):
        word_counts = Counter(self.words)
        return sorted(word_counts, key=word_counts.get, reverse=True)

    def __len__(self):
        return len(self.words_indexes) - self.args.sequence_length

    def __getitem__(self, index):
        return (
            torch.tensor(self.words_indexes[index:index+self.args.sequence_length]),
            torch.tensor(self.words_indexes[index+1:index+self.args.sequence_length+1]),
        )


In [8]:
import argparse
import torch
import numpy as np
from torch import nn, optim
from torch.utils.data import DataLoader

def train(dataset, model, args):
    model.train()

    dataloader = DataLoader(dataset, batch_size=args.batch_size, num_workers=2)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(args.max_epochs):
      state_h, state_c = model.init_state(args.sequence_length)

      for batch, (x, y) in enumerate(dataloader):
        optimizer.zero_grad()
        y_pred, (state_h, state_c) = model(x, (state_h, state_c))
        loss = criterion(y_pred.transpose(1, 2), y)
        state_h = state_h.detach()
        state_c = state_c.detach()
        loss.backward()
        optimizer.step()
        if batch % 100 == 0:
          print({ 'epoch': epoch, 'batch': batch, 'loss': loss.item() })

      torch.save(model.state_dict(), model_path)

In [9]:
def generate_code_from_source(dataset, model, input_seq):
    generated_code = predict(dataset, model, text=input_seq, next_words=50)
    return generated_code

In [10]:
def predict(dataset, model, text, next_words=100):
    model.eval()

    words = text.split(' ')
    state_h, state_c = model.init_state(len(words))

    for i in range(0, next_words):
        x = torch.tensor([[dataset.word_to_index[w] for w in words[i:]]])
        y_pred, (state_h, state_c) = model(x, (state_h, state_c))

        last_word_logits = y_pred[0][-1]
        p = torch.nn.functional.softmax(last_word_logits, dim=0).detach().numpy()
        word_index = np.random.choice(len(last_word_logits), p=p)
        words.append(dataset.index_to_word[word_index])

    return words

In [11]:
class Args:
    def __init__(self, max_epochs, batch_size, sequence_length):
        self.max_epochs = 10
        self.batch_size = 256
        self.sequence_length = 4

In [13]:
from zmq.constants import NULL

args =Args(max_epochs=5, batch_size=500, sequence_length=50)

training_dataset = HaskellDatasetPre(args)

# Then, get the vocabulary list from the training_dataset
training_vocabulary_list = training_dataset.uniq_words

dataset = HaskellDataset(args, vocab=training_vocabulary_list)

try:
  model_state_dict = torch.load('/content/gdrive/My Drive/project_lstm_588/model.pt')
except:
  model_state_dict = NULL
model = Model(dataset)
if model_state_dict :
  print("model loaded")
  model.load_state_dict(model_state_dict)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
train(dataset, model, args)


model loaded
{'epoch': 0, 'batch': 0, 'loss': 0.47985339164733887}
{'epoch': 1, 'batch': 0, 'loss': 0.4477563202381134}
{'epoch': 2, 'batch': 0, 'loss': 0.38539692759513855}
{'epoch': 3, 'batch': 0, 'loss': 0.3667314052581787}
{'epoch': 4, 'batch': 0, 'loss': 0.3358776867389679}
{'epoch': 5, 'batch': 0, 'loss': 0.3535510003566742}
{'epoch': 6, 'batch': 0, 'loss': 0.34082895517349243}
{'epoch': 7, 'batch': 0, 'loss': 0.31231504678726196}
{'epoch': 8, 'batch': 0, 'loss': 0.3088725507259369}
{'epoch': 9, 'batch': 0, 'loss': 0.300496369600296}


In [None]:
print(predict(dataset, model, text='let'))