#Feature selection with Imperialist Competitive Algorithm
Cardiovascular Disease dataset

In [1]:
from typing import List

import numpy as np
import pandas as pd

import time
import random as rn

from sklearn.metrics         import f1_score
from sklearn.model_selection import train_test_split

from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC

In [2]:
# Code to read csv file into Colaboratory:

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Preprocessing
in this section we loading data from CSV file into a Pandas DataFrame and split the features into X and resault column to y

In [3]:
X = pd.read_csv(
    "/content/drive/MyDrive/cardio_train.csv", sep = ';', 
    usecols = ['age', 
               'gender', 
               'height', 
               'weight', 
               'ap_hi', 
               'ap_lo', 
               'cholesterol', 
               'gluc', 
               'smoke', 
               'alco', 
               'active'],
)

y = pd.read_csv(
    "/content/drive/MyDrive/cardio_train.csv", sep = ';', squeeze = True, 
    usecols = ['cardio']
)

In [None]:
X.head()


Unnamed: 0,age,gender,height,weight,ap_hi,ap_lo,cholesterol,gluc,smoke,alco,active
0,18393,2,168,62.0,110,80,1,1,0,0,1
1,20228,1,156,85.0,140,90,3,1,0,0,1
2,18857,1,165,64.0,130,70,3,1,0,0,0
3,17623,2,169,82.0,150,100,1,1,0,0,1
4,17474,1,156,56.0,100,60,1,1,0,0,0


#Country
This class implements the country concept in the ICA algorithm.
 
The Country object contains features and classification of a country. cost method used classifier to calculate cost. you should choose your classifier before running the cell.
and with getFeature and setFeature you can change and access the features.


In [4]:
class Country:
    def __init__(self, features):
        self.features = features
        self.classifier = KNeighborsClassifier(n_neighbors = 5) # KNN = KNeighborsClassifier(n_neighbors = 5)
                                                                # RF = RandomForestClassifier(n_estimators = 5)
                                                                # SVM = SVC(gamma = 'scale')
        self._cost = None

    def cost(self):
        if self._cost:
            return self._cost

        feature_indexes = np.array([X.columns[i] for i in np.where(self.features == 1)[0]])
        X_train = X_TRAIN[feature_indexes]
        X_test  = X_TEST[feature_indexes]


        self.classifier.fit(X_train, Y_TRAIN)

        y_pred = self.classifier.predict(X_test)

        self._cost = 1 - f1_score(Y_TEST, y_pred)
        return self._cost

    def getFeatures(self):
        return self.features

    def setFeatures(self, features):
        self._cost = None
        self.features = features

#Empire
This class implements the Empire concept in the ICA algorithm.
 
Each empire contains an imperialist and some colonies.
the total cost of an empire calculated with the "ecost" method.
 
This class has other methods to add and delete a colony or get access to them by getColony, getcolonies and getImperialist methods.


In [5]:
class Empire:
    def __init__(self, imperialist):
        self.imperialist: Country = imperialist
        self.colonies: List[Country] = []

    @property
    def ecost(self):
        colony_count = 1
        if len(self.colonies) > 0: colony_count = len(self.colonies)
        return (self.imperialist.cost() + (np.sum([colony.cost() for colony in self.colonies]) * EMPIRE_COST_RATE)) / colony_count

    def deleteColony(self, index):
        del self.colonies[index]

    def addColony(self, colony):
        if colony.cost() < self.imperialist.cost():
            self.colonies.append(self.imperialist)
            self.imperialist = colony
        else:
            self.colonies.append(colony)

    def getColoniesCount(self):
        return len(self.colonies)


    def getColony(self, index):
        return self.colonies[index]

    def getImperialist(self):
        return self.imperialist

In [6]:
def randomSelection(P):
    r = rn.random()

    C = np.cumsum(P)

    index = [i for i, x in enumerate(C) if r <= x]

    return index[0]

#ICA
Here is the main part of the ICA algorithm.
At first we construct initial countries with the "createCountries" method.
and with "createEmpires' ' we create Empires by choosing imperialists and dividing colonies between them.

In "intraCompetition" for each colony of an empire the "assimilation" and "revolution" are executed and the new cost of colony calculated, after that if the cost was better than its imperialist's cost their places will change.

There is other competition between empires that each time the weakest colony of an weakest empire deleted and given to another one with "interCompetition"method.

These competitions will happen till there's just one empire left.

In [7]:
class Ica:
    def __init__(self, country_count):
        self.countries: List[Country] = []
        self.empires: List[Empire] = []
        self.colonies: List[Country] = []
        self.createCountries(country_count)
        self.createEmpires()

    def createCountries(self, count):
        for _ in range(count):
            features = np.random.choice(2, len(X.columns))
            self.countries.append(Country(features))

    def createEmpires(self):
        costs = []
        for country in self.countries:
            costs.append(country.cost())
        costs = np.array(costs)

        sorted_indicates = np.argsort(costs)

        new_countries = []
        for i in sorted_indicates:
            new_countries.append(self.countries[i])
        new_countries = np.array(new_countries)
        self.countries = new_countries

        imperialists = self.countries[:IMPERIALIST_COUNT]
        self.colonies = self.countries[IMPERIALIST_COUNT:]

        for i in imperialists:
            self.empires.append(Empire(i))

        empires_costs = np.array([np.sum(empire.ecost) for empire in self.empires])

        P = np.absolute(np.divide(empires_costs, np.sum(empires_costs)))

        for colony in self.colonies:
            k = randomSelection(P)
            self.empires[k].addColony(colony)

    def assimilation(self, colony, colony_vector, imperialist_vector):

        S = np.random.choice([0, 1], size=len(colony_vector), p=[.4, .6])  

        k =  np.where(S == 1) #mappaing key.
        v = np.array(imperialist_vector)[k]

        for key,val in zip(k[0],v):
            colony_vector[key] = val

        colony.setFeatures(colony_vector)

    def revolution(self, colony):
        colony_vector = colony.getFeatures()

        for i in range(len(colony_vector)): 

            if rn.random() < MUTATION_RATE:
                # flip the bit
                colony_vector[i] = 1 - colony_vector[i]

        colony.setFeatures(colony_vector)

    def intraCompetition(self): # within 
        global intraCompetition_part1_time, intraCompetition_part2_time

        for empire in self.empires:
            for colony in empire.colonies:

                imperialist = empire.getImperialist()

                self.assimilation(colony, colony.getFeatures(), imperialist.getFeatures())

                if rn.random() < P_REVOLUTION:
                    self.revoulotion(colony)
                
                if colony.cost() < imperialist.cost():
                    empire.imperialist = colony
                    empire.colonies[empire.colonies.index(colony)] = imperialist        
        

    def interCompetition(self): # between empires
        if len(self.empires) == 1:
            return self.empires[0]

        empiresTotalCost = np.array([empire.ecost for empire in self.empires])

        weakest_empire_index = np.argmax(empiresTotalCost)
        weakest_empire = self.empires[weakest_empire_index]

        P = np.divide(empiresTotalCost, empiresTotalCost.sum())
        P = np.flip(P, 0)

        if weakest_empire.getColoniesCount() > 0:

            weakest_empire_colonies_cost = np.array([colony.cost() for colony in weakest_empire.colonies])

            weakest_colony_index = np.argmax(weakest_empire_colonies_cost)
            weakest_colony = weakest_empire.getColony(weakest_colony_index)

            winning_empire_index = randomSelection(P)
            winning_empire = self.empires[winning_empire_index]

            winning_empire.addColony(weakest_colony)

            weakest_empire.deleteColony(weakest_colony_index)

            print("an Empire lost a colony", end='\n')


        elif weakest_empire.getColoniesCount() == 0:

            winning_empire_index = randomSelection(P)
            winning_empire = self.empires[winning_empire_index]

            winning_empire.addColony(weakest_empire.getImperialist())

            del self.empires[self.empires.index(weakest_empire)]

            print("***************************")
            print(f"An empire is downfall... features: {weakest_empire.imperialist.features}, cost: {weakest_empire.imperialist.cost()}")
            print("***************************", end='\n\n')


        time.sleep(.100)
        self.intraCompetition()
        self.interCompetition()

#Parameters


In [8]:
COUNTRY_COUNT = 20 # number of countries => population size
IMPERIALIST_COUNT = 5 # number of imperialists

P_REVOLUTION =0.1 # revolution probability
MUTATION_RATE = 1.0 / len(X.columns)

EMPIRE_COST_RATE = 0.01 # colonies min cost coefficient -> zeta

X_TRAIN, X_TEST, Y_TRAIN, Y_TEST = train_test_split(X, y)


In [None]:
if __name__ == "__main__":
    start = time.time()
    ica = Ica(COUNTRY_COUNT)

    ica.intraCompetition()
    ica.interCompetition()

    empire = ica.empires[0]
    print(f"Empire (features: {empire.imperialist.features}, cost: {empire.imperialist.cost()}) is winner....****")
    print(f"total_time = {time.time() - start}")

cost with all features

In [None]:
features = np.array([1] * len(X.columns))
c = Country(features)
print(c.cost())