### Run in Colab
<a href="https://colab.research.google.com/github/racousin/data_science_practice/blob/master/website/public/modules/data-science-practice/module9/exercise/module9_exercise2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
%%capture
# Install pinned versions of the notebook's dependencies.
# The %%capture cell magic above keeps pip's output out of the notebook.
!pip install swig==4.2.1
!pip install gymnasium==1.2.0

In [2]:
import warnings
warnings.filterwarnings('ignore')
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt

# module9_exercise2 : ML - Arena <a href="https://ml-arena.com/viewcompetition/5" target="_blank"> FrozenLake Competition</a>

### Objective
Get at least one agent running on ML-Arena <a href="https://ml-arena.com/viewcompetition/5" target="_blank"> FrozenLake Competition</a> with a mean reward higher than 0.35 (i.e. 35%).


You should submit an agent file named `agent.py` with a class `Agent` that implements at least the following methods:

In [3]:
class Agent:
    """Minimal agent template showing the interface a submission must expose.

    The constructor receives the environment, and ``choose_action`` returns the
    action to play for the current observation. This template simply samples an
    action from the environment's action space.
    """

    def __init__(self, env):
        """Keep a reference to ``env`` so its action space can be queried later."""
        self.env = env

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        """Return the action to play for ``observation``.

        ``reward``, ``terminated``, ``truncated`` and ``info`` are accepted as
        part of the interface but are not used by this placeholder.
        """
        # Placeholder policy: replace with your own logic.
        return self.env.action_space.sample()

### Description

The game starts with the player at location [0,0] of the frozen lake grid world, with the goal located at the far extent of the world, [7,7].

Holes in the ice are distributed in set locations.

The player makes moves until they reach the goal or fall in a hole.

Each run will consist of 10 attempts to cross the ice. The reward will be the total amount accumulated during those trips. For example, if your agent reaches the goal 3 times out of 10, its reward will be 3.

The environment is based on :

In [4]:
# Build the 8x8 FrozenLake environment that the competition is based on.
env = gym.make('FrozenLake-v1', map_name="8x8")

In [9]:
"""%%writefile agent.py
import numpy as np

class Agent:
    """Tabular agent that acts from a precomputed state -> action table.

    The table is built once, in ``__init__``:

    * if the environment exposes its transition model as ``P`` (looked up on
      ``env.unwrapped`` when present), where ``P[s][a]`` is a list of
      ``(prob, next_state, reward, done)`` tuples, the table is the greedy
      policy obtained by in-place value iteration;
    * otherwise the table is the greedy policy of a short tabular Q-learning
      run performed by interacting with ``env`` itself.
    """

    def __init__(self, env, gamma=0.99, tol=1e-10, max_iter=10000, fallback_episodes=8000):
        """Build the lookup policy for ``env``.

        Args:
            env: Environment with discrete ``observation_space`` and
                ``action_space`` (both exposing ``.n``).
            gamma: Discount factor used by both solvers.
            tol: Value iteration stops once the largest per-sweep change of the
                value function drops below this threshold.
            max_iter: Upper bound on the number of value-iteration sweeps.
            fallback_episodes: Number of Q-learning episodes to run when the
                transition model is not available.
        """
        self.env = env
        n_states = env.observation_space.n
        n_actions = env.action_space.n
        model = getattr(getattr(env, "unwrapped", env), "P", None)

        if model is not None:
            self.policy = self._plan_with_model(model, n_states, n_actions, gamma, tol, max_iter)
        else:
            # Fallback: no transition model, so learn a lookup policy by Q-learning.
            self.policy = self._learn_q_table(env, n_states, n_actions, gamma, fallback_episodes)

    @staticmethod
    def _plan_with_model(model, n_states, n_actions, gamma, tol, max_iter):
        """Return the greedy policy (list of ints) found by in-place value iteration."""
        values = np.zeros(n_states, dtype=float)

        def action_value(state, action):
            # Expected one-step return; transitions flagged `done` do not bootstrap.
            return sum(
                (
                    prob * (reward + (0.0 if done else gamma * values[next_state]))
                    for prob, next_state, reward, done in model[state][action]
                ),
                0.0,
            )

        for _ in range(max_iter):
            delta = 0.0
            for state in range(n_states):
                best = max(action_value(state, action) for action in range(n_actions))
                delta = max(delta, abs(best - values[state]))
                # In-place update: later states in this sweep already see the new value.
                values[state] = best
            if delta < tol:
                break

        return [
            int(np.argmax([action_value(state, action) for action in range(n_actions)]))
            for state in range(n_states)
        ]

    @staticmethod
    def _learn_q_table(env, n_states, n_actions, gamma, episodes):
        """Return the greedy policy (list of ints) of an epsilon-greedy Q-learning run."""
        q_table = np.zeros((n_states, n_actions), dtype=float)
        eps, eps_end, eps_decay = 1.0, 0.05, 0.999
        alpha = 0.6

        for _ in range(episodes):
            state, _info = env.reset()
            terminated = truncated = False
            while not (terminated or truncated):
                if np.random.rand() < eps:
                    action = env.action_space.sample()
                else:
                    action = int(np.argmax(q_table[state]))
                next_state, reward, terminated, truncated, _info = env.step(action)
                # Only a real terminal state has zero future value. A truncated
                # (time-limit) episode stops here but must still bootstrap from
                # the next state, otherwise its value is underestimated.
                future = 0.0 if terminated else gamma * np.max(q_table[next_state])
                q_table[state, action] += alpha * (reward + future - q_table[state, action])
                state = next_state
            # Decay exploration once per episode, never below eps_end.
            eps = max(eps_end, eps * eps_decay)

        return np.argmax(q_table, axis=1).tolist()

    def choose_action(self, observation, reward=0.0, terminated=False, truncated=False, info=None):
        """Return the precomputed action for ``observation`` as a plain ``int``.

        ``reward``, ``terminated``, ``truncated`` and ``info`` are accepted as
        part of the interface but are not used.
        """
        return int(self.policy[observation])

Writing agent.py


In [10]:
# Confirm that agent.py exists in the current working directory.
!ls -l agent.py

-rw-r--r-- 1 root root 2015 Nov 11 02:48 agent.py


In [11]:
import gymnasium as gym
import numpy as np

from agent import Agent  # 如果你在同目录保存为 agent.py

def evaluate_agent(n_runs=200, max_steps=200, seed=123, attempts_per_run=10, env=None, agent=None):
    """Estimate an agent's score under the competition's run format.

    One run is ``attempts_per_run`` episodes, and its reward is the sum of the
    final-step rewards of those episodes (1 when the goal is reached, else 0).

    Args:
        n_runs: Number of runs to simulate (must be >= 1).
        max_steps: Safety cap on the number of steps per episode.
        seed: Seed passed to the first ``env.reset`` call.
        attempts_per_run: Episodes per run (must be >= 1).
        env: Environment to evaluate on. When ``None``, an 8x8 ``FrozenLake-v1``
            environment is created here and closed before returning.
        agent: Object exposing ``choose_action(observation)``. When ``None``,
            ``Agent(env)`` is built.

    Returns:
        ``(mean_reward, success_rate)``: the mean reward per run and the mean
        per-attempt success rate (``mean_reward / attempts_per_run``).

    Raises:
        ValueError: If ``n_runs`` or ``attempts_per_run`` is smaller than 1.
    """
    if n_runs < 1 or attempts_per_run < 1:
        raise ValueError("n_runs and attempts_per_run must both be at least 1")

    owns_env = env is None
    if owns_env:
        env = gym.make("FrozenLake-v1", map_name="8x8")  # is_slippery defaults to True

    try:
        env.reset(seed=seed)
        if agent is None:
            agent = Agent(env)

        def play_one_episode():
            obs, info = env.reset()
            terminated = truncated = False
            last_reward = 0.0
            steps = 0
            while not (terminated or truncated):
                action = agent.choose_action(obs)
                obs, last_reward, terminated, truncated, info = env.step(action)
                steps += 1
                if steps >= max_steps:
                    # Gymnasium's TimeLimit normally truncates by itself; this is a safety net.
                    truncated = True
            # Reaching the goal yields 1 on the final step, anything else 0.
            return last_reward

        run_rewards = np.array(
            [
                sum(play_one_episode() for _ in range(attempts_per_run))
                for _ in range(n_runs)
            ],
            dtype=float,
        )
    finally:
        # Only release environments created here; a caller-supplied one stays open.
        if owns_env:
            env.close()

    mean_reward = run_rewards.mean()                 # mean successes per run (0..attempts_per_run)
    success_rate = mean_reward / attempts_per_run    # mean success rate per attempt (0..1)

    print(f"Runs: {n_runs} | mean_reward_per_run: {mean_reward:.3f} / {attempts_per_run}")
    print(f"Per-attempt success rate: {100*success_rate:.2f}%")
    return mean_reward, success_rate

# Simulate 300 runs. evaluate_agent prints its summary, so the returned
# (mean_reward, success_rate) tuple is discarded.
_ = evaluate_agent(n_runs=300)


Runs: 300 | mean_reward_per_run: 6.283 / 10
Per-attempt success rate: 62.83%


### Before submitting
Check that your agent exposes the required interface: an `Agent(env)` constructor and a `choose_action` method.

In [16]:
# Play one full episode, passing every keyword argument of `choose_action`,
# to check that the agent runs end to end.
env = gym.make('FrozenLake-v1', map_name="8x8")
agent = Agent(env)

rewards = []
observation, _ = env.reset()
# Nothing has been observed before the first step.
reward = info = None
terminated = truncated = False
while True:
    action = agent.choose_action(
        observation,
        reward=reward,
        terminated=terminated,
        truncated=truncated,
        info=info,
    )
    observation, reward, terminated, truncated, info = env.step(action)
    rewards.append(reward)
    if terminated or truncated:
        break
print(f'Cumulative Reward: {sum(rewards)}')

Cumulative Reward: 1.0


In [17]:
import gymnasium as gym
from agent import Agent

env = gym.make("FrozenLake-v1", map_name="8x8")
agent = Agent(env, gamma=1.0)  # use 1.0 to track the "probability of reaching the goal" more closely

def play_one_episode(max_steps=200):
    """Play one episode with the module-level ``agent`` on the module-level ``env``.

    The episode ends when the environment reports termination or truncation,
    or once ``max_steps`` steps have been taken. At least one step is always
    played.

    Returns:
        The reward of the last step taken (1 on reaching the goal, 0 otherwise).
    """
    state, _ = env.reset()
    taken = 0
    while True:
        state, last_reward, terminated, truncated, _ = env.step(agent.choose_action(state))
        taken += 1
        # Stop on a finished episode or when the step budget is used up.
        if terminated or truncated or taken >= max_steps:
            return last_reward

# Per the task statement: 1 run = 10 episodes, and the run's reward is the number of successes.
successes = sum(play_one_episode() for _ in range(10))
print("Reward for this run (10 tries):", successes, "/ 10")


Reward for this run (10 tries): 9.0 / 10
