# Rule-based annotation

In [1]:
import os
import json

In [2]:
def extract_model_calls(model_name: str, test_set: str):
    """Collect the tool names called in each answer file's last conversation.

    Every ``*.json`` file in ``answers/{model_name}/{test_set}`` is read; the
    question id is the integer before the first ``_`` in the file name.

    Args:
        model_name: Directory name of the model under ``answers/``.
        test_set: Directory name of the test set under the model directory.

    Returns:
        A dict mapping question id to the list of called function names, in
        call order and with duplicates kept. Records whose ``valid_data`` flag
        is falsy are left out. Records with a missing or empty
        ``train_messages`` map to an empty list.
    """
    answers_root = f'answers/{model_name}/{test_set}'
    calls_by_qid = {}
    for file_name in os.listdir(answers_root):
        if not file_name.endswith('.json'):
            continue
        qid = int(file_name.split('_')[0])
        with open(os.path.join(answers_root, file_name), 'r') as fh:
            record = json.load(fh)
        generation = record['answer_generation']
        if not generation['valid_data']:
            continue
        if 'train_messages' not in generation:
            # Report the offending file so it can be inspected by hand.
            print(answers_root, file_name)
            calls_by_qid[qid] = []
            continue
        conversations = generation['train_messages']
        if len(conversations) == 0:
            calls_by_qid[qid] = []
            continue
        called = []
        # Only the last recorded conversation is inspected, and its final
        # message is never examined.
        for turn in conversations[-1][:-1]:
            if turn['role'] != 'assistant' or 'tool_calls' not in turn:
                continue
            for call in turn['tool_calls']:
                assert 'function' in call
                assert 'name' in call['function']
                tool_name = call['function']['name']
                # "Finish" (any casing) is excluded from the collected names.
                if tool_name.lower() != 'finish':
                    called.append(tool_name)
        calls_by_qid[qid] = called
    return calls_by_qid

def extract_model_messages(model_name: str, test_set: str) -> dict[int, list[dict]]:
    """Load the last recorded conversation of every valid answer file.

    Every ``*.json`` file in ``answers/{model_name}/{test_set}`` is read; the
    question id is the integer before the first ``_`` in the file name.

    Args:
        model_name: Directory name of the model under ``answers/``.
        test_set: Directory name of the test set under the model directory.

    Returns:
        A dict mapping question id to the last entry of ``train_messages``.
        Records whose ``valid_data`` flag is falsy are left out. Records with
        a missing or empty ``train_messages`` map to an empty list.
    """
    data_dir = f'answers/{model_name}/{test_set}'
    ret = {}
    for data_file in os.listdir(data_dir):
        if not data_file.endswith('.json'):
            continue
        qid = int(data_file.split('_')[0])
        with open(os.path.join(data_dir, data_file), 'r') as f:
            data = json.load(f)
        generation = data['answer_generation']
        if not generation['valid_data']:
            continue
        if 'train_messages' not in generation:
            # A valid record without messages is reported and stored as empty
            # instead of aborting the whole scan with a KeyError.
            print(data_dir, data_file)
            ret[qid] = []
            continue
        train_messages = generation['train_messages']
        ret[qid] = train_messages[-1] if len(train_messages) != 0 else []
    return ret

## SuccCalling

In [None]:
def check_success(message: dict):
    """Decide by rule whether a tool message reports a successful call.

    Rules, applied in order:

    1. Content starting with the literal ``{"error": "", "response": `` is a
       success, whether or not the rest parses as JSON.
    2. Otherwise the content is parsed as JSON. If that fails and the content
       starts with ``{"error": "``, one repair is attempted by appending
       ``"}`` (closing a cut-off error string).
    3. Content that still cannot be parsed into a JSON object is a failure.
    4. A non-empty ``error`` field is a failure.
    5. A dict response fails only when its ``status`` equals 400.
    6. A string response fails when it contains ``wrong``, ``error`` or
       ``is empty``, or starts with ``Please``.
    7. Any other response type is a success.

    NOTE(review): rule 1 means a payload with that exact prefix is never
    inspected further (e.g. a ``status`` of 400 is still a success); this
    keeps the existing behaviour -- confirm it is intended.

    Args:
        message: A chat message with ``role`` equal to ``'tool'`` and a string
            ``content``.

    Returns:
        True if the call is judged successful, False otherwise.

    Raises:
        AssertionError: If ``message['role']`` is not ``'tool'``.
        KeyError: If the parsed object lacks ``error``, or lacks ``response``
            while ``error`` is empty.
    """
    assert message['role'] == 'tool'
    content = message['content']
    success_prefix = '{"error": "", "response": '
    error_prefix = '{"error": "'

    if content.startswith(success_prefix):
        return True

    try:
        tool_response = json.loads(content)
    except ValueError:
        # Neither a known prefix nor valid JSON: nothing left to interpret.
        if not content.startswith(error_prefix):
            return False
        try:
            # Close a cut-off error string so the payload parses.
            tool_response = json.loads(content + '"}')
        except ValueError:
            return False

    if not isinstance(tool_response, dict):
        return False
    if tool_response['error'] != '':
        return False

    response = tool_response['response']
    if isinstance(response, dict):
        return response.get('status') != 400
    if not isinstance(response, str):
        # Lists, booleans, numbers and null carry no failure marker to check.
        return True
    if 'wrong' in response or 'error' in response or 'is empty' in response:
        return False
    return not response.startswith('Please')


def success_count(messages: list[dict]):
    """Count tool calls in a conversation and how many of them succeeded.

    An assistant message counts as one call when it has a non-empty
    ``tool_calls`` list whose first entry is not "Finish" (any casing). The
    call is judged by passing the message right after it to ``check_success``.
    The final message of the conversation has no successor and is never
    counted.

    Args:
        messages: The conversation as an ordered list of chat messages.

    Returns:
        A ``(calls, successful_calls)`` tuple.
    """
    attempted = 0
    succeeded = 0
    for current, following in zip(messages, messages[1:]):
        if current['role'] != 'assistant':
            continue
        if 'tool_calls' not in current or len(current['tool_calls']) == 0:
            continue
        # Only the first tool call of the message is considered.
        first_call = current['tool_calls'][0]
        if first_call['function']['name'].lower() == 'finish':
            continue
        attempted += 1
        if check_success(following):
            succeeded += 1
    return attempted, succeeded

# Contribution

In [None]:
import json

def get_content(message: dict) -> str:
    """Return the ``response`` part of a tool message as a string.

    The content is parsed as JSON and its ``response`` value is converted
    with ``str``. If the content is not valid JSON (for example because it
    was cut off), the raw content is returned with the leading
    ``{"error": "", "response": `` removed when present. Content without that
    prefix is returned unchanged rather than losing its first characters.

    Args:
        message: A chat message whose ``content`` is a string.

    Returns:
        The response text.

    Raises:
        KeyError: If the content parses but has no ``response`` key.
    """
    content = message['content']
    try:
        tool_response = json.loads(content)
    except ValueError:
        return content.removeprefix('{"error": "", "response": ')
    return str(tool_response['response'])

def lcs(str1: str, str2: str):
    """Return a longest common subsequence of ``str1`` and ``str2``.

    Args:
        str1: First string.
        str2: Second string.

    Returns:
        One longest common subsequence as a string; empty when the inputs
        share no character.
    """
    rows, cols = len(str1), len(str2)
    # table[r][c] is the LCS length of the first r characters of str1 and the
    # first c characters of str2.
    table = [[0] * (cols + 1) for _ in range(rows + 1)]

    # Fill the table row by row.
    for r, ch1 in enumerate(str1, start=1):
        above, current = table[r - 1], table[r]
        for c, ch2 in enumerate(str2, start=1):
            if ch1 == ch2:
                current[c] = above[c - 1] + 1
            else:
                current[c] = max(above[c], current[c - 1])

    # Walk back from the bottom-right corner, collecting matched characters.
    picked = []
    r, c = rows, cols
    while r and c:
        if str1[r - 1] == str2[c - 1]:
            picked.append(str1[r - 1])
            r, c = r - 1, c - 1
            continue
        # On a tie, step left in str2.
        if table[r - 1][c] > table[r][c - 1]:
            r -= 1
        else:
            c -= 1

    # Characters were collected back to front.
    return ''.join(reversed(picked))

def calc_contribution(message: dict, finish_content: str, threshold: float = 0.3):
    """Score whether a tool response contributed to the final answer.

    The response text from ``get_content`` is compared with
    ``finish_content`` through their longest common subsequence. The ratio is
    the subsequence length divided by the response length.

    Args:
        message: The tool message whose response is evaluated.
        finish_content: Text of the final answer.
        threshold: Minimum ratio for the response to count as contributing.

    Returns:
        1 if the ratio is not below ``threshold``, otherwise 0. An empty
        response yields 0.
    """
    tool_text = get_content(message)
    total = len(tool_text)
    if total == 0:
        return 0
    shared = len(lcs(tool_text, finish_content))
    return 0 if shared / total < threshold else 1