In [1]:
import code
import code_ablation
from nltk import pos_tag
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

In [2]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0, 3"
from transformers import AutoTokenizer
import transformers
import torch
import json
import pandas as pd
from tqdm import tqdm
import logging
import time
from code import list_2_str

# Hugging Face model id used for both the tokenizer and the generation pipeline.
model = "meta-llama/Llama-2-7b-chat-hf"
# model = "meta-llama/Llama-2-13b-hf"
# The tokenizer is only used by `ask` to look up the end-of-sequence token id.
tokenizer = AutoTokenizer.from_pretrained(model)

# Module-level text-generation pipeline shared by every `ask` call.
# Weights are loaded in float16 and placed via device_map="auto".
pipeline = transformers.pipeline(
    "text-generation",
    model=model,
    torch_dtype=torch.float16,
    device_map="auto",
)
def ask(question:str, max_length=500) -> str:
    """Send ``question`` to the module-level text-generation pipeline.

    Decoding is greedy (``do_sample=False``) and exactly one sequence is
    requested.

    Args:
        question: Prompt text handed to the pipeline unchanged.
        max_length: Forwarded as the pipeline's ``max_length`` argument.
            NOTE(review): in the transformers generation API this limit
            counts prompt tokens too - confirm long prompts still leave
            room for an answer.

    Returns:
        The ``'generated_text'`` field of the first returned sequence, or
        ``None`` when the pipeline yields no sequence at all.
    """
    generation_options = dict(
        do_sample=False,
        num_return_sequences=1,
        eos_token_id=tokenizer.eos_token_id,
        max_length=max_length,
    )
    outputs = pipeline(question, **generation_options)

    # Only the first sequence is ever consumed.
    nothing = object()
    first = next(iter(outputs), nothing)
    if first is nothing:
        return None
    return first['generated_text']


  from .autonotebook import tqdm as notebook_tqdm
Loading checkpoint shards: 100%|██████████| 2/2 [00:06<00:00,  3.14s/it]


In [3]:
def replace_token(sentence, from_token, to_token):
    """Replace whole tokens of ``sentence`` that equal ``from_token``.

    The sentence is split on single spaces and every token is stripped.
    Tokens that are exactly ``from_token`` become ``to_token``; substrings
    inside longer words are never touched. Empty tokens (from repeated
    spaces, or from replacing a token with ``''``) are dropped and the
    rest is re-joined with single spaces.

    Args:
        sentence: Text to process.
        from_token: Token to look for (exact, case-sensitive match).
        to_token: Replacement text; ``''`` deletes the token.

    Returns:
        The rewritten sentence with normalised single-space separators.
    """
    tokens = (token.strip() for token in sentence.split(' '))
    replaced = (to_token if token == from_token else token for token in tokens)
    # Drop only *empty* tokens. The previous `len(r) > 1` filter also
    # removed legitimate one-character words such as "a" or "I".
    return ' '.join(token for token in replaced if token)

# replace_token('the cat needs the dog', 'the', '')


In [4]:
from nltk import pos_tag
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

def find_verb(sentence:str) -> str:
    """Return the first verb token of ``sentence``.

    The sentence is tokenised with ``word_tokenize`` and tagged with
    ``pos_tag``; the first token whose tag starts with ``'VB'`` is
    returned exactly as it appears in the token list.

    Returns:
        The verb token, or ``''`` when no token carries a ``VB*`` tag.
    """
    for token, tag in pos_tag(word_tokenize(sentence)):
        if tag.startswith('VB'):
            return token
    return ''


In [5]:
# find_verb('Bob eats apple')
# find_verb('eats apple')

In [5]:
def lemma_verb(verb:str):
    """Lemmatise ``verb`` as a verb (``pos='v'``) with a fresh WordNetLemmatizer."""
    return WordNetLemmatizer().lemmatize(verb, pos='v')

# lemma_verb(find_verb('Bob eats apple'))

In [6]:
def ablation_inference_parser_rule_nl(rule:str):
    """Split an English ``If ... then ...`` rule into conditions and conclusion.

    Processing steps:
      1. Periods are removed.
      2. A generic subject is grounded: ``someone``/``they`` become ``Bob``,
         ``something`` becomes ``the cat``. In either case the tokens ``it``
         and ``are`` are rewritten to ``the cat`` and ``is``.
      3. The text is split at `` then `` into condition part and conclusion,
         and the leading ``If `` is removed.
      4. The condition part is split at `` and ``.
      5. Conditions after the first that have fewer than three words (e.g.
         ``young`` or ``not round``) are treated as bare adjectives and get
         the first condition's subject: ``['the cat is red', 'young']``
         becomes ``['the cat is red', 'the cat is young']``.

    Args:
        rule: Rule text such as
            ``'If something is red and young then it is white.'``.

    Returns:
        ``{'condition_list': list[str], 'conclusion_str': str}``.

    Raises:
        ValueError: if the rule has no `` then `` clause or no ``If `` prefix.
    """
    rule = rule.replace('.', '')

    # Ground the generic subject so every clause names a concrete entity.
    grounded = False
    if 'someone' in rule:
        rule = rule.replace('someone', 'Bob')
        rule = rule.replace('they', 'Bob')
        grounded = True
    elif 'something' in rule:
        rule = rule.replace('something', 'the cat')
        grounded = True
    if grounded:
        # The substring checks are only cheap guards; replace_token itself
        # works on whole tokens, so e.g. "white" is never rewritten.
        if 'it' in rule:
            rule = replace_token(rule, 'it', 'the cat')
        if 'are' in rule:
            rule = replace_token(rule, 'are', 'is')

    # Split on the space-delimited keywords rather than bare substrings so
    # words that merely contain "then"/"and" cannot break the rule apart.
    condition_str, then_marker, conclusion_str = rule.partition(' then ')
    if not then_marker:
        raise ValueError(f"rule has no ' then ' clause: {rule!r}")
    _, if_marker, condition_str = condition_str.partition('If ')
    if not if_marker:
        raise ValueError(f"rule does not contain 'If ': {rule!r}")
    conditions_list = [condition.strip() for condition in condition_str.split(' and ')]

    # The subject is only needed when a later condition is a bare adjective.
    needs_subject = any(len(condition.split(' ')) < 3 for condition in conditions_list[1:])
    if needs_subject:
        condition_first = conditions_list[0]
        if ' is ' in condition_first:
            # ' is ' (with spaces) so "visits" or a name like "Chris" does
            # not count as the copula.
            noun = condition_first.split(' is ')[0]
        else:
            verb = find_verb(condition_first)
            # Take the text before the verb. Previously the whole list from
            # split() was kept, which crashed on the later .strip() call.
            noun = condition_first.split(verb)[0] if verb else ''
        noun = noun.strip()
        # Without a recognisable subject the short conditions stay as-is.
        if noun:
            for index, condition in enumerate(conditions_list[1:], start=1):
                if len(condition.split(' ')) < 3:
                    conditions_list[index] = f"{noun} is {condition}"

    return {
        'condition_list': conditions_list,
        'conclusion_str': conclusion_str.strip(),
    }


In [8]:
# import jsonlines
# import pandas as pd
# from tqdm import tqdm
# result_df = pd.DataFrame()
# with jsonlines.open('./CWA_rules_sampled_2000.jsonl', 'r') as f:
#     for data_rule in tqdm(f):
#         rule = data_rule['rule']
#         nl = ablation_inference_parser_rule_nl(rule)
#         print(nl['condition_list'])

In [7]:

from nltk import pos_tag
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer

def ablation_inference_parser_rule_logic(condition_list:list):
    """Translate English clauses into predicate-style strings.

    Each clause is converted by the first matching pattern:
      * ``'<subject> is <attribute>'``   -> ``'Attribute(Subject)'``
      * contains ``likes`` / ``like``    -> ``'Like(Subject, Object)'``
      * contains ``chases`` / ``chase``  -> ``'Chase(Subject, Object)'``
      * otherwise the verb from ``find_verb`` is lemmatised
                                         -> ``'Lemma(Subject, Object)'``

    The article ``the`` is removed from every argument and each argument is
    capitalised with ``str.capitalize``.

    NOTE(review): negation is not modelled. The recorded notebook output
    shows ``'Bob is not red'`` becoming ``'Not red(Bob)'`` and
    ``'Bob do not visit the rabbit'`` becoming
    ``'Do(Bob, Not visit rabbit)'``. That output format is left unchanged
    here - confirm whether it is intended.

    Args:
        condition_list: English clauses, e.g. ``['Bob sees the bear']``.

    Returns:
        One predicate string per input clause, in order.

    Raises:
        ValueError: if a clause matches no fixed pattern and no verb can be
            located in it.
    """
    # Longer surface forms come first so "likes" is not split as "like" + "s".
    fixed_predicates = (
        ('likes', 'Like'),
        ('like', 'Like'),
        ('chases', 'Chase'),
        ('chase', 'Chase'),
    )
    lemmatizer = WordNetLemmatizer()
    conditions_logic_list = []
    for condition in condition_list:
        if ' is ' in condition:  # ' is ' with spaces, so "visits" does not match
            subject, _, attribute = condition.partition(' is ')
            conditions_logic_list.append(
                f'{_logic_argument(attribute)}({_logic_argument(subject)})'
            )
            continue

        for surface, predicate in fixed_predicates:
            if surface in condition:
                break
        else:
            # No fixed pattern matched: use the POS-tagged verb instead.
            surface = find_verb(condition)
            if not surface or surface not in condition:
                raise ValueError(f'no verb found in condition: {condition!r}')
            predicate = lemmatizer.lemmatize(surface, pos='v').strip().capitalize()

        subject, _, obj = condition.partition(surface)
        conditions_logic_list.append(
            f'{predicate}({_logic_argument(subject)}, {_logic_argument(obj)})'
        )
    return conditions_logic_list


def _logic_argument(text:str) -> str:
    """Strip the article 'the' from ``text`` and capitalise it for use as a predicate argument."""
    return replace_token(text, 'the', '').capitalize().strip()

In [8]:
# NO
import jsonlines
import pandas as pd
# Debug cell: print every rule together with the logic form of its conditions.
# `result_df` is reset to an empty frame here but is not filled in this cell.
result_df = pd.DataFrame()
with jsonlines.open('./CWA_rules_sampled_2000.jsonl', 'r') as f:
    for data_rule in tqdm(f):
        # Each JSON line is read as a mapping with at least a 'rule' entry.
        rule = data_rule['rule']
        print(rule)
        nl = ablation_inference_parser_rule_nl(rule)
        # print(nl)
        # Only the conditions are converted here; the conclusion is not printed.
        print(ablation_inference_parser_rule_logic(nl['condition_list']))

0it [00:00, ?it/s]

3it [00:00, 16.15it/s]

If someone is round and they like the bear then they are red.
['Round(Bob)', 'Like(Bob, Bear)']
If something is blue then it needs the dog.
['Blue(Cat)']
If something chases the cat then it is green.
['Chase(Cat, Cat)']
If someone sees the bear then they chase the bear.


270it [00:01, 195.34it/s]

['See(Bob, Bear)']
If something likes the squirrel then the squirrel needs the dog.
['Like(Cat, Squirrel)']
If something needs the dog then it is kind.
['Need(Cat, Dog)']
If something is quiet then it is red.
['Quiet(Cat)']
If something visits the rabbit and it needs the mouse then it visits the mouse.
['Visit(Cat, Rabbit)', 'Need(Cat, Mouse)']
If someone needs the rabbit then the rabbit visits the bear.
['Need(Bob, Rabbit)']
If someone is kind then they are round.
['Kind(Bob)']
If someone is blue then they need the cat.
['Blue(Bob)']
If someone sees the cat and they are not red then they eat the bear.
['See(Bob, Cat)', 'Not red(Bob)']
If someone is big then they chase the mouse.
['Big(Bob)']
If Harry is quiet and Harry is white then Harry is cold.
['Quiet(Harry)', 'White(Harry)']
If something is red and young then it is white.
['Red(Cat)', 'Young(Cat)']
If someone is young and they do not visit the rabbit then the rabbit sees the squirrel.
['Young(Bob)', 'Do(Bob, Not visit rabbit)']
I

1219it [00:02, 1128.04it/s]

If something is red then it is nice.
['Red(Cat)']
If the dog sees the squirrel then the dog sees the lion.
['See(Dog, Squirrel)']
If something is quiet and not round then it is red.
['Quiet(Cat)', 'Not round(Cat)']
If something is green and it visits the mouse then it chases the lion.
['Green(Cat)', 'Visit(Cat, Mouse)']
If Fiona is smart then Fiona is quiet.
['Smart(Fiona)']
If the lion is cold and the lion is nice then the lion is round.
['Cold(Lion)', 'Nice(Lion)']
If something likes the rabbit and the rabbit is cold then the rabbit eats the dog.
['Like(Cat, Rabbit)', 'Cold(Rabbit)']
If something is cold then it is red.
['Cold(Cat)']
If someone is round and not quiet then they are big.
['Round(Bob)', 'Not quiet(Bob)']
If someone chases the lion then the lion is kind.
['Chase(Bob, Lion)']
If the cow is kind and the cow is red then the cow is nice.
['Kind(Cow)', 'Red(Cow)']
If Anne is blue and Anne is furry then Anne is cold.
['Blue(Anne)', 'Furry(Anne)']
If the cow is nice then the co

1864it [00:02, 1854.31it/s]

['See(Cat, Dog)']
If someone likes the lion then the lion is red.
['Like(Bob, Lion)']
If the mouse is cold and the mouse does not visit the squirrel then the mouse needs the rabbit.
['Cold(Mouse)', 'Do(Mouse, Not visit squirrel)']
If something is young then it visits the bald eagle.
['Young(Cat)']
If something is green and quiet then it is white.
['Green(Cat)', 'Quiet(Cat)']
If something likes the squirrel then the squirrel likes the dog.
['Like(Cat, Squirrel)']
If someone is round and young then they are smart.
['Round(Bob)', 'Young(Bob)']
If someone is green and they do not need the cow then they chase the lion.
['Green(Bob)', 'Do(Bob, Not need cow)']
If the squirrel likes the dog and the squirrel sees the dog then the dog likes the squirrel.
['Like(Squirrel, Dog)', 'See(Squirrel, Dog)']
If something likes the rabbit then it visits the rabbit.
['Like(Cat, Rabbit)']
If someone sees the cow then the cow is round.
['See(Bob, Cow)']
If Bob is rough and Bob is not white then Bob is red.
[

2000it [00:02, 792.19it/s] 

['See(Cat, Bear)']
If someone is green and they visit the cow then they visit the tiger.
['Green(Bob)', 'Visit(Bob, Cow)']
If something is cold then it eats the cow.
['Cold(Cat)']
If something chases the squirrel then it visits the mouse.
['Chase(Cat, Squirrel)']
If something is cold then it visits the bear.
['Cold(Cat)']
If someone is smart then they are green.
['Smart(Bob)']
If someone is furry then they are round.
['Furry(Bob)']
If someone is smart and big then they are kind.
['Smart(Bob)', 'Big(Bob)']
If something is young then it is white.
['Young(Cat)']
If Charlie is quiet then Charlie is blue.
['Quiet(Charlie)']
If someone is red then they see the mouse.
['Red(Bob)']
If the tiger likes the mouse and the tiger likes the rabbit then the rabbit likes the bald eagle.
['Like(Tiger, Mouse)', 'Like(Tiger, Rabbit)']
If someone eats the dog and the dog needs the rabbit then the dog is cold.
['Eat(Bob, Dog)', 'Need(Dog, Rabbit)']
If someone sees the bald eagle then the bald eagle is young




In [11]:
# find_verb('the cat needs the dog')

In [8]:
def ablation_inference_prompt_formulate_nl(rule_nl:str, condition_list:list):
    """Build the natural-language prompt asking whether a rule fires.

    Args:
        rule_nl: The rule in English, inserted after ``Rule:``.
        condition_list: Facts to present. A non-empty list is rendered with
            ``list_2_str`` (imported from the project's ``code`` module;
            presumably newline-terminated, since the prompt adds no newline
            after it - TODO confirm). An empty list yields the literal
            placeholder line.

    Returns:
        The prompt text, ending in ``Output:``.
    """
    if len(condition_list) > 0:
        facts = list_2_str(condition_list)
    else:
        # NOTE(review): "avaiable" is misspelled; left untouched because it
        # is model input and does not affect answer parsing.
        facts = "[no facts avaiable].\n"
    # The answer template must say "Produce:" because that is the label
    # ablation_inference_extract_result searches for (it used to be spelled
    # "Procude:", which the parser never matched).
    prompt = f"""Task: please answer whether the following rule is satisfied under the provided given Facts?
If yes, say 'yes', if no, say 'no',
If it is satisfied, please answer, what does it produce.
Rule: {rule_nl},
Facts: {facts}Please answer with the following format:
Answer: [yes or no]
Produce: [your answer here]
Output:"""
    return prompt

# rule_nl = 'if Bob is nice, then Bob is cute'
# condition_list = ['Bob is nice']
# prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)

# rule_nl = 'if Bob is nice, then Bob is cute'
# condition_list = ['Bob is bad']
# prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)

# rule_nl = 'if Bob is nice, then Bob is cute'
# condition_list = []
# prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)

In [9]:

def ablation_inference_prompt_formulate_logic(rule_logic:str, condition_logic_list:list):
    """Build the predicate-logic prompt asking whether a rule fires.

    NOTE(review): a later cell defines a function with this same name and a
    longer instruction text. After a top-to-bottom run that later definition
    is the one in effect and this one is shadowed.

    Args:
        rule_logic: The rule in logic form, inserted after ``Rule:``.
        condition_logic_list: Facts to present. A non-empty list is rendered
            with ``list_2_str``; an empty list yields the placeholder line.

    Returns:
        The prompt text, ending in ``Output:``.
    """
    if len(condition_logic_list) > 0:
        facts = list_2_str(condition_logic_list)
    else:
        facts = "[no facts avaiable].\n"
    # "Produce:" (not "Procude:") so the requested label matches the one
    # ablation_inference_extract_result parses.
    prompt = f"""Task: please answer whether the following rule is satisfied under the provided given Facts?
If yes, say 'yes', if no, say 'no',
If it is satisfied, please answer, what does it produce.
Rule: {rule_logic}
Facts: {facts}Please answer with the following format:
Answer: [yes or no]
Produce: [your answer here]
Output:"""
    return prompt
# rule_nl = 'IF Nice(Bob) THEN Cute(Bob)'
# condition_list = ['Nice(Bob)']
# prompt_nl         = ablation_inference_prompt_formulate_logic(rule_logic=rule_nl, condition_logic_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)
# rule_nl = 'IF Nice(Bob) THEN Cute(Bob)'
# condition_list = ['Bad(Bob)']
# prompt_nl         = ablation_inference_prompt_formulate_logic(rule_logic=rule_nl, condition_logic_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)
# rule_nl = 'IF Nice(Bob) THEN Cute(Bob)'
# condition_list = []
# prompt_nl         = ablation_inference_prompt_formulate_logic(rule_logic=rule_nl, condition_logic_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)

In [10]:
def ablation_inference_extract_result(prompt:str, response:str) -> dict:
    """Pull the ``Answer:`` and ``Produce:`` fields out of a model response.

    The echoed prompt is removed first. If the remaining text contains blank
    lines, only the first paragraph holding both an ``Answer:`` and a produce
    label is parsed; when no paragraph qualifies, both fields stay empty.
    Within the chosen text a later matching line overrides an earlier one.

    Both ``Produce:`` and the misspelling ``Procude:`` are accepted, because
    the prompts in this notebook have requested the misspelled label and a
    model may echo it verbatim.

    Args:
        prompt: The exact prompt that was sent to the model.
        response: The generated text, usually starting with the prompt.

    Returns:
        ``{'answer': str, 'produce': str}``; missing fields are ``''``.
    """
    produce_labels = ('Produce:', 'Procude:')
    result = {
        'answer'  : "",
        'produce' : "",
    }
    if prompt in response:
        response = response.replace(prompt, '')
    response = response.strip()

    if '\n\n' in response:
        # Keep the first paragraph that carries the complete answer template.
        paragraphs = [
            paragraph for paragraph in response.split('\n\n')
            if 'Answer:' in paragraph
            and any(label in paragraph for label in produce_labels)
        ]
        if not paragraphs:
            return result
        response = paragraphs[0]

    for line in response.split('\n'):
        if 'Answer:' in line:
            result['answer'] = line.replace('Answer:', '').strip()
        for label in produce_labels:
            if label in line:
                result['produce'] = line.replace(label, '').strip()
                break

    return result

# prompt = """Please let me know if you have any questions or need further clarification.
# Task: please answer whether the following rule is satisfied under the provided given Facts?
# If yes, say 'yes', if no, say 'no',
# If it is satisfied, please answer, what does it produce.
# Rule: IF Nice(Bob) THEN Cute(Bob)
# Facts: [no facts avaiable].
# Please answer with the following format:
# Answer: [yes or no]
# Procude: [your answer here]
# Output: [your answer here]"""
# response = """Please let me know if you have any questions or need further clarification.
# Task: please answer whether the following rule is satisfied under the provided given Facts?
# If yes, say 'yes', if no, say 'no',
# If it is satisfied, please answer, what does it produce.
# Rule: IF Nice(Bob) THEN Cute(Bob)
# Facts: [no facts avaiable].
# Please answer with the following format:
# Answer: [yes or no]
# Procude: [your answer here]
# Output: [your answer here]

# Answer: no
# Produce: nothing

# Please let me know if you have any questions or need further clarification."""
# ablation_inference_extract_result(prompt, response)

In [15]:
# rule_nl = 'if Bob is nice, then Bob is cute'
# condition_list = ['Bob is nice']
# prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)
# print(prompt_nl in answer_nl)
# print(ablation_inference_extract_result(prompt_nl, answer_nl))

In [11]:

def ablation_inference_prompt_formulate_logic(rule_logic:str, condition_logic_list:list):
    """Build the predicate-logic prompt, with an explanation of the rule format.

    This redefines an earlier function of the same name; compared with it,
    the prompt additionally explains the ``IF ... THEN ...`` notation and
    when a rule counts as satisfied.

    Args:
        rule_logic: The rule in ``IF <conditions> THEN <conclusion>`` form,
            inserted after ``Rule:``.
        condition_logic_list: Facts to present. A non-empty list is rendered
            with ``list_2_str``; an empty list yields the placeholder line.

    Returns:
        The prompt text, ending in ``Output:``.
    """
    if len(condition_logic_list) > 0:
        facts = list_2_str(condition_logic_list)
    else:
        facts = "[no facts avaiable].\n"
    # NOTE(review): the instruction text still has typos ("concluision",
    # "it is be satisfied", "avaiable"). They are left as-is because they are
    # model input; only the answer label is corrected, since
    # ablation_inference_extract_result looks for "Produce:".
    prompt = f"""Task: please answer whether the following rule is satisfied under the provided given Facts?
The rule is written in the IF <condition> THEN <concluision> form,
If all of the conditions of a rule can be found in the list of facts, the rule can be satisfied.
If the rule is satisfied and tell me the produce, say 'yes', if no, say 'no',
If it is be satisfied, please answer, what does it produce.
Rule: {rule_logic}
Facts: {facts}Please answer with the following format:
Answer: [yes or no]
Produce: [your answer here]
Output:"""
    return prompt
# rule_nl = 'IF Nice(Bob) THEN Cute(Bob)'
# condition_list = ['Nice(Bob)']
# prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)
# print(ablation_inference_extract_result(prompt_nl, answer_nl))
# rule_nl = 'IF Nice(Bob) THEN Cute(Bob)'
# condition_list = ['Bad(Bob)']
# prompt_nl         = ablation_inference_prompt_formulate_logic(rule_logic=rule_nl, condition_logic_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)
# print(ablation_inference_extract_result(prompt_nl, answer_nl))
# rule_nl = 'IF Nice(Bob) THEN Cute(Bob)'
# condition_list = []
# prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list)   
# answer_nl         = ask(prompt_nl)
# print(answer_nl)
# print(ablation_inference_extract_result(prompt_nl, answer_nl))


In [12]:

def ablation_inference_positive(rule_nl:str):
    """Query the model with ALL conditions of a rule given as facts.

    The rule is parsed, then asked once in natural language and once in
    predicate form. An answer counts as correct when the model says ``yes``;
    a produce counts as correct when it equals the conclusion
    (case-insensitive).

    Args:
        rule_nl: English rule text, e.g. ``'If someone is kind then they are round.'``.

    Returns:
        Dict of ``pos_*`` fields: condition count, the conditions and
        conclusions shown to the model, the parsed answers, and 0/1
        correctness flags.
    """
    result = {
        'pos_num_condition'         : 0, 
        'pos_nl_answer_correct'            : 0, 
        'pos_nl_produce_correct'            : 0, 
        'pos_logic_answer_correct'         : 0,
        'pos_logic_produce_correct'         : 0,
        'pos_nl_condition'          : '',
        'pos_nl_conclusion'         : '',
        'pos_nl_answer_answer'      : '',
        'pos_nl_answer_produce'     : '',
        'pos_logic_condition'       : '',
        'pos_logic_conclusion'      : '',
        'pos_logic_answer_answer'   : '',
        'pos_logic_answer_produce'  : '',
    } 
    rule_condition_conclusion_dict = ablation_inference_parser_rule_nl(rule_nl)
    condition_list = rule_condition_conclusion_dict['condition_list']
    conclusion_str = rule_condition_conclusion_dict['conclusion_str']

    result['pos_num_condition']     = len(condition_list)
    result['pos_nl_condition']      = code.number_list_to_str(condition_list)
    result['pos_nl_conclusion']     = conclusion_str

    # nl
    prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list)   
    answer_nl         = ask(prompt_nl)
    answer_nl_parsed  = ablation_inference_extract_result(prompt_nl, answer_nl)
    result['pos_nl_answer_answer']      = answer_nl_parsed['answer']
    result['pos_nl_answer_produce']     = answer_nl_parsed['produce']

    if answer_nl_parsed['answer'].lower() == 'yes':
        result['pos_nl_answer_correct'] += 1
    if answer_nl_parsed['produce'].lower() == conclusion_str.lower():
        result['pos_nl_produce_correct'] += 1

    # logic
    condition_logic_list = ablation_inference_parser_rule_logic(condition_list)
    conclusion_logic_str = ablation_inference_parser_rule_logic([conclusion_str])[0]
    # The prompt must show the whole rule, not just its conclusion; otherwise
    # the model has nothing to check the facts against.
    # NOTE(review): ' AND ' as the conjunction is an assumption - confirm
    # the intended notation.
    rule_logic_str = f"IF {' AND '.join(condition_logic_list)} THEN {conclusion_logic_str}"
    result['pos_logic_condition']     = code.number_list_to_str(condition_logic_list)
    # Stored as the plain string, like 'pos_nl_conclusion', instead of
    # passing a str to the list formatter.
    result['pos_logic_conclusion']    = conclusion_logic_str
    prompt_logic          = ablation_inference_prompt_formulate_logic(rule_logic=rule_logic_str, condition_logic_list=condition_logic_list)
    answer_logic          = ask(prompt_logic)
    answer_logic_parsed   = ablation_inference_extract_result(prompt_logic, answer_logic)
    result['pos_logic_answer_answer']      = answer_logic_parsed['answer']
    result['pos_logic_answer_produce']     = answer_logic_parsed['produce']

    if answer_logic_parsed['answer'].lower() == 'yes':
        result['pos_logic_answer_correct'] += 1
    if answer_logic_parsed['produce'].lower() == conclusion_logic_str.lower():
        result['pos_logic_produce_correct'] += 1

    return result


def ablation_inference_negative(rule_nl:str):
    """Query the model with the FIRST condition of a rule withheld.

    One condition is always missing from the facts, so the rule should
    never fire: an answer counts as correct when the model says ``no``.

    NOTE(review): the ``neg_*_produce_correct`` flags are set when the
    produce equals the rule's conclusion, exactly as in the positive case.
    For a rule that should not fire that looks like a wrong output rather
    than a correct one - confirm the intended meaning.

    Args:
        rule_nl: English rule text.

    Returns:
        Dict of ``neg_*`` fields: condition count, the full conditions and
        conclusions of the rule, the parsed answers, and 0/1 flags.
    """
    result = {
        'neg_num_condition'         : 0, 
        'neg_nl_answer_correct'     : 0, 
        'neg_nl_produce_correct'    : 0, 
        'neg_logic_answer_correct'  : 0,
        'neg_logic_produce_correct' : 0,
        'neg_nl_condition'          : '',
        'neg_nl_conclusion'         : '',
        'neg_nl_answer_answer'      : '', 
        'neg_nl_answer_produce'     : '', 
        'neg_logic_condition'       : '',
        'neg_logic_conclusion'      : '',
        'neg_logic_answer_answer'   : '',
        'neg_logic_answer_produce'  : '',
    } 
    rule_condition_conclusion_dict = ablation_inference_parser_rule_nl(rule_nl)
    condition_list = rule_condition_conclusion_dict['condition_list']
    conclusion_str = rule_condition_conclusion_dict['conclusion_str']
    result['neg_num_condition']     = len(condition_list)
    result['neg_nl_condition']      = code.number_list_to_str(condition_list)
    result['neg_nl_conclusion']     = conclusion_str

    # nl: facts are the conditions minus the first one
    prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list[1:])   
    answer_nl         = ask(prompt_nl)
    answer_nl_parsed  = ablation_inference_extract_result(prompt_nl, answer_nl)
    result['neg_nl_answer_answer']      = answer_nl_parsed['answer']
    result['neg_nl_answer_produce']     = answer_nl_parsed['produce']
    if answer_nl_parsed['answer'].lower() == 'no':
        result['neg_nl_answer_correct'] += 1
    if answer_nl_parsed['produce'].lower() == conclusion_str.lower():
        result['neg_nl_produce_correct'] += 1

    # logic
    condition_logic_list = ablation_inference_parser_rule_logic(condition_list)
    conclusion_logic_str = ablation_inference_parser_rule_logic([conclusion_str])[0]
    # The rule shown to the model keeps ALL conditions; only the facts are
    # reduced. Previously just the conclusion was passed as the "rule".
    # NOTE(review): ' AND ' as the conjunction is an assumption - confirm.
    rule_logic_str = f"IF {' AND '.join(condition_logic_list)} THEN {conclusion_logic_str}"
    result['neg_logic_condition']     = code.number_list_to_str(condition_logic_list)
    # Stored as the plain string, like 'neg_nl_conclusion', instead of
    # passing a str to the list formatter.
    result['neg_logic_conclusion']    = conclusion_logic_str
    prompt_logic          = ablation_inference_prompt_formulate_logic(rule_logic=rule_logic_str, condition_logic_list=condition_logic_list[1:])
    answer_logic          = ask(prompt_logic)
    answer_logic_parsed   = ablation_inference_extract_result(prompt_logic, answer_logic)
    result['neg_logic_answer_answer']      = answer_logic_parsed['answer']
    result['neg_logic_answer_produce']     = answer_logic_parsed['produce']
    if answer_logic_parsed['answer'].lower() == 'no':
        result['neg_logic_answer_correct'] += 1
    if answer_logic_parsed['produce'].lower() == conclusion_logic_str.lower():
        result['neg_logic_produce_correct'] += 1

    return result

def ablation_inference_empty(rule_nl:str):
    """Query the model with NO facts at all.

    With an empty fact list the rule should not fire: an answer counts as
    correct when the model says ``no``.

    NOTE(review): the ``empty_*_produce_correct`` flags are set when the
    produce equals the rule's conclusion - confirm that this is the
    intended meaning for a rule that should not fire.

    Args:
        rule_nl: English rule text.

    Returns:
        Dict of ``empty_*`` fields: the conclusions, the parsed answers, and
        0/1 flags.
    """
    result = {
        'empty_nl_answer_correct'       : 0, 
        'empty_nl_produce_correct'      : 0, 
        'empty_logic_answer_correct'    : 0,
        'empty_logic_produce_correct'   : 0,
        'empty_nl_conclusion'           : '',
        'empty_nl_answer_answer'        : '', 
        'empty_nl_answer_produce'       : '', 
        'empty_logic_conclusion'        : '',
        'empty_logic_answer_answer'     : '',
        'empty_logic_answer_produce'    : '',
    } 
    rule_condition_conclusion_dict = ablation_inference_parser_rule_nl(rule_nl)
    condition_list = rule_condition_conclusion_dict['condition_list']
    conclusion_str = rule_condition_conclusion_dict['conclusion_str']
    result['empty_nl_conclusion']     = conclusion_str
    # nl
    prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=[])   
    answer_nl         = ask(prompt_nl)
    answer_nl_parsed  = ablation_inference_extract_result(prompt_nl, answer_nl)
    result['empty_nl_answer_answer']      = answer_nl_parsed['answer']
    result['empty_nl_answer_produce']     = answer_nl_parsed['produce']
    if answer_nl_parsed['answer'].lower() == 'no':
        result['empty_nl_answer_correct'] += 1
    if answer_nl_parsed['produce'].lower() == conclusion_str.lower():
        result['empty_nl_produce_correct'] += 1

    # logic
    condition_logic_list = ablation_inference_parser_rule_logic(condition_list)
    conclusion_logic_str = ablation_inference_parser_rule_logic([conclusion_str])[0]
    # Show the complete rule to the model; previously only the conclusion
    # was passed as the "rule".
    # NOTE(review): ' AND ' as the conjunction is an assumption - confirm.
    rule_logic_str = f"IF {' AND '.join(condition_logic_list)} THEN {conclusion_logic_str}"
    # Record the logic-form conclusion here (this field used to receive the
    # English conclusion, duplicating 'empty_nl_conclusion').
    result['empty_logic_conclusion']     = conclusion_logic_str

    prompt_logic          = ablation_inference_prompt_formulate_logic(rule_logic=rule_logic_str, condition_logic_list=[])
    answer_logic          = ask(prompt_logic)
    answer_logic_parsed   = ablation_inference_extract_result(prompt_logic, answer_logic)
    result['empty_logic_answer_answer']      = answer_logic_parsed['answer']
    result['empty_logic_answer_produce']     = answer_logic_parsed['produce']
    if answer_logic_parsed['answer'].lower() == 'no':
        result['empty_logic_answer_correct'] += 1
    if answer_logic_parsed['produce'].lower() == conclusion_logic_str.lower():
        result['empty_logic_produce_correct'] += 1

    return result


In [14]:
# No
import jsonlines
import pandas as pd
# Run the positive / negative / empty experiments for every sampled rule.
# Rows are collected in a list and the frame is built once at the end:
# growing a DataFrame with pd.concat inside the loop copies all earlier rows
# on every iteration.
result_records = []
try:
    with jsonlines.open('./CWA_rules_sampled_2000.jsonl', 'r') as f:
        for data_rule in tqdm(f):
            rule = data_rule['rule']
            result = {'nr' : data_rule['nr']}
            result_dict_pos = ablation_inference_positive(rule)
            result_dict_neg = ablation_inference_negative(rule)
            result_dict_emp = ablation_inference_empty(rule)
            result.update(result_dict_pos)
            result.update(result_dict_neg)
            result.update(result_dict_emp)
            result_records.append(result)
finally:
    # Built in `finally` so the rows gathered so far survive an interrupt
    # of this long-running loop, as they did with the incremental concat.
    result_df = pd.DataFrame(result_records)
# result_df.to_csv('./test.csv')
        

0it [00:02, ?it/s]


KeyboardInterrupt: 

# Process Data

In [None]:
# No
import jsonlines
import pandas as pd
from tqdm import tqdm
# Pre-parse every rule into its English and logic-form conditions/conclusion.
# List-valued fields are joined with ';' so each one fits a single CSV cell.
# Rows are collected first and the frame is built once; pd.concat inside the
# loop re-copies all earlier rows on every iteration.
result_records = []
with jsonlines.open('./CWA_rules_sampled_2000.jsonl', 'r') as f:
    for data_rule in tqdm(f):
        rule = data_rule['rule']
        result = {}
        result['nr']    = data_rule['nr']
        result['rule']  = data_rule['rule']
        rule_condition_conclusion_dict = ablation_inference_parser_rule_nl(rule)
        condition_list  = rule_condition_conclusion_dict['condition_list']
        conclusion_str  = rule_condition_conclusion_dict['conclusion_str']

        result['pos_num_condition']     = len(condition_list)
        result['pos_nl_condition']      = ';'.join(condition_list)
        result['pos_nl_conclusion']     = conclusion_str
        condition_logic_list = ablation_inference_parser_rule_logic(condition_list)
        conclusion_logic_str = ablation_inference_parser_rule_logic([conclusion_str])[0]
        result['pos_logic_condition']     = ';'.join(condition_logic_list)
        result['pos_logic_conclusion']    = conclusion_logic_str
        result_records.append(result)
result_df = pd.DataFrame(result_records)
# result_df.to_csv('./CWA_rules_sampled_2000_processed.csv')

In [13]:

def ablation_inference_positive_nl(rule_nl:str, condition_list:list, conclusion_str:str, condition_logic_list:list, conclusion_logic_str:str):
    """Run the positive experiment on an already-parsed rule.

    All conditions are supplied as facts, once in English and once in logic
    form. An answer is correct when it is ``yes``; a produce is correct when
    it equals the corresponding conclusion (case-insensitive).

    Args:
        rule_nl: English rule text.
        condition_list: English conditions of the rule.
        conclusion_str: English conclusion of the rule.
        condition_logic_list: Logic-form conditions of the rule.
        conclusion_logic_str: Logic-form conclusion of the rule.

    Returns:
        An 8-tuple, in this order: nl answer, nl produce, nl answer flag,
        nl produce flag, logic answer, logic produce, logic answer flag,
        logic produce flag. Flags are 0 or 1.
    """
    pos_nl_answer_correct = 0
    pos_nl_produce_correct = 0
    pos_logic_answer_correct = 0
    pos_logic_produce_correct = 0

    # nl
    prompt_nl         = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list)   
    answer_nl         = ask(prompt_nl)
    answer_nl_parsed  = ablation_inference_extract_result(prompt_nl, answer_nl)
    pos_nl_answer_answer      = answer_nl_parsed['answer']
    pos_nl_answer_produce     = answer_nl_parsed['produce']

    if answer_nl_parsed['answer'].lower() == 'yes':
        pos_nl_answer_correct = 1
    if answer_nl_parsed['produce'].lower() == conclusion_str.lower():
        pos_nl_produce_correct = 1

    # logic
    # The prompt must show the whole rule; previously only the conclusion
    # was passed as the "rule", leaving the model nothing to evaluate.
    # NOTE(review): ' AND ' as the conjunction is an assumption - confirm.
    rule_logic_str = f"IF {' AND '.join(condition_logic_list)} THEN {conclusion_logic_str}"
    prompt_logic          = ablation_inference_prompt_formulate_logic(rule_logic=rule_logic_str, condition_logic_list=condition_logic_list)
    answer_logic          = ask(prompt_logic)
    answer_logic_parsed   = ablation_inference_extract_result(prompt_logic, answer_logic)
    pos_logic_answer_answer      = answer_logic_parsed['answer']
    pos_logic_answer_produce     = answer_logic_parsed['produce']

    if answer_logic_parsed['answer'].lower() == 'yes':
        pos_logic_answer_correct = 1
    if answer_logic_parsed['produce'].lower() == conclusion_logic_str.lower():
        pos_logic_produce_correct = 1

    return pos_nl_answer_answer, pos_nl_answer_produce, pos_nl_answer_correct, pos_nl_produce_correct,pos_logic_answer_answer,pos_logic_answer_produce,pos_logic_answer_correct,pos_logic_produce_correct



In [14]:
def pandas_split_list(string:str):
    """Split a ';'-joined string back into its list of parts."""
    separator = ';'
    parts = string.split(separator)
    return parts

In [15]:
def ablation_inference_positive_nl_apply(df):
    """Row adapter for ``DataFrame.apply(..., axis=1)``.

    Reads the rule, its ';'-joined condition strings and its conclusions
    from one row and forwards them to ``ablation_inference_positive_nl``,
    returning that function's result unchanged.
    """
    rule_text = df['rule']
    nl_conditions = df['pos_nl_condition'].split(';')
    nl_conclusion = df['pos_nl_conclusion']
    logic_conditions = df['pos_logic_condition'].split(';')
    logic_conclusion = df['pos_logic_conclusion']
    return ablation_inference_positive_nl(
        rule_text,
        nl_conditions,
        nl_conclusion,
        logic_conditions,
        logic_conclusion,
    )

In [18]:
import pandas as pd
# Positive experiment: every row is evaluated with all of its conditions supplied.
result_df = pd.DataFrame()
data_df = pd.read_csv('./CWA_rules_sampled_2000_processed.csv')

# Carry the input columns over unchanged so each result row keeps its inputs.
for _carried_column in (
    'nr',
    'rule',
    'pos_num_condition',
    'pos_nl_condition',
    'pos_nl_conclusion',
    'pos_logic_condition',
    'pos_logic_conclusion',
):
    result_df[_carried_column] = data_df[_carried_column]

# apply(axis=1) yields one tuple per row; zip(*...) transposes those tuples
# into one sequence per output column.
(
    result_df['pos_nl_answer_answer'],
    result_df['pos_nl_answer_produce'],
    result_df['pos_nl_answer_correct'],
    result_df['pos_nl_produce_correct'],
    result_df['pos_logic_answer_answer'],
    result_df['pos_logic_answer_produce'],
    result_df['pos_logic_answer_correct'],
    result_df['pos_logic_produce_correct'],
) = zip(*data_df.apply(ablation_inference_positive_nl_apply, axis=1))

# NOTE(review): saving is disabled here, yet a later cell reads this path.
# result_df.to_csv('./CWA_rules_sampled_2000_processed_pos.csv')




KeyboardInterrupt



In [16]:

def ablation_inference_negative_direct_nl(rule_nl:str, condition_list:list, conclusion_str:str, condition_logic_list:list, conclusion_logic_str:str):
    """Run the negative inference check: the first condition is withheld.

    Both prompts are built from the condition lists with their first element
    dropped (``[1:]``), so one condition is always missing and the rule
    should never fire. Each prompt is sent to ``ask`` and the reply is parsed
    with ``ablation_inference_extract_result``.

    Returns:
        A 10-tuple ``(nl_answer, nl_produce, nl_answer_correct,
        nl_produce_correct, logic_answer, logic_produce,
        logic_answer_correct, logic_produce_correct, nl_reply_without_prompt,
        logic_reply_without_prompt)``. The answer flags are 1 when the parsed
        answer is "no"; the produce flags are 1 when the parsed output equals
        the given conclusion (all compared case-insensitively). The last two
        items are the raw replies with the prompt text removed.
    """
    # --- natural-language variant ---
    nl_prompt = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=condition_list[1:])
    nl_reply = ask(nl_prompt)
    nl_reply_only = nl_reply.replace(nl_prompt, '')
    nl_parsed = ablation_inference_extract_result(nl_prompt, nl_reply)
    nl_answer = nl_parsed['answer']
    nl_produce = nl_parsed['produce']
    nl_answer_ok = int(nl_answer.lower() == 'no')
    nl_produce_ok = int(nl_produce.lower() == conclusion_str.lower())

    # --- logic variant ---
    logic_prompt = ablation_inference_prompt_formulate_logic(rule_logic=conclusion_logic_str, condition_logic_list=condition_logic_list[1:])
    logic_reply = ask(logic_prompt)
    logic_reply_only = logic_reply.replace(logic_prompt, '')
    logic_parsed = ablation_inference_extract_result(logic_prompt, logic_reply)
    logic_answer = logic_parsed['answer']
    logic_produce = logic_parsed['produce']
    logic_answer_ok = int(logic_answer.lower() == 'no')
    logic_produce_ok = int(logic_produce.lower() == conclusion_logic_str.lower())

    return (
        nl_answer,
        nl_produce,
        nl_answer_ok,
        nl_produce_ok,
        logic_answer,
        logic_produce,
        logic_answer_ok,
        logic_produce_ok,
        nl_reply_only,
        logic_reply_only,
    )

def ablation_inference_empty_direct(rule_nl:str, condition_list:list, conclusion_str:str, condition_logic_list:list, conclusion_logic_str:str):
    """Run the empty-context inference check: no conditions are supplied at all.

    Both prompts are built with an empty condition list; ``condition_list``,
    ``condition_logic_list`` and ``conclusion_str`` are accepted but not used.
    Each prompt is sent to ``ask`` and the reply is parsed with
    ``ablation_inference_extract_result``.

    Returns:
        A 10-tuple ``(nl_answer, nl_produce, nl_answer_correct,
        nl_produce_correct, logic_answer, logic_produce,
        logic_answer_correct, logic_produce_correct, nl_reply_without_prompt,
        logic_reply_without_prompt)``. The answer flags are 1 when the parsed
        answer is "no" (case-insensitive); both produce flags are always 0
        because this function never scores the produced text.
    """
    # Never updated below: the produced text is not scored in this setting.
    nl_produce_ok = 0
    logic_produce_ok = 0

    # --- natural-language variant ---
    nl_prompt = ablation_inference_prompt_formulate_nl(rule_nl=rule_nl, condition_list=[])
    nl_reply = ask(nl_prompt)
    nl_reply_only = nl_reply.replace(nl_prompt, '')
    nl_parsed = ablation_inference_extract_result(nl_prompt, nl_reply)
    nl_answer = nl_parsed['answer']
    nl_produce = nl_parsed['produce']
    nl_answer_ok = int(nl_answer.lower() == 'no')

    # --- logic variant ---
    logic_prompt = ablation_inference_prompt_formulate_logic(rule_logic=conclusion_logic_str, condition_logic_list=[])
    logic_reply = ask(logic_prompt)
    logic_reply_only = logic_reply.replace(logic_prompt, '')
    logic_parsed = ablation_inference_extract_result(logic_prompt, logic_reply)
    logic_answer = logic_parsed['answer']
    logic_produce = logic_parsed['produce']
    logic_answer_ok = int(logic_answer.lower() == 'no')

    return (
        nl_answer,
        nl_produce,
        nl_answer_ok,
        nl_produce_ok,
        logic_answer,
        logic_produce,
        logic_answer_ok,
        logic_produce_ok,
        nl_reply_only,
        logic_reply_only,
    )


In [17]:
def ablation_inference_negative_nl_apply(df):
    """Row adapter for ``DataFrame.apply(axis=1)`` around ``ablation_inference_negative_direct_nl``.

    ``df`` is a single row indexable by column name. The two ``*_condition``
    columns hold ';'-separated conditions and are split into lists.

    The complete lists are passed through on purpose:
    ``ablation_inference_negative_direct_nl`` itself drops the first condition
    (it slices ``[1:]``) to build the negative case. Slicing here as well
    removed two conditions per row instead of one.
    """
    return ablation_inference_negative_direct_nl(
        df['rule'],
        df['pos_nl_condition'].split(';'),
        df['pos_nl_conclusion'],
        df['pos_logic_condition'].split(';'),
        df['pos_logic_conclusion'],
    )

def ablation_inference_empty_nl_apply(df):
    """Row adapter for ``DataFrame.apply(axis=1)`` around ``ablation_inference_empty_direct``.

    ``df`` is a single row indexable by column name. The ';'-separated
    condition columns are split and passed on without their first element.
    ``ablation_inference_empty_direct`` builds its prompts with empty
    condition lists, so these arguments do not reach the prompts.
    """
    rule = df['rule']
    nl_conditions = df['pos_nl_condition'].split(';')[1:]
    nl_conclusion = df['pos_nl_conclusion']
    logic_conditions = df['pos_logic_condition'].split(';')[1:]
    logic_conclusion = df['pos_logic_conclusion']
    return ablation_inference_empty_direct(rule, nl_conditions, nl_conclusion, logic_conditions, logic_conclusion)

In [26]:
import pandas as pd
# Negative experiment: each row is evaluated with a condition withheld.
result_df = pd.DataFrame()
data_df = pd.read_csv('./CWA_rules_sampled_2000_processed.csv')

# Carry the input columns over unchanged so each result row keeps its inputs.
for _carried_column in (
    'nr',
    'rule',
    'pos_num_condition',
    'pos_nl_condition',
    'pos_nl_conclusion',
    'pos_logic_condition',
    'pos_logic_conclusion',
):
    result_df[_carried_column] = data_df[_carried_column]

# apply(axis=1) yields one tuple per row; zip(*...) transposes those tuples
# into one sequence per output column.
(
    result_df['neg_nl_answer_answer'],
    result_df['neg_nl_answer_produce'],
    result_df['neg_nl_answer_correct'],
    result_df['neg_nl_produce_correct'],
    result_df['neg_logic_answer_answer'],
    result_df['neg_logic_answer_produce'],
    result_df['neg_logic_answer_correct'],
    result_df['neg_logic_produce_correct'],
    result_df['neg_answer_nl_filtered'],
    result_df['neg_answer_logic_filtered'],
) = zip(*data_df.apply(ablation_inference_negative_nl_apply, axis=1))

result_df.to_csv('./CWA_rules_sampled_2000_processed_neg.csv')



In [19]:
def check_yes(text):
    """Return ``'yes'`` if ``text`` contains the standalone word "yes", else ``''``.

    Matching is case-insensitive and uses word boundaries, so words that only
    contain the letters (e.g. "eyes", "yesterday") are not counted.
    """
    import re  # local import so this definition does not depend on another cell

    if re.search(r'\byes\b', text.lower()):
        return 'yes'
    return ''

def check_no(text):
    """Return ``'no'`` if ``text`` contains the standalone word "no", else ``''``.

    Matching is case-insensitive and uses word boundaries. A reply that
    starts with "No" (or has it right after a newline) is detected, while
    words that merely begin with those letters ("not", "now", "nothing")
    are not counted.
    """
    import re  # local import so this definition does not depend on another cell

    if re.search(r'\bno\b', text.lower()):
        return 'no'
    return ''

In [26]:
import pandas as pd
# Empty-context experiment: each row is evaluated with no conditions supplied.
result_df = pd.DataFrame()
data_df = pd.read_csv('./CWA_rules_sampled_2000_processed_pos.csv')

# Carry the input columns over unchanged so each result row keeps its inputs.
for _carried_column in (
    'nr',
    'rule',
    'pos_num_condition',
    'pos_nl_condition',
    'pos_nl_conclusion',
    'pos_logic_condition',
    'pos_logic_conclusion',
):
    result_df[_carried_column] = data_df[_carried_column]

# apply(axis=1) yields one tuple per row; zip(*...) transposes those tuples
# into one sequence per output column.
(
    result_df['emp_nl_answer_answer'],
    result_df['emp_nl_answer_produce'],
    result_df['emp_nl_answer_correct'],
    result_df['emp_nl_produce_correct'],
    result_df['emp_logic_answer_answer'],
    result_df['emp_logic_answer_produce'],
    result_df['emp_logic_answer_correct'],
    result_df['emp_logic_produce_correct'],
    result_df['emp_answer_nl_filtered'],
    result_df['emp_answer_logic_filtered'],
) = zip(*data_df.apply(ablation_inference_empty_nl_apply, axis=1))

# Keyword flags computed from the replies with the prompt text removed.
for _flag_column, _reply_column, _checker in (
    ('nl_yes', 'emp_answer_nl_filtered', check_yes),
    ('nl_no', 'emp_answer_nl_filtered', check_no),
    ('logic_yes', 'emp_answer_logic_filtered', check_yes),
    ('logic_no', 'emp_answer_logic_filtered', check_no),
):
    result_df[_flag_column] = result_df[_reply_column].map(_checker)

result_df.to_csv('./CWA_rules_sampled_2000_processed_empty.csv')



# Dataset

import random
import json
import jsonlines  # reader/writer for JSON Lines files
import code  # project-local module (not the stdlib `code`); provides FR_decomposer_proofwriter_new

random.seed(42)  # fixed seed so the sample drawn below is reproducible
datasets = []
dataset_rules_all_str_list = []
dataset_rules_all_dict_list = []

# Flatten the rules of every theory in the source file into one list.
with jsonlines.open('../../proofwriter_selected_top_1000case/CWA-All-5000.jsonl', "r") as reader:
    for obj in reader:
        rules_list = code.FR_decomposer_proofwriter_new(obj['theory'])['rules']
        dataset_rules_all_str_list.extend(rules_list)

print(len(dataset_rules_all_str_list))
print(type(dataset_rules_all_str_list))


# Keep only entries whose text contains "if" (case-insensitive substring test).
# 'nr' is the entry's position in the unfiltered list.
for index, rule in enumerate(dataset_rules_all_str_list):
    if 'if' not in rule.lower():
        continue
    rule_dict = {}
    rule_dict['nr'] = index
    rule_dict['rule'] = rule
    dataset_rules_all_dict_list.append(rule_dict)

print(len(dataset_rules_all_dict_list))
print(type(dataset_rules_all_dict_list))

# Persist the full filtered rule set. A stray module-level `return ''` used to
# sit here; it is a SyntaxError outside a function and skipped the write.
with jsonlines.open("./CWA_rules_19297.jsonl", 'w') as w:
    w.write_all(dataset_rules_all_dict_list)

# Draw the fixed-size sample; random.sample raises ValueError if fewer than
# dataset_size rules are available.
dataset_size = 2000
sampled_dataset = random.sample(dataset_rules_all_dict_list, dataset_size)
print(len(sampled_dataset))
print(type(sampled_dataset))
print(type(sampled_dataset[0]))
with jsonlines.open("./CWA_rules_sampled_2000.jsonl", 'w') as w:
    w.write_all(sampled_dataset)
