In [None]:
import time
from pathlib import Path

import cv2
import matplotlib.pyplot as plt
import numpy as np
import onnx
import onnxruntime as rt
import pandas as pd
import tensorflow as tf
import tensorflow.keras as tfk
import tf2onnx
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
# Locations of the sample images (folder + glob pattern) and of the saved
# Keras model.
# NOTE(review): these are absolute, machine-specific paths; consider making
# them configurable before sharing the notebook.
images_folder = "/home/simone/workspace/fogna/datasets/ompi/SJZ4_HangClip/bad/"
images_format = "*.tiff"
model_folder = (
    "/home/simone/workspace/fogna/outputs/ompi/sjz4/cocky_euler_9fb06172/keras"
)

In [None]:
images_path = Path(images_folder)
# Path.rglob yields entries in an arbitrary, filesystem-dependent order.
# Sorting makes positional indexing into images_list reproducible across
# runs and machines.
images_list = sorted(images_path.rglob(images_format))

In [None]:
# Load the saved Keras model from `model_folder` and print its layer summary.
model = tf.keras.models.load_model(model_folder)
model.summary()

In [None]:
def attach_heatmap(model):
    """Build a model that returns the prediction plus a class-activation heatmap.

    The heatmap is the feature map taken from inside ``model.layers[-2]``,
    weighted per sample by the column of the last layer's first weight array
    that corresponds to the predicted (argmax) class, and then bilinearly
    resized to the spatial size of the model input.

    NOTE(review): this assumes ``model.layers[-2]`` is a nested model
    (backbone) whose second-to-last layer outputs the last convolutional
    feature map, and that ``model.layers[-1]`` is a dense classification head
    whose kernel is shaped (channels, classes) -- confirm against the saved
    model.

    Args:
        model: Keras model to wrap.

    Returns:
        A ``tf.keras.Model`` whose input has the same per-sample shape as
        ``model`` and whose outputs are ``[pred, heatmaps]``.
    """
    last_conv_layer = model.layers[-2].layers[-2].get_output_at(0)
    weights = model.layers[-1].get_weights()[0]

    # Sub-model that maps the backbone input to the selected feature map.
    backbone_conv_model = tf.keras.Model(
        inputs=model.layers[-2].input, outputs=[last_conv_layer]
    )

    new_inputs = tf.keras.layers.Input(shape=model.input.shape[1:])
    conv_layer = backbone_conv_model(new_inputs)
    new_model = tf.keras.Model(inputs=new_inputs, outputs=[conv_layer], name="castrato")

    # Run both the full classifier and the feature extractor on one shared input.
    new_input = tf.keras.layers.Input(shape=model.input.shape[1:])
    pred = model(new_input)
    conv = new_model(new_input)
    pred_class = tf.math.argmax(pred, axis=1)

    # Select, for every sample, the weight column of its predicted class.
    # Assuming weights is (C, K): (1, C, K) -> gather -> (1, C, B) -> (B, C, 1).
    reshaped_w = tf.expand_dims(weights, axis=0)
    class_weights = tf.transpose(
        tf.gather(reshaped_w, pred_class, axis=2), perm=[2, 1, 0]
    )

    # Contract the channel axis sample by sample. tf.matmul is not usable here:
    # it would broadcast the single batch axis of class_weights against the
    # height axis of conv, which fails (or silently mis-pairs samples) as soon
    # as the batch holds more than one image.
    output = tf.einsum("bijc,bck->bijk", conv, class_weights)

    heatmaps = tf.image.resize(output, model.input.shape[1:3], method="bilinear")

    return tf.keras.Model(new_input, [pred, heatmaps])

In [None]:
# Inline construction of the pieces needed for the heatmap model.
# NOTE(review): assumes model.layers[-2] is a nested backbone model and
# model.layers[-1] is the classification head -- confirm.
backbone = model.layers[-2]
model_input_shape = model.input.shape[1:]

# Feature map taken from the backbone's second-to-last layer, and the first
# weight array of the model's last layer.
last_conv_layer = backbone.layers[-2].get_output_at(0)
weights = model.layers[-1].get_weights()[0]

# Sub-model from the backbone input to that feature map.
backbone_conv_model = tf.keras.Model(
    inputs=backbone.input,
    outputs=[last_conv_layer],
)

# Wrap the feature extractor behind a fresh input of the model's input shape.
new_inputs = tf.keras.layers.Input(shape=model_input_shape)
conv_layer = backbone_conv_model(new_inputs)
new_model = tf.keras.Model(
    inputs=new_inputs,
    outputs=[conv_layer],
    name="castrato",
)

# Shared input feeding both the classifier and the feature extractor.
new_input = tf.keras.layers.Input(shape=model_input_shape)
pred = model(new_input)
conv = new_model(new_input)

In [None]:
# Per-sample predicted class index.
pred_class = tf.math.argmax(pred, axis=1)

# Pick the weight column of the predicted class for every sample.
# Assuming weights is (C, K): (1, C, K) -> gather -> (1, C, B) -> (B, C, 1).
reshaped_w = tf.reshape(weights, [-1, *weights.shape])
tmp = tf.gather(reshaped_w, pred_class, axis=2)
class_weights = tf.transpose(tmp, perm=[2, 1, 0])

# Weighted sum over the channel axis, done independently for each sample.
output = tf.einsum("bijc,bck->bijk", conv, class_weights)
heatmaps = tf.image.resize(
    output, model.input.shape[1:3], method="bilinear", name="heatmap"
)

# Min-max normalise every heatmap on its own to [0, 255].
# Both extrema are reduced over the spatial axes only; using a global maximum
# would let one image in the batch rescale the heatmaps of the others.
heat_min = tf.reduce_min(heatmaps, axis=[1, 2], keepdims=True)
heat_max = tf.reduce_max(heatmaps, axis=[1, 2], keepdims=True)
# Floor the range so a constant heatmap maps to 0 instead of dividing by zero.
heat_range = tf.maximum(heat_max - heat_min, tf.keras.backend.epsilon())
heatmaps = tf.cast(
    (heatmaps - heat_min) / heat_range * 255,
    dtype=tf.uint8,
)

In [None]:
# Display the shape of the heatmap tensor.
heatmaps.shape

In [None]:
# Model mapping `new_input` to the pair [pred, heatmaps].
model_heat = tf.keras.Model(new_input, [pred, heatmaps])

# Test Evaluation

In [None]:
# CSV describing the test split; read into a DataFrame.
# NOTE(review): absolute, machine-specific path; consider making it configurable.
test_df_file = (
    "/home/simone/workspace/fogna/outputs/ompi/sjz4/cocky_euler_9fb06172/test.csv"
)
test_df = pd.read_csv(test_df_file)

In [None]:
# Preview the first rows of the test DataFrame.
test_df.head()

In [None]:
# Generator over the test DataFrame, reading file paths from the "filename"
# column and labels from the "label" column, without shuffling.
# NOTE(review): target_size is hard-coded to 448x448 -- confirm it matches the
# input size the model expects.
# NOTE(review): subset="training" is passed although the ImageDataGenerator is
# created without a validation_split; presumably the whole frame is used --
# confirm.
# NOTE(review): no rescaling/preprocessing is configured here; presumably the
# model normalises its input internally -- verify.
test_datagenerator = tfk.preprocessing.image.ImageDataGenerator()
test_data = test_datagenerator.flow_from_dataframe(
    test_df,
    x_col="filename",
    y_col="label",
    target_size=(448, 448),
    batch_size=32,
    seed=171717,
    class_mode="categorical",
    subset="training",
    shuffle=False,
)

In [None]:
# Evaluate the classifier on the test generator; metrics are returned as a dict.
results = model.evaluate(test_data, return_dict=True)

In [None]:
# Raw model outputs for every sample produced by the test generator.
y_pred = model.predict(test_data)

In [None]:
# Labels exposed by the generator, used as ground truth for the metrics.
y_true = test_data.labels

In [None]:
# Predicted class index per sample (argmax over axis 1 of the model output).
y_pred_lbl = np.argmax(y_pred, axis=1)

In [None]:
# Confusion matrix of ground-truth labels vs. predicted labels.
cm = confusion_matrix(y_true, y_pred_lbl)
print(cm)

In [None]:
# Text classification report, formatted with 4 digits.
cr = classification_report(y_true=y_true, y_pred=y_pred_lbl, digits=4)
print(cr)

# Heatmap

In [None]:
# Build the heatmap-producing model with the attach_heatmap helper.
final_model = attach_heatmap(model)

In [None]:
# Use the inline-built heatmap model instead; running this cell rebinds
# final_model, replacing whatever it referred to before.
final_model = model_heat

In [None]:
# Inspect the output tensors of the final model.
final_model.outputs

In [None]:
# Index of the sample image to visualise.
i = 13

# Read the image, convert OpenCV's BGR channel order to RGB, and resize it to
# 224x224 in a single expression.
img = cv2.resize(
    cv2.cvtColor(
        cv2.imread(str(images_list[i]), cv2.IMREAD_COLOR),
        cv2.COLOR_BGR2RGB,
    ),
    (224, 224),
)

# Add a leading batch axis and run the model; heat is [prediction, heatmap].
heat = final_model(img[np.newaxis, ...])

print(heat[0][0])

# Left panel: the image alone. Right panel: the image with the heatmap
# drawn on top at 50% opacity.
fig, ax = plt.subplots(1, 2, figsize=(16, 9))
for panel in ax:
    panel.imshow(img)
ax[1].imshow(heat[1][0, :], cmap="jet", alpha=0.5)

In [None]:
# Display the raw model outputs held in `heat`.
heat

In [None]:
# Index of the first of two consecutive sample images.
i = 13
img1 = cv2.imread(str(images_list[i]), cv2.IMREAD_COLOR)
img2 = cv2.imread(str(images_list[i + 1]), cv2.IMREAD_COLOR)

# OpenCV loads images as BGR; convert to RGB so that both the model input and
# the matplotlib display use the same channel order as the single-image cells.
img1 = cv2.cvtColor(img1, cv2.COLOR_BGR2RGB)
img2 = cv2.cvtColor(img2, cv2.COLOR_BGR2RGB)

img1 = cv2.resize(img1, (224, 224))
img2 = cv2.resize(img2, (224, 224))

# Stack into a batch with a leading sample axis.
img = np.stack([img1, img2])

# heat is [predictions, heatmaps], one entry per image of the batch.
heat = final_model(img)

print(heat[0])

# One row per image: the plain image on the left, the image with its heatmap
# overlaid at 50% opacity on the right.
fig, ax = plt.subplots(2, 2, figsize=(16, 9))
for row in range(2):
    ax[row][0].imshow(img[row])
    ax[row][1].imshow(img[row])
    ax[row][1].imshow(heat[1][row], cmap="jet", alpha=0.5)

In [None]:
# Display the shape of the current `img` array.
img.shape

# Convert to ONNX

In [None]:
# Signature for the ONNX export: a float32 tensor named "inputs" whose first
# dimension is left unspecified (None) and whose remaining dimensions are
# copied from the Keras model's input shape.
input_signature = [
    tf.TensorSpec(
        shape=[None, *model.input_shape[1:]],
        dtype=tf.float32,
        name="inputs",
    )
]

In [None]:
# Convert the heatmap model to ONNX with the chosen opset; the second value
# returned by from_keras is discarded.
opset = 13
onnx_model, _ = tf2onnx.convert.from_keras(final_model, input_signature, opset=opset)

In [None]:
# Write the converted model to heatmap.onnx in the current working directory.
onnx.save(onnx_model, "heatmap.onnx")

In [None]:
# Open the exported model with ONNX Runtime, requesting the CPU execution provider.
session = rt.InferenceSession("heatmap.onnx", providers=["CPUExecutionProvider"])

In [None]:
# List the session's inputs and outputs.
# The loop variable is deliberately not called `i`: the notebook uses `i` as
# the sample-image index, and reusing it here would overwrite that value.
for ix, node in enumerate(session.get_inputs()):
    print(f"input {ix}: {node}")

for ix, node in enumerate(session.get_outputs()):
    print(f"output {ix}: {node}")

In [None]:
# Convert the current image(s) to float32 for ONNX Runtime.
inp = np.array(img, dtype=np.float32)
# `img` may be a single (H, W, C) image or an already stacked (B, H, W, C)
# batch. Only add the batch axis in the former case; adding it unconditionally
# would turn a batch into a rank-5 array that the model cannot accept.
inputs = {"inputs": inp if inp.ndim == 4 else inp[np.newaxis, ...]}
outputs = session.run(None, inputs)

In [None]:
# Overlay the first heatmap returned by the session on its input image.
# `img` may be a stacked batch (4-D), which imshow cannot draw; show the first
# sample in that case so it matches outputs[1][0].
plt.imshow(img[0] if np.ndim(img) == 4 else img)
plt.imshow(outputs[1][0], cmap="jet", alpha=0.5)

In [None]:
# Time one inference call of the current session.
inp = np.array(img, dtype=np.float32)
# Add the batch axis only when `img` is a single (H, W, C) image; an already
# stacked (B, H, W, C) batch is passed through unchanged.
inputs = {"inputs": inp if inp.ndim == 4 else inp[np.newaxis, ...]}
# perf_counter is a monotonic, high-resolution clock intended for measuring
# short durations; time.time can jump if the system clock is adjusted.
st = time.perf_counter()
outputs = session.run(None, inputs)
print(f"elapsed time: {time.perf_counter() - st}")

In [None]:
# Point the session at a previously exported ONNX file on disk (CPU provider).
# NOTE(review): this rebinds model_folder, which earlier held the Keras model
# directory, to an ONNX file path; re-running the Keras loading cell afterwards
# would pick up this path instead.
model_folder = "/home/simone/workspace/fogna/outputs/ompi/sjz4/cocky_euler_9fb06172/onnx/cocky_euler_9fb06172.onnx"
session = rt.InferenceSession(model_folder, providers=["CPUExecutionProvider"])

In [None]:
# Time one inference call of the current session.
# NOTE(review): the input name "inputs" is assumed to match the model loaded
# into `session` -- confirm.
inp = np.array(img, dtype=np.float32)
# Add the batch axis only when `img` is a single (H, W, C) image; an already
# stacked (B, H, W, C) batch is passed through unchanged.
inputs = {"inputs": inp if inp.ndim == 4 else inp[np.newaxis, ...]}
# perf_counter is a monotonic, high-resolution clock intended for measuring
# short durations; time.time can jump if the system clock is adjusted.
st = time.perf_counter()
outputs = session.run(None, inputs)
print(f"elapsed time: {time.perf_counter() - st}")

# GPU ONNX

In [None]:
# Re-open heatmap.onnx requesting only the CUDA execution provider.
# NOTE(review): no CPU provider is listed as a fallback -- confirm this is intended.
session = rt.InferenceSession("heatmap.onnx", providers=["CUDAExecutionProvider"])

In [None]:
# Index of the sample image used for the timing run.
i = 34
img = cv2.imread(str(images_list[i]), cv2.IMREAD_COLOR)
# OpenCV loads BGR; convert to RGB before feeding the model.
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, (224, 224))

# Single image: cast to float32 and add the leading batch axis.
inp = np.array(img, dtype=np.float32)
inputs = {"inputs": inp[np.newaxis, ...]}

# perf_counter is a monotonic, high-resolution clock intended for measuring
# short durations; time.time can jump if the system clock is adjusted.
# NOTE(review): a single timed call may include one-off initialisation cost;
# consider a warm-up run before measuring.
st = time.perf_counter()
outputs = session.run(None, inputs)
print(f"elapsed time: {time.perf_counter() - st}")

In [None]:
# Shape of the second output returned by the session.
outputs[1].shape