In [56]:
# Requirements
!pip install gymnasium[atari]
!pip install gymnasium[accept-rom-license]
!pip install ale-py
!pip install stable-baselines3
!pip install imageio[ffmpeg]



In [57]:
# Import libraries
import os

import ale_py
import gymnasium as gym
import numpy as np
from google.colab import drive
from gymnasium.wrappers import TransformReward, TimeLimit
from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor

In [58]:
# Create custom reward and penalty functions
class CustomRewardWrapper(gym.Wrapper):
    """Reward-shaping wrapper that adds small bonuses and penalties per step.

    On top of the reward returned by the wrapped environment, each step adds:

    * ``survival_bonus`` unconditionally;
    * ``movement_reward`` for a left/right action, but only for the first
      ``left_right_threshold`` such actions of the episode;
    * ``fire_penalty`` for a "FIRE" action once more than
      ``fire_penalty_threshold`` of them have been taken in the episode;
    * ``termination_penalty`` on the step where the episode terminates.

    Args:
        env: The environment to wrap.
        left_right_threshold: Number of left/right actions per episode that
            still earn ``movement_reward``.
        fire_penalty_threshold: Number of "FIRE" actions per episode that are
            tolerated before ``fire_penalty`` starts to apply.
        fire_penalty: Amount added to the reward for each excessive "FIRE".
        termination_penalty: Amount added to the reward when ``terminated``
            is true.
        movement_reward: Amount added to the reward for a rewarded move.
        survival_bonus: Amount added to the reward on every step.
    """

    # Discrete action ids this wrapper reacts to: 1 is treated as "FIRE" and
    # 2/3 as the left/right movement actions.
    FIRE_ACTION = 1
    MOVE_ACTIONS = (2, 3)

    def __init__(
        self,
        env,
        left_right_threshold=100,
        fire_penalty_threshold=500,
        fire_penalty=-0.01,
        termination_penalty=-0.1,
        movement_reward=0.01,
        survival_bonus=0.005,
    ):
        super().__init__(env)
        # Per-episode counters for specific actions taken by the agent
        self.fire_count = 0
        self.left_right_count = 0

        # Thresholds for applying penalties and rewards
        self.left_right_threshold = left_right_threshold
        self.fire_penalty_threshold = fire_penalty_threshold

        # Penalties and rewards definitions
        self.fire_penalty = fire_penalty  # Penalty for excessive "FIRE" actions
        self.termination_penalty = termination_penalty  # Penalty for game over
        self.movement_reward = movement_reward  # Reward for moving left or right

        # Additional bonuses
        self.survival_bonus = survival_bonus  # Bonus for surviving longer

    def reset(self, **kwargs):
        """Clear the per-episode counters and reset the wrapped environment."""
        self.fire_count = 0
        self.left_right_count = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped environment and return the shaped reward.

        Returns the same ``(obs, reward, terminated, truncated, info)`` tuple
        as the wrapped environment, with only ``reward`` modified.
        """
        obs, reward, terminated, truncated, info = self.env.step(action)

        if action == self.FIRE_ACTION:
            # Only "FIRE" actions beyond the threshold are penalized.
            self.fire_count += 1
            if self.fire_count > self.fire_penalty_threshold:
                reward += self.fire_penalty
        elif action in self.MOVE_ACTIONS:
            # Movement is rewarded only up to the threshold, so the agent
            # cannot collect this bonus indefinitely within one episode.
            self.left_right_count += 1
            if self.left_right_count <= self.left_right_threshold:
                reward += self.movement_reward

        # Bonus for each step the game continues
        reward += self.survival_bonus

        # Truncation (e.g. a time limit) is not penalized, only termination.
        if terminated:
            reward += self.termination_penalty

        return obs, reward, terminated, truncated, info

In [59]:
# Earlier version of CustomRewardWrapper, kept commented out for reference only.
# It was disabled because its incremental stack-height penalty overwhelmed the rest of the rewards.
# Create custom reward and penalty function
#class CustomRewardWrapper(gym.Wrapper):
    #def __init__(self, env):
        # super().__init__(env)
        # Counters for specific actions taken by the agent
        #self.fire_count = 0
        #self.left_right_count = 0

        # Thresholds for applying penalties and rewards
        #self.left_right_threshold = 100
        #self.fire_penalty_threshold = 500

        # Penalties and rewards definitions
        #self.fire_penalty = -0.01  # Penalty for excessive "FIRE" actions
        #self.termination_penalty = -0.1  # Penalty for game over
        #self.movement_reward = 0.01  # Reward for moving left or right

        # Additional bonuses
        #self.low_stack_bonus = 0.02  # Bonus for keeping stack low
        #self.survival_bonus = 0.005  # Bonus for surviving longer

        # Game specific settings
        #self.game_height = 20  # Total height of the Tetris board
        #self.height_threshold = self.game_height / 2  # Threshold for applying height penalty
        #self.max_height_penalty = -0.02  # Maximum penalty for exceeding height threshold

    #def reset(self, **kwargs):
        # Reset counters when a new episode starts
        #self.fire_count = 0
        #self.left_right_count = 0
        #return self.env.reset(**kwargs)

    #def step(self, action):
        # Execute the action and observe the outcome
        #obs, reward, terminated, truncated, info = self.env.step(action)

        # Increment fire count and apply penalty if threshold is exceeded
        #if action == 1:
            #self.fire_count += 1
            #if self.fire_count > self.fire_penalty_threshold:
                #reward += self.fire_penalty  # Penalize for excessive fire actions

        # Increment left/right count and reward movement within threshold
        #if action == 2 or action == 3:
            #self.left_right_count += 1
            #if self.left_right_count <= self.left_right_threshold:
                #reward += self.movement_reward  # Reward for moving left or right

        # Evaluate the board and adjust reward based on the current stack height
        #board_reward, low_stack = self.evaluate_board(obs)
        #reward += board_reward

        # Apply bonuses for gameplay strategies
        #if low_stack:
            #reward += self.low_stack_bonus  # Bonus for keeping the stack below the threshold
        #reward += self.survival_bonus  # Bonus for each step the game continues

        # Apply termination penalty if the game ends
        #if terminated:
            #reward += self.termination_penalty

        #return obs, reward, terminated, truncated, info

    #def evaluate_board(self, observation):
        # Determine filled positions on the board and calculate height penalties
        #is_filled = np.any(observation != [0, 0, 0], axis=-1)
        #height_penalty = 0
        #low_stack = True  # Assume low stack until found otherwise

        # Calculate height penalty for each column
        #for col in range(is_filled.shape[1]):
            #column_data = is_filled[:, col]
            #first_filled_index = np.argmax(column_data) if np.any(column_data) else len(column_data)
            #column_height = len(column_data) - first_filled_index
            #if column_height > self.height_threshold:
                #excess_height = column_height - self.height_threshold
                #height_penalty += (excess_height / self.height_threshold) * self.max_height_penalty
                #low_stack = False  # Set to False if any column exceeds the threshold

        #return height_penalty, low_stack

In [60]:
# Register the Atari environment
gym.register_envs(ale_py)

# Create the Tetris training environment and apply the custom reward shaping
env = gym.make("ALE/Tetris-v5")
#env = TimeLimit(env, max_episode_steps=5000)  # Limit episodes to 5000 steps
env = CustomRewardWrapper(env)

# Separate environment for the periodic evaluations. Evaluating on the
# training env instance would reset and step the emulator in the middle of a
# training episode. Monitor records the episode returns that the evaluation
# helpers report.
eval_env = gym.make("ALE/Tetris-v5")
eval_env = Monitor(CustomRewardWrapper(eval_env))

# Mount Google Drive
drive.mount('/content/drive')

# Define the path to the model
# NOTE(review): this loads "v4" while the final save below writes "v14" --
# confirm which checkpoint training is meant to resume from.
model_path = "/content/drive/My Drive/tetris_checkpoints/dqn_tetris_v4.zip"
save_path = "/content/drive/My Drive/tetris_checkpoints/"

# Create the checkpoint directory that is actually written to below
os.makedirs(save_path, exist_ok=True)

# Check if the model exists and load or create a new one
if os.path.exists(model_path):
    print("Loading existing model to continue training.")
    model = DQN.load(model_path, env=env)
else:
    print("No existing model found, training a new model.")
    model = DQN(
        policy="MlpPolicy",
        env=env,
        learning_rate=1e-4,
        buffer_size=1000000,
        learning_starts=20000,
        batch_size=32,
        tau=1.0,
        gamma=0.99,
        train_freq=4,
        target_update_interval=10000,
        exploration_fraction=0.1,
        exploration_final_eps=0.02,
        verbose=1
    )

# Callback to save the best model
eval_callback = EvalCallback(
    eval_env,
    best_model_save_path=save_path,
    log_path=save_path,
    eval_freq=10000,
    deterministic=True,
    render=False
)

# Train the model
# NOTE(review): learn() restarts the timestep counter by default, so a loaded
# model begins again with a high exploration rate (the training log shows it
# near 0.88 right after loading). Pass reset_num_timesteps=False if resumed
# runs should continue the previous schedule -- confirm the intent.
model.learn(total_timesteps=200000, callback=[eval_callback])

# Save the final model
final_model_path = os.path.join(save_path, "dqn_tetris_v14")
model.save(final_model_path)
print("Training complete and final model saved at", final_model_path)


# Evaluate the trained model
mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=10)
print(f"Mean reward: {mean_reward} +/- {std_reward}")

# Close both environments
env.close()
eval_env.close()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Loading existing model to continue training.
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.




----------------------------------
| rollout/            |          |
|    ep_len_mean      | 590      |
|    ep_rew_mean      | 4.1      |
|    exploration_rate | 0.884    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 882      |
|    time_elapsed     | 2        |
|    total_timesteps  | 2362     |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 563      |
|    ep_rew_mean      | 3.84     |
|    exploration_rate | 0.779    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 879      |
|    time_elapsed     | 5        |
|    total_timesteps  | 4501     |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 544      |
|    ep_rew_mean      | 3.7      |
|    exploration_rate | 0.68     |
| time/               |          |
|    episodes       