# 공정 정보

In [3]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd

# AMR 정보
initial_energy = 15
Residual_energy = 15

#노드 정보
start_node = 'SP'
end_node = 'EP'
process_nodes = ['A','B','C','D','CP']
nodes = [start_node]+process_nodes+[end_node]

#공정 및 이동 관련 정보
process_times = {'A': 3, 'B': 4, 'C': 6, 'D': 4,'CP':0}

# 소요시간 설정
times = {
        ('SP', 'A'): 2, ('SP', 'B'): 4, ('SP', 'C'): 8, ('SP', 'D'): 12, ('SP', 'CP'): 7,
        ('A', 'A'): 0, ('A', 'B'): 3, ('A', 'C'): 7, ('A', 'D'): 10, ('A', 'CP'): 7, ('A', 'EP'): 11,
        ('B', 'A'): 3, ('B', 'B'): 0, ('B', 'C'): 5, ('B', 'D'): 8, ('B', 'CP'): 7, ('B', 'EP'): 9,
        ('C', 'A'): 7, ('C', 'B'): 5, ('C', 'C'): 0, ('C', 'D'): 4, ('C', 'CP'): 2, ('C', 'EP'): 6,
        ('D', 'A'): 10, ('D', 'B'): 4, ('D', 'C'): 4, ('D', 'D'): 0, ('D', 'CP'): 8, ('D', 'EP'): 3,
        ('CP', 'A'): 7, ('CP', 'B'): 7, ('CP', 'C'): 2, ('CP', 'D'): 8, ('CP', 'CP'): 0, ('CP', 'EP'): 7}

# State, Action, Input encoding

In [4]:
class State:
    def __init__(self, current_node, process_nodes, current_energy):
        self.current_node = current_node
        self.process_nodes = process_nodes
        self.current_energy = current_energy

    def __repr__(self):
        return f"current_node: {self.current_node}, remaining_nodes: {self.process_nodes}, current_energy: {self.current_energy}"

class Action:
    def __init__(self, next_node, process_nodes):
        self.next_node = next_node
        self.process_nodes = process_nodes  # 클래스 변수로 설정
        
    def get_next_node(self):  # 메서드 이름 변경
        a = [int(node == self.next_node) for node in self.process_nodes]
        return a

# DQNetwork input encoding
def encoding(nodes, state, process_nodes):
    current_node_encoding = [int(node == state.current_node) for node in nodes]
    process_nodes_encoding = [int(node in state.process_nodes) for node in process_nodes]
    current_energy = [state.current_energy]
    state_input = current_node_encoding + process_nodes_encoding + current_energy
    return state_input

state = State(start_node, process_nodes, initial_energy)


state_size= encoding(nodes, state, process_nodes)
num_actions = [int(node in state.process_nodes) for node in process_nodes]
action_dim = Action('C', process_nodes)
action_size = action_dim.get_next_node()  # 메서드 호출

print(state_size)
print(action_size)


[1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 15]
[0, 0, 1, 0, 0]


# Num to str

In [5]:
action_dic= {0: 'A',1:'B',2: 'C',3: 'D',4:'CP'}
action_num = int(np.random.choice(5))
action = action_dic[action_num]
action

'A'

In [8]:
import numpy as np

action_dic = {0: 'A', 1: 'B', 2: 'C', 3: 'D', 4: 'CP'}
action_num = int(np.random.choice(5))
action = action_dic[action_num]
del action_dic[action_num]



# 삭제한 후에 새로운 딕셔너리 생성
action_dic = {i: action_dic[i] for i in range(len(action_dic))}
action_dic

KeyError: 1

# DQN

In [9]:
# state = (차량 넘버, 현재 위치, 사용한 시간, 남은 공정, 남은 공정시간, 남은 에너지)
# action = 다음 노드 선정
# reward = 기다린 시간 + 사용한 시간
# Epsilon_greedy


class State():
    def __init__(self, time):
        self.car_num = car_num
        self.current_node = current_node
        self.residula_process = process_nodes
        self.residual_process_time = current_time-time
        self.current_energy = current_energy
        
    def __repr__(self):
        return f"car_num: {self.car_num}, current_node: {self.current_node}, residula_process: {self.residula_process}, residual_process_time: {self.residual_process_time}, current_energy: {self.current_energy}"
        
        
# Q 네트워크를 정의하는 클래스
class QNetwork(nn.Module):
    def __init__(self, state):
        super(QNetwork, self).__init__(state_size,action_size)
        self.fc1 = nn.Linear(state_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_size)
    
    def forward(self, state):
        x = torch.relu(self.fc1(state_size))
        x = torch.relu(self.fc2(x))
        q_values = self.fc3(x)
        return q_values

# DQN 에이전트 클래스
class DQNAgent:
    def __init__(self, state_size, action_size, learning_rate, gamma, epsilon, epsilon_decay):
        self.q_network = QNetwork(state_size, action_size)
        self.target_network = QNetwork(state_size, action_size)  # 타겟 네트워크
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=learning_rate)
        self.gamma = gamma  # 할인 계수
        self.epsilon = epsilon  # 입실론 값 (탐험 vs. 이용)
        self.epsilon_decay = epsilon_decay  # 입실론 감소율
    
    def choose_action(self):
        if random.random() < self.epsilon:
            action_num = str(np.random.choice(5))
            action = action_dic[action_num]
            while action is not process_nodes:
                action_num = str(np.random.choice(5))
                action = action_dic[action_num]
            return action
        else:
            # 입실론 확률보다 크면 Q-value가 가장 높은 액션 선택
            state = torch.Tensor(state)
            q_values = self.q_network(state)
            return torch.argmax(q_values).item()
    
    def learn(self, state, action, reward, next_state):
        # Q-learning 업데이트 수행
        state = torch.Tensor(state)
        next_state = torch.Tensor(next_state)
        q_values = self.q_network(state)
        next_q_values = self.target_network(next_state)
        max_next_q_value = torch.max(next_q_values)
        target_q_value = reward + self.gamma * max_next_q_value
        loss = nn.MSELoss()(q_values[action], target_q_value)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
    
    def update_target_network(self):
        # 타겟 네트워크 업데이트 (탐색 안정성을 위해)
        self.target_network.load_state_dict(self.q_network.state_dict())
    
    def update_epsilon(self):
        # 입실론 값을 감소시키는 함수
        self.epsilon *= self.epsilon_decay
        self.epsilon = max(0.01, self.epsilon)  # 최소 입실론 값

In [11]:
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd

# Define DQN model
class DQN(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Define Replay Buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, transition):
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = transition
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

# Define epsilon-greedy policy
def select_action(model, state, epsilon):
    if random.random() < epsilon:
        return random.randint(0, num_actions - 1)  # Random action
    else:
        with torch.no_grad():
            return model(state).max(1)[1].view(1, 1)  # Greedy action

# Initialize DQN model, target model, and optimizer
input_size = len(state_size)
output_size = num_actions
dqn = DQN(input_size, output_size)
target_dqn = DQN(input_size, output_size)
target_dqn.load_state_dict(dqn.state_dict())
optimizer = optim.Adam(dqn.parameters())

# Hyperparameters
gamma = 0.99
epsilon_start = 1.0
epsilon_end = 0.01
epsilon_decay = 0.995
target_update = 10
memory_capacity = 10000
batch_size = 32

# Initialize replay buffer
replay_buffer = ReplayBuffer(memory_capacity)

# Training loop
epsilon = epsilon_start
state = torch.tensor(state_size, dtype=torch.float32).unsqueeze(0)
for episode in range(num_episodes):
    episode_reward = 0
    for t in range(max_steps):
        # Select an action
        action = select_action(dqn, state, epsilon)
        next_node = action.next_node

        # Update the environment and get the next state and reward
        # Modify this part to match your specific environment and rewards
        next_state, reward, done = update_environment(state, action)
        next_state = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
        episode_reward += reward

        # Store the transition in the replay buffer
        replay_buffer.push((state, action, reward, next_state, done))

        # Sample a random minibatch of transitions and perform DQN update
        if len(replay_buffer.memory) > batch_size:
            batch = replay_buffer.sample(batch_size)
            batch = np.array(batch, dtype=object).T
            state_batch = torch.cat(batch[0]).float()
            action_batch = torch.cat(batch[1]).long()
            reward_batch = torch.cat(batch[2]).float()
            next_state_batch = torch.cat(batch[3]).float()
            done_batch = torch.cat(batch[4]).int()

            # Compute Q-values and target Q-values
            Q = dqn(state_batch).gather(1, action_batch.unsqueeze(1))
            max_next_Q = target_dqn(next_state_batch).max(1)[0].detach()
            target_Q = reward_batch + gamma * max_next_Q * (1 - done_batch)

            # Update the DQN
            loss = nn.MSELoss()(Q, target_Q.unsqueeze(1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Update target DQN
        if t % target_update == 0:
            target_dqn.load_state_dict(dqn.state_dict())
        if done:
            break

    # Decay epsilon
    epsilon = max(epsilon_end, epsilon * epsilon_decay)


NameError: name 'state_size' is not defined