# Setup

In [4]:
from transformers import RobertaTokenizer, RobertaForSequenceClassification, T5ForConditionalGeneration, TrainingArguments
from sklearn.metrics import precision_recall_fscore_support, accuracy_score
from sklearn.model_selection import train_test_split
from tqdm import tqdm

import torch
torch.cuda.empty_cache()

from sklearn.metrics import f1_score
import numpy as np
import pandas as pd
import evaluate

import ast
import astunparse

import random
import string

In [5]:
!nvidia-smi

Tue Apr 11 00:14:18 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 418.126.02   Driver Version: 418.126.02   CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla V100-SXM2...  On   | 00000000:8A:00.0 Off |                    0 |
| N/A   41C    P0    68W / 300W |  12200MiB / 32480MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
+-------

Let's load both the scorer and the generator model and see how they work in tandem (they use the same tokenizer class with the same config)

In [6]:
# Scorer: a RoBERTa sequence classifier restored from a saved checkpoint.
# NOTE(review): 'checkpoingnts' looks like a typo of 'checkpoints' — confirm it
# matches the actual directory name before changing it.
scorer_checkpoint = 'checkpoingnts/R2-checkpoint-18000'

scorer_model = RobertaForSequenceClassification.from_pretrained(scorer_checkpoint)
# The tokenizer is loaded from "microsoft/codebert-base", not from the checkpoint path.
scorer_tokenizer = RobertaTokenizer.from_pretrained("microsoft/codebert-base")

# Generator: a T5 conditional-generation model restored from a saved checkpoint.
generator_checkpoint = "t5-checkpoints/R2-checkpoint-20000"

generator_model = T5ForConditionalGeneration.from_pretrained(generator_checkpoint)
# Tokenizer is loaded from 'Salesforce/codet5-base-multi-sum', again through RobertaTokenizer.
generator_tokenizer = RobertaTokenizer.from_pretrained('Salesforce/codet5-base-multi-sum')

In [7]:
def get_function_info(functionNode):
    """Summarise a function-definition AST node as ``[name, args, code]``.

    Args:
        functionNode: an AST node exposing ``name`` and ``args.args``
            (callers in this notebook pass ``ast.FunctionDef`` nodes).

    Returns:
        A three-item list: the function name, the list of parameter names
        taken from ``args.args`` only (other parameter fields of the node are
        not read), and the source text produced by ``astunparse.unparse``.
    """
    name = functionNode.name

    arg_names = []
    for parameter in functionNode.args.args:
        arg_names.append(parameter.arg)

    source_text = astunparse.unparse(functionNode)
    return [name, arg_names, source_text]

def code_to_functions_df(code):
    """Parse Python source and tabulate its functions and class methods.

    Only ``ast.FunctionDef`` nodes found directly in the module body, or
    directly in the body of a module-level class, are collected. Module-level
    functions come first, followed by the methods of each class in source
    order.

    Args:
        code: Python source text accepted by ``ast.parse``.

    Returns:
        A DataFrame with columns ``functionName``, ``functionArgs`` and
        ``functionCode``, one row per collected function.
    """
    tree = ast.parse(code)

    # Single pass over the module body, sorting nodes into the two kinds we care about.
    top_level_defs = []
    class_defs = []
    for statement in tree.body:
        if isinstance(statement, ast.FunctionDef):
            top_level_defs.append(statement)
        if isinstance(statement, ast.ClassDef):
            class_defs.append(statement)

    rows = [get_function_info(definition) for definition in top_level_defs]
    for class_def in class_defs:
        rows.extend(
            get_function_info(member)
            for member in class_def.body
            if isinstance(member, ast.FunctionDef)
        )

    return pd.DataFrame(rows, columns=['functionName', 'functionArgs', 'functionCode'])

def _signature_onwards(name, code):
    """Return ``code`` starting at the opening parenthesis of ``def <name>(``.

    Anchoring on the ``def`` header keeps a decorator such as ``@dec(arg)``
    from being mistaken for the signature. If the header cannot be found the
    function falls back to cutting at the first ``(`` in the text.
    """
    header = f"def {name}("
    header_pos = code.find(header)
    if header_pos != -1:
        # The header ends with "(", so step back one character to keep it.
        return code[header_pos + len(header) - 1:]
    return code[code.find('('):]


def file_to_processed_df(filename):
    """Read a Python file and return its functions with the name stripped off.

    Args:
        filename: path of the Python source file to read.

    Returns:
        The DataFrame built by ``code_to_functions_df`` whose ``functionCode``
        column has been cut down to start at the opening parenthesis of each
        function's parameter list, so the text no longer contains the
        function's own name (or any decorator lines preceding it).
    """
    with open(filename) as file:
        functions = code_to_functions_df(file.read())

    # Preprocess: drop everything before the parameter list of each function.
    functions['functionCode'] = [
        _signature_onwards(name, code)
        for name, code in zip(functions['functionName'], functions['functionCode'])
    ]
    return functions

def run_seq2seq_model(input_string, **generator_args):
    """Generate text for ``input_string`` with the notebook's generator model.

    Args:
        input_string: text fed to the generator (here: a function's signature
            and body with the name removed).
        **generator_args: extra keyword arguments forwarded to
            ``generator_model.generate``. They override the default
            ``max_length=64``.

    Returns:
        The list of decoded output strings (special tokens skipped), one per
        generated sequence.
    """
    # The previous version replaced the caller's keyword arguments with a
    # hard-coded dict (num_beams=1, length_penalty=1.5, no_repeat_ngram_size=4,
    # early_stopping=True) and then never passed that dict to generate(), so
    # the only setting ever in effect was max_length=64. That default is kept
    # and caller-supplied options are now actually honoured.
    generation_kwargs = {"max_length": 64, **generator_args}

    # NOTE(review): the tokenizer may already append its own end-of-sequence
    # token; the explicit suffix is kept so inputs match the recorded outputs.
    input_string = input_string + " </s>"
    input_ids = generator_tokenizer.encode(input_string, return_tensors="pt")
    res = generator_model.generate(input_ids, **generation_kwargs)
    output = generator_tokenizer.batch_decode(res, skip_special_tokens=True)
    return output

# Test 1 - Looking at good code (at a different angle)
We look at the test-module code from Philips' library for extracting functions from GitHub repos. All of the functions are correct, so the question is: how well can the model regenerate the right names?

Comments were removed so the model won't just rely on them

source: https://github.com/philips-software/functiondefextractor/blob/master/test/test_core_extractor.py

In [8]:
# Extract every function/method of the sample module into a DataFrame;
# functionCode starts at the signature, so the original name is not in the model input.
functions = file_to_processed_df("code_sample/code_sample_nocomments.py")
functions

Unnamed: 0,functionName,functionArgs,functionCode
0,get_log_data,[line],(line):\n ini_path = os.path.abspath(os.pat...
1,test_filter_reg_files,[self],(self):\n files = get_file_names(self.src_f...
2,test_get_function_names,[self],"(self):\n (func, line_num) = get_function_n..."
3,test_get_func_body,[self],(self):\n func_body = get_func_body(os.path...
4,test_process_ad,[self],(self):\n dataframe = extractor(os.path.joi...
5,test_process_extract,[self],(self):\n dataframe = extractor(os.path.joi...
6,test_process_annot,[self],(self):\n dataframe = extractor(os.path.joi...
7,test_process_python_test_extract,[self],(self):\n dataframe = extractor(os.path.joi...
8,test_invalid_path,[self],(self):\n self.assertEqual(extractor(os.pat...
9,test_py_annot_method_names,[self],(self):\n line_data = list([line.rstrip() f...


In [9]:
# Generate a name for row 1 and print the first decoded sequence.
print(run_seq2seq_model(functions.iloc[1]['functionCode'])[0])

test_filter_reg_files


Who told you that? 🤨

In [10]:
# Regenerate a name for every extracted function (one model call per row)
# and show it next to the original name.
regen_names = [run_seq2seq_model(code)[0] for code in functions['functionCode']]
functions["functionGeneratedName"] = regen_names
functions[["functionName", "functionGeneratedName", "functionCode"]]

Unnamed: 0,functionName,functionGeneratedName,functionCode
0,get_log_data,get_line_number,(line):\n ini_path = os.path.abspath(os.pat...
1,test_filter_reg_files,test_filter_reg_files,(self):\n files = get_file_names(self.src_f...
2,test_get_function_names,test_get_function_names,"(self):\n (func, line_num) = get_function_n..."
3,test_get_func_body,test_restore_streams_with_function_body,(self):\n func_body = get_func_body(os.path...
4,test_process_ad,test_codeextractor_T_T_A_D,(self):\n dataframe = extractor(os.path.joi...
5,test_process_extract,test_codeextractor_T_T_A,(self):\n dataframe = extractor(os.path.joi...
6,test_process_annot,test_codeextractor_annotated_report_folder,(self):\n dataframe = extractor(os.path.joi...
7,test_process_python_test_extract,test_codeextractor_T_T,(self):\n dataframe = extractor(os.path.joi...
8,test_invalid_path,test_extract_path,(self):\n self.assertEqual(extractor(os.pat...
9,test_py_annot_method_names,test_extracted_functions_not_found,(self):\n line_data = list([line.rstrip() f...


It grasps that all of the functions are from a test module, and pretty much all of them get an at least somewhat logical name.
Some are exact, because the code structure is obvious.
However, 4, 5, 7 and 10 are kind of weird.

In [11]:
# Print row 4 as a full definition: generated name followed by the stored signature/body text.
line = 4
print(f"def {functions.iloc[line]['functionGeneratedName']} {functions.iloc[line]['functionCode']}")

def test_codeextractor_T_T_A_D (self):
    dataframe = extractor(os.path.join(self.file_path, 'test_resource', 'test_repo'), annot='@Test', delta='5')
    df2_list = pd.read_excel(os.path.join(os.path.dirname(__file__), os.pardir, 'test_resource', 'codeextractor_T_T_A_D.xlsx')).sort_values('Uniq ID')
    dataframe['Code'] = dataframe['Code'].str.replace(os.linesep, '')
    df2_list['Code'] = df2_list['Code'].str.replace('\n', '')
    self.assertTrue(dataframe['Code'].equals(df2_list['Code']))



So it's not like it invents that terrible acronym; it's still sourced from the code

Let's take a look at a semi-logical one, like 13

In [12]:
# Same inspection for row 13: generated name followed by the stored signature/body text.
line = 13
print(f"def {functions.iloc[line]['functionGeneratedName']} {functions.iloc[line]['functionCode']}")

def test_functiondefextractor (self):
    cmd = ('python -m functiondefextractor --p "%s"' % os.path.join(self.file_path, 'test_resource', 'test_repo', 'test'))
    subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    my_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'test_resource', 'test_repo', 'test')
    for fname in os.listdir(my_dir):
        if fname.startswith('ExtractedFunc_'):
            df1_list = pd.read_excel(fname).sort_values('Uniq ID')
            df2_list = pd.read_excel(os.path.join(os.path.dirname(__file__), os.pardir, 'test_resource', 'Extracted_java.xlsx')).sort_values('Uniq ID')
            df1_list['Code'] = df1_list['Code'].str.replace(os.linesep, '')
            df2_list['Code'] = df2_list['Code'].str.replace(os.linesep, '')
            df2_list['Code'] = df2_list['Code'].str.replace('\r', '')
            self.assertEqual(df1_list['Code'].values.tolist().sort(), df2_list['Code'].values.tolist().sort())



Huh, so it was not such a smart made-up name after all, the model sources it directly from the cmd code too

In [13]:
# Same inspection for row 1, where the generated name equals the original one.
line = 1
print(f"def {functions.iloc[line]['functionGeneratedName']} {functions.iloc[line]['functionCode']}")

def test_filter_reg_files (self):
    files = get_file_names(self.src_files)
    filter_files = filter_reg_files(files, '*.py,*.cpp,*.js,*.ts')
    expected = [os.path.join(self.src_files, 'CerberusTest.java'), os.path.join(self.src_files, 'HelloController.java'), os.path.join(self.src_files, 'test_c.c'), os.path.join(self.src_files, 'test_repo.java')]
    self.assertEqual(set(expected), set(filter_files))



# Test 2 - School exam time
Let's see how well the model will handle random trivial functions that are based on school-level knowledge

Let's start with Heron's triangle area function

In [14]:
# Heron's formula; the result is first stored in a variable literally named `area`.
mystery_function = """(a, b, c):
    s = (a + b + c) / 2
    area = math.sqrt(s * (s - a) * (s - b) * (s - c))
    return area"""

print(run_seq2seq_model(mystery_function)[0])

calc_area


Well, it makes sense, but the answer is rather obvious when the result is stored in a variable named "area"

Maybe with a little bit more obfuscation?

In [15]:
# Same formula, returned directly so the identifier `area` no longer appears in the input.
mystery_function = """(a, b, c):
    s = (a + b + c) / 2
    return math.sqrt(s * (s - a) * (s - b) * (s - c))"""

print(run_seq2seq_model(mystery_function)[0])

distance


It understood the geometry theme (probably based on sqrt) but did not have enough knowledge to understand the exact formula 

How about a different variant of triangle area

In [16]:
# Triangle area computed from base and height.
mystery_function = """(base, height):
    return base * height / 2"""

print(run_seq2seq_model(mystery_function)[0])

center


Same story - right theme yet wrong concept

Let's try some math

In [17]:
# Trial-division check: returns False as soon as any i in [2, n) divides n.
mystery_function = """(n):
  for i in range(2,n):
    if (n%i) == 0:
      return False
  return True"""

print(run_seq2seq_model(mystery_function)[0])

is_prime_number


In [22]:
# Iterative routine that prints a Fibonacci-style sequence starting from 0 and 1.
mystery_function = """(length):
    first = 0
    second = 1
    print(first, second)
    length -= 2
    while length > 0:
        print(first + second, end=" ")
        temp = second
        second = first + second
        first = temp
        length -= 1"""

print(run_seq2seq_model(mystery_function)[0])

print_fibonacci


Nice.

In [20]:
# Root of the mean squared difference between `actual` and `predicted` (RMSE).
mystery_function = """(actual, predicted):
    actual = np.array(actual)
    predicted = np.array(predicted)
    differences = np.subtract(actual, predicted)
    squared_differences = np.square(differences)
    return np.sqrt(squared_differences.mean())"""

print(run_seq2seq_model(mystery_function)[0])

compare_arrays


It was RMSE but array comparison kind of makes sense

In [27]:
# Matplotlib snippet that plots y = x**2; note the `#` lines below are part of the
# string literal, so the model sees them as input.
mystery_function = """():
    x = np.linspace(-5,5,100)

    # the function, which is y = x^2 here 
    y = x**2

    # setting the axes at the centre
    fig = plt.figure()
    ax = fig.add_subplot(1, 1, 1)
    ax.spines['left'].set_position('center')
    ax.spines['bottom'].set_position('zero')
    ax.spines['right'].set_color('none')
    ax.spines['top'].set_color('none')
    
    ax.xaxis.set_ticks_position('bottom')
    ax.yaxis.set_ticks_position('left')

    plt.plot(x,y, 'r')

    plt.show()"""

print(run_seq2seq_model(mystery_function)[0])

plot_functions


We actually plot a parabola but good enough

# Test 3 - Into the wild!

For this test, we will use a later part of the Python code dataset (post-100k),
which was not used in train/val. We'll pick some short- (under 15 chars), medium- (15-30) and long-named (31+) functions and see how the model regenerates their names

In [28]:
# Load the dataset and keep only its final 20k rows.
giga_df = pd.read_parquet("pyfunc_272k.parquet").tail(20000)

# Keep only entries whose source text begins with two newlines followed by "def ";
# anything with other text before the "def" (e.g. a decorator line) is dropped.
filt_df = giga_df[giga_df['functionCode'].str.startswith("\n\ndef ")].reset_index(drop=True)

# Cut each body down to start at its first "(", removing the function name.
filt_df = filt_df.assign(
    functionCode=[src[src.find('('):] for src in filt_df['functionCode']]
)

filt_df

Unnamed: 0,functionName,functionArgs,functionCode
0,load_label,[label_file],(label_file):\n with open(label_file) as f:...
1,load_content,[file_name],(file_name):\n with open(file_name) as f:\n...
2,__init__,"[self, aspell_executable, language]","(self, aspell_executable, language='en'):\n ..."
3,_spawn_aspell,"[self, aspell_executable, language]","(self, aspell_executable, language):\n args..."
4,__del__,[self],(self):\n if self._subprocess:\n sel...
...,...,...,...
16788,find_largest_sq_ana,"[tass, squares]","(tass, squares):\n S = []\n for s in tas..."
16789,find_ana_sq,"[tass, squares]","(tass, squares):\n all_sq_ana = find_larges..."
16790,__init__,"[self, name, path, drive_type, site_id]","(self, name=None, path=None, drive_type=None, ..."
16791,from_docker_envvars,[config],(config):\n if ('PG_PORT' in os.environ):\n...


In [29]:
# Ten functions whose original name is shorter than 15 characters.
# random_state pins the draw so Restart & Run All shows the same rows every time.
short_sample = (
    filt_df[filt_df["functionName"].str.len() < 15]
    .sample(10, random_state=42)
    .reset_index(drop=True)
)
regen_names = [run_seq2seq_model(code)[0] for code in short_sample['functionCode']]
short_sample["functionGeneratedName"] = regen_names
short_sample[["functionName", "functionGeneratedName", "functionCode"]]

Token indices sequence length is longer than the specified maximum sequence length for this model (615 > 512). Running this sequence through the model will result in indexing errors


Unnamed: 0,functionName,functionGeneratedName,functionCode
0,ParseSpec,GetConformance,"(traces, folder, args):\n duplicate_names =..."
1,test_fail_once,test__try_send_splunk_max_attempts_and_hex_max...,(self):\n self.config['splunk_max_attempts'...
2,encode_field,encode_field,"(self, field, value):\n return ('{encoded}'..."
3,main,main,():\n argument_spec = dict(group=dict(requi...
4,set_shuffle,set_shuffle,"(self, shuffle):\n 'Enable/disable shuffle ..."
5,InferError7,yield_error_7,"(a, out):\n\n @instance\n def logic():\n..."
6,__init__,set_ports,"(self, repo_manager_exe, server_port=0, direct..."
7,on_close,on_close,"(self):\n print('Listen.on_close', os.getpi..."
8,_get_normal,getNormal,"(self, pts):\n '\n Get normal vector..."
9,sys_wrapper,_setup_sriov_capabilities,"(sriovs, vnic_capable=True, vnic_failover_capa..."


In [30]:
# Ten functions whose original name is 15 to 30 characters long (both ends inclusive).
# random_state pins the draw so Restart & Run All shows the same rows every time.
medium_sample = (
    filt_df[filt_df["functionName"].str.len().between(15, 30)]
    .sample(10, random_state=42)
    .reset_index(drop=True)
)
regen_names = [run_seq2seq_model(code)[0] for code in medium_sample['functionCode']]
medium_sample["functionGeneratedName"] = regen_names
medium_sample[["functionName", "functionGeneratedName", "functionCode"]]

Unnamed: 0,functionName,functionGeneratedName,functionCode
0,get_positive_axis,_normalize_axis,"(axis, ndims, axis_name='axis', ndims_name='nd..."
1,test_logarithmic_small_scale,test_logarithmic_small_range,():\n 'Test logarithmic with a small range ...
2,update_data_module_name,update_data_module_name,"(cr, models, old_name, new_name):\n '\n ..."
3,create_dockerfile,create_random_dockerfile,"(repository, tag):\n '\n Creates a Docke..."
4,test_user_id_trumps_user,test_user_id_setter,(self):\n self.request.headers['X_USER_ID']...
5,clone_get_equiv,writeme,"(self, check_integrity=True):\n 'WRITEME'\n..."
6,_verify_controllers,_verify_bridge_col_target,"(self, ovsrec_bridge):\n ovsrec_bridge.veri..."
7,generate_auth_token,get_signature,"(self, expiration=600):\n from app import a..."
8,test_key_from_legacy_urlsafe,test_from_legacy_urlsafe,():\n from google.cloud.datastore.key impor...
9,test_destroy_node,test_destroy_node,(self):\n status = self.driver.destroy_node...


In [31]:
# Ten functions whose original name is longer than 30 characters.
# random_state pins the draw so Restart & Run All shows the same rows every time.
long_sample = (
    filt_df[filt_df["functionName"].str.len() > 30]
    .sample(10, random_state=42)
    .reset_index(drop=True)
)
regen_names = [run_seq2seq_model(code)[0] for code in long_sample['functionCode']]
long_sample["functionGeneratedName"] = regen_names
long_sample[["functionName", "functionGeneratedName", "functionCode"]]

Unnamed: 0,functionName,functionGeneratedName,functionCode
0,test_wait_for_drive_state_transition_timeout,test_wait_for_drive_state_transition_timeout,(self):\n drive = self.driver.ex_list_user_...
1,test_post_name_pattern_none_returns_400,test_bad_name,(self):\n response = self.client.PxST('/for...
2,submit_rescore_one_student_answer,submit_rescore_problem_for_student,"(self, instructor, problem_url_name, student, ..."
3,test_encode_one_line_eol_after_non_ascii,test_encode_utf8_eol,(self):\n self._test_encode('helloυ\n'.enco...
4,testSegmentsMultipleStartOverlapAllow,testExpandMultipleSegments,(self):\n '\n Using start\n T...
5,test_multiple_splittable_leading_char_followed...,test_header_with_maxlinelen_and_thus_should_be...,(self):\n eq = self.ndiffAssertEqual\n h...
6,testStrandsMissingAsNegativeEnd,testIgnoreMissingStrand,"(self):\n '\n Using strand, at end. ..."
7,test_splitting_multiple_long_lines,test_header_continuation_ws,(self):\n eq = self.ndiffAssertEqual\n h...
8,test_rfc2231_no_language_or_charset_in_boundary,test_message_from_string_boundary,(self):\n m = 'Content-Type: multipart/alte...
9,testGenerateFeatureSplitCandidatesInactive,testGenerateFeatureSplitCandidatesInactive,(self):\n with self.cached_session() as ses...


# Conclusion

This 2-epoch, 80k-example fine-tune of Salesforce's CodeT5 is not perfect, but more often than not it nails the generation spot-on. With a bigger corpus and longer training (only 80k example functions are used here, while the full HF dataset has around 50M), maybe a larger model (plus playing with generation parameters), and maybe switching to top-5 recommendations, the results may improve drastically even without involving GPTs (which tend to act more chaotically). As a proof of concept, it performs satisfactorily.