In [None]:
# Import all models so they can be used within the voting technique.
# Voting works with multiple estimators that are all trained on the same data set,
# so there is no bootstrapping.
# It shares the ideas of parallelism and voting with bagging, plus the extra feature of soft voting: soft voting works with probabilities and aggregates them before a label is chosen, while hard voting aggregates the labels after each model has predicted, which is the same idea as bagging.
from Libraries.LinearRegression import *
from Libraries.LogisticRegression import *
from Libraries.KNN import *
from Libraries.SVM import *
from Libraries.NaiveBayes import *
from Libraries.DecisionTree import *

In [None]:
import numpy as np
# We need this copy for cloning base estimator
import copy
# For parallelism and working with multiple of cores 
from joblib import Parallel, delayed

class VotingBase():
    """Common machinery shared by the voting classifier and regressor.

    Every estimator is deep-copied and fitted on the same, full training
    data (no bootstrapping). At prediction time each fitted copy predicts
    on all samples and the stacked predictions are handed to
    ``_aggregate``, which subclasses must implement.
    """

    # Initialization
    def __init__(self, estimators = None, n_jobs = None):
        """Store the ensemble configuration.

        Parameters
        ----------
        estimators : iterable of estimator objects, optional
            Unfitted models exposing ``fit(X, y)`` and ``predict(X)``.
        n_jobs : int or None
            Forwarded unchanged to ``joblib.Parallel``.
        """
        # NOTE: despite its name this attribute holds the estimator objects
        # themselves, not a count. The name is kept for backward compatibility.
        self.n_estimators = estimators
        self.n_jobs = n_jobs
        # Filled by fit() with one fitted copy per estimator
        self.models = []

    # Clone function
    def _clone(self, estimator):
        """Return an independent deep copy of ``estimator``.

        Fitting works on the copy, so the object supplied by the caller is
        never mutated by this ensemble.
        """
        return copy.deepcopy(estimator)

    # Fit single model
    def _fit_single_model(self, current_estimator, X, y):
        """Clone ``current_estimator``, fit the clone on ``(X, y)`` and return it."""
        # The estimator differs on every call, so it is passed in explicitly
        cloned_model = self._clone(current_estimator)

        # No bootstrapped data here: every model sees X and y directly
        cloned_model.fit(X, y)

        return cloned_model

    # Fit all models function
    def fit(self, X, y):
        """Fit a copy of every estimator on the same data.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If no estimators were supplied.
        """
        # Materialise once so None and empty inputs give a clear error
        # instead of an opaque "NoneType is not iterable"
        estimators = [] if self.n_estimators is None else list(self.n_estimators)
        if not estimators:
            raise ValueError('No estimators have been passed.')

        # One job per estimator; the fitted copies are collected in order
        self.models = Parallel(n_jobs=self.n_jobs)(
            delayed(self._fit_single_model)(current_estimator, X, y)
            for current_estimator in estimators
        )

        return self

    # Predict single model
    def _predict_single_model(self, current_estimator, X):
        """Return the predictions of one fitted model for ``X``."""
        return current_estimator.predict(X)

    # Predict all models function
    def predict(self, X):
        """Predict with every fitted model and aggregate the results.

        Raises
        ------
        ValueError
            If ``fit`` has not produced any models yet.
        """
        if len(self.models) == 0:
            raise ValueError('This ensemble is not fitted yet; call fit before predict.')

        # Each fitted model predicts on all samples, so every sample ends up
        # with one prediction per model
        predictions = Parallel(n_jobs=self.n_jobs)(
            delayed(self._predict_single_model)(model, X) for model in self.models
        )

        # Stack so that the first axis indexes the models
        predictions = np.array(predictions)

        # Subclasses decide how the stacked predictions become one answer
        return self._aggregate(predictions)

    # Aggregation function
    def _aggregate(self, predictions):
        """Combine stacked per-model predictions; implemented by subclasses."""
        raise NotImplementedError
    
# Voting ensemble Classifier class
class VotingClassifier(VotingBase):
    """Voting ensemble for classification.

    ``voting='hard'`` takes the majority label over the models' predicted
    labels; ``voting='soft'`` averages the models' ``predict_proba`` outputs
    and picks the class with the highest mean probability.
    """

    # Initialization
    def __init__(self, estimators = None, n_jobs=1, voting = 'hard'):
        """Store the configuration and validate ``voting``.

        Raises
        ------
        ValueError
            If ``voting`` is neither ``'hard'`` nor ``'soft'``.
        """
        super().__init__(estimators, n_jobs)

        # Only the two voting modes are supported; hard voting is the default
        if voting not in ("hard", "soft"):
            raise ValueError("Voting must be 'hard' or 'soft'.")

        self.voting = voting

    # Fit function
    def fit(self, X, y):
        """Fit every estimator and remember the sorted unique labels of ``y``.

        The labels are stored in ``classes_`` so soft voting can turn a
        winning probability column back into a class label.
        """
        super().fit(X, y)
        self.classes_ = np.unique(y)
        return self

    # Aggregation function
    # This represents hard voting, which is the same idea as bagging
    def _aggregate(self, predictions):
        """Return the majority label per sample.

        ``predictions`` is indexed as (model, sample). Ties go to the
        smallest label, because ``np.unique`` returns sorted labels and
        ``np.argmax`` keeps the first maximum.
        """
        n_samples = predictions.shape[1]

        # Same dtype as the incoming labels so they are stored unchanged
        final_predictions = np.zeros(n_samples, dtype=predictions.dtype)

        for sample in range(n_samples):
            # One column holds every model's vote for this sample
            votes = predictions[:, sample]

            # classes and counts are aligned, so the index of the largest
            # count is also the index of the winning label
            classes, counts = np.unique(votes, return_counts=True)
            final_predictions[sample] = classes[np.argmax(counts)]

        return final_predictions

    # Soft aggregation function
    def _aggregate_soft(self, probas):
        """Average probabilities over the models and pick the best class.

        ``probas`` is indexed as (model, sample, class).
        NOTE(review): assumes every model orders its ``predict_proba``
        columns like the sorted unique training labels; confirm against
        the estimator implementations.
        """
        mean_proba = np.mean(probas, axis=0)
        winners = np.argmax(mean_proba, axis=1)

        # Map column indices to labels when fit() recorded a matching label
        # set; otherwise fall back to the raw column indices
        classes = getattr(self, "classes_", None)
        if classes is not None and len(classes) == mean_proba.shape[1]:
            return classes[winners]
        return winners

    # Predict function
    def predict(self, X):
        """Predict labels for ``X`` using the configured voting mode.

        Raises
        ------
        ValueError
            If soft voting is requested before any model has been fitted.
        """
        if self.voting == "soft":
            if len(self.models) == 0:
                raise ValueError('This ensemble is not fitted yet; call fit before predict.')

            probas = Parallel(n_jobs=self.n_jobs)(
                delayed(model.predict_proba)(X)
                for model in self.models
            )
            # Probabilities must be averaged, not majority-voted
            return self._aggregate_soft(np.array(probas))

        # Hard voting reuses the base class pipeline, which ends in _aggregate
        return super().predict(X)


# Voting ensemble Regressor class
class VotingRegressor(VotingBase):
    """Voting ensemble for regression: the final output is the per-sample
    mean of the individual models' predictions."""

    # Initialization
    def __init__(self, estimators = None, n_jobs=1):
        """Forward the configuration to the base class.

        Raises
        ------
        ValueError
            If ``estimators`` is None.
        """
        # Guard first: without estimators there is nothing to average
        if estimators is None:
            raise ValueError('No estimators have been passed.')

        super().__init__(estimators, n_jobs)

    # Aggregation function
    def _aggregate(self, predictions):
        """Average over the first axis, so each sample gets one mean value."""
        averaged = np.mean(predictions, axis=0)
        return averaged

    # Score
    def score(self, X, y):
        """Return the mean squared error between ``predict(X)`` and ``y``."""
        residuals = self.predict(X) - y
        squared_errors = residuals ** 2
        return np.mean(squared_errors)