In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import numpy as np
from torch.distributions import Categorical


In [3]:
class enva(): # player를 9로 해보자!
    def __init__(self):
        self.height = 10
        self.width = 10
        self.num_obstacle_range =4
        self.num_obstacle_range_min = 3
        self.turn = 0
        self.position = int(round(self.width/2))
        self.map = np.array([[0.0 for j in range(self.width)] for i in range(self.height)])
        self.map[self.height-1][self.position] = 2. # 2.
        self.done = False
        
    def step(self, key):
        #obstacle down
        i = self.height - 2
        while(i>=0):
            self.map[i+1] = self.map[i]
            i -= 1
        #obstacle init(first_line)
        self.map[0] = np.array([0 for j in range(self.width)])
        #obstacle making
        num_obstacle = random.randrange(self.num_obstacle_range_min,self.num_obstacle_range)
        i = 0
        while(i < num_obstacle):
            position_obstacle = random.randrange(0,self.width)
            if (self.map[0][position_obstacle] == 1.0):
                continue
            self.map[0][position_obstacle] = 1.0
            i += 1
         #big_obstacle
        big_obstacle = random.randrange(0,20)
        if(big_obstacle == -3):###############################
            big_obstacle = random.randrange(1,self.width-1)
            for j in range(3):
                for k in range(-1,2):
                    self.map[j][big_obstacle + k] = 1.
        #player position
        reward = 1
        # 0 : 왼쪽, 1 : 가만히, 2 : 오른쪽
        if key == 0:
            if(self.position>0):
                self.position -= 1
            
        if key == 1:
            pass      
        if key == 2:
            if(self.position<self.width-1):
                self.position += 1
                  
        if (self.map[self.height-1][self.position] == 1.0):
                    reward = 0
                    self.done = True
                    #print("====Game Over====")
                        
        self.map[self.height-1][self.position] =  2. # 2.
        #reward(turn(time))
        self.turn += 1
        
        return torch.flatten(torch.tensor(self.map),0).numpy(), reward, self.done, _ 

    def reset(self):
        self.turn = 0
        self.position = round(self.width/2)
        self.map = np.array([[0.0 for j in range(self.width)] for i in range(self.height)])
        self.map[self.height-1][self.position] = 2.0
        self.done = False
        
        return torch.flatten(torch.tensor(self.map),0).numpy()

In [4]:
#Hyperparameters
learning_rate = 0.0005
gamma         = 0.98
lmbda         = 0.95
eps_clip      = 0.1
K_epoch       = 5
T_horizon     = 20


In [5]:
# PPO DNN 1 by 100

class PPO(nn.Module):
    def __init__(self):
        super(PPO, self).__init__()
        self.data = []

        self.fc0   = nn.Linear(100,128)
        self.fc1   = nn.Linear(128,128)
        self.fc2   = nn.Linear(128,128)

        self.fc4   = nn.Linear(128,128)
        self.fc_pi = nn.Linear(128,3)
        self.fc_v  = nn.Linear(128,1)
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self,x, s_dim = 0):

        x = torch.flatten(x, start_dim = s_dim, end_dim = -1) ###########
        x = F.relu(self.fc0(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc4(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim = s_dim)

        torch.nan_to_num(prob, nan = 0.0)
        return prob
    
    def v(self, x):
        
        x = torch.flatten(x, start_dim = 1, end_dim = -1)
        
        x = F.relu(self.fc0(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc4(x))
        v = self.fc_v(x)

        return v
      
    def put_data(self, transition):
        self.data.append(transition)
        
    def make_batch(self):
        s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []

        for transition in self.data:
            s, a, r, s_prime, prob_a, done = transition
            
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            prob_a_lst.append([prob_a])
            done_mask = 0 if done else 1
            done_lst.append([done_mask])
        
        s,a,r,s_prime,done_mask, prob_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                                          torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
                                          torch.tensor(done_lst, dtype=torch.float), torch.tensor(prob_a_lst)
        self.data = []
        
        # num_layer X batch X hidden으로 만들기
        return s, a, r, s_prime, done_mask, prob_a
        
    def train_net(self):
        s, a, r, s_prime, done_mask, prob_a = self.make_batch()
        
        td_target = r + gamma * self.v(s_prime) * done_mask
        delta = td_target - self.v(s)
        delta = delta.detach().numpy()

        advantage_lst = []
        advantage = 0.0
        for delta_t in delta[::-1]:
            advantage = gamma * lmbda * advantage + delta_t[0]
            advantage_lst.append([advantage])
        advantage_lst.reverse()
        advantage = torch.tensor(advantage_lst, dtype=torch.float) 
       
        for i in range(K_epoch):
        
            pi = self.pi(s, s_dim = 1) 
            pi_a = pi.gather(1,a) 

            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))  # a/b == exp(log(a)-log(b)) ##############

            
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
            
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s) , td_target.detach()) 
            
            self.optimizer.zero_grad()
            loss.mean().backward() # 여러 번 loss의 gradient를 구할 때
            self.optimizer.step()
        
def main():
    env = enva()
    model = PPO()
    #model.load_state_dict(torch.load('ppo_ddong_with_dongmin.pt'))

    score = 0.0
    print_interval = 50
    best_score = 0.0
    one_score = 0.0
    
    for n_epi in range(50000):
        s = env.reset()
        done = False
        while not done:
            for t in range(T_horizon):
                prob = model.pi(torch.from_numpy(s).float())
                
                m = Categorical(prob)
                a = m.sample().item()
                s_prime, r, done, info = env.step(a)
                
                model.put_data((s, a, r, s_prime, prob[a].item(), done))
                s = s_prime
                
                score += r
                one_score += r
                if done:
                    break
            
            model.train_net()
            
        if best_score <= one_score:
            best_score = one_score
        #    print("... save model ...")
            print(f"best score : {one_score}")
        #    torch.save(model.state_dict(),'ppo_ddong_with_dongmin.pt')
            
        one_score = 0
            
        if n_epi%print_interval==0 and n_epi!=0:
            print("# of episode :{}, avg score : {:.1f}".format(n_epi, score/print_interval))
            score = 0.0
    
    return best_score

if __name__ == '__main__':
    best_score = main()
    print(f"final best_score : {best_score}")
    


# of episode :100, avg score : 25.1
# of episode :200, avg score : 25.3
# of episode :300, avg score : 23.6
# of episode :400, avg score : 22.5
# of episode :500, avg score : 22.2
# of episode :600, avg score : 26.1
# of episode :700, avg score : 23.5
# of episode :800, avg score : 22.7
# of episode :900, avg score : 22.5
# of episode :1000, avg score : 26.1
# of episode :1100, avg score : 25.4
# of episode :1200, avg score : 24.7
# of episode :1300, avg score : 23.4
# of episode :1400, avg score : 22.5
# of episode :1500, avg score : 23.0
# of episode :1600, avg score : 19.6
# of episode :1700, avg score : 18.7
# of episode :1800, avg score : 19.9
# of episode :1900, avg score : 22.1
# of episode :2000, avg score : 23.7
# of episode :2100, avg score : 21.4
# of episode :2200, avg score : 23.4
# of episode :2300, avg score : 25.1
# of episode :2400, avg score : 22.9
# of episode :2500, avg score : 19.9
# of episode :2600, avg score : 20.4
# of episode :2700, avg score : 21.4
# of episo

RuntimeError: invalid multinomial distribution (encountering probability entry < 0)

In [None]:
model.load_state_dict(torch.load('ppo_ddong_with_dongmin.pt'))

In [None]:
import sys
from time import sleep
import numpy as np
import random
import time
import copy

q = PPO()

q.load_state_dict(torch.load('ppo_ddong_with_dongmin.pt'))
height = 10
width = 10
num_obstacle_range =7
num_obstacle_range_min = 3
turn = 0
position = round(width/2)
player_position = round(width/2)
map = np.array([[0. for j in range(width)] for i in range(height)])
map[height-1][position] =  9. # 2.

player_map = copy.deepcopy(map)
q.eval()
with torch.no_grad():

    while True:
        #print_map
        action = q.pi(torch.flatten(torch.tensor(map).float()))
        print("===============player map==================\t\t===============ai map==================")
        for m,n in zip(player_map,map):
            print(m,end="\t\t\t\t  ")
            print(n)
            
        #print("===============ai map==================")
        #print(map)
        #print("===============player map==================")
        #print(player_map)
        print("Turn:" ,turn)
        #obstacle down
        i = height - 2
        while(i>=0):
            map[i+1] = map[i]
            i -= 1
        #obstacle init(first_line)
        map[0] = np.array([0. for j in range(width)])
        #obstacle making
        num_obstacle = random.randrange(num_obstacle_range_min,num_obstacle_range)
        i = 0
        while(i < num_obstacle):
            position_obstacle = random.randrange(0,width)
            if (map[0][position_obstacle] == 1.):
                continue
            map[0][position_obstacle] = 1.
            i += 1
        #big_obstacle
        big_obstacle = random.randrange(0,20)
        if(big_obstacle == 0):
            big_obstacle = random.randrange(1,width-1)
            for j in range(3):
                for k in range(-1,2):
                    map[j][big_obstacle + k] = 1.
        #copy player_map
        player_map = copy.deepcopy(map)
        #ai position
        #print("value : ",action)
        action = torch.argmax(action).item()
        #print("action : ", action)
        
        # check the time
        start_time = time.time()
        while True:
            
            player_key = input()
            key = action
            
            #if (time.time() - start_time) > 5:
            #    print("...Time Out!!...")
            #    break
            #else :
            #    print("Time: {:.4f}sec".format((time.time() - start_time)))
                
            # player action
            if player_key == "j":
                if(player_position>0):
                    player_position -= 1
                
            if player_key == "k":
                pass        
            if player_key == "l":
                if(player_position<width-1):
                    player_position += 1
                        

            # ai action
            if key == 0:
                if(position>0):
                    position -= 1
                break
            if key == 1:
                break        
            if key ==2:
                if(position<width-1):
                    position += 1
                break        
            
            
        if map[height-1][position] == 1 or player_map[height-1][player_position] == 1:
                    print("====Game Over====")
                    if map[height-1][position] == 1:
                        print("player win!!\n\n")
                    
                    elif player_map[height-1][player_position] == 1:
                        print("AI win!!\n\n")
                    sys.exit()    
        map[height-1][position] =  9. # 2.
        player_map[height-1][player_position] =  9. # 2.j
        #reward(turn(time))
        turn += 1
        
