In [None]:
import numpy as np
from scipy.optimize import minimize

In [None]:
# data generlization
car1 = np.array([
    [0,0.05,0.1,0.05,0.01],
    [0.01,0.1,0.3,0.1,0.01],
    [0.01,0.05,0.1,0.05,0.01],
    [0,0.01,0.01,0.01,0],
    [0,0.01,0.01,0,0]
])

car2 = np.array([
    [0,0.01,0.02,0.01,0.01],
    [0,0.01,0.02,0.11,0.05],
    [0.01,0.02,0.1,0.2,0.1],
    [0.01,0.02,0.1,0.1,0.05],
    [0,0.01,0.01,0.02,0.01]
])


car3 = np.array([
    [0.01,0.01,0,0,0],
    [0.01,0.01,0,0,0],
    [0.05,0.05,0.01,0,0],
    [0.4,0.2,0,0,0],
    [0.2,0.05,0,0,0]
])

car4 = np.array([
    [0,0,0,0,0],
    [0.01,0.01,0.01,0.01,0],
    [0.01,0.01,0.01,0.01,0],
    [0.05,0.1,0.05,0.01,0],
    [0.1,0.5,0.1,0.01,0]
])

In [None]:
def coverage_new(A, p):
    sh = np.array(A.shape)
    dis  = np.linalg.norm(A-p)
    dis_normalized = dis / np.sqrt(sh.prod())
    return 1-dis_normalized

In [None]:
def ws(A):
    # 给定一个order-2信息粒，求对应的ws向量
    
    Ts = A.shape
    ws = np.zeros(Ts)
    
    sum_scripts = np.sum(np.array(Ts))
    
    indexes = np.array(np.where(A == np.max(A)))
#     sh = indexes.shape
#     scripts = np.sum(indexes, axis=0)
#     diffs = np.sum(abs(indexes - np.full_like((4), 2)), axis=0) # 
 
    if indexes.shape[1] > 1: # 多值的情况
        diffs = np.zeros((indexes.shape[1], indexes.shape[1]))
        for i in range(indexes.shape[1]-1):
            for j in range(i+1, indexes.shape[1]):
#                 print(abs(indexes[:,i] - indexes[:,j]))
                diffs[i][j] = np.sum(abs(indexes[:,i] - indexes[:,j]))
#         print(diffs)
        diff_sum = np.zeros((indexes.shape[1]))
        for i in range(indexes.shape[1]):
#             print(diffs[:,i])
#             print(diffs[i,:])
            diff_sum[i] = np.sum(diffs[:,i] + diffs[i,:])

        flag = np.argmin(diff_sum)
#         print(flag)

    #         inde = np.argmin(scripts - sum_scripts/2) # 
        for i in range(Ts[0]):
            for j in range(Ts[1]):
                    epsilon = abs(i-indexes[0][flag]) + abs(j-indexes[1][flag])   # 为1
                    ws[i][j] = 1 - epsilon * (1/8)
    
    else: # 单值情况
        for i in range(Ts[0]):
            for j in range(Ts[1]):
                    epsilon = abs(i-indexes[0][0]) + abs(j-indexes[1][0])  # 为1
                    ws[i][j] = 1 - epsilon * (1/8)
        
    return ws

In [None]:
def specificity(A):
    m = np.max(A)
    if np.sum(A == m) == 25:
        return 0
    else:
        w = ws(A)
        return (np.sum(A*w) - 0.5) / 0.5

In [None]:
bounds = [(0,1) for i in range(25)]


p = car1


class PSO():
    def __init__(self, pN, dim, max_iter, c1, c2, c3, k, epsilon0):

        self.k = k  ## 近邻个数

        self.c1 = c1  # 加速因子
        self.c2 = c2  # 加速因子
        self.c3 = c3  # 加速因子
        self.pN = pN  # 粒子数量
        self.dim = dim  # 搜索维度
        self.max_iter = max_iter  # 迭代次数

        self.X = np.zeros((self.pN, self.dim))  # 所有粒子的位置：一个pN*dim的矩阵
        self.Xfv = np.full((self.pN, 2), 1.0e78)

        self.gg = np.zeros((self.dim))
        self.gg_fit = np.full((2), 1.0e78)  # 全局最佳适应值
        self.O = np.zeros((max_iter))
        self.coverage = np.zeros((max_iter))
        self.specificity = np.zeros((max_iter))

        self.pbest = np.zeros((self.pN, self.dim))  # 个体经历的最佳位置：一个pN*dim的矩阵
        self.p_fit = np.full((self.pN, 2), 1.0e78)  # 每个个体的历史最佳适应值

        self.epsilon = epsilon0
        self.epsilon0 = epsilon0
    
    def f(self, A):
        
        obj = -(coverage_new(A.reshape((5,5)), p))*(specificity(A.reshape(5,5)))
        h = abs(np.sum(A) - 1)
        
        return obj, h
    
    def cov_sp(self, A):
        
        cov = coverage_new(A.reshape((5,5)), p)
        sp = specificity(A.reshape(5,5))  
        
        return cov, sp

    def fit(self, f1, phi1, f2, phi2, epsilon):
        """
        epsilon约束处理技术
        :param f: 目标函数
        :param X1: 第一个待对比的解
        :param X2: 第二个待对比的解
        :param epsilon: 参数
        :return: 按照epsilon约束处理规则，如果X1优于X2，返回Ture，否则False
        """
        flag = False

        if phi1 <= epsilon and phi2 <= epsilon and f1 < f2:
            flag = True
        if phi1 == phi2 and f1 < f2:
            flag = True
        if phi1 < phi2 and (phi1 > epsilon or phi2 > epsilon):
            flag = True

        return flag 

    def init_Population(self):
        """
        初始化种群
        """
        for i in range(self.pN):
            for j in range(self.dim):
                self.gg[j] = self.pbest[i][j] = self.X[i][j] = bounds[j][0] \
                                                                  + (bounds[j][1] - bounds[j][0]) * np.random.uniform(0,1)
        """
        如果初始epsilon不等于0，初始化epsilon
        """
        if self.epsilon0 != 0:
            phix = []
            for i in range(self.pN):
                res = np.array(self.f(self.X[i])[1::])
                phix.append(np.sum(res[res > 0]))
            phix = sorted(phix)
            self.epsilon0 = phix[int(0.2 * self.pN)]  ###top0.2

        """
        初始化（目标函数，约束违反量）
        """
        for i in range(self.pN):
            res = np.array(self.f(self.X[i]))
            self.Xfv[i][0] = self.p_fit[i][0] = res[0]
            v = res[1::]
            phi = np.sum(v[v > 0])
            self.Xfv[i][1] = self.p_fit[i][1] = phi
            if self.fit(res[0], phi, self.gg_fit[0], self.gg_fit[1], self.epsilon0):
                self.gg = self.X[i].copy()
                self.gg_fit[0] = res[0]
                self.gg_fit[1] = phi

    def iterator(self):
        
        """
        迭代
        """
        for t in range(self.max_iter):
            curreSet = {i for i in range(self.pN)}
            if t < 0.2*self.max_iter:
                self.epsilon = self.epsilon0 * (1 - t / (0.2*self.max_iter))**5
            else:
                self.epsilon = 0

            """
            划分第一个子种群并进化
            """
            indexes = np.full(self.k, 0)
            Uk = np.zeros((self.dim))
            for j in range(self.dim):
                Uk[j] = bounds[j][0] + (bounds[j][1] - bounds[j][0]) \
                        * np.random.uniform(0,1)
            mindistan = 1.0e+70
            minindex = 0

            for i in curreSet:
                distan = np.linalg.norm(self.X[i] - Uk)
                if distan < mindistan:
                    mindistan = distan
                    minindex = i
            curreSet = curreSet - {minindex}
            indexes[0] = minindex

            for j in range(1, self.k):
                mindistan = 1.0e+70
                minindex = 0
                for i in curreSet:
                    distan = np.linalg.norm(self.X[i] - self.pbest[indexes[0]])
                    if distan < mindistan:
                        mindistan = distan
                        minindex = i
                curreSet = curreSet - {minindex}
                indexes[j] = minindex

            """
            进化第一个种群
            """
            for j in range(self.k):
                insets = set(indexes) - {indexes[j]}

                a = np.random.choice(list(insets))
                insets = insets - {a}
                b = np.random.choice(list(insets))
                self.X[indexes[j]] = self.c1 * np.random.uniform(0,1) * self.pbest[indexes[j]] \
                                     + self.c2 * np.random.uniform(0,1) * self.gg \
                                     + self.c3 * np.random.uniform(0,1) * (self.pbest[a] - self.pbest[b])
                """
                控制在搜索范围内
                """
                for d in range(self.dim):
                    if self.X[indexes[j]][d] < bounds[d][0]:
                        self.X[indexes[j]][d] = bounds[d][0]
                    if self.X[indexes[j]][d] > bounds[d][1]:
                        self.X[indexes[j]][d] = bounds[d][1]

                res = np.array(self.f(self.X[indexes[j]]))
                v = res[1::]
                phi = np.sum(v[v > 0])

                if self.fit(res[0], phi, self.p_fit[indexes[j]][0], self.p_fit[indexes[j]][1], self.epsilon):
                    self.p_fit[indexes[j]][0] = res[0]
                    self.p_fit[indexes[j]][1] = phi
                    self.pbest[indexes[j]] = self.X[indexes[j]].copy()
                    if self.fit(res[0], phi, self.gg_fit[0], self.gg_fit[1], self.epsilon):
                        self.gg_fit[0] = res[0]
                        self.gg_fit[1] = phi
                        self.gg = self.X[indexes[j]].copy()


#                 print((self.gg_fit[0], self.gg_fit[1], t * 401 + (j+1)))

            """
            划分其他子种群并进化
            """
            for n in range(1, int(self.pN / self.k)):
                preindexes = indexes
                indexes = np.full(self.k, 0)
                Uk = np.zeros((self.dim))
                for j in range(self.dim):
                    Uk[j] = bounds[j][0] + (bounds[j][1] - bounds[j][0])\
                                   * np.random.uniform(0,1)
                mindistan = 1.0e+70
                minindex = 0

                for i in curreSet:
                    distan = np.linalg.norm(self.X[i] - Uk)
                    if distan < mindistan:
                        mindistan = distan
                        minindex = i
                curreSet = curreSet - {minindex}
                indexes[0] = minindex

                for j in range(1,self.k):
                    mindistan = 1.0e+70
                    minindex = 0
                    for i in curreSet:
                        distan = np.linalg.norm(self.X[i] - self.pbest[indexes[0]])
                        if distan < mindistan:
                            mindistan = distan
                            minindex = i
                    curreSet = curreSet - {minindex}
                    indexes[j] = minindex

                for j in range(self.k):
                    insets = set(indexes) - {indexes[j]}
                    c = np.random.choice(list(insets))

                    self.X[indexes[j]] = self.c1 * np.random.uniform(0,1) * self.pbest[indexes[j]] \
                                         + self.c2 * np.random.uniform(0,1) * self.gg \
                                         + self.c3 * np.random.uniform(0,1) * (self.pbest[preindexes[j]] - self.pbest[c])
                    """
                    控制在搜索范围内
                    """
                    for d in range(self.dim):
                        if self.X[indexes[j]][d] < bounds[d][0]:
                            self.X[indexes[j]][d] = bounds[d][0]
                        if self.X[indexes[j]][d] > bounds[d][1]:
                            self.X[indexes[j]][d] = bounds[d][1]

                    res = np.array(self.f(self.X[indexes[j]]))
                    v = res[1::]
                    phi = np.sum(v[v > 0])

                    if self.fit(res[0], phi, self.p_fit[indexes[j]][0], self.p_fit[indexes[j]][1], self.epsilon):
                        self.p_fit[indexes[j]][0] = res[0]
                        self.p_fit[indexes[j]][1] = phi
                        self.pbest[indexes[j]] = self.X[indexes[j]].copy()
                        if self.fit(res[0], phi, self.gg_fit[0], self.gg_fit[1], self.epsilon):
                            self.gg_fit[0] = res[0]
                            self.gg_fit[1] = phi
                            self.gg = self.X[indexes[j]].copy()


#                     print((self.gg_fit[0], self.gg_fit[1], t * 401 + 10*n + (j+1)))

            """
            记忆种群进化
            """
            for i in range(self.pN):
                indexes = {j for j in range(self.pN)} - {i}
                e = np.random.choice(list(indexes))
                pbest_new = self.pbest[i].copy()
                if self.fit(self.p_fit[i][0], self.p_fit[i][1], self.p_fit[e][0], self.p_fit[e][1], self.epsilon):
                    f = np.random.choice(list(indexes))
                    indexes = indexes - {f}
                    g = np.random.choice(list(indexes))
                    indexes = indexes - {g}
                    h = np.random.choice(list(indexes))
                    for j in range(self.dim):
                        if np.random.uniform(0, 1) > 0.5:
                            pbest_new[j] = self.pbest[f][j] \
                                               + (np.random.uniform(-1, 1)) * (self.pbest[g][j] - self.pbest[h][j])
                else:
                    l = np.random.choice(list(indexes))
                    indexes = indexes - {l}
                    m = np.random.choice(list(indexes))
                    pbest_new = self.pbest[i] + np.random.uniform(0,1) * (self.pbest[e] - self.pbest[i]) \
                                    + np.random.uniform(0,1) * (self.pbest[l] - self.pbest[m])

                """
                控制在搜索范围内
                """
                for d in range(self.dim):
                    if pbest_new[d] < bounds[d][0]:
                        pbest_new[d] = bounds[d][0]
                    if pbest_new[d] > bounds[d][1]:
                        pbest_new[d] = bounds[d][1]

                res = np.array(self.f(pbest_new))
                v = res[1::]
                phi = np.sum(v[v > 0])

                if self.fit(res[0], phi, self.p_fit[i][0], self.p_fit[i][1], self.epsilon):
                    self.p_fit[i][0] = res[0]
                    self.p_fit[i][1] = phi
                    self.pbest[i] = pbest_new.copy()
                    if self.fit(res[0], phi, self.gg_fit[0], self.gg_fit[1], self.epsilon):
                        self.gg_fit[0] = res[0]
                        self.gg_fit[1] = phi
                        self.gg = pbest_new.copy()

#                 print((self.gg_fit[0], self.gg_fit[1], t * 401 + 200 + (i+1)))

            """
            全局最优解的混合
            """
            gg_new = self.gg.copy()
            if np.random.uniform(0,1) < (1 / self.dim):
                dr = np.random.choice([i for i in range(self.dim)])
                gg_new[dr] = self.gg[dr] + (np.random.uniform(0,1)) * (bounds[dr][1] - bounds[dr][0])
                if gg_new[dr] < bounds[dr][0]:
                    gg_new[dr] = bounds[dr][0]
                if gg_new[dr] > bounds[dr][1]:
                    gg_new[dr] = bounds[dr][1]

            res = np.array(self.f(gg_new))
            v = res[1::]
            phi = np.sum(v[v > 0])

            if self.fit(res[0], phi, self.gg_fit[0], self.gg_fit[1], self.epsilon):
                self.gg_fit[0] = res[0]
                self.gg_fit[1] = phi
                self.gg = gg_new.copy()
            
#             self.O = self.gg_fit[0]
            self.coverage[t], self.specificity[t] = self.cov_sp(self.gg)
            self.O[t] = self.coverage[t] * self.specificity[t]
                       
#             print((self.gg_fit[0], self.gg_fit[1], t * 401 + 401))


        return 1


In [None]:
# P = CMPSOWV.PSO(100, G01.dim, 2488, 4.1/3, 4.1/3, 4.1/3, 10, 0)
# def __init__(self, pN, dim, max_iter, c1, c2, c3, k, epsilon0):
# my_pso = PSO(200, 25, 2000, 4.1/3, 4.1/3, 4.1/3, 10, 1)
# my_pso.init_Population()
# my_pso.iterator()

runs = 25
feas = 0
covs = []
sps = []
O = []

for i in range(runs):
    my_pso = PSO(200, 25, 1000, 4.1/3, 4.1/3, 4.1/3, 10, 0)
    my_pso.init_Population()
    my_pso.iterator()
    if my_pso.gg_fit[1] == 0:
        feas += 1
        covs.append(my_pso.coverage[-1])
        sps.append(my_pso.specificity[-1])
        O.append(my_pso.O[-1])
    print(i)

In [None]:
np.max(covs)

In [None]:
np.min(covs)

In [None]:
np.mean(covs)

In [None]:
np.std(covs)

In [None]:
np.max(sps)

In [None]:
np.min(sps)

In [None]:
np.mean(sps)

In [None]:
np.std(sps)

In [None]:
feas