<a href="https://colab.research.google.com/github/prithwis/AGI/blob/main/TaxiV3_v4A.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![CC-BY-SA](https://licensebuttons.net/l/by-sa/3.0/88x31.png)<br>


![alt text](https://github.com/Praxis-QR/RDWH/raw/main/images/YantraJaalBanner.png)<br>




[Prithwis Mukerjee](http://www.linkedin.com/in/prithwis)<br>

# Trained Taxi - trying to understand states
for simulating Reinforcement Learning Applications

In [1]:
# 1. Update package list and install the NEW opengl names
!apt-get update > /dev/null 2>&1
!apt-get install -y xvfb ffmpeg freeglut3-dev python3-opengl libgl1-mesa-dev libglu1-mesa-dev mesa-utils > /dev/null 2>&1

# 2. Install the system dependencies (SWIG is the key here)
#!apt-get update
!apt-get install -y swig build-essential python3-dev > /dev/null 2>&1

# 3. Upgrade pip and setuptools to handle the build process better
!pip install --upgrade pip setuptools wheel > /dev/null 2>&1

# 4. Now install gymnasium with box2d support
!pip install "gymnasium[box2d]" > /dev/null 2>&1

# 5. Install the Python libraries
!pip install pyvirtualdisplay  pygame opencv-python > /dev/null 2>&1


In [2]:
import pygame
import cv2
import numpy as np
import os

from IPython.display import Video, display




pygame 2.6.1 (SDL 2.28.4, Python 3.12.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [3]:
import gymnasium as gym

# --- THE SWAP ---
# Old: world = FrogSnakeWorldV3A()


try:
    #env = gym.make("LunarLander-v3")
    env = gym.make("Taxi-v3", render_mode="rgb_array")
    print("Success! The physics engine is ready for AGI training.")
    #env.close()
except Exception as e:
    print(f"Error: {e}")


Success! The physics engine is ready for AGI training.


In [6]:
# --- 1. THE RECORDER CLASS ---
class VideoRecorder:
    def __init__(self, filename='simulation.avi', width=640, height=480, fps=30):
        self.filename = filename
        self.width = width
        self.height = height
        self.fps = fps
        self.fourcc = cv2.VideoWriter_fourcc(*'XVID')
        self.video_writer = None

    def start(self):
        if os.path.exists(self.filename): os.remove(self.filename)
        self.video_writer = cv2.VideoWriter(self.filename, self.fourcc, self.fps, (self.width, self.height))


    def record_frame_with_hud(self, frame_array, reward, step):
        # 1. Convert RGB to BGR for OpenCV
        view = cv2.cvtColor(frame_array, cv2.COLOR_RGB2BGR)
        view = cv2.resize(view, (self.width, self.height))

        # 2. Add text overlay (The HUD)
        font = cv2.FONT_HERSHEY_SIMPLEX
        # Display Step and Reward
        cv2.putText(view, f"Step: {step}", (10, 30), font, 0.7, (255, 255, 255), 2)
        cv2.putText(view, f"Reward: {reward}", (10, 60), font, 0.7, (255, 255, 255), 2)
        self.video_writer.write(view)

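    def stop(self):
        self.video_writer.release()
        # Convert to MP4 for browser compatibility.
        # splitext swaps only the final extension (str.replace would also
        # touch '.avi' appearing elsewhere in the path).
        output_mp4 = os.path.splitext(self.filename)[0] + '.mp4'
        # Quote both paths so filenames containing spaces survive the shell.
        os.system(f'ffmpeg -y -i "{self.filename}" -c:v libx264 -pix_fmt yuv420p "{output_mp4}" -hide_banner -loglevel error')
        return output_mp4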

## Understanding States

In [4]:
# Decode state index 328 into its four components
# decode() yields (taxi_row, taxi_col, passenger_location, destination)
decoded_state = list(env.unwrapped.decode(328))
print(decoded_state)
# Output: [3, 1, 2, 0]

[3, 1, 2, 0]


In [5]:
# env.unwrapped.encode(taxi_row, taxi_col, passenger_location, destination)
state_index = env.unwrapped.encode(0, 0, 4, 1)
print(state_index)
# Output: 17

17
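
The two calls above are inverses of each other. As a minimal sketch, assuming the standard Taxi layout of 5 rows, 5 columns, 5 passenger locations and 4 destinations, the index arithmetic can be reproduced in plain Python. The helper names `encode_state` and `decode_state` are hypothetical stand-ins for the environment's own methods.

```python
# Hypothetical helpers mirroring the index arithmetic behind
# env.unwrapped.encode / decode (assumes 5 rows, 5 columns,
# 5 passenger locations and 4 destinations).

def encode_state(taxi_row, taxi_col, passenger_location, destination):
    index = taxi_row
    index = index * 5 + taxi_col
    index = index * 5 + passenger_location
    index = index * 4 + destination
    return index

def decode_state(index):
    # Peel the components off in the reverse order of encoding.
    index, destination = divmod(index, 4)
    index, passenger_location = divmod(index, 5)
    taxi_row, taxi_col = divmod(index, 5)
    return taxi_row, taxi_col, passenger_location, destination

print(encode_state(3, 1, 2, 0))  # 328
print(decode_state(328))         # (3, 1, 2, 0)
print(encode_state(0, 0, 4, 1))  # 17
```

With these sizes there are 5 x 5 x 5 x 4 = 500 distinct indices, which is why the Q-table later in the notebook has 500 rows.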


In [11]:
# 0. Initialize the world
env.reset()
# 1. Choose your coordinates
# (row, col, passenger_idx, destination_idx)
# Passenger indices: 0:R, 1:G, 2:Y, 3:B, 4:Inside Taxi
#state_idx = env.unwrapped.encode(0, 4, 0, 2) # Taxi top-right, Pass at Red, Dest Yellow
state_idx = env.unwrapped.encode(3, 1, 2, 0)

# 2. Force the environment to that state
env.unwrapped.s = state_idx

# 3. Capture the frame
frame = env.render()

# 4. Use your recorder to save it
recorder = VideoRecorder(filename='teleport.avi', width=400, height=400)
recorder.start()
recorder.record_frame_with_hud(frame, 0, f"Teleported to State {state_idx}")
final_file = recorder.stop()
display(Video(final_file, embed=True))

In [12]:
# Look at the rules for State 123
rules = env.unwrapped.P[123]
print(rules)

{0: [(1.0, 223, -1, False)], 1: [(1.0, 23, -1, False)], 2: [(1.0, 123, -1, False)], 3: [(1.0, 103, -1, False)], 4: [(1.0, 123, -10, False)], 5: [(1.0, 123, -10, False)]}


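2. Breaking down the Rule: (Probability, Next_State, Reward, Done)
For every action, the environment stores a list of possible outcomes, each with four fields:

Probability (1.0): Every transition shown here is deterministic, so each action has exactly one outcome. A move either succeeds or, if a wall is in the way, leaves the taxi where it is (Action 2 above stays in State 123).

Next_State: The state index the environment moves to.

Reward: The signal the agent learns from. It is -1 for an ordinary step, -10 for an illegal pickup or drop-off, and +20 for a successful drop-off.

Done: A boolean (True/False). It becomes True only when Action 5 (drop-off) delivers the passenger at the correct destination.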
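
The breakdown above can be sketched in code. The dictionary is copied from the printed output for State 123, so the snippet runs without the environment. The action names follow the Taxi-v3 numbering (0 south, 1 north, 2 east, 3 west, 4 pickup, 5 drop-off), and `describe_rules` is a hypothetical helper, not part of Gymnasium.

```python
# Transition table for State 123, copied from the printed output above.
rules = {
    0: [(1.0, 223, -1, False)],
    1: [(1.0, 23, -1, False)],
    2: [(1.0, 123, -1, False)],
    3: [(1.0, 103, -1, False)],
    4: [(1.0, 123, -10, False)],
    5: [(1.0, 123, -10, False)],
}

ACTION_NAMES = ["south", "north", "east", "west", "pickup", "dropoff"]

def describe_rules(state, rules):
    """Return one readable line per (action, outcome) pair."""
    lines = []
    for action, outcomes in rules.items():
        for probability, next_state, reward, done in outcomes:
            moved = "moves to" if next_state != state else "stays in"
            lines.append(
                f"{ACTION_NAMES[action]:<7} p={probability} "
                f"{moved} {next_state} reward={reward} done={done}"
            )
    return lines

for line in describe_rules(123, rules):
    print(line)
```

Reading the output this way makes the blocked move easy to spot: east costs -1 yet leaves the taxi in State 123.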

3. How the "Brain" Learns This
During training, the agent has no access to this P table. It is like a person walking into a dark room without a map of the furniture.

The agent tries an action: env.step(action).

The Environment looks up the outcome in its internal P table.

The Environment reports back the next state and the reward: "You just got -10!"

The Agent then updates its Q-Table using that reward and the best value it currently expects from the next state.

Key distinction: The Environment owns the rewards. The Agent owns the Q-Table. Training is the process of the Agent distilling the Environment's reward logic, one experienced step at a time, into Q-Table estimates of long-term return.
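
A single update step makes this concrete. The following is a minimal sketch of the tabular Q-learning rule, using the learning rate (0.1) and discount factor (0.6) that appear in the training cell of this notebook. `q_update` is a hypothetical helper name.

```python
def q_update(old_value, reward, next_max, alpha=0.1, gamma=0.6):
    """One tabular Q-learning update for a single (state, action) pair."""
    target = reward + gamma * next_max
    return (1 - alpha) * old_value + alpha * target

# An untrained agent (all Q-values 0) attempts an illegal drop-off: reward -10.
first = q_update(old_value=0.0, reward=-10, next_max=0.0)

# Repeating the same mistake pulls the estimate further toward -10.
second = q_update(old_value=first, reward=-10, next_max=0.0)

print(first, second)
```

The estimate moves only a tenth of the way toward the target on each visit (about -1.0, then about -1.9), which is why many episodes are needed before the Q-Table reflects the environment's rewards.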

In [13]:
# env.unwrapped.P[state][action]
print(env.unwrapped.P[16][5])

[(1.0, 0, 20, True)]


## Actual Taxi Movements Here

In [15]:
def train_taxi(episodes=10000):
    # Start with a fresh "empty brain"
    q_table = np.zeros([env.observation_space.n, env.action_space.n])

    alpha = 0.1   # Learning rate
    gamma = 0.6   # Discount factor
    epsilon = 0.1 # Exploration rate

    for i in range(1, episodes + 1):
        state, info = env.reset()
        terminated = False

        while not terminated:
            # Epsilon-Greedy choice
            if np.random.uniform(0, 1) < epsilon:
                action = env.action_space.sample()
            else:
                action = np.argmax(q_table[state])

            next_state, reward, terminated, truncated, info = env.step(action)

            # Q-learning update (a sampled Bellman optimality backup)
            old_value = q_table[state, action]
            next_max = np.max(q_table[next_state])
            new_value = (1 - alpha) * old_value + alpha * (reward + gamma * next_max)
            q_table[state, action] = new_value

            state = next_state
            if truncated: break

    print("Training complete.")
    return q_table



In [16]:
# Execute training
trained_brain = train_taxi(10000)

Training complete.


In [17]:
def run_trained_taxi_by_trips(q_table, num_trips=5):
    recorder = VideoRecorder(filename='taxi_multi_trip.avi', width=400, height=400)
    recorder.start()

    all_rewards = []
    trips_completed = 0
    total_steps_recorded = 0

    print(f"Starting {num_trips} trips...")

    while trips_completed < num_trips:
        state, info = env.reset()
        current_trip_reward = 0
        terminated = False
        truncated = False

        # Internal loop for a single trip
        while not (terminated or truncated):
            action = np.argmax(q_table[state])
            state, reward, terminated, truncated, info = env.step(action)

            current_trip_reward += reward
            total_steps_recorded += 1

            # Record the frame with our HUD
            # We pass the trip number and the reward for THIS trip specifically
            recorder.record_frame_with_hud(
                env.render(),
                current_trip_reward,
                f"Trip {trips_completed + 1}"
            )

        # Trip finished
        trips_completed += 1
        all_rewards.append(current_trip_reward)

        status = "SUCCESS" if terminated else "FAILED (TRUNCATED)"
        print(f"Trip {trips_completed}: {status} | Reward: {current_trip_reward}")

    final_file = recorder.stop()
    print(f"\nAverage Reward: {sum(all_rewards)/len(all_rewards)}")
    display(Video(final_file, embed=True))
    return all_rewards

# Usage:
# rewards_history = run_trained_taxi_by_trips(trained_brain, num_trips=5)

In [18]:
# Usage:
rewards_history = run_trained_taxi_by_trips(trained_brain, num_trips=20)

Starting 20 trips...
Trip 1: SUCCESS | Reward: 7
Trip 2: SUCCESS | Reward: 3
Trip 3: SUCCESS | Reward: 4
Trip 4: SUCCESS | Reward: 8
Trip 5: SUCCESS | Reward: 9
Trip 6: SUCCESS | Reward: 9
Trip 7: SUCCESS | Reward: 8
Trip 8: SUCCESS | Reward: 6
Trip 9: SUCCESS | Reward: 8
Trip 10: SUCCESS | Reward: 9
Trip 11: FAILED (TRUNCATED) | Reward: -200
Trip 12: FAILED (TRUNCATED) | Reward: -200
Trip 13: SUCCESS | Reward: 6
Trip 14: SUCCESS | Reward: 11
Trip 15: SUCCESS | Reward: 4
Trip 16: SUCCESS | Reward: 10
Trip 17: SUCCESS | Reward: 5
Trip 18: SUCCESS | Reward: 11
Trip 19: SUCCESS | Reward: 5
Trip 20: FAILED (TRUNCATED) | Reward: -200

Average Reward: -23.85
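
The negative average hides how well the successful trips went. A short sketch, using the rewards copied from the log above and assuming that a reward of -200 marks a trip cut off at the environment's 200-step limit, separates the two cases. The variable names are illustrative.

```python
# Per-trip rewards copied from the 20-trip log above.
rewards_history = [7, 3, 4, 8, 9, 9, 8, 6, 8, 9,
                   -200, -200, 6, 11, 4, 10, 5, 11, 5, -200]

# Assumption: -200 means 200 steps at -1 each, i.e. a truncated trip.
successes = [r for r in rewards_history if r > -200]
failures = [r for r in rewards_history if r == -200]

success_rate = len(successes) / len(rewards_history)
mean_success_reward = sum(successes) / len(successes)
overall_mean = sum(rewards_history) / len(rewards_history)

print(f"Success rate: {success_rate:.0%}")
print(f"Mean reward on successful trips: {mean_success_reward:.2f}")
print(f"Overall mean reward: {overall_mean:.2f}")
```

For this log, 17 of 20 trips succeed with a mean reward of roughly 7.2, and the three truncated trips alone drag the overall mean down to -23.85. A greedy policy that gets truncated is probably looping through states whose Q-values have not yet converged, so more training episodes or different hyperparameters may be worth trying.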

