<a href="https://colab.research.google.com/github/moridin04/CCRNFLRL_PROJECT_COM221ML/blob/main/Copy_of_Hangman.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Training Deep Q-Network Agent to Play Hangman**

Roadmap
1. Markov Decision Process (MDP)
- State Space
- Action Space - Discrete(18)
0 - NOOP

1 - FIRE

2 - UP

3 - RIGHT

4 - LEFT

5 - DOWN

6 - UPRIGHT

7 - UPLEFT

8 - DOWNRIGHT

9 - DOWNLEFT

10 - UPFIRE

11 - RIGHTFIRE

12 - LEFTFIRE

13 - DOWNFIRE

14 - UPRIGHTFIRE

15 - UPLEFTFIRE

16 - DOWNRIGHTFIRE

17 - DOWNLEFTFIRE


- Rewards

2. Setting up "Hangman" environment (Gymnasium)
- Observation Encoding
- Action Encoding
- Reward Shaping Strategy

3. Deep Q-Network
- Input Layer (Word State Encoding)
- Hidden Layer (Fully Connected Layers / Embeddings)
- Output Layer (Q-Values of Each Letter)
- Training Algorithm (Experience Replay, Target Network)
- Hyperparameters (Learning Rate, y, Epsilon-Greedy, Replay Buffer Size, Batch Size)

4. Training Process
- Epsilon Decay
- Replay Buffer (filling and sampling)
- Target Network Update Frequency
- Number of Episodes and Training Duration

5. Evaluation Metrics
- Win Rate
- Average Reward per Episode
- Average Number of Steps until Win/Lose

6. Record Multiple Episodes as Video



## **Installing and Importing**

In [1]:
!pip install --upgrade --pre "gymnasium[atari]" ale-py

import random
import numpy as np
import gymnasium as gym #Hangman Environment
import ale_py
import torch #Neural Networks
import torch.nn as nn
import torch.optim as optim
from collections import deque #Replay Buffer
import matplotlib.pyplot as plt
import imageio
from IPython.display import Video
import cv2
import random
import time
from skimage import transform
import tensorflow as tf
from skimage.color import rgb2gray



## **Check Device Name**

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device: ", device)

Using device:  cuda


## **Trying Environment**

In [3]:
def create_environment():
    env = gym.make("ALE/Hangman-v5", render_mode="rgb_array")

    noop = [1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    fire = [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    up = [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    right = [0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    left = [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    down = [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    upright = [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    upleft = [0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    downright = [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    downleft = [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]
    upfire = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
    rightfire = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
    leftfire = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]
    downfire = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]
    uprightfire = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0]
    upleftfire = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]
    downrightfire = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]
    downleftfire = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]
    possible_actions = [noop, fire, up, right, left, down, upright, upleft, downright, downleft, upfire, rightfire, leftfire, downfire, uprightfire, upleftfire, downrightfire, downleftfire]

    return env, possible_actions

def test_environment():
    env = gym.make("ALE/Hangman-v5", render_mode="rgb_array")

    episodes = 3
    for episode in range(episodes):
        obs, info = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            total_reward += reward
            print(f"Reward: {reward}")
            time.sleep(0.02)

        print(f"Episode: {episode+1}, Total Reward: {total_reward}\n")

    env.close()

In [4]:
game, possible_actions = create_environment()

## **Pre-processing (Converting to grayscale and resize)**

In [5]:
def preprocess_frame(frame):
    # Convert to grayscale if RGB
    if len(frame.shape) == 3 and frame.shape[2] == 3:
        frame = rgb2gray(frame)

    # Crop and normalize
    cropped_frame = frame[34:34+192, :160]
    normalized_frame = cropped_frame / 255.0

    # Resize to 84x84
    preprocessed_frame = transform.resize(normalized_frame, [84, 84])

    return preprocessed_frame

## **Stacking Frames**

In [6]:
stack_size = 4
stacked_frames = deque([np.zeros((84,84), dtype=np.int64) for _ in range(stack_size)], maxlen=4)

def stack_frames(stacked_frames, state, is_new_episode):
    frame = preprocess_frame(state)

    if is_new_episode:
        stacked_frames = deque([np.zeros((84,84), dtype=np.int64) for _ in range(stack_size)], maxlen=4)
        for _ in range(4):
          stacked_frames.append(frame)
    else:
        stacked_frames.append(frame)

    stacked_state = np.stack(stacked_frames, axis=2)
    return stacked_state, stacked_frames

# **Hyperparameters**

In [7]:
state_size = [84,84,4]
action_size = len(possible_actions)
learning_rate = 0.0005

total_episodes = 1000
batch_size = 128

explore_start = 1.0
explore_stop = 0.01
decay_rate = 0.0001

gamma = 0.99
memory_size = 1000000

## **Creating Deep Q-Network**

In [17]:
class DQNetwork:
    def __init__(self, state_size, action_size, learning_rate, name='DQNetwork'):
        with tf.compat.v1.variable_scope(name):
            self.inputs_ = tf.compat.v1.placeholder(tf.float32, [None, *state_size], name='inputs')
            self.actions_ = tf.compat.v1.placeholder(tf.float32, [None, action_size], name='actions_')
            self.target_Q = tf.compat.v1.placeholder(tf.float32, [None], name='target')

            self.conv1 = tf.keras.layers.Conv2D(32, [8,8], [4,4], activation='elu')(self.inputs_)
            self.conv2 = tf.keras.layers.Conv2D(64, [4,4], [2,2], activation='elu')(self.conv1)
            self.conv3 = tf.keras.layers.Conv2D(128, [4,4], [2,2], activation='elu')(self.conv2)

            self.flatten = tf.keras.layers.Flatten()(self.conv3)
            self.fc = tf.keras.layers.Dense(512, activation='elu')(self.flatten)
            self.output = tf.keras.layers.Dense(action_size)(self.fc)


            # Safe multiply with guaranteed matching shapes
            self.Q = tf.reduce_sum(tf.multiply(self.output, self.actions_), axis=1)


            self.loss = tf.reduce_mean(tf.square(self.target_Q - self.Q))
            self.optimizer = tf.compat.v1.train.RMSPropOptimizer(learning_rate).minimize(self.loss)

tf.compat.v1.disable_eager_execution()
tf.compat.v1.reset_default_graph()
DQNetwork = DQNetwork(state_size, action_size, learning_rate)

Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor


## **Replay Buffer**

In [18]:
class Memory():
    def __init__(self, max_size):
        self.buffer = deque(maxlen = max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        buffer_size = len(self.buffer)
        index = np.random.choice(np.arange(buffer_size), size = batch_size, replace = False)

        return [self.buffer[i] for i in index]

In [19]:
memory = Memory(max_size = memory_size)
state, _ = game.reset()

for i in range(batch_size):
    if i == 0:
        state, stacked_frames = stack_frames(stacked_frames, state, True)

    action_index = random.randrange(action_size)
    next_state, reward, terminated, truncated, _ = game.step(action_index)
    done = terminated or truncated

    if done:
        next_state = np.zeros(state.shape)
        memory.add((state, action_index, reward, next_state, done))
        state, _ = game.reset()
        state, stacked_frames = stack_frames(stacked_frames, state, True)

    else:
        next_state, stacked_frames = stack_frames(stacked_frames, next_state, False)
        memory.add((state, action_index, reward, next_state, done))
        state = next_state

## **Training Deep Q-Network Agent**

In [20]:
def predict_action(sess, explore_start, explore_stop, decay_rate, decay_step, state, possible_actions):
    exp_exp_tradeoff = np.random.rand()
    explore_probability = explore_stop + (explore_start - explore_stop) * np.exp(-decay_rate * decay_step)

    if explore_probability > exp_exp_tradeoff:
        action_index = random.randrange(len(possible_actions))
    else:
        Qs = sess.run(DQNetwork.output, feed_dict={DQNetwork.inputs_: state.reshape((1, *state.shape))})
        action_index = np.argmax(Qs)
    return action_index, explore_probability

In [21]:
def train_dqn(env, total_episodes, max_steps, memory, DQNetwork,
              explore_start, explore_stop, decay_rate, gamma, batch_size):
    saver = tf.compat.v1.train.Saver()

    episode_rewards_list = []

    with tf.compat.v1.Session() as sess:
        sess.run(tf.compat.v1.global_variables_initializer())
        decay_step = 0

        for episode in range(total_episodes):
            obs, info = env.reset()
            state, stacked_frames = stack_frames(stacked_frames=None, state=obs, is_new_episode=True)
            done = False
            episode_rewards = []

            for step in range(max_steps):
                decay_step += 1
                action_index, explore_probability = predict_action(
                    sess, explore_start, explore_stop, decay_rate, decay_step, state, possible_actions
                )

                next_obs, reward, terminated, truncated, info = env.step(action_index)
                done = terminated or truncated

                next_state, stacked_frames = stack_frames(stacked_frames, next_obs, False)
                memory.add((state, action_index, reward, next_state, done))
                state = next_state
                episode_rewards.append(reward)

                if done:
                    total_reward = np.sum(episode_rewards)
                    print(f"Episode {episode+1}/{total_episodes} | Total Reward: {total_reward:.2f} | Explore P: {explore_probability:.4f}")
                    break

                # --- Learning step ---
                if len(memory.buffer) > batch_size:
                    batch = memory.sample(batch_size)
                    states_mb = np.array([each[0] for each in batch])
                    actions_mb = np.array([each[1] for each in batch])
                    rewards_mb = np.array([each[2] for each in batch])
                    next_states_mb = np.array([each[3] for each in batch])
                    dones_mb = np.array([each[4] for each in batch])

                    target_Qs_batch = []
                    Qs_next_state = sess.run(DQNetwork.output, feed_dict={DQNetwork.inputs_: next_states_mb})

                    for i in range(batch_size):
                        if dones_mb[i]:
                            target_Qs_batch.append(rewards_mb[i])
                        else:
                            target = rewards_mb[i] + gamma * np.max(Qs_next_state[i])
                            target_Qs_batch.append(target)

                    targets_mb = np.array(target_Qs_batch)

                    actions_one_hot = np.eye(len(possible_actions))[actions_mb]
                    loss, _ = sess.run(
                        [DQNetwork.loss, DQNetwork.optimizer],
                        feed_dict={
                            DQNetwork.inputs_: states_mb,
                            DQNetwork.target_Q: targets_mb,
                            DQNetwork.actions_: actions_one_hot,
                        }
                    )

            total_reward = np.sum(episode_rewards)
            episode_rewards_list.append(total_reward)

            if episode % 5 == 0:
                saver.save(sess, "./models/hangman_dqn.ckpt")
                print("Model saved.")

        env.close()
        return episode_rewards_list

In [22]:
def plot_training_progress(rewards, window=20):
    if rewards is None or len(rewards) == 0:
        print("No rewards to plot yet.")
        return

    plt.figure(figsize=(12, 4))

    # Raw rewards
    plt.subplot(1, 2, 1)
    plt.plot(rewards, color="blue", linewidth=1.5)
    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.title("DQN Training on Hangman-v5")

    # Moving average
    plt.subplot(1, 2, 2)
    if len(rewards) > window:
        moving_avg = [np.mean(rewards[i:i+window]) for i in range(len(rewards)-window)]
        plt.plot(moving_avg, color="orange", linewidth=2)
        plt.title(f"Moving Average Reward (window={window})")
    else:
        plt.plot(rewards, color="orange", linewidth=2)
        plt.title("Moving Average Reward (too few episodes)")

    plt.xlabel("Episode")
    plt.ylabel("Reward")
    plt.tight_layout()
    plt.show()

In [None]:
rewards = train_dqn(
    env=game,
    total_episodes=1000,
    max_steps=100,
    memory=memory,
    DQNetwork=DQNetwork,
    explore_start=explore_start,
    explore_stop=explore_stop,
    decay_rate=decay_rate,
    gamma=gamma,
    batch_size=batch_size
)

Model saved.
Model saved.
Model saved.
Model saved.
Model saved.
Model saved.
Model saved.
Model saved.
Model saved.
Model saved.
Model saved.
Model saved.
Model saved.


## **Plotting DQN Training per Episode and Moving Average**

In [None]:
plot_training_progress(rewards, window=50)

## **Recording all Episodes**

In [None]:
def record_episode(env, DQNetwork, possible_actions, stacked_frames, filename="hangman_episode.mp4"):
    with tf.compat.v1.Session() as sess:
        # Load the trained model
        saver = tf.compat.v1.train.Saver()
        saver.restore(sess, "./models/hangman_dqn.ckpt")

        obs, info = env.reset()
        state, stacked_frames = stack_frames(None, obs, True)
        done = False
        total_reward = 0
        frames = []

        while not done:
            # Get Q-values from the trained network
            Qs = sess.run(DQNetwork.output, feed_dict={DQNetwork.inputs_: state.reshape((1, *state.shape))})
            choice = np.argmax(Qs)
            action_index = int(choice)

            # Step in environment
            next_obs, reward, terminated, truncated, info = env.step(action_index)
            done = terminated or truncated
            total_reward += reward

            # Preprocess and stack frames
            next_state, stacked_frames = stack_frames(stacked_frames, next_obs, False)
            state = next_state

            # Render frame
            frame = env.render()
            frames.append(frame)

            if done:
                break

        env.close()
        print(f"Total score (reward): {total_reward}")

        # Save video
        imageio.mimsave(filename, frames, fps=15)
        print(f"Episode saved to {filename}")

        from IPython.display import Video
        return Video(filename, embed=True)

In [None]:
env, possible_actions = create_environment()

for i in range(10):  # record 10 episodes
    filename = f"trained_hangman_ep{i+1}.mp4"
    print(f"\n🎥 Recording Episode {i+1}/10...")
    record_episode(env, DQNetwork, possible_actions, stacked_frames, filename=filename)


In [None]:
from moviepy.editor import VideoFileClip, concatenate_videoclips

clips = [VideoFileClip(f"trained_hangman_ep{i+1}.mp4") for i in range(10)]
final_clip = concatenate_videoclips(clips)
final_clip.write_videofile("hangman_all_episodes.mp4", codec="libx264", fps=15)

print("Combined video saved as hangman_all_episodes.mp4")


In [None]:
from IPython.display import Video
Video("hangman_all_episodes.mp4", embed=True)