In [4]:
import pandas as pd
import numpy as np
import scipy
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.animation
import threading
import pickle
import joblib
import copy
import threading
from sklearn import impute, preprocessing, model_selection, base, metrics, linear_model, pipeline, ensemble, svm, multiclass, neighbors, compose, datasets, decomposition, manifold
import tensorflow as tf
from tensorflow import keras
import cv2
from scratch_models import my_decorators
import gym
import pygame as pg

#### Environment
**Set up the CartPole environment**

In [5]:
# Create the CartPole environment. With render_mode='rgb_array', env.render()
# hands back image frames, which animate() converts into a Pygame surface.
env = gym.make('CartPole-v1', render_mode='rgb_array')
# NOTE(review): simulate_episode() builds its own pg.time.Clock(), so this
# module-level clock appears unused in the cells shown — confirm before removing.
clock = pg.time.Clock()

**Define a Policy**
The policy can be user-controlled (``rl=None``), a simple hard-coded policy (``rl='basic'``), or a neural-network-based policy (``rl='nn'``).

In [12]:
def policy(obs, rl=None):
    """Pick an action for the current observation according to the policy mode.

    Parameters
    ----------
    obs : sequence
        Current observation. Only ``obs[2]`` is read, and only in ``'basic'``
        mode, where it is treated as the pole angle.
    rl : {None, 'basic', 'nn'}, optional
        ``None`` means user-controlled, ``'basic'`` is the hard-coded rule and
        ``'nn'`` is a placeholder that does not produce an action yet.

    Returns
    -------
    int or None
        ``1`` when the angle is positive and ``0`` otherwise in ``'basic'``
        mode; ``None`` for every other mode, which tells the caller that no
        action was chosen here.
    """
    # The hard-coded rule is the only mode that actually decides on an action.
    if rl == 'basic':
        pole_angle = obs[2]
        if pole_angle > 0:
            return 1
        return 0

    # User-controlled (None), the 'nn' placeholder and unknown modes all
    # fall through to "no action".
    return None

**Simulations**
Next, I shall create some functions to simulate the CartPole environment based on different policies.

In [32]:
# Run animations
def animate(env, win, clock):
    """Draw the environment's current frame in the Pygame window.

    Parameters
    ----------
    env : object
        Environment whose ``render()`` result is drawn.
    win : object
        Pygame window surface; the frame is blitted at its top-left corner.
    clock : object
        Clock used to limit the loop to 15 ticks per second.

    Returns
    -------
    bool
        ``False`` if a quit event was seen during this call (Pygame is shut
        down in that case), ``True`` otherwise.
    """
    clock.tick(15)

    # Rotate by 90 degrees and flip vertically before building the surface,
    # so the rendered frame's first two axes are swapped.
    image = np.flipud(np.rot90(env.render()))
    win.blit(pg.surfarray.make_surface(image), (0, 0))
    pg.display.update()

    # Drain the event queue; closing the window ends the animation.
    keep_running = True
    for evt in pg.event.get():
        if evt.type == pg.QUIT:
            keep_running = False
            pg.quit()
    return keep_running
            
# User controlled settings if no policy
def run_by_user(animation):
    """Map the arrow key currently held down to an action.

    Parameters
    ----------
    animation : bool
        Must be truthy; keyboard control is only available when animating.

    Returns
    -------
    int or None
        ``1`` if the RIGHT arrow is held, ``0`` if only the LEFT arrow is
        held, ``None`` if neither is held.

    Raises
    ------
    ValueError
        If ``animation`` is falsy.
    """
    if not animation:
        raise ValueError('Animation must be True in user-controlled policy')

    pressed = pg.key.get_pressed()
    # RIGHT is checked first, so it wins when both arrows are held.
    if pressed[pg.K_RIGHT]:
        return 1
    if pressed[pg.K_LEFT]:
        return 0
    return None

In [61]:
# List that holds rewards for each run
env_rewards = []

# Main function to run the simulation
def simulate_episode(env, policy_type=None, animation=True):
    """Run one episode and append ``(policy_type, total_reward)`` to ``env_rewards``.

    Parameters
    ----------
    env : object
        Environment exposing ``reset()``, ``step(action)`` (returning a
        5-tuple ``obs, reward, done, truncated, info``) and ``close()``.
        It is closed when the episode finishes.
    policy_type : {None, 'basic', 'nn'}, optional
        Forwarded to ``policy``. Whenever ``policy`` returns ``None`` the
        action is read from the keyboard via ``run_by_user``.
    animation : bool, optional
        If true, the episode is drawn in a 600x400 Pygame window and runs
        until it ends or the window is closed. If false, nothing is drawn
        and the loop is capped at 500 iterations.

    Raises
    ------
    ValueError
        Propagated from ``run_by_user`` when keyboard control is needed but
        ``animation`` is false.
    """
    obs, info = env.reset()
    clock = pg.time.Clock()

    # Initialize Pygame if animation is True
    if animation:
        # Initialise Pygame before opening the window.
        pg.init()
        win = pg.display.set_mode((600, 400))
        clock.tick(15)
        run = True
    else:
        # No window means no quit event, so the loop is bounded by a counter.
        run = 500

    episode_rewards = 0
    while run:
        action = policy(obs, policy_type)

        # For User controlled setting
        if action is None:
            action = run_by_user(animation)

        if action is not None:
            obs, reward, done, truncated, info = env.step(action)
            episode_rewards += reward
            # The episode is over on termination OR truncation (time limit);
            # stepping past either one is not valid, so stop in both cases.
            if done or truncated:                                   # COMMENT THIS SECTION OUT FOR DEMONSTRATION
                obs, info = env.reset()
                break

        if animation:
            run = animate(env, win, clock)
        else:
            run -= 1

    env_rewards.append((policy_type, episode_rewards))
    env.close()

**1. User Controlled**

In [44]:
# policy_type defaults to None, so actions come from the LEFT/RIGHT arrow keys.
simulate_episode(env, animation=True)

**2. Hard-coded Policy**

In [46]:
# Animated episode driven by the hard-coded 'basic' policy.
simulate_episode(env, policy_type='basic', animation=True)

Let us check the performance of this basic policy.

In [52]:
# Same policy without a Pygame window; the loop is capped at 500 iterations.
simulate_episode(env, policy_type='basic', animation=False)

In [55]:
# Evaluate the hard-coded policy over 500 independent episodes. Nothing is
# learned here: the rule is fixed, and each call appends that episode's total
# reward to env_rewards.
for episode in range(500):
    simulate_episode(env, policy_type='basic' ,animation=False)

In [58]:
# Best episode by total reward. Entries are (policy_type, reward) tuples, so a
# plain max() would order them by policy_type first and raise TypeError as soon
# as user-controlled (None) and named ('basic') runs are mixed in the list.
max(env_rewards, key=lambda entry: entry[1])

('basic', 68.0)

With the hard-coded policy, the pole stayed balanced for at most 68 steps before falling down.

In [60]:
# Inspect the bounds, shape and dtype of the observations the environment returns.
env.observation_space

Box([-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38], [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38], (4,), float32)