In [1]:
import os
import cplex
import pandas as pd 
from time import time
import tensorflow as tf
import docplex.mp.model as mp
from docplex.mp import model_reader
from typing import List, Tuple, Any
from docplex.mp.constr import LinearConstraint 
from utils.explanations import get_minimal_explanation, get_explanation_relaxed

Explicar instância

In [9]:
# @title explain_instance
def explain_instance(
    initial_network,
    configuration: dict,
    instance_index: int,
    data: pd.DataFrame,
    model_h5,
    n_classes
) -> Tuple[List[LinearConstraint], mp.Model, int]:
    """Explain the network's prediction for a single row of ``data``.

    The row at ``instance_index`` (every column except the last one) is fed
    to ``model_h5``; the arg-max of the prediction is used as the class to
    explain, and ``get_minimal_explanation`` is invoked on a ``clone()`` of
    the MILP model taken from ``initial_network``.

    Args:
        initial_network: Sequence of exactly three items,
            ``(milp_model, output_bounds, bounds)``. The third item is
            unpacked but not used by this function.
        configuration: Mapping that must contain the key ``"method"``; its
            value is forwarded to ``get_minimal_explanation``.
        instance_index: Positional row index into ``data``.
        data: Dataset; the last column is excluded from the network input
            (presumably the label column -- TODO confirm).
        model_h5: Object exposing ``predict``; only the first element of the
            prediction is used.
        n_classes: Forwarded unchanged to ``get_minimal_explanation``.

    Returns:
        ``(explanation, model, network_output)``: the two values returned by
        ``get_minimal_explanation`` followed by the predicted class.
        NOTE(review): the annotation says ``int`` but the value returned is
        the result of ``tf.argmax``.
    """
    solver_method = configuration["method"]
    milp_model, output_bounds, _unused_bounds = initial_network

    # Row -> (1, n_features) tensor, the shape handed to predict().
    features = data.iloc[instance_index, :-1]
    network_input = tf.reshape(tf.constant(features), (1, -1))

    prediction = model_h5.predict(tf.constant(network_input))[0]
    network_output = tf.argmax(prediction)

    # Work on a clone so the explanation routine gets its own model object.
    working_model = milp_model.clone()
    explanation, model = get_minimal_explanation(
        working_model,
        network_input,
        network_output,
        n_classes=n_classes,
        method=solver_method,
        output_bounds=output_bounds,
    )

    return (explanation, model, network_output)

In [10]:
# @title explain_instance_relaxed
def explain_instance_relaxed(
    initial_network,
    initial_network_relaxed,
    configuration: dict,
    instance_index: int,
    data: pd.DataFrame,
    model_h5,
    n_classes,
    delta=1.0
) -> Tuple[List[LinearConstraint], mp.Model]:
    """Explain one row of ``data`` using the relaxed MILP model.

    The network input and predicted class are derived exactly as in the
    non-relaxed variant (row without its last column, arg-max of
    ``model_h5.predict``). ``get_explanation_relaxed`` is then run on a
    ``clone()`` of the relaxed model.

    Args:
        initial_network: Sequence of exactly three items. It is unpacked
            (so a wrong length still raises) but none of the items is used.
        initial_network_relaxed: Pair ``(relaxed_model, output_bounds)``;
            only the model is used.
        configuration: Mapping that must contain the key ``"method"``. The
            value is read but not forwarded.
        instance_index: Positional row index into ``data``.
        data: Dataset; the last column is excluded from the network input.
        model_h5: Object exposing ``predict``.
        n_classes: Forwarded to ``get_explanation_relaxed``.
        delta: Forwarded to ``get_explanation_relaxed``.

    Returns:
        The ``(explanation, model)`` pair produced by
        ``get_explanation_relaxed``.
    """
    # Both lookups are kept for their validation side effects only.
    _method = configuration["method"]
    _milp_model, _output_bounds, _bounds = initial_network

    relaxed_model, _relaxed_output_bounds = initial_network_relaxed

    features = data.iloc[instance_index, :-1]
    network_input = tf.reshape(tf.constant(features), (1, -1))
    prediction = model_h5.predict(tf.constant(network_input))[0]
    network_output = tf.argmax(prediction)

    working_model = relaxed_model.clone()
    explanation, model = get_explanation_relaxed(
        working_model,
        network_input,
        network_output,
        n_classes=n_classes,
        delta=delta,
    )

    return (explanation, model)

Utils

In [11]:
def export_milp_as_lp(mdl: mp.Model, file: str):
  """Write ``mdl`` to ``file`` through the model's ``export_as_lp`` method."""
  target_path = f"{file}"
  mdl.export_as_lp(target_path)

def read_cplex_model(file: str):
  """Build and return a ``cplex.Cplex`` object from ``file``."""
  loaded_model = cplex.Cplex(file)
  return loaded_model

def convert_string_to_pixel(pixel_str: str, matrix_size: tuple[int, int]) -> tuple[int, int]:
    """Map a variable name such as ``"x_12"`` to ``(row, column)`` coordinates.

    The integer found between the first and second underscore is treated as
    a flat, row-major index into a matrix of shape ``matrix_size``.

    Args:
        pixel_str: Name whose second ``"_"``-separated field is an integer.
        matrix_size: ``(rows, cols)``; only ``cols`` takes part in the
            computation, so the index is not range-checked against ``rows``.

    Returns:
        ``(flat_index // cols, flat_index % cols)``.
    """
    # Drop the prefix ("x") and keep the numeric field.
    flat_index = int(pixel_str.split("_")[1])
    # Unpacking also enforces that matrix_size has exactly two entries.
    _rows, cols = matrix_size
    row_index, col_index = divmod(flat_index, cols)
    return row_index, col_index
  
def get_coordinates_from_explanation(
  explanation: list[LinearConstraint],
  matrix_size: tuple[int, int],
) -> list[tuple[int, int]]:
    """Translate every constraint of an explanation into pixel coordinates.

    For each constraint, the name of its left-hand expression
    (``constraint.left_expr.name``) is passed to ``convert_string_to_pixel``
    together with ``matrix_size``.

    Returns:
        One ``(row, column)`` tuple per constraint, in the same order as
        ``explanation``.
    """
    variable_names = (constraint.left_expr.name for constraint in explanation)
    return [
        (row, col)
        for row, col in (
            convert_string_to_pixel(name, matrix_size) for name in variable_names
        )
    ]

def create_directory(path_to_create:str):
    """Create ``path_to_create`` (and any missing parents) under the current directory.

    The path is split on ``'/'`` and empty components are ignored, so a
    leading slash or doubled slashes do not change the outcome: the
    directory is always created relative to the current working directory
    (``'/a/b'`` yields ``./a/b``).

    The whole chain is created with a single ``os.makedirs(...,
    exist_ok=True)`` call instead of testing ``os.path.exists`` before each
    level; this removes the window in which another process could create the
    directory between the check and the creation.

    Args:
        path_to_create: ``'/'``-separated directory path.

    Returns:
        The raw result of ``path_to_create.split('/')``, empty components
        included.

    Raises:
        OSError: If the directory cannot be created, e.g. because a
            component of the path already exists as a regular file.
    """
    components = path_to_create.split('/')
    non_empty = [part for part in components if part != '']
    if non_empty:
        os.makedirs('./' + '/'.join(non_empty), exist_ok=True)
    return components

Benchmark

In [12]:

def benchmark_instance(
    initial_network: Any,
    initial_network_relaxed: Any,
    model_h5: Any,
    n_classes: int,
    data: pd.DataFrame,
    instance_index: int,
    resultados: pd.DataFrame,
    file_result:str,
    delta = 0.1):
    """Time the relaxed explanation for one instance and append it to ``resultados``.

    Steps:
      1. Build the network input from row ``instance_index`` of ``data``
         (last column excluded) and take the arg-max of ``model_h5.predict``
         as the class to explain.
      2. Run ``get_explanation_relaxed`` with ``delta=1`` on a clone of the
         relaxed model, measuring wall-clock time with ``time()``.
      3. Append an 11-value row to ``resultados`` (in place) and rewrite the
         whole frame to ``file_result`` as CSV without the index.

    Only the "relaxed global" run is active. The run on the original
    (non-relaxed) model is disabled, and every slot of the row that does not
    belong to the relaxed-global run is filled with ``None``.

    Args:
        initial_network: Sequence of exactly three items; unpacked but unused.
        initial_network_relaxed: Pair ``(relaxed_model, output_bounds)``;
            only the model is used.
        model_h5: Object exposing ``predict``.
        n_classes: Forwarded to ``get_explanation_relaxed``.
        data: Dataset providing the instance.
        instance_index: Positional row index into ``data``.
        resultados: Results frame, mutated in place; it must accept rows of
            eleven values.
        file_result: Destination path of the CSV file.
        delta: Stored in the result row only. It is NOT the value passed to
            ``get_explanation_relaxed``, which always receives ``delta=1``.

    Returns:
        None.
    """
    # Kept for its shape check; none of the three items is needed below.
    _milp_model, _output_bounds, _bounds = initial_network

    features = data.iloc[instance_index, :-1]
    network_input = tf.reshape(tf.constant(features), (1, -1))
    prediction = model_h5.predict(tf.constant(network_input))[0]
    network_output = tf.argmax(prediction)

    # Relaxed run: cloning happens before the clock starts.
    relaxed_model, _relaxed_output_bounds = initial_network_relaxed
    relaxed_clone = relaxed_model.clone()

    started_at = time()
    explanation_relaxed_global, _solved_model = get_explanation_relaxed(
        relaxed_clone,
        network_input,
        network_output,
        n_classes=n_classes,
        delta=1,
    )
    finished_at = time()

    time_relaxed_global = finished_at - started_at
    len_relaxed_global = len(explanation_relaxed_global)

    # Row layout: index | 3 timings | 3 sizes | delta | 3 explanations.
    # In each triple only the last slot (relaxed global) is populated.
    new_row = [
        instance_index,
        None, None, time_relaxed_global,
        None, None, len_relaxed_global,
        delta,
        None, None, explanation_relaxed_global,
    ]
    next_position = len(resultados)
    resultados.loc[next_position] = new_row
    resultados.to_csv(f'{file_result}', index=False)

In [None]:
# Dataset selection: folder name under datasets/ and number of output classes.
config = {"name": "digits", "n_classes": 10}
dataset_name = config["name"]
n_classes = config["n_classes"]

# Train and test splits are stacked into a single frame. The original row
# labels are kept, so rows should be addressed positionally (.iloc).
data_train = pd.read_csv(f"datasets/{config['name']}/train.csv")
data_test = pd.read_csv(f"datasets/{config['name']}/test.csv")
data = pd.concat([data_train, data_test])


In [None]:
# Inputs for the benchmark_instance call. The None values are placeholders:
# the networks and the trained model must be assigned before the benchmark
# can actually run.
initial_network = None
initial_network_relaxed = None
configurations = None
model_h5 = None
instance_index = 1

# benchmark_instance appends rows of eleven values, so the results frame needs
# eleven columns. (Previously this name was bound to the pandas module itself.)
# NOTE(review): the column names are inferred from the row layout and the
# comments inside benchmark_instance -- rename them if a schema already exists.
RESULT_COLUMNS = [
    "instance_index",
    "time_original",
    "time_relaxed",
    "time_relaxed_global",
    "len_original",
    "len_relaxed",
    "len_relaxed_global",
    "delta",
    "explanation_original",
    "explanation_relaxed",
    "explanation_relaxed_global",
]
resultados = pd.DataFrame(columns=RESULT_COLUMNS)

# The path used to be derived from file_result itself, which fails with a
# NameError on a fresh kernel and appends another "/df.csv" on every re-run.
# TODO: point results_dir at the folder where the CSV should be written.
results_dir = "."
file_result = f'{results_dir}/df.csv'
delta = None

In [None]:
# Run the benchmark once for the configured instance. The row is appended to
# `resultados` and the CSV at `file_result` is rewritten by the function.
# `delta` is passed as a literal None here, not taken from a variable.
benchmark_instance(
    initial_network=initial_network,
    initial_network_relaxed=initial_network_relaxed,
    model_h5=model_h5,
    n_classes=n_classes,
    data=data,
    instance_index=instance_index,
    resultados=resultados,
    file_result=file_result,
    delta=None,
)

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Given parameters
dvc = 50
delta = 0.05
log_delta_inv = np.log(1 / delta)

# Growth function of the hypothesis set (polynomial approximation N**dvc).
# WARNING: on integer arrays this overflows silently for the values used in
# this cell (e.g. 20000**50 does not fit in 64 bits), so it is kept only for
# reference and is no longer used to compute the bounds below.
def mH(N, dvc):
    return N**dvc

def log_mH(N, dvc):
    """Natural log of the growth function, ln(N**dvc) = dvc * ln(N).

    Working with the logarithm keeps every intermediate value small, which
    is what the bounds need anyway (they only ever use ln(mH(...))).
    """
    return dvc * np.log(N)

# Values of N (starting at 2)
N_values = np.arange(2, 10001)

# Logarithm that returns 0 for non-positive inputs instead of -inf/NaN.
# Kept for any later cell that still calls it; the bounds below do not need
# it because every logarithm argument is strictly positive.
def safe_log(x):
    with np.errstate(divide='ignore', invalid='ignore'):
        return np.where(x <= 0, 0, np.log(x))

# All bounds are evaluated in floating point and in log-space:
#   ln(c * mH(M) / delta) = ln(c) + dvc * ln(M) + ln(1 / delta)
N_float = N_values.astype(float)
log_mH_N = log_mH(N_float, dvc)
log_mH_2N = log_mH(2 * N_float, dvc)
log_mH_N2 = log_mH(N_float**2, dvc)

vc_bound = np.sqrt((8 / N_float) * (np.log(4) + log_mH_2N + log_delta_inv))
rademacher_bound = (
    np.sqrt(2 * (np.log(2 * N_float) + log_mH_N) / N_float)
    + np.sqrt((2 * log_delta_inv) / N_float)
    + 1 / N_float
)
# The next two expressions take an epsilon on their right-hand side; as in
# the original cell, the VC bound is plugged in for it.
parrondo_bound = np.sqrt(
    (1 / N_float) * (2 * vc_bound + np.log(6) + log_mH_2N + log_delta_inv)
)
devroye_bound = np.sqrt(
    (1 / (2 * N_float))
    * (4 * vc_bound * (1 + vc_bound) + np.log(4) + log_mH_N2 + log_delta_inv)
)

# Plot the four bounds against N on a single figure.
plt.figure(figsize=(12, 8))
for curve, curve_label in (
    (vc_bound, 'Original VC Bound'),
    (rademacher_bound, 'Rademacher Penalty Bound'),
    (parrondo_bound, 'Parrondo and Van den Broek'),
    (devroye_bound, 'Devroye'),
):
    plt.plot(N_values, curve, label=curve_label)
plt.xlabel('N')
plt.ylabel('Bound')
plt.title('Bounds on Generalization Error as a Function of N')
plt.legend()
plt.grid(True)
plt.show()

# Check the values of the four bounds at N = 10,000.
#
# The growth-function terms are evaluated in log-space,
#   ln(c * M**dvc / delta) = ln(c) + dvc * ln(M) + ln(1 / delta),
# because the direct form needs (N**2)**dvc = 10**400 for the Devroye bound,
# and dividing that Python integer by the float `delta` raises
# "OverflowError: int too large to convert to float".
N_large = 10000
log_growth_N = dvc * np.log(N_large)
log_growth_2N = dvc * np.log(2 * N_large)
log_growth_N2 = 2 * dvc * np.log(N_large)  # ln((N**2)**dvc) = 2 * dvc * ln(N)

vc_large = np.sqrt((8 / N_large) * (np.log(4) + log_growth_2N + log_delta_inv))
rademacher_large = (
    np.sqrt(2 * (np.log(2 * N_large) + log_growth_N) / N_large)
    + np.sqrt((2 * log_delta_inv) / N_large)
    + 1 / N_large
)
# As in the array computation, the VC bound stands in for epsilon on the
# right-hand side of the next two expressions.
parrondo_large = np.sqrt(
    (1 / N_large) * (2 * vc_large + np.log(6) + log_growth_2N + log_delta_inv)
)
devroye_large = np.sqrt(
    (1 / (2 * N_large))
    * (4 * vc_large * (1 + vc_large) + np.log(4) + log_growth_N2 + log_delta_inv)
)

(vc_large, rademacher_large, parrondo_large, devroye_large)
