In [1]:
# Evaluation configuration consumed by every cell below.
# Key order is kept stable so that any code iterating over `args` sees the same sequence.
args = dict(
    # Model hyper-parameters (nt is the number of time steps fed to the network;
    # output_channels has one entry per PredNet layer).
    nt=1,
    nb_epoch=250,
    batch_size=1,
    output_channels=[3, 48, 96, 192],
    num_P_CNN=1,
    num_R_CLSTM=1,
    num_passes=1,
    pan_hierarchical=False,
    # Image handling.
    downscale_factor=4,
    resize_images=False,
    train_proportion=0.7,
    # Placeholder only: the real value is derived from dataset/data_subset right below.
    results_subdir="dummy",
    # Which trained weights to load.
    dataset_weights="various",
    data_subset_weights="gen_ellipseV_crossH",
    # Which data to evaluate on.
    dataset="general_shape_static",
    data_subset="general_cross_static_2nd_stage",
    data_subset_mode="test",
    # Run-time choices.
    model_choice="baseline",
    system="laptop",
    reserialize_dataset=False,
    output_mode="Error",
)
# Results are grouped per evaluated dataset / subset.
args["results_subdir"] = f"eval_results/{args['dataset']}/{args['data_subset']}"

In [2]:
# LOAD MODEL and TEST DATA

import argparse
from config import update_settings, get_settings
from data_utils import serialize_dataset
import numpy as np
import os
from datetime import datetime


# Configure the project-wide settings from the notebook arguments, then read back
# the four working directories they resolve to.
update_settings(args["system"], args["dataset_weights"], args["data_subset_weights"], args["results_subdir"])
DATA_DIR, WEIGHTS_DIR, RESULTS_SAVE_DIR, LOG_DIR = get_settings()["dirs"]
# Bundled in this order because serialize_dataset() receives the list as one argument.
data_dirs = [DATA_DIR, WEIGHTS_DIR, RESULTS_SAVE_DIR, LOG_DIR]
# exist_ok=True makes the cell safely re-runnable and avoids the check-then-create race.
os.makedirs(RESULTS_SAVE_DIR, exist_ok=True)

import os
import warnings
import hickle as hkl

# Silence every Python-level warning for the rest of the session.
warnings.filterwarnings(action="ignore")
# Quieten TensorFlow's native logging as well. This is set ahead of the
# `import tensorflow` that follows so the setting is already in the environment
# when TensorFlow starts up. A lower value such as "2" lets more messages through.
os.environ.update({"TF_CPP_MIN_LOG_LEVEL": "3"})

import tensorflow as tf
import shutil
import keras
from keras import backend as K
from keras import layers
from data_utils import SequenceGenerator, IntermediateEvaluations, create_dataset_from_serialized_generator, config_gpus 
%matplotlib tk
import matplotlib.pyplot as plt
# import addcopyfighandler
from matplotlib.colors import LinearSegmentedColormap

# PICK MODEL
# Every branch binds the name `ParaPredNet` to the implementation selected by
# args["model_choice"]; an unknown choice is rejected with ValueError.
if args["model_choice"] == "baseline":
    # Predict next frame along RGB channels only
    # NOTE(review): the pan-hierarchical variant imports exactly the same class as the
    # plain baseline, so the flag presumably only changes behaviour inside ParaPredNet
    # (which receives `args`) - confirm that no separate model module was intended.
    from PPN_models.PPN_Baseline import ParaPredNet
    if args["pan_hierarchical"]:
        print("Using Pan-Hierarchical Representation")
elif args["model_choice"] == "cl_delta":
    # Predict next frame and change from current frame
    from PPN_models.PPN_CompLearning_Delta_Predictions import ParaPredNet
elif args["model_choice"] == "cl_recon":
    # Predict current and next frame
    from PPN_models.PPN_CompLearning_Recon_Predictions import ParaPredNet
elif args["model_choice"] == "multi_channel":
    # Predict next frame along Disparity, Material Index, Object Index,
    # Optical Flow, Motion Boundaries, and RGB channels all stacked together
    assert args["dataset"] in ("monkaa", "driving"), "Multi-channel model only works with Monkaa or Driving dataset"
    from PPN_models.PPN_Multi_Channel import ParaPredNet
    bottom_layer_output_channels = 7  # 1 Disparity, 3 Optical Flow, 3 RGB
    # Overwrites the bottom-layer channel count in the shared config (idempotent on re-run).
    args["output_channels"][0] = bottom_layer_output_channels
else:
    raise ValueError("Invalid model choice")

# Resolve the weights file that is loaded prior to evaluation.
# The file name depends on which (dataset_weights, data_subset_weights) pair was trained.
weights_key = (args["dataset_weights"], args["data_subset_weights"])
if weights_key in [
    ("rolling_square", "single_rolling_square"),
    ("rolling_circle", "single_rolling_circle"),
]:
    # Named after the *weights* subset. The previous code used args["data_subset"]
    # (the subset being evaluated), which points at the wrong file whenever the
    # evaluation subset differs from the one the weights were trained on.
    weights_name = f"para_prednet_{args['data_subset_weights']}_weights.hdf5"
elif args["dataset_weights"] in ["all_rolling", "ball_collisions", "various"]:
    # Also covers ("all_rolling", "single") and ("all_rolling", "multi"), which
    # previously had their own branch producing this exact name.
    weights_name = f"para_prednet_{args['dataset_weights']}_{args['data_subset_weights']}_weights.hdf5"
else:
    weights_name = f"para_prednet_{args['dataset_weights']}_weights.hdf5"
weights_file = os.path.join(WEIGHTS_DIR, weights_name)

# Fail early, and say which path was tried, instead of a bare assertion message.
if not os.path.exists(weights_file):
    raise FileNotFoundError(f"Weights file not found: {weights_file}")

if args["dataset"] != args["dataset_weights"]:
    print(f"WARNING: dataset ({args['dataset']}) and dataset_weights ({args['dataset_weights']}/{args['data_subset_weights']}) do not match - generalizing...")
else:
    # Report dataset_weights/data_subset_weights (the old message repeated dataset_weights twice).
    print(f"OK: dataset ({args['dataset']}) and dataset_weights ({args['dataset_weights']}/{args['data_subset_weights']}) match")

# Pull the frequently used settings out of the config dict.
nt = args["nt"]  # number of time steps
batch_size = args["batch_size"]  # 4
output_channels = args["output_channels"]

# Work out the native frame size of the dataset and the size actually fed to the model.
if args["dataset"] == "kitti":
    # KITTI frames are used as they are; no downscale factor is involved.
    original_im_shape = im_shape = (128, 160, 3)
else:
    downscale_factor = args["downscale_factor"]
    if args["dataset"] in ("monkaa", "driving"):
        # These frames are always downscaled, regardless of args["resize_images"].
        original_im_shape, apply_downscale = (540, 960, 3), True
    elif args["dataset"] in ["rolling_square", "rolling_circle"]:
        original_im_shape, apply_downscale = (50, 100, 3), args["resize_images"]
    else:
        # Every remaining dataset uses square 50x50 frames.
        original_im_shape, apply_downscale = (50, 50, 3), args["resize_images"]

    if apply_downscale:
        im_shape = (original_im_shape[0] // downscale_factor, original_im_shape[1] // downscale_factor, 3)
    else:
        im_shape = original_im_shape

print(f"Working on dataset: {args['dataset']}")

# Create ParaPredNet
# Build the Keras inputs for the chosen dataset, wrap the ParaPredNet layer in a
# functional keras.Model named PPN, compile it and report the layer resolutions.
frame_dims = (nt, im_shape[0], im_shape[1])
if args["dataset"] == "kitti":
    # These are Kitti specific input shapes
    inputs = keras.Input(shape=frame_dims + (3,))
elif args["dataset"] == "monkaa":
    # These are Monkaa specific input shapes: six streams with these channel counts
    inputs = tuple(keras.Input(shape=frame_dims + (channels,)) for channels in (1, 1, 1, 3, 1, 3))
elif args["dataset"] == "driving":
    # These are driving specific input shapes: three streams with these channel counts
    inputs = tuple(keras.Input(shape=frame_dims + (channels,)) for channels in (1, 3, 3))
else:
    # Single RGB stream, used by every other dataset
    inputs = keras.Input(shape=frame_dims + (3,))

PPN_layer = ParaPredNet(args, im_height=im_shape[0], im_width=im_shape[1])  # [3, 48, 96, 192]
if args["dataset"] not in ("kitti", "monkaa", "driving"):
    # Only the single-stream datasets are switched to frame-by-frame prediction output;
    # the flags are set before the layer is called, as before.
    PPN_layer.output_mode = "Prediction"
    PPN_layer.continuous_eval = True
outputs = PPN_layer(inputs)
PPN = keras.Model(inputs=inputs, outputs=outputs)

# The ParaPredNet layer is the last layer of the wrapping model.
resos = PPN.layers[-1].resolutions
PPN.compile(optimizer="adam", loss="mean_squared_error")
print("ParaPredNet compiled...")
PPN.build(input_shape=(None, nt) + im_shape)
# summary() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line to the output.
PPN.summary()
num_layers = len(output_channels)  # number of layers in the architecture
print(f"{num_layers} PredNet layers with resolutions:")
for i in reversed(range(num_layers)):
    print(f"Layer {i+1}:  {resos[i][0]} x {resos[i][1]} x {output_channels[i]}")

# load previously saved weights
try:
    PPN.load_weights(weights_file)
except Exception as load_err:
    # Catch Exception rather than using a bare `except:` so that KeyboardInterrupt /
    # SystemExit are not turned into a ValueError, and chain the original error so
    # the real cause (shape mismatch, unreadable file, ...) stays in the traceback.
    raise ValueError("Weights don't fit - exiting...") from load_err
print("Weights loaded successfully...")

# Load dataset - only working for animations
# The serialized test split is read from a .hkl file; if that fails, it is
# (re)created from the PNG frames and read again.
test_data_path = DATA_DIR + f"{args['data_subset']}_{args['data_subset_mode']}.hkl"
try:
    test_data = hkl.load(test_data_path)[0]
except Exception as load_err:
    # `Exception` instead of a bare `except:` so Ctrl-C is not mistaken for a
    # missing file; the reason is printed so real load errors are not hidden.
    print(f"Could not load {test_data_path} ({load_err!r}) - serializing from PNG frames...")
    png_paths = [DATA_DIR + f"{args['dataset']}/frames/{args['data_subset']}_{args['data_subset_mode']}/"]
    serialize_dataset(data_dirs, pfm_paths=[], pgm_paths=[], png_paths=png_paths, dataset_name=args['data_subset'], test_data=True)
    print("Dataset serialized...")
    test_data = hkl.load(test_data_path)[0]
    print("Test data ready...")

# Number of frames along the first axis of the loaded array.
td_len = test_data.shape[0]

Using TensorFlow backend
Working on dataset: general_shape_static
ParaPredNet compiled...
Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 1, 50, 50, 3)]    0         
                                                                 
 para_pred_net (ParaPredNet  (1, 1, 50, 50, 3)         6915948   
 )                                                               
                                                                 
Total params: 6915948 (26.38 MB)
Trainable params: 6915948 (26.38 MB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
None
4 PredNet layers with resolutions:
Layer 4:  6 x 6 x 192
Layer 3:  12 x 12 x 96
Layer 2:  25 x 25 x 48
Layer 1:  50 x 50 x 3
Weights loaded successfully...


In [None]:
# CALC MSE
# Walk the test frames in non-overlapping pairs. For each pair the layer state is
# reset, both frames are fed in order, and the output for the second frame is
# compared with the ground truth ("MSE"). "MSE Prev" is the error obtained by
# simply reusing the first frame of the pair as the prediction for the second.

mse_total = 0
mse_prev_total = 0
steps = 0
# Stop at td_len - 1 so that frame i + 1 always exists; with an odd number of
# frames the old range(0, td_len, 2) ran past the end of test_data.
for i in range(0, td_len - 1, 2):
    steps += 1
    # manually initialize PPN layer states
    PPN.layers[-1].init_layer_states()
    for j in range(2):
        if (i + j) % 100 == 0:
            print(f"Iteration {i+j+1}/{td_len}")
        # Add leading batch and time axes of size 1.
        ground_truth_image = np.reshape(test_data[i+j], (1, 1, *test_data.shape[1:]))
        if j == 0:
            prev_image = ground_truth_image
        predicted_image = PPN.layers[-1](ground_truth_image)
        if j == 1:
            error_image = ground_truth_image - predicted_image
            # NOTE(review): assumes test_data holds floats; with an unsigned integer
            # dtype this subtraction would wrap around - confirm the serialized dtype.
            error_prev_image = ground_truth_image - prev_image
            mse = np.mean(error_image**2)
            mse_prev = np.mean(error_prev_image**2)
            mse_total += mse
            mse_prev_total += mse_prev

if steps == 0:
    # Fewer than two frames: nothing to average (avoids a ZeroDivisionError).
    print("Not enough frames to compute an MSE (at least 2 are required).")
else:
    print(f"Average MSE: {mse_total/steps:.5f}")
    print(f"Average MSE Prev: {mse_prev_total/steps:.5f}")

In [3]:
# SHOW SINGLE PLOT
# Pick a random run of three consecutive frames, feed them through the model from a
# fresh state, and draw ground truth / prediction / signed error, one row per frame.

# randint's upper bound is exclusive, so the largest start index is td_len - 3 and
# frames plot_idx .. plot_idx + 2 always exist (requires td_len >= 3). The previous
# bound of td_len could select a start whose later frames were out of range.
plot_idx = np.random.randint(0, td_len - 2)

fig, axs = plt.subplots(3, 3, figsize=(15, 15))
plt.show(block=False)
# Diverging map for the signed error image: red at the low end, black in the middle, green at the high end.
rg_colormap = LinearSegmentedColormap.from_list('custom_cmap', [(0, 'red'), (0.5, 'black'), (1, 'green')])

# manually initialize PPN layer states
PPN.layers[-1].init_layer_states()
i = plot_idx
for j in range(3):
    if (i + j) % 100 == 0:
        print(f"Iteration {i+j+1}/{td_len}")
    # Add leading batch and time axes of size 1.
    ground_truth_image = np.reshape(test_data[i+j], (1, 1, *test_data.shape[1:]))
    predicted_image = PPN.layers[-1](ground_truth_image)
    error_image = ground_truth_image - predicted_image

    # Average the error over the channel axis for display.
    error_image_grey = np.mean(error_image, axis=-1, keepdims=True)
    # clear the axes of this row
    for jj in range(3):
        axs[j, jj].cla()

    # ground truth, prediction and error side-by-side
    axs[j, 0].imshow(ground_truth_image[0,0,...])
    axs[j, 1].imshow(predicted_image[0,0,...])
    axs[j, 2].imshow(error_image_grey[0,0,...], cmap=rg_colormap)

    mse = np.mean(error_image**2)

    # add titles
    axs[j, 0].set_title("Ground Truth")
    axs[j, 1].set_title("Predicted")
    axs[j, 2].set_title(f"Error, MSE: {mse:.3f}")

    fig.suptitle(f"Frame {i+j+1}/{td_len}")

    fig.canvas.draw()
    fig.canvas.flush_events()

    # os.path.join gives the same path when RESULTS_SAVE_DIR ends with a separator
    # and a correct one when it does not.
    fig.savefig(os.path.join(RESULTS_SAVE_DIR, f"frame_{i+j+1}.png"))

# Keep the interactive window alive and responsive for 200 seconds.
plt.pause(200)

Iteration 1901/5000


In [None]:
# SHOW ALL PLOTS
# Step through the test frames in non-overlapping triplets. Each triplet is fed to
# the model from a fresh state and drawn as three rows of
# ground truth / prediction / signed error; the figure is saved once per triplet.

fig, axs = plt.subplots(3, 3, figsize=(15, 15))
plt.show(block=False)
# Diverging map for the signed error image: red at the low end, black in the middle, green at the high end.
rg_colormap = LinearSegmentedColormap.from_list('custom_cmap', [(0, 'red'), (0.5, 'black'), (1, 'green')])

# Stop at td_len - 2 so that frames i, i + 1 and i + 2 always exist; the old
# range(0, td_len, 3) indexed past the end whenever td_len was not a multiple of 3.
for i in range(0, td_len - 2, 3):
    # manually initialize PPN layer states
    PPN.layers[-1].init_layer_states()
    for j in range(3):
        if (i + j) % 100 == 0:
            print(f"Iteration {i+j+1}/{td_len}")
        # Add leading batch and time axes of size 1.
        ground_truth_image = np.reshape(test_data[i+j], (1, 1, *test_data.shape[1:]))
        predicted_image = PPN.layers[-1](ground_truth_image)
        error_image = ground_truth_image - predicted_image

        # Average the error over the channel axis for display.
        error_image_grey = np.mean(error_image, axis=-1, keepdims=True)
        # Computed here for the current frame; previously `mse` was whatever value an
        # earlier cell had left behind (or undefined on a fresh kernel).
        mse = np.mean(error_image**2)

        # clear the axes of this row
        for jj in range(3):
            axs[j, jj].cla()

        # ground truth, prediction and error side-by-side
        axs[j, 0].imshow(ground_truth_image[0,0,...])
        axs[j, 1].imshow(predicted_image[0,0,...])
        axs[j, 2].imshow(error_image_grey[0,0,...], cmap=rg_colormap)

        # add titles: column headers on the top row only, the MSE on every row
        if j == 0:
            axs[j, 0].set_title("Ground Truth")
            axs[j, 1].set_title("Predicted")
        axs[j, 2].set_title(f"Error, MSE: {mse:.3f}")

    # j still holds 2 here, so the title and file name refer to the last frame of the triplet.
    fig.suptitle(f"Frame {i+j+1}/{td_len}")

    fig.canvas.draw()
    fig.canvas.flush_events()

    # os.path.join gives the same path when RESULTS_SAVE_DIR ends with a separator
    # and a correct one when it does not.
    fig.savefig(os.path.join(RESULTS_SAVE_DIR, f"frame_{i+j+1}.png"))