-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
gsn.py
208 lines (174 loc) · 7.81 KB
/
gsn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
The GSNCost class.
"""
from functools import wraps
from pylearn2.compat import OrderedDict
from pylearn2.costs.cost import Cost
from pylearn2.costs.autoencoder import GSNFriendlyCost
from pylearn2.space import CompositeSpace
from pylearn2.utils import safe_zip
class GSNCost(Cost):
    """
    Customizable cost class for GSNs.

    Note from IG: the following is not 100% accurate, you can use
    CompositeSpace to model interactions between arbitrarily many
    vectors.

    This class currently can only handle datasets with only one or two sets
    of vectors. The get_input_source and get_target_source methods on the model
    instance are called to get the names for the fields in the dataset.
    get_input_source() is used for the name of the first set of vectors and
    get_target_source() is used for the second set of vectors.

    The explicit use of get_input_source and get_target_source (and the
    non-existence of similar hooks) is what limits this class to learning
    the joint distribution between only 2 sets of vectors. To allow for more
    than 2 sets of vectors, the Model class would need to be modified,
    preferably in a way that allows reference to arbitrarily many sets of
    vectors within one dataset.

    Parameters
    ----------
    costs : list of (int, double, GSNFriendlyCost or callable) tuples
        The int component of each tuple is the index of the layer at
        which we want to compute this cost.
        The double component of the tuple is the coefficient to associate
        with the cost.
        The GSNFriendlyCost instance is the cost that will be computed.
        If that is a callable rather than an instance of GSN friendly
        cost, it will be called with 2 arguments: the initial value
        followed by the reconstructed value.
        Costs must be of length 1 or 2 (explained in docstring for
        GSNCost class) and the meaning of the ordering of the costs
        parameter is explained in the docstring for the mode parameter.
    walkback : int
        How many steps of walkback to perform.
    mode : str
        Must be either 'joint', 'supervised', or 'anti_supervised'.

        The terms "input layer" and "label layer" are used below in the
        description of the modes. The "input layer" refers to the layer
        at the index specified in the first tuple in the costs parameter,
        and the "label layer" refers to the layer at the index specified
        in the second tuple in the costs parameter.

        'joint' means setting all of the layers and calculating
        reconstruction costs.
        'supervised' means setting just the input layer and attempting to
        predict the label layer.
        'anti_supervised' is attempting to predict the input layer given
        the label layer.
    """
    def __init__(self, costs, walkback=0, mode="joint"):
        super(GSNCost, self).__init__()
        self.walkback = walkback

        assert mode in ["joint", "supervised", "anti_supervised"]
        if mode in ["supervised", "anti_supervised"]:
            # supervised-style modes need both an input and a label layer
            assert len(costs) == 2
        self.mode = mode

        msg = "This is (hopefully) a temporary restriction"
        assert len(costs) in [1, 2], msg

        msg = "Must have only one cost function per index"
        assert len(set(c[0] for c in costs)) == len(costs), msg

        # Copy the list so that the in-place conversion below cannot
        # mutate the caller's argument.
        self.costs = list(costs)

        # convert GSNFriendlyCost instances into just callables
        for i, cost_tup in enumerate(self.costs):
            if isinstance(cost_tup[2], GSNFriendlyCost):
                mutable = list(cost_tup)
                mutable[2] = cost_tup[2].cost
                self.costs[i] = tuple(mutable)
            else:
                assert callable(cost_tup[2])

    @staticmethod
    def _get_total_for_cost(idx, costf, init_data, model_output):
        """
        Computes the total cost contribution from one layer given the full
        output of the GSN.

        Parameters
        ----------
        idx : int
            init_data and model_output both contain a subset of the layer
            activations at each time step. This is the index of the layer we
            want to evaluate the cost on WITHIN this subset. This is
            generally equal to the idx of the cost function within the
            GSNCost.costs list.
        costf : callable
            Function of two variables that computes the cost. The first
            argument is the target value, and the second argument is the
            predicted value.
        init_data : list of tensor_likes
            Although only the element at index "idx" is accessed/needed, this
            parameter is a list so that it can directly handle the data
            format from GSN.expr.
        model_output : list of list of tensor_likes
            The output of GSN.get_samples as called by GSNCost.expr.

        Returns
        -------
        tensor_like
            Cost for the layer, averaged over all walkback steps.
        """
        total = 0.0
        for step in model_output:
            total += costf(init_data[idx], step[idx])

        # normalize for number of steps
        return total / len(model_output)

    def _get_samples_from_model(self, model, data):
        """
        Run the GSN and collect samples at the layers we have costs on,
        clamping different subsets of the layers depending on self.mode.

        Parameters
        ----------
        model : GSN object
            The model to sample from.
        data : list of tensor_likes
            One tensor per cost, in the same order as self.costs.

        Returns
        -------
        list of list of tensor_likes
            The output of model.get_samples restricted to the cost layers.
        """
        layer_idxs = [idx for idx, _, _ in self.costs]
        zipped = safe_zip(layer_idxs, data)

        if self.mode == "joint":
            # clamp every layer we have data for
            use = zipped
        elif self.mode == "supervised":
            # don't include label layer
            use = zipped[:1]
        elif self.mode == "anti_supervised":
            # don't include features
            use = zipped[1:]
        else:
            # unreachable if __init__ validated mode, kept as a safeguard
            raise ValueError("Unknown mode \"%s\" for GSNCost" % self.mode)

        return model.get_samples(use,
                                 walkback=self.walkback,
                                 indices=layer_idxs)

    def expr(self, model, data):
        """
        Theano expression for the cost.

        Parameters
        ----------
        model : GSN object
            WRITEME
        data : list of tensor_likes
            Data must be a list or tuple of the same length as self.costs.
            All elements in data must be a tensor_like (cannot be None).

        Returns
        -------
        y : tensor_like
            The actual cost that is backpropagated on.
        """
        self.get_data_specs(model)[0].validate(data)
        output = self._get_samples_from_model(model, data)

        total = 0.0
        for cost_idx, (_, coeff, costf) in enumerate(self.costs):
            total += (coeff *
                      self._get_total_for_cost(cost_idx, costf, data, output))

        coeff_sum = sum(coeff for _, coeff, _ in self.costs)

        # normalize for coefficients on each cost
        return total / coeff_sum

    @wraps(Cost.get_monitoring_channels)
    def get_monitoring_channels(self, model, data, **kwargs):
        self.get_data_specs(model)[0].validate(data)
        rval = OrderedDict()

        # if there's only 1 cost, then no need to split up the costs
        if len(self.costs) > 1:
            output = self._get_samples_from_model(model, data)
            rval['reconstruction_cost'] =\
                self._get_total_for_cost(0, self.costs[0][2], data, output)
            rval['classification_cost'] =\
                self._get_total_for_cost(1, self.costs[1][2], data, output)

        return rval

    @wraps(Cost.get_data_specs)
    def get_data_specs(self, model):
        # get space for layer i of model
        get_space = lambda i: (model.aes[i].get_input_space() if i == 0
                               else model.aes[i - 1].get_output_space())

        # Get the spaces for layers that we have costs at. A list
        # comprehension (rather than map) guarantees a list on Python 3,
        # where map returns a lazy iterator.
        spaces = [get_space(c[0]) for c in self.costs]

        sources = [model.get_input_source()]
        if len(self.costs) == 2:
            sources.append(model.get_target_source())

        return (CompositeSpace(spaces), tuple(sources))