In [None]:
# Mount Google Drive at /content/drive. The data and output locations used
# later in this notebook are written as "drive/MyDrive/...", i.e. relative
# paths (presumably resolved from /content, the usual Colab working
# directory - confirm if the notebook is run elsewhere).
from google.colab import drive
drive.mount("/content/drive")

In [None]:
!pip install javalang

In [None]:
import io
import json
import math
import os
import re
import subprocess
from collections import OrderedDict

import javalang
import torch
import torch.nn as nn
import torchtext
from torch import Tensor
from torch.nn import (TransformerEncoder, TransformerDecoder,TransformerEncoderLayer, TransformerDecoderLayer)
from torchtext.vocab import vocab

In [None]:
# Locations of the BPE-encoded corpora / vocabularies and of the trained models.
BPE_FILEPATH = "drive/MyDrive/dissertation_workplace/code_translation/preprocessed_files/BPE"
OUTPUT_FILEPATH = "drive/MyDrive/dissertation_workplace/code_translation/output_files"

# Settings of the run under test. All of them are baked into the checkpoint
# file name below, so they have to match the values used when it was saved.
SRC_LANGUAGE = "pn"  # "pn" is routed to the Python detokenizer, "ja" to the Java one
TGT_LANGUAGE = "ja"
MAX_COUNT = 10000    # suffix of the BPE vocab/train/test file names
NUM_EPOCHS = 100
LEARNING_RATE = 2e-5
ACTIVATION = "gelu"
BATCH_SIZE = 16
NUM_ENCODER_LAYERS = 12
# Previously NUM_ENCODER_LAYERS was assigned twice and reused for the "D"
# (decoder) part of the file name; the decoder depth now has its own constant.
NUM_DECODER_LAYERS = 12
TEST_MODEL = f"{OUTPUT_FILEPATH}/sourcecode_nmt_{SRC_LANGUAGE}2{TGT_LANGUAGE}_{MAX_COUNT}C_{NUM_EPOCHS}E_{LEARNING_RATE}LR_{ACTIVATION}_{BATCH_SIZE}B_{NUM_ENCODER_LAYERS}E_{NUM_DECODER_LAYERS}D.pth"
# Directory for this model's outputs: the checkpoint path without ".pth".
TEST_MODEL_OUTPUT_PATH = TEST_MODEL[0:-4]

# Place-holders
# Maps a language code to its vocabulary; filled in by a later cell.
vocab_transform = {}

In [None]:
# Create the per-model output directory. exist_ok=True replaces the separate
# existence check (no check-then-create race) and keeps the cell re-runnable.
os.makedirs(TEST_MODEL_OUTPUT_PATH, exist_ok=True)

In [None]:
def build_vocab(filename):
  """Build a torchtext vocabulary from a "<token> <frequency>" text file.

  Each line is split on single spaces; the first field is the token and the
  second its integer frequency. The special tokens "<unk>", "<pad>", "<bos>"
  and "<eos>" are then inserted at indices 0-3 ("<unk>" only when the file
  does not already provide it) and unknown tokens are mapped to "<unk>".

  Args:
    filename: path of the vocabulary file.

  Returns:
    The vocabulary object created by ``torchtext.vocab.vocab``.

  Raises:
    ValueError: if a frequency field is not an integer.
  """
  ordered_dict = OrderedDict()
  # Explicit encoding so the result does not depend on the platform default.
  with io.open(filename, encoding="utf-8") as file:
    for string_ in file:
      word_feq = string_.rstrip("\n").split(" ")
      if len(word_feq) < 2:
        # Blank or truncated line (e.g. a trailing empty line): nothing to add.
        continue
      ordered_dict[word_feq[0]] = int(word_feq[1])
  vocabulary = vocab(ordered_dict)
  unk_token = "<unk>"
  if unk_token not in vocabulary: vocabulary.insert_token(unk_token, 0)
  vocabulary.insert_token("<pad>", 1)
  vocabulary.insert_token("<bos>", 2)
  vocabulary.insert_token("<eos>", 3)
  # Out-of-vocabulary lookups fall back to "<unk>".
  vocabulary.set_default_index(vocabulary[unk_token])
  return vocabulary

In [None]:
# One vocabulary per language, read from "vocab.<language>.<MAX_COUNT>".
for ln in (SRC_LANGUAGE, TGT_LANGUAGE):
  vocab_path = f"{BPE_FILEPATH}/vocab.{ln}.{MAX_COUNT}"
  vocab_transform[ln] = build_vocab(vocab_path)

In [None]:
# Indices of the special tokens, looked up in the source-language vocabulary.
_src_vocab = vocab_transform[SRC_LANGUAGE]
PAD_IDX, BOS_IDX, EOS_IDX = (_src_vocab[tok] for tok in ("<pad>", "<bos>", "<eos>"))

In [None]:
# transformer
class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder Transformer assembled from separate encoder/decoder stacks.

    Token ids are embedded, combined with positional encodings, passed through
    ``transformer_encoder`` / ``transformer_decoder`` and projected onto the
    target vocabulary by ``generator``.
    """

    def __init__(self,
                 num_encoder_layers: int,
                 num_decoder_layers: int,
                 emb_size: int,
                 nhead: int,
                 src_vocab_size: int,
                 tgt_vocab_size: int,
                 dim_feedforward: int,
                 activation: str,
                 layer_norm_eps: float,
                 dropout: float = 0.1):
        super().__init__()

        # Settings shared by every encoder and decoder layer.
        # NOTE: `dropout` is only handed to the positional encoding below; the
        # layers are built without it and therefore keep the library default.
        layer_kwargs = dict(d_model=emb_size,
                            nhead=nhead,
                            dim_feedforward=dim_feedforward,
                            activation=activation,
                            layer_norm_eps=layer_norm_eps)

        self.transformer_encoder = TransformerEncoder(
            TransformerEncoderLayer(**layer_kwargs), num_layers=num_encoder_layers)
        self.transformer_decoder = TransformerDecoder(
            TransformerDecoderLayer(**layer_kwargs), num_layers=num_decoder_layers)

        # Projection from decoder states to target-vocabulary scores.
        self.generator = nn.Linear(emb_size, tgt_vocab_size)
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)
        self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)

    def forward(self, src: Tensor, trg: Tensor, src_mask: Tensor,
                tgt_mask: Tensor, src_padding_mask: Tensor,
                tgt_padding_mask: Tensor, memory_key_padding_mask: Tensor):
        """Run encoder and decoder and return scores over the target vocabulary."""
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))

        memory = self.transformer_encoder(src_emb,
                                          mask=src_mask,
                                          src_key_padding_mask=src_padding_mask)
        decoded = self.transformer_decoder(tgt_emb,
                                           memory,
                                           tgt_mask=tgt_mask,
                                           memory_mask=None,
                                           tgt_key_padding_mask=tgt_padding_mask,
                                           memory_key_padding_mask=memory_key_padding_mask)
        return self.generator(decoded)

    def encode(self, src: Tensor, src_mask: Tensor):
        """Embed `src` and return the encoder output (no padding mask is applied)."""
        src_emb = self.positional_encoding(self.src_tok_emb(src))
        return self.transformer_encoder(src_emb, src_mask)

    def decode(self, tgt: Tensor, memory: Tensor, tgt_mask: Tensor):
        """Embed `tgt` and return the decoder output given the encoder `memory`."""
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(tgt))
        return self.transformer_decoder(tgt_emb, memory, tgt_mask)

In [None]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to embeddings, then apply dropout.

    The table has shape (maxlen, 1, emb_size) and is indexed by the first
    dimension of the input, so inputs longer than `maxlen` are not supported.
    """

    def __init__(self, emb_size: int, dropout, maxlen: int = 450):
        super().__init__()
        # One frequency per (sin, cos) channel pair.
        inv_freq = torch.exp(- torch.arange(0, emb_size, 2) * math.log(10000) / emb_size)
        positions = torch.arange(0, maxlen).reshape(maxlen, 1)
        angles = positions * inv_freq

        table = torch.zeros((maxlen, emb_size))
        table[:, 0::2] = torch.sin(angles)  # even channels
        table[:, 1::2] = torch.cos(angles)  # odd channels

        self.dropout = nn.Dropout(dropout)
        # Registered as a buffer: follows the module across devices, is not trained.
        self.register_buffer("pos_embedding", table.unsqueeze(-2))

    def forward(self, token_embedding: Tensor):
        """Return `token_embedding` plus the encodings of its first-dimension positions."""
        seq_len = token_embedding.size(0)
        with_positions = token_embedding + self.pos_embedding[:seq_len, :]
        return self.dropout(with_positions)

class TokenEmbedding(nn.Module):
    """Embedding lookup whose output is scaled by sqrt(emb_size)."""

    def __init__(self, vocab_size: int, emb_size):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        """Look up `tokens` (cast to int64) and scale the vectors by sqrt(emb_size)."""
        scale = math.sqrt(self.emb_size)
        vectors = self.embedding(tokens.long())
        return vectors * scale

In [None]:
def generate_square_subsequent_mask(sz, device=None):
    """Return the (sz, sz) additive causal mask used by the decoder.

    Entry (i, j) is 0.0 when j <= i and -inf when j > i, so position i can
    only attend to itself and to earlier positions.

    Args:
        sz: side length of the square mask.
        device: device on which to create the mask. When omitted, the
            notebook-level DEVICE is used (looked up at call time).

    Returns:
        A float tensor of shape (sz, sz).
    """
    if device is None:
        device = DEVICE
    # Keep -inf strictly above the diagonal; triu zero-fills everything else.
    return torch.triu(torch.full((sz, sz), float("-inf"), device=device), diagonal=1)

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda:0")
else:
    DEVICE = torch.device("cpu")

In [None]:
# Load the pickled model object. map_location places its tensors on DEVICE, so
# a checkpoint saved on a GPU can also be opened on a CPU-only runtime.
# NOTE: torch.load unpickles arbitrary Python objects - only open checkpoints
# from a trusted source.
# NOTE(review): recent PyTorch releases appear to default to weights_only=True,
# which rejects whole-module pickles; if loading fails with such a version,
# pass weights_only=False - confirm against the installed version.
transformer = torch.load(TEST_MODEL, map_location=DEVICE)

In [None]:
def greedy_decode(model, src, src_mask, max_len, start_symbol):
    """Generate a target sequence token by token, always taking the best score.

    Args:
        model: object providing ``encode``, ``decode`` and ``generator``.
        src: source token ids; moved to DEVICE before encoding.
        src_mask: attention mask passed to ``model.encode``.
        max_len: maximum length of the generated sequence, start symbol included.
        start_symbol: index the generated sequence starts with.

    Returns:
        A tensor of shape (length, 1) holding the generated indices. It starts
        with `start_symbol` and ends with EOS_IDX when that token was produced
        before `max_len` was reached.
    """
    src = src.to(DEVICE)
    src_mask = src_mask.to(DEVICE)
    # Pure inference: no autograd graph is needed for any of the steps below.
    with torch.no_grad():
        # The encoder output does not change between steps, so compute it once.
        memory = model.encode(src, src_mask).to(DEVICE)
        ys = torch.full((1, 1), start_symbol, dtype=torch.long, device=DEVICE)
        for _ in range(max_len - 1):
            tgt_mask = generate_square_subsequent_mask(ys.size(0)).type(torch.bool).to(DEVICE)
            out = model.decode(ys, memory, tgt_mask).transpose(0, 1)
            # Only the scores of the last generated position are needed.
            scores = model.generator(out[:, -1])
            _, best = torch.max(scores, dim=1)
            next_word = best.item()

            ys = torch.cat([ys, torch.full((1, 1), next_word, dtype=src.dtype, device=src.device)], dim=0)
            if next_word == EOS_IDX:
                break
    return ys

In [None]:
def translate(model, src, src_vocab, tgt_vocab):
    """Translate one space-separated line of source tokens with greedy decoding.

    Args:
        model: the translation model; switched to eval mode here.
        src: source line whose tokens are separated by single spaces.
        src_vocab: vocabulary used to map source tokens to indices.
        tgt_vocab: vocabulary used to map generated indices back to tokens.

    Returns:
        The generated tokens joined by spaces, with "<bos>" and "<eos>" removed.
    """
    model.eval()
    # Build both lookup tables once instead of once per token.
    stoi = src_vocab.get_stoi()
    itos = tgt_vocab.get_itos()
    # Strip surrounding whitespace first: a trailing newline (e.g. from
    # readlines()) would otherwise be glued to the last token and turn it
    # into an unknown token.
    src_tokens = src.strip().split(" ")
    # Unknown tokens map to index 0, which is where build_vocab inserts "<unk>"
    # when the vocabulary file does not already contain it.
    tokens = [BOS_IDX] + [stoi.get(tok, 0) for tok in src_tokens] + [EOS_IDX]
    num_tokens = len(tokens)
    src_tensor = torch.LongTensor(tokens).reshape(num_tokens, 1)
    src_mask = torch.zeros(num_tokens, num_tokens).type(torch.bool)
    # NOTE(review): the output is capped at source length + 5 tokens, which may
    # truncate translations that are longer than their source - confirm intended.
    tgt_tokens = greedy_decode(model, src_tensor, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()
    out = " ".join(itos[tok] for tok in tgt_tokens.tolist()).replace("<bos>", "").replace("<eos>", "")
    return out

In [None]:
def decode_bpe(x):
  """Undo BPE segmentation by removing the "@@" continuation markers.

  Removes every "@@ " and, additionally, a marker left dangling at the very
  end of the text (e.g. when generation stopped in the middle of a word),
  which a plain replace of "@@ " would leave behind.

  Args:
    x: BPE-segmented text.

  Returns:
    The text with sub-word units merged back together.
  """
  return re.sub(r"@@ |@@ ?$", "", x)

In [None]:
def detokenize_java(s):
  """Re-format space-separated Java tokens with javalang, best effort.

  Args:
    s: Java code as a string of tokens.

  Returns:
    The text produced by ``javalang.tokenizer.reformat_tokens``, or `s`
    unchanged when tokenizing or re-formatting fails.
  """
  try:
    tokens = javalang.tokenizer.tokenize(s)
    return javalang.tokenizer.reformat_tokens(tokens)
  except Exception:
    # Generated code is frequently not valid Java; keep the raw text then.
    # `Exception` (rather than a bare except) lets KeyboardInterrupt and
    # SystemExit propagate so a long run can still be interrupted.
    return s


In [None]:
def detokenize_python(s):
    """Turn a tokenized Python snippet back into indented source text.

    The text is split into lines at "NEWLINE". "INDENT" / "DEDENT" markers
    raise / lower the current block depth, and that depth (4 spaces per level)
    is applied to the line carrying the marker and to every following line
    until it changes again. Previously only lines that themselves began with
    "INDENT" were indented, so the remaining lines of a block lost their
    indentation.

    Args:
        s: tokenized code containing NEWLINE / INDENT / DEDENT / NL /
            ENDMARKER markers.

    Returns:
        The reconstructed source text, lines joined by "\n".
    """
    # NOTE(review): assumes INDENT/DEDENT are emitted once per change of depth
    # (as Python's tokenize module does), not repeated on every line - confirm
    # against the preprocessing that produced the data.
    indent_unit = "    "
    level = 0
    cleaned_lines = []
    for line in s.split("NEWLINE"):
        line = line.strip()
        level += line.count("INDENT")
        # Never go below zero, even if the generated markers are unbalanced.
        level = max(level - line.count("DEDENT"), 0)
        # Markers are removed as plain substrings (as before), so an identifier
        # that happens to contain e.g. "NL" is affected as well.
        for marker in ("INDENT", "DEDENT", "NL", "ENDMARKER"):
            line = line.replace(marker, "")
        line = line.strip()
        cleaned_lines.append(indent_unit * level + line if line else "")
    code = "\n".join(cleaned_lines)
    # Re-attach attribute access: "a . b" -> "a.b".
    code = code.replace(". ", ".").replace(" .", ".")
    return code

In [None]:
def detokenize(s, lang):
  """Dispatch to the detokenizer of `lang` ("pn": Python, "ja": Java).

  Returns None for any other language code.
  """
  if lang == "pn":
    return detokenize_python(s)
  if lang == "ja":
    return detokenize_java(s)
  return None

In [None]:
def prepare_eval(s):
    """Normalise a tokenized Python snippet into a single line for evaluation.

    Tokens are separated by single spaces. "NL", "DEDENT", "ENDMARKER",
    "<unk>" and empty tokens are dropped, "NEWLINE" becomes a literal
    backslash-n, and every "INDENT" inside a token becomes a literal
    backslash-t. A token that merely starts with "NEWLINE" has that marker
    removed and the remainder is then treated like any other token.
    (Before, the remainder was computed but never stored, so such tokens
    silently disappeared.)

    Args:
        s: space-separated token string.

    Returns:
        The cleaned tokens joined by single spaces.
    """
    dropped = {"NL", "DEDENT", "ENDMARKER", "<unk>", ""}
    cleaned_tokens = []
    for token in s.split(" "):
        if token.startswith("NEWLINE") and len(token) > len("NEWLINE"):
            # Marker fused with other text: strip it, keep classifying the rest.
            token = token.replace("NEWLINE", "")
        if token in dropped:
            continue
        if token == "NEWLINE":
            cleaned_tokens.append("\\n")
        elif token.startswith("INDENT"):
            cleaned_tokens.extend(["\\t"] * token.count("INDENT"))
        else:
            cleaned_tokens.append(token)
    return " ".join(cleaned_tokens)


In [None]:
def cleanup(s):
  """Rewrite structural markers, in any letter case, to their upper-case form.

  "newline" and "new line" become "NEWLINE", "indent" becomes "INDENT" and
  "dedent" becomes "DEDENT"; the substitutions are applied in that order.
  """
  substitutions = (
      ("newline", "NEWLINE"),
      ("new line", "NEWLINE"),
      ("indent", "INDENT"),
      ("dedent", "DEDENT"),
  )
  text = s
  for pattern, marker in substitutions:
    text = re.sub(pattern, marker, text, flags=re.IGNORECASE)
  return text

In [None]:
# try on training data
# Only the first example is needed, so read a single line (and close the file).
with open(f"{BPE_FILEPATH}/train.{SRC_LANGUAGE}.{MAX_COUNT}", "r", encoding="utf-8") as f:
  # Drop the line terminator so it does not end up glued to the last token.
  line = f.readline().rstrip("\n")
print(translate(transformer, line, vocab_transform[SRC_LANGUAGE], vocab_transform[TGT_LANGUAGE]))

In [None]:
# try on plain source code
# A Java sample when translating from Java, a Python sample otherwise.
if SRC_LANGUAGE == "ja":
  source_code = 'System . out . println ( " Testing " ) '
else:
  source_code = "print('Hello World!!!')"
translated_sample = translate(transformer, source_code, vocab_transform[SRC_LANGUAGE], vocab_transform[TGT_LANGUAGE])
print(translated_sample)

In [None]:
# try on test data
# Read the source lines and their reference translations, skipping blank lines.
with open(f"{BPE_FILEPATH}/test.{SRC_LANGUAGE}.{MAX_COUNT}", "r", encoding="utf-8") as bpefile:
  bpelines = [line for line in bpefile.read().split("\n") if len(line.strip()) != 0]
with open(f"{BPE_FILEPATH}/test.{TGT_LANGUAGE}.{MAX_COUNT}", "r", encoding="utf-8") as ref_bpefile:
  ref_bpelines = [line for line in ref_bpefile.read().split("\n") if len(line.strip()) != 0]

bpe_decoded_lines = []  # BPE-decoded translations, one per line
cleaned_lines = []      # detokenized translations
# NOTE: this name shadows the built-in eval(); it is kept because the cell that
# writes output_translates.json reads it.
eval = []

if len(bpelines) != len(ref_bpelines):
  # Pairs are matched by position; only the common prefix can be evaluated.
  print(f"WARNING: {len(bpelines)} source lines vs {len(ref_bpelines)} reference lines; extra lines are ignored")

for idx, (src_line, ref_line) in enumerate(zip(bpelines, ref_bpelines)):
  translated_line = translate(transformer, src_line, vocab_transform[SRC_LANGUAGE], vocab_transform[TGT_LANGUAGE])
  ref_bpe_decoded = decode_bpe(ref_line)
  tgt_bpe_decoded = decode_bpe(translated_line)
  # replace <unk>
  tgt_code = tgt_bpe_decoded.replace("<unk>", "")
  tgt_code = cleanup(tgt_code) if TGT_LANGUAGE == "pn" else tgt_code
  cleaned_code = detokenize(tgt_code, TGT_LANGUAGE)
  bpe_decoded_lines.append(tgt_code + "\n")
  cleaned_lines.append(cleaned_code + "\n")

  # prepare text for evaluation
  if TGT_LANGUAGE == "pn":
    ref = prepare_eval(ref_bpe_decoded)
    tgt = prepare_eval(tgt_code)
  else:
    ref = ref_bpe_decoded
    tgt = tgt_code
  # ids are 1-based
  eval.append({"id": idx + 1 , "ref": ref, "hyp": tgt})

In [None]:
# check on test data
# Show every stage of the pipeline for ONE test example. The same index is
# used for the printed source and for the translation (previously the source
# shown was example 9 while example 17 was translated).
sample_idx = 17

print("BPE code in source language:")
print(bpelines[sample_idx])
print("\n")

translated_line = translate(transformer, bpelines[sample_idx], vocab_transform[SRC_LANGUAGE], vocab_transform[TGT_LANGUAGE])
print("Translated code:")
print(translated_line)
print("\n")

decoded_line = decode_bpe(translated_line)
print("BPE decoded code:")
print(decoded_line)
print("\n")

cleaned_line = detokenize(decoded_line, TGT_LANGUAGE)
print("Cleaned code:")
print(cleaned_line)

In [None]:
# store bpe decoded translated source codes in the text file
# `with` closes each file even when a write fails part-way through.
with open(f"{TEST_MODEL_OUTPUT_PATH}/translates.txt", "w", encoding="utf-8") as translate_file:
  translate_file.writelines(bpe_decoded_lines)


# store translated source codes in the text file
# Each solution is preceded by a numbered header and a separator line.
with open(f"{TEST_MODEL_OUTPUT_PATH}/detokenized_translates.txt", "w", encoding="utf-8") as de_translate_file:
  for count, i in enumerate(cleaned_lines, start=1):
    de_translate_file.writelines([f"Solution {count}\n", "---" * 30, "\n"])
    de_translate_file.write(i)
    de_translate_file.writelines(["\n\n\n"])

# store translates in one file
with open(f"{TEST_MODEL_OUTPUT_PATH}/output_translates.json", "w", encoding="utf-8") as output_file:
  json.dump({"output": eval}, output_file)

In [None]:
if TGT_LANGUAGE == "pn":
  # construct minified file and store
  !pip install pyminifier
  of = open(f"{TEST_MODEL_OUTPUT_PATH}/mini_translates.txt", "w")
  count = 1

  for s in cleaned_lines:
      testfile = open("test.py", "w")
      testfile.writelines(detokenize(s, TGT_LANGUAGE))
      testfile.close()
      os.popen("autopep8 --in-place --aggressive --aggressive test.py")
      os.popen("pyminifier test.py > mini_testfile.py")
      
      mini_testfile = open("mini_testfile.py", "r")
      of.writelines([f"Solution {count}\n", "---" * 30, "\n"])
      of.writelines(mini_testfile.readlines())
      of.writelines(["\n\n\n"])
      count += 1
  of.close()