-
Notifications
You must be signed in to change notification settings - Fork 55
/
utils.py
129 lines (111 loc) · 4.31 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
"""
@author: LiShiHang
@software: PyCharm
@file: utils.py
@time: 2018/12/18 14:58
"""
from keras.engine import Layer
import keras.backend as K
class MaxPoolingWithArgmax2D(Layer):
    """2D max pooling layer that also returns the argmax of each window.

    The layer wraps ``tf.nn.max_pool_with_argmax`` and therefore only works
    with the TensorFlow backend.  It returns two tensors: the pooled values
    and the indices of the selected maxima (cast to ``K.floatx()``).

    # Arguments
        pool_size: pair of ints, pooling window size along the two
            middle axes of the input.
        strides: pair of ints, pooling strides along the two middle axes.
        padding: ``'same'`` or ``'valid'`` (case-insensitive); upper-cased
            and forwarded to TensorFlow.
    """

    def __init__(
            self,
            pool_size=(2, 2),
            strides=(2, 2),
            padding='same',
            **kwargs):
        super(MaxPoolingWithArgmax2D, self).__init__(**kwargs)
        self.padding = padding
        self.pool_size = pool_size
        self.strides = strides

    def call(self, inputs, **kwargs):
        """Pool ``inputs`` and return ``[pooled, argmax]``.

        # Raises
            NotImplementedError: if the active Keras backend is not
                TensorFlow.
        """
        if K.backend() != 'tensorflow':
            errmsg = '{} backend is not supported for layer {}'.format(
                K.backend(), type(self).__name__)
            raise NotImplementedError(errmsg)

        # The window/stride lists have size 1 on the first and last axis,
        # i.e. no pooling across those axes
        # (presumably batch and channels, NHWC layout -- TODO confirm).
        ksize = [1, self.pool_size[0], self.pool_size[1], 1]
        strides = [1, self.strides[0], self.strides[1], 1]
        output, argmax = K.tf.nn.max_pool_with_argmax(
            inputs,
            ksize=ksize,
            strides=strides,
            padding=self.padding.upper())
        # NOTE(review): the indices are cast to the float type; with float32
        # very large flattened indices may not be represented exactly --
        # confirm this is acceptable for the feature-map sizes in use.
        argmax = K.cast(argmax, K.floatx())
        return [output, argmax]

    def compute_output_shape(self, input_shape):
        """Return the static shapes of the two outputs (they are identical).

        The two middle dimensions follow TensorFlow's pooling arithmetic:
        ``ceil(dim / stride)`` for 'same' padding and
        ``ceil((dim - pool + 1) / stride)`` for 'valid' padding.  Unknown
        (``None``) dimensions stay unknown.
        """
        same_padding = self.padding.upper() == 'SAME'

        def pooled_length(length, pool, stride):
            if length is None:
                return None
            effective = length if same_padding else length - pool + 1
            # Ceiling division using integer arithmetic only.
            return -(-effective // stride)

        output_shape = (
            input_shape[0],
            pooled_length(input_shape[1], self.pool_size[0], self.strides[0]),
            pooled_length(input_shape[2], self.pool_size[1], self.strides[1]),
            input_shape[3])
        return [output_shape, output_shape]

    def compute_mask(self, inputs, mask=None):
        """Return no mask for either of the two outputs."""
        return 2 * [None]

    def get_config(self):
        """Return the layer configuration, including pooling parameters."""
        config = super(MaxPoolingWithArgmax2D, self).get_config()
        config.update({
            'pool_size': self.pool_size,
            'strides': self.strides,
            'padding': self.padding,
        })
        return config
class MaxUnpooling2D(Layer):
    """Unpooling layer that scatters values back to their argmax positions.

    Expects a list of two tensors ``[updates, mask]``: the values to place
    and the indices at which to place them (as produced by a max-pooling
    layer that also returns its argmax).  All other output positions are
    filled with zeros by ``tf.scatter_nd``.

    # Arguments
        up_size: pair of ints, the factors by which the two middle axes
            are enlarged when no explicit ``output_shape`` is given.
    """

    def __init__(self, up_size=(2, 2), **kwargs):
        super(MaxUnpooling2D, self).__init__(**kwargs)
        self.up_size = up_size

    def call(self, inputs, output_shape=None):
        """Scatter ``inputs[0]`` into a larger tensor using ``inputs[1]``.

        # Arguments
            inputs: list ``[updates, mask]``.
            output_shape: optional 4-element shape of the result; when
                omitted it is derived from the shape of ``updates`` and
                ``up_size``.
        """
        updates, mask = inputs[0], inputs[1]
        with K.tf.variable_scope(self.name):
            mask = K.cast(mask, 'int32')
            input_shape = K.tf.shape(updates, out_type='int32')
            # Derive the output shape when the caller did not supply one.
            if output_shape is None:
                output_shape = (
                    input_shape[0],
                    input_shape[1] * self.up_size[0],
                    input_shape[2] * self.up_size[1],
                    input_shape[3])

            # Build one index tensor per output axis (batch, y, x, feature),
            # each with the same shape as `mask`.
            one_like_mask = K.ones_like(mask, dtype='int32')
            batch_shape = K.concatenate(
                [[input_shape[0]], [1], [1], [1]],
                axis=0)
            batch_range = K.reshape(
                K.tf.range(output_shape[0], dtype='int32'),
                shape=batch_shape)
            b = one_like_mask * batch_range
            # Assumes `mask` holds per-sample flattened indices of the form
            # (y * width + x) * channels + c -- TODO confirm against the
            # TensorFlow version that produced the argmax.
            y = mask // (output_shape[2] * output_shape[3])
            x = (mask // output_shape[3]) % output_shape[2]
            feature_range = K.tf.range(output_shape[3], dtype='int32')
            f = one_like_mask * feature_range

            # Flatten to an (N, 4) list of indices and an (N,) list of values.
            updates_size = K.tf.size(updates)
            indices = K.transpose(K.reshape(
                K.stack([b, y, x, f]),
                [4, updates_size]))
            values = K.reshape(updates, [updates_size])
            ret = K.tf.scatter_nd(indices, values, output_shape)
            return ret

    def compute_output_shape(self, input_shape):
        """Return the static output shape derived from the mask's shape.

        The two middle dimensions are multiplied by ``up_size``; dimensions
        that are unknown (``None``) stay unknown instead of raising.
        """
        mask_shape = input_shape[1]

        def scaled(dim, factor):
            return None if dim is None else dim * factor

        return (
            mask_shape[0],
            scaled(mask_shape[1], self.up_size[0]),
            scaled(mask_shape[2], self.up_size[1]),
            mask_shape[3]
        )

    def get_config(self):
        """Return the layer configuration, including ``up_size``."""
        config = super(MaxUnpooling2D, self).get_config()
        config.update({'up_size': self.up_size})
        return config
def _run_demo():
    """Build a tiny pool -> unpool model and print its output on random data."""
    import keras
    import numpy as np

    inputs = keras.layers.Input((4, 4, 3))
    pooled = MaxPoolingWithArgmax2D()(inputs)
    unpooled = MaxUnpooling2D()(pooled)
    # To inspect the pooling layer on its own, use `outputs=pooled` instead.
    model = keras.Model(inputs=inputs, outputs=unpooled)
    model.compile(optimizer="adam", loss='categorical_crossentropy')

    # Convenient places for a breakpoint when debugging the layers.
    batch = np.random.randint(0, 100, (3, 4, 4, 3))
    result = model.predict(batch)
    print(result)


if __name__ == '__main__':
    _run_demo()