In [None]:
import random
import gymnasium as gym
import numpy as np
import tensorflow as tf

from collections import deque
from keras.layers import Conv2D, Flatten, Dense


In [None]:
# Instantiate the Atari Tetris environment (ALE, v5) through Gymnasium's registry
env = gym.make(id="ALE/Tetris-v5")

In [None]:
# Problem dimensions, read straight from the environment
state_space = env.observation_space.shape  # observation shape: (210, 160, 3)
action_space = env.action_space.n  # number of discrete actions: 5

In [None]:
# Hyperparameters
num_episodes = 10000  # number of episodes the training loop runs
batch_size = 64  # transitions sampled from replay memory per gradient step
# Step size passed to the Adam optimizer. The previous value (0.1) is roughly
# 100x Adam's Keras default (1e-3) and makes Q-value regression on raw pixels
# diverge; 1e-4 is a conventional starting point for DQN-style training.
learning_rate = 1e-4
discount_rate = 0.99  # gamma: weight of the bootstrapped next-state value
exploration_rate = 1  # epsilon: current probability of taking a random action
max_exploration_rate = 1  # upper bound for epsilon (not referenced in the visible cells)
min_exploration_rate = 0.01  # floor that epsilon is clamped to while decaying
exploration_decay_rate = 0.999  # multiplicative epsilon decay, applied once per environment step
update_target_network_steps = 10000  # environment steps between target-network weight syncs

In [None]:
def create_model() -> tf.keras.Model:
    """Build the convolutional Q-network.

    The network maps an observation of shape ``state_space`` to one output
    per action (``action_space`` outputs in total). The returned model is
    not compiled.

    Returns
    -------
    tf.keras.Model
        A sequential model made of, in order:
        - Conv2D, 32 filters, 8x8 kernel, stride 4, relu
        - Conv2D, 64 filters, 4x4 kernel, stride 2, relu
        - Conv2D, 64 filters, 3x3 kernel, stride 1, relu
        - Flatten
        - Dense, 512 units, relu
        - Dense, ``action_space`` units, linear
    """
    network = tf.keras.Sequential()

    # Convolutional feature extractor; only the first layer declares the input shape
    network.add(Conv2D(32, (8, 8), strides=(4, 4), activation='relu', input_shape=state_space))
    network.add(Conv2D(64, (4, 4), strides=(2, 2), activation='relu'))
    network.add(Conv2D(64, (3, 3), strides=(1, 1), activation='relu'))

    # Fully connected head producing one linear output per action
    network.add(Flatten())
    network.add(Dense(512, activation='relu'))
    network.add(Dense(action_space, activation='linear'))

    return network

In [None]:
# Build the online network (trained) and the target network (periodically synced)
model, target_model = create_model(), create_model()

# Only the online network is optimised, so it is the only one that gets compiled
model.compile(
    loss='mse',
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=learning_rate),
)

# Both networks start out with identical weights
target_model.set_weights(model.get_weights())

In [None]:
def train(model: "tf.keras.Model", target_model: "tf.keras.Model", minibatch: list, discount_rate: float):
    """Run one gradient step of Q-learning on a minibatch of transitions.

    The regression target for each sample is the online network's own
    prediction for the starting state, with only the entry of the action that
    was actually taken replaced by the temporal-difference target::

        target = reward                                       if done
        target = reward + discount_rate * max_a Q_target(s')  otherwise

    Leaving the other action entries at the online network's current
    predictions means they contribute zero error, so only the taken action's
    Q-value is updated.

    Parameters
    ----------
    model : tf.keras.Model
        The main (online) network being trained. Must provide ``predict`` and
        ``fit``.
    target_model : tf.keras.Model
        The target network used to evaluate the next states. Only ``predict``
        is called on it.
    minibatch : sequence of tuple
        Transitions ``(state, action, reward, next_state, done)``.
    discount_rate : float
        Discount factor applied to the bootstrapped next-state value.

    Returns
    -------
    None
        The function only has the side effect of calling ``model.fit``.
        An empty minibatch is a no-op.
    """
    batch_len = len(minibatch)
    if batch_len == 0:
        # Nothing to learn from; avoid calling predict/fit on empty arrays
        return

    # Unpack the transitions column-wise
    states = np.array([transition[0] for transition in minibatch])
    actions = np.array([transition[1] for transition in minibatch])
    rewards = np.array([transition[2] for transition in minibatch])
    next_states = np.array([transition[3] for transition in minibatch])
    dones = np.array([transition[4] for transition in minibatch], dtype=bool)

    # Online-network predictions for the starting states form the base of the
    # regression target (copied so the prediction buffer is safely writable).
    target_q_values = np.array(model.predict(states, verbose=0))
    # Target-network predictions for the next states provide the bootstrap value
    next_q_values = target_model.predict(next_states, verbose=0)

    # TD target: no bootstrapping from terminal transitions
    best_next_q = np.max(next_q_values, axis=1)
    td_targets = np.where(dones, rewards, rewards + discount_rate * best_next_q)

    # Overwrite only the Q-value of the action that was taken in each sample
    target_q_values[np.arange(batch_len), actions] = td_targets

    # Perform a gradient descent step
    model.fit(states, target_q_values, epochs=1, verbose=0, batch_size=batch_len)

In [None]:
# Initialize a step counter and replay memory
step_counter = 0
replay_memory = deque(maxlen=10000)

for episode in range(num_episodes):
    state, _ = env.reset()
    done = False
    while not done:
        # Exploration-exploitation trade-off (epsilon-greedy)
        exploration_threshold = random.uniform(0, 1)
        # If exploration_threshold > exploration_rate, then exploitation
        if exploration_threshold > exploration_rate:
            q_values = model.predict(np.expand_dims(state, axis=0), verbose=0) # add batch dimension
            action = int(np.argmax(q_values[0]))
        else:
            action = env.action_space.sample()

        # Increment the step counter
        step_counter += 1

        # Take action and observe the next state and reward.
        # Gymnasium returns (obs, reward, terminated, truncated, info); the
        # episode is over when either flag is set.
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Add the experience to replay memory. Only `terminated` is stored as
        # the done flag: a truncated episode did not reach a terminal state, so
        # its last transition should still bootstrap from the next state.
        replay_memory.append((state, action, reward, next_state, terminated))

        # Sample a minibatch from the replay buffer
        if len(replay_memory) > batch_size:
            minibatch = random.sample(replay_memory, batch_size)
            # Train the model on the minibatch
            train(model, target_model, minibatch, discount_rate)

        if step_counter % update_target_network_steps == 0:
            # Update the target network with new weights
            target_model.set_weights(model.get_weights())

        # Decay the exploration rate
        exploration_rate = max(min_exploration_rate, exploration_rate * exploration_decay_rate)

        # Advance to the observation the environment just returned; without
        # this the agent would keep acting on (and storing) the reset state.
        state = next_state
    # End of episode
    print(f'Episode: {episode}, Exploration Rate: {exploration_rate:.2f}')