In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from catboost import CatBoostClassifier
from sklearn.metrics import log_loss, roc_auc_score, precision_recall_curve, matthews_corrcoef, classification_report
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from catboost import Pool, cv
import optuna
from optuna.samplers import TPESampler
import logging
import sys
import umap.plot
import os
from tqdm import tqdm

In [None]:
# Input files for the 'multilingua4' branch of load_dataset:
# SENTENCES_FILE is read with pd.read_csv and must contain a 'bias' column;
# EMBEDDINGS_FILE is read with np.load.
# Replace the <...> placeholders with real file names before running.
SENTENCES_FILE = './data/<YOUR_SENTENCES_FILE>.csv'
EMBEDDINGS_FILE = './data/<YOUR_EMBEDDINGS_FILE>.npy'

In [None]:
def load_dataset(which, verbose=True):
    """Load one of the known feature datasets as a DataFrame with a 'bias' column.

    Parameters
    ----------
    which : str
        Dataset name. Recognised values are 'tfidf', 'bertlegal4',
        'multilingua4' and 'beto4'.
    verbose : bool, default True
        When true, print the dataset name, the file(s) read and their shapes.

    Returns
    -------
    pandas.DataFrame or None
        For 'multilingua4': one column per embedding dimension (integer column
        labels) plus the 'bias' column taken from SENTENCES_FILE.
        For the CSV-backed datasets: the CSV content without the 'doc', 'text'
        and 'page' columns.
        None when `which` is not a recognised name.

    Raises
    ------
    ValueError
        If the sentence file and the embedding file of 'multilingua4' do not
        have the same number of rows.
    """
    if which == 'beto4':
        filename = "..."
    elif which == 'bertlegal4':
        filename = "..."
    elif which == 'tfidf':
        filename = "..."
    elif which == 'multilingua4':
        sentences_df = pd.read_csv(SENTENCES_FILE)
        embeddings = np.load(EMBEDDINGS_FILE)
        if verbose:
            print(which, SENTENCES_FILE, sentences_df.shape)
            print(which, EMBEDDINGS_FILE, embeddings.shape)
        # The two files are only tied together by row position; fail loudly
        # instead of silently producing NaN / truncated labels on a mismatch.
        if len(sentences_df) != len(embeddings):
            raise ValueError(
                f"Row count mismatch: {SENTENCES_FILE} has {len(sentences_df)} rows "
                f"but {EMBEDDINGS_FILE} has {len(embeddings)}"
            )
        data_df = pd.DataFrame(embeddings)
        # Assign positionally (no index alignment) so row i of the embeddings
        # always receives the label of sentence i.
        data_df['bias'] = sentences_df['bias'].to_numpy()
        return data_df
    else:
        # Unknown dataset name: callers receive None.
        return None

    data_df = pd.read_csv(filename)
    # Drop the non-feature columns so only features and 'bias' remain.
    data_df = data_df.drop(['doc', 'text', 'page'], axis=1)
    if verbose:
        print(which, filename, data_df.shape)
    return data_df

In [None]:
# Load the 'beto4' dataset and preview its first rows.
data_df = load_dataset(which='beto4')
data_df.head(n=5)

In [None]:
# Class balance: number of negative rows (bias == 0) per positive row (bias == 1).
bias_labels = data_df['bias']
negative_count = len(data_df[bias_labels == 0])
positive_count = len(data_df[bias_labels == 1])
scale_pos_weight = negative_count / positive_count
print(scale_pos_weight)
bias_labels.value_counts()

In [None]:
def train_catboost(df, test_size=0.2, random_seed=4, plot=False, weight=1, metric='MCC', learning_rate=0.034741, verbose=True, extra_results=False):
    """Train a CatBoost binary classifier on a stratified train/test split of `df`.

    Parameters
    ----------
    df : pandas.DataFrame
        Feature columns plus a 'bias' column used as the target.
    test_size : float, default 0.2
        Fraction of rows held out by train_test_split.
    random_seed : int, default 4
        Seed used both for the split and for CatBoost.
    plot : bool, default False
        Forwarded to `fit(plot=...)`; when true a confusion matrix for the
        held-out split is also drawn.
    weight : float, default 1
        Passed to CatBoost as `scale_pos_weight`.
    metric : str, default 'MCC'
        Passed to CatBoost as `eval_metric`.
    learning_rate : float, default 0.034741
        Passed to CatBoost as `learning_rate`.
    verbose : bool, default True
        When true, print the metric name, CatBoost progress every 300
        iterations, and log-loss / ROC AUC / MCC / classification report for
        the held-out split. When false nothing is printed and CatBoost is
        created with logging_level='Silent'.
    extra_results : bool, default False
        Selects the return shape (see Returns).

    Returns
    -------
    CatBoostClassifier
        When `extra_results` is false.
    tuple
        `(model, X_test, y_test, report_dict)` when `extra_results` is true,
        where `report_dict` is `classification_report(..., output_dict=True)`.

    Notes
    -----
    The held-out split is also the `eval_set` used for early stopping and
    `use_best_model`, so the scores reported on it are optimistic.
    """
    if verbose:
        print(metric)
    X = df.drop('bias', axis=1)
    y = df['bias']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=y, random_state=random_seed)

    model_params = dict(loss_function='Logloss', eval_metric=metric, learning_rate=learning_rate,
                        scale_pos_weight=weight, random_seed=random_seed)
    fit_params = dict(eval_set=(X_test, y_test),
                      early_stopping_rounds=500,
                      use_best_model=True,
                      plot=plot)
    if verbose:
        model_params['verbose'] = verbose
        fit_params['verbose_eval'] = 300
    else:
        # A verbosity argument given to fit() takes precedence over the one
        # given to the constructor, so verbose_eval must not be passed here or
        # the 'Silent' setting would be overridden.
        model_params['logging_level'] = 'Silent'

    model = CatBoostClassifier(**model_params)
    # https://forecastegy.com/posts/catboost-binary-classification-python/
    model.fit(X_train, y_train, **fit_params)

    class_predictions = model.predict(X_test)
    probability_predictions = model.predict_proba(X_test)

    if verbose:
        log_loss_value = log_loss(y_test, probability_predictions[:, 1])
        print(f'Log Loss: {log_loss_value}')

        roc_auc = roc_auc_score(y_test, probability_predictions[:, 1])
        print(f'ROC AUC: {roc_auc}')

        mcc = matthews_corrcoef(y_test, class_predictions)
        print(f'MCC: {mcc}')

        # The text report is only needed for printing.
        class_report = classification_report(y_test, class_predictions)
        print(f'Classification Report:\n {class_report}')

    if plot:
        ConfusionMatrixDisplay.from_estimator(model, X_test, y_test)
        plt.show()

    if extra_results:
        return model, X_test, y_test, classification_report(y_test, class_predictions, output_dict=True)
    return model

In [None]:
def crosseval_catboost(df, kfold=5, random_seed=4, plot=False, weight=1, metric='MCC', learning_rate=0.034741):
    """Cross-validate a CatBoost Logloss model on `df` and print the best scores.

    `df` must hold the feature columns plus a 'bias' target column. A
    stratified, shuffled `kfold`-fold CatBoost `cv` run of 500 iterations is
    performed with `metric` as eval_metric and `weight` as scale_pos_weight.
    The lowest mean test log-loss and the highest mean test `metric` are
    printed together with the iteration where they occur and the matching
    standard deviation.

    Returns the score table produced by `cv`.
    """
    print(metric)

    features = df.drop('bias', axis=1)
    target = df['bias']

    cv_params = dict(
        iterations=500,
        learning_rate=learning_rate,
        random_seed=random_seed,
        loss_function="Logloss",
        eval_metric=metric,
        scale_pos_weight=weight,
        logging_level='Silent',
    )

    scores = cv(
        params=cv_params,
        pool=Pool(data=features, label=target),
        fold_count=kfold,
        shuffle=True,
        plot=plot,
        stratified=True,
        verbose=False,
    )

    # Log-loss: lower is better.
    logloss_mean = scores['test-Logloss-mean']
    best_value_logloss = logloss_mean.min()
    best_iter_logloss = logloss_mean.values.argmin()
    std_logloss = scores['test-Logloss-std'].values[best_iter_logloss]

    # Requested metric: higher is treated as better.
    metric_prefix = 'test-' + metric
    metric_mean = scores[metric_prefix + '-mean']
    best_value_metric = metric_mean.max()
    best_iter_metric = metric_mean.values.argmax()
    std_metric = scores[metric_prefix + '-std'].values[best_iter_metric]

    print(f'Best Log Loss: {best_value_logloss} at iteration {best_iter_logloss}, std: {std_logloss}')
    print(f'Best {metric}: {best_value_metric} at iteration {best_iter_metric}, std: {std_metric}')

    return scores

In [None]:
def objective(trial, df, metric, kfolds=5, iterations=500, depth=2):
    """Optuna objective: best mean cross-validated `metric` for sampled parameters.

    Samples `learning_rate` (log scale, 1e-3..0.1), `random_seed` (1..10) and
    `scale_pos_weight` (1..25 in steps of 0.5) from `trial`, runs a stratified
    `kfolds`-fold CatBoost `cv` with them, and returns the maximum of the
    mean test `metric` over the iterations.

    Side effects on `trial`
    -----------------------
    A single model is additionally trained with `train_catboost` using the
    sampled learning rate, seed and weight, and its held-out precision/recall
    for the class keyed '1' are stored as the user attributes '1-precision'
    and '1-recall'; the metric name is stored as 'metric'.

    Notes
    -----
    `train_catboost` takes no `depth` or `iterations` arguments, so that extra
    model is NOT trained with the `depth`/`iterations` used for the
    cross-validation; the stored precision/recall are therefore only
    indicative.
    The report lookup assumes the positive label is rendered as the string
    '1' by `classification_report`.
    """
    # NOTE: the dict literal fixes the order of the trial.suggest_* calls.
    params = {
        "iterations": iterations,
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.1, log=True),
        "random_seed": trial.suggest_int("random_seed", 1, 10),
        "depth": depth,
        "loss_function": "Logloss",
        "eval_metric": metric,
        "scale_pos_weight": trial.suggest_float("scale_pos_weight", 1, 25, step=0.5),
        "logging_level": 'Silent'
    }

    X = df.drop('bias', axis=1)
    y = df['bias']
    scores_df = cv(
        params=params,
        pool=Pool(data=X, label=y),
        fold_count=kfolds,
        shuffle=True,
        plot=False,
        stratified=True,
        verbose=False
    )

    best_value_metric = scores_df['test-' + metric + '-mean'].max()

    # Only the report is needed from the refit; model and split are discarded.
    _, _, _, class_report = train_catboost(
        df,
        weight=params['scale_pos_weight'],
        plot=False,
        random_seed=params['random_seed'],
        metric=metric,
        learning_rate=params['learning_rate'],
        extra_results=True,
        verbose=False,
    )

    trial.set_user_attr('1-precision', class_report['1']['precision'])
    trial.set_user_attr('1-recall', class_report['1']['recall'])
    trial.set_user_attr('metric', metric)

    return best_value_metric


def run_optuna(df, study_id, database_name="example-study.db", n_trials=10, metric='MCC'):
    """Create (or resume) an Optuna study stored in SQLite and run `n_trials` trials.

    Parameters
    ----------
    df : pandas.DataFrame
        Data forwarded to `objective`.
    study_id : int or str
        Appended to the database stem to form the study name, so different
        ids give different studies inside the same database file.
    database_name : str, default "example-study.db"
        SQLite file used as Optuna storage.
    n_trials : int, default 10
        Number of trials to run in this call.
    metric : str, default 'MCC'
        Metric name forwarded to `objective`; the study maximises it.

    Returns
    -------
    optuna.study.Study
        The study after optimisation. Because `load_if_exists=True`, an
        existing study with the same name is resumed rather than replaced.
    """
    print('Optimizing for', metric)

    # Strip only a trailing ".db" and always append the id, so the study name
    # stays unique per study_id even when the file name has no ".db" suffix.
    db_suffix = ".db"
    if database_name.endswith(db_suffix):
        stem = database_name[:-len(db_suffix)]
    else:
        stem = database_name
    study_name = f"{stem}{study_id}"
    storage_name = "sqlite:///{}".format(database_name)
    print("DB:", storage_name, study_name)

    # Run the study
    sampler = TPESampler(seed=1)
    study = optuna.create_study(direction='maximize', study_name=study_name, storage=storage_name,
                                load_if_exists=True, sampler=sampler)
    study.optimize(lambda trial: objective(trial, df, metric), n_trials=n_trials, show_progress_bar=True)

    print('Best hyperparameters:', study.best_params)
    print('Best ' + metric + ':', study.best_value)
    return study


In [None]:
# Run hyper-parameter tuning, if necessary
# study = run_optuna(data_df, study_id=1, n_trials=8, metric='MCC')

In [None]:
# Hyper-parameters fixed by hand. After running run_optuna they can be taken
# from the returned study instead (see the trailing comments).
# print(study.best_params, study.best_value)
rs, w, lr = (
    6,         # study.best_params['random_seed']
    25,        # study.best_params['scale_pos_weight']
    0.002574,  # study.best_params['learning_rate']
)

# Train the final model and keep the held-out split and its report.
best_model, X_test, y_test, class_report = train_catboost(
    data_df,
    random_seed=rs,
    weight=w,
    learning_rate=lr,
    metric='MCC',
    plot=True,
    extra_results=True,
)


In [None]:
# Show the report dict returned by train_catboost(..., extra_results=True) as a table.
pd.DataFrame(class_report)

In [None]:
# NOTE(review): nothing visible in this notebook uses a tokenizer; confirm
# whether this environment flag is still needed.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# 2-D cosine-distance UMAP projection settings.
umap_args2 = {'n_neighbors': 15,
             'n_components': 2,
             'metric': 'cosine'}

# Every column except the 'bias' label is treated as a feature.
embeddings = data_df.drop('bias', axis=1).values
print(embeddings.shape)

umap_model2D = umap.UMAP(**umap_args2, random_state=42).fit(embeddings)
# Reuse the coordinates computed during fit instead of calling transform() on
# the training data again: it avoids a second pass and gives exactly the
# points that umap.plot.points(umap_model2D) draws.
u = umap_model2D.embedding_
# Optional cross-validation with the hand-picked parameters:
# crosseval_catboost(data_df, weight=w, plot=True, random_seed=rs, metric='PRAUC')

In [None]:
def plot_bias(u, y, title=""):
    """Scatter-plot 2-D points coloured by their binary label.

    Parameters
    ----------
    u : array indexable as ``u[:, 0]`` and ``u[:, 1]``
        Point coordinates; the first column is plotted on x, the second on y.
    y : iterable
        One label per point. Labels equal to 0 are drawn gray, all others red.
    title : str, default ""
        Figure title.
    """
    point_colors = []
    for label in y:
        point_colors.append('gray' if label == 0 else 'r')

    xs, ys = u[:, 0], u[:, 1]
    plt.scatter(xs, ys, c=point_colors, alpha=0.4, s=12)
    plt.axis("off")
    plt.title(title)
    plt.show()

In [None]:
# Draw the same 2-D projection twice: first coloured by the model's
# predictions for every row, then by the true labels.
for y, title in (
    (best_model.predict(embeddings), 'Predicted bias (full dataset)'),
    (data_df['bias'], 'Actual bias (full dataset)'),
):
    plot_bias(u, y, title=title)

In [None]:
# umap.plot is already imported in the notebook's first cell; this repeated
# import is harmless.
import umap.plot

# Plot the fitted 2-D UMAP model with umap's own plotting helper.
umap.plot.points(umap_model2D)