**input_vars**
cbr_traffic, cbr_th, cbr_prb, cbr_queue, cbr_snr, vbr_traffic, vbr_th, vbr_prb, vbr_queue, vbr_snr, sla_level, n_utilized_prbs, prev_sla_violation
device_count, cbr_traffic, vbr_traffic, reamaining_embb_count, reamaining_mmtc_count, remaining_prbs


## Property Definition

In [3]:
model_input_size = 19
model_output_size = 15

copy_one_argmax_candidate = 0
copy_two_argmax_candidate = 13

base_lower = 0.4
base_upper = 0.8
epsilon = 0.001

DECLARATION_LINE_BASE = "(declare-const {} Real)"
INPUT_CONSTRAINT_LINE_BASE = "(assert ({} {} {}))"
OUTPUT_CONSTRAINT_LINE_BASE = "(assert (<= {} {}))"

base_path = "/home/mzi/sys-rl-verif/applications/cmars/vnnlib/plain_comparative/test/in{}_lower{}_upper{}_eps{}_out{}_{}_{}.vnnlib"

def gen_prop(copy_one_argmax_candidate=copy_one_argmax_candidate, copy_two_argmax_candidate=copy_two_argmax_candidate, 
            base_lower=base_lower, base_upper=base_upper, base_path=base_path):
    lines = []
    lines.append("; variable declaration")
    for i in range(2*model_input_size):
        lines.append(DECLARATION_LINE_BASE.format(f"X_{i}"))

    for i in range(2*model_output_size):
        lines.append(DECLARATION_LINE_BASE.format(f"Y_{i}"))

    lines.append(" ")
    lines.append("; input constraints")
    for i in range(model_input_size):
        lines.append(INPUT_CONSTRAINT_LINE_BASE.format(">=", f"X_{i}", base_lower))
        lines.append(INPUT_CONSTRAINT_LINE_BASE.format("<=", f"X_{i}", base_upper))

    for i in range(model_input_size):
        lines.append(INPUT_CONSTRAINT_LINE_BASE.format(">=", f"X_{model_input_size+i}", 0.0))
        lines.append(INPUT_CONSTRAINT_LINE_BASE.format("<=", f"X_{model_input_size+i}", epsilon))

    lines.append(" ")
    lines.append("; output constraints")
    for i in range(model_output_size):
        if i == copy_one_argmax_candidate:
            continue
        lines.append(OUTPUT_CONSTRAINT_LINE_BASE.format(f"Y_{i}", f"Y_{copy_one_argmax_candidate}"))

    for i in range(model_output_size):
        if i == copy_two_argmax_candidate:
            continue
        lines.append(OUTPUT_CONSTRAINT_LINE_BASE.format(f"Y_{model_output_size+i}", f"Y_{model_output_size+copy_two_argmax_candidate}"))
    
    f = open(
        base_path.format(
        model_input_size, str(base_lower).replace(".", ""), str(base_upper).replace(".", ""), str(epsilon)[2:], 
        model_output_size, copy_one_argmax_candidate, copy_two_argmax_candidate
        ), 
    "w")
    f.write('\n'.join(lines) )
    f.close()



In [4]:
for i in [1, 2, 3, 4, 5, 6, 7, 8, 10 , 11, 12, 13, 14]:
    gen_prop(copy_one_argmax_candidate=i, copy_two_argmax_candidate=i)


In [12]:
copy_one_argmax_candidate, copy_two_argmax_candidate = 0, 14
base_lower = 0.0
base_upper = 1.0
partition_size = 10

base_path = "/home/mzi/sys-rl-verif/applications/cmars/vnnlib/plain_comparative/10_partitions/in{}_lower{}_upper{}_eps{}_out{}_{}_{}.vnnlib"

step = float(base_upper-base_lower) / partition_size
for i in range(partition_size):
    gen_prop(base_path=base_path, base_lower = round(base_lower + i*step, 5), base_upper= round(base_lower + (i+1)*step, 5))


## Model Processing

#### CMARS 

In [2]:
import torch
import torch.nn as nn
import onnx
import sys
sys.path.append("/home/mzi/sys-rl-verif/applications")
from cmars.lib.config_mappol import get_config
import cmars.lib.mdp_config as mdp_config
from cmars.lib.util import *
from cmars.lib.rnn import *
from cmars.lib.cnn import *
from cmars.lib.act import *
from cmars.lib.mlp import *
from cmars.lib.distributions import *
from gym import spaces
import os

parser = get_config()
parser.add_argument("--add_move_state", action='store_true', default=False)
parser.add_argument("--add_local_obs", action='store_true', default=False)
parser.add_argument("--add_distance_state", action='store_true', default=False)
parser.add_argument("--add_enemy_action_state", action='store_true', default=False)
parser.add_argument("--add_agent_id", action='store_true', default=False)
parser.add_argument("--add_visible_state", action='store_true', default=False)
parser.add_argument("--add_xy_state", action='store_true', default=False)
parser.add_argument("--use_state_agent", action='store_true', default=False)
parser.add_argument("--use_mustalive", action='store_false', default=True)
parser.add_argument("--add_center_xy", action='store_true', default=False)
parser.add_argument("--use_single_network", action='store_true', default=False)
all_args = parser.parse_known_args()[0]

class R_Actor(nn.Module):
    def __init__(self, args, obs_space, action_space, device=torch.device("cpu")):
        super(R_Actor, self).__init__()
        self.hidden_size = args.hidden_size
        self._gain = args.gain
        self._use_orthogonal = args.use_orthogonal
        self._use_policy_active_masks = args.use_policy_active_masks
        self._use_naive_recurrent_policy = args.use_naive_recurrent_policy
        self._use_recurrent_policy = args.use_recurrent_policy
        self._recurrent_N = args.recurrent_N
        self.tpdv = dict(dtype=torch.float64, device=device)

        obs_shape = get_shape_from_obs_space(obs_space)
        base = CNNBase if len(obs_shape) == 3 else MLPBase
        self.base = base(args, obs_shape)

        if self._use_naive_recurrent_policy or self._use_recurrent_policy:
            self.rnn = RNNLayer(self.hidden_size, self.hidden_size, self._recurrent_N, self._use_orthogonal)

        self.act = ACTLayer(action_space, self.hidden_size, self._use_orthogonal, self._gain, args)

        self.to(device)

    def forward(self, obs, rnn_states, masks, available_actions=None, deterministic=False):
        obs = check(obs).to(**self.tpdv)

        actor_features = self.base(obs)
        action_ligits_probs = self.act(actor_features, available_actions, deterministic)

        return action_ligits_probs

BASE = "/home/mzi/sys-rl-verif/applications/cmars/models/output_{}/h{}/N{}/seed=110/lr=8e-05/critic_lr=0.001/gamma=0.4/episode_length=500/lamda_lagr=10/lagrangian_coef_rate=1e-07safety_bound=1e-05/run1/models"


In [5]:
def get_model(n_prbs=15, hidden_size=32, layer_count=1):
    assert n_prbs in [15, 30, 80, 140]
    assert layer_count in [1, 2, 3, 4, 5, 6, 7, 8]
    assert hidden_size in [32, 64, 128]

    # set configs
    all_args.layer_N = layer_count
    all_args.hidden_size = hidden_size
    models_path = BASE.format(n_prbs, hidden_size, layer_count)
    embb_actor_policy = models_path + "/actor_type_embb.pt"

    act_space = spaces.Discrete(n_prbs)
    embb_obs_space = spaces.Box(low=0, high=10e6, shape=(mdp_config.EMBB_LOCAL_OBS_VAR_COUNT+mdp_config.AUG_LOCAL_STATE_VAR_COUNT,))

    # base policy
    device = torch.device('cpu')
    embb_actor = R_Actor(all_args, embb_obs_space, act_space)
    embb_actor.load_state_dict(torch.load(embb_actor_policy, map_location=device))

    # remove softmax
    class CMARS_Actor_Wrapper(nn.ModuleList):
        def __init__(self, cmars_actor, type = "embb", device=torch.device("cpu")):
            super(CMARS_Actor_Wrapper, self).__init__()        
            self.to(device)

            self.af = nn.ReLU()
            if type == "embb":
                self.lin1 = nn.Linear(mdp_config.AUG_LOCAL_STATE_VAR_COUNT + mdp_config.EMBB_LOCAL_OBS_VAR_COUNT, all_args.hidden_size)
            elif type == "mmtc":
                self.lin1 = nn.Linear(mdp_config.AUG_LOCAL_STATE_VAR_COUNT + mdp_config.MMTC_LOCAL_OBS_VAR_COUNT, all_args.hidden_size)
            else:
                raise NotImplementedError()

            self.midlayers = []
            for i in range(all_args.layer_N):
                self.midlayers.append(nn.Linear(all_args.hidden_size, all_args.hidden_size))

            for iter, item in enumerate(self.midlayers):
                item.weight.data = cmars_actor.base.mlp.fc2[iter][0].weight.data

            for i in range(all_args.layer_N):
                setattr(self, "lin{}".format(i+2), self.midlayers[i])

            self.out = nn.Linear(all_args.hidden_size, n_prbs)
            self.out.weight.data = cmars_actor.act.action_out.linear.weight.data

        def forward(self, obs):
            obs = self.af(self.lin1(obs))

            for item in self.midlayers:
                obs = self.af(item(obs))    

            logits = self.out(obs)

            return logits

    embb_cmars_wrapper = CMARS_Actor_Wrapper(embb_actor, "embb")

    return embb_cmars_wrapper


#### Comparative

In [6]:
def get_params_argmax(input_size):
    
    # Take sum of the input vars
    c01 = torch.zeros([1, 1, input_size+1])
    c01[0][0][0] = 1

    c02 = torch.zeros([1, 1, input_size+1])
    c02[0][0][0] = 1
    c02[0][0][-1] = 1

    return c01, c02

def cmars() -> nn.Sequential:
    base_model = get_model()

    class MyModel(nn.ModuleList):
        def __init__(self, device=torch.device("cpu")):
            super(MyModel, self).__init__()

            input_size = 19
            self.input_size = input_size
            c01, c02 = get_params_argmax(input_size)
            
            self.ft = torch.nn.Flatten()

            #################
            # Model
            ################# 
            self.base_model = base_model
            
            #################
            # Input summation
            #################
            self.input_conv1 = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=input_size+1)
            self.input_conv1.weight = torch.nn.Parameter(c01, requires_grad=True)
            self.input_conv1.bias = torch.nn.Parameter(torch.zeros_like(self.input_conv1.bias, requires_grad=True))
            
            self.input_conv2 = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=input_size+1)
            self.input_conv2.weight = torch.nn.Parameter(c02, requires_grad=True)
            self.input_conv2.bias = torch.nn.Parameter(torch.zeros_like(self.input_conv2.bias, requires_grad=True))            

        def forward(self, obs):
            # input processing
            input1 = self.input_conv1(obs)
            input2 = self.input_conv2(obs)
            
            # the model
            copy1_logits = self.base_model(input1)
            copy2_logits = self.base_model(input2)
            
            return self.ft(torch.concat((copy1_logits, copy2_logits), dim=1))

    model = MyModel()

    return model


In [11]:
base_model = get_model()
base_model

self._use_feature_normalization:  False


CMARS_Actor_Wrapper(
  (0): ReLU()
  (1): Linear(in_features=19, out_features=32, bias=True)
  (2): Linear(in_features=32, out_features=32, bias=True)
  (3): Linear(in_features=32, out_features=15, bias=True)
)

#### ONNX

In [7]:
import onnx, onnxscript

In [9]:
torch_input = torch.randn(1, 1, 38)
model = cmars()
onnx_program = torch.onnx.dynamo_export(model, torch_input)
onnx_program.save("camrs.onnx")


self._use_feature_normalization:  False




## Dummy comparative model

In [None]:

def make_plain_model(scen="normal_1", hidden_size=32, layer_count=2):

    class MyModel(nn.ModuleList):
        def __init__(self, device=torch.device("cpu")):
            super(MyModel, self).__init__()

            input_size, output_size = 4, 4
            self.input_size = input_size
            c01, c02 = get_params_argmax(input_size)
            
            self.af = nn.ReLU()
            self.ft = torch.nn.Flatten()

            #################
            # Model
            ################# 
            # self.lin1 = nn.Linear(input_size, output_size)
            
            #################
            # Input summation
            #################
            self.input_conv1 = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=input_size+1)
            self.input_conv1.weight = torch.nn.Parameter(c01, requires_grad=True)
            self.input_conv1.bias = torch.nn.Parameter(torch.zeros_like(self.input_conv1.bias, requires_grad=True))
            
            self.input_conv2 = nn.Conv1d(in_channels=1, out_channels=1, kernel_size=input_size+1)
            self.input_conv2.weight = torch.nn.Parameter(c02, requires_grad=True)
            self.input_conv2.bias = torch.nn.Parameter(torch.zeros_like(self.input_conv2.bias, requires_grad=True))            

        def forward(self, obs):
            # input processing
            input1 = self.input_conv1(obs)
            input2 = self.input_conv2(obs)
            
            # the model
            # lin1 = self.lin1(input1)
            # lin2 = self.lin1(input2)
            
            return self.ft(torch.concat((input1, input2), dim=1))

    model = MyModel()

    return model

def plain_comparative() -> nn.Sequential:
    model = make_plain_model()
    return model

