-
Notifications
You must be signed in to change notification settings - Fork 0
/
parameter_fitting_docx.py
212 lines (162 loc) · 6.63 KB
/
parameter_fitting_docx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
''' Authors: Katinka den Nijs & Robin van den Berg '''
import multiprocessing
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from deap import algorithms
from deap import base
from deap import creator
from deap import tools
from scipy.optimize import fsolve, root
import random
import math
class ForwardEuler(object):
"""
Class for solving a scalar of vector ODE,
du/dt = f(u, t)
by the ForwardEuler solver.
Class attributes:
t: array of time values
u: array of solution values (at time points t)
k: step number of the most recently computed solution
f: callable object implementing f(u, t)
"""
def __init__(self, f):
if not callable(f):
raise TypeError('f is %s, not a function' % type(f))
self.f = lambda u, t, k: np.asarray(f(u, t, k))
def set_initial_condition(self, U0):
if isinstance(U0, (float, int)): # scalar ODE
self.neq = 1
else: # system of ODEs
U0 = np.asarray(U0)
self.neq = U0.size
self.U0 = U0
def solve(self, time_points):
"""Compute u for t values in time_points list."""
self.t = np.asarray(time_points)
n = self.t.size
if self.neq == 1: # scalar ODEs
self.u = np.zeros(n)
else: # systems of ODEs
self.u = np.zeros((n, self.neq))
# Assume self.t[0] corresponds to self.U0
self.u[0] = self.U0
# Time loop
for k in range(n - 1):
self.k = k
self.u[k + 1] = self.advance()
return self.u, self.t
def advance(self):
"""Advance the solution one time step."""
u, f, k, t = self.u, self.f, self.k, self.t
dt = t[k + 1] - t[k]
u_new = u[k] + dt * f(u[k], t[k], k)
u_new = [(i > 0) * i for i in u_new]
return u_new
class IRF4(object):
""" A class containing the parameters, equations and necessary functions for the standard Martinez model """
def __init__(self, beta, p):
self.beta = beta
self.p = p
def equation(self, y):
F = y ** 3 - self.beta * y ** 2 + y - self.p
return F
def calc_zeropoints(self):
self.intersections = root(self.equation, [0., 0., 10])
return sorted(self.intersections.x)
def plot(self, name='test'):
r_list = np.arange(0, 5, 0.001)
drdt = self.equation(r_list)
intersections = root(self.equation, [0., 0., 10]).x
plt.figure()
plt.title("{}: With intersections: {}".format(name, intersections))
plt.plot(r_list, drdt)
plt.scatter(intersections, np.zeros(len(intersections)))
plt.xlim(0, 3)
plt.ylim(-2, 1)
plt.show()
def fitness(ind):
'''
Calculates the fitness of an individual as the mse on the time series
:param ind: an array containing the parameters we want to be fitting
:return: the fitness of an individual, the lower the better tho
'''
model_ind = IRF4(*ind)
intersections = model_ind.calc_zeropoints()
delta_intersec = abs(intersections[2] - intersections[0])
if (ind[0] ** 2 > 3) and (ind[0] ** 3 + (ind[0] ** 2 - 3) ** (3 / 2) + 9 * ind[0] > 27 / 2 * ind[0] - ind[1]) and (
ind[0] ** 3 - (ind[0] ** 2 - 3) ** (3 / 2) + 9 * ind[0] < 27 / 2 * ind[0] - ind[1]) and \
(delta_intersec > 0.1):
return abs(sum(intersections[0] - inter1_data)) + abs(sum(intersections[1] - inter2_data)),
else:
return 10000000,
def generateES(ind_cls, strg_cls, size):
ind = ind_cls(abs(random.gauss(0, 2)) for _ in range(size))
ind.strategy = strg_cls(abs(random.gauss(0, 3)) for _ in range(size))
return ind
def mutate(ind, mu, sigma, indpb, c=1):
size = len(ind)
t = c / math.sqrt(2. * math.sqrt(size))
t0 = c / math.sqrt(2. * size)
n = random.gauss(0, 1)
t0_n = t0 * n
for indx in range(size):
if random.random() < indpb:
ind.strategy[indx] *= math.exp(t0_n + t * random.gauss(0, 1))
ind[indx] += ind.strategy[indx] * random.gauss(0, 1)
ind[indx] = abs(ind[indx])
return ind,
# this needs to be before the if __name__ blabla to make the multiprocessing work
affymetrix_df = pd.read_csv('matrinez_data.csv')
# # replaces df by average of all rows per sample
# affymetrix_df = affymetrix_df.groupby('Sample').agg({'PRDM1':'mean','BCL6':'mean','IRF4':'mean'})
affymetrix_df = affymetrix_df.set_index('Sample')
inter1_data = np.append(affymetrix_df.loc['CB', 'IRF4'].values, affymetrix_df.loc['CC', 'IRF4'].values)
inter2_data = affymetrix_df.loc['PC', 'IRF4'].values
creator.create("Strategy", np.ndarray)
creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
creator.create("Individual", np.ndarray, fitness=creator.FitnessMin)
toolbox = base.Toolbox()
IND_SIZE = 2
toolbox.register("evaluate", fitness)
toolbox.register("individual", generateES, creator.Individual, creator.Strategy,
IND_SIZE)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("mate", tools.cxESBlend, alpha=0.1)
toolbox.register("mutate", mutate, mu=0, sigma=1, indpb=1)
toolbox.register("select", tools.selTournament, tournsize=7)
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
toolbox.register("map", pool.map)
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("min", np.min)
stats.register("max", np.max)
hof = tools.HallOfFame(1, similar=np.array_equal)
hof.clear()
pop = toolbox.population(n=10000)
pop, logbook = algorithms.eaMuPlusLambda(pop, toolbox, mu=10000, lambda_=5000, cxpb=0.1, mutpb=0.90,
ngen=40, stats=stats, halloffame=hof, verbose=True)
print('this is hof: ', hof)
# show what the best solution looks like
best_sol = IRF4(*hof[0])
print(hof[0])
print('Beta and p of our solution {}, {}'.format(best_sol.beta, best_sol.p))
print("Location of the roots: ", best_sol.calc_zeropoints())
print('Fitness of our best solution: ', fitness(hof[0])[0])
best_sol.plot('ours')
mu_r = 0.1
sigma_r = 2.6
CD40 = 0
lambda_r = 1
k_r = 1
beta = mu_r + CD40 + sigma_r / (lambda_r * k_r)
p = -sigma_r / (lambda_r * k_r) + beta
sol_of_martinez = IRF4(beta, p)
print('Beta and p of martinez {}, {}'.format(beta, p))
print("Location of the roots: ", sol_of_martinez.calc_zeropoints())
print("The fitness of the martinez solution: ", fitness([beta, p])[0])
sol_of_martinez.plot('martinez')