In [1]:
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3 import PPO
from pettingzoo.classic import connect_four_v3
import matplotlib.pyplot as plt
import gymnasium as gym
import numpy as np

In [2]:
class TimestepsRewardLoggerCallback(BaseCallback):
    """Training callback that records one (timestep, reward) pair per step.

    Attributes:
        rewards: reward of the first environment (index 0 of the ``rewards``
            entry in the callback's ``locals``) at every callback step.
        timesteps: value of ``self.num_timesteps`` at the same steps, so both
            lists always have equal length.
    """

    def __init__(self):
        super().__init__()
        self.rewards, self.timesteps = [], []

    def _on_step(self) -> bool:
        """Log the current step; always returns True so training continues."""
        first_env_reward = self.locals["rewards"][0]
        self.rewards += [first_env_reward]
        self.timesteps += [self.num_timesteps]
        return True


In [None]:
class ConnectFourAgentWrapper(gym.Env):
    """Single-agent Gymnasium view of PettingZoo's two-player Connect Four.

    The learning agent occupies the seat ``agent_role``; the other seat is
    played by ``opponent_model`` (any object exposing
    ``predict(obs, deterministic=True) -> (action, state)``) or, when no model
    is given, by uniformly random legal moves. One call to :meth:`step` plays
    the agent's move and then the opponent's reply.

    Args:
        agent_role: ``"player_0"`` or ``"player_1"`` - the seat controlled by
            the learning agent. Any other value makes the opponent
            ``"player_0"``.
        opponent_model: optional policy for the other seat; ``None`` selects
            the random opponent.
    """

    def __init__(self, agent_role="player_0", opponent_model=None):
        self.env = connect_four_v3.env()
        self.env.reset()
        self.agent_role = agent_role
        self.opponent_role = "player_1" if agent_role == "player_0" else "player_0"
        self.opponent_model = opponent_model
        self.action_space = gym.spaces.Discrete(7)
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(6, 7, 2), dtype=np.int8)

    def reset(self, seed=None, options=None):
        """Start a new game and return ``(observation, info)`` for the agent.

        When the agent is ``"player_1"`` the opponent makes the opening move
        first, so the returned observation is always one where it is the
        agent's turn.
        """
        self.env.reset(seed=seed)
        if self.agent_role == "player_1":
            opening_view = self.env.observe(self.opponent_role)
            self.env.step(self._get_opponent_action(opening_view))

        return self.env.observe(self.agent_role)["observation"], {}

    def step(self, action):
        """Play the agent's move, then the opponent's reply.

        An illegal ``action`` (masked out by the env's ``action_mask``) is
        replaced by a random legal one instead of raising.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. The reward
            is read from the wrapped env's ``rewards`` for the agent's seat
            after the last move played in this call. ``truncated`` is always
            ``False``; a truncation of the wrapped env is reported through
            ``terminated``. Once the game is over the observation is all
            zeros.
        """
        mask = self.env.observe(self.agent_role)["action_mask"]
        self.env.step(self._legal_action(action, mask))

        done, reward = self._outcome()
        if done:
            # The agent's own move ended the game; the opponent does not reply.
            return self._terminal_observation(), reward, True, False, {}

        opponent_view = self.env.observe(self.opponent_role)
        self.env.step(self._get_opponent_action(opponent_view))

        done, reward = self._outcome()
        if done:
            return self._terminal_observation(), reward, True, False, {}
        return self.env.observe(self.agent_role)["observation"], reward, False, False, {}

    def close(self):
        """Release the wrapped PettingZoo environment."""
        self.env.close()

    def _get_opponent_action(self, obs_dict):
        """Pick the opponent's move from its ``observation``/``action_mask`` dict.

        Without an opponent model a random legal column is returned. With a
        model, its deterministic prediction is used unless that column is
        masked out, in which case a random legal column is substituted.
        """
        mask = obs_dict["action_mask"]
        if self.opponent_model is None:
            return self._random_legal_action(mask)

        predicted, _ = self.opponent_model.predict(obs_dict["observation"], deterministic=True)
        return self._legal_action(predicted, mask)

    def _outcome(self):
        """Return ``(done, reward)`` for the agent's seat from the wrapped env."""
        done = self.env.terminations[self.agent_role] or self.env.truncations[self.agent_role]
        return done, self.env.rewards[self.agent_role]

    def _terminal_observation(self):
        """All-zero observation matching the declared space's shape and dtype.

        A bare ``np.zeros((6, 7, 2))`` would be float64 and therefore not match
        the declared int8 observation space.
        """
        return np.zeros(self.observation_space.shape, dtype=self.observation_space.dtype)

    @staticmethod
    def _random_legal_action(mask):
        """Sample uniformly among the columns whose mask entry is non-zero."""
        return int(np.random.choice(np.nonzero(mask)[0]))

    @classmethod
    def _legal_action(cls, action, mask):
        """Return ``action`` as a plain int, or a random legal move if it is masked out.

        ``predict`` hands back numpy arrays; ``.item()`` turns any size-1 array
        or numpy scalar into a Python int before it reaches the wrapped env.
        """
        action = int(np.asarray(action).item())
        if mask[action] == 0:
            return cls._random_legal_action(mask)
        return action

# Per-step reward / timestep logs accumulated over all self-play rounds.
all_rewards_A = []
all_timesteps_A = []

all_rewards_B = []
all_timesteps_B = []


# Both learners start out against the random opponent (opponent_model=None).
env_A = DummyVecEnv([lambda: ConnectFourAgentWrapper(agent_role="player_0")])
model_A = PPO("MlpPolicy", env_A, verbose=1, device="cpu")

env_B = DummyVecEnv([lambda: ConnectFourAgentWrapper(agent_role="player_1")])
model_B = PPO("MlpPolicy", env_B, verbose=1, device="cpu")

n_rounds = 5
timesteps_per_round = 1000

# Extra timesteps granted to agent A per round (10% of a round, growing linearly).
# Kept as an int because `learn(total_timesteps=...)` expects an integer budget.
coefficient = timesteps_per_round // 10

# NOTE(review): PPO collects whole rollouts of `n_steps` per env (default 2048 in
# stable-baselines3, presumably unchanged here), so each `learn` call may run past
# the requested budget - confirm the effective number of steps if it matters.

# Alternating self-play: A trains against the latest frozen B, then B trains
# against the freshly saved A.
for i in range(n_rounds):

    callback_A = TimestepsRewardLoggerCallback()
    # In the first round there is no saved B yet, so A faces the random opponent.
    frozen_B = PPO.load("model_B_temp", device="cpu") if i > 0 else None
    env_A = DummyVecEnv([lambda: ConnectFourAgentWrapper(agent_role="player_0", opponent_model=frozen_B)])
    # Attach the new environment; without this call model_A keeps training in the
    # environment it was constructed with and never meets the frozen opponent.
    model_A.set_env(env_A)
    model_A.learn(total_timesteps=timesteps_per_round + (i + 1) * coefficient, callback=callback_A)
    model_A.save("model_A_temp")

    all_rewards_A.extend(callback_A.rewards)
    # The callback's timesteps restart in every `learn` call, so shift them by the
    # last global timestep recorded so far to keep the x-axis monotonic.
    all_timesteps_A.extend([t + (all_timesteps_A[-1] if all_timesteps_A else 0) for t in callback_A.timesteps])

    callback_B = TimestepsRewardLoggerCallback()
    frozen_A = PPO.load("model_A_temp", device="cpu")
    env_B = DummyVecEnv([lambda: ConnectFourAgentWrapper(agent_role="player_1", opponent_model=frozen_A)])
    model_B.set_env(env_B)
    model_B.learn(total_timesteps=timesteps_per_round, callback=callback_B)
    model_B.save("model_B_temp")

    all_rewards_B.extend(callback_B.rewards)
    all_timesteps_B.extend([t + (all_timesteps_B[-1] if all_timesteps_B else 0) for t in callback_B.timesteps])
    


In [None]:
def smooth(y, window=100):
    """Return the moving average of ``y`` over ``window`` consecutive samples.

    Args:
        y: 1-D sequence of numbers.
        window: number of samples averaged per output point. It is clamped to
            ``[1, len(y)]``: with a window longer than the data,
            ``np.convolve(..., mode='valid')`` would swap its operands and
            return values that are not averages of ``y``.

    Returns:
        1-D float array of length ``len(y) - window + 1`` (using the clamped
        window); an empty array when ``y`` is empty.
    """
    values = np.asarray(y, dtype=float)
    if values.size == 0:
        # np.convolve raises on empty input; there is nothing to average.
        return values
    window = max(1, min(int(window), values.size))
    return np.convolve(values, np.ones(window) / window, mode='valid')

# One figure per agent: smoothed reward against smoothed global timestep.
# Agent A uses matplotlib's default line colour, agent B is drawn in orange.
for curve_x, curve_y, figure_title, line_options in (
    (all_timesteps_A, all_rewards_A, "Agent A – Średnia nagroda vs Timesteps", {"label": "Agent A"}),
    (all_timesteps_B, all_rewards_B, "Agent B – Średnia nagroda vs Timesteps", {"label": "Agent B", "color": "orange"}),
):
    plt.figure(figsize=(10, 4))
    # Both axes go through the same moving average so their lengths stay equal.
    plt.plot(smooth(curve_x, 100), smooth(curve_y, 100), **line_options)
    plt.title(figure_title)
    plt.xlabel("Timesteps")
    plt.ylabel("Średnia nagroda")
    plt.grid()
    plt.legend()
    plt.show()



In [4]:
# Checkpoint names of the two policies that play against each other.
# NOTE(review): the training cell saves "model_A_temp" / "model_B_temp"; nothing
# in the visible notebook writes these "best_model_*" files - confirm they exist.
MODEL_A, MODEL_B = "best_model_A", "best_model_B"

# Games per printed summary, and whether each game is rendered in a window.
NUM_GAMES, VISUALIZE = 1, True

# Loaded in order A, B; both run on the CPU.
model_A, model_B = (PPO.load(checkpoint, device="cpu") for checkpoint in (MODEL_A, MODEL_B))

# Win / draw tally ("Remis" is the draw counter).
results = dict.fromkeys(("Agent A", "Agent B", "Remis"), 0)


def prepare_obs(obs):
    """Return a new float32 numpy array holding the values of ``obs``."""
    as_float32 = np.array(obs, dtype=np.float32)
    return as_float32

# Play 10 independent batches of NUM_GAMES games and print a tally after each.
for _ in range(10):
    for game in range(NUM_GAMES):
        env = connect_four_v3.env(render_mode="human" if VISUALIZE else None)
        env.reset()
        final_rewards = {"player_0": 0, "player_1": 0}

        # Swap seats on odd game indices within a batch.
        # NOTE(review): with NUM_GAMES = 1 the index is always 0, so Agent A
        # moves first in every batch - confirm that is intended.
        if game % 2 == 0:
            agents = {"player_0": model_A, "player_1": model_B}
            names = {"player_0": "Agent A", "player_1": "Agent B"}
        else:
            agents = {"player_0": model_B, "player_1": model_A}
            names = {"player_0": "Agent B", "player_1": "Agent A"}

        try:
            # agent_iter() keeps yielding until every finished agent has been
            # stepped with None, so a single pass plays the game to its end.
            for agent in env.agent_iter():
                obs, reward, termination, truncation, info = env.last()
                # The last value seen per seat is that seat's final reward.
                final_rewards[agent] = reward

                if termination or truncation:
                    action = None
                else:
                    action, _ = agents[agent].predict(prepare_obs(obs["observation"]), deterministic=True)
                    # predict returns a numpy array; the env gets a plain int column.
                    action = int(np.asarray(action).item())

                    # Fall back to a random legal column if the policy picked a full one.
                    if obs["action_mask"][action] == 0:
                        action = int(np.random.choice(np.nonzero(obs["action_mask"])[0]))

                env.step(action)
        finally:
            # Close the env (and its render window) even if a game raises.
            env.close()

        if final_rewards["player_0"] > final_rewards["player_1"]:
            winner = names["player_0"]
        elif final_rewards["player_1"] > final_rewards["player_0"]:
            winner = names["player_1"]
        else:
            winner = "Remis"

        results[winner] += 1

    print(f"\n===== PODSUMOWANIE PO {NUM_GAMES} GRACH =====")
    print(f"Agent A wygrał: {results['Agent A']} razy")
    print(f"Agent B wygrał: {results['Agent B']} razy")
    print(f"Remisy: {results['Remis']} razy")

    # Zero every counter for the next batch. Iterating over the tally itself
    # avoids relying on `names`, which is unbound when NUM_GAMES is 0.
    for outcome in results:
        results[outcome] = 0



===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 1 razy
Agent B wygrał: 0 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 0 razy
Agent B wygrał: 1 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 0 razy
Agent B wygrał: 1 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 1 razy
Agent B wygrał: 0 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 1 razy
Agent B wygrał: 0 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 0 razy
Agent B wygrał: 1 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 0 razy
Agent B wygrał: 1 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 1 razy
Agent B wygrał: 0 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 0 razy
Agent B wygrał: 1 razy
Remisy: 0 razy

===== PODSUMOWANIE PO 1 GRACH =====
Agent A wygrał: 1 razy
Agent B wygrał: 0 razy
Remisy: 0 razy
