## The CrossTree

A voting scheme built on multiple binary classification trees. The network combines the votes of three different trees:

* Cultural **Agnostic-Representative** tree
* Cultural **Agnostic-Exclusive** tree
* Cultural **Representative-Exclusive** tree

The most voted class is the predicted class.
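
The voting rule can be sketched in a few lines. This is a minimal illustration, not the notebook's implementation: the function name `majority_vote` and the tie-breaking rule (the smallest label wins) are assumptions, and the labels 0, 1, 2 stand for Agnostic, Representative and Exclusive, following the class pairs used in the later cells.

```python
from collections import Counter

def majority_vote(votes):
    """Return the most voted label; ties go to the smallest label (an assumption)."""
    counts = Counter(votes)
    top = max(counts.values())
    return min(label for label, c in counts.items() if c == top)

# Each pairwise tree votes for one of its two classes:
# ar votes 0 or 1, ae votes 0 or 2, re votes 1 or 2.
print(majority_vote([0, 0, 1]))  # two votes for class 0
print(majority_vote([1, 2, 1]))  # two votes for class 1
print(majority_vote([0, 2, 1]))  # three-way tie, resolved by the assumed rule
```

With three classes a three-way tie is possible (each tree picks a different class), so some tie-breaking rule is always needed.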

### Training Phase

The training process is standard and straightforward: given the n G_features, we want to directly predict the associated class.

### Employment Phase

The trained model will be embedded in a wider model called X and used as a function for computing the G_Factor.

## Import Necessary Libraries

In [None]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from CU_Dataset_Factory import Hf_Loader, CU_Dataset_Factory

from sklearn.feature_selection import SelectFdr, chi2, VarianceThreshold, RFE
from sklearn.ensemble import RandomForestClassifier

from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report

from scipy.stats import mode
from sklearn.base import ClassifierMixin, BaseEstimator, clone
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import LocalOutlierFactor

import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

## Dataset

### Load the dataset

In [None]:
def onehot_encode(
    df_train: pd.DataFrame,
    df_test: pd.DataFrame,
    cat_cols: list[str]|None = None,
    num_cols: list[str]|None = None,
    sparse: bool = False
) -> tuple[pd.DataFrame, pd.DataFrame, OneHotEncoder]:
    
    """
    Applies One-Hot Encoding to df_train and df_test, guaranteeing the same
    set of columns even if train is missing categories that appear in the test set.

    Parameters
    ----------
    df_train : pd.DataFrame
        Training DataFrame.
    df_test : pd.DataFrame
        Testing DataFrame.
    cat_cols : list[str], optional
        List of categorical columns to encode.
        If None, all columns of type 'object' are taken.
    num_cols : list[str], optional
        List of numerical (or non-categorical) columns to preserve.
        If None, all columns not in cat_cols are taken. 
    sparse : bool, default=False
        If True, the encoder produces a sparse matrix internally; the returned
        DataFrames are always dense.

    Returns
    -------
    df_train_enc : pd.DataFrame
        Training DataFrame containing only the one-hot encoded columns.
    df_test_enc : pd.DataFrame
        Testing DataFrame containing only the one-hot encoded columns.
    encoder : OneHotEncoder
        The fitted OneHotEncoder object, useful for future transform.
    """
    
    # 1) Identify category and numerical columns (if not given)
    if cat_cols is None:
        cat_cols = df_train.select_dtypes(include="object").columns.tolist()
    if num_cols is None:
        num_cols = [c for c in df_train.columns if c not in cat_cols]

    # 2) Fit encoder on all category data (train + test)
    all_cats = pd.concat([df_train[cat_cols], df_test[cat_cols]], 
                         axis=0, ignore_index=True)
    encoder = OneHotEncoder(
        sparse_output=sparse
    ).fit(all_cats)

    # 3) Transform train and test separately
    X_train_ohe = encoder.transform(df_train[cat_cols])
    X_test_ohe  = encoder.transform(df_test[cat_cols])

    # 4) Name the new columns
    ohe_cols = encoder.get_feature_names_out(cat_cols).tolist()

    # 5) Compose the final DataFrames
    df_train_enc = pd.DataFrame(
        np.hstack([X_train_ohe.toarray() if sparse else X_train_ohe,
                   df_train[num_cols].values]), # type: ignore
        columns=ohe_cols + num_cols,
        index=df_train.index
    )
    df_test_enc = pd.DataFrame(
        np.hstack([X_test_ohe.toarray() if sparse else X_test_ohe,
                   df_test[num_cols].values]),
        columns=ohe_cols + num_cols,
        index=df_test.index
    )

    return df_train_enc[ohe_cols], df_test_enc[ohe_cols], encoder
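
To make the column guarantee concrete, here is a small dependency-free sketch of the same idea: the category vocabulary is built from both splits, so both encoded tables share one column layout. The helper name `encode_with_shared_vocab` and the toy values are illustrative assumptions, not part of the notebook.

```python
def encode_with_shared_vocab(train_values, test_values, prefix):
    """One-hot encode two lists of category strings over their joint vocabulary."""
    vocab = sorted(set(train_values) | set(test_values))
    columns = [f"{prefix}_{v}" for v in vocab]

    def encode(values):
        # One row per value, one 0/1 entry per category in the shared vocabulary.
        return [[1.0 if v == cat else 0.0 for cat in vocab] for v in values]

    return columns, encode(train_values), encode(test_values)

columns, train_rows, test_rows = encode_with_shared_vocab(
    ["food", "music"], ["music", "film"], prefix="category"
)
print(columns)
print(train_rows)
print(test_rows)
```

Fitting on both splits lets validation categories shape the encoding. If that is unwanted, one alternative is to fit `OneHotEncoder(handle_unknown="ignore")` on the training split only, which encodes unseen categories as all zeros.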

### Produce the Dataset

In [None]:
train = pd.read_csv('train.csv', sep='\t')
validation = pd.read_csv('validation.csv', sep='\t')

In [None]:
train.head(5)

In [None]:
validation.head(5)

## Features Selection

The extracted features are grouped into several categories. We have identified three:

- **Static Features**: concern page structure, number of links, and other aspects of the page that generally change very slowly.
- **Semi-Dynamic Features**: concern information about the Wikipedia network, such as page references or page links; they generally change over long periods of time.
- **Dynamic Features**: concern information about users and the number of interactions with corpus pages; they generally change very quickly and are particularly interesting because they allow us to classify instances based on natural cultural change.

In the next cells we select the best features for our purposes.


In [None]:
# Labels
y_train = train[['label']]
y_validation = validation[['label']]
# Identifiers
id_train = train[['wiki_name']]
id_validation = validation[['wiki_name']]
# Numeric features
fe_train = train[['languages', 'num_langs', 'reference','n_mod','back_links', 'G_nodes', 'G_num_cliques', 'G_density', 'G_mean_pr','G_num_components', 'n_visits']]
fe_validation = validation[['languages', 'num_langs', 'reference','n_mod','back_links', 'G_nodes', 'G_num_cliques', 'G_density', 'G_mean_pr','G_num_components','n_visits']]
# String features
fe_str_train = train[['category', 'subcategory', 'type']]
fe_str_validation = validation[['category', 'subcategory', 'type']]

In [None]:
# Select features with an FDR-controlled chi-squared test on each split separately
best_features_tr = set(SelectFdr(chi2, alpha=0.05).fit(fe_train, y_train.values.ravel()).get_feature_names_out())
best_features_va = set(SelectFdr(chi2, alpha=0.05).fit(fe_validation, y_validation.values.ravel()).get_feature_names_out())

# Keep only the features selected on both splits (sorted for a stable column order)
best_fe = sorted(best_features_tr & best_features_va)
print(f'best features for train set      {best_features_tr}')
print(f'best features for validation set {best_features_va}')

print(f'Absolute best features {best_fe}')

fe_train = train[best_fe]
fe_validation = validation[best_fe]
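
`SelectFdr` keeps the features whose p-values pass the Benjamini-Hochberg procedure at level `alpha`. The sketch below reproduces that selection rule on made-up p-values so the cut-off is easier to follow; the function name `benjamini_hochberg_mask` and the numbers are illustrative assumptions.

```python
def benjamini_hochberg_mask(p_values, alpha=0.05):
    """Return a list of booleans: True where the feature passes the BH cut-off."""
    n = len(p_values)
    ranked = sorted(p_values)
    # Collect every sorted p-value p_(k) with p_(k) <= alpha * k / n (k is 1-based).
    passing = [p for k, p in enumerate(ranked, start=1) if p <= alpha * k / n]
    if not passing:
        return [False] * n
    # Everything at or below the largest passing p-value is selected.
    cutoff = max(passing)
    return [p <= cutoff for p in p_values]

p_values = [0.001, 0.20, 0.012, 0.045, 0.025]
print(benjamini_hochberg_mask(p_values, alpha=0.05))
```

Note that 0.045 is below `alpha` yet is rejected: it is ranked fourth of five, so its own threshold is 0.05 * 4 / 5 = 0.04.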

In [None]:
#######################################
# One-hot-encoding on string features #
#######################################

train_cat, validation_cat, _ =  onehot_encode(fe_str_train, fe_str_validation, ['category'] )
train_scat, validation_scat, _ = onehot_encode(fe_str_train, fe_str_validation, ['subcategory'] )
train_t, validation_t, _ = onehot_encode(fe_str_train, fe_str_validation, ['type'] )

In [None]:
print(validation_cat.shape)
print(validation_scat.shape)
print(validation_t.shape)

In [None]:
train_set = pd.concat([fe_train, train_cat, train_scat, train_t, y_train], axis=1)
validation_set = pd.concat([fe_validation, validation_cat, validation_scat, validation_t, y_validation], axis=1) 

In [None]:
train_set.head(3)

## Network

In [None]:
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import classification_report
from sklearn.ensemble import ExtraTreesClassifier

### Agnostic-Representative Classifier

In [None]:
# Keep only the two classes of interest, then split features and labels
d = train_set.query("label == 0 or label == 1")
y = d['label'].astype(int).to_numpy()
x = d.drop(['label'], axis=1).astype(float).to_numpy()

In [None]:
ar_tree = ExtraTreesClassifier().fit(x, y)

In [None]:
d = validation_set.query("label == 0 or label == 1")
y = d['label'].astype(int).to_numpy()
x = d.drop(['label'], axis=1).astype(float).to_numpy()

In [None]:
y_pred = ar_tree.predict(x)
print(classification_report(y, y_pred))

### Agnostic-Exclusive Classifier

In [None]:
# Keep only the two classes of interest, then split features and labels
d = train_set.query("label == 0 or label == 2")
y = d['label'].astype(int).to_numpy()
x = d.drop(['label'], axis=1).astype(float).to_numpy()

In [None]:
ae_tree = ExtraTreesClassifier().fit(x, y)

In [None]:
d = validation_set.query("label == 0 or label == 2")
y = d['label'].astype(int).to_numpy()
x = d.drop(['label'], axis=1).astype(float).to_numpy()

In [None]:
y_pred = ae_tree.predict(x)
print(classification_report(y, y_pred))

### Representative-Exclusive Classifier

In [None]:
# Keep only the two classes of interest, then split features and labels
d = train_set.query("label == 1 or label == 2")
y = d['label'].astype(int).to_numpy()
x = d.drop(['label'], axis=1).astype(float).to_numpy()

In [None]:
re_tree = ExtraTreesClassifier().fit(x, y)

In [None]:
d = validation_set.query("label == 1 or label == 2")
y = d['label'].astype(int).to_numpy()
x = d.drop(['label'], axis=1).astype(float).to_numpy()

In [None]:
y_pred = re_tree.predict(x)
print(classification_report(y, y_pred))
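
The three classifier sections repeat the same filtering step with different label pairs. A small helper can express it once; this is a sketch with a hypothetical function name (`pairwise_subset`) and toy arrays, assuming features and labels are NumPy arrays as in the cells above.

```python
import numpy as np

def pairwise_subset(X, y, classes):
    """Keep only the rows whose label belongs to `classes`, e.g. (0, 1)."""
    mask = np.isin(y, classes)
    return X[mask], y[mask]

X = np.array([[0.1, 1.0], [0.4, 0.0], [0.9, 1.0], [0.7, 0.0]])
y = np.array([0, 1, 2, 1])

X_ar, y_ar = pairwise_subset(X, y, (0, 1))  # Agnostic vs Representative
X_re, y_re = pairwise_subset(X, y, (1, 2))  # Representative vs Exclusive
print(y_ar, y_re)
```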

## Voting Scheme


In [None]:
import numpy as np
from scipy.stats import mode
from sklearn.base import ClassifierMixin, BaseEstimator
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import LocalOutlierFactor

class CABNet(BaseEstimator, ClassifierMixin):
    """
    Cultural-Classification Network:
      - ar: classifier for classes {0,1}
      - ae: classifier for classes {0,2}
      - re: classifier for classes {1,2}
      - detector: optional unsupervised estimator to filter outliers

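    Combines three binary classifiers with confidence-gated majority voting.
    A global three-class estimator is used as a fallback when none of the
    binary votes is confident enough.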
    """
    def __init__(self,
                 ar_estimator,
                 ae_estimator,
                 re_estimator,
                 th=0.45,
                 th_ar=0,
                 th_ae=-0.6,
                 th_re=0,
                 detector=None) -> None:
        # Store base estimators for hyperparameter tuning
        self.ar_estimator = ar_estimator
        self.ae_estimator = ae_estimator
        self.re_estimator = re_estimator
        self.global_estimator = ExtraTreesClassifier()
        self.th = th
        self.th_ar = th_ar
        self.th_ae = th_ae
        self.th_re = th_re
        self.detector = detector
        
        # Label maps
        self._label_map = {
            'ar': {0, 1},
            'ae': {0, 2},
            're': {1, 2}
        }

    def _fit_binary(self, estimator, X, y):
        return estimator.fit(X, y)
        

    def fit(self, X, y):
        """
        Fit CABNet on feature matrix X (array-like, shape (n_samples, n_features))
        and labels y (array-like, shape (n_samples,)).
        """
        X_arr = np.asarray(X, dtype=float)
        y_arr = np.asarray(y, dtype=int)

        # Optional outlier removal via detector
        if self.detector is not None:
            mask_inliers = self.detector.fit_predict(X_arr) == 1
            X_in = X_arr[mask_inliers]
            y_in = y_arr[mask_inliers]
        else:
            X_in = X_arr
            y_in = y_arr


        # Global three-class estimator, trained on all inlier samples
        self.global_estimator = self._fit_binary(self.global_estimator, X_in, y_in)

        # Train ar: classes 0 vs 1
        mask_ar = np.isin(y_in, list(self._label_map['ar']))
        X_ar, y_ar = X_in[mask_ar], y_in[mask_ar]
        self.ar_estimator = self._fit_binary(self.ar_estimator, X_ar, y_ar)

        # Train ae: classes 0 vs 2
        mask_ae = np.isin(y_in, list(self._label_map['ae']))
        X_ae, y_ae = X_in[mask_ae], y_in[mask_ae]
        self.ae_estimator = self._fit_binary(self.ae_estimator, X_ae, y_ae)

        # Train re: classes 1 vs 2
        mask_re = np.isin(y_in, list(self._label_map['re']))
        X_re, y_re = X_in[mask_re], y_in[mask_re]
        self.re_estimator = self._fit_binary(self.re_estimator, X_re, y_re)

        return self

    def predict(self, X):
        """
        Predict class labels for samples in X.
        """

        threshold = self.th
        X_arr = np.asarray(X, dtype=float)
        # Individual predictions
        v1 = self.ar_estimator.predict(X_arr)
        v2 = self.ae_estimator.predict(X_arr)
        v3 = self.re_estimator.predict(X_arr)
        v4 = self.global_estimator.predict(X_arr)

        p1 = self.ar_estimator.predict_proba(X_arr)
        p2 = self.ae_estimator.predict_proba(X_arr)
        p3 = self.re_estimator.predict_proba(X_arr)
        p4 = self.global_estimator.predict_proba(X_arr)

        
        n = X_arr.shape[0]
        final_votes = np.empty(n, dtype=v1.dtype)

        for i in range(n):
            # collect the "confident" votes
            votes_safe = []
            if p1[i].max() >= threshold + self.th_ar:
                votes_safe.append(v1[i])
            if p2[i].max() >= threshold + self.th_ae:
                votes_safe.append(v2[i]) 
            if p3[i].max() >= threshold + self.th_re:
                votes_safe.append(v3[i])
            
            if votes_safe:
                # majority among the confident votes
                maj, _ = mode(votes_safe, keepdims=False)
                final_votes[i] = maj
            else:
                probs = [p1[i].max(), p2[i].max(), p3[i].max(), p4[i].max()]
                votes = [v1[i],    v2[i],    v3[i],    v4[i]]
                best_idx = int(np.argmax(probs))
                final_votes[i] = votes[best_idx]
    
        return final_votes
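
For a single sample, the decision rule in `predict` can be traced by hand. The sketch below mirrors that logic in plain Python with made-up probabilities; `gated_vote` is a hypothetical helper, and its tie-breaking (the smallest label among the most frequent ones) is written to match the behavior of `scipy.stats.mode`.

```python
from collections import Counter

def gated_vote(votes, confidences, th=0.45, offsets=(0.0, -0.6, 0.0)):
    """votes/confidences hold four entries: ar, ae, re, global (in that order)."""
    # Only the three pairwise classifiers can cast a "confident" vote.
    safe = [v for v, c, off in zip(votes[:3], confidences[:3], offsets)
            if c >= th + off]
    if safe:
        counts = Counter(safe)
        top = max(counts.values())
        return min(label for label, n in counts.items() if n == top)
    # Fallback: follow the single most confident of the four classifiers.
    best = max(range(len(votes)), key=lambda i: confidences[i])
    return votes[best]

# All three pairwise votes pass their thresholds: majority of [0, 2, 2].
print(gated_vote([0, 2, 2, 1], [0.9, 0.7, 0.6, 0.5]))
# With a strict threshold nobody is confident: follow the most confident model.
print(gated_vote([0, 2, 1, 1], [0.55, 0.6, 0.58, 0.8], th=0.9, offsets=(0, 0, 0)))
```

Because the highest class probability of a two-class classifier is never below 0.5, the default `th=0.45` with non-positive offsets admits all three pairwise votes; the fallback branch only comes into play when a threshold exceeds 0.5.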


In [None]:
param_grid = {
    'ar_estimator__n_estimators': [50, 100, 170],
    'ar_estimator__max_depth': [None, 10],
    'ae_estimator__n_estimators': [50, 100, 150],
    'ae_estimator__max_depth': [None, 10],
    're_estimator__n_estimators': [50, 100, 150],
    're_estimator__max_depth': [None, 10],
    'detector': [LocalOutlierFactor()],  # grid values must be lists, even for a single option
    #'th' : [0.45,0,0.50,.65,.80],
    #'th_ar' : [0, +0.05, -0.05, +0.010, -0.010],
    #'th_ae' : [0, +0.05, -0.05, +0.010, -0.010],
    #'th_re' : [0, +0.05, -0.05, +0.010, -0.010]
}
grid = GridSearchCV(
    estimator=CABNet(ExtraTreesClassifier(), ExtraTreesClassifier(), ExtraTreesClassifier(), detector=LocalOutlierFactor()),
    param_grid=param_grid,
    scoring='accuracy',
    verbose=2,
    cv=2
   
)
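
An exhaustive search fits one model per parameter combination and per fold, so it helps to count the combinations before launching it. The snippet below does that for a grid shaped like the one in this cell (three `n_estimators` values and two `max_depth` values per pairwise estimator, `cv=2`); the detector entry is left out because it has a single value, and the variable names are illustrative.

```python
from itertools import product

grid_shape = {
    "ar_estimator__n_estimators": [50, 100, 170],
    "ar_estimator__max_depth": [None, 10],
    "ae_estimator__n_estimators": [50, 100, 150],
    "ae_estimator__max_depth": [None, 10],
    "re_estimator__n_estimators": [50, 100, 150],
    "re_estimator__max_depth": [None, 10],
}
cv_folds = 2

# Every combination of values is one candidate; each candidate is fit once per fold.
n_candidates = sum(1 for _ in product(*grid_shape.values()))
n_fits = n_candidates * cv_folds
print(n_candidates, n_fits)
```

Each of these fits trains four forests (three pairwise plus the global one), and `GridSearchCV` adds one final refit on the full training set by default.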

In [None]:
d = train_set
y = d['label'].astype(int)
x = d.drop(['label'], axis=1).astype(float)

##############################
# For exhaustive Grid Search #
##############################
grid.fit(x,y)
print(grid.best_params_, grid.best_score_)

In [None]:
model =  CABNet(ExtraTreesClassifier(150), ExtraTreesClassifier(100), ExtraTreesClassifier(90), th=0.45,th_ae=-0.05, th_ar=0.00, th_re=0.00, detector=LocalOutlierFactor())

In [None]:
d = train_set
y = d['label'].astype(int)
x = d.drop(['label'], axis=1).astype(float)

model = model.fit(x, y)

In [None]:
d = validation_set
y = d['label'].astype(int).to_numpy()
x = d.drop(['label'], axis=1).astype(float).to_numpy()

y_pred = model.predict(x)
print(classification_report(y, y_pred))

In [None]:
cm = confusion_matrix(y, y_pred)
sns.heatmap(cm, annot=True, fmt='d')
plt.xlabel('Predicted Label')
plt.ylabel('Real Label')
plt.title('Confusion Matrix')
plt.show()
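
The heatmap can be complemented with per-class recall and precision read directly off the matrix (rows are true labels and columns are predictions, as in `sklearn.metrics.confusion_matrix`). The counts below are made up for illustration and are not results from this notebook; `per_class_recall_precision` is a hypothetical helper.

```python
def per_class_recall_precision(cm):
    """cm[i][j] = number of samples with true label i predicted as j."""
    n = len(cm)
    # Recall: correct predictions divided by the row total (all true samples of the class).
    recall = [cm[i][i] / sum(cm[i]) for i in range(n)]
    # Precision: correct predictions divided by the column total (all predictions of the class).
    precision = [cm[i][i] / sum(cm[r][i] for r in range(n)) for i in range(n)]
    return recall, precision

toy_cm = [
    [8, 1, 1],
    [2, 6, 2],
    [0, 2, 8],
]
recall, precision = per_class_recall_precision(toy_cm)
print(recall)
print(precision)
```

The sketch assumes every class appears at least once among both the true and the predicted labels; otherwise a division by zero has to be handled.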