In [11]:
import stormpy as sp
import numpy as np

import stormpy.pomdp
from rlshield.model_simulator import SimulationExecutor, Tracker

def build_pomdp(program, formula):
    """Build a sparse model for ``program`` with valuations and labels enabled.

    Builder options are created from ``formula`` and configured to build
    state valuations, choice labels and all labels before the program is
    handed to the sparse model builder.

    Args:
        program: Program object accepted by
            ``sp.build_sparse_model_with_options``.
        formula: Formula placed (as a one-element list) into the builder
            options.

    Returns:
        The object returned by ``sp.build_sparse_model_with_options``.
    """
    builder_options = sp.BuilderOptions([formula])
    # Keep state valuations, choice labels and every label in the built model.
    builder_options.set_build_state_valuations()
    builder_options.set_build_choice_labels()
    builder_options.set_build_all_labels()
    return sp.build_sparse_model_with_options(program, builder_options)

def compute_winning_region(model, formula, initial=True, lookahead=100):
    """Run the iterative qualitative search and return its last winning region.

    The model is first passed through
    ``prepare_pomdp_for_qualitative_search_Double``; the solver is created on
    that prepared model, not on the ``model`` argument itself.

    Args:
        model: POMDP model to analyse.
        formula: Formula handed to both the preparation step and the solver.
        initial: If true (default), call
            ``compute_winning_policy_for_initial_states``; otherwise call
            ``compute_winning_region``.
        lookahead: Integer forwarded to whichever solver method is called.
            Defaults to 100, the value that used to be hard-coded.
            NOTE(review): presumably the search lookahead bound; confirm
            against the stormpy bindings.

    Returns:
        ``solver.last_winning_region`` after the search has run.
    """
    options = sp.pomdp.IterativeQualitativeSearchOptions()
    # Use a separate name so the caller's ``model`` argument is not shadowed.
    prepared_model = sp.pomdp.prepare_pomdp_for_qualitative_search_Double(model, formula)
    solver = sp.pomdp.create_iterative_qualitative_search_solver_Double(prepared_model, formula, options)
    if initial:
        solver.compute_winning_policy_for_initial_states(lookahead)
    else:
        solver.compute_winning_region(lookahead)

    return solver.last_winning_region

def construct_otf_shield(model, winning_region):
    """Wrap ``winning_region`` in a belief-support query interface for ``model``.

    Args:
        model: Model passed as the first constructor argument.
        winning_region: Winning region passed as the second constructor
            argument.

    Returns:
        A new ``sp.pomdp.BeliefSupportWinningRegionQueryInterfaceDouble``.
    """
    query_interface_cls = sp.pomdp.BeliefSupportWinningRegionQueryInterfaceDouble
    return query_interface_cls(model, winning_region)

In [12]:
# Single-configuration check: build the POMDP for init_s=0, init_v=0 and
# print its summary.
print("calculating the shield for relative distance : 0 and relative velocity : 0")
prism_program_path = 'cruise_control.prism'
property = 'Pmax=? [!"bad" U "goal"]'
constant = "init_s=0, init_v=0"

# Parse the program and the property, then substitute the constants.
prism_program = sp.parse_prism_program(prism_program_path)
prop = sp.parse_properties_for_prism_program(property, prism_program)[0]

prism_program, props = sp.preprocess_symbolic_input(prism_program, [prop], constant)
prop = props[0]
prism_program = prism_program.as_prism_program()
raw_formula = prop.raw_formula

# Build the sparse model and bring it into canonic form.
model = sp.pomdp.make_canonic(build_pomdp(prism_program, raw_formula))
print(model)

calculating the shield for relative distance : 0 and relative velocity : 0
-------------------------------------------------------------- 
Model type: 	POMDP (sparse)
States: 	121
Transitions: 	8631
Choices: 	3021
Observations: 	121
Reward Models:  none
State Labels: 	4 labels
   * deadlock -> 0 item(s)
   * init -> 1 item(s)
   * bad -> 121 item(s)
   * goal -> 21 item(s)
Choice Labels: 	30 labels
   * a14 -> 100 item(s)
   * a3 -> 100 item(s)
   * a28 -> 100 item(s)
   * a11 -> 100 item(s)
   * a13 -> 100 item(s)
   * a21 -> 100 item(s)
   * a22 -> 100 item(s)
   * a8 -> 100 item(s)
   * a20 -> 100 item(s)
   * a15 -> 100 item(s)
   * a7 -> 100 item(s)
   * a12 -> 100 item(s)
   * a6 -> 100 item(s)
   * a29 -> 100 item(s)
   * a18 -> 100 item(s)
   * a27 -> 100 item(s)
   * a17 -> 100 item(s)
   * a19 -> 100 item(s)
   * a26 -> 100 item(s)
   * a2 -> 100 item(s)
   * a24 -> 100 item(s)
   * a16 -> 100 item(s)
   * a4 -> 100 item(s)
   * a25 -> 100 item(s)
   * a5 -> 100 item(s)
   * 

In [8]:
# Sizes of the (relative distance, relative velocity) grid swept when
# computing shields, and the number of action ids queried per grid point.
num_rel_dist_states = 20
num_rel_vel_states = 6
num_actions = 30

# One integer id per action: 0 .. num_actions - 1.
actions = np.arange(num_actions)

# Filled by the sweep: (s, v) -> list of action ids the shield allows.
safe_actions_dict = {}

print(actions)

[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
 24 25 26 27 28 29]


In [10]:
# Sweep every (s, v) start configuration, compute its winning region and
# record which action ids the shield allows from the initial belief support.
#
# NOTE(review): the recorded output shows an empty list for every pair, and
# the model summary printed earlier reports the "bad" label on all 121
# states -- confirm the label definition in cruise_control.prism.

# Neither value depends on the (s, v) pair, so set them once.
prism_program_path = 'cruise_control.prism'
property = 'Pmax=? [!"bad" U "goal"]'

for rel_dist_idx in range(num_rel_dist_states):
    s = rel_dist_idx
    for rel_vel in range(num_rel_vel_states):
        v = rel_vel

        print(f"calculating the shield for relative distance : {s} and relative velocity : {v}")
        constant = f"init_s={s}, init_v={v}"

        # Parse the program and property afresh, then substitute this
        # pair's constants.
        prism_program = sp.parse_prism_program(prism_program_path)
        prop = sp.parse_properties_for_prism_program(property, prism_program)[0]

        prism_program, props = sp.preprocess_symbolic_input(prism_program, [prop], constant)
        prop = props[0]
        prism_program = prism_program.as_prism_program()
        raw_formula = prop.raw_formula

        model = sp.pomdp.make_canonic(build_pomdp(prism_program, raw_formula))

        winning_region = compute_winning_region(model, raw_formula, True)
        otf_shield = construct_otf_shield(model, winning_region)

        tracker = Tracker(model, otf_shield)

        # NOTE(review): this reaches into Tracker's private attributes;
        # switch to a public accessor if rlshield provides one.
        allowed_actions = [
            actions[j]
            for j in range(num_actions)
            if tracker._shield.query_action(tracker._tracker.get_current_belief_support(), j)
        ]

        safe_actions_dict[(s, v)] = allowed_actions
        print(allowed_actions)

calculating the shield for relative distance : 0 and relative velocity : 0
[]
calculating the shield for relative distance : 0 and relative velocity : 1
[]
calculating the shield for relative distance : 0 and relative velocity : 2
[]
calculating the shield for relative distance : 0 and relative velocity : 3
[]
calculating the shield for relative distance : 0 and relative velocity : 4
[]
calculating the shield for relative distance : 0 and relative velocity : 5
[]
calculating the shield for relative distance : 1 and relative velocity : 0
[]
calculating the shield for relative distance : 1 and relative velocity : 1
[]
calculating the shield for relative distance : 1 and relative velocity : 2
[]
calculating the shield for relative distance : 1 and relative velocity : 3
[]
calculating the shield for relative distance : 1 and relative velocity : 4
[]
calculating the shield for relative distance : 1 and relative velocity : 5
[]
calculating the shield for relative distance : 2 and relative ve

KeyboardInterrupt: 