In [None]:
import fasttext
import pandas as pd
import numpy as np
import random, string, os

from ast import literal_eval
from collections import Counter

from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import classification_report

In [None]:
def flatten(t):
    """Concatenate the items of every sub-iterable of ``t`` into one flat list.

    Exactly one level of nesting is removed and item order is preserved.
    """
    flat = []
    for sublist in t:
        flat.extend(sublist)
    return flat


def custom_stratified_train_test_split(df: pd.DataFrame, ratios: dict[str, float], seed=None):
    """
    Split ``df`` into train / validation / test frames, stratified by target.

    Rows are grouped by the string form of their ``target`` value, so every
    distinct tag combination is one stratum. Within each stratum the unique
    ``entry_id`` values are divided at random:

    1) ``int(n * ratios['train']) + 1`` ids go to train, so every stratum
       (even a singleton) contributes at least one training id;
    2) of the remaining ids, a share of
       ``ratios['val'] / (1 - ratios['train'])`` goes to validation;
    3) whatever is left goes to test.

    Args:
        df: frame with at least the columns ``entry_id`` and ``target``.
        ratios: mapping with the keys ``'train'`` and ``'val'``. The test
            share is whatever remains; a ``'test'`` key is not read.
        seed: optional seed. When given, a private ``random.Random(seed)``
            is used, which makes the split reproducible and leaves the global
            ``random`` state untouched. When ``None`` (default) the global
            ``random`` module is used.

    Returns:
        ``(train_df, val_df, test_df)``: row subsets of ``df`` selected by
        ``entry_id``. If the same ``entry_id`` occurs in several strata it
        can end up in more than one of the returned frames.

    Raises:
        ValueError: if ``ratios['train']`` is not in ``[0, 1)``.
    """
    train_ratio = ratios["train"]
    if not 0 <= train_ratio < 1:
        # train_ratio == 1 would otherwise surface as a bare ZeroDivisionError
        raise ValueError(f"ratios['train'] must be in [0, 1), got {train_ratio!r}")
    ratio_val_to_test = ratios["val"] / (1 - train_ratio)

    rng = random if seed is None else random.Random(seed)

    # Group on str(target): the raw targets may be lists, which are unhashable.
    # Only the two needed columns are materialised instead of copying all of df.
    strata = (
        pd.DataFrame(
            {
                "entry_id": df["entry_id"].tolist(),
                "target": [str(value) for value in df["target"]],
            }
        )
        .groupby("target")["entry_id"]
        .agg(list)
    )

    train_ids = []
    val_ids = []
    test_ids = []

    for stratum_ids in strata.values:
        # np.unique de-duplicates and sorts, giving a stable starting order.
        ids_entry = list(np.unique(stratum_ids))

        n_train = min(len(ids_entry), int(len(ids_entry) * train_ratio) + 1)
        train_ids_entry = rng.sample(ids_entry, n_train)

        # Keep the sorted order for the remainder instead of going through a
        # set difference: set iteration order would leak into the next
        # sample() call and make seeded runs non-reproducible.
        taken_for_train = set(train_ids_entry)
        val_test_ids_entry = [i for i in ids_entry if i not in taken_for_train]

        n_val = int(len(val_test_ids_entry) * ratio_val_to_test)
        n_val = max(0, min(len(val_test_ids_entry), n_val))
        val_ids_entry = rng.sample(val_test_ids_entry, n_val)

        taken_for_val = set(val_ids_entry)
        test_ids_entry = [i for i in val_test_ids_entry if i not in taken_for_val]

        train_ids.extend(train_ids_entry)
        val_ids.extend(val_ids_entry)
        test_ids.extend(test_ids_entry)

    train_df = df[df.entry_id.isin(train_ids)]
    val_df = df[df.entry_id.isin(val_ids)]
    test_df = df[df.entry_id.isin(test_ids)]

    return train_df, val_df, test_df

In [None]:
# Columns kept from the raw CSV; every other column is dropped right after loading.
important_columns = [
    'entry_id',
    'project_id',
    'lead_id',
    'analysis_framework_id',
    'excerpt',
    'sectors',
    'subpillars_1d',
    'subpillars_2d',
    'lang',
]
dataset = pd.read_csv("./test_dataset.csv")[important_columns]
nb_samples = len(dataset)

# The tag columns are stored in the CSV as Python-literal strings; parse them
# back into Python objects with ast.literal_eval.
classification_columns = ['sectors', 'subpillars_1d', 'subpillars_2d']
for col in classification_columns:
    dataset[col] = dataset[col].apply(literal_eval)

# One combined tag list per row: sectors, then 1D subpillars, then 2D subpillars.
dataset["target"] = dataset.apply(
    lambda row: row["sectors"] + row["subpillars_1d"] + row["subpillars_2d"],
    axis=1,
)

# The 50 most common tags over the whole dataset, most frequent first.
tag_counts = Counter(flatten(dataset['target']))
most_frequent_tags = [tag for tag, _count in tag_counts.most_common(50)]

# Drop every tag outside that top-50 list from each row's target.
dataset["target"] = dataset["target"].apply(
    lambda tags: [tag for tag in tags if tag in most_frequent_tags]
)

In [None]:
## SELECT LANGUAGE

# Keep only the rows whose `lang` column equals "en".
is_english = dataset["lang"] == "en"
d = dataset[is_english]

In [None]:
# Seed the global `random` module, which the split function samples from, so
# that re-running the notebook draws the same sample sequence.
SEED = 42
random.seed(SEED)

# Shares used by the split: 'train' and 'val' are read by the split function,
# the test set receives whatever ids remain in each stratum.
ratios = {'train': 0.7, 'val': 0.2, 'test': 0.1}
train_df, val_df, test_df = custom_stratified_train_test_split(d, ratios)

In [None]:
# Combined subpillar tags: each row's 1D value followed by its 2D value
# (joined with `+`).
dataset["subpillars"] = dataset.apply(
    lambda row: row["subpillars_1d"] + row["subpillars_2d"],
    axis=1,
)

In [None]:
def clean_sentence(x):
    """Return ``x`` with newlines turned into spaces and ASCII punctuation removed.

    Punctuation characters (``string.punctuation``) are deleted outright, not
    replaced by a space.
    """
    # The third maketrans argument lists the characters to delete; the
    # space -> space mapping leaves spaces as they are.
    punctuation_remover = str.maketrans(' ', ' ', string.punctuation)
    single_line = x.replace("\n", " ")
    return single_line.translate(punctuation_remover)

def prepare_fasttext_data(df, column, filename=None):
    """Write ``df`` to ``./fast_data/<filename>`` in fastText supervised format.

    One line is produced per row: the row's labels, each prefixed with
    ``__label__``, followed by the excerpt. Lines are joined with ``"\\n"``
    and no trailing newline is written.

    * Excerpts are stripped, lower-cased and passed through ``clean_sentence``.
    * Labels are stripped, lower-cased and have spaces replaced by ``*`` so
      that each label stays a single whitespace-delimited token.
    * Rows whose label list is empty get the single label ``NEGATIVE``.

    Args:
        df: frame with an ``excerpt`` column of strings and a ``column``
            column holding an iterable of label strings per row.
        column: name of the label column to export.
        filename: file name inside ``./fast_data``. Defaults to
            ``"<column>.txt"`` when omitted (previously the file would have
            been literally named ``None``).

    Returns:
        None. The file is written as a side effect.
    """
    out_dir = "./fast_data"
    # `os.makedir` does not exist; makedirs(exist_ok=True) creates the
    # directory when missing and is a no-op when it is already there.
    os.makedirs(out_dir, exist_ok=True)

    if filename is None:
        filename = f"{column}.txt"

    lines = []
    for excerpt, tags in zip(df.excerpt, df[column].tolist()):
        text = clean_sentence(excerpt.strip().lower())
        if tags:
            labels = [tag.strip().lower().replace(" ", "*") for tag in tags]
        else:
            labels = ["NEGATIVE"]
        label_prefix = " ".join(f"__label__{label}" for label in labels)
        lines.append(" ".join([label_prefix, text]))

    # fastText expects UTF-8 input, so do not depend on the locale's default
    # encoding. Plain "w" is enough: the file is never read back here.
    with open(f"{out_dir}/{filename}", "w", encoding="utf-8") as f:
        f.write("\n".join(lines))
        
def prepare_total_data(columns=["sectors"]):
    """Placeholder that is not implemented yet.

    ``columns`` is currently ignored and the function always returns ``None``.
    """
    # NOTE(review): the default is a mutable list; harmless while the body is
    # empty, but worth replacing once this function is implemented.
    return None

In [None]:
# Export each split of the "sectors" task to ./fast_data/sectors.<split>.
for split_name, split_df in (("train", train_df), ("val", val_df), ("test", test_df)):
    prepare_fasttext_data(split_df, "sectors", f"sectors.{split_name}")

In [None]:
# Train the sector classifier on the exported training file.
# - autotuneValidationFile: validation file handed to fastText's autotune option.
# - loss="ova": fastText's one-vs-all loss (see the fastText documentation).
# - thread=1: train with a single thread.
fasttext_options = {
    "input": "./fast_data/sectors.train",
    "autotuneValidationFile": "./fast_data/sectors.val",
    "thread": 1,
    "loss": "ova",
}
model = fasttext.train_supervised(**fasttext_options)

In [None]:
def get_pred(filename, model, thres = 0.5):
    """Predict labels for every line of a fastText-formatted text file.

    Each line is split on whitespace; tokens containing ``__label__`` are
    dropped and the remaining tokens, joined by single spaces, are passed to
    ``model.predict(text, k=-1, threshold=thres)``.

    Args:
        filename: path of a UTF-8 text file with one example per line.
        model: object exposing ``predict(text, k=..., threshold=...)`` whose
            result has the predicted labels at index 0.
        thres: threshold forwarded to ``model.predict``.

    Returns:
        One list of label strings per input line, in file order. The
        ``__label__`` prefix is removed, ``*`` is turned back into a space and
        any label containing ``NEGATIVE`` is dropped. A single trailing
        newline does not produce an extra row, and an empty file yields ``[]``.
    """
    # Context manager so the handle is closed even if reading fails.
    with open(filename, "r", encoding="utf-8") as f:
        content = f.read()

    # A file that ends with "\n" would otherwise add a spurious empty example,
    # putting the predictions out of step with the ground-truth rows.
    if content.endswith("\n"):
        content = content[:-1]
    lines = content.split("\n") if content else []

    predictions = []
    for line in lines:
        text = " ".join(tok for tok in line.split() if "__label__" not in tok).strip()
        predicted_labels = model.predict(text, k=-1, threshold=thres)[0]
        predictions.append(
            [
                label.replace("__label__", "").replace("*", " ")
                for label in predicted_labels
                if "NEGATIVE" not in label
            ]
        )
    return predictions

In [None]:
# Predicted label lists for every line of the validation file, using
# get_pred's default threshold.
val_file = "./fast_data/sectors.val"
pred = get_pred(val_file, model)

In [None]:
# Ground truth for the evaluation. The model was trained on the "sectors"
# files and `pred` comes from ./fast_data/sectors.val, so the reference labels
# must be read from the `sectors` column; `subpillars_1d` is a different label
# set. Labels are normalised with strip().lower(), the same steps applied when
# the fastText files were written, so they compare equal to the entries of
# `pred`.
val_labels = [[tag.strip().lower() for tag in tags] for tags in val_df.sectors]

multi = MultiLabelBinarizer()
multi.fit(val_labels)
# Binary indicator matrix: one row per validation example, one column per class.
target = multi.transform(val_labels)

In [None]:
# Binarise the predictions with the fitted binarizer, then print the
# scikit-learn classification report against `target`.
pred_matrix = multi.transform(pred)
report = classification_report(target, pred_matrix, target_names=multi.classes_)
print(report)