In [None]:
# Name of the model being evaluated; both paths below are derived from it.
model_name = 'code_rl_model'
# Folder that holds the test result files read by this notebook.
test_result_path = f'outputs/test/{model_name}/test_results/'
# Folder where the aggregated data_map.json is written.
eval_result_path = f'models_eval/{model_name}'

In [None]:
!pip install pyext

In [None]:
# collect all pkl files in the test_result_path folder and save them into a json file
import os
import pickle
import json

def process_pkl_file(pkl_file_path, data_map):
  """Read one test-result pickle and record a verdict per generated program.

  The pickle must contain a dict; only its first ``problem_id -> payload``
  item is used. ``payload['results']`` holds one list per generation and
  ``payload['times']`` holds the matching per-test durations.

  Args:
    pkl_file_path: path of the pickle file to read.
    data_map: dict updated in place with
      ``(problem_id, gen_id) -> (result, time)``.
      ``result`` is -1 or -2 when that code is the generation's only entry,
      otherwise True if every unit test passed and False if not.
      ``time`` is the summed duration of all unit tests in milliseconds when
      every test passed, and None otherwise.

  Returns:
    None. Nothing is recorded when ``results`` is empty or when any
    generation has an empty result list.
  """
  # NOTE: pickle.load can execute arbitrary code; only use it on result files
  # produced by a trusted test run.
  with open(pkl_file_path, 'rb') as file:
    data = pickle.load(file)
  problem_id, data = next(iter(data.items()))
  print(f'Processing problem {problem_id}')
  results = data['results'] # [[F],[-2],[T,T,F]]
  times = data['times']
  if not results or any(len(sub_array) == 0 for sub_array in results):
    print('non valid result: '+ str(problem_id))
    return
  for i, result in enumerate(results):
    # result could be [-1] [-2] [F] [T,T,T,F] [F,T,F,T,T,T]
    time = None
    if len(result)==1 and (result[0]==-1 or result[0]==-2):
      result = result[0]
    else:
      # True only if the program passed all the unit tests
      result = not any(test_result!=True for test_result in result)
      if result:
        # Total time spent passing all the unit tests, in milliseconds.
        # Assumes each entry is a datetime.timedelta-like object: its
        # .microseconds field only holds the sub-second part, so days and
        # seconds are added to keep durations of one second or more correct.
        time = 0
        for time_obj in times[i]:
          whole_seconds = time_obj.days*86400 + time_obj.seconds
          time += whole_seconds*1000 + time_obj.microseconds/1000
    data_map[(problem_id,i)] = (result, time)

In [None]:
# Accumulator filled by process_pkl_file:
#   key   -> (problem_id, gen_id)
#   value -> (error_type, time)
# error_type: -2 = compile error, -1 = runtime error,
#             False = failed a test, True = passed all tests
data_map = dict()

In [None]:
# Keep only regular files named like 0.pkl, 1.pkl, ... so that stray entries
# (hidden files, editor backups) can neither break the numeric sort below nor
# be handed to pickle.load later. Such entries are skipped silently.
entries = os.listdir(test_result_path)
file_names = [
  entry for entry in entries
  if entry.endswith('.pkl')
  and entry.split('.')[0].isdecimal()
  and os.path.isfile(os.path.join(test_result_path, entry))
]
file_names.sort(key=lambda x: int(x.split('.')[0])) # 0.pkl 1.pkl....
len(file_names)

In [None]:
# Fold every result file into data_map, following the sorted file order.
for file_name in file_names:
  pkl_file_path = os.path.join(test_result_path, file_name)
  process_pkl_file(pkl_file_path, data_map)

In [None]:
# save the data_map to json
# Create the output folder if it does not exist yet.
import os
os.makedirs(eval_result_path, exist_ok=True)
# json.dump cannot serialize tuple keys, so each (problem_id, gen_id) key is
# stored in its str() form. The converted dict gets its own name instead of
# rebinding data_map, so re-running this cell (or the processing cell) never
# stringifies keys twice or mixes tuple keys with string keys.
data_map_json = {str(key): value for key, value in data_map.items()}
with open(eval_result_path+'/data_map.json', 'w') as file:
  json.dump(data_map_json, file, default=str)

Load the data and compute pass@k

In [None]:
# Load the saved data_map and turn its string keys back into tuples.
import ast
with open(f'{eval_result_path}/data_map.json', 'r') as file:
  data_loaded = json.load(file)
# Keys were written with str(), so literal_eval parses each one back.
loaded_data_map = dict(
  (ast.literal_eval(key), value) for key, value in data_loaded.items()
)

In [None]:
def compute_passed_k(data_map, difficulty = 'all'):
  """Compute pass@1 and pass@5 over the problems of one difficulty split.

  Args:
    data_map: dict mapping (problem_id, gen_id) -> (error_type, time). Only an
      error_type equal to True counts as a pass.
    difficulty: 'inter' (problem ids 0-2999), 'comp' (3000-3999) or
      'intro' (4000-4999), all bounds inclusive. Any other value selects
      every problem in data_map.

  Returns:
    (pass@1, pass@5), each rounded to 5 decimals. pass@1 only looks at
    generation 0; pass@5 counts a problem when any of generations 0-4 passed.
    A generation missing from data_map counts as not passed. Returns
    (0.0, 0.0) when no problem falls in the requested split.

  Side effects:
    Prints the number of problems in the split.
  """
  # Test:
  # interview: 0-2999
  # competition: 3000-3999
  # Intro: 4000-4999
  id_ranges = {'inter': (0, 2999), 'comp': (3000, 3999), 'intro': (4000, 4999)}
  if difficulty in id_ranges:
    low, high = id_ranges[difficulty]
    problem_set = {key[0] for key in data_map if low <= key[0] <= high}
  else:
    problem_set = {key[0] for key in data_map}
  total_problems = len(problem_set)
  print(total_problems)
  if total_problems == 0:
    # Nothing to average over; avoid dividing by zero.
    return 0.0, 0.0

  def generation_passed(problem_id, gen_id):
    entry = data_map.get((problem_id, gen_id))
    return entry is not None and entry[0] == True

  total_passed_1 = 0
  total_passed_5 = 0
  for problem_id in problem_set:
    if generation_passed(problem_id, 0):
      total_passed_1 += 1
      total_passed_5 += 1
    elif any(generation_passed(problem_id, gen_id) for gen_id in range(1, 5)):
      total_passed_5 += 1
  # return pass@1 and pass@5
  return round(total_passed_1 / total_problems, 5), round(total_passed_5 / total_problems, 5)

In [None]:
# (pass@1, pass@5) on the 'intro' split
compute_passed_k(loaded_data_map, difficulty='intro')

In [None]:
# (pass@1, pass@5) on the 'inter' split
compute_passed_k(loaded_data_map, difficulty='inter')

In [None]:
# (pass@1, pass@5) on the 'comp' split
compute_passed_k(loaded_data_map, difficulty='comp')

In [None]:
# (pass@1, pass@5) over every problem
compute_passed_k(loaded_data_map, difficulty='all')