In [1]:
from niaarm import Dataset, get_rules
from niapy.algorithms.basic import DifferentialEvolution
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
from collections import Counter
import numpy as np
import re

%run helpers.py

In [2]:
# Load the diabetes dataset.
# NOTE(review): the original note here read "pina database" — presumably the
# Pima Indians diabetes data; confirm the provenance of ../dataset/diabetes.csv.
df = pd.read_csv('../dataset/diabetes.csv')

# Store the target as a categorical column whose categories are the observed labels.
df["Outcome"] = pd.Categorical(df['Outcome'], df['Outcome'].unique())

# `run` doubles as the random seed for the split, SMOTE and the optimiser below.
run = 1

# Stratified 80/20 split; the target is kept as a one-column DataFrame.
outcome_frame = df[['Outcome']]
split_parts = train_test_split(
    df.drop(columns=['Outcome']),
    outcome_frame,
    stratify=outcome_frame,
    test_size=0.2,
    random_state=run,
)

# Give every part a fresh 0..n-1 index so rows can be addressed by position later.
X_train, X_test, y_train, y_test = (
    part.reset_index(drop=True) for part in split_parts
)
print("\tSplitting data completed.")

# Oversample the training part only; the test part stays untouched.
# (The original note marked this resampled X_train / y_train pair as
# "for Logistic regression".)
oversample = SMOTE(random_state=run)
X_train, y_train = oversample.fit_resample(X_train, y_train['Outcome'])
print("\tOversampling completed.")

# Features and target joined column-wise: this single frame is the input for
# rule mining (the original note marked it as "for GPT").
train = pd.concat(
    [part.reset_index(drop=True) for part in (X_train, y_train)],
    axis=1,
)

## NIAARM
# Wrap the training frame for NiaARM and configure the optimiser and the
# metrics handed to get_rules in the next cell.
train_tr = Dataset(train)

algo = DifferentialEvolution(
    population_size=10,
    differential_weight=0.5,
    crossover_probability=0.9,
    random_state=run,
)
metrics = ('support', 'confidence')

	Splitting data completed.
	Oversampling completed.


In [None]:
# Mine association rules until one rule set satisfies every acceptance
# criterion below, then record its summary statistics.

# Acceptance criteria for a mined rule set.
MAX_RULES = 400          # ~ due to the GPT token limitation
MIN_ZERO_SHARE = 0.3     # share of Outcome value 0 among consequent values must lie
MAX_ZERO_SHARE = 0.7     # within [0.3, 0.7] so the rules do not describe one class only
RELEVANT_DIVISOR = 10    # at least 1/10 of all rules must mention Outcome
MIN_AVERAGE = 0.70       # lower bound applied to summary lines 2, 3 and 4


def _stat_value(line):
    """Return the number that follows the first ':' in a summary line."""
    return float(line.split(":")[1])


print("\tSearching for rules...")
association_rules_perf = pd.DataFrame(
    {
        "run": [],
        "nrules": [],
        "fitnes": [],
        "support": [],
        "confidence": []
    }
)

# NOTE(review): the loop has no upper bound on attempts; it only ends once a
# rule set passes all checks.
while True:
    rules, run_time = get_rules(train_tr, algo, metrics, max_iters=50, logging=False, random_state=run)

    # The textual summary of the rule list is parsed line by line. Judging by
    # how the lines are used below: [1] = number of rules, [2] = fitness,
    # [3] = support, [4] = confidence.
    status = str(rules).split('\n')

    if _stat_value(status[1]) > MAX_RULES:
        continue

    # store rules to a list (also reused later when the prompt is built)
    rules_list = list(rules)

    # only rules where Outcome is included as consequent or antecedent
    # (one entry per matching attribute, consequent side first)
    list_of_relevant_rules = [
        rule
        for rule in rules_list
        for attribute in (*rule.consequent, *rule.antecedent)
        if attribute.name == 'Outcome'
    ]

    # values of Outcome that appear as a consequent within the relevant rules
    values = [
        value
        for rule in list_of_relevant_rules
        for attribute in rule.consequent
        if attribute.name == "Outcome"
        for value in attribute.categories
    ]

    # ensure there is at least one such rule with Outcome as a consequent
    if not values:
        continue

    # There must be at least 30% and at most 70% of class 0 (and hence of
    # class 1); a rule set that represents a single outcome is rejected.
    zero_share = Counter(values)[0] / len(values)
    if not (MIN_ZERO_SHARE <= zero_share <= MAX_ZERO_SHARE):
        continue

    # At least 1/10 of all rules must include Outcome as a variable
    if len(rules) / RELEVANT_DIVISOR > len(list_of_relevant_rules):
        continue

    # All three averages reported in the summary must exceed the threshold
    if not all(
        _stat_value(line) > MIN_AVERAGE
        for line in (status[2], status[3], status[4])
    ):
        continue

    print("Found a set of rules")
    for line in status[1:5]:
        print("\t", line)

    # The raw summary lines (label included) are stored, not the parsed numbers.
    row2 = {
        "run": run,
        "nrules": status[1],
        "fitnes": status[2],
        "support": status[3],
        "confidence": status[4]
    }
    association_rules_perf.loc[len(association_rules_perf)] = row2
    break

print("\tAssociation rule mining completed.")

In [None]:
# Classify every test sample by prompting GPT with the mined rules, keep the
# replies that agree with the majority vote, and record the resulting metrics.
#
# NOTE(review): gpt_prompt, getPrediction, find_majority_element,
# calculate_metrics, gpt_explanations and gpt_perf are not defined in this
# notebook's visible cells — presumably they come from helpers.py; confirm.
import sys
obj = rules  # not used within this cell; kept in case later cells reference it

# TESTING RULES WITH GPT
y_pred_gpt = []

print("\tTesting with GPT")

times = []  # not used within this cell; kept in case later cells reference it

# Prompt parts that do not depend on the test sample are built once.
intro_prompt = "You will be provided with a set of rules presented in a RuleList format, each containing an antecedent and a consequent. For example, a rule might be given as [A([19, 67])] => [B([59, 98]), C([2.56, 8.3])], signifying that if A falls within the range of 19 and 67, it implies that B must fall within the range of 59 and 98, and C must fall within the range of 2.56 and 8.3. The RuleList is as follows: "
rules_prompt = str(rules_list)

attempts = 5  # number of independent replies requested per test sample
n_test = X_test.shape[0]

for test_idx in range(n_test):

    print("\t\tRun:"+str(run)+" Test sample: " + str(test_idx) + "/" + str(n_test))

    # Feature name -> value mapping for the current test row (looked up once).
    sample_row = X_test.loc[test_idx,]
    test_sample = dict(zip(sample_row.index.to_list(), sample_row.values.tolist()))

    placeholder = "For the test sample: "+str(test_sample)+ " determine the 'Outcome' value. Respond with an answer in the following format 'Outcome=x' where x can be either '0' or '1'. Additionally, provide a set of rules that significantly contributed to the obtained result. Provide them in the following format 'Rules=[Rule1 ; Rule2 ; Rule3 ; Rule4]'. Rules should be written as an [antecedent] => [consequent] and should be separated by a semicolon."

    # A fresh message list per sample: instructions, the rules, then the question.
    messages = [
        {"role": "user", "content": intro_prompt},
        {"role": "user", "content": rules_prompt},
        {"role": "user", "content": placeholder},
    ]

    # Ask several times and remember every reply with its parsed prediction.
    attempts_predictions = []
    attempts_replies = []
    for attempt in range(attempts):
        reply = gpt_prompt(messages)
        attempts_replies.append(reply)
        attempts_predictions.append(getPrediction(reply))

    # Majority vote over the attempts; the result is stored as the prediction as is.
    # NOTE(review): earlier (removed) code re-prompted when the vote was "tie";
    # that was dropped as unnecessary with 5 attempts. This only holds if
    # getPrediction always yields 0 or 1 — confirm against helpers.py.
    me = find_majority_element(attempts_predictions)
    y_pred_gpt.append(me)

    # Keep only the replies whose prediction matches the majority vote.
    indexes = [idx for idx, prediction in enumerate(attempts_predictions) if prediction == me]
    explanations_raw = [attempts_replies[idx] for idx in indexes]

    for expl in explanations_raw:
        row2 = {
            "run":run,
            "output": me,
            "rules": expl
        }
        gpt_explanations.loc[len(gpt_explanations)] = row2

perf_gpt = calculate_metrics(y_test, y_pred_gpt)
print("\t\t", perf_gpt)

# One row of performance figures per run.
row2 = {
    "run": run,
    "Accuracy": perf_gpt['Accuracy'],
    "Precision": perf_gpt['Precision'],
    "F1 Score": perf_gpt['F1 Score'],
    "Specificity": perf_gpt['Specificity'],
    "Sensitivity": perf_gpt['Sensitivity'],
    "AUC": perf_gpt['AUC']
}

gpt_perf.loc[len(gpt_perf)] = row2
    

In [11]:
# Count how often 'Outcome' occurs in the mined rules.
#   counter - occurrences of Outcome on either side of a rule
#   cnt_0   - antecedent occurrences whose first category is 0
#   cnt_1   - antecedent occurrences whose first category is 1
#   nth     - antecedent occurrences whose first category is neither 0 nor 1
# Consequent occurrences only contribute to `counter`.
rules_list = [rules[position] for position in range(len(rules))]

counter = 0
nth = 0
cnt_1 = 0
cnt_0 = 0
for rule in rules_list:
    # Consequent side: every attribute named Outcome counts once.
    for attribute in rule.consequent:
        if attribute.name == 'Outcome':
            counter += 1

    # Antecedent side: additionally classify by the first listed category.
    for attribute in rule.antecedent:
        if attribute.name != 'Outcome':
            continue
        if attribute.categories[0] == 0:
            cnt_0 += 1
        elif attribute.categories[0] == 1:
            cnt_1 += 1
        else:
            nth += 1
        counter += 1

In [22]:
# Split the stored GPT explanations by predicted outcome and turn every reply
# into a flat list of rules written uniformly as "[antecedent]=>[consequent]".
#
# NOTE(review): extract_text_after_rules and gpt_explanations are not defined
# in this notebook's visible cells — presumably they come from helpers.py or an
# earlier cell; confirm.


def _split_reply_into_rules(reply):
    """Return the non-empty rule strings found in one GPT reply.

    Spaces are removed and doubled brackets collapsed before the text is
    split on ';' or on a '],[' boundary.
    """
    compact = (
        extract_text_after_rules(reply)
        .replace(" ", "")
        .replace("[[", "[")
        .replace("]]", "]")
    )
    return [item for item in re.split(r';|\],\[', compact) if item]


def _normalise_rule(rule_text):
    """Strip all brackets, then re-wrap both sides of '=>' in one bracket pair."""
    return "[" + rule_text.replace("[", "").replace("]", "").replace("=>", "]=>[") + "]"


def _collect_rules(replies):
    """Return the normalised rules of all replies, in reply order."""
    return [
        _normalise_rule(rule_text)
        for reply in replies
        for rule_text in _split_reply_into_rules(reply)
    ]


predicted_zero = gpt_explanations['output'] == 0
predicted_one = gpt_explanations['output'] == 1

# Predicted labels per class, re-indexed from 0.
rules_0_outcome = gpt_explanations.loc[predicted_zero, 'output'].reset_index(drop=True)
rules_1_outcome = gpt_explanations.loc[predicted_one, 'output'].reset_index(drop=True)

# Raw GPT replies per class, re-indexed from 0.
rules_0 = gpt_explanations.loc[predicted_zero, 'rules'].reset_index(drop=True)
rules_1 = gpt_explanations.loc[predicted_one, 'rules'].reset_index(drop=True)

# Not used within this cell; kept in case later cells reference them.
counter_1 = 0
counter_0 = 0

# Class 1 is processed before class 0, as before. The *_d names are aliases of
# the final lists and are kept for cells that may still refer to them.
allrules_1_d = _collect_rules(rules_1)
allrules_0_d = _collect_rules(rules_0)

allrules_1 = allrules_1_d
allrules_0 = allrules_0_d