In [1]:
import tensorflow as tf
import tensorflow_datasets as tfds
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import affine_transform
import skimage as ski
import time
import imageio


In [2]:
import sys
import os

sys.path.append(os.path.abspath("../src/"))
sys.path.append(os.path.abspath("../src/keras-tf"))

import configuration_handler as cfh
import image_optimisation as io
import transformation as tr
import tf_inverse_compositional_algorithm as tf_ica

The meta-parameters of the algorithm are stored in a configuration file (`config.ini`), which we read in the next cell.

In [3]:
# Load the configuration file, then pull out the two algorithm-specific
# sections (robust and pyramidal inverse compositional algorithm).
params = cfh.read_config_file("config.ini")
params_rica, params_pica = [
    params[section]
    for section in (
        "robust_inverse_compositional_algorithm",
        "pyramidal_inverse_compositional_algorithm",
    )
]

## Performing evaluation tests

In [None]:

# Reuse the RobustInverseCompositional and PyramidalInverseCompositional layers from previous code

class TFDSEvaluator:
    def __init__(self, img_size=(256, 256)):
        self.img_size = img_size
        self.batch_size = 32
        self.models = {
            'Pyramidal-tr': PyramidalInverseCompositional(
                transform_type=TransformType.TRANSLATION,
                nscales=3,
                nu=0.5,
                TOL=1e-5,
                robust_type=RobustErrorFunctionType.CHARBONNIER,
                lambda_=0.,
                nanifoutside=True,
                delta=10,
                verbose=False
            ),
            'Pyramidal-eu': PyramidalInverseCompositional(
                transform_type=TransformType.EUCLIDEAN,
                nscales=3,
                nu=0.5,
                TOL=1e-5,
                robust_type=RobustErrorFunctionType.CHARBONNIER,
                lambda_=0.,
                nanifoutside=True,
                delta=10,
                verbose=False
            ),
            'Pyramidal-si': PyramidalInverseCompositional(
                transform_type=TransformType.SIMILARITY,
                nscales=3,
                nu=0.5,
                TOL=1e-5,
                robust_type=RobustErrorFunctionType.CHARBONNIER,
                lambda_=0.,
                nanifoutside=True,
                delta=10,
                verbose=False
            ),
            'Pyramidal-af': PyramidalInverseCompositional(
                transform_type=TransformType.AFFINITY,
                nscales=3,
                nu=0.5,
                TOL=1e-5,
                robust_type=RobustErrorFunctionType.CHARBONNIER,
                lambda_=0.,
                nanifoutside=True,
                delta=10,
                verbose=False
            ),
            'Pyramidal-ho': PyramidalInverseCompositional(
                transform_type=TransformType.HOMOGRAPHY,
                nscales=3,
                nu=0.5,
                TOL=1e-5,
                robust_type=RobustErrorFunctionType.CHARBONNIER,
                lambda_=0.,
                nanifoutside=True,
                delta=10,
                verbose=False
            )
        }
        
        # Load dataset
        self.dataset = self.load_dataset()

    def load_dataset(self):
        """Load imagenette dataset from TFDS, a small subset of ImageNet"""
        ds = tfds.load('imagenette', split='test', shuffle_files=True)
        return ds        

    def prepare_datasets(self, im1_ds, transform_type):
        # duplicate the image dataset to apply transformation and constitute a second ds
        arr = list(im1_ds.as_numpy_iterator())
        im2_ds = tf.data.Dataset.from_tensor_slices(arr)

        # prepare im1_ds
        im1_ds = im1_ds.batch(self.batch_size).map(lambda x: tf.image.resize(x["image"], self.img_size))
        im1_ds = im1_ds.map(lambda x: tf.image.convert_image_dtype(x, tf.float32))
        im1_ds = im1_ds.cache().prefetch(tf.data.AUTOTUNE)

        # prepare the dataset of affine transformations
        p_ds = tf.dataset.from_generator(
            lambda: (self.generate_params(transform_type) for _ in range(len(arr))),
            output_signature=tf.TensorSpec(shape=(8,), dtype=tf.float32)
        )
        p_ds = p_ds.batch(self.batch_size).cache().prefetch(tf.data.AUTOTUNE)

        # prepare im2_ds
        im2_ds = im2_ds.batch(self.batch_size).map(lambda x: tf.image.resize(x["image"], self.img_size))
        im2_ds = im2_ds.map(lambda x: tf.image.convert_image_dtype(x, tf.float32))
        combined_ds = tf.data.Dataset.zip((im2_ds, p_ds))
        transformed_ds = combined_ds.map(
            lambda im, p: affine_transform(
                im, p, 
                interpolation="bilinear", 
                fill_mode="constant", 
                fill_value=np.nan, 
                data_format="channels_last"
            ),
            num_parallel_calls=tf.data.AUTOTUNE
        )
        im2_ds = transformed_ds.cache().batch(self.batch_size).prefetch(tf.data.AUTOTUNE)

        return im1_ds, im2_ds, p_ds


    def generate_affine_params(self):
        """Generate random affine transformation parameters"""
        return {
            "scale": np.random.uniform(0.1, 0.3, 2),
            "rotation": np.random.uniform(-np.pi/6, np.pi/6),
            "shear": np.random.uniform(-0.3, 0.3, 2),
            "translation": np.random.uniform(-30, 30, 2),
            "homography": np.random.uniform(0., 0.2, 2)
        }
    
    def generate_params(self, transform_type: TransformType):
        aff_params = self.generate_affine_params()
        match transform_type:
            case TransformType.TRANSLATION:
                p = aff_params['translation']
                return tf.pad_params(p, 8)
            case TransformType.EUCLIDEAN:
                p = np.array([aff_params['translation'], aff_params['rotation']])
                return tf.pad_params(p, 8)
            case TransformType.SIMILARITY: # specified as (tx, ty, s, sh)
                p = np.array([aff_params['translation'], aff_params['scale'][0], aff_params['shear'][0]])
                return tf.pad_params(p, 8)
            case TransformType.AFFINITY: # specified as (tx, ty, a00, a01, a10, a11)
                p = np.array([aff_params['translation'], aff_params['scale'][0], aff_params['shear'], aff_params['scale'][1]])
                return tf.pad_params(p, 8)
            case TransformType.HOMOGRAPHY: # specified as (tx, ty, a00, a01, a10, a11, a20, a21)
                p = np.array([aff_params['translation'], aff_params['scale'][0], aff_params['shear'], aff_params['scale'][1], 
                              aff_params['homography']])
                return tf.constant(p)

    def evaluate(self, num_samples=50):
        results = {name: {'mse': [], 'mae': [], 'epe': [], 'time': []} 
                  for name in self.models}
        
        for i in range(num_samples):
            # Select random image
            img1 = self.dataset[np.random.randint(len(self.dataset))]
            
            # Generate random transformation
            params = self.generate_affine_params()
            matrix = self.create_affine_matrix(params)
            
            # Apply transformation
            img2 = self.apply_affine_transform(img1, matrix)
            
            # Convert to tensors
            I1 = tf.expand_dims(tf.convert_to_tensor(img1, tf.float32), 0)
            I2 = tf.expand_dims(tf.convert_to_tensor(img2, tf.float32), 0)
            
            # Ground truth parameters (flattened affine matrix)
            true_params = matrix.flatten()[:6]

            for name, model in self.models.items():
                start_time = time.time()
                
                # Estimate transformation
                p_pred, error, DI, Iw = model([I1, I2])
                params_pred = p_pred.numpy()[0]
                
                # Calculate parameter errors
                mse = np.mean((params_pred - true_params)**2)
                mae = np.mean(np.abs(params_pred - true_params))
                
                # Calculate endpoint error (EPE)
                pred_flow = self.params_to_flow(params_pred)
                true_flow = self.params_to_flow(true_params)
                epe = np.mean(np.sqrt(np.sum((pred_flow - true_flow)**2, axis=-1)))
                
                # Store results
                results[name]['mse'].append(mse)
                results[name]['mae'].append(mae)
                results[name]['epe'].append(epe)
                results[name]['time'].append(time.time() - start_time)
                
        return results

    def params_to_flow(self, params):
        """Convert affine parameters to flow field"""
        h, w = self.img_size
        matrix = params.reshape(2, 3)
        
        x = np.linspace(0, w-1, w)
        y = np.linspace(0, h-1, h)
        xx, yy = np.meshgrid(x, y)
        ones = np.ones_like(xx)
        coords = np.stack([xx, yy, ones], axis=0)
        
        transformed = np.tensordot(matrix, coords, axes=([1], [0]))
        flow = np.moveaxis(transformed, 0, -1) - np.stack([xx, yy], axis=-1)
        return flow

    def plot_results(self, results):
        metrics = ['mse', 'mae', 'epe', 'time']
        fig, axs = plt.subplots(2, 2, figsize=(15, 10))
        
        for i, metric in enumerate(metrics):
            ax = axs[i//2, i%2]
            for name in self.models:
                ax.plot(results[name][metric], label=name)
            ax.set_title(f'{metric.upper()} Comparison')
            ax.set_xlabel('Sample Index')
            ax.set_ylabel(metric.upper())
            ax.legend()
            ax.grid(True)
            
        plt.tight_layout()
        plt.show()




In [None]:
# Instantiate the evaluator (this builds the models and loads the dataset),
# benchmark every model on 50 samples and plot the per-sample metrics.
evaluator = TFDSEvaluator()
results = evaluator.evaluate(50)
evaluator.plot_results(results)


In [None]:
# Summary: mean of every metric over all evaluated samples, per model
print("Average Metrics:")
for name in evaluator.models:
    print(f"\n{name}:")
    model_results = results[name]
    for metric in ('mse', 'mae', 'epe', 'time'):
        avg = np.mean(model_results[metric])
        print(f"  {metric.upper()}: {avg:.4f}")