In [None]:
import onnx
import torch
import numpy as np
import pandas as pd  
import copy
import pprint
import json
import time

from onnx2pytorch import ConvertModel
from torch import nn

import sys
sys.path.append('.')
sys.path.append('..')

from algorithms.iterative_relaxation import IterativeRelaxation
from algorithms.decision_procedure import MarabouCoreDP
from algorithms.decision_tree import DecisionTree

from models.test_models import ProphecyPaperNetwork, TestModel
from models.acasxu_1_1 import Acasxu1_1
from models.utils import attach_relu_activation_hook, attach_layer_output_hook, get_layers_info
from models.utils import turn_bool_activation_to_int, turn_bool_activation_to_str


# Show 8 digits after the decimal point whenever a tensor is printed.
torch.set_printoptions(precision=8) 

In [None]:
# Build the network and restore its trained weights.
# map_location='cpu' lets the checkpoint load on machines that lack the device it was saved from;
# load_state_dict copies the values into the (CPU-constructed) model either way.
model = Acasxu1_1()
model.load_state_dict(torch.load('../models/acasxu_1_1.pt', map_location='cpu'))
# The notebook only runs inference/verification, so make sure no train-time-only behaviour is active.
model.eval()
# Register the ReLU activation hooks; `activation_signature` is the object later cells read
# after a forward pass. NOTE(review): presumably the hooks fill it per layer — confirm in models.utils.
_act_handles, activation_signature = attach_relu_activation_hook(model)

In [None]:
# Show the model's repr as the cell output.
model

In [None]:
from properties.read_vnnlib import read_vnnlib
from pathlib import Path
# VNNLIB property file to load; the commented alternative points at an MNIST property instead.
vnnlib_path = Path("../properties/prop_1.vnnlib") # acasxu
# vnnlib_path = Path("./network_properties/prop_0_0.03.vnnlib") # MNIST

# Only the first entry returned by read_vnnlib is used, unpacked as (input_ranges, specifications).
# NOTE(review): `input_ranges` is rebound to a per-class dict further down in this notebook,
# so this value is only meaningful until that cell runs.
input_ranges, specifications = read_vnnlib(vnnlib_path)[0]
print(input_ranges)
print(specifications)

## Test input property

In [None]:
# NOTE(review): this repeats the hook registration already performed right after the model was
# loaded and rebinds `activation_signature` to the newly returned object. Presumably the earlier
# hooks stay attached as well — confirm whether the previous handles should be removed first.
_act_handles, activation_signature = attach_relu_activation_hook(model)

In [None]:
# Push one hand-picked input through the network and print its ReLU activation pattern.
input_data = [[0.62, 0.1, 0.2, 0.47, -0.48]]
X = torch.tensor(input_data, dtype=torch.float)
with torch.no_grad():  # inference only, no autograd graph needed
  _logits = model(X)

# Keep the converted pattern under its own name: `activation_signature` must stay bound to the
# object returned by attach_relu_activation_hook, because the dataset cell further down reads it
# again after its own forward pass.
sample_signature = turn_bool_activation_to_str(activation_signature)
# The batch holds a single input, so keep only row 0 of every layer.
for layer_name, activations in sample_signature.items():
  sample_signature[layer_name] = activations[0]

print(sample_signature)

## 0. Prepare ACASXU dataset

In [None]:
# !wget https://raw.githubusercontent.com/safednn-nasa/prophecy_DNN/master/clusterinACAS_0_short.csv -O ../datasets/clusterinACAS_0_shrt.csv

In [None]:
# Placeholders so the module-level names exist before the first load;
# read_inputs_from_file replaces both arrays with ones sized to the file it reads.
acas_train = np.empty([384221,5],dtype=float)
acas_train_labels = np.zeros(384221,dtype=int)

def read_inputs_from_file(inputFile):
  """Load samples from a comma-separated text file into the module-level arrays.

  Every non-blank line must hold at least 5 numeric fields, which become one input row.
  A 6th field, when present, is stored as that row's integer label; rows without it keep
  label 0. Blank lines are skipped.

  The arrays are always sized to the file, so rows are stored from index 0. The module-level
  `num` offset is no longer consulted (any non-zero value would have indexed past the end
  of the freshly allocated arrays).

  Args:
    inputFile: path of the file to read.

  Returns:
    (acas_train, acas_train_labels): the float array of shape (rows, 5) and the int array of
    shape (rows,). The same objects are bound to the module-level names of the same name,
    and only after the whole file parsed successfully.

  Raises:
    ValueError: if a row has fewer than 5 fields or contains a non-numeric field.
  """
  global acas_train, acas_train_labels
  with open(inputFile) as f:
    rows = [line.strip() for line in f]
  rows = [row for row in rows if row]  # tolerate blank (e.g. trailing) lines
  print(len(rows), "examples")

  inputs = np.empty([len(rows), 5], dtype=float)
  labels = np.zeros(len(rows), dtype=int)
  for row_idx, row in enumerate(rows):
    fields = [float(field) for field in row.split(',')]
    if len(fields) < 5:
      raise ValueError(
        f"{inputFile}: row {row_idx} has {len(fields)} fields, expected at least 5"
      )
    inputs[row_idx] = fields[:5]
    if len(fields) > 5:
      labels[row_idx] = int(fields[5])

  acas_train, acas_train_labels = inputs, labels
  return acas_train, acas_train_labels

In [None]:
# `num` is kept defined at module level because the loader may look it up as a row offset;
# 0 means rows are stored starting at index 0.
num = 0
read_inputs_from_file('../datasets/clusterinACAS_0_shrt.csv')
# Sanity check: shapes of the loaded arrays, then the first five input rows as the cell output.
print(acas_train.shape)
print(acas_train_labels.shape)
acas_train[:5]

In [None]:
def create_df(inputs, predicted_labels, true_labels, activation_signature):
  """Assemble one DataFrame row per input sample.

  Columns, in order: "input", "true_label", "predicted_label", then one column per layer
  name in `activation_signature` holding that sample's activations as a JSON string, and
  finally "full_signature": a JSON string of the {layer name: activations} mapping for the
  sample.

  Args:
    inputs: iterable of input samples; sample k is stored as-is in row k.
    predicted_labels: indexable whose elements expose `.item()`.
    true_labels: indexable of labels, stored as-is.
    activation_signature: mapping of layer name -> per-sample, JSON-serialisable activations.

  Returns:
    pandas.DataFrame built from the collected rows.
  """
  records = []
  for row, sample in enumerate(inputs):
    record = {
      "input": sample,
      "true_label": true_labels[row],
      "predicted_label": predicted_labels[row].item(),
    }
    # Slice every layer down to the current sample once; reuse it for both encodings.
    row_signature = {
      layer: per_layer[row] for layer, per_layer in activation_signature.items()
    }
    record.update((layer, json.dumps(acts)) for layer, acts in row_signature.items())
    record["full_signature"] = json.dumps(row_signature)
    records.append(record)

  return pd.DataFrame(records)

# Forward pass over the whole dataset. This is inference only, so run it under no_grad to
# avoid building an autograd graph for every sample.
with torch.no_grad():
  outputs = model(torch.tensor(acas_train, dtype=torch.float32))
# The predicted class is the index of the smallest output.
predicted_labels = torch.argmin(outputs, dim=1)
# NOTE(review): presumably the ReLU hooks filled `activation_signature` during the forward
# pass above — confirm in models.utils.
activation_signature = turn_bool_activation_to_int(activation_signature, to_list=True)
df = create_df(acas_train, predicted_labels, acas_train_labels, activation_signature)
df.head(10)

In [None]:
# Fraction of rows whose predicted label equals the dataset label.
int((df["predicted_label"] == df["true_label"]).sum()) / len(df)

## 1. Input properties

### 1.1 With DP 

In [None]:
# Negated output specification for each class, used when checking candidate input properties.
# The network's decision is the argmin of its 5 outputs, so "the output is NOT class c" means
# that some other output is no larger than y_c, i.e. for at least one other class j:
#   y_j - y_c <= 0
# Each tuple below is (coefficient row over [y0, y1, y2, y3, y4], right-hand side), one tuple
# per other class j. Example for class 0:
#   y1 <= y0 or y2 <= y0 or y3 <= y0 or y4 <= y0
# NOTE(review): presumably the solvers read each (A, b) as A @ y <= b and treat the list as a
# disjunction — confirm against MarabouCoreDP.solve / IterativeRelaxation.call.
specification_for_classes = {
  0: [(np.array([[-1, 1, 0, 0, 0]]), np.array([0])),
      (np.array([[-1, 0, 1, 0, 0]]), np.array([0])),
      (np.array([[-1, 0, 0, 1, 0]]), np.array([0])),
      (np.array([[-1, 0, 0, 0, 1]]), np.array([0]))],
  
  1: [(np.array([[1, -1, 0, 0, 0]]), np.array([0])),
      (np.array([[0, -1, 1, 0, 0]]), np.array([0])),
      (np.array([[0, -1, 0, 1, 0]]), np.array([0])),
      (np.array([[0, -1, 0, 0, 1]]), np.array([0]))],
  
  2: [(np.array([[1, 0, -1, 0, 0]]), np.array([0])),
      (np.array([[0, 1, -1, 0, 0]]), np.array([0])),
      (np.array([[0, 0, -1, 1, 0]]), np.array([0])),
      (np.array([[0, 0, -1, 0, 1]]), np.array([0]))],
  
  3: [(np.array([[1, 0, 0, -1, 0]]), np.array([0])),
      (np.array([[0, 1, 0, -1, 0]]), np.array([0])),
      (np.array([[0, 0, 1, -1, 0]]), np.array([0])),
      (np.array([[0, 0, 0, -1, 1]]), np.array([0]))],
  
  4: [(np.array([[1, 0, 0, 0, -1]]), np.array([0])),
      (np.array([[0, 1, 0, 0, -1]]), np.array([0])),
      (np.array([[0, 0, 1, 0, -1]]), np.array([0])),
      (np.array([[0, 0, 0, 1, -1]]), np.array([0]))],
}

# Fixed seed so the representative sample drawn for each class is the same on every run.
SAMPLE_SEED = 0

candidates = {}    # class -> most supported activation pattern, as "ON"/"OFF" lists per layer
input_ranges = {}  # class -> [[min, max], ...] per input dimension over that pattern's support
samples = {}       # class -> one input drawn from that pattern's support

for y_class in range(5):
  class_df = df[df['predicted_label'] == y_class]
  if class_df.empty:
    # idxmax() on an empty result fails with an opaque message; fail with context instead.
    raise ValueError(
      f"no sample is predicted as class {y_class}; cannot build a candidate property"
    )

  # Pick the full activation signature shared by the largest number of samples of this class.
  group_sizes = class_df.groupby('full_signature').size()
  activation_with_max_support = group_sizes.idxmax()
  activation_pattern = json.loads(activation_with_max_support)

  # Turn activation from 0 and 1 to ON and OFF to match DP's format
  for layer, activation in activation_pattern.items():
    activation_pattern[layer] = ["ON" if val == 1 else "OFF" for val in activation]
  candidates[y_class] = activation_pattern

  # Bounding box of the inputs that exhibit the chosen signature.
  support_df = class_df[class_df['full_signature'] == activation_with_max_support]
  ranges = []
  for i in range(5):
    # Extract coordinate i once and reuse it for both bounds.
    coordinate = support_df['input'].apply(lambda x: x[i])
    ranges.append([coordinate.min(), coordinate.max()])
  input_ranges[y_class] = ranges

  # Sample a data point from support (seeded, so downstream cells are reproducible)
  sample_row = support_df.sample(random_state=SAMPLE_SEED)
  assert str(sample_row.get('full_signature').item()) == str(activation_with_max_support)
  samples[y_class] = sample_row.get('input').item()
  
# pprint.pprint("CANDIDATE PROPERTIES:")
# pprint.pprint(candidates)

# pprint.pprint("INPUT RANGES:")
# pprint.pprint(input_ranges)

# pprint.pprint("SAMPLES:")
# pprint.pprint(samples)

In [None]:
# Show the per-class [min, max] input bounds computed from each candidate's support.
input_ranges

In [None]:
# Show the input drawn from each class's support.
samples

In [None]:
# Check every class's candidate activation pattern with the decision procedure, passing the
# pattern, the model, the class's input ranges and its negated output specification.
dp = MarabouCoreDP()
for y_class in range(5):
  activation_pattern = candidates[y_class]
  x_ranges = input_ranges[y_class]
  specification = specification_for_classes[y_class]
  status, counter_example, _, _ = dp.solve(activation_pattern, model, x_ranges, specification)
  if status == 'unsat':
    print(f"{activation_pattern} is an input property for class {y_class}\n\n")
  else:
    # Report the outcome instead of staying silent, so a rejected candidate is visible
    # together with whatever counter example the solver returned.
    print(
      f"candidate for class {y_class} was not proven (status: {status}); "
      f"counter example: {counter_example}\n\n"
    )


### 1.2 With Iterative Relaxation

In [None]:
# Derive an input property per class with iterative relaxation, starting from the sample
# drawn for that class together with its input ranges and negated output specification.
iterative_relaxation = IterativeRelaxation()

for y_class in range(5):
  x_ranges = input_ranges[y_class]
  # Wrap the single sample in an outer list so the array gets a leading axis of length 1.
  input_sample = np.array([samples[y_class]])
  specification = specification_for_classes[y_class]
  input_property = iterative_relaxation.call(model, input_sample, x_ranges, specification)
  print(f"input property for class {y_class}: {input_property}\n\n")

## 2. Layer patterns as interpolants

### 2.1 Find candidates with Decision Tree

In [None]:
# relu_layers = ['relu0', 'relu1', 'relu2', 'relu3', 'relu4']
# labels = [0,1,2,3,4]
# for name, module in list(model.named_modules()):
#   if isinstance(module, torch.nn.ReLU):
#     relu_layers.append(name)
# relu_layers

In [None]:
# for layer_name in relu_layers: 
#   for chosen_label in [0,1,2,3,4]:
#     df['satisfies_postcon'] = np.where(df['predicted_label'] == chosen_label, 1, -1)
#     decision_tree = DecisionTree(df, X_col=layer_name, Y_col="satisfies_postcon")
#     leaves_with_activation_pattern = decision_tree.get_potential_layer_properties()

In [None]:
# df['satisfies_postcon'] = np.where(df['predicted_label'] == 3, 1, -1)
# decision_tree = DecisionTree(df, X_col='relu4', Y_col="satisfies_postcon")
# leaves_with_activation_pattern = decision_tree.get_potential_layer_properties()

# num_of_patterns = len(leaves_with_activation_pattern)
# print(f"num_of_patterns: {num_of_patterns}")

# total_support = sum(candidate['support'] for candidate in leaves_with_activation_pattern)
# print(f"total_support: {total_support}")

# top_5_supports = [candidate['support'] for candidate in leaves_with_activation_pattern[:5]]
# print(f"top_5_supports: {top_5_supports}")