In [None]:
import os, json
from argparse import Namespace

import numpy as np
import onnxruntime
import torch

from flat_reach_RL import *
from nnet_util import torch_model_to_nnet, onnx_to_nnet

In [None]:
# Directory of the trained run to load. The default below is machine-specific,
# so it can be overridden without editing the notebook by setting the
# FLAT_REACH_DEMO_PATH environment variable, e.g.
#   FLAT_REACH_DEMO_PATH=/home/ruic/Documents/RESEARCH/ICL/Composable_Agent_Toolbox/examples/output/FlatReach_near_C200_H32_Nm100.0_lr0.001
DEFAULT_DEMO_PATH = "/Users/changliuliu/Documents/GitHub/Composable_Agent_Toolbox/examples/output/FlatReach_near_C200_H32_Nm100.0_lr0.001"
demo_path = os.environ.get("FLAT_REACH_DEMO_PATH", DEFAULT_DEMO_PATH)

# Checkpoint episode to load; also used to name the exported .onnx/.nnet files.
demo_ep = 300

# Fail early with an actionable message when the run directory is wrong
# (same exception type that open() would raise).
args_file = os.path.join(demo_path, "args.json")
if not os.path.isfile(args_file):
    raise FileNotFoundError(
        "args.json not found under {!r}; set FLAT_REACH_DEMO_PATH to the run directory".format(demo_path)
    )

# Restore the arguments saved with the run as an attribute-style namespace.
with open(args_file, "r") as infile:
    config = json.load(infile)
args_saved = Namespace(**config)

# Override the saved arguments with the demo settings chosen above.
args_saved.demo = True
args_saved.demo_path = demo_path
args_saved.demo_ep = demo_ep

args = args_saved

# Network input/output sizes passed to QNetwork.
STATE_DIMENSION = 8
ACTION_SPACE_SIZE = 17

device = 'cpu'
# device = 'cuda'

In [None]:
def state_to_tensor(s, device):
    '''
        Convert a state into a float32 tensor on `device` with a leading
        batch axis added.

        Returns: tensor of shape [1, dS]
    '''
    state_array = np.asarray(s)
    # Indexing with None prepends the batch axis.
    batched = torch.from_numpy(state_array)[None, ...]
    return batched.to(dtype=torch.float32).to(device)

In [None]:
# Checkpoint file for the selected run/episode: <demo_path>/checkpts/<env_name>_<demo_ep>.pt
checkpt_path = os.path.join(
    args.demo_path,
    'checkpts',
    f'{args.env_name}_{args.demo_ep}.pt',
)

# Build the Q-network with the sizes from the config cell and the hidden size
# saved with the run, then move it to the selected device.
Q = QNetwork(
    env_name=args.env_name,
    state_dim=STATE_DIMENSION,
    num_of_actions=ACTION_SPACE_SIZE,
    hidden_size=args.hidden_size,
)
Q = Q.to(device)

# Load the trained weights; eval() is kept as the last expression so the cell
# output is unchanged.
Q.load_model_weights(checkpt_path)
Q.eval()

In [None]:
# Discretisation of the action space decoded by get_action.
# Number of equally spaced directions over a full turn (2*pi / ANGLE_DISCRETE_SIZE apart).
ANGLE_DISCRETE_SIZE = 8
# Smallest and largest non-zero action magnitude.
ACC_MIN = 50
ACC_MAX = 100
# Number of magnitude levels spread linearly between ACC_MIN and ACC_MAX.
ACC_DISCRETE_SIZE = 2
# One extra id (0) for the zero action; with the values above this is 17,
# the same value the configuration cell assigns to ACTION_SPACE_SIZE.
ACTION_SPACE_SIZE = 1 + ANGLE_DISCRETE_SIZE * ACC_DISCRETE_SIZE

def get_action(Q, state, device):
    """Pick the greedy action for `state` and decode it into a 2-D vector.

    Args:
        Q: callable mapping a [1, dS] state tensor to one score per action id.
        state: sequence of state features, converted with `state_to_tensor`.
        device: device handed to `state_to_tensor`.

    Returns:
        (action_id, action): the argmax action id and a float array of
        shape (2,). Id 0 decodes to the zero action; id k >= 1 decodes to
        direction (k - 1) % ANGLE_DISCRETE_SIZE and magnitude level
        (k - 1) // ANGLE_DISCRETE_SIZE.
    """
    # Pure inference: no need to record an autograd graph for the forward pass.
    with torch.no_grad():
        action_id = Q(state_to_tensor(state, device)).argmax().item()

    if action_id == 0:
        # Floats, so the returned dtype matches the non-zero actions.
        action = [0.0, 0.0]
    else:
        action_id_non_zero = action_id - 1
        angle_id = action_id_non_zero % ANGLE_DISCRETE_SIZE
        mag_id = action_id_non_zero // ANGLE_DISCRETE_SIZE

        angle = 2*np.pi / ANGLE_DISCRETE_SIZE * angle_id
        # With a single magnitude level there is no spacing between levels;
        # avoid dividing by zero and fall back to ACC_MIN.
        if ACC_DISCRETE_SIZE > 1:
            mag_step = (ACC_MAX - ACC_MIN) / (ACC_DISCRETE_SIZE - 1)
        else:
            mag_step = 0.0
        mag = ACC_MIN + mag_id * mag_step
        action = [mag*np.cos(angle), mag*np.sin(angle)]

    return action_id, np.asarray(action, dtype=float).reshape(-1)

# numerical examples
# state = [goal_rel_pos, goal_rel_vel, obs_rel_pos, obs_rel_vel]
# The states are grouped in pairs that differ in a single feature; per the
# scenario comments, both states of a pair should yield the same action.
example_states = [
    # When obstacle is far, changing goal distance should not change action
    [10, 0, -1, 0, 0, 50, 0, 0],
    [5, 0, -1, 0, 0, 50, 0, 0],
    # When obstacle is not on the way to goal, changing obstacle vel should not change action
    [10, 0, -1, 0, 0, 50, 0, 0],
    [10, 0, -1, 0, 0, 50, 0, -1],
    # When obstacle is on the way to goal, changing goal distance should not change action
    [20, 0, -1, 0, 10, 0, -2, 0],
    [25, 0, -1, 0, 10, 0, -2, 0],
]

for state in example_states:
    print(get_action(Q, state, device))


In [None]:
# test obstacle position where goal pos change will change action
#
# Every obstacle position is evaluated in turn. Plain reassignment of obs_pos
# would leave only the last position in effect. The trailing comments are the
# observations recorded for each position.
obs_pos_cases = [
    [6, 0],   # distance <= 6,          goal changes action
    [13, 0],  # distance in [7, 13],    goal doesn't change action
    [20, 0],  # distance in [14, 30],   goal changes action
]
goal_pos_1 = [20, 10]
goal_pos_2 = [20, 20]

for obs_pos in obs_pos_cases:
    print("obs_pos:", obs_pos)
    for goal_pos in (goal_pos_1, goal_pos_2):
        # state = goal position, goal velocity, obstacle position, obstacle velocity
        state = goal_pos + [-1, 0] + obs_pos + [-1, 0]
        print(get_action(Q, state, device))

In [None]:
# Export the Q-network to "<demo_ep>.onnx" using a random single-state batch
# as the example input.
input_names = ["dummy_input"]
output_names = ["output"]
dummy_input = torch.randn(1, STATE_DIMENSION).to(device)

export_options = dict(
    verbose=False,
    input_names=input_names,
    output_names=output_names,
    export_params=True,
)
torch.onnx.export(Q, dummy_input, f"{demo_ep}.onnx", **export_options)

In [None]:
# Convert the PyTorch model directly to "<demo_ep>.nnet".
torch_model_to_nnet(Q, f"{demo_ep}.nnet", decimal=8) # saving 8 digits. More can be used, but that requires more space.

In [None]:
# Convert the exported "<demo_ep>.onnx" file to .nnet, keeping 8 digits.
# NOTE(review): this targets the same "<demo_ep>.nnet" path as the
# torch_model_to_nnet call, so running both presumably leaves only this
# result on disk -- confirm which export is intended.
onnx_to_nnet(f"{demo_ep}.onnx", f"{demo_ep}.nnet", decimal=8)

In [None]:
def onnx_inference(file, input):
    """Run the ONNX model stored in `file` on `input` and print its first output.

    Args:
        file: path of the .onnx model to load.
        input: numpy array; it is cast to float32 and fed to the model's
            first declared input.
    """
    providers = onnxruntime.get_available_providers()
    session = onnxruntime.InferenceSession(file, providers=providers)

    # Feed the first declared input by name.
    feed = {session.get_inputs()[0].name: input.astype(np.float32)}
    outputs = session.run(None, feed)
    print(outputs[0])

# Smoke-test the exported model on a fixed single-state batch. The file name
# is derived from demo_ep so it always matches the file written by the export
# cell instead of a hard-coded "300.onnx".
input = np.array([[1,2,3,4,5,6,7,8]])
onnx_inference(f"{demo_ep}.onnx", input)

In [None]:
# The remaining cells are Julia code, not Python: run them in a Julia session
# to test the exported nnet file.
# The outputs can differ slightly from the PyTorch/ONNX results because the
# .nnet file only stores a limited number of digits (decimal=8 in the export cells).
# The file name is hard-coded and corresponds to demo_ep = 300.

using NeuralVerification
net = read_nnet("300.nnet")

In [None]:
# Case 1: when the obstacle is far, changing goal distance should not change action
input = [10.0, 0.0, -1.0, 0.0, 0.0, 50.0, 0.0, 0.0]

# Nominal decision of the network at the unperturbed input
output = NeuralVerification.compute_output(net, input)
desired_action = argmax(output) # Note Julia's index starts from 1

# Perturbation box around the input: only the first coordinate may vary
radius = [10.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # This parameter is tunable
input_set  = NeuralVerification.Hyperrectangle(input, radius)

# Choose solver
solver = MIPVerify()

# Problem 1. Now we check if the desired_action remains the same for all inputs in the input_set
for i in 1:17
    # Nothing to compare the desired action against itself
    i == desired_action && continue

    # Output constraint: output[i] - output[desired_action] <= 0
    coeffs = zeros(1,17)
    coeffs[i] = 1.0
    coeffs[desired_action] = -1.0
    output_set = NeuralVerification.HPolytope(coeffs, [0.0])

    problem = Problem(net, input_set, output_set)
    result = solve(solver, problem)
    println("action pair: ",i, ",", desired_action, ": ",result.status)
end

# Problem 2. Now let us check at which distance the obstacle can affect the action (make the adjacent action optimal)
# Bisection on input[index] over [dmin, dmax]: a counter example at the
# midpoint raises dmin to the midpoint, otherwise dmax is lowered to it.
index = 6
dmin = 0.0
dmax = 50.0
while dmax>dmin+0.1
    # Evaluate the midpoint of the current interval
    input[index] = (dmax+dmin)/2.0
    input_set  = NeuralVerification.Hyperrectangle(input, radius)
    output = NeuralVerification.compute_output(net, input)
    desired_action = argmax(output) # Note Julia's index starts from 1

    found_counter_example = false
    for i in 1:17
        i == desired_action && continue

        # Output constraint: output[i] - output[desired_action] <= 0
        coeffs = zeros(1,17)
        coeffs[i] = 1.0
        coeffs[desired_action] = -1.0
        output_set = NeuralVerification.HPolytope(coeffs, [0.0])
        problem = Problem(net, input_set, output_set)

        # Build the mixed-integer model by hand so its solution can be inspected
        model = NeuralVerification.Model(solver)
        z = NeuralVerification.init_vars(model, problem.network, :z, with_input=true)
        δ = NeuralVerification.init_vars(model, problem.network, :δ, binary=true)
        # get the pre-activation bounds:
        model[:bounds] = NeuralVerification.get_bounds(problem, before_act=true)
        model[:before_act] = true
        NeuralVerification.add_set_constraint!(model, problem.input, first(z))
        NeuralVerification.add_complementary_set_constraint!(model, problem.output, last(z))
        NeuralVerification.encode_network!(model, problem.network, NeuralVerification.BoundedMixedIntegerLP())
        NeuralVerification.max_disturbance!(model, first(z) - problem.input.center)
        NeuralVerification.optimize!(model)

        if NeuralVerification.termination_status(model) == NeuralVerification.OPTIMAL
            println("Counter example:",NeuralVerification.value(first(z)))
            dmin = input[index]
            found_counter_example = true
            break
        end
    end

    # No action pair produced a counter example at this distance
    if !found_counter_example
        dmax = input[index]
    end
    println("[",dmin,",",dmax,"]")
end

# Problem 3. Choose different solvers
input = [10.0, 0.0, -1.0, 0.0, 0.0, 50.0, 0.0, 0.0]
radius = [10.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # This parameter is tunable
input_set  = NeuralVerification.Hyperrectangle(input, radius)

# Recompute the nominal action for the input that was just reset: the
# bisection in Problem 2 overwrites desired_action with the argmax at its last
# midpoint, which is a different input.
output = NeuralVerification.compute_output(net, input)
desired_action = argmax(output) # Note Julia's index starts from 1

solver = ReluVal(max_iter = 50) # max_iter is a tunable parameter. there will be fewer unknowns if we set max_iter to be big enough
for i in 1:17
    if i != desired_action
        # Output constraint: output[i] - output[desired_action] <= 0
        vec = zeros(1,17)
        vec[i] = 1.0
        vec[desired_action] = -1.0
        output_set = NeuralVerification.HPolytope(vec, [0.0])
        problem = Problem(net, input_set, output_set)
        result = solve(solver, problem)
        println("action pair: ",i, ",", desired_action, ": ",result)
    end
end

In [None]:
# Case 2: When obstacle is not on the way to goal, changing obstacle vel should not change action
input = [10, 0, -1, 0, 0, 50, 0, 0]

# Nominal decision of the network at the unperturbed input
output = NeuralVerification.compute_output(net, input)
desired_action = argmax(output) # Note Julia's index starts from 1

# Perturbation box around the input: only the last coordinate may vary
radius = [0, 0, 0, 0, 0, 0, 0, 10] # This parameter is tunable
input_set  = NeuralVerification.Hyperrectangle(input, radius)

# Choose solver
solver = MIPVerify()

# Now we check if the desired_action remains the same for all inputs in the input_set
for i in filter(j -> j != desired_action, 1:17)
    # Output constraint: output[i] - output[desired_action] <= 0
    coeffs = zeros(1,17)
    coeffs[i] = 1.0
    coeffs[desired_action] = -1.0
    output_set = NeuralVerification.HPolytope(coeffs, [0.0])

    problem = Problem(net, input_set, output_set)
    result = solve(solver, problem)
    println(result)
end

In [None]:
# Case 3: When obstacle is on the way to goal, changing goal distance should not change action
input = [20, 0, -1, 0, 10, 0, -2, 0]

# Nominal decision of the network at the unperturbed input
output = NeuralVerification.compute_output(net, input)
desired_action = argmax(output) # Note Julia's index starts from 1

# Perturbation box around the input: only the first coordinate may vary
radius = [10, 0, 0, 0, 0, 0, 0, 0] # This parameter is tunable
input_set  = NeuralVerification.Hyperrectangle(input, radius)

# Choose solver
solver = MIPVerify()

# Now we check if the desired_action remains the same for all inputs in the input_set
for i in 1:17
    # Only compare the desired action against the other actions
    i == desired_action && continue

    # Output constraint: output[i] - output[desired_action] <= 0
    coeffs = zeros(1,17)
    coeffs[i] = 1.0
    coeffs[desired_action] = -1.0
    output_set = NeuralVerification.HPolytope(coeffs, [0.0])

    problem = Problem(net, input_set, output_set)
    result = solve(solver, problem)
    println(result)
end

In [None]:
# PROBLEM 1: CHECK IF THE POLICY IS ROBUST UNDER INPUT PERTURBATION ** CHECK EVERY ACTION PAIR

# PROBLEM 2: COMPUTE THE MAXIMUM RANGE OF PERTURBATION

# PROBLEM 3: COMPARE THE PERFORMANCE OF DIFFERENT SOLVERS: computation time, conservativeness
