In [1]:
from torch import nn
import torch.nn.functional as F
import torch
from fight_env import fight_env
from fight_train_func import rewardFunction, normalizeData
import random
from fightAlgorithm import testDataForAgent
from PPO import PPO



class PolicyNetwork(nn.Module):
    """Three-layer MLP that maps a state vector to action log-probabilities.

    Layer widths are ``state_size -> 64 -> 32 -> action_size`` with ReLU after
    the first two layers.
    """

    def __init__(self, state_size, action_size):
        """Create the three linear layers.

        Args:
            state_size: number of input features.
            action_size: number of outputs (one per action).
        """
        super().__init__()
        first_width, second_width = 64, 32
        self.fc1 = nn.Linear(state_size, first_width)
        self.fc2 = nn.Linear(first_width, second_width)
        self.fc3 = nn.Linear(second_width, action_size)

    def forward(self, x):
        """Return log-softmax scores over the last dimension of the output."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return F.log_softmax(self.fc3(hidden), dim=-1)

        

        
class ValueNetwork(nn.Module):
    """Three-layer MLP that maps a state vector to a single scalar output.

    Layer widths are ``state_size -> 64 -> 32 -> 1`` with ReLU after the first
    two layers and no activation on the output.
    """

    def __init__(self, state_size):
        """Create the three linear layers for ``state_size`` input features."""
        super().__init__()
        first_width, second_width = 64, 32
        self.fc1 = nn.Linear(state_size, first_width)
        self.fc2 = nn.Linear(first_width, second_width)
        # Only one output, representing the value of the state.
        self.fc3 = nn.Linear(second_width, 1)

    def forward(self, x):
        """Return the raw (unbounded) output of the final linear layer."""
        features = F.relu(self.fc1(x))
        features = F.relu(self.fc2(features))
        return self.fc3(features)
    

class MyPPO(PPO):
    """PPO agent bound to a multi-player environment.

    Adds an environment handle, a rendered play-through (``show``) and an
    experience-collection / training loop (``learn``) on top of the project's
    ``PPO`` base class. The base class is expected to provide ``getAction``,
    ``train``, ``ExperienceHistory``, ``PolicyNetwork`` and ``ValueNetwork``.

    The environment object must expose ``reset()``, ``update(dict)``,
    ``render()``, ``not_done()``, ``actionSpace`` and ``actionID``.
    """

    # The environment is driven with player IDs "1P" .. "6P".
    NUM_PLAYERS = 6

    def __init__(self, env, policyNetwork, valueNetwork):
        """Hand both networks to the PPO base class and keep the environment."""
        super().__init__(policyNetwork, valueNetwork)
        self.env = env

    def _playerSlots(self):
        """Return ``(index, playerID)`` pairs such as ``(1, "1P")`` for every player."""
        return [(j, str(j) + "P") for j in range(1, self.NUM_PLAYERS + 1)]

    def show(self):
        """Play one rendered game with every player controlled by the policy.

        Both networks are first moved to the CPU as float32. The loop ends as
        soon as ``env.not_done()`` is falsy after an update.
        """
        device = torch.device("cpu")
        self.PolicyNetwork.to(device, dtype=torch.float32)
        self.ValueNetwork.to(device, dtype=torch.float32)
        slots = self._playerSlots()
        data = self.env.reset()
        while True:
            self.env.render()
            updateData = {}
            for _j, playerID in slots:
                state, _, _ = data[playerID]
                action = self.getAction(normalizeData(state))
                updateData[playerID] = [self.env.actionSpace[action]]
            data = self.env.update(updateData)

            if not self.env.not_done():
                break

    def _collectGame(self):
        """Play one full game and append every transition to ``ExperienceHistory``.

        Returns:
            dict mapping player index (1..NUM_PLAYERS) to that player's summed
            reward over the game.
        """
        slots = self._playerSlots()
        agentRewards = {j: 0 for j, _ in slots}
        data = self.env.reset()

        while True:
            updateData = {}
            oldStates = {}
            actions = {}
            for j, playerID in slots:
                state, _, _ = data[playerID]
                oldStates[playerID] = state
                if j % 3 == 1:
                    # Every third player starting with 1 (1P and 4P when there
                    # are six players) follows the scripted controller.
                    # NOTE(review): these scripted transitions are stored in the
                    # same buffer as policy-chosen ones — confirm this is intended.
                    action = self.env.actionID[testDataForAgent(state)]
                else:
                    action = self.getAction(normalizeData(state))

                updateData[playerID] = [self.env.actionSpace[action]]
                actions[playerID] = action

            data = self.env.update(updateData)

            # Sample termination AFTER the update (as show() does) so that the
            # transition which actually ends the game carries done=1 and no
            # additional update is issued on an already finished game.
            end_game = not self.env.not_done()
            done = int(end_game)

            for j, playerID in slots:
                old_state = oldStates[playerID]
                new_state, liveLoss, scoreUp = data[playerID]
                action = actions[playerID]
                reward = rewardFunction(old_state, self.env.actionSpace[action], scoreUp, liveLoss)

                self.ExperienceHistory['oldstate'].append(normalizeData(old_state))
                self.ExperienceHistory['state'].append(normalizeData(new_state))
                self.ExperienceHistory['action'].append(action)
                self.ExperienceHistory['reward'].append(reward)
                self.ExperienceHistory['done'].append(done)

                agentRewards[j] += reward

            if end_game:
                return agentRewards

    def learn(self, timeStep=10000, dataNum = 4096, lr=0.003, episode=0.2, epoch=10, batchSize=256):
        """Alternate between collecting experience and training.

        Args:
            timeStep: number of collect-then-train iterations.
            dataNum: games are played until at least this many transitions are
                held in ``self.ExperienceHistory`` before each training call.
            lr: forwarded to ``self.train`` as ``lr``.
            episode: forwarded to ``self.train`` as ``episode``.
            epoch: forwarded to ``self.train`` as ``epochs``.
            batchSize: forwarded to ``self.train`` as ``batch_size``.
        """
        print("start learning")
        for i in range(timeStep):
            # NOTE(review): presumably self.train() empties ExperienceHistory;
            # if it does not, no new games are collected after the first
            # iteration — confirm in the PPO base class.
            while len(self.ExperienceHistory['oldstate']) < dataNum:
                agentRewards = self._collectGame()

                # Log per-player totals for every game of each 10th iteration.
                if i % 10 == 0:
                    print(f"time step:{i + 1}", end=" ")
                    for j in range(1, self.NUM_PLAYERS + 1):
                        print(f"player{j} reward: {agentRewards[j]}", end=",")
                    print()

            self.train(epochs=epoch, lr=lr, episode=episode, batch_size=batchSize)





pygame 2.5.2 (SDL 2.28.3, Python 3.9.16)
Hello from the pygame community. https://www.pygame.org/contribute.html


In [2]:
# Tunable settings for this notebook, gathered in one place.
SEED = 0           # fixed seed so a Restart & Run All is repeatable
ENV_FPS = 300      # frame-rate argument passed to fight_env
STATE_SIZE = 5     # number of input features for both networks
ACTION_SIZE = 7    # number of policy outputs; presumably len(env.actionSpace) — TODO confirm

# Seed the RNGs before any network weights are created.
# NOTE(review): the environment may use additional RNGs not covered here.
random.seed(SEED)
torch.manual_seed(SEED)

env = fight_env(FPS=ENV_FPS)
policyNetwork = PolicyNetwork(STATE_SIZE, ACTION_SIZE)
valueNetwork = ValueNetwork(STATE_SIZE)
agent = MyPPO(env, policyNetwork=policyNetwork, valueNetwork=valueNetwork)


In [3]:
# Run the collect-then-train loop; the remaining arguments keep their defaults.
agent.learn(
    timeStep=10000,
    dataNum=4096,
    lr=0.003,
)

start learning
time step:1 player1 reward: 8.66999999999986,player2 reward: -6.7699999999999,player3 reward: -6.969999999999896,player4 reward: 8.069999999999872,player5 reward: -6.969999999999896,player6 reward: -6.269999999999911,


  with torch.autograd.detect_anomaly():


In [4]:
import pygame

# Replay a fixed number of rendered games, shutting pygame down after each one.
demoGames = 10
for gameIndex in range(demoGames):
    agent.show()
    pygame.quit()