In [19]:
# run nvidia-smi to check for available gpus, and change cuda_device_num value below
import torch
cuda_device_num = 1
device = torch.device("cuda:" + str(cuda_device_num) if torch.cuda.is_available() else "cpu")
print(device)
torch.cuda.set_device(cuda_device_num)
torch.cuda.current_device()

cuda:1


1

In [20]:
# modify these paths to your system's
MODEL_METHOD_MUTATION_PATH = "/home/chenghin/Desktop/repos/java-mutation-framework/models/code-generation/saved_models/checkpoint-best-score"
MODEL_COMMENT_MUTATION_PATH = "/home/chenghin/Desktop/repos/java-mutation-framework/models/codet5_base_all_lr5_bs32_src64_trg64_pat5_e10/checkpoint-best-bleu"

In [21]:
from model import Seq2Seq # Copy pasted from https://github.com/microsoft/CodeBERT/blob/master/UniXcoder/downstream-tasks/code-generation/model.py
from transformers import RobertaTokenizer, RobertaConfig, RobertaModel, T5ForConditionalGeneration, AutoTokenizer

PRETRAINED_MODEL_NAME_UNIXCODER = "microsoft/unixcoder-base"
PRETRAINED_MODEL_NAME_CODET5 = "Salesforce/codet5-base-multi-sum"

class TokenizerModelPair:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

def build_model(pretrained_model_name):
    if pretrained_model_name == PRETRAINED_MODEL_NAME_UNIXCODER:
        # build model
        tokenizer = RobertaTokenizer.from_pretrained(pretrained_model_name)
        config = RobertaConfig.from_pretrained(pretrained_model_name)
        # important！You must set is_decoder to True for generation
        config.is_decoder = True
        encoder = RobertaModel.from_pretrained(pretrained_model_name,config=config)
        model = Seq2Seq(encoder=encoder,decoder=encoder,config=config,
                        beam_size=10,max_length=256,
                        sos_id=tokenizer.convert_tokens_to_ids(["<mask0>"])[0],eos_id=tokenizer.sep_token_id)
    else:
        # build model
        tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name)
        model = T5ForConditionalGeneration.from_pretrained(pretrained_model_name)
    return TokenizerModelPair(model, tokenizer)

In [22]:
unixcoder_model_and_tokenizer = build_model(PRETRAINED_MODEL_NAME_UNIXCODER)
codet5_model_and_tokenizer = build_model(PRETRAINED_MODEL_NAME_CODET5)

Updated!!!


In [23]:
import os
def load_model(model, load_model_path='fine_tuned_models'):
    model_to_load = model.module if hasattr(model, 'module') else model
    load_model_path = os.path.join(load_model_path, 'pytorch_model.bin')
    model.load_state_dict(torch.load(load_model_path, map_location='cpu'))
    model.to(device)

In [24]:
load_model(unixcoder_model_and_tokenizer.model, MODEL_METHOD_MUTATION_PATH)
load_model(codet5_model_and_tokenizer.model, MODEL_COMMENT_MUTATION_PATH)

In [25]:
# def get_tokens

def predict(model, tokenizer, code, gold):
    input_ids = tokenizer(code, return_tensors="pt").input_ids
    generated_ids = model.generate(input_ids.to(device))
    if len(generated_ids.size()) == 3: # decoder takes in a 2d tensor. If 3d, remove the outer dimension
        generated_ids = torch.squeeze(generated_ids, 0)
    comment = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    return comment, getSmoothBLEU4(gold, comment)

In [26]:
import nltk
from nltk.translate.bleu_score import SmoothingFunction
chencherry = SmoothingFunction()
def getSmoothBLEU4(gold, result):
    BLEUscore = nltk.translate.bleu_score.sentence_bleu([gold.split()], result.split(), weights = [0.25,0.25,0.25,0.25], smoothing_function=chencherry.method2)
    return round(BLEUscore,4)

In [27]:
code = '''
public RequestMethodsRequestCondition getMatchingCondition(ServerWebExchange exchange) {
                if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
                        return matchPreFlight(exchange.getRequest());
                }
                if (getMethods().isEmpty()) {
                        if (RequestMethod.OPTIONS.name().equals(exchange.getRequest().getMethodValue())) {
                                return null; // We handle OPTIONS transparently, so don't match if no explicit declarations
                        }
                        return this;
                }
                return matchRequestMethod(exchange.getRequest().getMethod());
        }

'''

In [28]:
gold = 'check if any of the http request methods match the given request and return an instance that contains the matching http request method only'

In [29]:
predict(unixcoder_model_and_tokenizer.model, unixcoder_model_and_tokenizer.tokenizer, code, gold) # test method mutation

("public RequestMethodsRequestCondition getMatchingCondition(ServerWebExchange exchange) {\n                if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {\n                        return matchPreFlight(exchange.getRequest());\n                }\n                if (getMethods().isEmpty()) {\n                        if (RequestMethod.OPTIONS.name().equals(exchange.getRequest().getMethodValue())) {\n                                return null; // We handle OPTIONS transparently, so don't match if no explicit declarations\n                        }\n                        return this;\n                }",
 0.0384)

In [30]:
predict(codet5_model_and_tokenizer.model, codet5_model_and_tokenizer.tokenizer, gold, code) # test comment mutation



('if any of the http request methods match the given request and return an instance that contains',
 0.0236)

In [58]:
class ExplainableMutator:
    def __init__(self, comment_mutation_model, comment_mutation_tokenizer,
                 method_mutation_model, method_mutation_tokenizer):
        self.comment_mutation_model = comment_mutation_model
        self.comment_mutation_tokenizer = comment_mutation_tokenizer
        self.method_mutation_model = method_mutation_model
        self.method_mutation_tokenizer = method_mutation_tokenizer

    def generate(self, comment, method_body):
        mutated_comment = predict(self.comment_mutation_model, self.comment_mutation_tokenizer, 
                                  comment, comment)[0]
        method_with_comment = comment + "</s>" + method_body
        mutated_method = predict(self.method_mutation_model, self.method_mutation_tokenizer, 
                                 method_with_comment, method_with_comment)[0]
        return [mutated_comment, mutated_method]

In [59]:
# Tests
import unittest
from numpy.testing import assert_array_equal

TEST_FILES_PATH = "./test-files"

class TestExplainableMutator(unittest.TestCase):
    
    def setUp(self):
        self.mutator = ExplainableMutator(codet5_model_and_tokenizer.model, codet5_model_and_tokenizer.tokenizer,
                                         unixcoder_model_and_tokenizer.model, unixcoder_model_and_tokenizer.tokenizer) 

# ======================== Integration tests =====================
    def test_generate(self):
        comment = "This method does nothing."
        method_body = """public void method() {
            int a = 0;
            return;
        }"""
        [mutated_comment, mutated_method] = self.mutator.generate(comment, method_body)
        print(mutated_method)
        self.assertNotEqual(comment, mutated_comment)
        self.assertNotEqual(method_body, mutated_method)

        
unittest.main(argv=[''], verbosity=2, exit=False)

ok

----------------------------------------------------------------------
Ran 1 test in 0.342s

OK


public void method() throws JspException {
            int a = 0;
            return;
        }


<unittest.main.TestProgram at 0x7fc4f920de10>

In [33]:
%run ./SocketServer.ipynb

In [80]:
class ExplainableMutationSocketServer(SocketServer):
    def __init__(self, host, port, explainable_mutator):
        super().__init__(host, port)
        self.explainable_mutator = explainable_mutator

    def func(self):
        while True:
            print("-"*20)
            comment = self.recvMsg()
            method_body = self.recvMsg()
            try:
                [mutated_comment, mutated_method] = self.explainable_mutator.generate(comment, method_body)
            except RuntimeError as e:
                self.sendMsg("FAIL")
                self.sendMsg(str(e))
                continue
            self.sendMsg(mutated_comment)
            self.sendMsg(mutated_method)

In [75]:
HOST = "127.0.0.1"
PORT = 8080
server = ExplainableMutationSocketServer(HOST, PORT, ExplainableMutator(codet5_model_and_tokenizer.model, codet5_model_and_tokenizer.tokenizer,
                                         unixcoder_model_and_tokenizer.model, unixcoder_model_and_tokenizer.tokenizer) )

In [79]:
server.start()

Connected by ('127.0.0.1', 33762)
--------------------
response: b'FAIL'
response: b'failure'
response: b'Creates a new renderer with no tool tip generator and no URL generator. The defaults (no'
response: b'protected AbstractCategoryItemRenderer(){\n  this.itemLabelGeneratorList=new ObjectList();\n  this.toolTipGeneratorList=new ObjectList();\n  this.urlGeneratorList=new ObjectList();\n  this.legendItemLabelGenerator=new StandardCategorySeriesLabelGenerator();\n  this.backgroundAnnotations=new ArrayList();\n  this.foregroundAnnotations=new ArrayList();\n}'
--------------------
response: b'FAIL'
response: b'failure'
response: b'This method returns the number of passes through the dataset required by the renderer. This method'
response: b'public int getPassCount(){\n  return 1;\n}'
--------------------
response: b'FAIL'
response: b'failure'
response: b'This method returns the plot that the renderer has been assigned to (where <code>null'
response: b'public CategoryPlot getPlot(){\n  ret

ConnectionResetError: [Errno 104] Connection reset by peer