<a href="https://colab.research.google.com/github/DS-Aditya-928/CartPoleProject4/blob/main/RL_Project4_template.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
'''

Imports! The first 3 are for our cartpole simulation, numpy is for our bot, tqdm is a super easy way to
draw progress bars, and the last one is used to play the video of the simulation.

'''
import gymnasium as gym
from gym import logger as gymlogger
from gym.wrappers import RecordVideo, RecordEpisodeStatistics

import numpy as np
from tqdm import tqdm

import moviepy.editor

  """
  lines_video = [l for l in lines if ' Video: ' in l and re.search('\d+x\d+', l)]
  from scipy.ndimage.filters import sobel

  if event.key is 'enter':

  from pkg_resources import resource_stream, resource_exists

Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)

Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)

Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)



# 📌 Before we go any further:

Let's have a look over what we're trying to accomplish.

<br>

Reinforcement learning is a type of machine learning where we let the algorithm make its own decisions in an attempt to maximize a "reward". This reward can be a high score in a game or, more relevant to our project here, the length of time our algorithm is able to balance a pole mounted on a cart.


To train our algorithm, we're going to be using something called Q learning.

<br>


---

# 📌 What is Q-learning?

This bit is pretty complex, and this video explains Q learning better than I ever could :p

https://www.youtube.com/watch?v=TiAXhVAZQl8  
  
<br>

The basic idea, however, is that we are going to maintain a table (appropriately called a Q table) that holds the expected change in score for each of our possible actions at a given state.

<br>

For example, in the cart pole case, let's say the pole is 5 degrees off centre, and has a velocity of 1 m/s. Our algorithm is going to go to the corresponding cell in our table, and see which one of the possible actions (in the cartpole example, those are moving left or right) will result in the greatest increase in score. So, the cell holds an array with the expected score change for each action.

Let's say the cell for the case described above has this array:  [-1.0, +2.5], with the first value corresponding to the expected change in score if we move to the left, and the other if we move to the right. Moving to the right gives us a score change of +2.5, so we're going to move to the right.


<br>

The training bit here involves having the model change these values in the q table so that it makes better and better decisions over time. We'll delve more into the specifics of how to accomplish this later.

<br>
Got all that? It's ok if you didn't. I'm still wrapping my head around it myself. Feel free to ask us questions during office hours and USE THE INTERNET. It can teach you anything if you use it right.  







---


This here is the CartPoleBot class; all the functions you'll need to implement are in here.




In [2]:
import numpy as np
from collections import defaultdict
import gym

class CartPoleBot:
    """
    Tabular Q-learning agent for a CartPole-style environment.

    Continuous observations are discretized into bins (see ``discConv``) and
    used as keys into a Q-table that stores one action-value array per state.
    Actions are chosen epsilon-greedily (see ``getAction``).
    """

    # Quoted so the annotation is not evaluated when the class body executes.
    env: "gym.Env"
    learningRate: float
    discountFactor: float

    # Bin edges for each observation component, in observation order.
    # Built once here instead of on every discConv call: discConv runs three
    # times per environment step, so rebuilding the arrays each time is wasted
    # work. The edges themselves are part of the assignment -- do not change.
    _STATE_BINS = (
        np.linspace(-2.4, 2.4, 10),      # position
        np.linspace(-4, 4, 10),          # velocity
        np.linspace(-.2095, .2095, 10),  # angle
        np.linspace(-4, 4, 10),          # angular velocity
    )

    def __init__(self,
                 env: "gym.Env",
                 learningRate: float,
                 initalEpsilon: float,
                 epsilonDecay: float,
                 finalEpsilon: float,
                 discountFactor: float):
        """
        Constructor. Don't change anything here. READ ALL THE COMMENTS THOUGH.

        Args:
            env: Environment the agent acts in; its ``action_space`` is used
                to size Q-table rows and to sample random actions.
            learningRate: Step size (alpha) of the Q-learning update.
            initalEpsilon: Starting exploration probability.
            epsilonDecay: Amount subtracted from epsilon by ``decayEpsilon``.
            finalEpsilon: Lower bound that epsilon never drops below.
            discountFactor: Discount (gamma) applied to future value.
        """
        self.env = env
        self.learningRate = learningRate

        # epsilon-greedy params
        self.epsilon = initalEpsilon
        self.epsilonDecay = epsilonDecay
        self.finalEpsilon = finalEpsilon

        # Q-table: discretized state -> array of action values. States that
        # have never been seen are created on first access, filled with zeros.
        self.qTable = defaultdict(lambda: np.zeros(self.env.action_space.n))

        self.discountFactor = discountFactor

    def discConv(self, obs):
        """
        Discretize the continuous state vector into a hashable tuple of bins.

        Args:
            obs: Sequence of observation components, matched positionally
                against the bin edges in ``_STATE_BINS``.

        Returns:
            Tuple with one ``np.digitize`` bin index per component.

        Raises:
            IndexError: If ``obs`` has more components than there are bin sets.
        """
        bins = self._STATE_BINS
        return tuple(np.digitize(value, bins[i]) for i, value in enumerate(obs))

    def getAction(self, observation):
        """
        Epsilon-greedy: with probability epsilon pick a random action,
        otherwise pick the action with highest Q-value for the current state.
        """
        if np.random.random() < self.epsilon:
            # explore
            return self.env.action_space.sample()

        # exploit -- the state is only discretized when it is actually needed
        state = self.discConv(observation)
        return int(np.argmax(self.qTable[state]))

    def update(self, pastObv, action, reward, terminated, currObv):
        """
        Q-learning update rule:
           Q(s,a) <- Q(s,a) + alpha * [ R + gamma * max_a' Q(s',a') - Q(s,a) ]
        If terminated=True, we treat future reward as zero.

        Args:
            pastObv: Observation the action was taken from (s).
            action: Index of the action taken (a).
            reward: Reward received for that action (R).
            terminated: Whether the episode ended in a terminal state.
            currObv: Observation after the action (s').
        """
        past_state = self.discConv(pastObv)
        curr_state = self.discConv(currObv)

        old_q = self.qTable[past_state][action]
        future_max = 0 if terminated else np.max(self.qTable[curr_state])

        td_error = (reward + self.discountFactor * future_max) - old_q
        self.qTable[past_state][action] = old_q + self.learningRate * td_error

    def decayEpsilon(self):
        """
        Reduce epsilon by epsilonDecay, but never go below finalEpsilon.
        """
        self.epsilon = max(self.finalEpsilon, self.epsilon - self.epsilonDecay)




In [5]:
# 0) Compatibility shim: if this numpy build has no `bool8` attribute, alias it
#    to `np.bool_` so that code which still references `np.bool8` keeps working.
#    NOTE(review): presumably required by the installed gym version -- confirm.
import numpy as np
if not hasattr(np, "bool8"):
    np.bool8 = np.bool_

# 1) Imports
import gym
from gym.wrappers import RecordVideo
from collections import defaultdict
from tqdm import tqdm


class CartPoleBot:
    """Tabular Q-learning agent with epsilon-greedy exploration.

    Observations are discretized by ``discConv`` into tuples that key a
    Q-table holding one array of action values per visited state.
    """

    # Bin edges per observation component, in observation order. Computed once
    # at class creation: discConv is called several times per environment step,
    # so rebuilding these arrays on every call is pure overhead.
    _STATE_BINS = (
        np.linspace(-2.4, 2.4, 10),      # position
        np.linspace(-4, 4, 10),          # velocity
        np.linspace(-.2095, .2095, 10),  # angle
        np.linspace(-4, 4, 10),          # angular velocity
    )

    def __init__(self,
                 env: "gym.Env",
                 learningRate: float,
                 initalEpsilon: float,
                 epsilonDecay: float,
                 finalEpsilon: float,
                 discountFactor: float):
        """Store the environment and hyperparameters and create an empty Q-table.

        Args:
            env: Environment whose ``action_space`` sizes the Q-table rows and
                supplies random actions. (Annotation is quoted so it is not
                evaluated at definition time.)
            learningRate: Step size (alpha) of the Q-learning update.
            initalEpsilon: Starting exploration probability.
            epsilonDecay: Amount subtracted from epsilon by ``decayEpsilon``.
            finalEpsilon: Floor below which epsilon is never reduced.
            discountFactor: Discount (gamma) applied to future value.
        """
        self.env = env
        self.learningRate = learningRate
        self.epsilon = initalEpsilon
        self.epsilonDecay = epsilonDecay
        self.finalEpsilon = finalEpsilon
        # Unseen states get a zero-filled action-value array on first access.
        self.qTable = defaultdict(lambda: np.zeros(self.env.action_space.n))
        self.discountFactor = discountFactor

    def discConv(self, obs):
        """Return the tuple of ``np.digitize`` bin indices for ``obs``.

        Components are matched positionally against ``_STATE_BINS``; an
        observation with more components than bin sets raises ``IndexError``.
        """
        bins = self._STATE_BINS
        return tuple(np.digitize(value, bins[i]) for i, value in enumerate(obs))

    def getAction(self, observation):
        """Pick a random action with probability epsilon, else the greedy one."""
        if np.random.random() < self.epsilon:
            return self.env.action_space.sample()
        # Only discretize when the Q-table is actually consulted.
        state = self.discConv(observation)
        return int(np.argmax(self.qTable[state]))

    def update(self, pastObv, action, reward, terminated, currObv):
        """Apply one Q-learning update for the transition (pastObv, action) -> currObv.

        Q(s,a) <- Q(s,a) + alpha * [reward + gamma * max_a' Q(s',a') - Q(s,a)],
        where the future term is zero when ``terminated`` is true.
        """
        past_state = self.discConv(pastObv)
        curr_state = self.discConv(currObv)
        old_q = self.qTable[past_state][action]
        future_max = 0 if terminated else np.max(self.qTable[curr_state])
        td_error = (reward + self.discountFactor * future_max) - old_q
        self.qTable[past_state][action] = old_q + self.learningRate * td_error

    def decayEpsilon(self):
        """Subtract epsilonDecay from epsilon, clamped at finalEpsilon."""
        self.epsilon = max(self.finalEpsilon, self.epsilon - self.epsilonDecay)


# 3) Wrap CartPole in RecordVideo: episodes whose index is a multiple of 5000
#    are recorded into /content.
env = RecordVideo(
    gym.make("CartPole-v1", render_mode="rgb_array"),
    "/content",
    episode_trigger=lambda ep: ep % 5000 == 0,
    new_step_api=True
)

# 4) Hyperparameters & Agent
learningRate   = 0.05
nEps           = 60_000
startEpsilon   = 1.0
# Linear decay per episode: epsilon goes from 1.0 down to the 0.1 floor after
# (1.0 - 0.1) * 30_000 = 27_000 episodes, then stays there.
epsilonDecay   = 1.0 / 30_000
finalEpsilon   = 0.1
discountFactor = 0.95

balanceAgent = CartPoleBot(
    env,
    learningRate,
    startEpsilon,
    epsilonDecay,
    finalEpsilon,
    discountFactor
)

# 5) Training loop with safe unpacking.
#    The try/finally guarantees env.close() runs even if training is
#    interrupted or raises part-way through.
try:
    for episode in tqdm(range(nEps), desc="Episodes"):
        # reset() may return a bare observation or a tuple starting with
        # (observation, info); handle both.
        reset_ret = env.reset()
        if isinstance(reset_ret, tuple):
            observation, info = reset_ret[:2]
        else:
            observation, info = reset_ret, {}

        done = False
        while not done:
            action = balanceAgent.getAction(observation)

            step_ret = env.step(action)
            if not isinstance(step_ret, tuple):
                raise RuntimeError("env.step() did not return a tuple!")

            if len(step_ret) == 5:
                newObv, reward, terminated, truncated, info = step_ret
            elif len(step_ret) == 4:
                # 4-tuple form carries a single done flag; treat it as
                # termination with no separate truncation signal.
                newObv, reward, done_flag, info = step_ret
                terminated, truncated = done_flag, False
            else:
                raise RuntimeError(f"Unexpected step() return: {step_ret}")

            # Only `terminated` is passed to the update, so a truncated
            # episode still bootstraps from the next state's value.
            balanceAgent.update(observation, action, reward, terminated, newObv)
            done = terminated or truncated
            observation = newObv

        # Epsilon is decayed once per episode, not once per step.
        balanceAgent.decayEpsilon()
finally:
    env.close()



  deprecation(

  deprecation(

  logger.warn(

Episodes: 100%|██████████| 60000/60000 [8:25:51<00:00,  1.98it/s]


In [6]:
# Play one of the generated mp4s inline. To watch a different recording, change the
# episode number in the filename below (recordings are triggered when ep % 5000 == 0),
# or just download the mp4 from /content instead.
moviepy.editor.ipython_display("/content/rl-video-episode-5000.mp4")