Skip to content

Commit

Permalink
NEW: Added ConstantExpansionNode
Browse files Browse the repository at this point in the history
  • Loading branch information
Alberto Escalante authored and pberkes committed Jul 23, 2010
1 parent fe29068 commit f3110f8
Showing 1 changed file with 114 additions and 0 deletions.
114 changes: 114 additions & 0 deletions mdp/nodes/expansion_nodes.py
Expand Up @@ -271,6 +271,120 @@ def _execute(self,x):
return self.rbf_expansion(x)


class ConstantExpansionNode(_ExpansionNode):
"""Expands the input signal x according to a list [f_0, ... f_k]
of functions.
Each function f_i takes the whole bidimensional array x as input and
should output another bidimensional array. The output of the node is
[f_0[x], ... f_k[x]], that is, the concatenation of each one of
the outputs f_i[x]."""

def __init__(self, funcs, input_dim = None, dtype = None, \
approximate_inverse=True, use_hint=False):
self.funcs = funcs
self.approximate_inverse = approximate_inverse
self.use_hint = use_hint
super(_ExpansionNode, self).__init__(input_dim, dtype)

def expanded_dim(self, n):
exp_dim=0
x = numx.zeros((1,n))
for func in self.funcs:
outx = func(x)
exp_dim += outx.shape[1]
return exp_dim

def output_sizes(self, n):
sizes = numx.zeros(len(self.funcs))
x = numx.zeros((1,n))
for i, func in enumerate(self.funcs):
outx = func(x)
sizes[i] = outx.shape[1]
return sizes

def is_trainable(self):
return False

def is_invertible(self):
return self.approximate_inverse

def _inverse(self, x, use_hint=None):
if self.approximate_inverse is False:
ex = "Approximate inversion disabled"
raise mdp.NodeException(ex)
if use_hint is None:
use_hint = self.use_hint

app_x_2, app_ex_x_2 = invert_exp_funcs2(x, self.input_dim, self.funcs, use_hint=use_hint, k=0.001)
return app_x_2

def _set_input_dim(self, n):
self._input_dim = n
self._output_dim = self.expanded_dim(n)

def _execute(self, x):
if self.input_dim is None:
self.set_input_dim(x.shape[1])

num_samples = x.shape[0]
sizes = self.output_sizes(self.input_dim)

out = numx.zeros((num_samples, self.output_dim))

current_pos = 0
for i, func in enumerate(self.funcs):
out[:,current_pos:current_pos+sizes[i]] = func(x)
current_pos += sizes[i]
return out

def residuals(app_x, y_noisy, exp_funcs, x_orig, k=0.0):
"""Computes error signals as the concatenation of the reconstruction error
(y_noisy - exp_funcs(app_x)) and the distance from the original (x_orig - app_x)
using a weighting factor k.
Used to approximate inverses in ConstantExpansionNode.
"""
app_x = app_x.reshape((1,len(app_x)))
app_exp_x = numx.concatenate([func(app_x) for func in exp_funcs],axis=1)


div_y = numx.sqrt(len(y_noisy))
div_x = numx.sqrt(len(x_orig))
return numx.append( (1-k)*(y_noisy-app_exp_x[0]) / div_y, k * (x_orig - app_x[0])/div_x )

def invert_exp_funcs2(exp_x_noisy, dim_x, exp_funcs, use_hint=False, k=0.0):
""" Function that approximates a preimage app_x of exp_x_noisy.
Returns an array app_x, such that each row of exp_x_noisy is close
to each row of exp_funcs(app_x).
use_hint: determines the starting point for the approximation of the preimage. There are
three possibilities.
if it equals False: starting point is generated with a normal distribution
if it equals True: starting point is the first dim_x elements of exp_x_noisy
otherwise: use the parameter use_hint itself as the first approximation
k: weighting factor in [0, 1] to balance between approximation error and
closeness to the starting point. For instance:
k==0: objective is to minimize |exp_funcs(app_x) - exp_x_noisy|
k==1: objective is to minimize |app_x - starting point|
"""

num_samples = exp_x_noisy.shape[0]

if isinstance(use_hint, numx.ndarray):
app_x = use_hint.copy()
elif use_hint == True:
app_x = exp_x_noisy[:,0:dim_x].copy()
else:
app_x = numx.random.normal(size=(num_samples,dim_x))

import scipy.optimize
for row in range(num_samples):
plsq = scipy.optimize.leastsq(residuals, app_x[row], args=(exp_x_noisy[row], exp_funcs, app_x[row], k), ftol=1.49012e-06, xtol=1.49012e-06, gtol=0.0, maxfev=50*dim_x, epsfcn=0.0, factor=1.0)
app_x[row] = plsq[0]

app_exp_x = numx.concatenate([func(app_x) for func in exp_funcs],axis=1)
return app_x, app_exp_x


### old weave inline code to perform a quadratic expansion

Expand Down

0 comments on commit f3110f8

Please sign in to comment.