In [30]:
import random
import pandas as pd
import numpy as np
from PIL import Image
from keras.layers import Input, Lambda, Dense, Dropout, Convolution2D, MaxPooling2D, Flatten,Activation,Concatenate
from keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
from keras.models import Model,load_model, model_from_json
import tensorflow as tf
import gymnasium as gym
import math
import pygame, sys
from tensorflow import keras
from collections import deque
import math

In [31]:
#enable eager execution in tensorflow
#tf.config.run_functions_eagerly(True)
env = gym.make('Pendulum-v1')

input_shape = (3,)
num_actions = 1

This actor network has three internal Dense layers with Leaky ReLU activation
The optimizer is ADAM with a learning rate of 0.001
The target actor network is the same as the policy network. 
They are shallow Feed-Forward Networks with the current state as input
The last activation used is ‘tanh’ as the action is in the range -2,2 - will need to scale

In [32]:
def actor_network(input_shape=(3,)):
        model = Sequential()
        model.add(Dense(128, activation=tf.keras.layers.LeakyReLU(alpha=0.2),input_shape=input_shape))
        model.add(Dense(64, activation=tf.keras.layers.LeakyReLU(alpha=0.2)))
        model.add(Dense(16, activation=tf.keras.layers.LeakyReLU(alpha=0.2)))
        model.add(Dense(num_actions,activation='tanh'))
        return model

actor, target_actor = actor_network(),actor_network()
optimizer_actor = Adam(learning_rate=0.001)


This critic network has two inputs, both state and action
    Critic and Target Critic have the same architecture
    Critic intakes both state and action as input outputting the expected q-value
    The concat variable in between the architecture helps us to combine the state and action inputs. 
    The last activation is linear as q-values are continuous values

In [33]:
def critic_network(state_dim, action_dim):
    # Define the input layers
    state_input = Input(shape=(state_dim,),dtype=tf.float64)
    action_input = Input(shape=(action_dim,),dtype=tf.float64)

    
    state_h1 = Dense(128, activation=tf.keras.layers.LeakyReLU(alpha=0.2))(state_input)
    state_h2 = Dense(64, activation=tf.keras.layers.LeakyReLU(alpha=0.2))(state_h1)

    action_h1 = Dense(128, activation=tf.keras.layers.LeakyReLU(alpha=0.2))(action_input)
    action_h2 = Dense(64, activation=tf.keras.layers.LeakyReLU(alpha=0.2))(action_h1)
    concat = Concatenate()([state_h2, action_h2])

    # Define the output layer
    dense1 = Dense(64, activation=tf.keras.layers.LeakyReLU(alpha=0.2))(concat)
    dense2 = Dense(32, activation=tf.keras.layers.LeakyReLU(alpha=0.2))(dense1)
    output = Dense(1, activation='linear')(dense2)
    
    model = Model(inputs=[state_input, action_input], outputs=output)
    return model
    
critic, target_critic = critic_network(3,1),critic_network(3,1)
optimizer_critic = Adam(learning_rate=0.001)

Update target networks.  Mix of old weights and new update network weights using the variable tau
(is this the polyak thing they were talking about at openai?)

In [34]:
def update_target_networks(actor_model, critic_model, target_actor_model, target_critic_model):
    tau = 0.05
    # Update the target actor model
    actor_weights = actor_model.get_weights()
    target_actor_weights = target_actor_model.get_weights()
    for i in range(len(actor_weights)):
        target_actor_weights[i] = tau * actor_weights[i] + (1 - tau) * target_actor_weights[i]
    target_actor_model.set_weights(target_actor_weights)

    # Update the target critic model
    critic_weights = critic_model.get_weights()
    target_critic_weights = target_critic_model.get_weights()
    for i in range(len(critic_weights)):
        target_critic_weights[i] = tau * critic_weights[i] + (1 - tau) * target_critic_weights[i]
    target_critic_model.set_weights(target_critic_weights)
    return target_critic_model, target_actor_model

Noise - this person copied from a different tutorial

In [35]:
class OrnsteinUhlenbeckActionNoise:
    def __init__(self, mu, sigma, theta, dt, size):
        self.mu = mu
        self.sigma = sigma
        self.theta = theta
        self.dt = dt
        self.size = size
        self.reset()
    
    def reset(self):
        self.state = np.ones(self.size) * self.mu
    
    def __call__(self):
        x = self.state
        dx = self.theta * (self.mu - x) * self.dt + self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.size)
        self.state = x + dx
        return self.state

def ddpg_add_exploration_noise(exploration_noise, action, noise_scale):
    noise = noise_scale * exploration_noise()

    action = np.clip(action + noise, -2.0, 2.0)
    
    return action

# Usage example:
action_dim = 1  # Dimensionality of the action space
noise_mu = 0.0
noise_sigma = 0.2
noise_theta = 0.15
noise_dt = 0.01

# Create an instance of the OrnsteinUhlenbeckActionNoise class
exploration_noise = OrnsteinUhlenbeckActionNoise(noise_mu, noise_sigma, noise_theta, noise_dt, size=action_dim)

Training loop

In [36]:
gamma = tf.cast(tf.constant(0.95),tf.float64)
num_episodes = 1000
maxlen = 1000
batch = 128
replay = deque(maxlen=maxlen)
epoch = 0
count=0
max_loss = math.inf
count = 0
epsilon = 0.99
for episode in range(num_episodes):
    ep_len = 0
    state, info = env.reset()
    print('epsilon getting updated',epsilon)
    epsilon*=0.99
    # Run the episode
    while True:
        count+=1
        ep_len+=1
    
        action = 2*actor.predict(np.array(state).reshape(-1,3),verbose=0)[0]
        action = ddpg_add_exploration_noise(exploration_noise, action, noise_scale=0.1)
        
        next_state, reward, done,_ ,_ = env.step(action)
        done = 1 if done else 0
        
        # print('reward and status',reward,state)
        state = state.reshape(3)
            
        replay.append((np.array(state),action,reward,np.array(next_state),done))
        state = next_state

        if done:
            break
    
        if count>batch:
            count = 0
            batch_ = random.sample(replay,batch)
            current_state = tf.convert_to_tensor([x[0] for x in batch_])
            next_state = tf.convert_to_tensor([x[3] for x in batch_])
            reward = tf.convert_to_tensor([x[2] for x in batch_])
            done =   tf.convert_to_tensor([x[4] for x in batch_])
            actions =   tf.convert_to_tensor([x[1] for x in batch_])
            other_actions = [[1,0] for x in range(batch)]
            
            q_actions = target_actor(next_state) 
            target_q = tf.cast(reward,tf.float64) + (tf.cast(tf.constant(1.0),tf.float64)-tf.cast(done,tf.float64))*gamma*tf.cast(target_critic([next_state,q_actions]),tf.float64)

            with tf.GradientTape() as tape:
                current_q_value = critic([current_state,actions])
                critic_loss = tf.reduce_mean(tf.math.pow(target_q-tf.cast(current_q_value,tf.float64),2))
    
            grads_critic = tape.gradient(critic_loss, critic.trainable_variables)
            optimizer_critic.apply_gradients(zip(grads_critic, critic.trainable_variables))
                
            with tf.GradientTape() as tape:
                actions = actor(current_state,training=True)                
                current_q_value = critic([current_state,actions],training=True)
                actor_loss = -tf.reduce_mean(current_q_value)
                
            grads_actor = tape.gradient(actor_loss, actor.trainable_variables)
            optimizer_actor.apply_gradients(zip(grads_actor, actor.trainable_variables))

            print('Epoch {} done with loss actor={} , critic={} !!!!!!'.format(epoch,actor_loss,critic_loss))
            if epoch%10==0:
                    actor.save('pendulum/actor/actor.keras')
                    critic.save('pendulum/critic/critic.keras')
            
            if epoch%5==0:
                    target_critic, target_actor = update_target_networks(actor,critic,target_actor,target_critic)
            epoch+=1

epsilon getting updated 0.99
Epoch 0 done with loss actor=0.1405719667673111 , critic=54.10826313256703 !!!!!!
Epoch 1 done with loss actor=0.25855425000190735 , critic=54.78034256034678 !!!!!!
Epoch 2 done with loss actor=0.3816452622413635 , critic=53.22062670855044 !!!!!!
Epoch 3 done with loss actor=0.4880574345588684 , critic=51.48655908790728 !!!!!!
Epoch 4 done with loss actor=0.5843860507011414 , critic=50.69829358415643 !!!!!!
Epoch 5 done with loss actor=0.6825251579284668 , critic=48.50471256652803 !!!!!!
Epoch 6 done with loss actor=0.8160197138786316 , critic=46.572832202689725 !!!!!!
Epoch 7 done with loss actor=0.9434939026832581 , critic=43.37045579773272 !!!!!!
Epoch 8 done with loss actor=1.085705280303955 , critic=42.732261850421594 !!!!!!
Epoch 9 done with loss actor=1.292078971862793 , critic=42.939582990018955 !!!!!!
Epoch 10 done with loss actor=1.3876368999481201 , critic=38.09702863802654 !!!!!!
Epoch 11 done with loss actor=1.6260749101638794 , critic=40.07208

KeyboardInterrupt: 

In [37]:
import tensorflow as tf
import numpy as np
import gymnasium as gym
import math
from PIL import Image
import pygame, sys
from pygame.locals import *
from tensorflow import keras

#pygame essentials
pygame.init()
DISPLAYSURF = pygame.display.set_mode((500,500),0,32)
clock = pygame.time.Clock()
pygame.display.flip()

#openai gym env
env = gym.make('Pendulum-v1')
state, info = env.reset()

done = False
count=0
done=False
steps = 0
#loading trained model
model = tf.keras.models.load_model('pendulum/actor/actor.keras')
total_wins =0
episodes = 0


def print_summary(text,cood,size):
        font = pygame.font.Font(pygame.font.get_default_font(), size)
        text_surface = font.render(text, True, (0,0,0))
        DISPLAYSURF.blit(text_surface,cood)
     
while episodes<1000 :
    pygame.event.get()
    for event in pygame.event.get():
                if event.type==QUIT:
                                pygame.quit()
                                raise Exception('training ended')
    # Get the action probabilities from the policy network
    # Choose an action based on the action probabilities
    
    action = model.predict(np.array(state).reshape(-1,3))[0]
    
    next_state, reward, done, info = env.step(action) # take a step in the environment
    print('reward and done?',reward,done)
    image = env.render(mode='rgb_array') # render the environment to the screen
   
    #convert image to pygame surface object
    image = Image.fromarray(image,'RGB')
    mode,size,data = image.mode,image.size,image.tobytes()
    image = pygame.image.fromstring(data, size, mode)

    DISPLAYSURF.blit(image,(0,0))
    pygame.display.update()
    clock.tick(100)
    if done:
        state, info = env.reset()
        pygame.display.update()
        pygame.time.delay(100)
        episodes+=1
        
    pygame.time.delay(100)
    state = next_state

pygame.quit()

TypeError: <class 'keras.src.models.sequential.Sequential'> could not be deserialized properly. Please ensure that components that are Python object instances (layers, models, etc.) returned by `get_config()` are explicitly deserialized in the model's `from_config()` method.

config={'module': 'keras', 'class_name': 'Sequential', 'config': {'name': 'sequential_6', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'layers': [{'module': 'keras.layers', 'class_name': 'InputLayer', 'config': {'batch_shape': [None, 3], 'dtype': 'float32', 'sparse': False, 'name': 'input_layer_19'}, 'registered_name': None}, {'module': 'keras.layers', 'class_name': 'Dense', 'config': {'name': 'dense_66', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'units': 128, 'activation': {'module': 'keras.layers', 'class_name': 'LeakyReLU', 'config': {'name': 'leaky_re_lu_54', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'negative_slope': 0.2}, 'registered_name': None}, 'use_bias': True, 'kernel_initializer': {'module': 'keras.initializers', 'class_name': 'GlorotUniform', 'config': {'seed': None}, 'registered_name': None}, 'bias_initializer': {'module': 'keras.initializers', 'class_name': 'Zeros', 'config': {}, 'registered_name': None}, 'kernel_regularizer': None, 'bias_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None}, 'registered_name': None, 'build_config': {'input_shape': [None, 3]}}, {'module': 'keras.layers', 'class_name': 'Dense', 'config': {'name': 'dense_67', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'units': 64, 'activation': {'module': 'keras.layers', 'class_name': 'LeakyReLU', 'config': {'name': 'leaky_re_lu_55', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'negative_slope': 0.2}, 'registered_name': None}, 'use_bias': True, 'kernel_initializer': {'module': 'keras.initializers', 'class_name': 'GlorotUniform', 'config': {'seed': None}, 'registered_name': None}, 'bias_initializer': {'module': 'keras.initializers', 'class_name': 'Zeros', 'config': {}, 'registered_name': None}, 'kernel_regularizer': None, 'bias_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None}, 'registered_name': None, 'build_config': {'input_shape': [None, 128]}}, {'module': 'keras.layers', 'class_name': 'Dense', 'config': {'name': 'dense_68', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'units': 16, 'activation': {'module': 'keras.layers', 'class_name': 'LeakyReLU', 'config': {'name': 'leaky_re_lu_56', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'negative_slope': 0.2}, 'registered_name': None}, 'use_bias': True, 'kernel_initializer': {'module': 'keras.initializers', 'class_name': 'GlorotUniform', 'config': {'seed': None}, 'registered_name': None}, 'bias_initializer': {'module': 'keras.initializers', 'class_name': 'Zeros', 'config': {}, 'registered_name': None}, 'kernel_regularizer': None, 'bias_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None}, 'registered_name': None, 'build_config': {'input_shape': [None, 64]}}, {'module': 'keras.layers', 'class_name': 'Dense', 'config': {'name': 'dense_69', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'units': 1, 'activation': 'tanh', 'use_bias': True, 'kernel_initializer': {'module': 'keras.initializers', 'class_name': 'GlorotUniform', 'config': {'seed': None}, 'registered_name': None}, 'bias_initializer': {'module': 'keras.initializers', 'class_name': 'Zeros', 'config': {}, 'registered_name': None}, 'kernel_regularizer': None, 'bias_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None}, 'registered_name': None, 'build_config': {'input_shape': [None, 16]}}], 'build_input_shape': [None, 3]}, 'registered_name': None, 'build_config': {'input_shape': [None, 3]}}.

Exception encountered: <class 'keras.src.layers.core.dense.Dense'> could not be deserialized properly. Please ensure that components that are Python object instances (layers, models, etc.) returned by `get_config()` are explicitly deserialized in the model's `from_config()` method.

config={'module': 'keras.layers', 'class_name': 'Dense', 'config': {'name': 'dense_66', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'units': 128, 'activation': {'module': 'keras.layers', 'class_name': 'LeakyReLU', 'config': {'name': 'leaky_re_lu_54', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'negative_slope': 0.2}, 'registered_name': None}, 'use_bias': True, 'kernel_initializer': {'module': 'keras.initializers', 'class_name': 'GlorotUniform', 'config': {'seed': None}, 'registered_name': None}, 'bias_initializer': {'module': 'keras.initializers', 'class_name': 'Zeros', 'config': {}, 'registered_name': None}, 'kernel_regularizer': None, 'bias_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None}, 'registered_name': None, 'build_config': {'input_shape': [None, 3]}}.

Exception encountered: Error when deserializing class 'Dense' using config={'name': 'dense_66', 'trainable': True, 'dtype': 'float32', 'units': 128, 'activation': {'module': 'keras.layers', 'class_name': 'LeakyReLU', 'config': {'name': 'leaky_re_lu_54', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'negative_slope': 0.2}, 'registered_name': None}, 'use_bias': True, 'kernel_initializer': {'module': 'keras.initializers', 'class_name': 'GlorotUniform', 'config': {'seed': None}, 'registered_name': None}, 'bias_initializer': {'module': 'keras.initializers', 'class_name': 'Zeros', 'config': {}, 'registered_name': None}, 'kernel_regularizer': None, 'bias_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None}.

Exception encountered: Could not interpret activation function identifier: {'module': 'keras.layers', 'class_name': 'LeakyReLU', 'config': {'name': 'leaky_re_lu_54', 'trainable': True, 'dtype': {'module': 'keras', 'class_name': 'DTypePolicy', 'config': {'name': 'float32'}, 'registered_name': None, 'shared_object_id': 6255478736}, 'negative_slope': 0.2}, 'registered_name': None}