In [1]:
# -*- coding: utf-8 -*-
"""rise_st_multiplicative_norm_zero_cineca.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1Vxe74LFV5mUpC-IZp8BSjnQnJbXwjopw

### ***Cineca***
"""

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import activations
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.models import load_model
from keras import activations
import numpy as np
import pickle
import os
from keras.models import load_model
from skimage.transform import resize
from tqdm import tqdm
import copy
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error

"""
##### ***Data & Black-Box***

"""

# IMPORTO I DATI PER VOTTIGNASCO


# Ottieni il percorso effettivo da una variabile d'ambiente
work_path = os.environ['WORK']  # Ottieni il valore della variabile d'ambiente WORK
v_test_OHE_path = os.path.join(work_path, "Water_Resources/rise-video/data/Vottignasco/Vottignasco_00425010001_test_month_OHE.npy")
v_test_image_path = os.path.join(work_path, "Water_Resources/rise-video/data/Vottignasco/Vottignasco_00425010001_test_normalized_image_sequences.npy")
v_test_target_dates_path = os.path.join(work_path, "Water_Resources/rise-video/data/Vottignasco/Vottignasco_00425010001_test_target_dates.npy")

# Carica l'array numpy dai file
vottignasco_test_OHE    = np.load(v_test_OHE_path)
vottignasco_test_image  = np.load(v_test_image_path)
vottignasco_test_dates  = np.load(v_test_target_dates_path)


print(len(vottignasco_test_dates))
print(len(vottignasco_test_image))
print(len(vottignasco_test_OHE))

#print(vottingasco_test_OHE[0], "\n")
#print(vottignasco_test_image[0][0], "\n")

# """##### ***Black Boxes***"""


# Se vuoi abilitare il dropout a runtime
mc_dropout = True

# Definizione della classe personalizzata doprout_custom
class doprout_custom(tf.keras.layers.SpatialDropout1D):
    def call(self, inputs, training=None):
        if mc_dropout:
            return super().call(inputs, training=True)
        else:
            return super().call(inputs, training=False)

# Percorso della directory su Cineca
base_dir = os.path.join(os.environ['WORK'], "Water_Resources/rise-video/trained_models/seq2val/Vottignasco")
lstm_suffix = 'time_dist_LSTM'

vott_lstm_models = []

def extract_index(filename):
    """Funzione per estrarre l'indice finale dal nome del file."""
    return int(filename.split('_LSTM_')[-1].split('.')[0])

# Trova tutti i file .keras nella cartella e li aggiunge alla lista
for filename in os.listdir(base_dir):
    if lstm_suffix in filename and filename.endswith(".keras"):
        vott_lstm_models.append(os.path.join(base_dir, filename))

# Ordina i modelli in base all'indice finale
vott_lstm_models = sorted(vott_lstm_models, key=lambda x: extract_index(os.path.basename(x)))

# Lista per i modelli caricati
vott_lstm_models_loaded = []

for i, model_lstm_path in enumerate(vott_lstm_models[:10]):  # Prendo i primi 10 modelli ordinati
    #print(f"Caricamento del modello LSTM {i+1}: {model_lstm_path}")

    # Carico il modello con la classe custom
    model = load_model(model_lstm_path, custom_objects={"doprout_custom": doprout_custom})

    # Aggiungo il modello alla lista
    vott_lstm_models_loaded.append(model)

print(vott_lstm_models_loaded)

"""### ***RISE-Spatio_Temporal***

#### ***Generation Masks (3D): Uniforme***
"""

import numpy as np
from scipy.interpolate import RegularGridInterpolator
from tqdm import tqdm

def generate_masks_3d(N, input_size, seed=42, **kwargs):
    """
    Parametri:
    - input_size: (t, h, w) -> dimensione finale della maschera 3D (time, Height, Width)
    """
    t, h, w = input_size
    l = kwargs.get("l", 8)   # Dimensione della small mask per il tempo
    s = kwargs.get("s", 8)   # Fattore per downsampling spaziale (per h e w)
    p1 = kwargs.get("p1", 0.4)  # Probabilità di attivazione

    np.random.seed(seed)

    # Calcola le dimensioni spaziali della small mask usando solo h e w
    small_s = np.ceil(np.array([h, w]) / s).astype(int)

    # Genera una maschera 3D casuale di dimensione (l, small_h, small_w)
    grid = np.random.rand(N, l, small_s[0], small_s[1]) < p1
    grid = grid.astype('float32')

    # Struttura per le maschere finali di dimensione (N, t, h, w)
    masks = np.empty((N, t, h, w))

    # Coordinate per l'interpolazione spaziale
    grid_x = np.linspace(0, small_s[0] - 1, small_s[0])
    grid_y = np.linspace(0, small_s[1] - 1, small_s[1])
    grid_t = np.linspace(0, l - 1, l)

    for i in tqdm(range(N), desc='Generating masks'):
      # Crea un interpolatore per la maschera corrente
      interpolator = RegularGridInterpolator(
        (grid_t, grid_x, grid_y), grid[i], method='linear', bounds_error=False, fill_value=0
      )
      new_t = np.linspace(0, l - 1, t)
      new_x = np.linspace(0, small_s[0] - 1, h)
      new_y = np.linspace(0, small_s[1] - 1, w)
      mesh_t, mesh_x, mesh_y = np.meshgrid(new_t, new_x, new_y, indexing='ij')
      points = np.stack((mesh_t, mesh_x, mesh_y), axis=-1).reshape(-1, 3)
      interpolated_mask = interpolator(points)
      masks[i] = interpolated_mask.reshape(t, h, w)


    masks = masks[~(masks == 0).all(axis=(1, 2, 3))]
    return masks

"""#### ***Application Masks***"""

def multiplicative_uniform_noise_onechannel(images, masks, channel, **kwargs):
    """
    Applica noise moltiplicativo continuo in 3D alle immagini sulla base delle maschere fornite.

    Parametri:
    - images: array di shape (104, 5, 8, C) -> serie temporale di immagini con canali multipli
    - masks: array di shape (N, 104, 5, 8) -> maschere continue con valori [0, 1]
    - channel: canale specifico su cui applicare il noise (0: Prec, 1: TMax, 2: TMin)
    - std_zero_value: valore del disturbo da applicare dove mask = 0
    """
    std_zero_value = kwargs.get("std_zero_value", -0.6486319166678826)

    masked = []

    # Itera su tutte le N maschere generate
    for mask in masks:
        masked_images = copy.deepcopy(images)  # Copia profonda delle immagini originali

        # Estrai solo il canale desiderato
        channel_values = masked_images[..., channel]

        # Applica la formula: v(p) + z (1-p)
        perturbed_values = channel_values * mask + (1 - mask) * std_zero_value

        # Sovrascrivi il canale con i valori perturbati
        masked_images[..., channel] = perturbed_values

        masked.append(masked_images)

    return masked

"""#### ***Prediction with Black-Box***"""

def ensemble_predict(models, images, x3_exp):
    # Se images è una lista, calcoliamo la lunghezza
    if isinstance(images, list):
        len_x3 = len(images)
    else:
        len_x3 = 1
        images = [images]  # Rendi images una lista con un solo elemento

    # Conversione in tensori
    Y_test = tf.stack([tf.convert_to_tensor(img, dtype=tf.float32) for img in images])
    Y_test_x3 = tf.tile(tf.expand_dims(tf.convert_to_tensor(x3_exp, dtype=tf.float32), axis=0), [len_x3, 1, 1])

    # Inizializza una lista per raccogliere le predizioni
    all_preds = []

    # Itera attraverso i modelli e raccogli le predizioni
    for model in models:
        preds = model.predict([Y_test, Y_test_x3], verbose=0)
        all_preds.append(preds)

    # Converte la lista di predizioni in un tensore di TensorFlow
    all_preds_tensor = tf.stack(all_preds)

    # Calcola la media lungo l'asse dei modelli (asse 0)
    mean_preds = tf.reduce_mean(all_preds_tensor, axis=0)

    return mean_preds.numpy()

"""#### ***Saliency Map***"""

def calculate_saliency_map(preds_array, masks):
    """
    Calcola la mappa di salienza media data una serie di predizioni e maschere.

    :param preds_array: Array di predizioni (numero di maschere x dimensioni predizione).
    :param masks: Array di maschere (numero di maschere x dimensioni maschera).
    :return: Mappa di salienza media.
    """
    sal = []
    for j in range(len(masks)):
        sal_i = preds_array[j] * np.abs(masks[j])
        sal.append(sal_i.reshape(-1, 5, 8))  # Adatta la shape secondo il formato orginiale dei frame

    # Rimuovere le dimensioni extra per fare np.mean lungo axis=0. masks ha shape (N,5,8,1)
    masks_squeezed = np.squeeze(np.abs(masks))
    # Ora calcola la media lungo l'asse 0
    ev_masks = np.mean(masks_squeezed, axis=0)
    ev_masks[ev_masks == 0] = 1e-8  # Sostituiamo gli zeri con un piccolo valore

    sal = (1/ev_masks) * np.mean(sal, axis=0)

    return sal

2025-03-05 11:53:03.555032: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-05 11:53:04.739439: I external/local_tsl/tsl/cuda/cudart_stub.cc:31] Could not find cuda drivers on your machine, GPU will not be used.
2025-03-05 11:53:08.480897: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2025-03-05 11:53:08.484102: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2025-03-05 11:53:08.936447: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to

105
105
105
[<keras.src.engine.functional.Functional object at 0x7f088e7bbcd0>, <keras.src.engine.functional.Functional object at 0x7f0887bdba00>, <keras.src.engine.functional.Functional object at 0x7f0887b4cfa0>, <keras.src.engine.functional.Functional object at 0x7f08879a6a70>, <keras.src.engine.functional.Functional object at 0x7f0887a2b700>, <keras.src.engine.functional.Functional object at 0x7f0887b7f7f0>, <keras.src.engine.functional.Functional object at 0x7f08878c6b30>, <keras.src.engine.functional.Functional object at 0x7f08879f3910>, <keras.src.engine.functional.Functional object at 0x7f08877c1030>, <keras.src.engine.functional.Functional object at 0x7f08877ffa00>]


In [None]:
"""#### ***Spatio_Temporal-RISE: Framework***"""

# Funzione sigmoide
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def rise_st_explain_sigmoide(nr_instance, data_test_image, data_test_OHE, models, channel,
                                   N, generate_masks_fn, seed, perturb_instance_fn, calculate_saliency_map_fn, **kwargs):
  print(f"############################### RISE-Temporal on Instance #{nr_instance} ###########################")
  instance    = copy.deepcopy(data_test_image[nr_instance])
  x3_instance = copy.deepcopy(data_test_OHE[nr_instance])

  input_size = (instance.shape[0], instance.shape[1], instance.shape[2])

  masks = generate_masks_fn(N, input_size, seed, **kwargs)
  perturbed_instances = perturb_instance_fn(instance, masks, channel)

  # Predizione su Istanza Originale
  pred_original = ensemble_predict(models, instance, x3_instance)
  # Predizioni su Istanze Perturbate
  preds_masked = ensemble_predict(models, perturbed_instances, x3_instance)

  # Differenza tra predizione originale e perturbata
  diff_pred = [abs(pred_original - pred_masked) for pred_masked in preds_masked]
  weights_array = np.concatenate(diff_pred, axis=0)

  # Standardizzazione (z-score normalization)
  mean = np.mean(weights_array)
  std = np.std(weights_array)
  standardized_weights = (weights_array - mean) / std
  # Applicare la sigmoide ai pesi standardizzati
  normalized_weights_std = sigmoid(standardized_weights)

  # Calcolo della mappa di salienza
  saliency_map_i = calculate_saliency_map_fn(1-normalized_weights_std, masks)
  print(f"############### Processo completato. Mappa di salienza generata per Istanza #{nr_instance} ###############")
  return np.squeeze(saliency_map_i)

In [6]:
N = 10
l=10 
s=2
p=0.5

seed = 42

channel = 0
nr_instance = 0
models = vott_lstm_models_loaded

instance    = copy.deepcopy(vottignasco_test_image[nr_instance])
x3_instance = copy.deepcopy(vottignasco_test_OHE[nr_instance])

input_size = (instance.shape[0], instance.shape[1], instance.shape[2])

masks = generate_masks_3d(N, input_size, seed, l=l, s=s, p1=p)
perturbed_instances = multiplicative_uniform_noise_onechannel(instance, masks, channel)

# Predizione su Istanza Originale
pred_original = ensemble_predict(models, instance, x3_instance)
# Predizioni su Istanze Perturbate
preds_masked = ensemble_predict(models, perturbed_instances, x3_instance)

Generating masks: 100%|██████████| 10/10 [00:00<00:00, 38.66it/s]


In [9]:
pred_original

array([[0.22002165]], dtype=float32)

In [18]:
preds_masked

array([[1.4628854],
       [1.6318104],
       [1.649078 ],
       [1.7874655],
       [1.1747181],
       [1.5245184],
       [0.93771  ],
       [1.6932075],
       [1.5337497],
       [1.8104208]], dtype=float32)

In [23]:
np.abs(pred_original - preds_masked)

array([[1.2428638],
       [1.4117888],
       [1.4290564],
       [1.5674438],
       [0.9546965],
       [1.3044968],
       [0.7176883],
       [1.4731859],
       [1.3137281],
       [1.5903991]], dtype=float32)

In [25]:
diff_pred = [abs(pred_original - pred_masked) for pred_masked in preds_masked]

kernel_width = np.percentile(diff_pred, 90)
# Importanza vicini
weights = np.exp(- (np.array(diff_pred) ** 2) / (kernel_width ** 2))

weights

array([[[0.53425026]],

       [[0.44535634]],

       [[0.43657795]],

       [[0.36895615]],

       [[0.6908086 ]],

       [[0.5012717 ]],

       [[0.81136775]],

       [[0.4144658 ]],

       [[0.49637893]],

       [[0.3582602 ]]], dtype=float32)

In [22]:
distances = np.abs(pred_original - preds_masked)
kernel_width = np.percentile(distances, 90)
# Importanza vicini
weights = np.exp(- (np.array(distances) ** 2) / (kernel_width ** 2))

weights

array([[0.53425026],
       [0.44535634],
       [0.43657795],
       [0.36895615],
       [0.6908086 ],
       [0.5012717 ],
       [0.81136775],
       [0.4144658 ],
       [0.49637893],
       [0.3582602 ]], dtype=float32)