diff --git a/mdp/nodes/expansion_nodes.py b/mdp/nodes/expansion_nodes.py index 6ce970f7..4762612e 100644 --- a/mdp/nodes/expansion_nodes.py +++ b/mdp/nodes/expansion_nodes.py @@ -271,6 +271,120 @@ def _execute(self,x): return self.rbf_expansion(x) +class ConstantExpansionNode(_ExpansionNode): + """Expands the input signal x according to a list [f_0, ... f_k] + of functions. + Each function f_i takes the whole bidimensional array x as input and + should output another bidimensional array. The output of the node is + [f_0[x], ... f_k[x]], that is, the concatenation of each one of + the outputs f_i[x].""" + + def __init__(self, funcs, input_dim = None, dtype = None, \ + approximate_inverse=True, use_hint=False): + self.funcs = funcs + self.approximate_inverse = approximate_inverse + self.use_hint = use_hint + super(_ExpansionNode, self).__init__(input_dim, dtype) + + def expanded_dim(self, n): + exp_dim=0 + x = numx.zeros((1,n)) + for func in self.funcs: + outx = func(x) + exp_dim += outx.shape[1] + return exp_dim + + def output_sizes(self, n): + sizes = numx.zeros(len(self.funcs)) + x = numx.zeros((1,n)) + for i, func in enumerate(self.funcs): + outx = func(x) + sizes[i] = outx.shape[1] + return sizes + + def is_trainable(self): + return False + + def is_invertible(self): + return self.approximate_inverse + + def _inverse(self, x, use_hint=None): + if self.approximate_inverse is False: + ex = "Approximate inversion disabled" + raise mdp.NodeException(ex) + if use_hint is None: + use_hint = self.use_hint + + app_x_2, app_ex_x_2 = invert_exp_funcs2(x, self.input_dim, self.funcs, use_hint=use_hint, k=0.001) + return app_x_2 + + def _set_input_dim(self, n): + self._input_dim = n + self._output_dim = self.expanded_dim(n) + + def _execute(self, x): + if self.input_dim is None: + self.set_input_dim(x.shape[1]) + + num_samples = x.shape[0] + sizes = self.output_sizes(self.input_dim) + + out = numx.zeros((num_samples, self.output_dim)) + + current_pos = 0 + for i, func in enumerate(self.funcs): + out[:,current_pos:current_pos+sizes[i]] = func(x) + current_pos += sizes[i] + return out + +def residuals(app_x, y_noisy, exp_funcs, x_orig, k=0.0): + """Computes error signals as the concatenation of the reconstruction error + (y_noisy - exp_funcs(app_x)) and the distance from the original (x_orig - app_x) + using a weighting factor k. + Used to approximate inverses in ConstantExpansionNode. + """ + app_x = app_x.reshape((1,len(app_x))) + app_exp_x = numx.concatenate([func(app_x) for func in exp_funcs],axis=1) + + + div_y = numx.sqrt(len(y_noisy)) + div_x = numx.sqrt(len(x_orig)) + return numx.append( (1-k)*(y_noisy-app_exp_x[0]) / div_y, k * (x_orig - app_x[0])/div_x ) + +def invert_exp_funcs2(exp_x_noisy, dim_x, exp_funcs, use_hint=False, k=0.0): + """ Function that approximates a preimage app_x of exp_x_noisy. + + Returns an array app_x, such that each row of exp_x_noisy is close + to each row of exp_funcs(app_x). + + use_hint: determines the starting point for the approximation of the preimage. There are + three possibilities. + if it equals False: starting point is generated with a normal distribution + if it equals True: starting point is the first dim_x elements of exp_x_noisy + otherwise: use the parameter use_hint itself as the first approximation + k: weighting factor in [0, 1] to balance between approximation error and + closeness to the starting point. For instance: + k==0: objective is to minimize |exp_funcs(app_x) - exp_x_noisy| + k==1: objective is to minimize |app_x - starting point| + """ + + num_samples = exp_x_noisy.shape[0] + + if isinstance(use_hint, numx.ndarray): + app_x = use_hint.copy() + elif use_hint == True: + app_x = exp_x_noisy[:,0:dim_x].copy() + else: + app_x = numx.random.normal(size=(num_samples,dim_x)) + + import scipy.optimize + for row in range(num_samples): + plsq = scipy.optimize.leastsq(residuals, app_x[row], args=(exp_x_noisy[row], exp_funcs, app_x[row], k), ftol=1.49012e-06, xtol=1.49012e-06, gtol=0.0, maxfev=50*dim_x, epsfcn=0.0, factor=1.0) + app_x[row] = plsq[0] + + app_exp_x = numx.concatenate([func(app_x) for func in exp_funcs],axis=1) + return app_x, app_exp_x + ### old weave inline code to perform a quadratic expansion