# LSTM model testing
Before running, set the Google Drive paths, `project`, `fold` and `train_len` to match the model under test, then run all cells from top to bottom.


In [None]:
!pip install wordsegment np nltk

Collecting wordsegment
  Downloading wordsegment-1.3.1-py2.py3-none-any.whl (4.8 MB)
[K     |████████████████████████████████| 4.8 MB 4.7 MB/s 
[?25hCollecting np
  Downloading np-1.0.2.tar.gz (7.4 kB)
Building wheels for collected packages: np
  Building wheel for np (setup.py) ... [?25l[?25hdone
  Created wheel for np: filename=np-1.0.2-py3-none-any.whl size=13676 sha256=bde195a0ac65d560a0721200eb2f23cbba445028f54d4bb1465814e90aaf0643
  Stored in directory: /root/.cache/pip/wheels/8d/31/5b/f3f27c678f2b3ad7e29903ed09bb7446717fd4c8b35f53973a
Successfully built np
Installing collected packages: wordsegment, np
Successfully installed np-1.0.2 wordsegment-1.3.1


In [None]:
import json
from time import perf_counter
from pickle import load
from keras.models import load_model

In [None]:
# Project name; interpolated into the model, tokenizer, test-case and prediction paths below.
project = 'netbeans'
# Fold identifier; kept as a string because it is interpolated into file paths below.
fold = '9'
# Maximum number of candidates scored and reported per test case.
top_k = 20

In [None]:
from keras.backend import reshape
import re
import np
import wordsegment
from nltk import wordpunct_tokenize

# Initialise wordsegment once at import time; tokenize() below calls wordsegment.segment().
wordsegment.load()

# Shortest non-empty run of characters followed by one upper-case letter,
# i.e. a camelCase boundary. Used together with splitCase in re.sub.
REG = r"(.+?)([A-Z])"


def splitCase(match):
    """Replacement callback for ``re.sub(REG, ...)``.

    Lower-cases both captured groups and puts a "/" between them, so that
    "fooB" becomes "foo/b".
    """
    head, capital = match.group(1), match.group(2)
    return "/".join((head.lower(), capital.lower()))


def tokenize(word):
    """Split an identifier-like string into lower-case sub-tokens.

    Steps:
      1. Upper-case runs (e.g. ``HTTP``) are normalised so they behave like a
         single camelCase word.
      2. The text is split on ``...`` and on the punctuation characters in
         ``TAREG``; each punctuation character is kept as its own token.
      3. Remaining pieces are split at camelCase boundaries (``REG`` /
         ``splitCase``) and lower-cased.
      4. Each piece is passed to ``wordsegment.segment``.

    Args:
        word: the string to split.

    Returns:
        A flat list of string tokens; ``[]`` for an empty ``word``.
    """
    if (len(word) == 0):
        return []

    camelCases = []

    # Snake-case words are deliberately NOT rejected here ("_" is treated as
    # a separator character by TAREG instead).

    ppWord = '%s' % word

    # Raw strings: "\[" and "\d" are invalid escapes in a normal string literal.
    TAREG = re.compile(r"[<,>?\[\](){}&.|_=]")
    # finditer scans the ORIGINAL string; every replacement below keeps the
    # length unchanged, so the match offsets stay valid for the updated ppWord.
    for match in re.finditer(r"[A-Z][A-Z\d]+", ppWord):
        result = ""
        s = match.start()
        e = match.end()
        if (e == len(ppWord) or bool(TAREG.match(ppWord[e]))):
            # CLASS => Class
            result = ppWord[s:e][1:].lower()
            result = ppWord[s:e][0] + result
        else:
            # CLASS => ClasS (the last capital starts the next camelCase word)
            result = ppWord[s:e][1:-1].lower()
            result = ppWord[s:e][0] + result + ppWord[s:e][-1]

        ppWord = ppWord[:s] + result + ppWord[e:]

    # Split ...
    words = ppWord.split("...")

    # Split type argument character
    for w in words:
        tmpWord = ""

        for char in w:
            if (bool(TAREG.match(char))):
                camelCases.append(tmpWord)
                camelCases.append(char)
                tmpWord = ""
            else:
                tmpWord += char
        if (len(tmpWord) > 0):
            camelCases.append(tmpWord)

        camelCases.append("...")

    # Drop the "..." appended after the final piece.
    camelCases.pop()

    split_cases = [re.sub(REG, splitCase, w, count=0).lower() for w in camelCases]

    # Flatten with plain Python instead of np.concatenate: no dtype coercion,
    # and empty pieces (e.g. between two adjacent separators) cannot break it.
    words = [part for w in split_cases for part in w.split("/")]

    result = []
    for ele in words:
        if (bool(TAREG.match(ele))):
            # Separator tokens (including "...") are kept verbatim.
            result.append(ele)
        else:
            result.extend(wordsegment.segment(ele))

    return result


def remove_comments(string):
    """Return ``string`` with ``//`` line comments and ``/* */`` block comments removed.

    Quoted literals (single or double quotes) are matched first and copied
    through unchanged, so comment markers inside a literal are preserved.
    """
    # Group 1: a quoted literal. Group 2: a block comment or a line comment
    # running to the end of its line.
    comment_or_literal = re.compile(
        r"(\".*?\"|\'.*?\')|(/\*.*?\*/|//[^\r\n]*$)",
        re.MULTILINE | re.DOTALL,
    )

    def _keep_literals(found):
        literal, comment = found.group(1), found.group(2)
        # A populated comment group means a real comment: drop it.
        return literal if comment is None else ""

    return comment_or_literal.sub(_keep_literals, string)


# def tokenize_subtoken(txt):
#     src = remove_comments(txt)
#     src = wordpunct_tokenize(txt)
#     res = []
#     for token in src:
#         if not token[0].isalnum():
#             for p in token:
#                 res.append(p)
#         else:
#             res += tokenize(token)
#     return reshape


def tokenize_fulltoken(txt):
    """Tokenize source text into whole-word tokens and single punctuation characters.

    Comments are stripped with ``remove_comments`` and the remainder is split
    with ``wordpunct_tokenize``. A token whose first character is
    alphanumeric is kept whole; any other token is emitted one character at
    a time.

    Args:
        txt: source text to tokenize.

    Returns:
        List of string tokens.
    """
    src = remove_comments(txt)
    # Tokenize the comment-stripped text. Previously the raw ``txt`` was
    # tokenized, so the result of remove_comments was silently discarded.
    src = wordpunct_tokenize(src)
    res = []
    for token in src:
        if token[0].isalnum():
            res.append(token)
        else:
            # e.g. ");" -> ")", ";"
            res.extend(token)
    return res

In [None]:
import numpy as np
from keras.preprocessing.sequence import pad_sequences


def prepare_sentence(seq, train_len, start_pos):
    """Build one (history, next-token) pair per position from ``start_pos`` on.

    For every index ``i`` in ``range(start_pos, len(seq))`` the history
    ``seq[:i]`` is passed through ``pad_sequences`` with
    ``maxlen=train_len - 1`` and ``padding='pre'``, and ``seq[i]`` is the
    target.

    Returns:
        ``(x, y)``: list of padded histories and list of target tokens.
    """
    window = train_len - 1
    positions = range(start_pos, len(seq))
    x = [
        pad_sequences([seq[:pos]], maxlen=window, padding='pre')[0]
        for pos in positions
    ]
    y = [seq[pos] for pos in positions]
    return x, y


def prepare_sentences(context, sentences, train_len, start_pos):
    """Run ``prepare_sentence`` on ``context + sentence`` for every sentence.

    Returns:
        ``(x, y, sentence_len)`` where ``x`` and ``y`` are numpy arrays of
        all windows and targets concatenated in sentence order, and
        ``sentence_len[k]`` is the number of windows produced by sentence ``k``.
    """
    windows, targets, counts = [], [], []
    for candidate in sentences:
        cand_x, cand_y = prepare_sentence(context + candidate, train_len, start_pos)
        windows.extend(cand_x)
        targets.extend(cand_y)
        counts.append(len(cand_x))
    return np.array(windows), np.array(targets), counts


def predict(model, x, **kwargs):
    """Run ``model.predict`` on ``x`` in batches of 200.

    ``x`` is wrapped in a single-element list before being passed on.
    Extra keyword arguments are accepted for call-site compatibility and
    ignored.

    The ``workers`` / ``use_multiprocessing`` arguments that used to be
    passed here only apply to generator / ``Sequence`` inputs and are not
    accepted by newer Keras releases, so they are no longer forwarded.
    """
    model_input = [x]
    return model.predict(model_input, batch_size=200)


def evaluate(p_pred, y_test, sentence_len, **kwargs):
    """Sum per-token log-probabilities into one score per sentence.

    Args:
        p_pred: per-window probability rows; ``p_pred[i][t]`` is the
            probability assigned to token ``t`` for window ``i``.
        y_test: target token index for each window.
        sentence_len: number of consecutive windows belonging to each
            sentence, in order.
        **kwargs: accepted and ignored.

    Returns:
        List with one summed log-probability per entry of ``sentence_len``.
        A sentence with zero windows keeps the initial score ``0``.
    """
    log_p_sentence = [0] * len(sentence_len)
    x_test_id = 0
    accumulate_len = 0

    for i, prob in enumerate(p_pred):
        # ``while`` rather than ``if``: a zero-length sentence means several
        # sentence boundaries can fall on the same window index, and all of
        # them must be skipped before attributing this window.
        while i - accumulate_len == sentence_len[x_test_id]:
            accumulate_len += sentence_len[x_test_id]
            x_test_id += 1
        prob_word = prob[y_test[i]]
        log_p_sentence[x_test_id] += np.log(prob_word)
    return log_p_sentence

In [None]:
def java_tokenize_sentences(lexes, tokenizer, to_sequence=True):
    """Tokenize each entry of ``lexes`` and map the tokens to ids.

    Every entry is split with ``tokenize_fulltoken``; the resulting token
    lists are converted in one call to ``tokenizer.texts_to_sequences``.
    ``to_sequence`` is accepted but not used.
    """
    token_lists = [tokenize_fulltoken(lex) for lex in lexes]
    return tokenizer.texts_to_sequences(token_lists)

def java_tokenize_take_last(lines, tokenizer, train_len):
    """Tokenize all ``lines`` and return the ids of the last ``train_len`` tokens.

    Tokens from every line are concatenated in order, the trailing
    ``train_len`` tokens (or all of them if there are fewer) are kept, and
    that single token list is converted with ``tokenizer.texts_to_sequences``.
    """
    flat_tokens = [tok for line in lines for tok in tokenize_fulltoken(line)]
    first_kept = max(len(flat_tokens) - train_len, 0)
    tail = flat_tokens[first_kept:]
    return tokenizer.texts_to_sequences([tail])[0]
  
def select_top_candidates(java_context, data, start_time):
    """Serialise the ``top_k`` best-scoring candidates as a JSON string.

    Args:
        java_context: iterable of ``(score, candidate)`` pairs.
        data: test-case dict; ``expected_lex`` and ``test_id`` are copied
            into the output.
        start_time: ``perf_counter()`` value taken when prediction started;
            used to compute the reported runtime.

    Returns:
        JSON text with keys ``lexModelScores``, ``predictions``, ``runtime``,
        ``answer`` and ``test_id``. Relies on the module-level ``top_k``.
    """
    def _descending(entry):
        return -entry[0]

    best = sorted(java_context, key=_descending)[:top_k]
    report = {
        'lexModelScores': [entry[0] for entry in best],
        'predictions': [entry[1] for entry in best],
        'runtime': perf_counter() - start_time,
        'answer': data['expected_lex'],
        'test_id': data['test_id'],
    }
    return json.dumps(report)

In [None]:
def predict_param(data, candidates, train_len, java_tokenizer, java_model):
    """Score ``candidates`` against the test case's context and rank them.

    The last ``train_len`` tokens of ``data['lex_context']`` form the
    context. Each candidate is tokenized, appended to that context, and
    scored by summing the model's log-probabilities of the candidate tokens.

    Args:
        data: test-case dict; ``lex_context`` is read here, the rest is
            forwarded to ``select_top_candidates``.
        candidates: candidate strings to rank.
        train_len: window length passed to the tokenizing/padding helpers.
        java_tokenizer: tokenizer providing ``texts_to_sequences``.
        java_model: model passed to ``predict``.

    Returns:
        The JSON string produced by ``select_top_candidates``.
    """
    start_time = perf_counter()

    java_origin_context = java_tokenize_take_last(data['lex_context'],
                                                  tokenizer=java_tokenizer,
                                                  train_len=train_len)
    java_suggestions_all = java_tokenize_sentences(candidates, tokenizer=java_tokenizer)
    # Only candidate tokens are scored: windows start right after the context.
    x_test, y_test, sentence_len = prepare_sentences(java_origin_context,
                                                     java_suggestions_all, train_len,
                                                     len(java_origin_context))
    if len(x_test) == 0:
        # No candidates, or every candidate tokenized to nothing: there is
        # nothing to feed the model. Use the score evaluate() would give for
        # an empty prediction set instead of crashing in model.predict.
        log_p_sentence = [0] * len(sentence_len)
    else:
        p_pred = predict(java_model, x_test)
        log_p_sentence = evaluate(p_pred, y_test, sentence_len)
    assert len(log_p_sentence) == len(candidates)

    java_suggestion_scores = list(zip(log_p_sentence, candidates))
    sorted_scores = sorted(java_suggestion_scores, key=lambda x: -x[0])[:top_k]
    return select_top_candidates(sorted_scores, data, start_time)

In [None]:
from pathlib import Path

# Trained model for the selected project.
model_path = f'/content/drive/MyDrive/shared/LSTM-Kien/model/{project}/{project}.h5'
java_model = load_model(model_path)

# Pickled tokenizer for the selected project.
tokenizer_path = f'/content/drive/MyDrive/shared/LSTM-Kien/tokenizer/{project}/{project}.tk'
# NOTE: unpickling can execute arbitrary code; only load tokenizer files from a trusted source.
# The context manager closes the file handle, which the bare open() call never did.
with open(tokenizer_path, 'rb') as tokenizer_file:
    java_tokenizer = load(tokenizer_file)

In [None]:
# Inputs: one JSON test case per line, and the matching local-model
# predictions (same order, one JSON object per line).
test_path = f'/content/drive/MyDrive/shared/LSTM-Kien/testcase/{project}/{project}_ArgRecTests_fold{fold}.txt'
local_preds_path = f'/content/drive/MyDrive/shared/LSTM-Kien/local-pred/{project}/fold{fold}/{project}_prediction_detail_flute_sequence.txt'
# Output: one JSON prediction record per test case.
global_preds_path = f'/content/drive/MyDrive/shared/LSTM-Kien/global-pred/{project}/fold{fold}/{project}_prediction_detail_flute_sequence.txt'

train_len = 6

# Context managers guarantee all three files are closed (and the output is
# flushed) even if a test case raises part-way through.
with open(test_path, "r") as tests, open(local_preds_path, "r") as local_preds_tests:
    Path(global_preds_path).parent.mkdir(parents=True, exist_ok=True)
    with open(global_preds_path, "w") as global_preds_tests:
        for cnt, test_line in enumerate(tests, start=1):
            test = json.loads(test_line)
            local_line = local_preds_tests.readline()
            if local_line == '':
                # Fail with a clear message instead of a JSON decode error on ''.
                raise ValueError(
                    f'{local_preds_path} has fewer lines than {test_path} '
                    f'(no local prediction for test line {cnt})')
            local_preds = json.loads(local_line)
            if test['expected_lex'] == ')':
                # When the expected answer is ")", a fixed single-candidate
                # record is written and the model is not consulted.
                prediction_detail = {}
                prediction_detail['lexModelScores'] = [0]
                prediction_detail['predictions'] = [')']
                prediction_detail['runtime'] = 0
                prediction_detail['answer'] = ')'
                prediction_detail['test_id'] = test['test_id']
                global_preds_tests.write(json.dumps(prediction_detail) + '\n')
                continue
            pred_detail = predict_param(test, local_preds['predictions'][:top_k], train_len, java_tokenizer, java_model)
            global_preds_tests.write(pred_detail + '\n')
            print(pred_detail)

Output hidden; open in https://colab.research.google.com to view.