In [1]:
import openai_cartpole
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sn 
from stable_baselines3 import PPO  # Assuming stable_baselines3 is used for PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
import os,argparse
import tensorflow as tf

# Command-line style options. The flag list is passed explicitly to parse_args
# rather than read from sys.argv, so '--log' is always switched on here.
parser = argparse.ArgumentParser(
    description='Train a PPO agent with adversarial environment modifications.'
)
parser.add_argument(
    '--log',
    action='store_true',
    help='Enable logging to files',
)
parsedargs = parser.parse_args(['--log'])

# Output locations and the seed shared by the rest of the notebook.
log_dir = './logs'
logfile = './logs/training_logs.txt'
SEED = 42

def log_info(inpstr, *args):
    """
    Format a message, echo it to stdout and optionally append it to the log file.

    Args:
        inpstr: A ``str.format`` template.
        *args: Positional values substituted into ``inpstr``.

    The message is always printed. It is additionally appended to ``logfile``
    when the ``--log`` flag (``parsedargs.log``) is set, matching the flag's
    help text ("Enable logging to files").
    """
    message = inpstr.format(*args)
    if parsedargs.log:
        # os.makedirs('') raises, so only create a directory when the log
        # path actually has a directory component.
        log_parent = os.path.dirname(logfile)
        if log_parent:
            os.makedirs(log_parent, exist_ok=True)
        with open(logfile, 'a') as file:
            file.write(f"{message}\n")
    # Console output must not depend on the file-logging switch.
    print(message)
    
    
    
# Create the modified CartPole environment
env = gym.make('openai_cartpole/ModifiedCartPole-v1', render_mode="human")
# Seed the first reset so the environment's random stream (and therefore the
# evaluation below) starts from the same point on every run.
env.reset(seed=SEED)
#env=Monitor(env, log_dir, allow_early_resets=True)
ppo_filename='./models/ppo/ppo-cartpole.zip'
# Check the disk once so the printed flag and the branch taken cannot disagree.
ppo_model_exists = os.path.exists(ppo_filename)
print("$$$$", ppo_model_exists)
# Train a PPO agent on the cartpole with a fixed seed for reproducibility,
# or reuse a previously saved model when one is available.
if ppo_model_exists:
    log_info("Loading pretrained PPO Model from: {}", ppo_filename)
    ppo_agent = PPO.load(ppo_filename)
else:
    log_info("Training PPO Model and saving: {}", ppo_filename)
    ppo_agent = PPO("MlpPolicy", env, seed=SEED, verbose=1, tensorboard_log="./logs/ppo")
    ppo_agent.learn(total_timesteps=10000)
    ppo_agent.save(ppo_filename)
# Baseline performance of the (frozen) agent on the unmodified environment.
mean_reward, std_reward = evaluate_policy(ppo_agent, env, n_eval_episodes=100)
print(f"Eval reward: {mean_reward} (+/-{std_reward})")
# TODO: Check if the training done is reproducible

# Keep the PPO agent frozen throughout the adversary training.
# The PPO agent controls the cartpole; each action chosen from the adversary's Q-table changes one environment parameter.


# Nominal physics parameters, as reported by the environment
gravity, masscart, masspole, length, force_mag = env.get_params()

# Every parameter is allowed to move within 90%..110% of its nominal value
GRAVITY_MIN, GRAVITY_MAX = gravity * 0.9, gravity * 1.1
MASSCART_MIN, MASSCART_MAX = masscart * 0.9, masscart * 1.1
MASSPOLE_MIN, MASSPOLE_MAX = masspole * 0.9, masspole * 1.1
LENGTH_MIN, LENGTH_MAX = length * 0.9, length * 1.1
FORCE_MAG_MIN, FORCE_MAG_MAX = force_mag * 0.9, force_mag * 1.1

# Number of grid points (and therefore of candidate deltas) per parameter
num_delta_per_variable = 3

# name -> (nominal, lower bound, upper bound); insertion order fixes the
# variable order used by both spaces below.
_param_bounds = {
    "gravity": (gravity, GRAVITY_MIN, GRAVITY_MAX),
    "masscart": (masscart, MASSCART_MIN, MASSCART_MAX),
    "masspole": (masspole, MASSPOLE_MIN, MASSPOLE_MAX),
    "length": (length, LENGTH_MIN, LENGTH_MAX),
    "force_mag": (force_mag, FORCE_MAG_MIN, FORCE_MAG_MAX),
}

# Adversary action space: for each parameter, evenly spaced offsets from the
# nominal value spanning (lower - nominal) .. (upper - nominal).
# Only one parameter is changed per action, which compresses the joint space
# from 3^5 combinations down to 3*5 actions.
action_space = {
    f"delta_{name}": np.linspace(low - nominal, high - nominal, num_delta_per_variable)
    for name, (nominal, low, high) in _param_bounds.items()
}

# Adversary observation space: the discrete grid of values for each parameter
observation_space = {
    name: np.linspace(low, high, num_delta_per_variable)
    for name, (_nominal, low, high) in _param_bounds.items()
}

action_space_variables = action_space.keys()

# Flat action index -> (variable position, delta position, variable name)
get_action_from_index_map = dict(
    (var_pos * num_delta_per_variable + delta_pos, (var_pos, delta_pos, name))
    for var_pos, name in enumerate(action_space_variables)
    for delta_pos in range(num_delta_per_variable)
)

def get_action_index(variable_index, change_index):
    """
    Flatten a (variable, change) pair into one action index.

    Each variable owns a run of 3 consecutive indices, one per change, so
    5 variables with 3 changes each give indices 0..14.
    """
    changes_per_variable = 3
    offset_of_variable = changes_per_variable * variable_index
    return offset_of_variable + change_index

def get_action_from_index(action_index):
    """
    Look up the parameter change encoded by a flat action index.

    Returns a ``(variable_name, delta_value)`` pair: the key of the
    ``action_space`` entry to modify and the offset selected from it.
    """
    _variable_pos, delta_pos, variable_name = get_action_from_index_map[action_index]
    return variable_name, action_space[variable_name][delta_pos]

def get_observation_index(observation):
    """
    Collapse a sequence of per-variable indices into one base-3 number.

    The element at position ``i`` is weighted by ``3 ** i``, i.e. the first
    element is the least significant digit.
    """
    radix = 3
    return sum(digit * (radix ** place) for place, digit in enumerate(observation))

def get_observation_from_index(index):
    """
    Convert a single observation index back to the observation state.

    This is the inverse of ``get_observation_index``: that function weights
    element ``i`` by ``3 ** i`` (first element least significant), so the
    digits are returned here in the same least-significant-first order.
    Reversing the list, as was done previously, broke the round trip for
    every non-palindromic state.

    Args:
        index: Non-negative flat state index (0..242 for 5 base-3 digits).

    Returns:
        A list of 5 integers in ``{0, 1, 2}``, one per environment variable.
    """
    base = 3
    observation = []
    for _ in range(5):
        # Peel off the current least significant digit.
        index, digit = divmod(index, base)
        observation.append(digit)
    return observation




# Parameters for Q-learning
learning_rate = 0.1
discount_factor = 0.99
epsilon = 0.1  # exploration rate
num_episodes = 100
init_scale=0.01

# Seed NumPy's global generator: it drives both the random Q-table
# initialisation below and the epsilon-greedy draws during training, so
# without this the adversary's run is not repeatable even though SEED exists.
np.random.seed(SEED)

# Initialize Q-table with random noise
# One row per joint discretised state, one column per (variable, delta) action.
statespacesize = np.prod([len(observation_space[key]) for key in observation_space])
# Derived from the action space (5 variables x 3 deltas = 15) instead of being
# hard-coded, so it cannot drift from the definition of the deltas.
actionspacesize = sum(len(deltas) for deltas in action_space.values())
Q = np.random.randn(statespacesize, actionspacesize) * init_scale
log_info("Q matrix initialised with size :{}",Q.shape)



# Adversary reward collected after each of its actions, across all episodes
adv_rewards=[]


$$$$ True
Loading pretrained PPO Model from: ./models/ppo/ppo-cartpole.zip




In [None]:
# Upper bound on adversary actions per episode (the loop has no other exit).
MAX_ADVERSARY_STEPS = 1000
# Safety cap on a single PPO rollout in case the environment never ends it.
MAX_AGENT_STEPS = 1000

# Order in which env.get_params() returns the physics parameters.
_PARAM_ORDER = ("gravity", "masscart", "masspole", "length", "force_mag")


def _nearest_bin(value, grid):
    """Return the position of the entry of ``grid`` closest to ``value``.

    Used instead of ``np.digitize(...) - 1``: the nominal parameter values sit
    on bin edges, where floating-point rounding can push them to either side,
    and values below the first edge would produce the index -1.
    """
    return int(np.abs(np.asarray(grid) - value).argmin())


def _current_state_index():
    """Discretise the environment's current parameters into a Q-table row index."""
    params = env.get_params()
    return get_observation_index([
        _nearest_bin(value, observation_space[name])
        for name, value in zip(_PARAM_ORDER, params)
    ])


# episode -> list of (state_index, action_index) pairs visited by the adversary
state_history={}
for episode in range(num_episodes):
    print(episode)
    state_index = _current_state_index()
    state_history[episode]=[]

    for _adversary_step in range(MAX_ADVERSARY_STEPS):
        # Start from 'no change' for every parameter, then set exactly one delta.
        action = {param: 0 for param in action_space.keys()}

        # Epsilon-greedy choice over the adversary's Q-table
        if np.random.rand() < epsilon:
            action_index = np.random.randint(actionspacesize)
        else:
            action_index = int(np.argmax(Q[state_index]))
        variable, delta = get_action_from_index(action_index)
        action[variable] = delta

        # Apply the modification and start a fresh rollout (a single reset suffices).
        env.init_params(**action)
        obs, _ = env.reset()

        # Run the frozen PPO agent for one episode in the modified environment.
        total_reward = 0
        for _agent_step in range(MAX_AGENT_STEPS):
            obs = obs.reshape(1, -1)
            a, _states = ppo_agent.predict(obs, deterministic=True)
            # Gymnasium returns (obs, reward, terminated, truncated, info);
            # the rollout is over when either flag is set.
            obs, r, terminated, truncated, info = env.step(a.item())
            total_reward += r
            if terminated or truncated:
                break

        # Adversary's reward is the negative of the PPO agent's total reward
        outer_reward = -total_reward
        next_state_index = _current_state_index()

        state_history[episode].append((state_index, action_index))
        # Q-learning update
        td_target = outer_reward + discount_factor * np.max(Q[next_state_index])
        Q[state_index, action_index] += learning_rate * (td_target - Q[state_index, action_index])
        # Advance the adversary's state; without this every update in the
        # episode would be written to the row of the episode's initial state.
        state_index = next_state_index

        print("Outer reward:",outer_reward)
        adv_rewards.append(outer_reward)

    # Show the Q-table once per episode rather than after every adversary
    # step, and release the figure so they do not pile up in memory.
    fig, ax = plt.subplots()
    sn.heatmap(data=Q, annot=True, ax=ax)
    ax.set(title=f"Adversary Q-table after episode {episode}",
           xlabel="action index", ylabel="state index")
    plt.show()
    plt.close(fig)

# Re-evaluate the frozen PPO agent after adversary training, sampling actions
# stochastically. (The keyword was previously misspelled, which makes
# evaluate_policy raise TypeError for an unexpected argument.)
mean_reward, std_reward = evaluate_policy(ppo_agent, env, n_eval_episodes=100, deterministic=False)
print(f"Mean_reward for the PPO agent:{mean_reward:.2f} +/- {std_reward:.2f}")

env.close()
# adv_rewards holds one entry per adversary action; average them directly.
# np.mean(np.sum(...)) reported the grand total, not a mean.
mean_adv_reward = np.mean(adv_rewards) if len(adv_rewards) > 0 else float("nan")
print(f"Mean reward of the adversary: {mean_adv_reward:.2f} - Num episodes: {num_episodes}")


In [None]:
# NOTE(review): scratch cell. np.maximum is an element-wise binary function and
# is called here without its two required inputs, so running this cell raises
# TypeError. Supply the intended operands or delete the cell — intent unclear.
np.maximum()