In [1]:
import numpy as np
import pandas as pd
from operator import attrgetter
from sklearn.linear_model import LogisticRegression

In [2]:
Data = pd.read_excel('./Midterm_Data/Data.xlsx') # training data
Test1 = pd.read_excel('./Midterm_Data/Test1.xlsx') # validation data
Test2 = pd.read_excel('./Midterm_Data/Test2.xlsx') # testing data

Data.fillna(Data.mean(), inplace=True)
Data # training data

Unnamed: 0,F01,F02,F03,F04,F05,F06,F07,F08,F09,F10,...,R01,R02,R03,R04,R05,C01,C02,C03,C04,C05
0,18393.0,9357.0,13397.0,16697.0,26431.0,25851.0,9405.0,18202.0,17282.0,13523.0,...,12638.0,15667.0,30694.0,12833.0,11818.0,0,0,0,1,0
1,18745.0,9691.0,14468.0,16775.0,26356.0,26274.0,9416.0,19742.0,16728.0,13210.0,...,12964.0,15611.0,31138.0,14006.0,13198.0,0,0,0,1,0
2,18331.0,9457.0,14252.0,17275.0,25436.0,25361.0,9345.0,19305.0,16663.0,13789.0,...,13140.0,15346.0,31209.0,13300.0,13156.0,0,0,0,1,0
3,18628.0,10163.0,15165.0,16941.0,25885.0,26406.0,10100.0,20111.0,17331.0,14476.0,...,13369.0,15816.0,32021.0,13851.0,14921.0,0,0,0,1,0
4,19934.0,10123.0,15345.0,16771.0,25747.0,26557.0,9946.0,20888.0,18319.0,15910.0,...,13607.0,15973.0,32121.0,14579.0,15040.0,0,0,0,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4235,19666.5,8073.0,14092.5,15596.0,21840.0,21547.0,7234.5,14053.5,15365.5,14248.0,...,10207.5,15066.0,30719.0,10697.0,10252.0,0,0,0,0,0
4236,21793.0,9445.0,16975.5,17308.0,24248.0,24396.5,8801.0,16224.0,18726.0,15883.5,...,12337.5,17731.0,33451.5,12386.0,11616.0,0,0,0,0,0
4237,21674.0,9371.0,17518.0,16757.0,25124.0,23335.5,8349.0,16361.0,17887.0,15855.5,...,12484.5,17338.0,32867.0,11824.0,11849.0,0,0,0,0,0
4238,20922.5,9080.0,17082.0,16467.0,23651.0,24128.0,7916.0,16153.0,17400.5,15250.0,...,12195.0,16850.5,33134.0,12155.0,11179.0,0,0,0,0,0


# training data preprocessing

- 1 way ratio: 0 ~ 94 (xi)
- 2 way ratio: 95 ~ 4559 (xi * xj)

In [3]:
# Data_bio_ratio
x = pd.DataFrame(index=range(Data.shape[0]),columns=range(95))
for i in range(19):
    for j in range(5):
        if i < 9:
            x[i * 5 + j] = Data[f'F0{i+1}'] / Data[f'R0{j+1}']
        else:
            x[i * 5 + j] = Data[f'F{i+1}'] / Data[f'R0{j+1}']
            
one_way = x.to_numpy()
two_way = np.zeros((4240, 95 * 94 // 2))
count = 0
for i in range(one_way.shape[1]):
    for j in range(i+1, one_way.shape[1]):
        two_way[:, count] = one_way[:, i] * one_way[:, j]
        count += 1
x_train = np.hstack((one_way, two_way))
print("x_train.shape: ", x_train.shape)

# Data disease
y_train = Data[['C01','C02','C03','C04','C05']].to_numpy().T
print("y_train.shape: ", y_train.shape)

x_train.shape:  (4240, 4560)
y_train.shape:  (5, 4240)


# validation data preprocessing

In [4]:
# Test1_bio_ratio
x = pd.DataFrame(index=range(Test1.shape[0]),columns=range(95))
for i in range(19):
    for j in range(5):
        if i < 9:
            x[i * 5 + j] = Test1[f'F0{i+1}'] / Test1[f'R0{j+1}']
        else:
            x[i * 5 + j] = Test1[f'F{i+1}'] / Test1[f'R0{j+1}']
            
one_way = x.to_numpy()
two_way = np.zeros((Test1.shape[0], 95 * 94 // 2))
count = 0
for i in range(one_way.shape[1]):
    for j in range(i+1, one_way.shape[1]):
        two_way[:, count] = one_way[:, i] * one_way[:, j]
        count += 1
x_val = np.hstack((one_way, two_way))
print("x_val.shape: ", x_val.shape)

# Data disease
y_val = Test1[['C01','C02','C03','C04','C05']].to_numpy().T
print("y_val.shape: ", y_val.shape)

x_val.shape:  (475, 4560)
y_val.shape:  (5, 475)


# GA

計算整個 population 裡每個 chromosome 的 fitness 到一個 list 裡

In [5]:
def calculate_population_fitness(population, d_num):
    fitness_list = []
    for j in range(len(population)):
        acc_sum = 0
        clf = LogisticRegression(max_iter=400, tol=1, solver='saga').fit(x_train[:, population[j]], y_train[d_num])
        acc_sum += clf.score(x_val[:, population[j]], y_val[d_num])
        fitness_list.append(acc_sum)
    return fitness_list

In [6]:
def run_GA(which_diagnosis):
    
    # initialize population
    population = []
    fitness = []

    GA_avg_acc_list = []
    GA_best_acc_list = []

    for i in range(population_size):
        population.append(np.random.choice(x_train.shape[0], chromosome_length, replace=False))
    
    # calculate fitness
    fitness = calculate_population_fitness(population, which_diagnosis)

    # start
    for gen in range(generation):

        # produce offspring
        new_population = []

        for i in range(2):

            arr = np.arange(population_size)
            np.random.shuffle(arr)

            # Tournament Selection Without Replacement
            for j in range(int(len(arr) / 4)):

                # parent 1
                if fitness[arr[j * 4]] > fitness[arr[j * 4 + 1]]:
                    parent1 = arr[j * 4]
                else:
                    parent1 = arr[j * 4 + 1]

                # parent 2
                if fitness[arr[j * 4 + 2]] > fitness[arr[j * 4 + 3]]:
                    parent2 = arr[j * 4 + 2]
                else:
                    parent2 = arr[j * 4 + 3]


                # uniform crossover
                which_parent = np.random.uniform(size = chromosome_length)

                offspring1 = np.zeros(chromosome_length)
                offspring2 = np.zeros(chromosome_length)

                for k in range(chromosome_length):
                    if which_parent[k] > 0.5:
                        offspring1[k] = population[parent1][k]
                        offspring2[k] = population[parent2][k]
                    else:
                        offspring1[k] = population[parent2][k]
                        offspring2[k] = population[parent1][k]

                # mutation
                prob = np.random.uniform()
                if prob < 0.05:
                    mutation_gene_index = np.random.randint(chromosome_length)
                    mutation_gene_value = np.random.choice(x_train.shape[0])
                    offspring1[mutation_gene_index] = mutation_gene_value
                elif prob > 0.95:
                    mutation_gene_index = np.random.randint(chromosome_length)
                    mutation_gene_value = np.random.choice(x_train.shape[0])
                    offspring2[mutation_gene_index] = mutation_gene_value

                # add offspring to new population
                new_population.append(offspring1.astype(int))
                new_population.append(offspring2.astype(int))

        population = new_population

        # calculate fitness
        fitness = calculate_population_fitness(population, which_diagnosis)

        if gen % 50 == 0:
            print(f'GA {gen}th')
        GA_best_acc_list.append(max(fitness))
        GA_avg_acc_list.append(sum(fitness) / len(fitness))

    return population[fitness.index(max(fitness))], GA_best_acc_list, GA_avg_acc_list

# SIB

In [7]:
def calculate_fitness(chromosome, d_num):
    clf = LogisticRegression(max_iter=400, tol=1, solver='saga').fit(x_train[:, chromosome], y_train[d_num])
    return clf.score(x_val[:, chromosome], y_val[d_num])

def mix(best, particle, num):
    tmp = np.copy(particle)
    idx = np.random.choice(chromosome_length, num, replace=False)

    # delete num units
    tmp = np.delete(tmp, idx)
    
    # add num units
    tmp = np.append(tmp, best[idx])
    
    return tmp

In [8]:
class particle:
    def __init__(self, diagnosis_num):
        self.chromosome = np.random.choice(x_train.shape[0], chromosome_length, replace=False)
        self.fitness = calculate_fitness(self.chromosome, diagnosis_num)
        self.LB_chromosome = self.chromosome
        self.LB_fitness = self.fitness

In [9]:
def run_SIB(diagnosis_num):
    qLB = 3
    qGB = 3

    SIB_avg_acc_list = []
    SIB_best_acc_list = []
    
    population = [particle(diagnosis_num) for i in range(population_size)]
    GB = max(population, key=attrgetter('LB_fitness')) # assign max fitness particle to GB

    
    for i in range(generation):

        
        for j in range(population_size):

            # MIX
            
            # produce mixwLB
            mixwLB = mix(population[j].LB_chromosome, population[j].chromosome, qLB)
            mixwLB_fitness = calculate_fitness(mixwLB, diagnosis_num)

            # produce mixwGB
            mixwGB = mix(GB.chromosome, population[j].chromosome, qGB)
            mixwGB_fitness = calculate_fitness(mixwGB, diagnosis_num) 


            # MOVE

            particle_change_flag = False
            # if either mixwGB or mixwLB has the best value evaluated from the objective function
                # replace X by the best candidate
            if mixwGB_fitness >= population[j].fitness:
                population[j].chromosome = mixwGB
                population[j].fitness = mixwGB_fitness
                particle_change_flag = True

            if mixwLB_fitness >= population[j].fitness:
                population[j].chromosome = mixwLB
                population[j].fitness = mixwLB_fitness
                particle_change_flag = True

                
            # if both mixwGB and mixwLB are worse than X, 
            
                # some units of X are randomly chosen and replaced by some other random units
                # mixRC
            if (mixwLB_fitness < population[j].fitness) and (mixwGB_fitness < population[j].fitness):
                pass # do nothing

            
            # 用 mixwGB 或 mixwGB 或 mixwGB 來更新 LB 和 GB
            # update LB
            if particle_change_flag and (population[j].fitness >= population[j].fitness):
                population[j].LB_chromosome = np.copy(population[j].LB_chromosome)
                population[j].LB_fitness = population[j].fitness

                
        # update GB
        GB = max(population, key=attrgetter('LB_fitness')) # assign max fitness particle to GB
        if i % 50 == 0:
            print(f'SIB {i}th')
        
        tmp = sum(i.fitness for i in population) / population_size
        SIB_avg_acc_list.append(tmp)
        SIB_best_acc_list.append(GB.fitness)

#     print('\nSIB_best_acc_list: ', SIB_best_acc_list)
#     print('SIB_avg_acc_list: ', SIB_avg_acc_list)

    return GB.chromosome, SIB_best_acc_list, SIB_avg_acc_list
    

# Simulated Annealing

In [10]:
def muatation(current):
    
    mutation_gene_count = 3
    tmp = np.copy(current)
    
    idx = np.random.choice(chromosome_length, mutation_gene_count, replace=False)
    val = np.random.choice(x_train.shape[1], mutation_gene_count, replace=False)
    
    for i in range(mutation_gene_count):
        tmp[idx[i]] = val[i]
        
    return tmp

def get_temperature(gen):
    
    parameter = 0.0008
    return ((1 - parameter) ** gen)

def mutation_prob(gen, dE):
    return np.exp(dE / get_temperature(gen))


In [14]:
def run_SA(which_diagnosis):
    
    SA_current_acc_list = []
    SA_best_acc_list = []
    
    # initialize first chromosome
    current_chromosome = np.random.choice(x_train.shape[0], chromosome_length, replace=False)
    current_fitness = calculate_fitness(current_chromosome, which_diagnosis)

    best_chromosome = np.copy(current_chromosome)
    best_fitness = current_fitness

    
    gen = population_size * generation
    
    for i in range(gen):

        new_chromosome = muatation(current_chromosome)
        new_fitness = calculate_fitness(new_chromosome, which_diagnosis)

        dE = new_fitness - current_fitness
        
        if dE >= 0:
            current_chromosome = np.copy(new_chromosome)
            current_fitness = new_fitness

            if current_fitness >= best_fitness:
                best_chromosome = np.copy(new_chromosome)
                best_fitness = new_fitness
        
        # dE < 0
        elif dE >= -0.05:

            if np.random.uniform() < mutation_prob(i, dE):
                current_chromosome = np.copy(new_chromosome)
                current_fitness = new_fitness
    
        if i % (population_size * 50) == 0:
            print(f'SA {i}th')
            
        if i % (population_size) == 0:
            SA_current_acc_list.append(current_fitness)
            SA_best_acc_list.append(best_fitness)

    return best_chromosome, SA_best_acc_list, SA_current_acc_list


# Run experiment

In [16]:
population_size = 100
chromosome_length = 30
generation = 1001

GA_best_chromosome_list = []
SIB_best_chromosome_list = []
SA_best_chromosome_list = []

GA_best_acc_list = []
SIB_best_acc_list = []
SA_best_acc_list = []

GA_avg_acc_list = []
SIB_avg_acc_list = []
SA_current_acc_list = []

for diagnosis_num in range(y_train.shape[0]): # y_train.shape[0] == number of diagnosis
    
    print(f'diagnosis_num: {diagnosis_num}')
    
    # GA
    GA_best_chromosome, GA_best_acc, GA_avg_acc = run_GA(diagnosis_num)
#     GA_acc_sum += GA_best_acc[-1]
    GA_best_chromosome_list.append(list(GA_best_chromosome))
    GA_best_acc_list.append(GA_best_acc)
    GA_avg_acc_list.append(GA_avg_acc)
    print('GA')

    # SIB
    SIB_best_chromosome, SIB_best_acc, SIB_avg_acc = run_SIB(diagnosis_num)
#     SIB_acc_sum += SIB_best_acc[-1]
    SIB_best_chromosome_list.append(list(SIB_best_chromosome))
    SIB_best_acc_list.append(SIB_best_acc)
    SIB_avg_acc_list.append(SIB_avg_acc)
    print('SIB')
    
    # SA
    SA_best_chromosome, SA_best_acc, SA_current_acc = run_SA(diagnosis_num)
#     SA_acc_sum += SA_best_acc[-1]
    SA_best_chromosome_list.append(list(SA_best_chromosome))
    SA_best_acc_list.append(SA_best_acc)
    SA_current_acc_list.append(SA_current_acc)
    print('SA')

diagnosis_num: 0
GA 0th


KeyboardInterrupt: 

# write file

In [None]:
# best_chromosome
with open('./experiment_data/GA_best_chromosome_list.txt', 'w') as f:
    for i in GA_best_chromosome_list:
        f.write("%s\n" % i)
        
with open('./experiment_data/SIB_best_chromosome_list.txt', 'w') as f:
    for i in SIB_best_chromosome_list:
        f.write("%s\n" % i)
        
with open('./experiment_data/SA_best_chromosome_list.txt', 'w') as f:
    for i in SA_best_chromosome_list:
        f.write("%s\n" % i)

# best_acc
with open('./experiment_data/GA_best_acc_list.txt', 'w') as f:
    for i in GA_best_acc_list:
        f.write("%s\n" % i)
        
with open('./experiment_data/SIB_best_acc_list.txt', 'w') as f:
    for i in SIB_best_acc_list:
        f.write("%s\n" % i)
        
with open('./experiment_data/SA_best_acc_list.txt', 'w') as f:
    for i in SA_best_acc_list:
        f.write("%s\n" % i)

# avg_acc
with open('./experiment_data/GA_avg_acc_list.txt', 'w') as f:
    for i in GA_avg_acc_list:
        f.write("%s\n" % i)
        
with open('./experiment_data/SIB_avg_acc_list.txt', 'w') as f:
    for i in SIB_avg_acc_list:
        f.write("%s\n" % i)
        
with open('./experiment_data/SA_current_acc_list.txt', 'w') as f:
    for i in SA_current_acc_list:
        f.write("%s\n" % i)