In [21]:
import tensorflow as tf
from tensorflow.python.framework import ops
import numpy as np
from scipy import signal
from tensorflow.python.ops import nn_ops, array_ops

def conv2d(x, W, strides=(1,1,1,1), padding='VALID'):
    """Reference convolution: forward the arguments unchanged to ``tf.nn.conv2d``.

    Args:
        x: input tensor handed to ``tf.nn.conv2d``.
        W: filter tensor handed to ``tf.nn.conv2d``.
        strides: stride sequence, defaults to ``(1, 1, 1, 1)``.
        padding: padding mode string, defaults to ``'VALID'``.

    Returns:
        Whatever ``tf.nn.conv2d`` returns for these arguments.
    """
    reference_output = tf.nn.conv2d(x, W, strides=strides, padding=padding)
    return reference_output

# Wrapper around tf.py_func that substitutes a custom gradient for the op it creates.
def py_func(func, inp, Tout, stateful=True, name=None):
    """Create a ``tf.py_func`` op whose gradient is the one built by ``_BinGrad``.

    Args:
        func: Python function executed by the op.
        inp: list of op inputs; ``inp[2]`` is handed to ``_BinGrad`` as the
            strides, so it must be present.
        Tout: output dtype(s), forwarded to ``tf.py_func``.
        stateful: forwarded to ``tf.py_func``.
        name: optional op name, forwarded to ``tf.py_func``.

    Returns:
        The value returned by ``tf.py_func``.
    """
    import uuid  # local import so this cell does not depend on another cell's imports

    # A gradient can be registered only once per name, so every call needs a
    # fresh one. A UUID cannot repeat when NumPy's RNG is reseeded and does not
    # consume the global random stream the way np.random.randint did.
    grad_name = 'PyFuncGrad' + uuid.uuid4().hex
    tf.RegisterGradient(grad_name)(_BinGrad(inp[2]))

    # Substitute the gradient. Stateless py_func ops use the op type
    # "PyFuncStateless", so map both types or stateful=False would silently
    # lose the custom gradient.
    graph = tf.get_default_graph()
    override = {"PyFunc": grad_name, "PyFuncStateless": grad_name}
    with graph.gradient_override_map(override):
        return tf.py_func(func, inp, Tout, stateful=stateful, name=name)

# Wrapper around the wrapper: the public entry point mirroring conv2d's signature.
def custom_conv2d(x, W, strides=(1,1,1,1), padding='VALID', name='conv'):
    """Convolve ``x`` with ``W`` through the NumPy ``conv`` op wrapped by ``py_func``.

    The wrapped op only performs a VALID convolution, so ``'SAME'`` is emulated
    by zero-padding ``x`` first. The padding follows TensorFlow's convention:
    ``out = ceil(in / stride)``, ``total = max((out - 1) * stride + k - in, 0)``,
    split evenly with the odd pixel going to the bottom/right.

    Args:
        x: input tensor; axes 1 and 2 are treated as height and width.
        W: filter tensor; axes 0 and 1 are treated as height and width.
        strides: sequence of four ints; entries 1 and 2 are the spatial strides.
        padding: ``'VALID'`` or ``'SAME'``.
        name: name given to the created op.

    Returns:
        The single ``tf.float32`` output tensor of the wrapped op.

    Raises:
        ValueError: if ``padding`` is neither ``'VALID'`` nor ``'SAME'``.
    """
    strides = list(strides)
    if padding == "SAME":
        stride_h, stride_w = strides[1], strides[2]
        in_shape = tf.shape(x)
        filter_shape = tf.shape(W)
        in_h, in_w = in_shape[1], in_shape[2]
        k_h, k_w = filter_shape[0], filter_shape[1]

        # Output size for SAME is ceil(in / stride), written with integer ops.
        out_h = (in_h + stride_h - 1) // stride_h
        out_w = (in_w + stride_w - 1) // stride_w
        pad_h = tf.maximum((out_h - 1) * stride_h + k_h - in_h, 0)
        pad_w = tf.maximum((out_w - 1) * stride_w + k_w - in_w, 0)
        pad_top = pad_h // 2
        pad_left = pad_w // 2

        x = tf.pad(x, [[0, 0],
                       [pad_top, pad_h - pad_top],
                       [pad_left, pad_w - pad_left],
                       [0, 0]])
    elif padding != "VALID":
        raise ValueError("padding must be 'VALID' or 'SAME', got %r" % (padding,))
    return py_func(conv, [x, W, strides], [tf.float32], name=name)[0]
    
# Gradient copied from conv2d.
def _BinGrad(strides):
    """Build the gradient function for the three-input (x, W, strides) conv op.

    Args:
        strides: sequence of ints used as the ``strides`` attribute of both
            backprop ops.

    Returns:
        A function ``(op, grad) -> [grad_x, grad_W, None]``. The op performs a
        VALID convolution, so both backprop ops use ``'VALID'`` padding. The
        strides input is an integer parameter, not a differentiable quantity,
        so its gradient is ``None``.
    """
    strides = [int(s) for s in strides]

    def _grad(op, grad):
        x, W = op.inputs[0], op.inputs[1]
        # Keyword arguments keep the call correct even if the optional
        # parameters of the backprop ops are reordered between TF versions.
        grad_x = nn_ops.conv2d_backprop_input(
            array_ops.shape(x), W, grad,
            strides=strides,
            padding='VALID',
            use_cudnn_on_gpu=False,
            data_format='NHWC')
        grad_W = nn_ops.conv2d_backprop_filter(
            x, array_ops.shape(W), grad,
            strides=strides,
            padding='VALID',
            use_cudnn_on_gpu=False,
            data_format='NHWC')
        return [grad_x, grad_W, None]

    return _grad

# Our operation.
# result[batch, :, :, w_filter] is the sum, over all channels, of the per-channel
# 2-D results for the given batch element and filter.
def conv(x, W, strides):
    """NumPy/SciPy implementation of a strided VALID 2-D convolution.

    As in ``tf.nn.conv2d`` the filter is not flipped, i.e. each channel is
    cross-correlated with its filter slice.

    Args:
        x: array of shape (batch, height, width, channels).
        W: array of shape (filter_height, filter_width, channels, filters).
        strides: four integers (list, tuple or array). Entries 1 and 2 are the
            height and width strides; entries 0 and 3 are not applied.

    Returns:
        float32 array of shape (batch, out_height, out_width, filters), where
        out_height/out_width are the VALID output sizes subsampled by the strides.

    Raises:
        ValueError: if ``strides`` does not hold four values, or the filter is
            larger than the input in a spatial dimension.
    """
    batches, x_height, x_width, x_channels = x.shape
    W_height, W_width, w_channels, w_filters = W.shape
    assert w_channels == x_channels

    # Accept lists and tuples as well as the ndarray delivered by tf.py_func.
    strides = np.asarray(strides).reshape(-1)
    if strides.size != 4:
        raise ValueError("strides must hold 4 values, got %d" % strides.size)
    stride_h, stride_w = int(strides[1]), int(strides[2])

    # SciPy silently swaps the operands in 'valid' mode when the second one is
    # larger, which would yield a wrongly shaped result instead of an error.
    if W_height > x_height or W_width > x_width:
        raise ValueError("filter (%d, %d) is larger than input (%d, %d)"
                         % (W_height, W_width, x_height, x_width))

    # Allocate up front so empty batches / filter sets return an empty array
    # rather than None.
    out_height = x_height - W_height + 1
    out_width = x_width - W_width + 1
    result = np.zeros((batches, out_height, out_width, w_filters), dtype=np.float32)

    for batch in range(batches):
        for w_filter in range(w_filters):
            for channel in range(w_channels):
                # correlate2d(a, k) equals convolve2d(a, rot90(k, 2)).
                result[batch, :, :, w_filter] += signal.correlate2d(
                    x[batch, :, :, channel], W[:, :, channel, w_filter], 'valid')

    if stride_h != 1 or stride_w != 1:
        # Keep every stride-th row/column; copy so the result is contiguous.
        result = np.ascontiguousarray(result[:, ::stride_h, ::stride_w, :])
    return result

In [22]:
# Feed-forward test: one input, one channel, one weight filter.
def feedforward_1i_1ch_1w():
    """Compare custom_conv2d with conv2d on a 3x3 input (SAME padding, stride 2)."""
    x = tf.placeholder(tf.float32, [None, 9])
    x_reshaped = tf.reshape(x, [-1,3,3,1])
    w = np.zeros([2,2,1,1])
    w[0:2,0:2,0,0] = np.vstack(([1,0],[0,0]))
    W_conv1 = tf.Variable(tf.cast(w,dtype=tf.float32))

    numbers = np.reshape(np.array([1,2,3,4,5,6,7,8,9], dtype=np.float32),(1,9))
    result = custom_conv2d(x_reshaped, W_conv1, strides=(1,2,2,1), padding='SAME')
    result_n = conv2d(x_reshaped, W_conv1, strides=(1,2,2,1), padding='SAME')

    # The context manager closes the session; a bare `sess.as_default()` call
    # only builds a context manager and has no effect.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        result_conv, result_norm = sess.run([result, result_n], feed_dict={x: numbers})

    assert result_conv.shape == result_norm.shape
    assert np.linalg.norm(result_conv-result_norm) < 0.01
feedforward_1i_1ch_1w()

In [23]:
# Scratch check of Python slice semantics: [1:3] keeps the elements at indices 1 and 2.
[1,2,3][1:3]

[2, 3]

In [24]:
# Shape probe: prints the output shape of the reference conv2d for a 7x6 input,
# a 3x4 filter and strides [1,2,4,1] with VALID padding.
# NOTE(review): this reuses the name feedforward_1i_1ch_1w and therefore shadows
# the earlier test of the same name; consider renaming it.
def feedforward_1i_1ch_1w():
    """Evaluate conv2d on all-zero data and print the resulting shape."""
    x = tf.placeholder(tf.float32, [None, 42])
    x_reshaped = tf.reshape(x, [-1,7,6,1])
    w = np.zeros([3,4,1,1])
    W_conv1 = tf.Variable(tf.cast(w,dtype=tf.float32))

    numbers = np.reshape(np.zeros(42, dtype=np.float32),(1,42))
    result_n = conv2d(x_reshaped, W_conv1, strides=[1,2,4,1], padding='VALID')

    # The context manager closes the session; a bare `sess.as_default()` call
    # only builds a context manager and has no effect.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        result_norm = sess.run(result_n, feed_dict={x: numbers})
    print(result_norm.shape)
feedforward_1i_1ch_1w()

(1, 3, 1, 1)


In [25]:
# Feed-forward test: one input, two channels, two weight filters.
def feedforward_1i_2ch_2w():
    """Compare custom_conv2d with conv2d on a single 3x3x2 sample.

    The placeholder is 9 wide, so two rows of 9 values are fed and reshaped
    together into one 3x3x2 sample.
    """
    x = tf.placeholder(tf.float32, [None, 9])
    x_reshaped = tf.reshape(x, [-1,3,3,2])
    w = np.zeros([2,2,2,2])
    w[0:2,0:2,0,0] = np.vstack(([1,1],[1,1]))
    w[0:2,0:2,1,0] = np.vstack(([1,1],[1,1]))
    w[0:2,0:2,0,1] = np.vstack(([1,1],[1,1]))
    W_conv1 = tf.Variable(tf.cast(w,dtype=tf.float32))

    numbers = np.reshape(np.array([1,2,3,4,5,6,7,8,9], dtype=np.float32),(1,9))
    inputs = np.vstack((numbers,numbers))
    result = custom_conv2d(x_reshaped, W_conv1)
    result_n = conv2d(x_reshaped, W_conv1)

    # The context manager closes the session; a bare `sess.as_default()` call
    # only builds a context manager and has no effect.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        result_conv, result_norm = sess.run([result, result_n], feed_dict={x: inputs})

    assert result_conv.shape == result_norm.shape
    assert np.linalg.norm(result_conv-result_norm) < 0.01
feedforward_1i_2ch_2w()

In [26]:
# Feed-forward test: two inputs, two channels, two weight filters.
def feedforward_2i_2ch_2w():
    """Compare custom_conv2d with conv2d on a batch of two 3x3x2 samples.

    The placeholder is 9 wide, so four rows of 9 values are fed and reshaped
    together into two 3x3x2 samples.
    """
    x = tf.placeholder(tf.float32, [None, 9])
    x_reshaped = tf.reshape(x, [-1,3,3,2])
    w = np.zeros([2,2,2,2])
    w[0:2,0:2,0,0] = np.vstack(([1,1],[1,1]))
    w[0:2,0:2,1,0] = np.vstack(([1,1],[1,1]))
    w[0:2,0:2,0,1] = np.vstack(([1,1],[1,1]))
    W_conv1 = tf.Variable(tf.cast(w,dtype=tf.float32))

    numbers = np.reshape(np.array([1,2,3,4,5,6,7,8,9], dtype=np.float32),(1,9))
    inputs = np.vstack((numbers,numbers,numbers,numbers))
    result = custom_conv2d(x_reshaped, W_conv1)
    result_n = conv2d(x_reshaped, W_conv1)

    # The context manager closes the session; a bare `sess.as_default()` call
    # only builds a context manager and has no effect.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        result_conv, result_norm = sess.run([result, result_n], feed_dict={x: inputs})

    assert result_conv.shape == result_norm.shape
    assert np.linalg.norm(result_conv-result_norm) < 0.01
feedforward_2i_2ch_2w()

In [27]:
# Gradient test: one input, one channel, one weight filter.
def gradient_1i_1ch_1w():
    """Compare the filter gradient of custom_conv2d with that of conv2d (2x2 input)."""
    x = tf.placeholder(tf.float32, [None, 4])
    x_reshaped = tf.reshape(x, [-1,2,2,1])
    w = np.zeros([2,2,1,1])
    w[0:2,0:2,0,0] = np.vstack(([0.1,0.2],[1,0.3]))
    W_conv1 = tf.Variable(tf.cast(w,dtype=tf.float32))

    numbers = np.reshape(np.array([2,1,1,1], dtype=np.float32),(1,4))
    result = custom_conv2d(x_reshaped, W_conv1)
    result_n = conv2d(x_reshaped, W_conv1)

    var_grad = tf.gradients(result, W_conv1)[0]
    var_grad_n = tf.gradients(result_n, W_conv1)[0]

    # The context manager closes the session; a bare `sess.as_default()` call
    # only builds a context manager and has no effect.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        gradients, gradients_n = sess.run([var_grad, var_grad_n], feed_dict={x: numbers})

    assert np.linalg.norm(gradients-gradients_n) < 0.01
gradient_1i_1ch_1w()

In [29]:
# Gradient test: one input, two channels, two weight filters.
def gradient_1i_2ch_2w():
    """Compare the filter gradient of custom_conv2d with that of conv2d.

    The placeholder is 4 wide, so two rows of 4 values are fed and reshaped
    together into a single 2x2x2 sample. (Feeding four rows, as this test did
    before, produced two samples and duplicated gradient_2i_2ch_2w's batch size.)
    """
    x = tf.placeholder(tf.float32, [None, 4])
    x_reshaped = tf.reshape(x, [-1,2,2,2])
    w = np.zeros([2,2,2,2])
    w[0:2,0:2,0,0] = np.vstack(([1,1],[1,1]))
    w[0:2,0:2,1,0] = np.vstack(([1,1],[1,1]))
    w[0:2,0:2,0,1] = np.vstack(([1,1],[1,1]))
    W_conv1 = tf.Variable(tf.cast(w,dtype=tf.float32))

    numbers = np.reshape(np.array([1,2,3,4], dtype=np.float32),(1,4))
    inputs = np.vstack((numbers,numbers))
    result = custom_conv2d(x_reshaped, W_conv1)
    result_n = conv2d(x_reshaped, W_conv1)

    var_grad = tf.gradients(result, W_conv1)[0]
    var_grad_n = tf.gradients(result_n, W_conv1)[0]

    # The context manager closes the session; a bare `sess.as_default()` call
    # only builds a context manager and has no effect.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        gradients, gradients_n = sess.run([var_grad, var_grad_n], feed_dict={x: inputs})

    assert np.linalg.norm(gradients-gradients_n) < 0.01
gradient_1i_2ch_2w()

In [20]:
# Gradient test: two inputs, two channels, two weight filters (stride 2, VALID).
# NOTE(review): the saved output of this cell is an IndexError recorded at
# execution count 20, i.e. before the definitions cell (count 21) was last run.
# Re-run after the definitions cell to confirm whether it still fails.
def gradient_2i_2ch_2w():
    """Compare the filter gradient of custom_conv2d with that of conv2d.

    The placeholder is 4 wide, so four rows of 4 values are fed and reshaped
    together into two 2x2x2 samples.
    """
    x = tf.placeholder(tf.float32, [None, 4])
    x_reshaped = tf.reshape(x, [-1,2,2,2])
    w = np.zeros([2,2,2,2])
    w[0:2,0:2,0,0] = np.vstack(([1,1],[1,1]))
    w[0:2,0:2,1,0] = np.vstack(([1,1],[1,1]))
    w[0:2,0:2,0,1] = np.vstack(([1,1],[1,1]))
    W_conv1 = tf.Variable(tf.cast(w,dtype=tf.float32))

    numbers = np.reshape(np.ones(4, dtype=np.float32),(1,4))
    inputs = np.vstack((numbers,numbers,numbers,numbers))
    result = custom_conv2d(x_reshaped, W_conv1, strides=[1,2,2,1], padding='VALID')
    result_n = conv2d(x_reshaped, W_conv1, strides=[1,2,2,1], padding='VALID')

    var_grad = tf.gradients(result, W_conv1)[0]
    var_grad_n = tf.gradients(result_n, W_conv1)[0]

    # The context manager closes the session; a bare `sess.as_default()` call
    # only builds a context manager and has no effect.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        gradients, gradients_n = sess.run([var_grad, var_grad_n], feed_dict={x: inputs})

    assert np.linalg.norm(gradients-gradients_n) < 0.01
gradient_2i_2ch_2w()

IndexError: list index out of range

In [10]:
# Scratch check of tf.pad: pad a 1x3x3x1 tensor of ones with 2 extra rows at the
# bottom and 3 extra columns on the right, then display the padded values.
a = tf.placeholder(tf.float32, [9])
x = tf.reshape(a, [1,3,3,1])
padded = tf.pad(x, [[0,0],[0,2],[0,3],[0,0]])

num = np.ones(9)

# No variables are involved, so no initializer is needed. The context manager
# closes the session; a bare `sess.as_default()` call has no effect.
with tf.Session() as sess:
    padded_shape, padded_value = sess.run([tf.shape(padded), padded], feed_dict={a: num})

# The shape used to be evaluated and thrown away; check it instead.
assert tuple(padded_shape) == (1, 5, 6, 1)
padded_value

array([[[[ 1.],
         [ 1.],
         [ 1.],
         [ 0.],
         [ 0.],
         [ 0.]],

        [[ 1.],
         [ 1.],
         [ 1.],
         [ 0.],
         [ 0.],
         [ 0.]],

        [[ 1.],
         [ 1.],
         [ 1.],
         [ 0.],
         [ 0.],
         [ 0.]],

        [[ 0.],
         [ 0.],
         [ 0.],
         [ 0.],
         [ 0.],
         [ 0.]],

        [[ 0.],
         [ 0.],
         [ 0.],
         [ 0.],
         [ 0.],
         [ 0.]]]], dtype=float32)