# Cartpole balance task in Open AI Gym 
This Jupyter notebook serves as to analyse the performance of MPC with predictive sampling for the balance task in Open AI Gym. Explanations on how the MPC controller is developed are in the Notebook 'cartpole_swingup_dm_MPC'.

In [1]:
# imports
import gym
import numpy as np
from statistics import mean, stdev
import time

In [2]:
# Initialize environment and agent
#env = gym.make('CartPole-v1', render_mode='human') # render_mode='human' is optional
env = gym.make('CartPole-v1') # no visualization

In [3]:
env.reset()
state, reward, done, info, _ = env.step(1)
state

  if not isinstance(terminated, (bool, np.bool8)):


array([-0.00731368,  0.22669847,  0.03557884, -0.25125936], dtype=float32)

## Check of the model
Check if the dynamical model predicts correcty.

In [4]:
# physics check

# physics from https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py
dt = 0.02 # changed
g = 9.8 # changed
m_c = 1
m_p = 0.1
l = 0.5
d_c = 5e-4
d_p = 2e-6
gear = 10 # from input to force in Newton

def cartpole_derivatives(state, F, m_c, m_p, l, g): # F * 10 to 
    x, x_dot, theta, theta_dot = state

    sin_theta = np.sin(theta)
    cos_theta = np.cos(theta)
    
    theta_double_dot = (g * sin_theta + cos_theta * (-F - m_p * l * theta_dot**2 * sin_theta + d_c * np.sign(x_dot))/(m_c + m_p) - (d_p * theta_dot)/(m_p * l)) / (l * (4/3 - (m_p * cos_theta**2)/(m_c + m_p)))
    x_double_dot = (F + m_p * l * (theta_dot**2 * sin_theta - theta_double_dot * cos_theta) - d_c * np.sign(x_dot))/(m_c + m_p)
    
    return np.array([x_dot, x_double_dot, theta_dot, theta_double_dot])

def euler_step(state, F, m_c, m_p, l, g, dt):
    derivatives = cartpole_derivatives(state, F*gear, m_c, m_p, l, g) # gear 10 the adjust the force to Newton
    new_state = state + dt * derivatives
    return new_state

action = 0
env.reset()

state, done = env.reset()
euler_state = state

print('startstate', state)
print()
for i in range(100):
    env_state, reward, done, info, _ = env.step(action)
    euler_state = euler_step(euler_state, -1, m_c, m_p, l, g, dt) # -1 is the effect of action 0, the gear is already included in the euler_step function
    if i%10 < 10: # print every 10th timestep    
        print('env ', env_state)
        print('euler ', euler_state)
        print()

startstate [ 0.00623219  0.03447602 -0.01174521 -0.01284546]

env  [ 0.00692171 -0.16047554 -0.01200212  0.27610868]
euler  [ 0.00692171 -0.1604853  -0.01200212  0.27612334]

env  [ 0.0037122  -0.35542423 -0.00647994  0.5649821 ]
euler  [ 0.003712   -0.3554242  -0.00647965  0.56498177]

env  [-0.00339629 -0.5504547   0.0048197   0.8556165 ]
euler  [-0.00339648 -0.55044486  0.00481998  0.85560093]

env  [-0.01440538 -0.74564195  0.02193203  1.149811  ]
euler  [-0.01440538 -0.74562236  0.021932    1.14977982]

env  [-0.02931822 -0.9410432   0.04492825  1.4492899 ]
euler  [-0.02931783 -0.94101376  0.0449276   1.44924255]

env  [-0.04813908 -1.1366876   0.07391405  1.755665  ]
euler  [-0.0481381  -1.13664843  0.07391245  1.75560097]

env  [-0.07087284 -1.3325655   0.10902735  2.0703905 ]
euler  [-0.07087107 -1.33251647  0.10902447  2.07030925]

env  [-0.09752414 -1.5286139   0.15043516  2.3947074 ]
euler  [-0.0975214  -1.52855493  0.15043065  2.39460828]

env  [-0.12809643 -1.7247002   0.1

  logger.warn(


Conclusion: Similar numbers, ehe dynamical equations work fine.

## Sample generation
change from continous to discrete

In [5]:

def generate_samples(opt_u, num_samples, flip_prob):
    samples = []
    for _ in range(num_samples):
        sample = np.array([np.random.choice([1 - x, x], p=[flip_prob, 1 - flip_prob]) for x in opt_u])
        samples.append(sample)
    return np.array(samples)

opt_u = np.array([1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

num_samples = 3
flip_prob = 0.1  # Probability of flipping a bit

samples = generate_samples(opt_u, num_samples, flip_prob)
print(samples)

[[1 0 0 1 1 1 1 0 0 0 0 0 0 0 1 1 0 0 0 1 1 1 0 1 0 0 1 1 1 1 0 1 1 1 0 0
  1 0 1 0 1 1 0 1 0 0 0 0 1 0 1 1 0 1 1 1 1 1 0 0 0 0 1 1 0 0 0 1 0 0 0 0
  1 1 1 1 0 1 1 1 1 0 1 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0]
 [1 0 1 1 1 1 0 0 0 0 0 0 1 0 0 1 0 0 0 1 1 1 0 1 0 0 0 0 1 1 1 1 0 1 0 0
  1 0 1 0 1 1 0 1 0 0 0 0 1 0 1 0 0 1 0 1 1 1 0 1 0 0 1 1 1 0 0 0 0 0 1 0
  1 1 1 1 0 1 1 1 1 1 1 1 0 0 0 0 0 0 0 1 0 0 1 0 1 1 0 0]
 [1 0 1 1 1 1 0 0 0 0 0 0 1 0 1 1 0 0 0 1 1 1 0 1 0 0 0 1 1 1 1 1 1 1 0 0
  1 0 1 0 1 1 0 1 1 0 0 0 1 0 1 1 0 1 1 1 1 1 0 0 1 0 1 1 1 0 0 0 0 1 1 0
  0 0 0 1 0 1 1 1 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]]


In [6]:
# try starting point:
horizon = 10
alternating_array = np.tile([0, 1], horizon // 2 + 1)[:horizon]
print(alternating_array)

[0 1 0 1 0 1 0 1 0 1]


## Final Implementation

In [7]:
# physics from https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py
dt = 0.02 # changed
g = 9.8 # changed
m_c = 1
m_p = 0.1
l = 0.5
d_c = 5e-4
d_p = 2e-6
gear = 10 # from input to force in Newton


# weights and parameters
w_angle = 1
w_center = 0.5 # different from continuoues version
num_samples = 20
horizon = 100
flip_prob = 0.1  # Probability of flipping a bit


# initalize an optimal action
opt_u = alternating_array = np.tile([0, 1], horizon // 2 + 1)[:horizon] # alternating 0 and 1, different start

def cartpole_derivatives(state, F, m_c, m_p, l, g): # F * 10 to 
    x, x_dot, theta, theta_dot = state

    sin_theta = np.sin(theta)
    cos_theta = np.cos(theta)
    
    theta_double_dot = (g * sin_theta + cos_theta * (-F - m_p * l * theta_dot**2 * sin_theta + d_c * np.sign(x_dot))/(m_c + m_p) - (d_p * theta_dot)/(m_p * l)) / (l * (4/3 - (m_p * cos_theta**2)/(m_c + m_p)))
    x_double_dot = (F + m_p * l * (theta_dot**2 * sin_theta - theta_double_dot * cos_theta) - d_c * np.sign(x_dot))/(m_c + m_p)
    
    return np.array([x_dot, x_double_dot, theta_dot, theta_double_dot])

def euler_step(state, F, m_c, m_p, l, g, dt):
    derivatives = cartpole_derivatives(state, F*gear, m_c, m_p, l, g) # gear 10 the adjust the force to Newton
    new_state = state + dt * derivatives
    return new_state

def rollout(u, state):
    cost = 0

    for j in range(horizon):
        if u[j] == 1:
            action = 1
        else:
            action = -1
        state = euler_step(state, action, m_c, m_p, l, g, dt) # interate through the simulation
        
        cost += (w_angle * (1 - np.cos(state[2])) + w_center * (state[0])**2)  # sum the cost
    return cost

# Policy
def predictiv_sampling_disc(real_state):
  
  # sample control inputs
  global opt_u 
  opt_u = np.delete(opt_u, 0) # Remove the first element
  opt_u = np.append(opt_u, opt_u[-1]) # Duplicate the last element
  
  u = np.random.randint(2, size=(num_samples-1, horizon))
  u = generate_samples(opt_u, num_samples, flip_prob)
  u = np.concatenate((opt_u[np.newaxis, :], u), axis=0) # add opt_u to the new samples

  # calculate rollouts and cost
  cost = [rollout(u[i], real_state) for i in range(num_samples)]
    
  # Choose the control input with the lowest cost
  best_index = np.argmin(cost)
  opt_u = u[best_index]
    
  return opt_u[0] # return the first element of the optimal control input

In [9]:
# Test
env = gym.make('CartPole-v1') # no visualization

# Test
test_episodes = 30
test_scores = []
start_time = time.time()

for e in range(test_episodes):
    state, done = env.reset()
    opt_u = alternating_array = np.tile([0, 1], horizon // 2 + 1)[:horizon] # alternating 0 and 1, different start

    for t in range(501):
        action = predictiv_sampling_disc(state)
        state, reward, done, info, _ = env.step(action)
    

        if done or t == 500:
            print("Test Episode: {}/{}, Score: {}".format(e + 1, test_episodes, t))
            test_scores.append(t)
            break

test_average = mean(test_scores)
test_sigma = stdev(test_scores)
end_time = time.time()
total_time = end_time - start_time
total_steps = sum(test_scores)
average_time_per_step = total_time / total_steps

print()
print('Score average: {:.2f}, Sigma: {:.2f}'.format(test_average, test_sigma))
print('Average time per step: {:.4f} seconds'.format(average_time_per_step))

env.close()


Test Episode: 1/30, Score: 261
Test Episode: 2/30, Score: 135
Test Episode: 3/30, Score: 68
Test Episode: 4/30, Score: 500
Test Episode: 5/30, Score: 254
Test Episode: 6/30, Score: 355
Test Episode: 7/30, Score: 500
Test Episode: 8/30, Score: 240
Test Episode: 9/30, Score: 101
Test Episode: 10/30, Score: 500
Test Episode: 11/30, Score: 500
Test Episode: 12/30, Score: 152
Test Episode: 13/30, Score: 353
Test Episode: 14/30, Score: 500
Test Episode: 15/30, Score: 133
Test Episode: 16/30, Score: 500
Test Episode: 17/30, Score: 500
Test Episode: 18/30, Score: 486
Test Episode: 19/30, Score: 500
Test Episode: 20/30, Score: 203
Test Episode: 21/30, Score: 500
Test Episode: 22/30, Score: 500
Test Episode: 23/30, Score: 148
Test Episode: 24/30, Score: 358
Test Episode: 25/30, Score: 500
Test Episode: 26/30, Score: 500
Test Episode: 27/30, Score: 148
Test Episode: 28/30, Score: 123
Test Episode: 29/30, Score: 200
Test Episode: 30/30, Score: 107

Score average: 327.50, Sigma: 167.43
Average time

## Record a video

In [None]:
from moviepy.editor import ImageSequenceClip

# Load the cartpole environment
env = gym.make("CartPole-v1", render_mode="rgb_array")

# Visualization and video creation
def save_video(policy):
    frames = []

    state, done = env.reset()
    for t in range(501):
        pixels = env.render()
        frames.append(pixels)
        action = policy(state)
        state, reward, done, info, _ = env.step(action)

        if done or t == 500:
            break

    # Save the frames as a video
    clip = ImageSequenceClip(frames, fps=50)
    clip.write_videofile("video/MPC_gym_balance.mp4", codec="libx264")

# Call the save_video function with your policy function
save_video(predictiv_sampling_disc)

env.close()


Moviepy - Building video video/MPC_gym_balance.mp4.
Moviepy - Writing video video/MPC_gym_balance.mp4



                                                                

Moviepy - Done !
Moviepy - video ready video/MPC_gym_balance.mp4


## As note in Discussion: test of different appending method

In [10]:
# physics from https://github.com/openai/gym/blob/master/gym/envs/classic_control/cartpole.py
dt = 0.02 # changed
g = 9.8 # changed
m_c = 1
m_p = 0.1
l = 0.5
d_c = 5e-4
d_p = 2e-6
gear = 10 # from input to force in Newton


# weights and parameters
w_angle = 1
w_center = 0.5 # different from continuoues version
num_samples = 20
horizon = 100
flip_prob = 0.1  # Probability of flipping a bit


# initalize an optimal action
opt_u = alternating_array = np.tile([0, 1], horizon // 2 + 1)[:horizon] # alternating 0 and 1, different start

def generate_samples(opt_u, num_samples, flip_prob):
    samples = []
    for _ in range(num_samples):
        sample = np.array([np.random.choice([1 - x, x], p=[flip_prob, 1 - flip_prob]) for x in opt_u])
        samples.append(sample)
    return np.array(samples)

def cartpole_derivatives(state, F, m_c, m_p, l, g): # F * 10 to 
    x, x_dot, theta, theta_dot = state

    sin_theta = np.sin(theta)
    cos_theta = np.cos(theta)
    
    theta_double_dot = (g * sin_theta + cos_theta * (-F - m_p * l * theta_dot**2 * sin_theta + d_c * np.sign(x_dot))/(m_c + m_p) - (d_p * theta_dot)/(m_p * l)) / (l * (4/3 - (m_p * cos_theta**2)/(m_c + m_p)))
    x_double_dot = (F + m_p * l * (theta_dot**2 * sin_theta - theta_double_dot * cos_theta) - d_c * np.sign(x_dot))/(m_c + m_p)
    
    return np.array([x_dot, x_double_dot, theta_dot, theta_double_dot])

def euler_step(state, F, m_c, m_p, l, g, dt):
    derivatives = cartpole_derivatives(state, F*gear, m_c, m_p, l, g) # gear 10 the adjust the force to Newton
    new_state = state + dt * derivatives
    return new_state

def rollout(u, state):
    cost = 0

    for j in range(horizon):
        if u[j] == 1:
            action = 1
        else:
            action = -1
        state = euler_step(state, action, m_c, m_p, l, g, dt) # interate through the simulation
        
        cost += (w_angle * (1 - np.cos(state[2])) + w_center * (state[0])**2)  # sum the cost
    return cost

# Policy
def predictiv_sampling_disc(real_state):
  
  # sample control inputs
  global opt_u 
  opt_u = np.delete(opt_u, 0) # Remove the first element
  
  if opt_u[-1] == 1: # new additon of the last element  --------- new
    opt_u = np.append(opt_u, 0)
  else:
    opt_u = np.append(opt_u, 1) # new --------------------------- 
        
  opt_u = np.append(opt_u, opt_u[-1]) # Duplicate the last element
  
  u = np.random.randint(2, size=(num_samples-1, horizon))
  u = generate_samples(opt_u, num_samples, flip_prob)
  u = np.concatenate((opt_u[np.newaxis, :], u), axis=0) # add opt_u to the new samples

  # calculate rollouts and cost
  cost = [rollout(u[i], real_state) for i in range(num_samples)]
    
  # Choose the control input with the lowest cost
  best_index = np.argmin(cost)
  opt_u = u[best_index]
    
  return opt_u[0] # return the first element of the optimal control input

In [11]:
# Test
env = gym.make('CartPole-v1') # no visualization

# Test
test_episodes = 30
test_scores = []
start_time = time.time()

for e in range(test_episodes):
    state, done = env.reset()
    opt_u = alternating_array = np.tile([0, 1], horizon // 2 + 1)[:horizon] # alternating 0 and 1, different start

    for t in range(501):
        action = predictiv_sampling_disc(state)
        state, reward, done, info, _ = env.step(action)
    

        if done or t == 500:
            print("Test Episode: {}/{}, Score: {}".format(e + 1, test_episodes, t))
            test_scores.append(t)
            break

test_average = mean(test_scores)
test_sigma = stdev(test_scores)
end_time = time.time()
total_time = end_time - start_time
total_steps = sum(test_scores)
average_time_per_step = total_time / total_steps

print()
print('Score average: {:.2f}, Sigma: {:.2f}'.format(test_average, test_sigma))
print('Average time per step: {:.4f} seconds'.format(average_time_per_step))

env.close()


Test Episode: 1/30, Score: 306
Test Episode: 2/30, Score: 500
Test Episode: 3/30, Score: 131
Test Episode: 4/30, Score: 500
Test Episode: 5/30, Score: 500
Test Episode: 6/30, Score: 500
Test Episode: 7/30, Score: 500
Test Episode: 8/30, Score: 500
Test Episode: 9/30, Score: 82
Test Episode: 10/30, Score: 66
Test Episode: 11/30, Score: 500
Test Episode: 12/30, Score: 212
Test Episode: 13/30, Score: 70
Test Episode: 14/30, Score: 500
Test Episode: 15/30, Score: 500
Test Episode: 16/30, Score: 500
Test Episode: 17/30, Score: 156
Test Episode: 18/30, Score: 500
Test Episode: 19/30, Score: 500
Test Episode: 20/30, Score: 500
Test Episode: 21/30, Score: 500
Test Episode: 22/30, Score: 177
Test Episode: 23/30, Score: 500
Test Episode: 24/30, Score: 500
Test Episode: 25/30, Score: 185
Test Episode: 26/30, Score: 446
Test Episode: 27/30, Score: 500
Test Episode: 28/30, Score: 500
Test Episode: 29/30, Score: 500
Test Episode: 30/30, Score: 250

Score average: 386.03, Sigma: 166.34
Average time p