<a href="https://colab.research.google.com/github/jackrankin/chess_engine/blob/main/cartpole_for_barry.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gymnasium
!pip install renderlab

In [3]:
import numpy as np
import gymnasium as gym
import renderlab as rl

from sklearn.neighbors import KNeighborsClassifier
from copy import deepcopy

In [5]:
class Particle:
  """A hypothetical copy of the environment used for short look-ahead searches.

  The particle exhaustively tries both discrete actions (0 and 1) for
  ``sweep_depth`` steps and scores each surviving branch by ``abs(state[2])``
  at the leaf (index 2 is the pole angle in CartPole).

  Args:
    env: environment exposing ``step(action)`` and ``unwrapped.state``.
    sweep_depth: number of steps to look ahead in ``sweep``.
  """

  def __init__(self, env, sweep_depth):
    self.env = env
    self.sweep_depth = sweep_depth

  def render(self):
    """Print the particle's current raw state."""
    # Read through ``unwrapped``: wrappers are not guaranteed to forward
    # arbitrary attributes such as ``state``.
    print(self.env.unwrapped.state)

  def push_forward(self):
    """Run a full look-ahead from the current state.

    Returns:
      ``(action, cost, done)`` where ``done`` is True when every branch
      terminated or was truncated (cost is infinite).
    """
    cost, action = self.sweep(0)
    return action, cost, cost == float('inf')

  def sweep(self, depth):
    """Recursively search both actions starting at ``depth``.

    The environment state is restored to its value on entry before
    returning, so the search leaves the particle where it started.

    Returns:
      ``(minimum_cost, best_action)``; ``(inf, 0)`` when no branch survives,
      and ``(abs(state[2]), 0)`` at the depth limit.
    """
    core = self.env.unwrapped

    if depth == self.sweep_depth:
      return abs(core.state[2]), 0

    minimum_cost = float('inf')
    best_action = 0
    # Take a real copy: slicing a NumPy array would only give a view.
    saved_state = list(core.state)

    for action in range(2):
      _, _, terminated, truncated, _ = self.env.step(action)

      if not (terminated or truncated):
        future_cost, _ = self.sweep(depth + 1)
        # Strict '<' keeps the lower-numbered action on ties.
        if future_cost < minimum_cost:
          minimum_cost = future_cost
          best_action = action

      # Undo the step (and anything deeper) before trying the next action.
      core.state = list(saved_state)

    return minimum_cost, best_action

In [16]:
class Agent():
  """Controller that picks actions from a cache built by look-ahead particles.

  Each step, particles are seeded around the agent's state, each particle
  searches ``sweep_depth`` steps ahead, and the best (action, cost) found is
  cached per rounded-state bucket. The action actually taken is the
  nearest-neighbour vote over the cached buckets.

  Args:
    env: the environment the agent controls.
    sweep_depth: look-ahead depth handed to every particle.
    num_particles: size of the particle pool.
    bucket_precision: number of decimals used when rounding a state to a
      bucket key.
    num_neighbors: upper bound on the neighbours used for the vote
      (capped at the number of cached buckets).
  """

  def __init__(self, env, sweep_depth, num_particles, bucket_precision,
               num_neighbors=100):
    self.env = env
    self.sweep_depth = sweep_depth
    self.buckets = {}  # bucket key (4-tuple) -> (action, cost)
    self.particles = []
    self.num_particles = num_particles
    self.bucket_precision = bucket_precision
    self.num_neighbors = num_neighbors

  def _bucket_key(self, state):
    """Round the first four state components into a hashable bucket key."""
    return tuple(round(float(state[j]), self.bucket_precision) for j in range(4))

  def seed_particles(self, agent_state):
    """Place the particle pool around ``agent_state``.

    The pool is created once and reused. Previously a fresh batch of
    environments was appended on every call while only the oldest
    ``num_particles`` were ever swept.
    """
    while len(self.particles) < self.num_particles:
      self.particles.append(Particle(gym.make("CartPole-v1"), self.sweep_depth))

    for particle in self.particles[:self.num_particles]:
      # reset() is called first so the environment accepts step() calls and
      # starts from a clean episode before the state is overwritten.
      particle.env.reset()

      # Seed close to the agent's state, varying slightly for future sweeps.
      # The (-0.5, 0.5) noise range is a tunable worth experimenting with.
      particle.env.unwrapped.state = [
          value + np.random.uniform(-0.5, 0.5) for value in agent_state
      ]

  def sweep_particles(self):
    """Run every particle's look-ahead and cache the cheapest action per bucket."""
    for particle in self.particles[:self.num_particles]:
      action, cost, done = particle.push_forward()

      if done:
        continue

      # Buckets are created by rounding the state to bucket_precision decimals.
      bucket = self._bucket_key(particle.env.unwrapped.state)

      # Keep only the lowest-cost action seen for this bucket.
      best_cost = self.buckets.get(bucket, (None, float('inf')))[1]
      if cost < best_cost:
        self.buckets[bucket] = (action, cost)

  def push_forward(self):
    """Choose an action from the cache and apply it to the real environment.

    Returns:
      ``(action, reward, done)`` where ``done`` is
      ``terminated or truncated`` from the environment step.

    Raises:
      ValueError: if no bucket has been cached yet.
    """
    if not self.buckets:
      raise ValueError(
          "no cached buckets: call seed_particles() and sweep_particles() "
          "before push_forward()")

    state = self._bucket_key(self.env.unwrapped.state)

    # Find the nearest neighbours in the cache to determine the action.
    # The classifier cannot query more neighbours than it has samples.
    neighbor_count = min(self.num_neighbors, len(self.buckets))
    neigh = KNeighborsClassifier(neighbor_count)

    X = np.array(list(self.buckets.keys()))
    Y = np.array([entry[0] for entry in self.buckets.values()])

    neigh.fit(X, Y)
    action = int(neigh.predict(np.array([state]))[0])
    _, reward, terminated, truncated, _ = self.env.step(action)

    return action, reward, terminated or truncated

In [None]:
# Environment the agent actually controls.
env = gym.make("CartPole-v1", render_mode="rgb_array")
env.reset()

# Tunables worth experimenting with: the sweep depth (3) and the number of
# particles (2000).
a = Agent(env, 3, 2000, 1)  # sweep_depth, num_particles, bucket_precision

MAX_STEPS = 200

# Run one episode: seed particles around the current state, let them search,
# then take the action the cache recommends. Stops at episode end or after
# MAX_STEPS steps.
for i in range(1, MAX_STEPS + 1):
  # Read the raw state via ``unwrapped``; wrappers are not guaranteed to
  # forward the ``state`` attribute.
  a.seed_particles(env.unwrapped.state)
  a.sweep_particles()
  t = a.push_forward()
  print(i, env.unwrapped.state)

  if t[2]:
    break