In [None]:
# CSV file the generated perturbations are appended to and later re-read from
RESULTS_PATH = "./results/continuity/perturbations.csv"
# number of perturbations requested per document (passed as `n` to get_pertubed_text)
N_PERTURBATIONS = 5

In [None]:
import os

from detector_radford import DetectorRadford
from detector_guo import DetectorGuo
from explainer_wrappers import LIME_Explainer, SHAP_Explainer, Random_Explainer

In [None]:
import pandas as pd
import numpy as np
import transformers

In [None]:
# Always load the full dataset here (np.random.shuffle(tokenized_sentences));
# when debugging, slice the actual hybrid_documents instead of this frame.
test = pd.read_pickle("./dataset_test.pkl")

# texts that get perturbed and explained
documents = test["answer"]
# convention: 0: machine, 1: human, see detector.py
gold_labels = test["author"].eq("human_answers")

# every detector is combined with every explainer in the cells below
detector_classes = [DetectorRadford, DetectorGuo]
explainer_classes = [LIME_Explainer, SHAP_Explainer, Random_Explainer]

In [None]:
import re

In [None]:
import torch

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU so the
# notebook still runs (slowly) on machines without CUDA.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# matches the sentinel tokens T5 uses for masked spans (<extra_id_0>, <extra_id_1>, ...)
pattern = re.compile(r"<extra_id_\d+>")


# model used for generating perturbations
model = "t5-small"
cache_dir = "./.cache"
mask_model = transformers.AutoModelForSeq2SeqLM.from_pretrained(model, cache_dir=cache_dir).to(DEVICE)
# the tokenizer stays on the host; only the encoded tensors are moved to DEVICE when used
mask_tokenizer = transformers.AutoTokenizer.from_pretrained(
    model,
    model_max_length=mask_model.config.n_positions,
    cache_dir=cache_dir,
)

In [None]:
# Column layout of the perturbation CSV: one row per (detector, original document, perturbation).
# Kept identical to the rows written by the generation loop, which are 3-tuples.
columns = ["Detector", "Original", "Perturbation"]

In [None]:
def get_pertubed_text(detector, text, n=1):
    """Generates perturbations, similar to how it is done in detectgpt/detector_detectgpt.py. Always edits one token.

    One space-separated token of ``text`` is picked at random and replaced by a T5 sentinel;
    the module-level ``mask_model`` / ``mask_tokenizer`` then sample a replacement for it.
    A candidate is accepted only if it differs from ``text``, was not generated before and
    does not change the label predicted by ``detector``. After 100 rejected candidates a
    random vocabulary entry is used instead of the T5 fill.

    Args:
        detector: Detector to use when verifying that label doesn't flip.
        text: Original document
        n: How many perturbations to generate. Defaults to 1.

    Returns:
        List with n edited documents; all of them edit the same masked position.
    """
    tokens = text.split(' ')
    # select 1 token in the original document to mask
    mask = np.zeros_like(tokens, dtype=bool)
    mask[np.random.randint(0, len(mask))] = 1 # TODO number of tokens to mask
    masked_positions = [pos for pos, is_masked in enumerate(mask) if is_masked]

    # T5 closes the last fill with the next sentinel, so generation can stop there
    stop_id = mask_tokenizer.encode(f"<extra_id_{len(masked_positions)}>")[0]

    prediction_original = detector.predict_label([text])[0]

    past_generations = []
    # generate n unique perturbations (replace the same masked word(s) with one or more words)
    for _ in range(n):
        replacement_attempts = 0
        while True: # do while
            # (re-)insert the sentinels; the previous attempt overwrote them with its fills
            for sentinel_no, pos in enumerate(masked_positions):
                tokens[pos] = "<extra_id_{}>".format(sentinel_no)
            masked_text = ' '.join(tokens)

            tok = mask_tokenizer(masked_text, return_tensors="pt", padding=True).to(DEVICE)
            outputs = mask_model.generate(**tok, max_length=150, do_sample=True, top_p=1, num_return_sequences=1, eos_token_id=stop_id,)
            mt = mask_tokenizer.batch_decode(outputs, skip_special_tokens=False)

            # text between the sentinels of the decoded output, in order of appearance
            fills = [x for x in re.split(r"<extra_id_\d*>", mt[0]) if x != "<pad>"]

            for pos in masked_positions:
                if replacement_attempts < 100:
                    # the output may contain fewer fills than masked positions (or none at all);
                    # treat a missing fill like an empty one instead of raising IndexError
                    tokens[pos] = fills.pop(0).strip() if fills else ""
                else: # sometimes t5 can't come up with 5 unique new perturbations that match the constraints below. use a random word from the vocabulary instead
                    # have to change seed here as detector.predict_label() below sets it (which results in an endless loop)
                    np.random.seed(replacement_attempts)
                    random_token = [np.random.randint(0, mask_tokenizer.vocab_size)]
                    np.random.seed(42) # reset seed just to be sure, is reset with the next detector.predict_label() anyways
                    tokens[pos] = mask_tokenizer.batch_decode(random_token, skip_special_tokens=False)[0]
            perturbed_text = " ".join(tokens)

            # check if this is a valid and new perturbation
            if (perturbed_text == text) or (perturbed_text in past_generations):
                replacement_attempts += 1
                continue
            # verify that label didn't flip
            if detector.predict_label([perturbed_text])[0] != prediction_original:
                replacement_attempts += 1
                continue
            break
        past_generations.append(perturbed_text)
    return past_generations

In [None]:
from tqdm import tqdm

In [None]:
# layout of one row in the perturbation CSV
columns = [
    "Detector",
    "Original",
    "Perturbation",
]

In [None]:
# Load the perturbations generated so far, or start a fresh results file.
if os.path.isfile(RESULTS_PATH):
    df = pd.read_csv(RESULTS_PATH)
else:
    # create the target directory first, otherwise to_csv fails on a fresh checkout
    results_dir = os.path.dirname(RESULTS_PATH)
    if results_dir:
        os.makedirs(results_dir, exist_ok=True)
    df = pd.DataFrame([], columns=columns)
    # write headers (mode != "a")
    df.to_csv(RESULTS_PATH, encoding="UTF-8", index=False)


In [None]:
# Generate N_PERTURBATIONS perturbations per (detector, document) and append them to RESULTS_PATH.
# Documents that already have rows for the detector are skipped, so the cell can be resumed.
for detector_class in detector_classes:
    detector = detector_class()
    detector_name = detector.__class__.__name__
    # documents already handled for this detector; a set avoids scanning df for every document
    done_documents = set(df.loc[df["Detector"] == detector_name, "Original"])
    for document in tqdm(documents, total=len(documents), desc="Generating perturbations"):
        if document in done_documents:
            continue
        # set seeds here so perturbed documents are the same regardless of slice for documents when debugging (and explanations don't have to be regenerated)
        np.random.seed(42)
        torch.manual_seed(42)
        rows = [
            (detector_name, document, perturbation)
            for perturbation in get_pertubed_text(detector, document, N_PERTURBATIONS)
        ]
        # one append per document: an interruption cannot leave a partially written document,
        # which would be skipped on the next run because it already has rows
        pd.DataFrame(rows, columns=columns).to_csv(RESULTS_PATH, mode="a", encoding="UTF-8", index=False, header=False)
        # a document that occurs more than once is only perturbed once per detector
        done_documents.add(document)



In [None]:
# re-read the results file so df also contains the rows appended by the generation loop
df = pd.read_csv(RESULTS_PATH)
df

In [None]:
# generate all explanations
for detector_class in detector_classes:
    detector = detector_class()
    print(detector.__class__.__name__)
    for explainer_class in explainer_classes:
        explainer = explainer_class(detector)
        print(explainer.__class__.__name__)
        for original, perturbation in tqdm([(o, p) for o,p in zip(df.loc[df["Detector"] == detector.__class__.__name__,"Original"], df.loc[df["Detector"] == detector.__class__.__name__,"Perturbation"]) if not explainer.is_cached(o) or not explainer.is_cached(p) or not all([explainer.is_cached(o, alt="alt_{}_".format(i)) for i in range(1,5)])]):
            explainer.get_explanation_cached(original)
            explainer.get_explanation_cached(perturbation)
            for i in range(1,5):
                explainer.get_explanation_cached(original, alt="alt_{}_".format(i)) 

In [None]:
import krippendorff


In [None]:
from collections import defaultdict

In [None]:
# in this experiment, words are treated as variables and fi-scores as responses in a coding task
# this function therefore transforms a string "An example is an example"
# into a list of pairs:
# [("An", 0), ("example", 0), ("is", 0), ("an", 0), ("example",1)]
# "An example is not an example"
# [("An", 0), ("example", 0), ("is", 0), ("not", 0), ("an", 0), ("example",1)]

# when calculating the reliability measure, replaced and/or missing words are treated as unobserved in the other explanations

def get_tokens_with_pos(explainer, document):
    """Pairs every token of the document with the number of earlier occurrences of that token.

    The tokens come from ``explainer.tokenize(document)``. The n-th occurrence of a token
    (counting from 0) is returned as ``(token, n)``, which makes repeated words distinguishable.
    """
    occurrences = {}
    pairs = []
    for token in explainer.tokenize(document):
        seen_before = occurrences.get(token, 0)
        pairs.append((token, seen_before))
        occurrences[token] = seen_before + 1
    return pairs

In [None]:
# as above, but also includes the position in the original document given by enumerate(explainer.tokenize(document))
# this is useful for indexing the original explanation (a dict)
def get_tokens_with_pos_and_id(explainer, document):
    """Like get_tokens_with_pos, but each entry also carries the token's index in the document.

    Returns a list of ``(token, n, index)`` triples, where ``n`` counts earlier occurrences
    of the same token and ``index`` is the position given by
    ``enumerate(explainer.tokenize(document))``.
    """
    occurrences = {}
    triples = []
    for index, token in enumerate(explainer.tokenize(document)):
        seen_before = occurrences.get(token, 0)
        triples.append((token, seen_before, index))
        occurrences[token] = seen_before + 1
    return triples

In [None]:
def experiment_to_cannonical_form(experiment, explainer):
    """Builds the reliability matrix for krippendorff.alpha from the explanations of an experiment.

    Each document is one row (an observation); each column is a token position shared by
    all documents. Only the common left part (longest common token prefix) and the common
    right part (longest common token suffix) are kept; the edited tokens in between are dropped.

    Args:
        experiment: Non-empty list of documents, e.g. the original followed by its perturbations.
        explainer: Object providing ``tokenize(document)`` and ``get_fi_scores(document, fill=True)``.

    Returns:
        2-D numpy array with one row per document and one column per shared token position.

    Raises:
        ValueError: If ``experiment`` is empty.
    """
    if len(experiment) == 0:
        raise ValueError("experiment must contain at least one document")

    # fi scores towards label machine
    # NOTE(review): assumes get_fi_scores(..., fill=True) yields one score per token of tokenize() - confirm
    fi_scores = [tuple(zip(*explainer.get_fi_scores(d, fill=True)[0]))[1] for d in experiment]
    # each word is treated as an item, each explanation as an observation
    tokenized = [explainer.tokenize(d) for d in experiment]
    reference = tokenized[0]
    shortest = min(len(t) for t in tokenized)

    # determine bounds of left common part; bounded by the shortest document so the loop
    # also terminates when all tokenizations are identical
    i = 0
    while i < shortest and all(t[:i + 1] == reference[:i + 1] for t in tokenized):
        i += 1

    # determine bounds of right common part; it may not reach into the left part, otherwise
    # a token of the shortest document would show up in two columns
    j = 0
    while j < shortest - i and all(t[-(j + 1):] == reference[-(j + 1):] for t in tokenized):
        j += 1

    # this matrix will be passed to krippendorff.alpha as reliability_data
    left_part = np.vstack([e[0:i] for e in fi_scores])
    if j > 0:
        right_part = np.vstack([e[-j:] for e in fi_scores])
        cannonical_form = np.hstack([left_part, right_part])
    else: # if no tokens on the right part match (e[-0:] would select everything)
        cannonical_form = left_part

    return cannonical_form


In [None]:
# Krippendorff's alpha per original document, for every detector/explainer pair:
#   - across the original and its perturbations
#   - across the original and its four re-runs ("alt_1_" ... "alt_4_")
results = []
for detector_class in detector_classes:
    detector = detector_class()
    detector_name = detector.__class__.__name__
    df_detector = df[df["Detector"] == detector_name]
    for explainer_class in explainer_classes:
        explainer = explainer_class(detector)
        explainer_name = explainer.__class__.__name__
        print(explainer_name, detector_name)
        for original, perturbations in tqdm(df_detector.groupby("Original"), desc="Calculating agreement"):
            # perturbations
            experiment = [original, *perturbations["Perturbation"].tolist()]
            cannonical_form = experiment_to_cannonical_form(experiment, explainer)

            # re-runs on the original document: the regular explanation first, then the 4 re-runs
            explanations = [explainer.get_fi_scores(original, fill=True)]
            explanations += [
                explainer.get_fi_scores(original, fill=True, alt="alt_{}_".format(i))
                for i in range(1, 5)
            ]
            # fi scores towards label machine
            fi_scores = [tuple(zip(*explanation[0]))[1] for explanation in explanations]
            cannonical_form_rerun = np.vstack(fi_scores)

            alpha_perturbations = krippendorff.alpha(reliability_data=cannonical_form, level_of_measurement="interval")
            alpha_rerun = krippendorff.alpha(reliability_data=cannonical_form_rerun, level_of_measurement="interval")
            results.append((
                explainer_name,
                explainer.detector.__class__.__name__,
                alpha_perturbations,
                alpha_rerun,
            ))

In [None]:
# one row per (explainer, detector, original document) holding both agreement values
df_results = pd.DataFrame(results, columns=["Explainer", "Detector", "$\\alpha$","$\\alpha$ re-run"])

In [None]:
# summary statistics for SHAP only
# NOTE: matches the unshortened class name, so this has to run before "_Explainer" is stripped from the column
df_results[df_results["Explainer"] == "SHAP_Explainer"].describe()

In [None]:
# shorten explainer names for the tables below (e.g. "SHAP_Explainer" -> "SHAP")
df_results["Explainer"] = df_results["Explainer"].str.replace("_Explainer", "")

In [None]:
# mean agreement for every explainer/detector pair; the maximum of each column is set in bold
mean_alpha_per_pair = df_results.groupby(["Explainer", "Detector"]).mean()
results_explainer_detector = mean_alpha_per_pair.style.highlight_max(props=["font-weight: bold;"])
results_explainer_detector

In [None]:
# latex tables per explainer
tex_explainer_continuity = pd.DataFrame(df_results.set_index(["Explainer", "Detector"])[ "$\\alpha$"]).groupby(["Explainer"]).mean().sort_values(by=["$\\alpha$"],  ascending=False)\
        .style.highlight_max(props=["font-weight: bold;"]).format(precision=3)\
        .to_latex(environment="table", position="h!", position_float="centering",convert_css=True, clines="all;data", hrules=True, caption="Results aggregated by detector", label="continuity-results-explainer")

tex_explainer_consistency = pd.DataFrame(df_results.set_index(["Explainer", "Detector"])["$\\alpha$ re-run"]).groupby(["Explainer"]).mean().sort_values(by=["$\\alpha$ re-run"], ascending=False)\
        .style.highlight_max(props=["font-weight: bold;"]).format(precision=3)\
        .to_latex(environment="table", position="h!", position_float="centering",convert_css=True, clines="all;data", hrules=True, caption="Results aggregated by detector", label="consistency-results-explainer")

In [None]:
# latex tables per detector-explainer combination
tex_explainer_continuity_detector = pd.DataFrame(df_results.set_index(["Explainer", "Detector"])[ "$\\alpha$"]).groupby(["Explainer", "Detector"]).mean().sort_values(by=["$\\alpha$"],  ascending=False)\
        .style.highlight_max(props=["font-weight: bold;"]).format(precision=3)\
        .to_latex(environment="table", position="h!", position_float="centering",convert_css=True, clines="all;data", hrules=True, caption="Results aggregated by detector", label="continuity-results-explainer")

tex_explainer_consistency_detector = pd.DataFrame(df_results.set_index(["Explainer", "Detector"])["$\\alpha$ re-run"]).groupby(["Explainer", "Detector"]).mean().sort_values(by=["$\\alpha$ re-run"], ascending=False)\
        .style.highlight_max(props=["font-weight: bold;"]).format(precision=3)\
        .to_latex(environment="table", position="h!", position_float="centering",convert_css=True, clines="all;data", hrules=True, caption="Results aggregated by detector", label="consistency-results-explainer")

In [None]:
def shorten_latex(string):
    """Shortens class names and headings in generated LaTeX and widens subfigures.

    The substitutions are applied one after another, in the order listed below,
    and the resulting string is returned.
    """
    substitutions = (
        ("_Explainer", ""),
        ("DetectorRadford", "Radford"),
        ("DetectorDetectGPT", "DetectGPT"),
        ("DetectorGuo", "Guo"),
        ("Pointing Game Scores", "Score"),
        ("$\\alpha$ re-run", "$\\alpha$"),
        (r"""\begin{subfigure}""", r"""\begin{subfigure}{\columnwidth}"""),
    )
    shortened = string
    for old, new in substitutions:
        shortened = shortened.replace(old, new)
    return shortened


In [None]:
# concatenate all tables (continuity first, then consistency) and write them to one .tex file
out = "".join([
    tex_explainer_continuity,
    tex_explainer_continuity_detector,
    tex_explainer_consistency,
    tex_explainer_consistency_detector,
])
# make sure the output directory exists, open() does not create it
os.makedirs("figures", exist_ok=True)
with open("figures/tables_continuity.tex", "w", encoding="UTF-8") as text_file:
    text_file.write(shorten_latex(out))