-
Notifications
You must be signed in to change notification settings - Fork 40
/
routines.py
569 lines (469 loc) · 18.2 KB
/
routines.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
from __future__ import division
from builtins import map
from builtins import next
from builtins import range
from past.utils import old_div
import mdp
# import numeric module (scipy, Numeric or numarray)
numx, numx_rand, numx_linalg = mdp.numx, mdp.numx_rand, mdp.numx_linalg
numx_description = mdp.numx_description
import random
import itertools
import sys
# python <3.5/>=3.5 compatibility
if sys.version_info >= (3, 5):
    from inspect import formatannotation
    def inspect_formatargspec(
            args, varargs=None, varkw=None, defaults=None,
            kwonlyargs=(), kwonlydefaults={}, annotations={},
            formatarg=str,
            formatvarargs=lambda name: '*' + name,
            formatvarkw=lambda name: '**' + name,
            formatvalue=lambda value: '=' + repr(value),
            formatreturns=lambda text: ' -> ' + text,
            formatannotation=formatannotation):
        """Copy formatargspec from python 3.7 standard library.

        Python 3 has deprecated formatargspec and requested that Signature
        be used instead, however this requires a full reimplementation
        of formatargspec() in terms of creating Parameter objects and such.
        Instead of introducing all the object-creation overhead and having
        to reinvent from scratch, just copy their compatibility routine.
        Utimately we would need to rewrite our "decorator" routine completely
        which is not really worth it right now, until all Python 2.x support
        is dropped.

        NOTE: the mutable default arguments (kwonlydefaults={},
        annotations={}) mirror the stdlib original; they are only read,
        never mutated, so the shared-default pitfall does not apply here.
        """
        def formatargandannotation(arg):
            # format one argument name, appending ': <annotation>' if any
            result = formatarg(arg)
            if arg in annotations:
                result += ': ' + formatannotation(annotations[arg])
            return result
        specs = []
        if defaults:
            # index of the first positional argument that has a default
            firstdefault = len(args) - len(defaults)
        for i, arg in enumerate(args):
            spec = formatargandannotation(arg)
            if defaults and i >= firstdefault:
                spec = spec + formatvalue(defaults[i - firstdefault])
            specs.append(spec)
        if varargs is not None:
            specs.append(formatvarargs(formatargandannotation(varargs)))
        else:
            # a lone '*' separates keyword-only args when there is no *args
            if kwonlyargs:
                specs.append('*')
        if kwonlyargs:
            for kwonlyarg in kwonlyargs:
                spec = formatargandannotation(kwonlyarg)
                if kwonlydefaults and kwonlyarg in kwonlydefaults:
                    spec += formatvalue(kwonlydefaults[kwonlyarg])
                specs.append(spec)
        if varkw is not None:
            specs.append(formatvarkw(formatargandannotation(varkw)))
        result = '(' + ', '.join(specs) + ')'
        if 'return' in annotations:
            result += formatreturns(formatannotation(annotations['return']))
        return result
else:
    # older Pythons still provide the stdlib function directly
    from inspect import formatargspec as inspect_formatargspec
def timediff(data):
    """Return the successive differences of `data` along the first axis.

    result[i] == data[i+1] - data[i]. Slicing creates views, so only the
    subtraction allocates; this was benchmarked as the fastest variant.
    """
    later, earlier = data[1:], data[:-1]
    return later - earlier
def refcast(array, dtype):
    """Return `array` unchanged if it already has dtype `dtype`,
    otherwise a copy cast to that dtype.
    """
    # numx is mdp.numx (see the module header), so numx.dtype is the
    # same normalizer the original used
    wanted = numx.dtype(dtype)
    if array.dtype != wanted:
        return array.astype(wanted)
    return array
def scast(scalar, dtype):
    """Wrap `scalar` in a 0-dimensional array of the given dtype."""
    wrapped = numx.array(scalar, dtype=dtype)
    return wrapped
def rotate(mat, angle, columns=(0, 1), units='radians'):
    """Rotate the data matrix `mat` (NxM, observations on rows) in place,
    in the plane spanned by the two columns given in `columns`.

    The rotation is counterclockwise, i.e. for each observation:
        [x_i'] = [cos(angle) -sin(angle)] [x_i]
        [x_j']   [sin(angle)  cos(angle)] [x_j]

    units -- 'radians' (default) or 'degrees'.
    If M=2, columns=[0,1].
    """
    if units == 'degrees':
        angle = angle / 180. * numx.pi
    i, j = columns
    c = numx.cos(angle)
    s = numx.sin(angle)
    # copy column i before it is overwritten (plain slices are views)
    old_i = mat[:, i] + 0.
    old_j = mat[:, j]
    mat[:, i] = c * old_i - s * old_j
    mat[:, j] = s * old_i + c * old_j
def permute(x, indices=(0, 0), rows=0, cols=1):
    """Swap two rows and/or two columns of `x` in place.

    indices -- pair (i, j) of the row/column indices to exchange.
    rows    -- if true, swap rows i and j.
    cols    -- if true, swap columns i and j.
    Note: permutations are done in-place; the original matrix is lost.
    """
    i, j = indices
    # '+ 0' forces a copy: plain array slices are views into x and would
    # otherwise be clobbered by the first assignment of the swap.
    if rows:
        saved_row = x[i, :] + 0
        x[i, :] = x[j, :]
        x[j, :] = saved_row
    if cols:
        saved_col = x[:, i] + 0
        x[:, i] = x[:, j]
        x[:, j] = saved_col
def hermitian(x):
    """Return the Hermitian adjoint (conjugate transpose) of x."""
    transposed = x.T
    return transposed.conj()
def symrand(dim_or_eigv, dtype="d"):
    """Return a random symmetric (Hermitian) matrix.

    If 'dim_or_eigv' is an integer N, return a NxN matrix, with eigenvalues
    uniformly distributed on (-1,1).

    If 'dim_or_eigv' is 1-D real array 'a', return a matrix whose
    eigenvalues are 'a'.

    dtype -- dtype of the returned matrix; for the complex typecodes
             ('D', 'F', 'G') an anti-symmetric imaginary part is added so
             the result is Hermitian rather than merely symmetric.
    Raises mdp.MDPException if the input type is not supported.
    """
    if isinstance(dim_or_eigv, int):
        dim = dim_or_eigv
        # eigenvalues drawn uniformly from (-1, 1)
        d = (numx_rand.random(dim)*2) - 1
    elif isinstance(dim_or_eigv,
                    numx.ndarray) and len(dim_or_eigv.shape) == 1:
        dim = dim_or_eigv.shape[0]
        d = dim_or_eigv
    else:
        raise mdp.MDPException("input type not supported.")
    # conjugate diag(d) with a random rotation: h = v^H diag(d) v
    v = random_rot(dim)
    #h = mdp.utils.mult(mdp.utils.mult(hermitian(v), mdp.numx.diag(d)), v)
    h = mdp.utils.mult(mult_diag(d, hermitian(v), left=False), v)
    # to avoid roundoff errors, symmetrize the matrix (again)
    h = 0.5*(h.T+h)
    if dtype in ('D', 'F', 'G'):
        # add a random anti-symmetric imaginary part (triu - tril of an
        # independent draw) so the complex result stays Hermitian
        h2 = symrand(dim_or_eigv)
        h = h + 1j*(numx.triu(h2)-numx.tril(h2))
    return refcast(h, dtype)
def random_rot(dim, dtype='d'):
    """Return a random rotation matrix, drawn from the Haar distribution
    (the only uniform distribution on SO(n)).

    The algorithm is described in the paper
    Stewart, G.W., "The efficient generation of random orthogonal
    matrices with an application to condition estimators", SIAM Journal
    on Numerical Analysis, 17(3), pp. 403-409, 1980.
    For more information see
    http://en.wikipedia.org/wiki/Orthogonal_matrix#Randomization

    dim   -- size of the returned (dim, dim) rotation matrix
    dtype -- dtype of the returned matrix (default 'd')
    """
    H = mdp.numx.eye(dim, dtype=dtype)
    D = mdp.numx.ones((dim,), dtype=dtype)
    for n in range(1, dim):
        # random vector defining the n-th Householder reflection
        x = mdp.numx_rand.normal(size=(dim-n+1,)).astype(dtype)
        D[n-1] = mdp.numx.sign(x[0])
        x[0] -= D[n-1]*mdp.numx.sqrt((x*x).sum())
        # Householder transformation: reflection about the hyperplane
        # orthogonal to x, embedded in the lower-right (dim-n+1) block
        Hx = ( mdp.numx.eye(dim-n+1, dtype=dtype)
               - 2.*mdp.numx.outer(x, x)/(x*x).sum() )
        mat = mdp.numx.eye(dim, dtype=dtype)
        mat[n-1:, n-1:] = Hx
        H = mdp.utils.mult(H, mat)
    # Fix the last sign such that the determinant is 1
    D[-1] = (-1)**(1-dim%2)*D.prod()
    # Equivalent to mult(numx.diag(D), H) but faster (row scaling)
    H = (D*H.T).T
    return H
def norm2(v):
    """Compute the Euclidean (2-)norm of the 1D array v:
    norm2(v) = sqrt(sum(v_i^2))."""
    squared_sum = (v * v).sum()
    return numx.sqrt(squared_sum)
def cov2(x, y):
    """Compute the covariance between the 2D matrices x and y.

    Complies with the old scipy.cov convention: different variables are
    on different columns, observations on rows.
    """
    n_obs = x.shape[0]
    mean_x = x.mean(axis=0)
    mean_y = y.mean(axis=0)
    # unbiased estimator: divide the cross product by (n_obs - 1)
    cross = old_div(mdp.utils.mult(x.T, y), (n_obs - 1))
    return cross - numx.outer(mean_x, mean_y)
def cov_maxima(cov):
    """Return the successive maxima of the absolute covariance matrix.

    The global maximum of abs(cov) is recorded, then its row and column
    are removed and the procedure is repeated on the reduced matrix until
    nothing is left.
    """
    dim = cov.shape[0]
    if dim < 1:
        return []
    acov = abs(cov)
    # flat argmax -> (row, col); assumes a square covariance matrix
    row, col = divmod(acov.argmax(), dim)
    keep_rows = numx.arange(dim) != row
    keep_cols = numx.arange(dim) != col
    # boolean fancy indexing copies, so the recursion never aliases acov
    reduced = acov[keep_rows, :][:, keep_cols]
    return [acov[row, col]] + cov_maxima(reduced)
def mult_diag(d, mtx, left=True):
    """Multiply the matrix `mtx` by the diagonal matrix diag(d) without
    ever forming the dense diagonal matrix; always faster than dot().

    Input:
    d   -- 1D (N,) array holding the diagonal elements
    mtx -- 2D (N,N) array
    Output:
    mult_diag(d, mtx, left=True)  == dot(diag(d), mtx)  (scales rows)
    mult_diag(d, mtx, left=False) == dot(mtx, diag(d))  (scales columns)
    """
    if not left:
        # broadcasting multiplies column j by d[j]
        return d * mtx
    # scale row i by d[i]: transpose, broadcast, transpose back
    return (d * mtx.T).T
def comb(N, k):
    """Return the binomial coefficient C(N, k): the number of combinations
    of k objects from a set of N objects, without repetition.

    Uses the multiplicative formula; every intermediate value stays an
    exact integer.
    """
    result = 1
    for i in range(k):
        # the product of i+1 consecutive integers is divisible by (i+1)!,
        # so each floor division here is exact
        result = result * (N - i) // (i + 1)
    return result
# WARNING numpy.linalg.eigh does not support float sizes larger than 64 bits,
# and complex numbers of size larger than 128 bits.
# Also float16 is not supported either.
# This is not a problem for MDP, as long as scipy.linalg.eigh is available.
def get_dtypes(typecodes_key, _safe=True):
    """Return the list of dtypes corresponding to the set of typecodes
    defined in numpy.typecodes[typecodes_key].

    E.g., get_dtypes('Float') = [dtype('f'), dtype('d'), dtype('g')].
    If _safe is True (default), large floating point types are removed
    when the numerical backend does not support them (see the module
    warning about numpy.linalg.eigh above).
    """
    # large dtypes are only safe when scipy's eigh backs symeig
    skip_unsafe = _safe and mdp.config.has_symeig != 'scipy.linalg.eigh'
    dtypes = []
    for code in numx.typecodes[typecodes_key]:
        try:
            dt = numx.dtype(code)
        except TypeError:
            # typecode unknown to this backend: skip it silently
            continue
        if skip_unsafe and dt in _UNSAFE_DTYPES:
            continue
        dtypes.append(dt)
    return dtypes
# Map typecode names -> scalar types. `numpy.typeDict` was removed in
# NumPy 1.24; `sctypeDict` is its long-standing replacement, so fall back
# to it to stay compatible with both old and new numpy releases.
_scalar_type_map = getattr(numx, 'typeDict', None)
if _scalar_type_map is None:
    _scalar_type_map = getattr(numx, 'sctypeDict', {})
# dtypes that numpy.linalg.eigh cannot handle (see the warning above);
# only the names this backend actually defines are kept
_UNSAFE_DTYPES = [_scalar_type_map[d] for d in
                  ['float16', 'float96', 'float128',
                   'complex192', 'complex256']
                  if d in _scalar_type_map]
def nongeneral_svd(A, range=None, **kwargs):
    """SVD routine for simple eigenvalue problem, API is compatible with
    symeig.

    A      -- 2D array to decompose.
    range  -- optional 1-based inclusive pair (lo, hi): keep only the
              eigenvalues/eigenvectors ranked lo..hi in ascending order.
              NOTE: the name deliberately shadows the builtin `range` to
              keep the symeig-compatible keyword.
    kwargs -- accepted and ignored, for signature compatibility.

    Returns (w, Z): ascending eigenvalues and the matrix whose columns are
    the corresponding eigenvectors.
    """
    # Z2 (left singular vectors) is not needed here
    Z2, w, Z = mdp.utils.svd(A)
    # sort eigenvalues and corresponding eigenvectors
    idx = w.argsort()
    w = w.take(idx)
    # reorder the rows of Z, then transpose so vectors end up on columns
    Z = Z.take(idx, axis=0).T
    if range is not None:
        # convert the 1-based inclusive (lo, hi) pair to a Python slice
        lo, hi = range
        Z = Z[:, lo-1:hi]
        w = w[lo-1:hi]
    return w, Z
def sqrtm(A):
    """Return the matrix square root of the symmetric positive definite
    matrix A, via its eigendecomposition:
    A = V diag(d) V^T  =>  sqrt(A) = V diag(sqrt(d)) V^T.
    """
    eigvals, eigvecs = mdp.utils.symeig(A)
    scaled = mult_diag(numx.sqrt(eigvals), eigvecs.T)
    return mdp.utils.mult(eigvecs, scaled)
# replication functions
def lrep(x, n):
    """Replicate x n times along a new leading (first) dimension."""
    expanded = x.reshape((1,) + x.shape)
    return expanded.repeat(n, axis=0)
def rrep(x, n):
    """Replicate x n times along a new trailing (last) dimension."""
    expanded = x.reshape(x.shape + (1,))
    return expanded.repeat(n, axis=-1)
def irep(x, n, dim):
    """Replicate x n times along a new dimension inserted at position dim."""
    new_shape = x.shape[:dim] + (1,) + x.shape[dim:]
    return x.reshape(new_shape).repeat(n, axis=dim)
# /replication functions
try:
    # itertools.product exists only in Python >= 2.6
    from itertools import product
except ImportError:
    def product(*args, **kwds):
        """Cartesian product of input iterables.

        Pure-Python fallback taken from the Python 2.6 documentation:
        product('ABCD', 'xy') --> Ax Ay Bx By Cx Cy Dx Dy
        product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111
        """
        # materialize every iterable; 'repeat' duplicates the pool list
        pools = list(map(tuple, args)) * kwds.get('repeat', 1)
        result = [[]]
        for pool in pools:
            # extend every partial tuple with every element of this pool
            result = [x+[y] for x in result for y in pool]
        for prod in result:
            yield tuple(prod)
def orthogonal_permutations(a_dict):
    """Yield every combination of the list-valued entries of `a_dict`.

    Takes a dictionary mapping keys to lists of candidate values and
    yields one dict per element of the Cartesian product of those lists.
    This function is useful, when a method with several arguments shall
    be tested and all of the arguments can take several values. The order
    is not defined, therefore the elements should be orthogonal to each
    other.

    >>> for i in orthogonal_permutations({'a': [1,2,3], 'b': [4,5]}):
    ...     print(i)
    {'a': 1, 'b': 4}
    {'a': 1, 'b': 5}
    {'a': 2, 'b': 4}
    {'a': 2, 'b': 5}
    {'a': 3, 'b': 4}
    {'a': 3, 'b': 5}
    """
    # one list of (key, value) pairs per dictionary entry; a_dict is
    # copied so concurrent mutation by the caller cannot interfere
    pairs_per_key = [[(key, value) for value in values]
                     for key, values in list(dict(a_dict).items())]
    for combination in product(*pairs_per_key):
        yield dict(combination)
def _iter_or_repeat(val):
    """Return iter(val) for an iterable `val`; otherwise an endless
    itertools.repeat(val) stream (EAFP: non-iterables raise TypeError)."""
    try:
        return iter(val)
    except TypeError:
        # plain scalar: stretch it ad infinitum
        return itertools.repeat(val)
def izip_stretched(*iterables):
    """Same as izip, except that for convenience non-iterables are
    repeated ad infinitum.

    This is useful when trying to zip input data with respective labels
    and allows for having a single label for all data, as well as for
    having a list of labels for each data vector. Note that strings are
    iterables, so a string meant as a single value must be wrapped in a
    repeat statement of its own.

    Thus,

    >>> for zipped in izip_stretched([1, 2, 3], -1):
    ...     print(zipped)
    (1, -1)
    (2, -1)
    (3, -1)

    is equivalent to

    >>> for zipped in izip([1, 2, 3], [-1] * 3):
    ...     print(zipped)
    (1, -1)
    (2, -1)
    (3, -1)
    """
    streams = []
    for value in iterables:
        # stretch in place: iterate iterables, repeat everything else
        try:
            streams.append(iter(value))
        except TypeError:
            streams.append(itertools.repeat(value))
    while streams:
        try:
            # stop as soon as the shortest stream is exhausted
            yield tuple([next(stream) for stream in streams])
        except StopIteration:
            break
def weighted_choice(a_dict, normalize=True):
    """Return a key from a dictionary, drawn with probability proportional
    to the weight its value suggests.

    a_dict    -- mapping key -> non-negative weight.
    normalize -- if True (default) the weights are normalized to sum to
                 one first; if False they are assumed to sum to unity
                 already.

    Example:
    >>> d = {'a': 0.1, 'b': 0.5, 'c': 0.4}
    >>> weighted_choice(d)
    # draws 'b':'c':'a' with 5:4:1 probability

    Returns None only when rounding errors leave the cumulative weight
    below the drawn random number.

    TODO: It might be good to either shuffle the order or explicitely
    specify it, before walking through the items, to minimise possible
    degeneration.
    """
    if normalize:
        d = a_dict.copy()
        total = sum(d.values())
        for key in d:
            # true division: integer weights such as {'a': 1, 'b': 3} must
            # normalize to fractions; the previous old_div() floor-divided
            # int weights to 0, making the draw always return None
            d[key] = d[key] / total
    else:
        d = a_dict
    rand_num = random.random()
    total_rand = 0
    # walk the cumulative distribution until it exceeds the drawn number
    for key, val in list(d.items()):
        total_rand += val
        if total_rand > rand_num:
            return key
    return None
def bool_to_sign(an_array):
    """Map booleans to signs: -1 for each False, +1 for each True."""
    # shifting by 0.5 makes False negative and True positive, so sign()
    # yields exactly -1/+1
    shifted = an_array - 0.5
    return numx.sign(shifted)
def sign_to_bool(an_array, zero=True):
    """Map signs to booleans: False for each negative value, else True.

    zero -- the boolean produced for entries equal to 0 (default True).
    """
    arr = numx.array(an_array)
    if zero:
        return arr >= 0
    return arr > 0
def gabor(size, alpha, phi, freq, sgm, x0=None, res=1, ampl=1.):
    """Return a 2D array containing a Gabor wavelet.

    Input arguments:
    size -- (height, width) (pixels)
    alpha -- orientation (rad)
    phi -- phase (rad)
    freq -- frequency (cycles/deg)
    sgm -- (sigma_x, sigma_y) standard deviation along the axis
           of the gaussian ellipse (pixel)
    x0 -- (x,y) coordinates of the center of the wavelet (pixel)
          Default: None, meaning the center of the array
    res -- spatial resolution (deg/pixel)
           Default: 1, so that 'freq' is measured in cycles/pixel
    ampl -- constant multiplying the result
            Default: 1.
    """
    # init
    # NOTE(review): the docstring says size is (height, width) yet it is
    # unpacked as (w, h), and x0 is immediately re-unpacked as (y0, x0).
    # The two swaps appear to compensate each other, but this should be
    # confirmed against the callers before relying on axis order.
    w, h = size
    if x0 is None: x0 = (w//2, h//2)
    y0, x0 = x0
    # some useful quantities
    freq *= res  # convert cycles/deg into cycles/pixel
    sinalpha = numx.sin(alpha)
    cosalpha = numx.cos(alpha)
    # carrier frequency vector components for orientation alpha
    v0, u0 = freq*cosalpha, freq*sinalpha
    # coordinates
    #x = numx.mgrid[-x0:w-x0, -y0:h-y0]
    # meshgrid + transposes reproduce the mgrid layout commented above
    x = numx.meshgrid(numx.arange(w)-x0, numx.arange(h)-y0)
    x = (x[0].T, x[1].T)
    # rotate the coordinate system by alpha
    xr = x[0]*cosalpha - x[1]*sinalpha
    yr = x[0]*sinalpha + x[1]*cosalpha
    # gabor: gaussian envelope multiplied by an oriented cosine carrier
    im = ampl*numx.exp(-0.5*(xr*xr/(sgm[0]*sgm[0]) + yr*yr/(sgm[1]*sgm[1]))) \
         *numx.cos(-2.*numx.pi*(u0*x[0]+v0*x[1]) - phi)
    return im
def residuals(app_x, y_noisy, exp_funcs, x_orig, k=0.0):
    """Residual vector used internally by invert_exp_funcs2 to approximate
    inverses in GeneralExpansionNode (fed to scipy.optimize.leastsq).

    Concatenates the weighted, length-normalized expansion error with the
    weighted, length-normalized distance from the starting point x_orig.
    """
    app_x = app_x.reshape((1, -1))
    # expand the current approximation through every expansion function
    expanded = numx.concatenate([f(app_x) for f in exp_funcs], axis=1)
    norm_y = numx.sqrt(len(y_noisy))
    norm_x = numx.sqrt(len(x_orig))
    error_term = (1 - k) ** 0.5 * (y_noisy - expanded[0]) / norm_y
    anchor_term = k ** 0.5 * (x_orig - app_x[0]) / norm_x
    return numx.append(error_term, anchor_term)
def invert_exp_funcs2(exp_x_noisy, dim_x, exp_funcs, use_hint=False, k=0.0):
    """Approximates a preimage app_x of exp_x_noisy.

    Returns a tuple (app_x, exp_funcs(app_x)), where each row of
    exp_funcs(app_x) is close to each row of exp_x_noisy.

    use_hint: determines the starting point for the approximation of the
        preimage. There are three possibilities:
        - False: starting point is generated with a normal distribution,
        - True: starting point is the first dim_x elements of exp_x_noisy,
        - otherwise: use the parameter use_hint itself as the first
          approximation.
    k: weighting factor in [0, 1] to balance between approximation error
        and closeness to the starting point. The objective function
        minimized is:
        (1-k) * ||exp_funcs(app_x) - exp_x_noisy||**2/output_dim +
        k * ||app_x - starting point||**2/input_dim

    Note: this function requires scipy.
    """
    if numx_description != 'scipy':
        raise NotImplementedError('This function requires scipy.')
    else:
        # imported lazily so the module loads on non-scipy backends
        import scipy.optimize

    num_samples = exp_x_noisy.shape[0]

    # pick the starting point; the ndarray check must come first because
    # 'ndarray == True' would be an elementwise comparison
    if isinstance(use_hint, numx.ndarray):
        app_x = use_hint.copy()
    elif use_hint == True:
        app_x = exp_x_noisy[:,0:dim_x].copy()
    else:
        app_x = numx.random.normal(size=(num_samples,dim_x))

    # refine each sample independently with least squares on 'residuals'
    for row in range(num_samples):
        plsq = scipy.optimize.leastsq(residuals, app_x[row],
                                      args=(exp_x_noisy[row], exp_funcs,
                                            app_x[row], k), maxfev=50*dim_x)
        app_x[row] = plsq[0]

    # expansion of the final approximation, returned alongside it
    app_exp_x = numx.concatenate([func(app_x) for func in exp_funcs],axis=1)
    return app_x, app_exp_x