In [16]:
import gym
from gym import spaces
import numpy as np
import sympy as sp
from scipy.integrate import odeint
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Rectangle
import torch
import torch.nn.functional as F
from collections import deque
import random
from torch.utils.tensorboard import SummaryWriter  # Import TensorBoard SummaryWriter
import cv2
import torch.optim.lr_scheduler as lr_scheduler


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

best_policy = 0
good_policy = 0

# Hyperparameters
MAX_NUM_EPISODES = 30000
ALPHA = 1e-4
GAMMA = .99
batch_size = 64
capacity = 1000000
sync_frequency = 10000
use_target_network = True
epsilon = 1
def Epsilon(episode):
    return np.exp(-3*episode/MAX_NUM_EPISODES)

l,g,m1,m2,t=sp.symbols('l,g,m1,m2,t')
theta,s,a = sp.symbols('theta,s,a',cls=sp.Function)
theta,s,a = theta(t),s(t),a(t)
thetadot,sdot = sp.diff(theta,t),sp.diff(s,t)
thetaddot,sddot = sp.diff(thetadot,t),sp.diff(sdot,t)
x = s + l*sp.sin(theta)
y = -l*sp.cos(theta)
T = 0.5*m1*sdot**2 + 0.5*m2*(sp.diff(x,t)**2 + sp.diff(y,t)**2)
U = m2 *g * y
L = T - U
LE1 = sp.diff(L,s) - sp.diff(sp.diff(L,sdot),t) + a
LE2 = sp.diff(L,theta) - sp.diff(sp.diff(L,thetadot),t)
sols = sp.solve([LE1,LE2],(sddot,thetaddot))
sddotlambda = sp.lambdify((m1,m2,l,g,a,s,theta,sdot,thetadot),sols[sddot])
thetaddotlambda = sp.lambdify((m1,m2,l,g,a,s,theta,sdot,thetadot),sols[thetaddot])

def diffeq(y,t,m1,m2,l,g,a):
  s,sdot,theta,thetadot = y
  sddot = sddotlambda(m1,m2,l,g,a,s,theta,sdot,thetadot)
  thetaddot = thetaddotlambda(m1,m2,l,g,a,s,theta,sdot,thetadot)
  return [sdot,sddot,thetadot,thetaddot]


class Cartpendflip(gym.Env):
    def __init__(self):
        super(Cartpendflip, self).__init__()

        self.action_space = spaces.Discrete(20)
        self.action_shape = self.action_space.n
        self.observation_space = spaces.Box(low=np.array([-10,-1000,-10*np.pi,-10*np.pi], dtype=np.float32),high=np.array([10,1000,10*np.pi,10*np.pi], dtype=np.float32))
        self.state = np.array([0.0, 0.0, 0.0])
        self.properties = {'m1': 1, 'm2': 1, 'l': 1, 'g': 9.8}
        self.dt = 0.05
        self.goalstate = np.array([0,0,np.pi,0])
        self.max_steps = 200
        self.current_step = 0

    def ActiontoForce(self,action):
      force = 20*(self.properties['m1']+self.properties['m2'])*(action / (self.action_shape -1) -0.5)
      return force

    def step(self, action):
        reward = 0 
        force = self.ActiontoForce(action)
        self.state = odeint(diffeq,self.state,[0,self.dt],args = tuple([self.properties[k] for k in ['m1','m2','l','g']] + [force]))[1]
        self.current_step += 1
        if  self.current_step >= self.max_steps or np.abs(self.state[0]) > 9 or np.abs(self.state[3]) > 20 or np.abs(self.state[2]) > 9*np.pi:
            done = True
        else:
            done = False
        info = {}

        s,sdot,theta,thetadot = self.state
        sopt,sdotopt,thetaopt,thetadotopt = self.goalstate
        if theta > np.pi- 0.3 and theta <np.pi + 0.3:
            reward = 1

        return self.state, reward, done,{}, info

    def reset(self):
        # Reset the environment state and steps
        self.state = np.random.uniform(-1,1,size=4)
        self.current_step = 0
        return self.state,_
    
    
env = Cartpendflip()

class SLP(torch.nn.Module):
    def __init__(self, input_shape, output_shape, device=device, hidden_shape1=64, hidden_shape2=64):
        super(SLP, self).__init__()
        self.device = device
        self.input_shape = input_shape[0]
        self.output_shape = output_shape
        self.hidden_shape1 = hidden_shape1
        self.hidden_shape2 = hidden_shape2
        self.linear1 = torch.nn.Linear(self.input_shape, self.hidden_shape1)
        self.linear2 = torch.nn.Linear(self.hidden_shape1, self.output_shape)
        #self.linear2 = torch.nn.Linear(self.hidden_shape1, self.hidden_shape2)
        #self.linear3 = torch.nn.Linear(self.hidden_shape2, self.output_shape)

    def forward(self, x):
        x = torch.tensor(x, dtype=torch.float32, device=self.device)
        x = F.relu(self.linear1(x))
        x = self.linear2(x)
        #x = self.linear3(x)
        # x = self.linear3(x)
        return x

class ReplayBuffer(object):
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
        self.absolutebellmanerror = deque(maxlen=capacity)


    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        obs, actions, rewards, next_obs, dones = zip(*batch)
        obs,next_obs = np.array(obs),np.array(next_obs)
        experiences = [torch.Tensor(obs), 
                        torch.Tensor(actions), 
                        torch.Tensor(rewards), 
                        torch.Tensor(next_obs), 
                        torch.Tensor(dones)]
        for i, thing in enumerate(experiences):
            experiences[i] = thing.to(device)
        return experiences

    def __len__(self):
        return len(self.buffer)
    
class Q_Learner(object):
    def __init__(self, env):
        self.obs_shape = env.observation_space.shape
        self.obs_high = env.observation_space.high
        self.obs_low = env.observation_space.low
        self.action_shape = env.action_space.n
        self.Q = SLP(self.obs_shape, self.action_shape).to(device)
        self.best_policy = self.Q
        

        if use_target_network:
            self.target_Q = SLP(self.obs_shape, self.action_shape).to(device)
            self.update_frequency = sync_frequency
            self.step = 0
        self.Q_optimizer = torch.optim.Adam(self.Q.parameters(), lr=ALPHA)
        self.gamma = GAMMA
        self.Q_loss = torch.nn.MSELoss()

    def get_action(self, obs):
        if np.random.random() > epsilon:
            return torch.argmax(self.Q(obs).data).item()
        else:
            return np.random.choice([a for a in range(self.action_shape)])

    def learn(self, obs, actions, rewards, next_obs, dones):
        # Get Q-values for current states (for the actions taken)
        if use_target_network:
            if self.step % self.update_frequency == 0:
                self.target_Q.load_state_dict(self.Q.state_dict())
            next_actions = self.Q(next_obs).argmax(1)
            td_target = rewards + self.gamma * self.target_Q(next_obs).gather(1,next_actions.unsqueeze(1).long()).squeeze(1) * (1 - dones)
            self.step += 1
        else:
            # Compute TD target: reward + gamma * max(next_q_value) * (1 - done)

            td_target = rewards + self.gamma * self.Q(next_obs).max(1)[0] * (1 - dones)

        current_q_values = self.Q(obs).gather(1, actions.unsqueeze(1).long()).squeeze(1)
        # Compute the loss
        tderror = self.Q_loss(current_q_values, td_target)

        # Perform backpropagation and optimization step
        self.Q_optimizer.zero_grad()
        tderror.backward()
        self.Q_optimizer.step()

def train(agent, env, replay_buffer, writer):
    global epsilon
    best_reward = -float('inf')
    last100totalrewards = deque(maxlen=100)
    goodreward = -float('inf')
    for episode in range(MAX_NUM_EPISODES):
        done = False
        obs = env.reset()[0]
        total_reward = 0.0
        epsilon = Epsilon(episode)
        step = 0
        while not done:
            step += 1
            action = agent.get_action(obs)
            next_obs, reward, done,truncated, info = env.step(action)
            replay_buffer.add([obs, action, reward, next_obs, done])
            obs = next_obs

            if len(replay_buffer) > batch_size:
                agent.learn(*replay_buffer.sample(batch_size))

            total_reward += reward
            
            if step > 400:
                break


        if total_reward > best_reward:
            best_reward = total_reward
            best_policy = agent.get_action
            agent.best_policy = best_policy
        if episode == MAX_NUM_EPISODES - 50:
            if total_reward > goodreward:
                goodreward = total_reward
                good_policy = agent.get_action

        last100totalrewards.append(total_reward)
        average_reward = np.mean(last100totalrewards)

        # Log metrics to TensorBoard
        writer.add_scalar('Total Reward', total_reward, episode)
        writer.add_scalar('Best Reward', best_reward, episode)
        writer.add_scalar('Epsilon', epsilon, episode)
        writer.add_scalar('last100totalrewardsaverage', average_reward, episode)
        writer.add_scalar('numstepsperepisode', step, episode)

    return best_policy, good_policy

# Initialize TensorBoard writer
writer = SummaryWriter('runs/cartpole_experiment')  # Specify the directory for logs
# Load TensorBoard in Colab
# After training, load TensorBoard directly in the notebook


replay_buffer = ReplayBuffer(capacity)
agent = Q_Learner(env)
policy,good_policy = train(agent, env, replay_buffer, writer)

# Close the TensorBoard writer
writer.close()

env.close()


  x = torch.tensor(x, dtype=torch.float32, device=self.device)


In [19]:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Rectangle
import numpy as np
%matplotlib tk
# Assuming the Cartpendflip environment is defined and initialized properly.
env = Cartpendflip()
l = env.properties['l']  # Retrieve the length from the environment properties
MaxNumSteps = 1000
ss = []
xs = []
ys = []
obs = env.reset()[0]

for step in range(MaxNumSteps):
    action = policy(obs)  # Get action from the policy
    obs, reward, done,_, info = env.step(action)

    s, theta = obs[0], obs[2]
    x = s + l * np.sin(theta)
    y = -l * np.cos(theta)

    ss.append(s)
    xs.append(x)
    ys.append(y)

    if done:  # Check if the episode is done
        break

env.close()

fig, ax = plt.subplots()

# Set axis limits
ax.set_xlim(-1, 7)
ax.set_ylim(-4, 4)

cart_width = 0.4
cart_height = 0.2
# Create patches
cart_patch = Rectangle((ss[0] - cart_width / 2, -cart_height / 2), cart_width, cart_height, fc='blue')
ax.add_patch(cart_patch)  # Add cart as a rectangle

# Plot pendulum elements
pendulum, = ax.plot([], [], 'r-', lw=2)  # Red line for the pendulum

# Animation function
def animate(i):
    # Update cart position using the current value of s[i]
    cart_patch.set_xy((ss[i] - cart_width / 2, -cart_height / 2))

    # Pendulum (line from the cart to the bob)
    pendulum_x = [ss[i], xs[i]]  # From cart center to pendulum bob
    pendulum_y = [0, ys[i]]     # From cart top to pendulum bob
    pendulum.set_data(pendulum_x, pendulum_y)

    return cart_patch, pendulum

# Create the animation
ani = FuncAnimation(fig, animate, frames=len(ss), interval=50, blit=True)
ani.save('flipupcartpend.gif',writer='pillow')

In [None]:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.patches import Rectangle
import numpy as np
import matplotlib.gridspec as gridspec

# Assuming the Cartpendflip environment and policy are defined and initialized properly
env = Cartpendflip()
l = env.properties['l']  # Retrieve the length of the pendulum from environment properties
MaxNumSteps = 1000
rows, cols = 6, 6  # Number of rows and columns in the subplot grid

# Function to simulate a single Cartpendflip episode
def simulate_episode():
    ss, xs, ys = [], [], []
    obs = env.reset()[0]
    
    for step in range(MaxNumSteps):
        action = policy(obs)  # Get action from the policy
        obs, reward, done, _, info = env.step(action)

        s, theta = obs[0], obs[2]
        x = s + l * np.sin(theta)
        y = -l * np.cos(theta)

        ss.append(s)
        xs.append(x)
        ys.append(y)

        if done:
            break

    return ss, xs, ys

# Collect simulation data for each subplot
sim_data = [simulate_episode() for _ in range(rows * cols)]
env.close()  # Close environment after data collection

fig = plt.figure(figsize=(10, 10))
gs = gridspec.GridSpec(rows, cols, figure=fig)
subplots = []

# Set up subplots and animation elements for each subplot
for r in range(rows):
    for c in range(cols):
        ax = fig.add_subplot(gs[r, c])
        ax.set_xlim(-2, 8)
        ax.set_ylim(-5, 5)

        cart_width = 0.4
        cart_height = 0.2
        cart_patch = Rectangle((0, -cart_height / 2), cart_width, cart_height, fc='blue')
        ax.add_patch(cart_patch)

        pendulum, = ax.plot([], [], 'r-', lw=2)
        subplots.append((ax, cart_patch, pendulum))

# Animation function for updating each subplot
def animate(i):
    for index, (ax, cart_patch, pendulum) in enumerate(subplots):
        ss, xs, ys = sim_data[index]
        
        if i < len(ss):
            # Update cart position for each subplot
            cart_patch.set_xy((ss[i] - cart_width / 2, -cart_height / 2))
            
            # Update pendulum position
            pendulum_x = [ss[i], xs[i]]
            pendulum_y = [0, ys[i]]
            pendulum.set_data(pendulum_x, pendulum_y)
    
    return [patch for _, patch, _ in subplots] + [pendulum for _, _, pendulum in subplots]

# Create the animation
ani = FuncAnimation(fig, animate, frames=MaxNumSteps, interval=50, blit=True)
ani.save('epic.gif',writer='pillow')