# **TRI-TRAINING - ☘** 

##### **Autora: Patricia Hernando Fernández**

In [21]:
import numbers

import numpy as np
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.tree import DecisionTreeClassifier

In [22]:
# Load Iris and keep features / targets as plain NumPy arrays.
dataset = load_iris()
X, y = np.array(dataset.data), np.array(dataset.target)

# A single RandomState instance is shared by the fold shuffling and by the
# labeled/unlabeled split below.
rd = np.random.RandomState(5)
skf = StratifiedKFold(n_splits=2, shuffle=True, random_state=rd)

# NOTE: every iteration rebinds the same names, so once the loop finishes only
# the split produced for the last fold is still available.
for train_index, test_index in skf.split(X, y):
    X_train, y_train = X[train_index], y[train_index]
    X_test, y_test = X[test_index], y[test_index]

    # 20% of the training fold stays labeled (L); the remaining 80% plays the
    # role of the unlabeled pool (U), whose true labels are kept in Uy_train.
    L_train, U_train, Ly_train, Uy_train = train_test_split(
        X_train,
        y_train,
        test_size=0.8,
        stratify=y_train,
        random_state=rd,
    )

In [23]:
class Tri_Training:
    """Tri-training semi-supervised classifier.

    Three base classifiers are first trained on bootstrap samples of the
    labeled set ``L``. During :meth:`fit`, each classifier may be retrained on
    ``L`` plus the unlabeled samples from ``U`` on which the other two
    classifiers agree, provided the acceptance conditions on the estimated
    error are satisfied.

    Parameters
    ----------
    L : array-like of shape (n_labeled, n_features)
        Labeled samples.
    y : array-like of shape (n_labeled,)
        Labels of ``L``.
    U : array-like of shape (n_unlabeled, n_features)
        Unlabeled samples.
    h_0, h_1, h_2 : estimators
        Base classifiers exposing ``fit(X, y)`` (returning the fitted
        estimator) and ``predict(X)``.
    random_state : None, int or numpy.random.RandomState, default=None
        Seed or generator used for bootstrapping and subsampling.
    """

    def __init__(self, L, y, U, h_0, h_1, h_2, random_state=None):

        self.n = 3

        # Coerce to arrays so shape access and fancy indexing always work.
        self.L = np.asarray(L)
        self.y = np.asarray(y)
        self.U = np.asarray(U)
        self.n_L = self.L.shape[0]

        # mask_L[r, i] == 1 when labeled row r was drawn for classifier i's
        # initial bootstrap sample.
        self.mask_L = np.zeros(shape=(self.n_L, self.n), dtype=int, order='C')

        self.classes = np.unique(self.y)
        self.rd = self.check_random_state(random_state)

        self.classifiers = self.initialize_classifiers([h_0, h_1, h_2])

    def initialize_classifiers(self, cls):
        """
        Fit each base classifier on its own bootstrap sample of L.

        Parameters
        ----------
        cls : list
            The ``self.n`` base classifiers, in order.

        Returns
        -------
        dict
            Mapping from classifier index to the fitted classifier.
        """

        classifiers = {}

        # NOTE: the mask is not needed by the algorithm itself, because every
        # retraining in fit() uses the whole of L.
        for i in range(self.n):

            # NOTE: bootstrapping 100% of the size of L (with replacement)
            # might be preferable to the 80% used here.
            rand_rows = self.rd.choice(
                a=np.arange(0, self.n_L),
                replace=True,
                size=int(0.8 * self.n_L),
            )
            self.mask_L[rand_rows, i] = 1
            classifiers[i] = cls[i].fit(self.L[rand_rows, :], self.y[rand_rows])

        return classifiers

    def measure_error(self, i):
        """
        Estimate the joint error of the two peers of classifier ``i``.

            In detail, the classification error of the
            hypothesis is approximated through dividing the number
            of labeled examples on which both hj and hk make
            incorrect classification by the number of labeled examples
            on which the classification made by hj is the same as that
            made by hk.

        Parameters
        ----------
        i : int
            Index of the classifier whose peers are evaluated.

        Returns
        -------
        float
            Estimated error. When the peers never agree on L the ratio is
            undefined, so 1.0 is returned; that value can never be lower
            than a previous error and therefore blocks any update.
        """

        prediction_j = self.classifiers[(i + 1) % self.n].predict(self.L)
        prediction_k = self.classifiers[(i + 2) % self.n].predict(self.L)

        concordance = prediction_j == prediction_k
        n_concordant = int(np.count_nonzero(concordance))

        if n_concordant == 0:
            return 1.0

        both_wrong = np.logical_and(concordance, prediction_j != self.y)
        return int(np.count_nonzero(both_wrong)) / n_concordant

    def create_pseudolabeled_set(self, i):
        """
        Collect the unlabeled samples on which both peers of ``i`` agree.

        Parameters
        ----------
        i : int
            Index of the classifier the pseudo-labeled set is built for.

        Returns
        -------
        tuple of (np.ndarray, np.ndarray)
            The agreed-upon samples of U and the label both peers assign.
        """

        U_y_j = self.classifiers[(i + 1) % self.n].predict(self.U)
        U_y_k = self.classifiers[(i + 2) % self.n].predict(self.U)

        concordances = U_y_j == U_y_k

        return (self.U[concordances], U_y_j[concordances])

    def fit(self):
        """
        Run the tri-training loop until no classifier is updated.

        Returns
        -------
        Tri_Training
            The fitted instance, so calls can be chained.
        """

        previous_e = [0.5] * self.n
        previous_l = [0] * self.n
        e = [0.0] * self.n

        while True:

            cls_changes = [False] * self.n
            cls_pseudo_updates = [None] * self.n

            for i in range(self.n):

                e[i] = self.measure_error(i)

                # Everything below is only meaningful when the error has
                # strictly decreased; otherwise (previous_e - e) is zero or
                # negative and the bounds derived from it are invalid.
                if not e[i] < previous_e[i]:
                    continue

                X_i, y_i = self.create_pseudolabeled_set(i)

                if previous_l[i] == 0:
                    previous_l[i] = int(
                        np.floor(e[i] / (previous_e[i] - e[i]) + 1)
                    )

                L_i_size = X_i.shape[0]

                if previous_l[i] < L_i_size:

                    if e[i] * L_i_size < previous_e[i] * previous_l[i]:
                        cls_pseudo_updates[i] = (X_i, y_i)
                        cls_changes[i] = True

                    elif previous_l[i] > (e[i] / (previous_e[i] - e[i])):
                        # Too many pseudo-labels for the current error:
                        # keep a random subset, drawn without replacement
                        # so that no pseudo-labeled sample is duplicated.
                        subsample_size = int(
                            np.ceil(previous_e[i] * previous_l[i] / e[i] - 1)
                        )
                        subsample_size = min(subsample_size, L_i_size)
                        L_index = self.rd.choice(
                            L_i_size, size=subsample_size, replace=False
                        )
                        cls_pseudo_updates[i] = (X_i[L_index], y_i[L_index])
                        cls_changes[i] = True

            if not any(cls_changes):
                break

            # Retrain only after all three decisions were taken, so every
            # decision in this round was based on the same set of models.
            for i in range(self.n):

                if not cls_changes[i]:
                    continue

                X_i, y_i = cls_pseudo_updates[i]
                X_train = np.concatenate((self.L, X_i))
                y_train = np.concatenate((self.y, y_i))
                self.classifiers[i] = self.classifiers[i].fit(X_train, y_train)

                previous_e[i] = e[i]
                previous_l[i] = X_i.shape[0]  # size of the previous L_i

        return self

    def check_random_state(self, seed):
        """
        Turn seed into a np.random.RandomState instance.
        Source: SkLearn

        Parameters
        ----------
        seed : None, int or instance of RandomState
            If seed is None, return the RandomState singleton used by np.random.
            If seed is an int, return a new RandomState instance seeded with seed.
            If seed is already a RandomState instance, return it.
            Otherwise raise ValueError.

        Returns
        -------
        numpy.random.RandomState
            The random state object based on seed parameter.

        Raises
        ------
        ValueError
            If seed is none of the supported types.
        """

        if seed is None or seed is np.random:
            return np.random.mtrand._rand

        if isinstance(seed, numbers.Integral):
            return np.random.RandomState(seed)

        if isinstance(seed, np.random.RandomState):
            return seed

        raise ValueError("%r cannot be used to seed a numpy.random.RandomState instance" % seed)

    def single_predict(self, sample):
        """
        Returns the class predicted by tri-training.

        Parameters
        ----------
        sample: np_array
            sample to predict

        Returns
        -------
        label predicted by tri-training for that single sample.
        """

        return self.predict(np.asarray(sample))[0]

    def predict(self, samples):
        """
        Returns the labels predicted by tri-training for the given data,
        obtained by majority vote of the three classifiers.

        Parameters
        ----------
        samples: np_array
            samples to predict; a 1-D array is treated as a single sample.

        Returns
        -------
        np.array:
            labels predicted by tri-training. When all classifiers disagree,
            the first of the voted classes in ``self.classes`` order wins.
        """

        samples = np.asarray(samples)
        if samples.ndim == 1:
            samples = np.expand_dims(samples, axis=0)

        # votes: (n_classifiers, n_samples)
        votes = np.stack(
            [clf.predict(samples) for clf in self.classifiers.values()]
        )

        # counts: (n_samples, n_classes) -- number of votes per class.
        counts = (votes[:, :, np.newaxis] == self.classes).sum(axis=0)

        # argmax returns the first maximum, which fixes the tie-break order.
        return self.classes[counts.argmax(axis=1)]

In [19]:

# Three independent decision trees act as the base learners.
h_1, h_2, h_3 = (DecisionTreeClassifier() for _ in range(3))

# Build the tri-training ensemble from the labeled / unlabeled split and run
# the iterative refinement.
t_t = Tri_Training(
    L_train,
    Ly_train,
    U_train,
    h_1,
    h_2,
    h_3,
)
t_t.fit()