In [1]:
from snorkel.labeling import LFAnalysis
from snorkel.labeling.model import LabelModel
import pandas as pd
from sklearn.metrics import f1_score
from sklearn.model_selection import StratifiedKFold
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from sklearn.metrics import confusion_matrix

In [2]:
# Shared random seed: used as `random_state` for the stratified K-fold split and as
# `seed` for LabelModel.fit in the cells below.
SEED = 42

from sklearn.model_selection import KFold

def get_train_test_fold(fold, dataset, num_splits=10):
    """Return the train/test split for one fold of a stratified K-fold over a dataset.

    The signals file ``../data/signals/{dataset}.csv`` is loaded and split with a
    shuffled ``StratifiedKFold`` (``random_state=SEED``), stratified on the
    ``objective_true`` column.

    Args:
        fold: Zero-based index of the fold to return; must satisfy
            ``0 <= fold < num_splits``.
        dataset: Base name of the CSV file (without the ``.csv`` extension).
        num_splits: Number of folds the data is split into.

    Returns:
        A ``(train_df, test_df)`` tuple of DataFrames. Both are independent
        copies, so callers may add columns to them without triggering a
        ``SettingWithCopyWarning``.

    Raises:
        AssertionError: If ``fold`` is outside ``[0, num_splits)``.
    """
    # A negative fold used to pass the check, match no fold in the loop below and
    # silently return None; reject it up front instead.
    assert 0 <= fold < num_splits, f"fold must be in [0, {num_splits}), got {fold}"

    dataset_path = f"../data/signals/{dataset}.csv"
    df = pd.read_csv(dataset_path)
    skf = StratifiedKFold(n_splits=num_splits, shuffle=True, random_state=SEED)
    for j, (train_idxs, test_idxs) in enumerate(skf.split(range(len(df)), df.objective_true)):
        # Only materialise the frames for the requested fold.
        if fold == j:
            return df.iloc[train_idxs].copy(), df.iloc[test_idxs].copy()

In [None]:
# Cross-validated evaluation of the Snorkel LabelModel on every dataset.
# For each dataset this prints macro F1, FNR and FPR averaged over the folds and
# plots the mean (± std) confusion matrix, with every fold's matrix normalised so
# its entries sum to 100%.
num_signals = 19  # the first 19 CSV columns are used as the label matrix
num_folds = 10

# Display names keyed by the dataset identifiers actually iterated below.
# (The previous if/elif chain tested "celebritydataset"/"fakenewsdataset", which
# never matched, so two of the four plots had no title.)
dataset_titles = {
    "politifact": "PolitiFact",
    "gossipcop": "GossipCop",
    "celebrity": "Celebrity",
    "fakenewsamt": "FakeNewsAMT",
}

for dataset in ["politifact", "gossipcop", "celebrity", "fakenewsamt"]:
    mean_fnr = 0
    mean_fpr = 0
    mean_triggered_signals = np.zeros(num_signals)
    confusion_matrices = []
    mean_f1 = 0
    for i in range(num_folds):
        train_df, test_df = get_train_test_fold(i, dataset, num_splits=num_folds)
        L_train = train_df.iloc[:, :num_signals].to_numpy()
        L_test = test_df.iloc[:, :num_signals].to_numpy()
        y_test = test_df["objective_true"].to_numpy()

        # NOTE(review): FNR/FPR and the triggered-signal counts come from this
        # 2000-epoch model, while F1 and the confusion matrix below come from a
        # second, 500-epoch model. Kept as-is to preserve the reported numbers;
        # confirm the two different epoch counts are intended.
        label_model = LabelModel(cardinality=2, verbose=False)
        label_model.fit(L_train, n_epochs=2000, log_freq=100, seed=SEED)
        prediction = label_model.predict(L_test, tie_break_policy="random")

        false_negatives = (prediction == 0) & (y_test == 1)
        false_positives = (prediction == 1) & (y_test == 0)
        mean_fnr += false_negatives.sum() / (y_test == 1).sum()
        mean_fpr += false_positives.sum() / (y_test == 0).sum()

        # Map abstains (-1) to 0 so a column sum counts only the 1-valued entries.
        non_zero_L = L_test.copy()
        non_zero_L[non_zero_L == -1] = 0
        fn_triggered_signals = non_zero_L[false_negatives].sum(axis=0)  # triggered signals
        # Accumulate per fold. This used to sit after the loop, so only the last
        # fold's counts were added before dividing by the number of folds.
        mean_triggered_signals += fn_triggered_signals

        label_model = LabelModel(cardinality=2, verbose=False)
        label_model.fit(L_train, n_epochs=500, log_freq=100, seed=SEED)
        prediction = label_model.predict(L_test, tie_break_policy="random")

        mean_f1 += f1_score(y_test, prediction, average="macro")
        # labels=[0, 1] pins the matrix to 2x2 even if a class is absent in a fold.
        cm = confusion_matrix(y_test, prediction, labels=[0, 1])
        confusion_matrices.append(cm / np.sum(cm))

    average_confusion_matrix = np.mean(confusion_matrices, axis=0) * 100
    std_confusion_matrix = np.std(confusion_matrices, axis=0) * 100

    mean_f1 /= num_folds
    mean_fnr /= num_folds
    mean_fpr /= num_folds
    # NOTE(review): mean_triggered_signals is computed but not displayed in this cell.
    mean_triggered_signals /= num_folds

    print(dataset)
    print("F1 Macro", mean_f1)
    print("FNR", mean_fnr)
    print("FPR", mean_fpr)
    labels = [
        [f"{average_confusion_matrix[i][j]:.1f}%\n±{std_confusion_matrix[i][j]:.1f}%" for j in range(2)]
        for i in range(2)
    ]

    plt.figure(figsize=(12,10))
    sns.set(font_scale=5.0)
    sns.heatmap(average_confusion_matrix, annot=labels, fmt="", cmap='Blues', vmin=0, vmax=100, annot_kws={"size": 60})
    plt.title(dataset_titles[dataset], fontweight="bold")

    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    # plt.savefig(f"../confusion_matrix_{dataset}.pdf", bbox_inches='tight', dpi=300)
    plt.show()


In [None]:
from sklearn.metrics import confusion_matrix


# Same cross-validated evaluation as the percentage variant, but the confusion
# matrices are kept as raw per-fold counts, and the heatmap colour range spans the
# min/max of the averaged matrix instead of 0-100.
num_signals = 19  # the first 19 CSV columns are used as the label matrix
num_folds = 10

# Display names keyed by the dataset identifiers actually iterated below.
# (The previous if/elif chain tested "celebritydataset"/"fakenewsdataset", which
# never matched, so two of the four plots had no title.)
dataset_titles = {
    "politifact": "PolitiFact",
    "gossipcop": "GossipCop",
    "celebrity": "Celebrity",
    "fakenewsamt": "FakeNewsAMT",
}

for dataset in ["politifact", "gossipcop", "celebrity", "fakenewsamt"]:
    mean_fnr = 0
    mean_fpr = 0
    mean_triggered_signals = np.zeros(num_signals)
    confusion_matrices = []
    mean_f1 = 0
    for i in range(num_folds):
        train_df, test_df = get_train_test_fold(i, dataset, num_splits=num_folds)
        L_train = train_df.iloc[:, :num_signals].to_numpy()
        L_test = test_df.iloc[:, :num_signals].to_numpy()
        y_test = test_df["objective_true"].to_numpy()

        # NOTE(review): FNR/FPR and the triggered-signal counts come from this
        # 2000-epoch model, while F1 and the confusion matrix below come from a
        # second, 500-epoch model. Kept as-is to preserve the reported numbers;
        # confirm the two different epoch counts are intended.
        label_model = LabelModel(cardinality=2, verbose=False)
        label_model.fit(L_train, n_epochs=2000, log_freq=100, seed=SEED)
        prediction = label_model.predict(L_test, tie_break_policy="random")

        false_negatives = (prediction == 0) & (y_test == 1)
        false_positives = (prediction == 1) & (y_test == 0)
        mean_fnr += false_negatives.sum() / (y_test == 1).sum()
        mean_fpr += false_positives.sum() / (y_test == 0).sum()

        # Map abstains (-1) to 0 so a column sum counts only the 1-valued entries.
        non_zero_L = L_test.copy()
        non_zero_L[non_zero_L == -1] = 0
        fn_triggered_signals = non_zero_L[false_negatives].sum(axis=0)  # triggered signals
        # Accumulate per fold. This used to sit after the loop, so only the last
        # fold's counts were added before dividing by the number of folds.
        mean_triggered_signals += fn_triggered_signals

        label_model = LabelModel(cardinality=2, verbose=False)
        label_model.fit(L_train, n_epochs=500, log_freq=100, seed=SEED)
        prediction = label_model.predict(L_test, tie_break_policy="random")

        mean_f1 += f1_score(y_test, prediction, average="macro")
        # labels=[0, 1] pins the matrix to 2x2 even if a class is absent in a fold.
        cm = confusion_matrix(y_test, prediction, labels=[0, 1])
        confusion_matrices.append(cm)

    average_confusion_matrix = np.mean(confusion_matrices, axis=0)
    std_confusion_matrix = np.std(confusion_matrices, axis=0)

    mean_f1 /= num_folds
    mean_fnr /= num_folds
    mean_fpr /= num_folds
    # NOTE(review): mean_triggered_signals is computed but not displayed in this cell.
    mean_triggered_signals /= num_folds

    print(dataset)
    print("F1 Macro", mean_f1)
    print("FNR", mean_fnr)
    print("FPR", mean_fpr)
    labels = [
        [f"{average_confusion_matrix[i][j]:.1f}\n±{std_confusion_matrix[i][j]:.1f}" for j in range(2)]
        for i in range(2)
    ]

    plt.figure(figsize=(12,10))
    sns.set(font_scale=5.0)
    sns.heatmap(average_confusion_matrix, annot=labels, fmt="", cmap='Blues', vmin=average_confusion_matrix.min(), vmax=average_confusion_matrix.max(), annot_kws={"size": 60})
    plt.title(dataset_titles[dataset], fontweight="bold")

    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    # plt.savefig(f"../confusion_matrix_{dataset}.pdf", bbox_inches='tight', dpi=300)
    plt.show()


In [None]:
# Collect, for every dataset, the held-out test frame of each of the 10 folds with
# the LabelModel's predictions attached as a "prediction" column.
# all_preds maps dataset name -> list of per-fold test DataFrames.
all_preds = {}
for dataset in ["gossipcop", "politifact", "celebrity", "fakenewsamt"]:
    all_preds[dataset] = []
    for i in range(10):
        train_df, test_df = get_train_test_fold(i, dataset)
        L_train = train_df.iloc[:, :19].to_numpy()
        L_test = test_df.iloc[:, :19].to_numpy()

        label_model = LabelModel(cardinality=2, verbose=False)
        label_model.fit(L_train, n_epochs=2000, log_freq=100, seed=SEED)
        prediction = label_model.predict(L_test, tie_break_policy="random")
        # assign() builds a new frame instead of writing a column into a frame
        # derived from an iloc slice, which raised SettingWithCopyWarning.
        test_df = test_df.assign(prediction=prediction)
        all_preds[dataset].append(test_df)

In [None]:
# Build a per-signal table comparing, among samples whose true label is 1, the
# rate of 1-valued signal entries for correctly predicted samples (true positives)
# versus missed samples (false negatives), for every dataset plus their mean.
all_dfs = []
for dataset in ["gossipcop", "politifact", "celebrity", "fakenewsamt"]:
    fold_preds = pd.concat(all_preds[dataset])
    # Work on a copy: writing into the bare iloc slice raised SettingWithCopyWarning.
    df_signals = fold_preds.iloc[:, :19].copy()
    df_signals[df_signals == -1] = 0  # treat abstains (-1) as 0
    df_signals["objective_true"] = fold_preds["objective_true"]
    df_signals["prediction"] = fold_preds["prediction"]

    # False negatives: true label 1, predicted 0.
    df_fn = df_signals[(df_signals["objective_true"] == 1) & (df_signals["prediction"] == 0)]
    num_fn = len(df_fn)
    df_fn = df_fn.drop(["objective_true", "prediction"], axis=1)
    # df_fn = df_fn.sum(axis=0) / df_fn.sum(axis=0).sum() * 100
    fn_rate = (df_fn.sum(axis=0) / num_fn).sort_values(ascending=False)

    # True positives: true label 1, predicted 1.
    df_tp = df_signals[(df_signals["objective_true"] == 1) & (df_signals["prediction"] == 1)]
    num_tp = len(df_tp)
    df_tp = df_tp.drop(["objective_true", "prediction"], axis=1)
    # df_tp = df_tp.sum(axis=0) / df_tp.sum(axis=0).sum() * 100
    tp_rate = (df_tp.sum(axis=0) / num_tp).sort_values(ascending=False)

    # Combine true positives and false negatives
    rates = pd.concat([tp_rate, fn_rate], axis=1)
    rates.columns = ["True Positives", "False Negatives"]

    # Calculate the percentage decrease from true positives to false negatives.
    # NOTE(review): both rates are rounded to one decimal BEFORE the division,
    # which looks like a leftover from the commented-out percentage variant above
    # and is inconsistent with the unrounded "Mean" change below. Kept as-is to
    # preserve the reported numbers; confirm it is intended.
    tp_rounded = np.round(rates["True Positives"], 1)
    fn_rounded = np.round(rates["False Negatives"], 1)
    rates["Change"] = (tp_rounded - fn_rounded) / tp_rounded * 100

    rates = rates[["True Positives", "False Negatives", "Change"]]
    all_dfs.append(rates)

# Concatenate all datasets into one DataFrame

df = pd.concat(all_dfs, axis=1)
df.columns = pd.MultiIndex.from_product([["GossipCop", "PolitiFact", "Celebrity", "FakeNewsAMT"], ["True Positives", "False Negatives", "Change"]])
# Mean over datasets per metric. groupby(..., axis=1) is deprecated in recent
# pandas, so group the transposed frame on its second index level instead.
df_mean = df.T.groupby(level=1).mean().T

# Add the mean columns
df["Mean", "True Positives"] = df_mean["True Positives"]
df["Mean", "False Negatives"] = df_mean["False Negatives"]
df["Mean", "Change"] = ((df["Mean", "True Positives"] - df["Mean", "False Negatives"])) / df["Mean", "True Positives"] * 100

# Rearrange the columns
df = df[["PolitiFact", "GossipCop", "FakeNewsAMT", "Celebrity", "Mean"]]
df = df.sort_values(("Mean", "Change"), ascending=False)

# add a row that sums the value of all columns
df.loc["Total"] = df.sum()
# A sum of percentage changes is not meaningful, so recompute each group's
# "Change" in the Total row from that group's summed rates.
for group in ["Mean", "PolitiFact", "GossipCop", "Celebrity", "FakeNewsAMT"]:
    total_tp = df.loc["Total", (group, "True Positives")]
    total_fn = df.loc["Total", (group, "False Negatives")]
    df.loc["Total", (group, "Change")] = (total_tp - total_fn) / total_tp * 100
df = df.round(1).fillna(0)
df