/
selection.py
195 lines (170 loc) · 9.45 KB
/
selection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# -*- coding: utf-8 -*-
"""
Copyright 2016 William La Cava
license: GNU/GPLv3
"""
import numpy as np
import copy
import pdb
from sklearn.metrics import r2_score
from .population import stacks_2_eqns
from few_lib import ep_lex
# from profilehooks import profile
class SurvivalMixin(object):
    """Mixin implementing survival (selection-for-survival) strategies.

    The host class is expected to define the attributes referenced here:
    ``sel`` (strategy name), ``tourn_size``, ``lex_size``, ``elitism``, and
    ``c`` (whether to call the compiled ``ep_lex`` routine). Fitness is
    minimized throughout this class (lower is better), as evidenced by the
    ``argmin``/``min`` usage in tournament, lexicase, and elitism below.
    """
    def survival(self, parents, offspring, elite=None, elite_index=None,
                 X=None, F=None, F_offspring=None, X_offspring=None):
        """Route to the survival method named by ``self.sel``.

        Parameters
        ----------
        parents, offspring : list
            Individuals competing for survival.
        elite, elite_index :
            Best-so-far individual and its index (used when ``self.elitism``).
        X, X_offspring :
            Program output matrices for parents/offspring (needed only by
            deterministic crowding). ``X_offspring`` is appended last so the
            original positional order of arguments is preserved.
        F, F_offspring :
            De-aggregated fitness matrices for parents/offspring (needed only
            by epsilon lexicase).

        Returns
        -------
        (survivors, survivor_index) :
            Surviving individuals and their indices into
            ``parents + offspring``.
        """
        pool = parents + offspring
        if self.sel == 'tournament':
            survivors, survivor_index = self.tournament(
                pool, self.tourn_size, num_selections=len(parents))
        elif self.sel == 'lexicase':
            survivors, survivor_index = self.lexicase(
                pool, num_selections=len(parents), survival=True)
        elif self.sel == 'epsilon_lexicase':
            # with lex_size, program sizes are passed so the compiled routine
            # can prefer smaller programs when breaking ties
            sizes = [len(i.stack) for i in pool] if self.lex_size else []
            survivor_index = self.epsilon_lexicase(
                np.vstack((F, F_offspring)), sizes,
                num_selections=F.shape[0], survival=True)
            survivors = [pool[s] for s in survivor_index]
        elif self.sel == 'deterministic_crowding':
            # BUG FIX: the original referenced an undefined name X_offspring
            # (NameError); it is now an explicit keyword argument above.
            survivors, survivor_index = self.deterministic_crowding(
                parents, offspring, X, X_offspring)
        elif self.sel == 'random':
            survivor_index = np.random.permutation(
                np.arange(2 * len(parents)))[:len(parents)]
            survivors = [pool[s] for s in survivor_index]
        # elitism: if the elite individual did not survive, replace the worst
        # survivor (highest fitness, since fitness is minimized) with it
        if self.elitism:
            if min([x.fitness for x in survivors]) > elite.fitness:
                rep_index = np.argmax([x.fitness for x in survivors])
                survivors[rep_index] = elite
                survivor_index[rep_index] = elite_index
        return survivors, survivor_index

    def tournament(self, individuals, tourn_size, num_selections=None):
        """Tournament selection of size ``tourn_size``.

        Repeatedly samples ``tourn_size`` individuals with replacement and
        keeps the one with the lowest mean fitness.

        Returns
        -------
        (winners, locs) :
            Deep copies of the winners and their indices into ``individuals``.
        """
        winners = []
        locs = []
        if num_selections is None:
            num_selections = len(individuals)
        for _ in np.arange(num_selections):
            # sample the pool with replacement
            pool_i = np.random.choice(len(individuals), size=tourn_size)
            # BUG FIX: the original reused loop variable ``i`` here, shadowing
            # the outer selection counter
            pool = [np.mean(individuals[j].fitness) for j in pool_i]
            # lowest mean fitness wins the tournament
            locs.append(pool_i[np.argmin(pool)])
            winners.append(copy.deepcopy(individuals[locs[-1]]))
        return winners, locs

    def lexicase(self, individuals, num_selections=None, epsilon=False,
                 survival=False):
        """Lexicase selection on de-aggregated fitness vectors.

        Each selection shuffles the test cases and repeatedly filters the
        candidate pool down to those elite on the current case. With
        ``epsilon=True``, fitness vectors are first binarized into pass/fail
        using a per-case MAD threshold (NOTE: this mutates ``fitness_vec`` on
        the individuals). With ``survival=True``, each winner's duplicates
        (same ``stack``) are removed from the remaining pool.

        Returns
        -------
        (winners, locs) :
            Deep copies of the winners and their indices into ``individuals``.
        """
        if num_selections is None:
            num_selections = len(individuals)
        winners = []
        locs = []
        if epsilon:  # use epsilon lexicase selection
            # epsilon thresholds from the median absolute deviation (MAD)
            n_cases = len(individuals[0].fitness_vec)
            mad_for_case = np.empty([n_cases, 1])
            global_best_val_for_case = np.empty([n_cases, 1])
            for i in np.arange(n_cases):
                case_vals = np.asarray(
                    list(map(lambda x: x.fitness_vec[i], individuals)))
                mad_for_case[i] = self.mad(case_vals)
                global_best_val_for_case[i] = min(
                    map(lambda x: x.fitness_vec[i], individuals))
            # binarize: 1 = fail (outside epsilon of the case best), 0 = pass
            for I in individuals:
                fail_condition = np.array(
                    I.fitness_vec
                    > global_best_val_for_case[:, 0] + mad_for_case[:, 0])
                I.fitness_vec = fail_condition.astype(int)
        for _ in np.arange(num_selections):
            candidates = individuals
            can_locs = range(len(individuals))
            cases = list(np.arange(len(individuals[0].fitness_vec)))
            np.random.shuffle(cases)
            while len(cases) > 0 and len(candidates) > 1:
                # best fitness on this case among remaining candidates
                best_val_for_case = min(
                    [x.fitness_vec[cases[0]] for x in candidates])
                # keep only candidates that are elite on this case
                candidates, can_locs = zip(
                    *((x, l) for x, l in zip(candidates, can_locs)
                      if x.fitness_vec[cases[0]] == best_val_for_case))
                cases.pop(0)
            choice = np.random.randint(len(candidates))
            winners.append(copy.deepcopy(candidates[choice]))
            locs.append(can_locs[choice])
            if survival:
                # filter the winner (and stack-identical clones) out of the
                # remaining selection pool
                individuals = list(filter(
                    lambda x: x.stack != candidates[choice].stack,
                    individuals))
        return winners, locs

    def epsilon_lexicase(self, F, sizes, num_selections=None, survival=False):
        """Epsilon lexicase selection operating directly on a fitness matrix.

        Parameters
        ----------
        F : 2D array, shape (n_individuals, n_cases)
            De-aggregated fitness values (minimized).
        sizes : list
            Program sizes; only consumed by the compiled routine when
            ``self.lex_size`` is set.
        num_selections : int, optional
            Number of selections; defaults to ``F.shape[0]``.
        survival : bool
            When True, each winner is removed from the remaining pool.

        Returns
        -------
        locs : indices of the selected rows of ``F``.
        """
        # BUG FIX: apply the num_selections default before branching; the
        # original only defaulted it on the Python path, so the C path crashed
        # on np.empty(None, ...) when num_selections was omitted.
        if num_selections is None:
            num_selections = F.shape[0]
        if self.c:  # use the compiled library routine
            locs = np.empty(num_selections, dtype='int32', order='F')
            if self.lex_size:
                ep_lex(F, F.shape[0], F.shape[1], num_selections, locs,
                       self.lex_size, np.array(sizes))
            else:
                ep_lex(F, F.shape[0], F.shape[1], num_selections, locs,
                       self.lex_size, np.array([]))
            return locs
        else:  # pure python version
            locs = []
            individual_locs = np.arange(F.shape[0])
            # per-case epsilon thresholds: median absolute deviation (MAD)
            mad_for_case = np.array([self.mad(f) for f in F.transpose()])
            for _ in np.arange(num_selections):
                can_locs = individual_locs
                cases = list(np.arange(F.shape[1]))
                np.random.shuffle(cases)
                while len(cases) > 0 and len(can_locs) > 1:
                    # best fitness on this case among remaining candidates
                    best_val_for_case = np.min(F[can_locs, cases[0]])
                    # keep candidates within epsilon (MAD) of the case best
                    can_locs = [l for l in can_locs
                                if F[l, cases[0]]
                                <= best_val_for_case + mad_for_case[cases[0]]]
                    cases.pop(0)
                choice = np.random.randint(len(can_locs))
                locs.append(can_locs[choice])
                if survival:
                    # remove the winner from the remaining selection pool
                    individual_locs = [i for i in individual_locs
                                       if i != can_locs[choice]]
            # pad if the pool was exhausted before num_selections was reached
            while len(locs) < num_selections:
                locs.append(individual_locs[0])
            return locs

    def mad(self, x, axis=None):
        """Return the median absolute deviation of ``x`` along ``axis``."""
        return np.median(np.abs(x - np.median(x, axis)), axis)

    def deterministic_crowding(self, parents, offspring, X_parents,
                               X_offspring):
        """Deterministic crowding implementation (for non-steady state).

        Offspring compete against the parent they are most similar to, here
        defined as the parent whose program output they are most correlated
        with (via ``r2_score``). An offspring replaces its parent only if it
        is more fit (lower fitness).

        Returns
        -------
        (survivors, survivor_index) :
            Deep copies of the winners of each parent/offspring pairing and
            their indices into ``parents + offspring``.
        """
        # children produced by crossover carry two parent ids
        cross_children = [i for i, o in enumerate(offspring)
                          if len(o.parentid) > 1]
        # line up each crossover pair with its most similar parent
        for c1, c2 in zip(cross_children[::2], cross_children[1::2]):
            # locations of this pair's parents
            p_loc = [j for j, p in enumerate(parents)
                     if p.id in offspring[c1].parentid]
            if len(p_loc) != 2:
                pdb.set_trace()  # debugging trap: expected exactly 2 parents
            # swap the pair if each child correlates better with the other's
            # aligned parent
            if (r2_score(X_parents[p_loc[0]], X_offspring[c1])
                    + r2_score(X_parents[p_loc[1]], X_offspring[c2])
                    < r2_score(X_parents[p_loc[0]], X_offspring[c2])
                    + r2_score(X_parents[p_loc[1]], X_offspring[c1])):
                offspring[c1], offspring[c2] = offspring[c2], offspring[c1]
        survivors = []
        survivor_index = []
        for i, (p, o) in enumerate(zip(parents, offspring)):
            # BUG FIX: fitness is minimized (see tournament/elitism), so the
            # parent survives when its fitness is lower-or-equal; the original
            # ``>=`` comparison kept the WORSE individual, contradicting the
            # docstring. Ties keep the parent.
            if p.fitness <= o.fitness:
                survivors.append(copy.deepcopy(p))
                survivor_index.append(i)
            else:
                survivors.append(copy.deepcopy(o))
                survivor_index.append(i + len(parents))
        # return survivors along with their indices into parents + offspring
        return survivors, survivor_index