In [None]:
!pip install gymnasium[atari] --quiet
!pip install gymnasium --quiet
!pip install -U gymnasium[atari] --quiet
!pip install imageio_ffmpeg --quiet
!pip install npy_append_array --quiet
!pip install pyTelegramBotAPI --quiet
!pip install gymnasium[accept-rom-license] --quiet
!pip install gymnasium[box2d] --quiet

In [9]:
import numpy as np
import random
import itertools
import scipy.misc
import matplotlib.pyplot as plt


class gameOb():
    def __init__(self,coordinates,size,intensity,channel,reward,name):
        self.x = coordinates[0]
        self.y = coordinates[1]
        self.size = size
        self.intensity = intensity
        self.channel = channel
        self.reward = reward
        self.name = name
        
class gameEnv():
    def __init__(self,partial,size):
        self.sizeX = size
        self.sizeY = size
        self.actions = 4
        self.objects = []
        self.partial = partial
        a = self.reset()
        plt.imshow(a,interpolation="nearest")
        
        
    def reset(self):
        self.objects = []
        hero = gameOb(self.newPosition(),1,1,2,None,'hero')
        self.objects.append(hero)
        bug = gameOb(self.newPosition(),1,1,1,1,'goal')
        self.objects.append(bug)
        hole = gameOb(self.newPosition(),1,1,0,-1,'fire')
        self.objects.append(hole)
        bug2 = gameOb(self.newPosition(),1,1,1,1,'goal')
        self.objects.append(bug2)
        hole2 = gameOb(self.newPosition(),1,1,0,-1,'fire')
        self.objects.append(hole2)
        bug3 = gameOb(self.newPosition(),1,1,1,1,'goal')
        self.objects.append(bug3)
        bug4 = gameOb(self.newPosition(),1,1,1,1,'goal')
        self.objects.append(bug4)
        state = self.renderEnv()
        self.state = state
        return state

    def moveChar(self,direction):
        # 0 - up, 1 - down, 2 - left, 3 - right
        hero = self.objects[0]
        heroX = hero.x
        heroY = hero.y
        penalize = 0.
        if direction == 0 and hero.y >= 1:
            hero.y -= 1
        if direction == 1 and hero.y <= self.sizeY-2:
            hero.y += 1
        if direction == 2 and hero.x >= 1:
            hero.x -= 1
        if direction == 3 and hero.x <= self.sizeX-2:
            hero.x += 1     
        if hero.x == heroX and hero.y == heroY:
            penalize = 0.0
        self.objects[0] = hero
        return penalize
    
    def newPosition(self):
        iterables = [ range(self.sizeX), range(self.sizeY)]
        points = []
        for t in itertools.product(*iterables):
            points.append(t)
        currentPositions = []
        for objectA in self.objects:
            if (objectA.x,objectA.y) not in currentPositions:
                currentPositions.append((objectA.x,objectA.y))
        for pos in currentPositions:
            points.remove(pos)
        location = np.random.choice(range(len(points)),replace=False)
        return points[location]

    def checkGoal(self):
        others = []
        for obj in self.objects:
            if obj.name == 'hero':
                hero = obj
            else:
                others.append(obj)
        ended = False
        for other in others:
            if hero.x == other.x and hero.y == other.y:
                self.objects.remove(other)
                if other.reward == 1:
                    self.objects.append(gameOb(self.newPosition(),1,1,1,1,'goal'))
                else: 
                    self.objects.append(gameOb(self.newPosition(),1,1,0,-1,'fire'))
                return other.reward,False
        if ended == False:
            return 0.0,False

    def renderEnv(self):
        #a = np.zeros([self.sizeY,self.sizeX,3])
        a = np.ones([self.sizeY+2,self.sizeX+2,3])
        a[1:-1,1:-1,:] = 0
        hero = None
        for item in self.objects:
            a[item.y+1:item.y+item.size+1,item.x+1:item.x+item.size+1,item.channel] = item.intensity
            if item.name == 'hero':
                hero = item
        if self.partial == True:
            a = a[hero.y:hero.y+3,hero.x:hero.x+3,:]
        b = scipy.misc.imresize(a[:,:,0],[84,84,1],interp='nearest')
        c = scipy.misc.imresize(a[:,:,1],[84,84,1],interp='nearest')
        d = scipy.misc.imresize(a[:,:,2],[84,84,1],interp='nearest')
        a = np.stack([b,c,d],axis=2)
        return a

    def step(self,action):
        penalty = self.moveChar(action)
        reward,done = self.checkGoal()
        state = self.renderEnv()
        if reward == None:
            print(done)
            print(reward)
            print(penalty)
            return state,(reward+penalty),done
        else:
            return state,(reward+penalty),

In [48]:
import tensorflow as tf 
from tensorflow.keras.layers import Dense, Conv2D, Input, Lambda
 
class ActorNetwork(tf.keras.Model):
    def __init__(self, input_dims, action_bound, std_bound, action_dim=1):
        super(ActorNetwork, self).__init__()
        self.fc1 = Dense(256, activation="relu", input_shape=input_dims, kernel_initializer="he_uniform")
        self.fc2 = Dense(64, activation="relu", kernel_initializer="he_uniform")
        self.out_mu = Dense(action_dim, activation='tanh')
        self.mu_output = Lambda(lambda x: x * action_bound)
        self.std_output = Dense(action_dim, activation='softplus')

    def call(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        out_mu = self.out_mu(x)
        mu_output = self.mu_output(out_mu)
        std_output = self.std_output(x)
        return mu_output, std_output


class CriticNetwork(tf.keras.Model):
    def __init__(self, input_dims, action_dim=1):
        super(CriticNetwork, self).__init__()
        self.fc1 = Dense(256, activation="relu", input_shape=input_dims, kernel_initializer="he_uniform")
        self.fc2 = Dense(64, activation="relu", kernel_initializer="he_uniform")
        self.fc3 = Dense(1)

    def call(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        return x

In [50]:
from tensorflow.keras.optimizers import Adam
import tensorflow as tf 
import tensorflow_probability as tfp
import numpy as np

class ActorCriticAgent:
  
    def __init__(self, input_dims, out_dims, action_bound, std_bound, gamma, lr1, lr2, action_space, batch_size, chkpt, algo_name): 
        self.gamma = gamma
        self.input_dims = input_dims
        self.out_dims = out_dims
        self.batch_size = batch_size 
        self.action_space = action_space 
        self.action_bound = action_bound
        self.std_bound = std_bound
        self.action = None  
        self.fname = chkpt + '_' + algo_name 
        self.actor_network = ActorNetwork(self.input_dims, self.action_bound, self.std_bound, self.out_dims)
        self.actor_network.compile(optimizer=Adam(learning_rate=lr1))
        self.critic_network = CriticNetwork(input_dims, out_dims)
        self.critic_network.compile(optimizer=Adam(learning_rate=lr2))


    def get_action(self, state): 
        state = np.reshape(state, [1, self.input_dims[0]])
        mu, std = self.actor_network(state)
        mu, std = mu[0], std[0]
        return np.random.normal(mu, std, size=self.out_dims)
 
    def save_models(self):
        self.actor_network.save(self.fname + "_" + "actor_network")
        self.critic_network.save(self.fname + "_" + "critic_network")
        print("[+] Saving the model")


    def load_models(self):
        self.actor_network = tf.keras.models.load_model(self.fname + "_" + "actor_network") 
        self.critic_network = tf.keras.models.load_model(self.fname + "_" + "critic_network") 
        print("[+] Loading the model")

  
    def log_pdf(self, mu, std, action):
        std = tf.clip_by_value(std, self.std_bound[0], self.std_bound[1])
        var = std ** 2
        log_policy_pdf = -0.5 * (action - mu) ** 2 / var - 0.5 * tf.math.log(
            var * 2 * np.pi
        )
        return tf.reduce_sum(log_policy_pdf, 1, keepdims=True)
    
    def get_critic_prediction(self, state): 
        return self.critic_network(state)
        
    def compute_actor_loss(self, mu, std, actions, advantages):
        log_policy_pdf = self.log_pdf(mu, std, actions)
        loss_policy = log_policy_pdf * advantages
        return tf.reduce_sum(-loss_policy)
    
    def learn(self, states, actions, advantages, td_targets): 
    
        state = tf.convert_to_tensor(np.array(state).reshape(1, -1))
        next_state = tf.convert_to_tensor(np.array(next_state).reshape(1, -1))    

        with tf.GradientTape() as tape:
            values = self.critic_network(states, training=True)
            loss = self.compute_loss(values, td_targets)
        
        params = self.critic_network.trainable_variables
        grads = tape.gradient(loss, params)
        
        self.critic_network.optimizer.apply_gradients(zip(grads, params))

        with tf.GradientTape() as tape: 
            mu, std = self.actor_network(states, training=True)
            mu, std = mu[0], std[0]
            loss = self.compute_actor_loss(mu, std, actions, advantages)
        
        params = self.actor_network.trainable_variables
        grads = tape.gradient(loss, params)
        
        self.actor_network.optimizer.apply_gradients(zip(grads, params))


    def compute_loss(self, v_pred, td_targets):
        mse = tf.keras.losses.MeanSquaredError()
        return mse(td_targets, v_pred)

In [27]:
import collections
import cv2
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
import tensorflow as tf
from gymnasium.wrappers import *


def manage_memory():
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            print(e)


def plot_learning_curve(scores, figure_file):

    x = [_ for _ in range(len(scores))]
    running_avg = np.zeros(len(scores))
    for i in range(len(running_avg)):
        running_avg[i] = np.mean(scores[max(0, i-100):(i+1)])
    plt.plot(x, running_avg, label="Avg reward for agent", color="black")
    plt.plot(scores, label="Reward for agent", color="red")
    plt.xlabel("episodes")
    plt.ylabel("rewards")
    plt.title('Running average of previous 100 scores')
    plt.legend()
    plt.savefig(figure_file)


def make_env(env_name): 
    env = gym.make(env_name, render_mode="rgb_array")
    
    if len(env.observation_space.shape) >= 3: 
        #env = AtariPreprocessing(env, 10, 4, 84, False, True)
        env = ResizeObservation(env, 84)
        env = GrayScaleObservation(env, keep_dim=False)
        env = FrameStack(env, 4, lz4_compress=False)
        env = NormalizeObservation(env)

    return env

In [28]:
class Writer:
    def __init__(self, fname): 
        self.fname = fname 

    def write_to_file(self, content): 
        with open(self.fname, "a") as file: 
            file.write(content + "\n")

    def read_file(self, fname):
        with open(fname, "r") as file: 
            return file.read()
            

In [14]:
import time
from telebot import TeleBot
import datetime
import telebot

token = "6238487424:AAG0jRhvbiVa90qUcf2fAirQr_-quPMs7cU"
chat_id = "1055055706"
bot = TeleBot(token=token) 

def telegram_send(message, bot):
    chat_id = "1055055706"
    bot.send_message(chat_id=chat_id, text=message)

def welcome_msg(multi_step, double_dqn, dueling):
    st = 'Hi! Starting learning with DQN Multi-step = %d, Double DQN = %r, Dueling DQN = %r' % (multi_step, double_dqn, dueling)
    telegram_send(st, bot)
    
def info_msg(episode, max_episode, reward, best_score, loss): 
    st = f"Current Episode: {episode}, Current Reward: {reward}, Max Episode: {max_episode}, Best Score: {best_score}, loss: {loss}"
    telegram_send(st, bot)

def end_msg(learning_time):
    st = 'Finished! Learning time: ' + str(datetime.timedelta(seconds=int(learning_time)))
    telegram_send(st, bot)
    print(st)


In [32]:
import numpy as np 
import imageio


class RecordVideo: 
    
    def __init__(self, prefix_fname,  out_directory="videos/", fps=10): 
        self.prefix_fname = prefix_fname
        self.out_directory = out_directory
        self.fps = fps
        self.images = []
        
    def add_image(self, image): 
        self.images.append(image)
    
    def save(self, episode_no): 
        name = self.out_directory + self.prefix_fname + "_" + str(episode_no) + ".mp4"
        imageio.mimsave(name, [np.array(img) for i, img in enumerate(self.images)], fps=self.fps)
        self.images = []

In [63]:
class Trainer: 
    
    def __init__(self, env, action_space, input_dims, out_dims, video_prefix, is_tg,
                                 noe, max_steps, record, lr1, lr2, gamma, chkpt,
                                 algo_name, update_interval, action_bound, std_bound): 
        self.env = env
        self.noe = noe 
        self.max_steps = max_steps 
        self.input_dims = input_dims
        self.out_dims = out_dims
        self.update_interval = update_interval
        self.action_bound = action_bound
        self.std_bound = std_bound

        self.recorder = RecordVideo(video_prefix)
        self.is_tg = is_tg 
        self.record = record
        self.agent = ActorCriticAgent(input_dims, out_dims, action_bound, std_bound,
                                          gamma, lr1, lr2, action_space, 32, chkpt, algo_name)
        
    def td_target(self, reward, next_state, done):
        if done:
            return reward
        v_value = self.agent.get_critic_prediction(
            np.reshape(next_state, [1, self.input_dims]))
        return np.reshape(reward + args.gamma * v_value[0], [1, 1])
    
    def list_to_batch(self, list):
        batch = list[0]
        for elem in list[1:]:
            batch = np.append(batch, elem, axis=0)
        return batch
    
    def advatnage(self, td_targets, baselines):
        return td_targets - baselines
    
    def list_to_batch(self, list):
        batch = list[0]
        for elem in list[1:]:
            batch = np.append(batch, elem, axis=0)
        return batch
    
    def train(self): 

        ep_rewards = []
        avg_rewards = []
        best_reward = float("-inf")
        done = False

        for episode in range(self.noe): 
            state_batch = []
            action_batch = []
            td_target_batch = []
            advatnage_batch = []
            
            state = self.env.reset()
            rewards = 0 

            if self.record and episode % 50 == 0: 
                img = self.env.render()
                self.recorder.add_image(img)
            
            step = 0
            while not done or step <= self.max_steps:
               
                if type(state) == tuple: 
                    state = state[0]
                
             
                action = self.agent.get_action(state)
                action = np.clip(action, -self.action_bound, self.action_bound)
                next_info = self.env.step(action)
                next_state, reward, terminated, truncated, _ = next_info 
                done = terminated or truncated 
                rewards += reward
                
                state = np.reshape(state, [1, self.input_dims[0]])
                action = np.reshape(action, [1, self.out_dims])
                next_state = np.reshape(next_state, [1, self.input_dims[0]])
                reward = np.reshape(reward, [1, 1])

                td_target = self.td_target((reward), next_state, done)
                advantage = self.advatnage(
                                    td_target, self.agent.get_critic_prediction(state)
                            )

                state_batch.append(state)
                action_batch.append(action)
                td_target_batch.append(td_target)
                advatnage_batch.append(advantage) 
                
                state = next_state
                step += 1 
                
                if self.update_interval % step == 0 or done:
                    states = self.list_to_batch(state_batch)
                    actions = self.list_to_batch(action_batch)
                    td_targets = self.list_to_batch(td_target_batch)
                    advantages = self.list_to_batch(advatnage_batch)
                    
                    self.agent.learn(states, actions, advantages, td_targets)
                    
                    state_batch = []
                    action_batch = []
                    td_target_batch = []
                    advatnage_batch = []
                

                if self.record and episode % 50 == 0:
                    img = self.env.render()
                    self.recorder.add_image(img)
                    

            if self.record and episode % 50 == 0:
                self.recorder.save(episode)

            if rewards > best_reward: 
                self.agent.save_models()
                best_reward = rewards

            ep_rewards.append(rewards)
            avg_reward = np.mean(ep_rewards[-100:])
            avg_rewards.append(avg_reward)
            print(f"Episode: {episode} Reward: {rewards} Best Score: {best_reward}, Average Reward: {avg_reward}")

        return ep_rewards, avg_rewards



In [64]:
env = make_env("MountainCarContinuous-v0")
n_actions = env.action_space.shape[0]
input_dims = env.observation_space.shape
noe = 1000 
print(input_dims, n_actions)
max_steps = 1000000
video_prefix = "actor_critic"
is_tg = True 
record = True
lr1 = 1e-4
lr2 = 1e-4
gamma = 0.95
chpkt = 'models/'
algo_name = "actor_critic"

action_bound = env.action_space.high[0]
std_bound = [1e-2, 1.0]
update_interval = 20

if __name__ == "__main__": 
  
    trainer = Trainer(env, action_space, input_dims, n_actions, video_prefix,
                              is_tg, noe, max_steps, record, lr1, lr2, gamma,
                              chpkt, algo_name, update_interval, action_bound, std_bound)
    ep_rewards = trainer.train()
    plot_learning_curve(ep_rewards, "actor_critic.png")

(2,) 1


TypeError: 'tuple' object cannot be interpreted as an integer

In [18]:
import pickle 

with open("actor_critic_eps_rewards.obj", "wb") as f: 
  pickle.dump(ep_rewards[0], f)

with open("actor_critic_avg_rewards.obj", "wb") as f: 
  pickle.dump(ep_rewards[1], f)

NameError: name 'ep_rewards' is not defined

In [None]:
plot_learning_curve(ep_rewards[0], "actor_critic.png")

In [None]:
def greedy_policy(observation, q_val_network, action_space): 
    state = tf.convert_to_tensor([observation])
    actions = q_val_network(state)
    action = tf.math.argmax(actions, axis=1).numpy()[0]
    return action

In [None]:
import random 

class Eval: 

    def __init__(self, env, model_path, number_of_episode=50):
        self.env = env 
        self.model = tf.keras.models.load_model(model_path)
        self.recorder = RecordVideo('dqn_lunarlander', 'test_videos/', 15)
        self.number_of_episode = number_of_episode
        
    def test(self): 
        rewards = []
        steps = []
        for episode in range(self.number_of_episode): 
            done = False
            reward = 0
            step = 0
            state = env.reset(seed=random.randint(0,500))
            if episode % 10 == 0: 
                img = env.render()
                self.recorder.add_image(img) 

            while not done:

                if type(state) == tuple: 
                  state = state[0]
                action =  greedy_policy(state, self.model, action_space)
                state, reward_prob, terminated, truncated, _ = env.step(action)
                done = terminated or truncated 
                reward += reward_prob
                step += 1 
                if episode % 10 == 0:
                    img = env.render()
                    self.recorder.add_image(img)
            
            rewards.append(reward)
            steps.append(step)
            self.recorder.save(1) if episode % 10 == 0 else None 
        
        return rewards, steps


In [None]:
evaluator = Eval(env, "/content/models/_actor_critic_actor_network", 10)
evaluator.test()