In [None]:
#### ------------- PSO optimization for AAindex selection ---------------- ####
import numpy as np
import pandas as pd
import random
import time
from datetime import datetime
import pickle


import use_CNN


In [None]:


### maximize this objective function


def CNN_valid_obj(selected, xtrain, ytrain, xtest, ytest, ratio = 0.0):
    """Objective to maximize: MCC of a CNN trained on the selected features.

    Parameters
    ----------
    selected : sequence
        Per-feature indicator; cast to a boolean mask and applied to the
        third axis of ``xtrain`` / ``xtest``.
    xtrain, ytrain : training inputs and labels passed to ``model.fit``.
    xtest, ytest : held-out inputs and labels used for the test MCC.
    ratio : float
        Weight of the training MCC; the test MCC is weighted by ``1 - ratio``.

    Returns
    -------
    ``train_MCC * ratio + (1 - ratio) * test_MCC``.
    """
    # Architecture description handed to use_CNN.make_cnn: one block of conv
    # settings, 15 zero placeholders, then the dense settings.
    # NOTE(review): the meaning of each entry is defined by use_CNN -- confirm there.
    conv_part = [32, 3, 1, 2, 0.3]
    placeholder_part = [0]*15
    dense_part = [128,0.5,64,0.5,0,0]
    architecture = conv_part + placeholder_part + dense_part

    mask = np.array(selected).astype(bool)
    n_selected = np.sum(mask)

    def _take_selected(x):
        # Keep only the masked features and append a trailing axis of size 1.
        return x[:,:,mask].reshape(x.shape[0], x.shape[1], n_selected, 1)

    train_x = _take_selected(xtrain)
    model = use_CNN.make_cnn(architecture, train_x.shape[1:], verbose=False)
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    model.fit(train_x, ytrain, epochs=20, batch_size=600)
    _, train_result = use_CNN.DL_mcc(model, train_x, ytrain)

    test_x = _take_selected(xtest)
    _, test_result = use_CNN.DL_mcc(model, test_x, ytest)

    return(train_result['MCC']*ratio + (1-ratio)*test_result['MCC'])
def dummy_obj(selected, xtrain, ytrain, xtest, ytest, ratio = 0.5):
    """Cheap stand-in objective: ten times the sum of the selected indices.

    Prints ``ratio``, then adds up every index ``k`` with ``selected[k] >= 1``
    and returns that sum multiplied by 10. The data arguments are accepted
    only to match the objective-function signature and are not used.
    """
    print(ratio)
    index_total = sum(k for k, flag in enumerate(selected) if flag >= 1)
    return 10 * index_total
# bound: length of the binary position vector
# dimension: number of ones in the position vector
def test_dup_position(ini_pos, current_i):
    for i in range(ini_pos.shape[0]):
        if i == current_i:
            continue
        if np.array_equal(ini_pos[i], ini_pos[current_i]):
            return(True)
    return(False)
def initial_position_int(bound, dimension, n_output):
    """Draw ``n_output`` distinct random binary vectors with ``dimension`` ones.

    Parameters
    ----------
    bound : int
        Length of every position vector.
    dimension : int
        Number of entries set to 1 in every vector.
    n_output : int
        Number of vectors (rows) to generate.

    Returns
    -------
    numpy.ndarray of shape ``(n_output, bound)`` filled with 0/1 floats;
    every row has exactly ``dimension`` ones and no two rows are equal.

    Raises
    ------
    ValueError
        If fewer than ``n_output`` distinct vectors exist (this would
        otherwise loop forever), or, from ``random.sample``, if ``dimension``
        is negative or larger than ``bound``.
    """
    # Number of distinct vectors is C(bound, dimension); computed incrementally
    # so every intermediate division is exact.
    n_unique = 1
    for k in range(dimension):
        n_unique = n_unique * (bound - k) // (k + 1)
    if n_output > n_unique:
        raise ValueError(
            "cannot draw %d distinct positions: only %d vectors of length %d "
            "with %d ones exist" % (n_output, n_unique, bound, dimension)
        )

    ini_pos = np.zeros((n_output, bound))
    seen = set()   # index sets already assigned to a row
    for i in range(n_output):
        chosen = frozenset(random.sample(range(bound), dimension))
        # Re-draw the whole index set on a collision. The row is only written
        # once a unique set is found, so it never accumulates extra ones.
        while chosen in seen:
            chosen = frozenset(random.sample(range(bound), dimension))
        seen.add(chosen)
        ini_pos[i][list(chosen)] = 1
    return(ini_pos)

#### ---------------- neighbor function ------------------- ####   
def defign_von_neumann_neighbor(swarm):
    """Lay the swarm out on a rectangular mesh for neighborhood lookups.

    The mesh has ``r`` rows and ``c`` columns with ``r * c == len(swarm)``,
    where ``r`` is the largest divisor of ``len(swarm)`` not exceeding its
    square root. Particles are placed in row-major order.

    Parameters
    ----------
    swarm : sequence
        Only its length is used.

    Returns
    -------
    mesh : numpy.ndarray of shape ``(r, c)``
        ``mesh[row][col]`` is the swarm index stored at that cell (as float).
    check_mesh : list of (row, col)
        ``check_mesh[i]`` is the cell of particle ``i``; both coordinates are
        non-negative.
    """
    n_particles = len(swarm)
    r = 0
    c = 0
    # Largest divisor <= sqrt(n) gives the most square-like mesh.
    for i in range(int(np.floor(np.sqrt(n_particles))), 0, -1):
        if n_particles % i == 0:
            r = i
            c = n_particles // r
            break

    mesh = np.zeros((r,c))
    check_mesh = []
    for i in range(n_particles):
        # divmod gives the row-major cell directly. The previous
        # (i+1)%c - 1 formula produced column -1 for the last column.
        row, col = divmod(i, c)
        mesh[row][col] = i              ### use location to find idx of swarm
        check_mesh.append((row, col))   ### use idx to find location
    return(mesh, check_mesh)

def find_local_best_von_neumann(swarm, idx, mesh, check_mesh, length):
    """Find the best personal best within a particle's von Neumann neighborhood.

    The neighborhood is every mesh cell whose Manhattan distance from the
    cell of particle ``idx`` is at most ``length``, with both axes wrapping
    around (torus). The particle's own cell is included.

    Parameters
    ----------
    swarm : sequence of particles exposing ``best_value`` and ``pos_best``.
    idx : int
        Index of the particle whose neighborhood is searched.
    mesh : 2-D array mapping ``[row][col]`` to a swarm index.
    check_mesh : sequence mapping a swarm index to its ``(row, col)`` cell.
    length : int
        Neighborhood radius in Manhattan distance.

    Returns
    -------
    (local_best_pos, local_best) : a copy of the best neighbor's ``pos_best``
    and its ``best_value``. Ties keep the first neighbor visited.
    ``local_best_pos`` is None only if no cell is visited (negative ``length``).
    """
    n_rows, n_cols = mesh.shape
    row, col = check_mesh[idx]
    local_best = -1          ## local best value
    local_best_pos = None    ## local best location
    ### go through the neighbors of idx
    for d_row in range(-length, length + 1):
        for d_col in range(-length, length + 1):
            # Measure distance on the unwrapped offsets so that diagonal
            # cells are excluded and wrapped-around neighbors are kept.
            if abs(d_row) + abs(d_col) > length:
                continue
            # Modulo wraps both edges of the mesh.
            target = int(mesh[(row + d_row) % n_rows][(col + d_col) % n_cols])
            candidate = swarm[target].best_value
            # Accept the first neighbor unconditionally so a position is
            # returned even when every best_value is still <= -1.
            if local_best_pos is None or local_best < candidate:
                local_best = candidate
                local_best_pos = swarm[target].pos_best.copy()
    return(local_best_pos, local_best)

### ------------------- Particle ------------------ ###
class Particle:
    """One PSO particle holding a binary position vector and a real velocity.

    Fitness values are maximized; ``-1`` is used as the "not evaluated yet"
    sentinel for ``value`` and ``best_value``.
    """

    def __init__(self,x0):
        """Start at position ``x0`` with a velocity drawn uniformly from [-1, 1] per component."""
        self.position=[]          # particle position, binary vector
        self.velocity=[]          # particle velocity
        self.pos_best=[]          # best position found by this particle
        self.best_value=-1        # best fitness found by this particle
        self.value=-1             # fitness of the current position

        for i in range(0, len(x0)):
            self.velocity.append(random.uniform(-1,1))
            self.position.append(x0[i])

    # evaluate current fitness
    def evaluate(self, costFunc, xtrain, ytrain, xtest, ytest, ratio):
        """Score the current position with ``costFunc`` and update the personal best.

        ``costFunc`` is called as
        ``costFunc(position, xtrain, ytrain, xtest, ytest, ratio)``.
        """
        self.value = costFunc(self.position, xtrain, ytrain, xtest, ytest, ratio)

        # check to see if the current position is an individual best
        if self.value > self.best_value or self.best_value==-1:
            # Store a copy: update_position mutates self.position in place,
            # so an alias would make pos_best track the current position.
            self.pos_best=list(self.position)
            self.best_value=self.value

    # update new particle velocity
    def update_velocity(self, best_global_pos):
        """Update the velocity from inertia, personal best and ``best_global_pos``."""
        w=0.5       # constant inertia weight (how much to weigh the previous velocity)
        c1=2        # cognitive constant
        c2=2        # social constant

        for i in range(0, len(self.position)):
            r1 = random.random()
            r2 = random.random()

            vel_cognitive = c1*r1*(self.pos_best[i]-self.position[i])
            vel_social = c2*r2*(best_global_pos[i]-self.position[i])
            self.velocity[i] = w*self.velocity[i]+vel_cognitive+vel_social

    # update the particle position based off new velocity updates
    def update_position(self, dimension):
        """Move, then re-binarize: the ``dimension`` largest entries of
        ``position + velocity`` become 1 and all others 0."""
        tmp_pos = np.array(self.position) + np.array(self.velocity)
        sort_idx = np.argsort(tmp_pos)[::-1]
        # Set gives O(1) membership tests instead of scanning the slice per index.
        top_idx = set(sort_idx[0:dimension].tolist())

        for i in range(len(self.position)):
            if i in top_idx:
                self.position[i]=1
            else:
                self.position[i]=0

#### ---------------- PSO main function ------------------- ####

def PSO_feature_selection(costFunc, dimension, bound, xtrain, ytrain, xtest, ytest, tv_ratio, num_particles = 81, 
                          maxiter = 200, verbose = False, use_neighbor = True):
    """Binary PSO that searches for the feature subset maximizing ``costFunc``.

    Parameters
    ----------
    costFunc : callable
        Objective, called as
        ``costFunc(position, xtrain, ytrain, xtest, ytest, tv_ratio)``.
    dimension : int
        Number of ones kept in every position vector.
    bound : int
        Length of every position vector.
    xtrain, ytrain, xtest, ytest : forwarded unchanged to ``costFunc``.
    tv_ratio : forwarded to ``costFunc`` as its ``ratio`` argument.
    num_particles : int
        Swarm size.
    maxiter : int
        Maximum number of iterations.
    verbose : bool
        Print the iteration index and the best value after each iteration.
    use_neighbor : bool
        If True, each particle is guided by the best of its von Neumann
        neighborhood (radius 1); otherwise by the single best position
        evaluated so far across the swarm.

    Returns
    -------
    (best_trend, best_pos) : ``best_trend`` holds the swarm's best personal
    best after each iteration; ``best_pos`` is a copy of the ``pos_best`` of
    the first particle whose ``best_value`` equals ``best_trend[-1]``
    (``[]`` if no iteration ran).
    """
    length = 1        # neighborhood dist to be checked
    best_value = -1   # best global value/MCC (-1 means "nothing evaluated yet")
    best_pos = []     # best global position
    best_trend = []   # the trend of best value over iteration

    ## ---- initialize position of particles ---- ##
    ini_pos = initial_position_int(bound, dimension, num_particles)
    swarm = []
    for i in range(num_particles):
        swarm.append(Particle(ini_pos[i]))

    ## ---- index to find neighbor ---- ##
    mesh, check_mesh = defign_von_neumann_neighbor(swarm)

    ## ---- optimize ---- ##
    for i in range(maxiter):
        # cycle through particles in swarm and evaluate fitness
        for j in range(0,num_particles):
            swarm[j].evaluate(costFunc, xtrain, ytrain, xtest, ytest, tv_ratio)

            # determine if current particle is the best (globally)
            if not use_neighbor:
                # Compare against best_value; the former name
                # best_global_value was never defined.
                if swarm[j].value > best_value or best_value == -1:
                    best_pos = swarm[j].position.copy()
                    best_value = float(swarm[j].value)

        # cycle through swarm and update velocities and position
        for j in range(0,num_particles):
            if use_neighbor:
                best_pos, best_value = find_local_best_von_neumann(swarm, j, mesh, check_mesh, length)
            swarm[j].update_velocity(best_pos)
            swarm[j].update_position(dimension)

        # best personal best in the swarm, never reported below -1
        tmp_v = -1
        for j in range(0,num_particles):
            if swarm[j].best_value > tmp_v:
                tmp_v = swarm[j].best_value
        best_trend.append(tmp_v)
        # From iteration 150 on, stop once the best value stops changing.
        if i>=150 and len(best_trend)>2:
            if np.absolute(best_trend[-1] - best_trend[-2]) < 0.000001:
                break
        if verbose:
            print("-- PSO iteration: ", i)
            print("---- Global best value: ", tmp_v)

    best_pos = []
    # With maxiter == 0 no iteration ran, so there is no best to look up.
    if best_trend:
        for i in range(len(swarm)):
            if swarm[i].best_value == best_trend[-1]:
                best_pos = swarm[i].pos_best.copy()
                break

    return(best_trend, best_pos)



    
def run_pso_optimization():
    """
    Run PSO feature selection with the CNN objective and save the results.

    Loads the train/test split from ``./data/antigenicity_data.pkl``, runs
    ``PSO_feature_selection`` with ``CNN_valid_obj`` selecting 10 features for
    a single iteration, and writes the best-value trend and the best position
    as tab-separated text files under ``../data/output/``.

    NOTE(review): the earlier docstring said this runs PSO on randomized y
    (to show the model does not overfit, as required by reviewers), but no
    label shuffling happens in this function -- confirm whether the pickle
    already holds randomized labels.
    """
    import os   # local import: only needed to create the output directory

    tv_ratio = 0.00
    ### 
    # the data contains all aaindex
    # 1._, -> Virus and antiserum names
    # 2._, -> Time range: [1968, 2017] -- Selected data size:  6166
    # 3._, -> antigenicity of 2._
    # x_train -> Time range: [1968, 2012] -- Selected data size:  6113
    # y_train -> antigenicity of x_train
    # x_test -> Time range: [2012, 2017] -- Selected data size:  53
    # y_test -> antigenicity of x_test
    # NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
    with open("./data/antigenicity_data.pkl",'rb') as f:
        _,_,_,x_train, y_train, x_test, y_test = pickle.load(f)
    print(x_train.shape, y_train.shape, x_test.shape, y_test.shape)

    best_trend, best_pos = PSO_feature_selection(CNN_valid_obj, 10, x_train.shape[2], 
        x_train, y_train, x_test, y_test, tv_ratio, maxiter = 1, verbose = True)

    # Timestamp keeps successive runs from overwriting each other.
    now = str(datetime.now())

    best_trend_file = "../data/output/PSO_aaindex23_3d_sort_best_trend_"+now+".txt"
    best_pos_file = "../data/output/PSO_aaindex23_3d_sort_best_pos_"+now+".txt"

    # Create the output directory up front so a finished run is not lost
    # to a missing folder.
    os.makedirs(os.path.dirname(best_trend_file), exist_ok=True)

    # Context managers close the files even if a write fails.
    with open(best_trend_file, "w") as fout:
        fout.write("\t".join(str(x) for x in best_trend))

    with open(best_pos_file, "w") as fout:
        fout.write("\t".join(str(x) for x in best_pos))





In [None]:
# Entry point of the notebook: loads the pickled data, runs the PSO feature
# selection and writes the result files (see run_pso_optimization).
run_pso_optimization()