# Partially learned gradient descent without gradients as input.
    This Python script implements a partially learned gradient descent algorithm using TensorFlow for
    iterative image reconstruction from projection data.
    
    :param validation: The `validation` parameter in the code is used to determine whether to generate a
    set of random data for validation purposes or for training purposes. When `validation` is set to
    `True`, the code generates data for validation by using the Shepp-Logan phantom in the ODL library.
    This, defaults to False (optional)
    :return: The code is a TensorFlow implementation of a partially learned
    gradient descent algorithm without gradients as input for solving an inverse problem in computed
    tomography. The code includes the definition of placeholders, variable definitions, an iterative
    scheme, loss calculation, optimizer setup, and training/validation data generation.

In [1]:
import tensorflow.compat.v1 as tf
import numpy as np
import odl
import odl.contrib.tensorflow
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
tf.disable_v2_behavior()

Instructions for updating:
non-resource variables are not supported in the long term


In [2]:
sess = tf.InteractiveSession()

# Create ODL data structures
size = 128
space = odl.uniform_discr([-64, -64], [64, 64], [size, size], dtype='float32')

geometry = odl.tomo.parallel_beam_geometry(space, num_angles=30)
operator = odl.tomo.RayTransform(space, geometry)
pseudoinverse = odl.tomo.fbp_op(operator)

# Ensure operator has fixed operator norm for scale invariance
opnorm = odl.power_method_opnorm(operator)
operator = (1 / opnorm) * operator
pseudoinverse = pseudoinverse * opnorm

# User selected paramters
n_data = 20
n_memory = 5
n_iter = 10



In [3]:
def random_ellipse():
    return ((np.random.rand() - 0.3) * np.random.exponential(0.3),
            np.random.exponential() * 0.2, np.random.exponential() * 0.2,
            np.random.rand() - 0.5, np.random.rand() - 0.5,
            np.random.rand() * 2 * np.pi)


In [4]:
def random_phantom(spc):
    n = np.random.poisson(100)
    ellipses = [random_ellipse() for _ in range(n)]
    return odl.phantom.ellipsoid_phantom(spc, ellipses)


In [5]:
def conv2d(x, W, stride=(1, 1)):
    """
    The conv2d function performs a 2D convolution operation using TensorFlow with specified stride and
    padding.
    
    :param x: The parameter `x` in the `conv2d` function represents the input tensor to the
    convolutional layer. This tensor typically contains the input data or features that are being
    processed through the convolution operation
    :param W: The parameter `W` in the `conv2d` function represents the filter weights for the
    convolution operation. These weights are learned during the training process of a neural network.
    The filter weights determine how the convolution operation is applied to the input data `x` to
    extract features
    :param stride: The `stride` parameter in the `conv2d` function represents the stride length for
    moving the convolutional filter/kernel across the input tensor `x`
    :return: The function `conv2d` is returning the result of applying a 2D convolution operation using
    the input `x`, filter `W`, and the specified stride. The output of the convolution operation is
    returned.
    """
    return tf.nn.conv2d(x, W, strides=[1, stride[0], stride[1], 1], padding='SAME')


In [6]:
def generate_data(validation=False):
    """Generate a set of random data."""
    n_iter = 1 if validation else n_data

    x_arr = np.empty((n_iter, space.shape[0], space.shape[1], 1), dtype='float32')
    y_arr = np.empty((n_iter, operator.range.shape[0], operator.range.shape[1], 1), dtype='float32')
    x_true_arr = np.empty((n_iter, space.shape[0], space.shape[1], 1), dtype='float32')

    for i in range(n_iter):
        if validation:
            phantom = odl.phantom.shepp_logan(space, True)
        else:
            phantom = random_phantom(space)

        data = operator(phantom)
        noisy_data = data + odl.phantom.white_noise(operator.range) * np.mean(np.abs(data)) * 0.05
        fbp = pseudoinverse(noisy_data)

        x_arr[i, ..., 0] = fbp
        x_true_arr[i, ..., 0] = phantom
        y_arr[i, ..., 0] = noisy_data

    return x_arr, y_arr, x_true_arr



In [7]:
with tf.name_scope('placeholders'):
    x_0 = tf.placeholder(tf.float32, shape=[None, size, size, 1], name="x_0")
    x_true = tf.placeholder(tf.float32, shape=[None, size, size, 1], name="x_true")
    y = tf.placeholder(tf.float32, shape=[None, operator.range.shape[0], operator.range.shape[1], 1], name="y")

    s = tf.fill([tf.shape(x_0)[0], size, size, n_memory], np.float32(0.0), name="s")



In [8]:
create_new_parameters_for_network_retraining = False

In [9]:
with tf.name_scope('variable_definitions'):
    if create_new_parameters_for_network_retraining:
        # Parameters if the network should be re-trained
        w1 = tf.get_variable(
            "w1", shape=[3, 3, n_memory + 1, 32],
            initializer=tf.compat.v1.keras.initializers.glorot_normal(dtype=tf.float32)  # or glorot_uniform
        )
        b1 = tf.Variable(tf.constant_initializer(0.01), shape=[1, 1, 1, 32], name='b1')

        w2 = tf.get_variable(
            "w2", shape=[3, 3, 32, 32],
            initializer=tf.compat.v1.keras.initializers.glorot_normal(dtype=tf.float32)
        )
        b2 = tf.Variable(tf.constant_initializer(0.01), shape=[1, 1, 1, 32], name='b2')

        w3 = tf.get_variable(
            "w3", shape=[3, 3, 32, n_memory + 1],
            initializer=tf.compat.v1.keras.initializers.glorot_normal(dtype=tf.float32)
        )
        b3 = tf.Variable(tf.constant_initializer(0.0), shape=[1, 1, 1, n_memory + 1], name='b3')
    else:
        # If trained network is available, re-use as starting guess
        ld = np.load('code/partially_learned_gradient_descent_no_gradients_parameters.npz')

        w1 = tf.Variable(ld['w1'], name='w1')
        b1 = tf.Variable(ld['b1'], name='b1')

        w2 = tf.Variable(ld['w2'], name='w2')
        b2 = tf.Variable(ld['b2'], name='b2')

        w3 = tf.Variable(ld['w3'], name='w3')
        b3 = tf.Variable(ld['b3'], name='b3')


In [10]:
with tf.name_scope('variable_definitions'):
    if create_new_parameters_for_network_retraining:
        # Parameters if the network should be re-trained
        w1 = tf.get_variable("w1", shape=[3, 3, n_memory + 1, 32],
            initializer=tf.compat.v1.keras.initializers.glorot_normal(dtype=tf.float32)  # or glorot_uniform
        )
        b1 = tf.Variable(tf.constant(0.01, shape=[1, 1, 1, 32]), name='b1')

        w2 = tf.get_variable("w2", shape=[3, 3, 32, 32],
            initializer=tf.compat.v1.keras.initializers.glorot_normal(dtype=tf.float32)  # or glorot_uniform
        )
        b2 = tf.Variable(tf.constant(0.01, shape=[1, 1, 1, 32]), name='b2')

        w3 = tf.get_variable("w3", shape=[3, 3, 32, n_memory + 1],
            initializer=tf.compat.v1.keras.initializers.glorot_normal(dtype=tf.float32)  # or glorot_uniform
        )
        b3 = tf.Variable(tf.constant(0.00, shape=[1, 1, 1, n_memory + 1]), name='b3')
    else:
        # If trained network is available, re-use as starting guess
        ld = np.load('code/partially_learned_gradient_descent_no_gradients_parameters.npz')

        w1 = tf.Variable(tf.constant(ld['w1']), name='w1')
        b1 = tf.Variable(tf.constant(ld['b1']), name='b1')

        w2 = tf.Variable(tf.constant(ld['w2']), name='w2')
        b2 = tf.Variable(tf.constant(ld['b2']), name='b2')

        w3 = tf.Variable(tf.constant(ld['w3']), name='w3')
        b3 = tf.Variable(tf.constant(ld['b3']), name='b3')



In [11]:
# Implementation of the iterative scheme
x_values = [x_0]
x = x_0
for i in range(n_iter):
    with tf.name_scope('iterate_{}'.format(i)):
        update = tf.concat([x, s], axis=3)

        update = tf.nn.relu(conv2d(update, w1) + b1)
        update = tf.nn.relu(conv2d(update, w2) + b2)

        update = conv2d(update, w3) + b3

        s = tf.nn.relu(update[..., 1:])
        dx = update[..., 0:1]

        x = x + dx
        x_values.append(x)



In [12]:
print(f"x: {x}")
print(f"x_values: {x_values}")

x: Tensor("iterate_9/add_3:0", shape=(?, 128, 128, 1), dtype=float32)
x_values: [<tf.Tensor 'placeholders/x_0:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_0/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_1/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_2/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_3/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_4/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_5/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_6/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_7/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_8/add_3:0' shape=(?, 128, 128, 1) dtype=float32>, <tf.Tensor 'iterate_9/add_3:0' shape=(?, 128, 128, 1) dtype=float32>]


In [13]:
with tf.name_scope('loss'):
    loss = tf.reduce_mean(tf.reduce_sum((x - x_true) ** 2, axis=(1, 2)))



In [14]:
with tf.name_scope('optimizer'):
    # Learning rate
    global_step = tf.Variable(0, trainable=False)
    starter_learning_rate = 1e-3
    learning_rate = tf.train.inverse_time_decay(starter_learning_rate,
                                                global_step=global_step,
                                                decay_rate=1.0,
                                                decay_steps=500,
                                                staircase=True,
                                                name='learning_rate')

    optimizer = tf.train.RMSPropOptimizer(learning_rate).minimize(loss, global_step=global_step)



Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


In [15]:
print(f"optimizer: {optimizer}")

optimizer: name: "optimizer/RMSProp"
op: "AssignAdd"
input: "optimizer/Variable"
input: "optimizer/RMSProp/value"
attr {
  key: "T"
  value {
    type: DT_INT32
  }
}
attr {
  key: "_class"
  value {
    list {
      s: "loc:@optimizer/Variable"
    }
  }
}
attr {
  key: "use_locking"
  value {
    b: false
  }
}



In [16]:
# Initialize all TF variables
tf.global_variables_initializer().run()

# Solve with an ODL callback to see what happens in real time
callback = odl.solvers.CallbackShow(clim=[0.1, 0.4])

# Generate validation data
x_arr_validate, y_arr_validate, x_true_arr_validate = generate_data(validation=True)


In [17]:
print(f"x_arr_validate: {x_arr_validate}")
print(f"y_arr_validate: {y_arr_validate}")
print(f"x_true_arr_validate: {x_true_arr_validate}")

x_arr_validate: [[[[-0.10140869]
   [-0.25602037]
   [-0.3885349 ]
   ..., 
   [-0.5222962 ]
   [-0.20758657]
   [-0.02674543]]

  [[ 0.20892316]
   [-0.07333555]
   [-0.29943323]
   ..., 
   [-0.3590231 ]
   [ 0.00687671]
   [ 0.2180908 ]]

  [[ 0.10738922]
   [ 0.23372526]
   [-0.02825779]
   ..., 
   [ 0.2516903 ]
   [ 0.2396455 ]
   [-0.08490887]]

  ..., 
  [[ 0.1335358 ]
   [ 0.14565633]
   [ 0.07794141]
   ..., 
   [ 0.16508862]
   [ 0.18416016]
   [-0.1588907 ]]

  [[ 0.02378147]
   [-0.14154857]
   [-0.11896355]
   ..., 
   [-0.34875485]
   [-0.00724699]
   [ 0.2507021 ]]

  [[-0.01584784]
   [-0.28331408]
   [-0.3335182 ]
   ..., 
   [-0.44376302]
   [-0.31773907]
   [-0.11828604]]]]
y_arr_validate: [[[[ 0.01409422]
   [-0.00411432]
   [-0.00190468]
   ..., 
   [-0.00666123]
   [-0.00017102]
   [-0.00197337]]

  [[-0.00783188]
   [ 0.01333595]
   [-0.00805861]
   ..., 
   [-0.00838623]
   [ 0.00097341]
   [ 0.01067705]]

  [[-0.00487761]
   [ 0.0021541 ]
   [ 0.01055892]
   .

In [18]:
train_new_network = True

In [22]:
import pandas as pd
if train_new_network:
    # Train the network
    n_train = 100
    validation_losses = []
    for i in range(0, n_train):
        x_arr, y_arr, x_true_arr = generate_data()

        _, loss_training = sess.run([optimizer, loss],
                                    feed_dict={x_0: x_arr,
                                            x_true: x_true_arr,
                                            y: y_arr})

        # Validate on shepp-logan
        x_values_result, loss_result = sess.run([x_values, loss],
                    feed_dict={x_0: x_arr_validate,
                                x_true: x_true_arr_validate,
                                y: y_arr_validate})

        # print(f'iter={i}, validation loss={loss_result}')
        validation_losses.insert(i, loss_result)
            # callback((space ** (n_iter + 1)).element(
                # [xv.squeeze() for xv in x_values_result]))
    data = {
        'validation_losses' : validation_losses
    }
    df = pd.DataFrame(iter(data))
    df.to_excel("validation losses.xlsx")
else:
    # Validate on shepp-logan
    x_values_result, loss_result = sess.run([x_values, loss],
                feed_dict={x_0: x_arr_validate,
                            x_true: x_true_arr_validate,
                            y: y_arr_validate})

    print(f'validation loss={loss_result}')

    callback((space ** (n_iter + 1)).element(
        [xv.squeeze() for xv in x_values_result]))


KeyboardInterrupt: 