In [2]:
import gym
from gym import Env
from gym.spaces import Discrete, Box, MultiDiscrete
import numpy as np
import random

from stable_baselines3 import PPO, DQN

In [38]:
# this will be an environment of an array 4*1 (eg. [4,3,2,1] or [1,2,4,3]
class ArrayEnv(Env):
    
    def __init__(self):

        self.game_array_size = 100

        self.action_space = Discrete(6) #6 possible types of swaps (01,02,03,12,13,23)

        high = np.array([1000] * 4)
        low = np.array([0] * 4)

        self.observation_space = Box(low, high, dtype=np.int16)

        #create an array of 100 random numbers between 0 and 1000
        self.state = np.random.randint(1000, size=(4))
        #game legnth of 100, shouldn't take more than 100 swaps

        self.end_array = np.copy(self.state)
        self.end_array.sort()

    #results when action taken
    def step(self, action):
        #print(self.state)
        #get the two values to swap
        if action == 0:
            x_indice = 0
            y_indice = 1
            
        elif action == 1:
            x_indice = 0
            y_indice = 2
            
        elif action == 2:
            x_indice = 0
            y_indice = 3
            
        elif action == 3:
            x_indice = 1
            y_indice = 2
            
        elif action == 4:
            x_indice = 1
            y_indice = 3
            
        elif action == 5:
            x_indice = 2
            y_indice = 3
        else:
            print("NO")
        

        #save the original values in order for ease of reading
        x_original = self.state[x_indice]
        y_original = self.state[y_indice]

        #perform the swap
        temp = self.state[x_indice]
        self.state[x_indice] = self.state[y_indice]
        self.state[y_indice] = temp


        #to calculate reward we first need to know how many elements are in the right spot
        correct_position = np.count_nonzero(self.state == self.end_array)
        # let's only reward if x comes before y in the array to simplify learning
        if x_indice < y_indice:
            #reward is set to the amount of things in correct position with size relative to
            #0.9/100 so that when everything is in place, the reward == 0.9 and then may be added to if the
            # movement itself is correct
            reward = correct_position*(0.9/4)
            #check if value at x is greater than value at y
            if x_original > y_original:
                #if a large x value is moving down the array
                reward +=0.1
            else:
                #undesirable action i.e. swapping two equal values or moving a large value up in the array
                reward = -100

        #check if game is over by comparing the current state to the final intended array
        if (self.state == self.end_array).all() == True:
            done = True
        else:
            done = False

        #set placeholder for info
        info = {}

        #return all data
        return self.state, reward, done, info        


    #implement printing the array here
    def render(self):
        #print (np.count_nonzero(self.state == self.end_array))
        print(self.state)

    #reset/setup the environment
    def reset(self):
        #reset array to random numbers
        self.state = np.random.randint(1000, size=(4))

        #create a sorted array for our final state
        self.end_array = np.copy(self.state)
        self.end_array.sort()
        #reset game length

        return self.state
    
    

In [3]:
# this will be an environment of an array 10*1 (eg. [4,3,2,1,6,7,8,9,29,201]
class ArrayEnv2(Env):
    
    def __init__(self, game_size):

        self.game_array_size = game_size

        self.action_space = MultiDiscrete([self.game_array_size, self.game_array_size]) #10 possible xs and 10 possible ys

        high = np.array([1000] * self.game_array_size)
        low = np.array([0] * self.game_array_size)

        self.observation_space = Box(low, high, dtype=np.int16)

        #create an array of 100 random numbers between 0 and 1000
        self.state = np.random.randint(1000, size=(self.game_array_size))
        #game legnth of 100, shouldn't take more than 100 swaps

        self.end_array = np.copy(self.state)
        self.end_array.sort()

    #results when action taken
    def step(self, action):

        
        x_indice = action[0]
        y_indice = action[1]
        #print("From: {}\tX: {}\tY: {}".format(action, x_indice, y_indice))
        #save the original values in order for ease of reading
        x_original = self.state[x_indice]
        y_original = self.state[y_indice]

        #perform the swap
        temp = self.state[x_indice]
        self.state[x_indice] = self.state[y_indice]
        self.state[y_indice] = temp


        #to calculate reward we first need to know how many elements are in the right spot
        correct_position = np.count_nonzero(self.state == self.end_array)
        # let's only reward if x comes before y in the array to simplify learning
        if x_indice < y_indice:
            #reward is set to the amount of things in correct position with size relative to
            #0.9/100 so that when everything is in place, the reward == 0.9 and then may be added to if the
            # movement itself is correct
            reward = correct_position*(0.9/self.game_array_size)
            #check if value at x is greater than value at y
            if x_original > y_original:
                #if a large x value is moving down the array
                reward +=0.1
            else:
                #undesirable action i.e. swapping two equal values or moving a large value up in the array
                reward = -100
        else:
            reward = -100

        #check if game is over by comparing the current state to the final intended array
        if (self.state == self.end_array).all() == True:
            done = True
        else:
            done = False

        #set placeholder for info
        info = {}

        #return all data
        return self.state, reward, done, info        


    #implement printing the array here
    def render(self):
        #print (np.count_nonzero(self.state == self.end_array))
        print(self.state)

    #reset/setup the environment
    def reset(self):
        #reset array to random numbers
        self.state = np.random.randint(1000, size=(self.game_array_size))

        #create a sorted array for our final state
        self.end_array = np.copy(self.state)
        self.end_array.sort()
        #reset game length

        return self.state

In [None]:
env = ArrayEnv2(5)

model = PPO("MlpPolicy", env, verbose=1, tensorboard_log="./PPO_array_5/")
model.learn(total_timesteps=10000000, log_interval=4)
model.save("test5-1")

In [None]:
env = ArrayEnv2(5)
obs = env.reset()

episodes = 10
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    #print("--- original array ---")
    #env.render()
    #print("--- beginning sort ---")
    moves = 0
    while not done:
        moves+=1
        action, _states = model.predict(state)
        state, reward, done, info = env.step(action)
        score+=reward
        env.render()
    print("Episode:{} \tScore:{} \tMoves:{}".format(episode, score, moves))

In [None]:
env = ArrayEnv2(5)
for x in range(1000):
    new_action = env.action_space.sample()
    obs, reward, done, info = env.step(new_action)
    env.render()
    if done:
        print("cool")
        obs = env.reset()
env.close()

In [19]:
#COPIED CODE
""" Optuna example that optimizes the hyperparameters of
a reinforcement learning agent using A2C implementation from Stable-Baselines3
on a OpenAI Gym environment.
This is a simplified version of what can be found in https://github.com/DLR-RM/rl-baselines3-zoo.
You can run this example as follows:
    $ python sb3_simple.py
"""
from typing import Any
from typing import Dict

import gym
import optuna
from optuna.pruners import MedianPruner
from optuna.samplers import TPESampler
from stable_baselines3 import A2C
from stable_baselines3.common.callbacks import EvalCallback
import torch
import torch.nn as nn


N_TRIALS = 100
N_STARTUP_TRIALS = 5
N_EVALUATIONS = 2
N_TIMESTEPS = int(2e4)
EVAL_FREQ = int(N_TIMESTEPS / N_EVALUATIONS)
N_EVAL_EPISODES = 3

ENV_ID = "CartPole-v1"

DEFAULT_HYPERPARAMS = {
    "policy": "MlpPolicy",
    "env": ArrayEnv2(5),
}


def sample_ppo_params(trial: optuna.Trial) -> Dict[str, Any]:
    """Sampler for A2C hyperparameters."""
    gamma = 1.0 - trial.suggest_float("gamma", 0.0001, 0.1, log=True)
    max_grad_norm = trial.suggest_float("max_grad_norm", 0.3, 5.0, log=True)
    gae_lambda = 1.0 - trial.suggest_float("gae_lambda", 0.001, 0.2, log=True)
    n_steps = 2 ** trial.suggest_int("exponent_n_steps", 3, 10)
    learning_rate = trial.suggest_float("lr", 1e-5, 1, log=True)
    ent_coef = trial.suggest_float("ent_coef", 0.00000001, 0.1, log=True)
    ortho_init = trial.suggest_categorical("ortho_init", [False, True])
    net_arch = trial.suggest_categorical("net_arch", ["tiny", "small"])
    activation_fn = trial.suggest_categorical("activation_fn", ["tanh", "relu"])

    # Display true values
    trial.set_user_attr("gamma_", gamma)
    trial.set_user_attr("gae_lambda_", gae_lambda)
    trial.set_user_attr("n_steps", n_steps)

    net_arch = [
        {"pi": [64], "vf": [64]} if net_arch == "tiny" else {"pi": [64, 64], "vf": [64, 64]}
    ]

    activation_fn = {"tanh": nn.Tanh, "relu": nn.ReLU}[activation_fn]

    return {
        "n_steps": n_steps,
        "gamma": gamma,
        "gae_lambda": gae_lambda,
        "learning_rate": learning_rate,
        "ent_coef": ent_coef,
        "max_grad_norm": max_grad_norm,
        "policy_kwargs": {
            "net_arch": net_arch,
            "activation_fn": activation_fn,
            "ortho_init": ortho_init,
        },
    }


class TrialEvalCallback(EvalCallback):
    """Callback used for evaluating and reporting a trial."""

    def __init__(
        self,
        eval_env: gym.Env,
        trial: optuna.Trial,
        n_eval_episodes: int = 5,
        eval_freq: int = 10000,
        deterministic: bool = True,
        verbose: int = 0,
    ):

        super().__init__(
            eval_env=eval_env,
            n_eval_episodes=n_eval_episodes,
            eval_freq=eval_freq,
            deterministic=deterministic,
            verbose=verbose,
        )
        self.trial = trial
        self.eval_idx = 0
        self.is_pruned = False

    def _on_step(self) -> bool:
        if self.eval_freq > 0 and self.n_calls % self.eval_freq == 0:
            super()._on_step()
            self.eval_idx += 1
            self.trial.report(self.last_mean_reward, self.eval_idx)
            # Prune trial if need
            if self.trial.should_prune():
                self.is_pruned = True
                return False
        return True


def objective(trial: optuna.Trial) -> float:

    kwargs = DEFAULT_HYPERPARAMS.copy()
    # Sample hyperparameters
    kwargs.update(sample_ppo_params(trial))
    # Create the RL model
    model = PPO(**kwargs)
    # Create env used for evaluation
    eval_env = ArrayEnv2(5)
    # Create the callback that will periodically evaluate
    # and report the performance
    eval_callback = TrialEvalCallback(
        eval_env, trial, n_eval_episodes=N_EVAL_EPISODES, eval_freq=EVAL_FREQ, deterministic=True
    )

    nan_encountered = False
    try:
        model.learn(N_TIMESTEPS, callback=eval_callback)
    except AssertionError as e:
        # Sometimes, random hyperparams can generate NaN
        print(e)
        nan_encountered = True
    finally:
        # Free memory
        model.env.close()
        eval_env.close()

    # Tell the optimizer that the trial failed
    if nan_encountered:
        return float("nan")

    if eval_callback.is_pruned:
        raise optuna.exceptions.TrialPruned()

    return eval_callback.last_mean_reward



# Set pytorch num threads to 1 for faster training
torch.set_num_threads(1)

sampler = TPESampler(n_startup_trials=N_STARTUP_TRIALS)
# Do not prune before 1/3 of the max budget is used
pruner = MedianPruner(n_startup_trials=N_STARTUP_TRIALS, n_warmup_steps=N_EVALUATIONS // 3)

study = optuna.create_study(sampler=sampler, pruner=pruner, direction="maximize")
try:
    study.optimize(objective, n_trials=N_TRIALS, timeout=600)
except KeyboardInterrupt:
    pass

print("Number of finished trials: ", len(study.trials))

print("Best trial:")
trial = study.best_trial

print("  Value: ", trial.value)

print("  Params: ")
for key, value in trial.params.items():
    print("    {}: {}".format(key, value))

print("  User attrs:")
for key, value in trial.user_attrs.items():
    print("    {}: {}".format(key, value))

[32m[I 2021-11-15 16:24:01,039][0m A new study created in memory with name: no-name-e7f1cb3b-c25d-437d-9d4c-4a8ed148406f[0m


Number of finished trials:  1
Best trial:


ValueError: No trials are completed yet.