/
pdNLS.py
197 lines (150 loc) · 6.88 KB
/
pdNLS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import pandas as pd
import numpy as np
from six import string_types
from .fitting import get_minimizer, get_confidence_interval
from .aggregation import get_results, get_stats
from .auxiliary import convert_param_dict_to_df
class pdNLS(object):
    """Per-group nonlinear least-squares fitting over a pandas DataFrame.

    Each unique combination of `groupcols` values in `data` is fit
    independently to `model_eq` using an lmfit-style minimizer, after which
    predictions, residuals, fit statistics, and confidence intervals are
    collected into tidy DataFrames (`results`, `stats`, augmented `data`).

    Parameters
    ----------
    data : pandas.DataFrame
        Input data holding the group, x, y (and optional y-error) columns.
    model_eq : callable
        Model function with signature ``model_eq(params, xdata) -> ydata``,
        where ``params`` is an array of parameter values.
    groupcols : str or sequence of str
        Column name(s) whose unique value combinations define the
        independent fitting groups.
    params : list of dict or pandas.DataFrame
        Initial parameter specification. Each dict must at least carry a
        ``'name'`` key; a DataFrame is used as-is.
    xname, yname : str
        Column names of the independent and dependent variables.
    yerr : str, optional
        Column name of the measurement error on y, if any.
    method : str
        Minimization method forwarded to the minimizer (only 'leastsq' is
        expected at present — see TODO below).
    sigma : float or sequence of float
        Confidence level(s) for the confidence-interval calculation.
    threads : int, optional
        Worker-thread count for the confidence-interval calculation.
    """

    def __init__(self, data, model_eq=None, groupcols=None, params=None,
                 xname=None, yname=None, yerr=None,
                 method='leastsq', sigma=0.95, threads=None):
        # Ensure the selected columns aren't in the index
        data = data.reset_index()

        # Setup the groupby columns
        # TODO check that groupcols are in the data, otherwise quit
        if groupcols is not None:
            # Wrap a lone column name (including a plain string, which is
            # iterable but must not be split into characters) in a list so
            # the rest of the code can treat groupcols as a sequence.
            # NOTE: `or` replaces the original bitwise `|` — short-circuits,
            # same truth table on booleans.
            if (not hasattr(groupcols, '__iter__')) or isinstance(groupcols, string_types):
                groupcols = [groupcols]
        self._groupcols = groupcols
        self._ngroupcols = len(groupcols)
        self._paramnames = [x['name'] for x in params]

        # Dependent and independent variables
        # TODO check that xname/yname/yerr are in the data, otherwise quit
        self._xname = xname
        self._yname = yname
        self._yerr = yerr
        self._datacols = [xname, yname]
        if yerr is not None:
            self._datacols += [yerr]

        # Append the dataframe of data, indexed by the group columns
        self.data = (data
                     .reset_index()
                     [self._groupcols + self._datacols]
                     .set_index(self._groupcols)
                     )

        # Unique index information: one index entry per fitting group
        # (the .max() aggregation is only a device to obtain the grouped index)
        index = (data[self._groupcols + [xname]]
                 .groupby(self._groupcols)
                 .max()
                 .index
                 )
        self._index = index
        # NOTE(review): this counts rows of the *ungrouped* data, not the
        # number of groups — presumably intentional for the downstream
        # parameter broadcast; confirm against convert_param_dict_to_df.
        self._ngroups = data.index.shape[0]

        # Fitting and confidence interval methods
        self._method = method
        # TODO check that method is leastsq, otherwise quit
        if not hasattr(sigma, '__iter__'):
            sigma = [sigma]
        self._sigma = sigma
        self._threads = threads

        # Dataframe to hold the fitting objects, one row per group
        self._fitobj = pd.DataFrame(index=self._index)
        self._fitobj['model_eq'] = model_eq
        # TODO search for NaNs in model column and fill

        # Setup parameter dataframe
        self._params = params
        if isinstance(self._params, list) and isinstance(self._params[0], dict):
            # params is a list of per-parameter dictionaries
            params_df = convert_param_dict_to_df(self._params,
                                                 self._ngroups,
                                                 self._index)
        elif isinstance(self._params, pd.DataFrame):
            # params is already a dataframe
            params_df = self._params
        else:
            # BUG FIX: previously only printed a message and then hit a
            # NameError on the unbound params_df; fail loudly instead.
            raise ValueError('Parameters should be either a list of '
                             'dictionaries or a dataframe.')
        # Forward-fill so every group row carries a complete parameter set
        # (fillna(method=...) kept for compatibility with this pandas era)
        self._params_df = params_df.fillna(method='ffill')
        return

    def predict(self, xcalc=None, xnum=10):
        """Evaluate the fitted model for every group.

        Parameters
        ----------
        xcalc : {None, 'global', 'local'}, optional
            None evaluates at each group's measured x values; 'global'
            evaluates on one grid spanning the x range of all groups;
            'local' evaluates on a per-group grid.
        xnum : int
            Number of grid points when xcalc is 'global' or 'local'.

        Returns
        -------
        pandas.Series or pandas.DataFrame
            'ycalc' values (plus an 'xcalc' column when a grid was
            requested), indexed by group.
        """
        xname = self._xname
        if xcalc == 'global':
            # One shared grid covering the full x range of all groups.
            xmin = self.data[xname].min()
            xmax = self.data[xname].max()
            xdata = np.linspace(xmin, xmax, xnum)
        predict_list = list()
        for index in self._fitobj.index.values:
            if xcalc == 'local':
                # Per-group grid covering only this group's x range.
                xmin = self.data.loc[index, xname].min()
                xmax = self.data.loc[index, xname].max()
                xdata = np.linspace(xmin, xmax, xnum)
            elif not xcalc:
                # Measured x values of this group.
                # BUG FIX: this branch used to be a bare `else`, so it also
                # ran for xcalc == 'global' and clobbered the global grid.
                xdata = (self
                         .data
                         .loc[index, xname]
                         .squeeze()
                         .values
                         )
            model_eq = (self
                        ._fitobj
                        .loc[index, 'model_eq']
                        )
            # Fitted parameter values for this group, in column order
            # (sortlevel kept for compatibility with this pandas era)
            params = (self
                      .results
                      .sortlevel(axis=1)
                      .loc[index, (slice(None), ['value'])]
                      .squeeze()
                      .values
                      )
            ydata = model_eq(params, xdata)
            if xcalc:
                predict_data = pd.DataFrame({'xcalc': xdata, 'ycalc': ydata},
                                            index=pd.Index([index]*len(xdata)))
            else:
                predict_data = pd.Series(ydata, name='ycalc',
                                         index=pd.Index([index]*len(xdata)))
            predict_list.append(predict_data)
        return pd.concat(predict_list, axis=0)

    def fit(self):
        """Fit every group, then compute residuals, statistics, confidence
        intervals, and the tidy results table.

        Populates self.data['ycalc'/'residuals'], self.stats, self.results.
        """
        # Perform the minimization, one minimizer/fit object per group
        self._fitobj['minimizer'] = get_minimizer(self._index, self._fitobj,
                                                  self.data, self._params_df,
                                                  self._xname, self._yname,
                                                  self._yerr)
        self._fitobj['fitobj'] = (self._fitobj.minimizer
                                  .apply(lambda x: x.minimize(method=self._method))
                                  )
        # Predict the y values and calculate residuals
        # BUG FIX: was a bare `predict()` (NameError) — must be self.predict()
        self.data['ycalc'] = self.predict()
        self.data['residuals'] = self.data[self._yname] - self.data['ycalc']
        self.data = self.data.sortlevel(axis=0)

        # Create the stats dataframe
        self.stats = pd.DataFrame(index=self._index)
        # dof calculations for confidence intervals
        # (list(range(...)) so this also works where a lazy range is rejected)
        self.stats['nobs'] = (self
                              .data
                              .groupby(level=list(range(self._ngroupcols)))
                              .size()
                              )
        self.stats['npar'] = len(self._params_df.columns.levels[0])
        self.stats['dof'] = self.stats.nobs - self.stats.npar

        # Get the confidence intervals; the CI routine needs more than one
        # free parameter, so mask out single-parameter groups
        mask = self.stats.npar > 1
        if mask.sum() > 0:
            self._fitobj['ciobj'] = get_confidence_interval(self._fitobj,
                                                            mask,
                                                            self._sigma,
                                                            self._threads)
        else:
            self._fitobj['ciobj'] = np.NaN

        # The results
        self.results = (get_results(self._fitobj,
                                    self._paramnames,
                                    self._sigma)
                        .sortlevel(axis=0)
                        )
        # The remaining statistics
        self.stats = (get_stats(self._fitobj.fitobj,
                                self.stats)
                      .sortlevel(axis=0)
                      )
        return