In [None]:
# !pip install transformers

In [None]:
import glob
import itertools
import os
import shutil
import subprocess
import sys
import zipfile

In [None]:
# Language subset to work on. It is used as the sub-directory name under both
# the dataset directory and the model output directory in the cells below.
LANG = "python"
# Forwarded to run.py as --model_type and --model_name_or_path.
MODEL_TYPE = "roberta"
PRE_TRAINED_MODEL_NAME = "microsoft/codebert-base"

# Forwarded to run.py as --num_train_epochs and --learning_rate.
N_EPOCHS = 10
LEARNING_RATE = 5e-5

# Forwarded to run.py as --beam_size, --max_source_length and
# --max_target_length (both in the training and in the test invocation).
BEAM_SIZE = 10
MAX_SOURCE_LEN = 256
MAX_TARGET_LEN = 128

# Forwarded to run.py as --eval_batch_size and --train_batch_size.
BATCH_SIZE_EVAL = 50
BATCH_SIZE_TRAIN = 10

# Number of leading lines copied from each {train,valid,test}.jsonl into the
# corresponding *-mini.jsonl file by the dataset preparation cell.
MINI_DATASET_SIZE = 50

In [None]:
# Directory layout. Everything hangs off the directory the notebook is started
# from: <cwd>/repos/CodeXGLUE/Code-Text/code-to-text/{code,model,dataset,evaluator}.
root_path = os.path.abspath(os.curdir)
repo_path = os.path.join(root_path, *("repos", "CodeXGLUE"))
task_path = os.path.join(repo_path, *("Code-Text", "code-to-text"))

# The four working directories are siblings inside the task directory.
code_path, model_path, dataset_path, evaluator_path = (
    os.path.join(task_path, subdir)
    for subdir in ("code", "model", "dataset", "evaluator")
)

In [None]:
# Clone the CodeXGLUE repository once. The existence of repo_path is the
# "already cloned" marker, so the directory must never be left behind by a
# failed attempt; otherwise every later run would skip the clone.
if not os.path.exists(repo_path):
    # Only the parent is created up front; git creates repo_path itself.
    os.makedirs(os.path.dirname(repo_path), exist_ok=True)

    try:
        subprocess.check_call(
            [
                "git"
                , "clone"
                , "https://github.com/microsoft/CodeXGLUE"
                , repo_path
            ]
        )
    except BaseException:
        # repo_path did not exist before this cell ran, so whatever is there
        # now is a partial checkout. Remove it so a re-run clones again, then
        # re-raise the original error (including KeyboardInterrupt).
        shutil.rmtree(repo_path, ignore_errors=True)
        raise

In [None]:
# Build the code-to-text dataset once. The existence of dataset_path is the
# "already prepared" marker. Because that directory appears at the very first
# step, any failure further down removes it again so that a re-run starts over
# instead of silently skipping a half-built dataset.
if not os.path.exists(dataset_path):
    try:
        # dataset.zip is read from the task directory and unpacked next to
        # itself; this is expected to create dataset_path (preprocess.py is
        # run from there below).
        with zipfile.ZipFile(os.path.join(task_path, "dataset.zip"), 'r') as dataset_zip_file:
            dataset_zip_file.extractall(task_path)

        # Download and unpack the raw CodeSearchNet archive of every language.
        for lang in ["python", "java", "ruby", "javascript", "go", "php"]:
            subprocess.check_call(
                [
                    "wget"
                    , "https://s3.amazonaws.com/code-search-net/CodeSearchNet/v2/{lang}.zip".format(lang=lang)
                    , "-P"
                    , dataset_path
                ]
            )

            with zipfile.ZipFile(os.path.join(dataset_path, "{lang}.zip".format(lang=lang)), 'r') as lang_zip_file:
                lang_zip_file.extractall(dataset_path)

        # Run the preprocessing script inside the dataset directory. `cwd=`
        # scopes the directory change to the child process, so the notebook's
        # own working directory is untouched even if the script fails.
        # sys.executable keeps the child on the kernel's interpreter.
        subprocess.check_call(
            [
                sys.executable
                , os.path.join(dataset_path, "preprocess.py")
            ],
            cwd=dataset_path,
        )

        # Drop the downloaded archives and intermediate files.
        for leftover_file in glob.glob(os.path.join(dataset_path, "*.zip")):
            os.remove(leftover_file)
        for leftover_file in glob.glob(os.path.join(dataset_path, "*.pkl")):
            os.remove(leftover_file)
        for final_dir in glob.glob(os.path.join(dataset_path, "*/final")):
            shutil.rmtree(final_dir, ignore_errors=True)

        # Write the first MINI_DATASET_SIZE lines of every split to a
        # "<split>-mini.jsonl" file. Mode "w" makes the result independent of
        # any stale file, and islice simply stops early when a split has fewer
        # lines than requested (next() would raise StopIteration).
        for lang in ["python", "java"]:
            for split in ("train", "valid", "test"):
                full_file = os.path.join(dataset_path, lang, split + ".jsonl")
                mini_file = os.path.join(dataset_path, lang, split + "-mini.jsonl")
                with open(full_file, mode="r") as f, open(mini_file, mode="w") as mini_f:
                    mini_f.writelines(itertools.islice(f, MINI_DATASET_SIZE))
    except BaseException:
        # Remove the partially prepared dataset so the guard above triggers
        # again on the next run, then propagate the original error.
        shutil.rmtree(dataset_path, ignore_errors=True)
        raise

In [None]:
# Fine-tune the pre-trained model on the LANG training split and evaluate on
# the validation split. Output goes to <model_path>/<LANG>; the test cell
# later loads checkpoint-best-bleu/pytorch_model.bin from that directory.
#
# run.py is started with `cwd=code_path` instead of os.chdir(): the directory
# change is confined to the child process, so a failing run cannot leave the
# notebook in the wrong working directory. sys.executable makes the child use
# the same interpreter (and installed packages) as the kernel.
subprocess.check_call(
    [
        sys.executable

        , "run.py"

        , "--model_type", MODEL_TYPE
        , "--model_name_or_path", PRE_TRAINED_MODEL_NAME
        , "--output_dir", os.path.join(model_path, LANG)

        , "--num_train_epochs", str(N_EPOCHS)
        , "--learning_rate", str(LEARNING_RATE)

        , "--beam_size", str(BEAM_SIZE)
        , "--max_source_length", str(MAX_SOURCE_LEN)
        , "--max_target_length", str(MAX_TARGET_LEN)

        , "--do_train"
        , "--train_batch_size", str(BATCH_SIZE_TRAIN)
        , "--train_filename", os.path.join(dataset_path, LANG, "train.jsonl")

        , "--do_eval"
        , "--eval_batch_size", str(BATCH_SIZE_EVAL)
        , "--dev_filename", os.path.join(dataset_path, LANG, "valid.jsonl")
    ],
    cwd=code_path,
)

In [None]:
# Run run.py in test mode on the LANG test split, loading the weights stored
# at <model_path>/<LANG>/checkpoint-best-bleu/pytorch_model.bin. The evaluator
# cell afterwards reads test_0.gold and test_0.output from <model_path>/<LANG>.
#
# `cwd=code_path` replaces the os.chdir() round trip so that a failing run
# cannot leave the notebook in the wrong working directory, and
# sys.executable keeps the child on the kernel's interpreter.
subprocess.check_call(
    [
        sys.executable

        , "run.py"

        , "--model_type", MODEL_TYPE
        , "--model_name_or_path", PRE_TRAINED_MODEL_NAME
        , "--output_dir", os.path.join(model_path, LANG)
        , "--load_model_path", os.path.join(model_path, LANG, "checkpoint-best-bleu", "pytorch_model.bin")

        , "--beam_size", str(BEAM_SIZE)
        , "--max_source_length", str(MAX_SOURCE_LEN)
        , "--max_target_length", str(MAX_TARGET_LEN)

        , "--do_test"
        , "--test_filename", os.path.join(dataset_path, LANG, "test.jsonl")
    ],
    cwd=code_path,
)

In [None]:
# Score the test predictions: evaluator.py receives the gold file as its only
# argument and the predictions file on standard input.
#
# With an argument list and no shell, "<" is NOT a redirection; it would be
# handed to evaluator.py as a literal argument and the predictions would never
# reach its stdin. The file is therefore opened here and passed via `stdin=`.
# `cwd=evaluator_path` replaces the os.chdir() round trip, and sys.executable
# keeps the child on the kernel's interpreter.
with open(os.path.join(model_path, LANG, "test_0.output"), mode="rb") as predictions_file:
    subprocess.check_call(
        [
            sys.executable
            , "evaluator.py"
            , os.path.join(model_path, LANG, "test_0.gold")
        ],
        stdin=predictions_file,
        cwd=evaluator_path,
    )