In [1]:
import sys
sys.path.insert(0, '../ELINA/python_interface/')


import numpy as np
import re
import csv
from elina_box import *
from elina_interval import *
from elina_abstract0 import *
from elina_manager import *
from elina_dimension import *
from elina_scalar import *
from elina_interval import *
from elina_linexpr0 import *
from elina_lincons0 import *
from sys import argv
import ctypes
from ctypes.util import find_library
from gurobipy import *
import time

libc = CDLL(find_library('c'))
cstdout = c_void_p.in_dll(libc, 'stdout')

In [2]:
class layers:
    def __init__(self):
        self.layertypes = []
        self.weights = []
        self.biases = []
        self.numlayer = 0
        self.ffn_counter = 0

def parse_bias(text):
    if len(text) < 1 or text[0] != '[':
        raise Exception("expected '['")
    if text[-1] != ']':
        raise Exception("expected ']'")
    v = np.array([*map(lambda x: np.double(x.strip()), text[1:-1].split(','))])
    #return v.reshape((v.size,1))
    return v

def parse_vector(text):
    if len(text) < 1 or text[0] != '[':
        raise Exception("expected '['")
    if text[-1] != ']':
        raise Exception("expected ']'")
    v = np.array([*map(lambda x: np.double(x.strip()), text[1:-1].split(','))])
    return v.reshape((v.size,1))
    #return v

def balanced_split(text):
    i = 0
    bal = 0
    start = 0
    result = []
    while i < len(text):
        if text[i] == '[':
            bal += 1
        elif text[i] == ']':
            bal -= 1
        elif text[i] == ',' and bal == 0:
            result.append(text[start:i])
            start = i+1
        i += 1
    if start < i:
        result.append(text[start:i])
    return result

def parse_matrix(text):
    i = 0
    if len(text) < 1 or text[0] != '[':
        raise Exception("expected '['")
    if text[-1] != ']':
        raise Exception("expected ']'")
    return np.array([*map(lambda x: parse_vector(x.strip()).flatten(), balanced_split(text[1:-1]))])

def parse_net(text):
    lines = [*filter(lambda x: len(x) != 0, text.split('\n'))]
    i = 0
    res = layers()
    while i < len(lines):
        if lines[i] in ['ReLU', 'Affine']:
            W = parse_matrix(lines[i+1])
            b = parse_bias(lines[i+2])
            res.layertypes.append(lines[i])
            res.weights.append(W)
            res.biases.append(b)
            res.numlayer+= 1
            i += 3
        else:
            raise Exception('parse error: '+lines[i])
    return res
   
def parse_spec(text):
    text = text.replace("[", "")
    text = text.replace("]", "")
    with open('dummy', 'w') as my_file:
        my_file.write(text)
    data = np.genfromtxt('dummy', delimiter=',',dtype=np.double)
    low = np.copy(data[:,0])
    high = np.copy(data[:,1])
    return low,high

def get_perturbed_image(x, epsilon):
    image = x[1:len(x)]
    num_pixels = len(image)
    LB_N0 = image - epsilon
    UB_N0 = image + epsilon
     
    for i in range(num_pixels):
        if(LB_N0[i] < 0):
            LB_N0[i] = 0
        if(UB_N0[i] > 1):
            UB_N0[i] = 1
    return LB_N0, UB_N0


def generate_linexpr0(weights, bias, size):
    linexpr0 = elina_linexpr0_alloc(ElinaLinexprDiscr.ELINA_LINEXPR_DENSE, size)
    cst = pointer(linexpr0.contents.cst)
    elina_scalar_set_double(cst.contents.val.scalar, bias)
    for i in range(size):
        elina_linexpr0_set_coeff_scalar_double(linexpr0,i,weights[i])
    return linexpr0

In [3]:
class elina():
    
    SUPPORTED_LT = ['ReLU', 'Affine']
    
    def __init__(self):        
        # Allocate memory
        self.man = elina_box_manager_alloc()
            
    def build_hypercube(self, LB, UB):
        num_pixels = len(LB)
        itv = elina_interval_array_alloc(num_pixels)
        for i in range(num_pixels):
            elina_interval_set_double(itv[i], LB[i], UB[i])
        
        elina_hypercube = elina_abstract0_of_box(self.man, 0, num_pixels, itv)
        
        elina_interval_array_free(itv,num_pixels)
        return elina_hypercube
        
    def propagate_hypercube(self, layertype, weights, biases, elina_hypercube):
        assert layertype in self.SUPPORTED_LT, 'Net type not supported'
        
        dims = elina_abstract0_dimension(self.man, elina_hypercube)
        num_in_pixels = dims.intdim + dims.realdim
        num_out_pixels = len(weights)
        
        # Output dimensions to add
        dimadd = elina_dimchange_alloc(0,num_out_pixels) 
        for i in range(num_out_pixels):
                dimadd.contents.dim[i] = num_in_pixels
        elina_abstract0_add_dimensions(self.man, True, elina_hypercube, dimadd, False)
        elina_dimchange_free(dimadd)
        
        # Transform nn weights and biases
        np.ascontiguousarray(weights, dtype=np.double)
        np.ascontiguousarray(biases, dtype=np.double)
        
        #handle affine layer
        for i in range(num_out_pixels):
            elina_hypercube = elina_abstract0_assign_linexpr_array(
                man=self.man, 
                destructive=True, 
                org=elina_hypercube, 
                tdim=ElinaDim(num_in_pixels + i), 
                linexpr_array=generate_linexpr0(weights[i],biases[i],num_in_pixels), 
                size=1, 
                dest=None
            )

        # Remove input elements and dimensions
        dimrem = elina_dimchange_alloc(0,num_in_pixels)
        for i in range(num_in_pixels): dimrem.contents.dim[i] = i
        elina_abstract0_remove_dimensions(self.man, True, elina_hypercube, dimrem)
        elina_dimchange_free(dimrem)
        
        # handle ReLU layer 
        if(layertype=='ReLU'):
            elina_hypercube = relu_box_layerwise(
                man=self.man,
                destructive=True,
                elem=elina_hypercube,
                start_offset=0, 
                num_dim=num_out_pixels
            )
            
        return elina_hypercube

In [4]:
# The given naive analyser
class analyser1():
    def __init__(self, nn, LB_N0, UB_N0, label):
        self.nn = nn
        self.LB_N0 = LB_N0
        self.UB_N0 = UB_N0
        self.label = label
        
    def __call__(self):
        # initialize elina toolbox
        self.elina_instance = elina()
        # construct input abstraction
        self.hypercube = self.elina_instance.build_hypercube(LB_N0, UB_N0)
        
        results = self.verify(*self.forward())
        
        elina_manager_free(self.elina_instance.man) 
        
        return results
        
        
    def forward(self):
        for layerno in range(self.nn.numlayer):
            self.hypercube = self.elina_instance.propagate_hypercube(
                layertype = nn.layertypes[layerno], 
                weights = nn.weights[layerno], 
                biases = nn.biases[layerno], 
                elina_hypercube=self.hypercube
            )
        
        dims = elina_abstract0_dimension(self.elina_instance.man,self.hypercube)
        output_size = dims.intdim + dims.realdim
        # get bounds for each output neuron
        bounds = elina_abstract0_to_box(self.elina_instance.man,self.hypercube)
        # Free abstractiion space
        elina_abstract0_free(self.elina_instance.man,self.hypercube)
        return bounds, output_size
    
    def verify(self, bounds, output_size):
        verified_flag = True
        predicted_label = 0
        if(LB_N0[0]==UB_N0[0]):
            for i in range(output_size):
                inf = bounds[i].contents.inf.contents.val.dbl
                flag = True
                for j in range(output_size):
                    if(j!=i):
                        sup = bounds[j].contents.sup.contents.val.dbl
                        if(inf<=sup):
                            flag = False
                            break
                if(flag):
                    predicted_label = i
                    break    
        else:
            inf = bounds[label].contents.inf.contents.val.dbl
            for j in range(output_size):
                if(j!=label):
                    sup = bounds[j].contents.sup.contents.val.dbl
                    if(inf<=sup):
                        predicted_label = label
                        verified_flag = False
                        break
        elina_interval_array_free(bounds,output_size)
        return predicted_label, verified_flag

In [5]:
#TO DEFINE
netname = "../mnist_nets/mnist_relu_6_20.txt"
specname = "../mnist_images/img10.txt"
epsilon = 0.001
#c_label = 

analyze = analyser1


with open(netname, 'r') as netfile:
    netstring = netfile.read()
with open(specname, 'r') as specfile:
    specstring = specfile.read()
nn = parse_net(netstring)
x0_low, x0_high = parse_spec(specstring)
LB_N0, UB_N0 = get_perturbed_image(x0_low,0)

label, _ = analyze(nn,LB_N0,UB_N0,0)()

start = time.time()
if(label==int(x0_low[0])):
    LB_N0, UB_N0 = get_perturbed_image(x0_low,epsilon)
    _, verified_flag = analyze(nn,LB_N0,UB_N0,label)()
    if(verified_flag):
        print("verified")
    else:
        print("can not be verified")  
else:
    print("image not correctly classified by the network. expected label ",int(x0_low[0]), " classified label: ", label)
end = time.time()

print("analysis time: ", (end-start), " seconds")

verified
analysis time:  0.05362701416015625  seconds


# NEW VERSION

Architecture proposal

multiple layers (elina | gurobi). Before executing, merge all gurobi layers that are in a row.

In [None]:
class ElinaToolbox()
    @abstractmethod
    def hypercubeToNumpy(HC):
        """Return inf and sup bounds"""
        return (
            [elt.contents.inf.contents.val.dbl for elt in HC], 
            [elt.contents.sup.contents.val.dbl for elt in HC]
        )
    @abstractmethod
    def NumpyToHypercube(LB, UB):
        num_pixels = len(LB)
        itv = elina_interval_array_alloc(num_pixels)
        for i in range(num_pixels):
            elina_interval_set_double(itv[i], LB[i], UB[i])
        HC = elina_abstract0_of_box(self.man, 0, num_pixels, itv)
        elina_interval_array_free(itv,num_pixels)
        return HC

class SolverLayer():
    SUPPORTED_LT = ['ReLU', 'Affine']
    def __init__(self, layertypes, weights, biases):
        assert layertype in self.SUPPORTED_LT, 'Net type not supported'
        self.layertypes=layertypes
        self.weights=weights
        self.biases=biases

In [None]:
class GurobiLayers(SolverLayer):  
    def __init__(self, layertypes, weights, biases):
        super.__init__(self, layertypes, weights, biases)
        self.m = Model(name='NN')
        self.m.setParam('OutputFlag', False)
        
    def add_relu_constraints(self, hi, inf, sup):
        
        ri = m.addVar(vtype=GRB.CONTINUOUS)
        if (inf >= 0):
            self.m.addConstr(ri == hi)
        elif (sup <= 0):
            self.m.addConstr(ri == 0)
        else:
            k = sup / (sup - inf)
            t = -sup * inf / (sup - inf)

            self.m.addConstr(ri >= 0)
            self.m.addConstr(ri >= hi)
            self.m.addConstr(ri <= k * hi + t)
            
        return ri
    
    def add_hidden_layer_constraint(self, old_ris, layerno):
        return (
            self.m.addVar(vtype=GRB.CONTINUOUS)
            if i == 0 else 
            quicksum([LinExpr(self.weights[i][j, :], old_ris), self.biases[i][j]])
        )
    
    def optimize(self, his_out):
        output_lower_bounds, output_upper_bounds = [], []
        for i in range(len(his_out)):
            self.m.setObjective(his_out[i], GRB.MINIMIZE)
            self.m.optimize()
            output_lower_bounds.append(m.objVal)

            self.m.setObjective(his_out[i], GRB.MAXIMIZE)
            self.m.optimize()
            output_upper_bounds.append(m.objVal)
        return output_lower_bounds, output_upper_bounds
        
        
    def compute(self, lower_bounds, upper_bounds, input_layerno=None, output_layerno=None): 
        """
            Return the output values range of i = output_layerno + 1
        """
        if input_layerno is None: input_layerno = 0
        if output_layerno is None: output_layerno = len(lower_bounds)-1
            
        ris, his = [], []
        for i in range(input_layerno, output_layerno+1):
            old_ris, ris, his = ris, [], []
            for j in range(lower_bounds[i].size):
                hi = self.add_hidden_layer_constraint(old_ris, i)
                ri = self.add_relu_constraints(hi, lower_bounds[i][j], upper_bounds[i][j])
                
                his.append(hi)
                ris.append(ri)
                
        try:
            return optimize(self, ris if self.layertypes[i] == "ReLU" else his)
        except:
            print("Solver can't satisfy conditions")
            raise

In [None]:
class ElinaLayers(SolverLayer):
    def __init__(self, layertypes, weights, biases, man):
        super.__init__(self, layertypes, weights, biases)
        # Transform nn weights and biases
        self.total_layers=length(weights)
        self.man = man
    
    def compute(self, LB, UB, input_layerno=None, output_layerno=None):
        """
            Return the output values range for i=input_layerno + 1 to i = output_layerno + 1
        """
        if input_layerno is None: input_layerno = 0
        if output_layerno is None: output_layerno = self.total_layers
            
        # INPUT HyperCube
        self.HC = ElinaToolbox.NumpyToHypercube(LB, UB)
        
        for layerno in range(input_layerno, output_layerno):
            self.propagate_hypercube(self, layer_no)
            LB_layerno, UB_layerno = ElinaToolbox.hypercubeToNumpy(self.HC)
            lower_bounds.append(LB_layerno)
            upper_bounds.append(UB_layerno)
        return lower_bounds, upper_bounds
        
    def propagate_hypercube(self, layer_no):
        weights, biases = self.weights[layer_no], self.biases[layer_no]
        np.ascontiguousarray(dtype=np.double)
        np.ascontiguousarray(biases, dtype=np.double)
        
        dims = elina_abstract0_dimension(self.man, self.HC)
        
        self.num_in_pixels = dims.intdim + dims.realdim
        self.num_out_pixels = len(weights)
        
        self.add_output_dim()
        self.compute_affine(weights, biases)
        self.remove_input_dim()
        if(self.layertypes[layer_no]=='ReLU'):
            self.compute_relu()
        
    def add_output_dim(self):
        dimadd = elina_dimchange_alloc(0,self.num_out_pixels) 
        for i in range(self.num_out_pixels): 
            dimadd.contents.dim[i] = self.num_in_pixels
        elina_abstract0_add_dimensions(self.man, True, self.HC, dimadd, False)
        elina_dimchange_free(dimadd)
        
    def remove_input_dim(self):
        dimrem = elina_dimchange_alloc(0,self.num_in_pixels)
        for i in range(num_in_pixels): 
            dimrem.contents.dim[i] = i
        elina_abstract0_remove_dimensions(self.man, True, self.HC, dimrem)
        elina_dimchange_free(dimrem)
        
    def compute_affine(self, weights, biases):
        for i in range(self.num_out_pixels):
            self.HC = elina_abstract0_assign_linexpr_array(
                man=self.man, 
                destructive=True, 
                org=self.HC, 
                tdim=ElinaDim(self.num_in_pixels + i), 
                linexpr_array=generate_linexpr0(weights[i],biases[i],self.num_in_pixels), 
                size=1, 
                dest=None
            )
            
    def compute_relu(self):
        self.HC = relu_box_layerwise(
            man=self.man,
            destructive=True,
            elem=self.HC,
            start_offset=0, 
            num_dim=self.num_out_pixels
        )