In [1]:
# Extend the module search path so the project-local `achtung_process` module
# can be imported below (presumably it lives in this directory — confirm).
# NOTE(review): this is a machine-specific absolute path; the notebook will not
# find the module on another machine unless the path is adjusted.
import sys
sys.path.insert(1, '/home/taylor/Classes/cs230/achtung')
import achtung_process;

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
import os
import pickle

import gym
import numpy as np
import torch as th
import torch.nn as nn

In [4]:
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.vec_env import DummyVecEnv, VecEnv
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.vec_env import VecTransposeImage
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

In [5]:
# Build the game environment that the cells below use for both training and
# evaluation.
# NOTE(review): the meaning of the argument `1` is not visible from this
# notebook — confirm against AchtungProcess.
env = achtung_process.AchtungProcess(1)
env.env.speed = 0 # set to zero for training (i.e., no frame delay)
# Switch off the render flag on the wrapped game object (presumably this skips
# drawing the game window during training — confirm).
env.env.render_game = False

Achtung Die Kurve!


In [7]:
# for i in range(100):
#     action = 0
#     obs, rewards, done, info = env.step(action)
#     env.render()
    
#     plt.imshow(np.resize(obs[-2], (80, 80)), cmap="gray") 
#     plt.show()

#     if done:
#         obs = env.reset()

In [8]:
# env = DummyVecEnv([lambda: env])

In [6]:
class CustomCNN(BaseFeaturesExtractor):
    """
    Convolutional feature extractor: two conv layers followed by one dense layer.

    :param observation_space: (gym.Space) Image observation space; its first
        dimension is used as the number of input channels.
    :param features_dim: (int) Size of the feature vector produced by the
        final linear layer.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 128):
        super().__init__(observation_space, features_dim)
        # Channels-first (CxHxW) input is assumed; any re-ordering is expected
        # to happen upstream, in preprocessing or a wrapper.
        in_channels = observation_space.shape[0]
        conv_stack = [
            nn.Conv2d(in_channels, 32, kernel_size=8, stride=4, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_stack)

        # Push one sampled observation (with a leading batch axis) through the
        # conv stack to discover the flattened width; no gradients are needed.
        with th.no_grad():
            probe = th.as_tensor(observation_space.sample()[None]).float()
            flat_width = self.cnn(probe).shape[1]

        self.linear = nn.Sequential(nn.Linear(flat_width, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Run observations through the conv stack, then the dense head."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)

# Policy configuration: use CustomCNN as the feature extractor and have it
# emit 128 features.
policy_kwargs = {
    "features_extractor_class": CustomCNN,
    "features_extractor_kwargs": {"features_dim": 128},
}

In [7]:
# PPO settings, gathered in one mapping so they can be read at a glance.
ppo_hyperparams = {
    "n_steps": 250,
    "n_epochs": 10,
    "batch_size": 100,
    "learning_rate": 1.0e-4,
    "clip_range": 0.1,
    "clip_range_vf": 0.1,
    "vf_coef": 0.5,
    "ent_coef": 0.01,
}

# Build the agent on the Achtung environment with the custom CNN extractor.
model = PPO("CnnPolicy", env, policy_kwargs=policy_kwargs, **ppo_hyperparams)

In [8]:
# Baseline: score the freshly constructed (untrained) model over 100 episodes.
mean_reward, std_reward = evaluate_policy(
    model=model,
    env=env,
    n_eval_episodes=100,
)
print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

mean_reward:54.96 +/- 24.65


In [None]:
# Train
# Alternate between a fixed chunk of training and a 100-episode evaluation,
# checkpointing the model and the evaluation history after every round.
rewards = []
stds = []

# The history files live in a sub-directory; create it up front so the first
# dump cannot fail with FileNotFoundError after a full round of training.
os.makedirs("baselines_training", exist_ok=True)

for i in range(100):
    print("iteration: ", i+1)
    model.learn(total_timesteps=10000)

    print("   saving...")
    model.save("ppo_achtung")

    print("   evalute...")
    mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=100)
    print(f"   mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

    rewards.append(mean_reward)
    stds.append(std_reward)

    # Rewrite the full history every round so an interrupted run still leaves
    # usable results on disk. Note: these files are pickles despite the .txt
    # suffix, so they must be read back with pickle.load in binary mode.
    with open("baselines_training/ppo_reward.txt", "wb") as f:
        pickle.dump(rewards, f)
    with open("baselines_training/ppo_std.txt", "wb") as f:
        pickle.dump(stds, f)

In [None]:
# Evaluate the trained policy
# PPO.load is a classmethod that returns a new model instead of updating the
# instance it is called on, so its result must be bound to `model`; otherwise
# the saved checkpoint is never the thing being evaluated.
model = PPO.load("ppo_achtung", env=env)
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=100)
print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")