In [None]:
import pandas as pd
import numpy as np
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

# Paths of the JSON file this notebook reads and of the CSV file it writes.
INPUT_FILE_PATH = "../output/4_calculate_probability.json"
OUTPUT_FILE_PATH = "../output/5_calculate_causality.csv"

# Answer-category labels whose exact wording was defined by Abhinav.
# They are centralized here so that a change of wording only has to be made in one place.
# NOTE(review): none of these labels is referenced in the visible part of this notebook.
ALL_ANSWERS = "has all the answers"
ALL_ANSWERS_ROUNDED = "has all the answers when rounded"
SOME_SOLUTION = "has one or more of the answers, but not all of them"
SOME_SOLUTION_ROUNDED = "has one or more of the answers when rounded, but not all of them"
NO_SOLUTION = "says no solution"
INVALID = "invalid"

In [None]:
# Load the input JSON into a DataFrame. The trailing bare expression only
# displays the frame when run as a notebook cell.
data = pd.read_json(INPUT_FILE_PATH)
data

In [None]:
def get_len(column):
    """Return the sum of ``column``.

    For a column that only holds 0s and 1s this is the number of 1s.
    """
    total = column.sum()
    return total

def get_valid_len(data):
    """Return ``get_len`` of the ``"valid"`` column of ``data``."""
    valid_flags = data["valid"]
    return get_len(valid_flags)

def negation(column):
    """Return the element-wise complement ``1 - column``.

    On a column of 0s and 1s this turns every 0 into a 1 and every 1 into a 0.
    """
    flipped = 1 - column
    return flipped

def disjunction(column_1, column_2):
    """Return the element-wise OR (``|``) of ``column_1`` and ``column_2``.

    For columns of 0s and 1s, an entry of the result is 1 when the matching
    entry of at least one input is 1.
    """
    either = column_1 | column_2
    return either

def conjunction(column_1, column_2):
    """Return the element-wise AND (``&``) of ``column_1`` and ``column_2``.

    For columns of 0s and 1s, an entry of the result is 1 only when the
    matching entries of both inputs are 1.
    """
    both = column_1 & column_2
    return both

def get_probability(column_1, size):
    """Return the sum of ``column_1`` divided by ``size``.

    With a column of 0s and 1s this is the share of 1s among ``size`` rows.
    """
    total = column_1.sum()
    return total / size

def is_binary_column(data, column_name):
    """Return True when every value of ``data[column_name]`` is the integer 0 or 1.

    The column is inspected directly rather than through a row-wise
    ``DataFrame.apply``: the scalars a row hands out depend on the dtype mix
    of the whole frame (an all-integer frame yields ``numpy.int64`` values,
    which are not ``int`` instances), so the verdict for one column would
    otherwise depend on its neighbours.

    Args:
        data: DataFrame that holds the column.
        column_name: Label of the column to check.

    Returns:
        bool: True if all values are integers (Python or NumPy, booleans
        included) equal to 0 or 1. Floats, strings, ``None`` and negative
        integers make the column non-binary. An empty column counts as binary.
    """
    return all(
        # Type check first so that non-numeric values never reach the comparison.
        isinstance(value, (int, np.integer)) and 0 <= value <= 1
        for value in data[column_name]
    )

def remove_non_binary(data):
    """Return a copy of ``data`` without the columns ``is_binary_column`` rejects."""
    to_drop = [name for name in data.columns if not is_binary_column(data, name)]
    return data.drop(columns=to_drop)

def word_similarity(word_1, word_2):
    """Count the positions at which ``word_1`` and ``word_2`` hold the same character.

    The comparison is positional: character ``i`` of one word is compared with
    character ``i`` of the other, and only the first
    ``min(len(word_1), len(word_2))`` positions are looked at.

    Args:
        word_1: First string.
        word_2: Second string.

    Returns:
        int: Number of matching positions (0 if either word is empty).
    """
    # zip stops at the shorter word, which gives the same bound as min(len, len).
    return sum(char_1 == char_2 for char_1, char_2 in zip(word_1, word_2))

In [None]:
def conditional_probability(occurence, condition):
    """Return the share of ``condition`` rows in which ``occurence`` also holds.

    Both arguments are columns of 0s and 1s. The result is
    ``sum(occurence & condition) / sum(condition)``, or 0 when ``condition``
    sums to 0.
    """
    condition_total = condition.sum()
    if condition_total == 0:
        return 0
    joint_total = conjunction(occurence, condition).sum()
    return joint_total / condition_total

def prior(data):
    """Return the prior failure probability of ``data``.

    This is the conditional probability of a failure (``is_correct`` negated)
    given the ``valid`` column.
    """
    failures = negation(data["is_correct"])
    return conditional_probability(failures, data["valid"])

def discard_non_prima_facie(data):
    """Return the names of the columns that are not prima facie causes.

    A column is listed when the conditional probability of a failure given
    that column is less than or equal to the prior. The ``valid`` and
    ``is_correct`` columns are never listed.
    """
    baseline = prior(data)
    failures = negation(data["is_correct"])
    bookkeeping = ("valid", "is_correct")
    return [
        name
        for name in data.columns
        if name not in bookkeeping
        and conditional_probability(failures, data[name]) <= baseline
    ]

def rel(rel_data, column):
    """Return the names of the prima facie columns that co-occur with ``column``.

    A column ``cause`` of ``rel_data`` is returned when all of these hold:

    * it is neither ``"valid"``, ``"is_correct"`` nor ``column`` itself;
    * its name differs from ``column`` in at least 5 character positions
      (longest name length minus ``word_similarity``), so near-identical names
      are skipped;
    * at least one row has both ``column`` and ``cause`` equal to 1;
    * the conditional probability of a failure given ``cause`` is strictly
      greater than the prior.

    Args:
        rel_data: DataFrame of 0/1 columns including ``valid`` and ``is_correct``.
        column: Name of the column whose related causes are wanted.

    Returns:
        list: Matching column names in column order. Empty when ``column`` is
        ``"valid"`` or ``"is_correct"``, or when ``column`` is itself not a
        prima facie cause.
    """
    # The bookkeeping columns never take part in the calculation.
    if column == "valid" or column == "is_correct":
        return []

    failures = negation(rel_data["is_correct"])
    prior_prob = prior(rel_data)

    # Evaluate the column on rel_data itself, not on any notebook-level frame,
    # so this check sees exactly the rows the prior was computed from.
    if prior_prob >= conditional_probability(failures, rel_data[column]):
        return []

    name_list = []
    for cause in rel_data.columns:
        if cause == "valid" or cause == "is_correct" or cause == column:
            continue

        # Skip columns whose names are almost the same as ``column``.
        if max(len(column), len(cause)) - word_similarity(cause, column) < 5:
            continue

        difference = conditional_probability(failures, rel_data[cause]) - prior_prob

        # Keep the cause only if it co-occurs with ``column`` and is prima facie.
        if conjunction(rel_data[column], rel_data[cause]).sum() > 0 and difference > 0:
            name_list.append(cause)
    return name_list

def calculate_causality(data, cause):
    """Return the causality score of ``cause``, or ``"n/a"`` if it is not prima facie.

    ``cause`` is not prima facie when the prior is greater than or equal to
    the conditional probability of a failure given ``cause``. Otherwise, for
    every column ``x`` returned by ``rel(data, cause)`` the difference

        P(failure | cause AND x) - P(failure | NOT cause AND x)

    is computed, and the score is the average of those differences (0 when
    there is no related column).
    """
    failures = negation(data["is_correct"])

    if prior(data) >= conditional_probability(failures, data[cause]):
        return "n/a"

    relateds = rel(data, cause)

    differences = []
    for related in relateds:
        # Rows where the related column holds together with / without the cause.
        with_cause = conjunction(data[cause], data[related])
        without_cause = conjunction(negation(data[cause]), data[related])

        # Failure probability under each of the two conditions.
        p_with = conditional_probability(failures, with_cause)
        p_without = conditional_probability(failures, without_cause)

        differences.append(p_with - p_without)

    total_probability = sum(differences)
    if relateds:
        total_probability /= len(relateds)

    return total_probability

def complete_row(data, cause):
    """Build the one-row result DataFrame that describes ``cause``.

    The row holds the column name, its support (sum of ``cause & valid``), its
    causality score, the comma-joined names from ``rel``, the conditional
    probability of a failure given ``cause``, the prior, and the difference
    between the last two.
    """
    failures = negation(data["is_correct"])
    cond_prob = conditional_probability(failures, data[cause])
    prior_prob = prior(data)
    support = conjunction(data[cause], data["valid"]).sum()

    fields = {
        "name": [cause],
        "support": support,
        "causality": calculate_causality(data, cause),
        "rel": ','.join(rel(data, cause)),
        "conditional_probability": [cond_prob],
        "prior": prior_prob,
        "conditional - prior": cond_prob - prior_prob,
    }
    return pd.DataFrame(fields)

def get_causalities(data):
    """Return one result row per column of ``data``, stacked into a DataFrame.

    The first row is a seed row for ``"valid"``; it is followed by the
    ``complete_row`` output for every column of ``data`` in column order
    (so ``"valid"`` shows up a second time if it is a column). Each row keeps
    the index of its one-row frame, i.e. the result index is not renumbered.

    Args:
        data: DataFrame of 0/1 columns including ``valid`` and ``is_correct``.

    Returns:
        pandas.DataFrame: The stacked rows.
    """
    # The seed row is kept on purpose: callers may slice the result by position.
    rows = [complete_row(data, "valid")]
    rows.extend(complete_row(data, column) for column in data.columns)

    # DataFrame.append no longer exists in pandas 2.x; a single concat builds
    # the same frame without copying the accumulated rows on every step.
    return pd.concat(rows)

In [None]:
# Keep only the columns accepted by is_binary_column, then only the rows whose
# `valid` flag equals 1.
data_causes = remove_non_binary(data)
data_causes = data_causes.query('valid == 1')
# Disabled step: would additionally drop every column listed by discard_non_prima_facie.
# data_causes.drop(discard_non_prima_facie(data_causes), axis=1,inplace=True)
data_causes

In [None]:
# Build the result table: a seed row for "valid" followed by one row per column
# of data_causes.
causalities = get_causalities(data_causes)
# Drop the first three rows by position.
# NOTE(review): presumably these are the seed row plus the rows of the first two
# columns of data_causes; which columns those are depends on column order — confirm.
causalities = causalities.iloc[3:]
# Preview of the first 40 remaining rows.
causalities.head(40)

In [None]:
# Write the result table to OUTPUT_FILE_PATH as CSV, without the index column.
causalities.to_csv(OUTPUT_FILE_PATH, index=False)