In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.distributions import Categorical
import numpy as np
import tyro
from torch.utils.tensorboard import SummaryWriter
from dataclasses import dataclass
import tensorboard

In [2]:
@dataclass
class Args:
    """Hyper-parameters and logging options for the REINFORCE experiment.

    ``log_path`` is derived from ``experement_num`` when it is left empty, so
    changing the experiment number automatically changes the log directory.
    An explicitly supplied ``log_path`` is used unchanged.
    """

    # Gymnasium environment id passed to ``gym.make``.
    env_id: str = 'CartPole-v1'
    # Seed for the random number generators.
    seed: int = 0
    # Number of training iterations (one episode per iteration).
    steps: int = 10000
    # Adam learning rate.
    lr: float = 1e-3
    # Hard cap on the number of steps collected per episode.
    max_episode_len: int = 400

    # Whether to log to TensorBoard.
    writer: bool = True
    # Experiment index used to build the default log directory.
    # The spelling is kept as-is because it is part of the public field name.
    experement_num: int = 0
    # TensorBoard log directory; empty means "derive it in __post_init__".
    log_path: str = ''

    def __post_init__(self):
        # A forward slash works on every OS, whereas the previous literal
        # 'runs\experiment_…' relied on the invalid escape '\e' and produced a
        # single oddly named directory on POSIX systems.
        if not self.log_path:
            self.log_path = f'runs/experiment_{self.experement_num}'

# Command-line parsing is disabled here; the dataclass defaults are used instead.
# NOTE(review): tyro's documented entry point appears to be `tyro.cli(Args)`,
# not `tyro.from_args` — confirm before re-enabling the line below.
# args  = tyro.from_args(Args)
args = Args()

class Policy(nn.Module):
    """Small MLP that turns an observation into action probabilities.

    The input width is taken from ``env.observation_space.shape[0]`` and the
    number of outputs from ``env.action_space.n``. Two hidden layers of 128
    units with ReLU activations are followed by a softmax over the last axis.
    """

    def __init__(self, env):
        super().__init__()
        obs_dim = env.observation_space.shape[0]
        n_actions = env.action_space.n
        width = 128
        # Attribute names are part of the saved state_dict keys; keep them.
        self.linear1 = nn.Linear(obs_dim, width)
        self.linear2 = nn.Linear(width, width)
        self.linear3 = nn.Linear(width, n_actions)

    def forward(self, x):
        """Return a probability for each action given observation(s) ``x``."""
        hidden = F.relu(self.linear1(x))
        hidden = F.relu(self.linear2(hidden))
        logits = self.linear3(hidden)
        return F.softmax(logits, dim=-1)

# Seed the global NumPy and PyTorch generators.
np.random.seed(args.seed)
torch.manual_seed(args.seed)

# Environment. Passing the seed to the first reset also seeds the
# environment's own generator, so initial states are reproducible as well.
env = gym.make(args.env_id)
obs, _ = env.reset(seed=args.seed)

# Policy network and its optimizer.
policy = Policy(env)
optimizer = Adam(policy.parameters(), lr=args.lr)

# TensorBoard writer (episode statistics are logged from the training loop).
if args.writer:
    writer = SummaryWriter(args.log_path)
    # writer.add_text(args.text_id, args.exp_summary)
    # Trace the graph with the same float32 input dtype the rollout code uses.
    writer.add_graph(policy, torch.as_tensor(obs, dtype=torch.float32))
    

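# Superseded first draft of the training code (loops only on `done`, no reward discounting, no baseline); kept commented out for reference.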
# def generate_episode(env, policy):
#     obs, _ = env.reset()
#     log_probs, rewards = [], []
#     done = False
#     while not done:

#         probs = policy(torch.as_tensor(obs))
    
#         cat = Categorical(probs)
#         action = cat.sample()
#         log_prob = cat.log_prob(action)

#         new_obs, reward, done, truncated, info = env.step(action.item())
#         # print(new_obs, reward)
#         if truncated:
#             print(done, truncated)
        
#         log_probs.append(log_prob)
#         rewards.append(reward)

#         obs = new_obs
        
#     return torch.stack(log_probs), torch.as_tensor(np.asarray(rewards))


# def train(steps, env, policy):
#     for step in range(steps):
#         if step % 500 == 0:
#             print(step)
#         log_probs, rewards = generate_episode(env, policy)

#         if args.writer:
#             writer.add_scalar('episodic_return', rewards.sum(), step)
#             writer.add_scalar('episodic_lenght', log_probs.shape[0], step)
        
#         loss = - (log_probs * rewards).mean()

#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()
#===================================================================================
#===================================================================================
#===================================================================================

def discount_rewards(rewards, gamma=0.99):
    """Compute the discounted return-to-go for every step of an episode.

    Args:
        rewards: sequence of per-step rewards in chronological order.
        gamma: discount factor applied to future rewards.

    Returns:
        A 1-D float32 tensor ``G`` of the same length as ``rewards`` where
        ``G[t] = rewards[t] + gamma * G[t + 1]`` (and ``G[T] = 0`` past the end).
        An empty input yields an empty tensor.
    """
    returns = []
    running = 0.0
    # Walk backwards so each return reuses the one computed for the next step;
    # append + one final reverse is linear, unlike repeated insert(0, ...).
    for reward in reversed(rewards):
        running = reward + gamma * running
        returns.append(running)
    returns.reverse()
    # Fix the dtype so integer rewards cannot produce an integer tensor.
    return torch.as_tensor(returns, dtype=torch.float32)

def _rollout(env, policy):
    """Run ``policy`` for one episode; return stacked log-probs and raw rewards."""
    obs, _ = env.reset()
    log_probs, rewards = [], []
    terminated, truncated = False, False
    # Stop on termination, on truncation reported by the environment, or when
    # the configured per-episode step cap is reached.
    while not terminated and not truncated and len(rewards) < args.max_episode_len:
        probs = policy(torch.as_tensor(obs, dtype=torch.float32))
        cat = Categorical(probs)
        action = cat.sample()
        log_probs.append(cat.log_prob(action))

        obs, reward, terminated, truncated, _ = env.step(action.item())
        rewards.append(reward)

    return torch.stack(log_probs), rewards


def generate_episode(env, policy):
    """Sample one episode from ``env`` using ``policy``.

    Returns:
        ``(log_probs, returns)``: a tensor with the log-probability of each
        sampled action, and the tensor of discounted returns-to-go produced
        by ``discount_rewards`` for the same steps.
    """
    log_probs, rewards = _rollout(env, policy)
    return log_probs, discount_rewards(rewards)


def train(steps, env, policy):
    """Run ``steps`` REINFORCE updates, one sampled episode per update.

    Uses the module-level ``optimizer`` and, when ``args.writer`` is set, the
    module-level ``writer`` for TensorBoard logging. Progress is printed every
    500 iterations.
    """
    for step in range(steps):
        if step % 500 == 0:
            print(step)
        log_probs, raw_rewards = _rollout(env, policy)
        returns = discount_rewards(raw_rewards)

        if args.writer:
            # Log the undiscounted episode return. Summing the discounted
            # returns-to-go (as before) does not give the episode's return.
            writer.add_scalar('episodic_return', float(sum(raw_rewards)), step)
            # Tag spelling is kept so new points land on the existing chart.
            writer.add_scalar('episodic_lenght', len(raw_rewards), step)

        # Mean return of the episode serves as a baseline.
        baseline = returns.mean()
        loss = -(log_probs * (returns - baseline)).mean()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if args.writer:
        # Make sure buffered events reach disk before the dashboard is opened.
        writer.flush()


# Run training for the configured number of iterations (one episode each),
# then print a completion marker.
train(args.steps, env, policy)
print('end')

0
500
1000
1500
2000
2500
3000
3500
4000
4500
5000
5500
6000
6500
7000
7500
8000
8500
9000
9500
end


In [8]:
# Load the TensorBoard notebook extension and open the dashboard on the `runs` directory.
%load_ext tensorboard
%tensorboard --logdir runs

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


Reusing TensorBoard on port 6007 (pid 3444), started 2 days, 2:56:54 ago. (Use '!kill 3444' to kill it.)

In [48]:
# Save only the policy's parameters (its state_dict) to 'weights.pth'.
torch.save(policy.state_dict(), 'weights.pth')

In [82]:
# Roll out one episode with the current policy and keep the rendered frames.
env = gym.make(args.env_id, render_mode='rgb_array')
frames = []  # RGB arrays returned by env.render(), one per step
try:
    obs, _ = env.reset()
    done = False
    while not done:
        # Convert the observation to a tensor.
        obs_tensor = torch.as_tensor(obs, dtype=torch.float32)

        # Get the action probabilities (no gradients needed for evaluation).
        with torch.no_grad():
            probs = policy(obs_tensor)

        # Build the distribution and sample an action.
        cat = Categorical(probs)
        action = cat.sample()

        # Step the environment. The episode is over on termination OR
        # truncation; ignoring truncation lets the loop run past the
        # environment's time limit.
        obs, reward, terminated, truncated, _ = env.step(action.item())
        done = terminated or truncated

        # In 'rgb_array' mode render() returns the frame rather than showing
        # it, so store it for later display.
        frames.append(env.render())
finally:
    # Always release the environment, even if the rollout raised.
    env.close()