In [1]:
import random
import numpy as np
import tqdm
import time
import pandas as pd
import matplotlib.pyplot as plt

## **1. MRP 구현**

In [2]:
#상태 전이 확률을 정의.
#각 state와, state에서 전이 가능한 state, 전이할 확률을 순서대로 정의

transition_matrix = {
    'facebook': {     #state facebook은 다음에 90% 확률로 facebook, 10%의 확률로 class 1으로 이동
        'facebook': 0.9,
        'class 1': 0.1
    },
    'class 1': {      #state class 1은 다음에 50% 확률로 facebook, 50%의 확률로 class 2로 이동
        'facebook': 0.5,
        'class 2': 0.5
    },
    'class 2': {       #state class 2은 다음에 20% 확률로 sleep, 80%의 확률로 class 3으로 이동
        'sleep': 0.2,
        'class 3': 0.8
    },
    'class 3': {
        'pub': 0.4,
        'pass': 0.6
    },
    'pub': {
            'class 1': 0.2,
            'class 2': 0.4,
            'class 3': 0.4,
        },
    'pass': {         # pass state는 반드시 sleep 1으로 이동
        'sleep': 1
    },
    'sleep': {'sleep': 1}   #sleep은 sleep state만으로 이동하며, terminal state
}

In [3]:
def reward(state):
    return {
        'facebook': -1,
        'class 1': -2,
        'class 2': -2,
        'class 3': -2, 
        'pub': 1,
        'pass': 10,
        'sleep': 0,
    }[state]   

In [4]:
def stochastic_choice(choices):
    values, probs = zip(*choices)
    value = np.random.choice(values, size=1, replace=True, p=probs)

    return value[0]

def Markov_Rewrad_Process(state, gamma):

    total_reward = 0
    t = 0
    
    while state != 'sleep':                       #에피소드가 끝날 때 까지 반복
        # print("현재 상태: {}".format(state))  # 현재 state를 출력

        total_reward += pow(gamma, t) * reward(state)
        
        next_state_prob = transition_matrix[state]  #현재 상태에서 전이 가능한 state와 확률을 가져옴
        # print(next_state_prob)
        
        next_state = stochastic_choice(next_state_prob.items()) #상태 전이 확률 정보를 통해 다음 state 결정

        # print("다음 상태: {}".format(next_state))         #선택된 다음 상태 출력

        state = next_state                         #다음 상태를 현재 상태로 입력하고, 반복
        t += 1
        # print()

    # print("Agent go to sleeep. Episode Done!")
    return total_reward

In [5]:
def calculate_state_value_mrp(state, gamma, episode):

    reward_list = []

    for t in tqdm.tqdm(range(episode)):
        re_t = Markov_Rewrad_Process(state, gamma)
        reward_list.append(re_t)

    # print(sum(reward_list))
    return round(sum(reward_list) / episode, 2)   

In [6]:
state_value = {key:0 for key in set(transition_matrix.keys())}
for target_state in set(transition_matrix.keys()):
    s_v = calculate_state_value_mrp(target_state, 0.9, 400)
    state_value[target_state] = s_v

state_value

100%|████████████████████████████████████████████████████████████████████████████| 400/400 [00:00<00:00, 4456.18it/s]
100%|████████████████████████████████████████████████████████████████████████████| 400/400 [00:00<00:00, 2068.22it/s]
100%|███████████████████████████████████████████████████████████████████████████| 400/400 [00:00<00:00, 32707.31it/s]
100%|████████████████████████████████████████████████████████████████████████████| 400/400 [00:00<00:00, 7993.95it/s]
100%|████████████████████████████████████████████████████████████████████████████| 400/400 [00:00<00:00, 7036.18it/s]
100%|█████████████████████████████████████████████████████████████████████████| 400/400 [00:00<00:00, 3403086.41it/s]
100%|████████████████████████████████████████████████████████████████████████████| 400/400 [00:00<00:00, 1472.09it/s]


{'pub': 2.04,
 'class 1': -4.77,
 'pass': 10.0,
 'class 2': 1.01,
 'class 3': 4.01,
 'sleep': 0.0,
 'facebook': -7.36}

## **2. Gamma Experiment**

### **2-1. State, Action, Reward, Transition prob def**

In [7]:
# 행동은 상태전이확률과 비슷하게 현재 상태에서 수행 가능한 행동으로 표현
actions = {
    'facebook': ['continue', 'go to class'],
    'class 1': ['checkout facebook', 'study'],
    'class 2': ['sleep', 'study'],
    'class 3': ['go to the pub', 'study'],
    'pub': ["return to study"],
    'sleep': ['sleep'],
}

#action의 key에 따라 states 재정의
states = set(actions.keys())  #state를 다시 정의

In [8]:
# 상태 전이확률은 이제 행동의 영향을 함께 받기 때문에, 다시 정의할 필요가 있음
def transition_prob(state, action):
    '''transition probabilities given state and action'''
    return {
        'facebook': {
            'continue': {'facebook': 1},
            'go to class': {'class 1': 1}
        },
        'class 1': {
            'checkout facebook': {'facebook': 1},
            'study': {'class 2': 1}
        },
    #class2는 sleep = {sleep, 1}, study = {class 3, 1} 의 전이 확률을 가짐
    'class 2': {
            'sleep': {'sleep': 1},
            'study': {'class 3': 1}
        },
    #class3는 go to the pub= {pub, 1}, study={sleep, 1}의 전이 확률을 가짐
    'class 3': {
            'go to the pub': {'pub': 1},
            'study': {'sleep': 1}
        },
    #pub은 return to study={class 1: 0.2 , class 2 : 0.4, class 3 : 0.4}의 전이 확률을 가짐
    'pub': {
            "return to study" : {
                'class 1': 0.2,
                'class 2': 0.4,
                'class 3': 0.4,
            }
        },
    #sleep은 sleep=1의 transition을 가짐
    'sleep': {'sleep': {'sleep': 1}},
}[state][action]

In [9]:
def reward(state, action):
    return {
        'facebook': {'continue': -1, 'go to class': 0},
        'class 1': {'checkout facebook': -1, 'study': -2},
        'class 2': {'sleep': 0, 'study': -2},
        'class 3': {'go to the pub': 1, 'study': 10},
        'pub': {"return to study" : 0},
        'sleep': {'sleep': 0},
    }[state][action]  

In [10]:
#MDP의 구성요소들을 입력시켜 mdu_tuple 생성
mdp_tuple = (states, actions, transition_prob, reward)

### **2-2. Value Iteration Model 정의**

In [11]:
best_policy = {'facebook': 'go to class',
               'class 3': 'study',
               'class 2': 'study',
               'sleep': 'sleep',
               'pub': 'return to study',
               'class 1': 'study'}

In [12]:
#함수를 통해 정의하면 쉽게 확인 가능
def value_iteration(mdp_tuple, steps=1, gamma = 0.9):
    states, actions, transition_prob, reward = mdp_tuple    #입력받은 mdp_tuple을 분리하여 정의

    # state value initialize
    value = {s: 0 for s in states}    #value iteration 시작 전 value값 초기화
    for i in range(steps):            #설정한 반복 횟수동안 value iteration 진행
        for s, v in value.items():    #모든 상태에 대해 반복
            rewards_in_s = []
            for a in actions[s]:      #현재 선택된 상태에서 수행 가능한 행동에 대한 보상 계산
                r = reward(s, a)
                reward_for_a = r
                for s_bar, p_ss in transition_prob(s, a).items(): #실제 보상에 이전 iteration으로 얻은 가치 함수의 값을 더해줌
                    reward_for_a += gamma * p_ss * value[s_bar]
                rewards_in_s.append(reward_for_a)
            if len(rewards_in_s) > 0:       #만약 가치 값이 존재한다면, 그중 가장 큰 가치 값을 value로 선택
                value[s] = max(rewards_in_s)

        print("Value_Iteration_{}:{}".format(i, value))
        print("="*100)
    return value

mdp_tuple = (states, actions, transition_prob, reward)
print("mdp_tuple(함수 입력값)", mdp_tuple)
value = value_iteration(mdp_tuple, steps=5)

print("최종 가치 함수(함수 출력값):", value)

mdp_tuple(함수 입력값) ({'pub', 'class 1', 'class 2', 'class 3', 'sleep', 'facebook'}, {'facebook': ['continue', 'go to class'], 'class 1': ['checkout facebook', 'study'], 'class 2': ['sleep', 'study'], 'class 3': ['go to the pub', 'study'], 'pub': ['return to study'], 'sleep': ['sleep']}, <function transition_prob at 0x7f14ba17c2c0>, <function reward at 0x7f14ba17c0e0>)
Value_Iteration_0:{'pub': 0.0, 'class 1': -1.0, 'class 2': 0.0, 'class 3': 10.0, 'sleep': 0.0, 'facebook': -0.9}
Value_Iteration_1:{'pub': 3.4200000000000004, 'class 1': -1.81, 'class 2': 7.0, 'class 3': 10.0, 'sleep': 0.0, 'facebook': -1.629}
Value_Iteration_2:{'pub': 5.794200000000001, 'class 1': 4.3, 'class 2': 7.0, 'class 3': 10.0, 'sleep': 0.0, 'facebook': 3.87}
Value_Iteration_3:{'pub': 6.894000000000001, 'class 1': 4.3, 'class 2': 7.0, 'class 3': 10.0, 'sleep': 0.0, 'facebook': 3.87}
Value_Iteration_4:{'pub': 6.894000000000001, 'class 1': 4.3, 'class 2': 7.0, 'class 3': 10.0, 'sleep': 0.0, 'facebook': 3.87}
최종 가치 함수(

In [13]:
def policy_from_value(state, action, value, prob, reward):   #학습된 value와 전이 확률, 보상에 대한 정보를 입력받음
    rewards_action = []
    for action in actions[state]:         # 현재 상태에서 선택 가능한 행동의 가치를 모두 계산
        r = reward(state, action)
        r_action = r
        # print(state, action)
        for s, p_ss in prob(state, action).items():
            r_action += p_ss *  (reward(state, action) + value[s])
        rewards_action.append((r_action, action))
    return max(rewards_action)[1]     #현재 상태에서 가장 큰 가치를 가지는 행동을 선택. 결국 최적 행동을 선택하는 최적 정책이 됨

In [14]:
# 기존 MDP에서 정책 함수에 사용하기 위한 value 값 추가
def MDP_VI(mdp_tuple, state, policy, value):
    states, actions, prob, reward = mdp_tuple
    gamma = 0.9

    total_reward = 0
    k = 0

    while state != 'sleep':
        print("current state: {}".format(state))
        ##############################코드 작성 ####################################

        # 변경된 정책 함수와, 함수 값에 필요한 입력 데이터 설정
        selected_action = policy_from_value(state, actions, value, prob, reward)


        # 상태, 행동에 따른 전이 확률과, 전이 확률에 따라 다음 상태를 선택하는 함수 작성
        next_state_probs = prob(state, selected_action)
        next_state = stochastic_choice(next_state_probs.items())

        # 보상함수 작성
        r = reward(state, selected_action)


        #다음 상태를 현재 상태로 변경, 누적 보상과 에피소드 수 증가
        state = next_state
        total_reward += r
        k += 1
        #############################################################################


        print("action: {}".format(selected_action))
        print("next_state:", end='');print(next_state)
        print("reward: {}".format(r))

        print()
    print("total reward: {}".format(total_reward))

In [15]:
# MDP(mdp_tuple, 'class 1', policy_from_value(state, actions, value, transition_prob, reward))
value_v = value
MDP_VI(mdp_tuple, 'class 1', policy_from_value, value_v)

current state: class 1
action: study
next_state:class 2
reward: -2

current state: class 2
action: study
next_state:class 3
reward: -2

current state: class 3
action: study
next_state:sleep
reward: 10

total reward: 6


In [16]:
best_policy = {'facebook': 'go to class',
               'class 3': 'study',
               'class 2': 'study',
               'sleep': 'sleep',
               'pub': 'return to study',
               'class 1': 'study'}

def value_iteration(mdp_tuple, steps=1, gamma=0.9, best_policy = best_policy):
    states, actions, transition_prob, reward = mdp_tuple
    value = {s: 0 for s in states}  # 초기 가치 함수 설정
    
    start_time = time.time()
    updated_states = 0  # 업데이트된 상태 개수
    best_count = 0
    
    for i in range(steps):
        new_value = value.copy()
        for s in states:
            rewards_in_s = []
            for a in actions[s]:
                r = reward(s, a)
                reward_for_a = r + gamma * sum(p_ss * value[s_bar] for s_bar, p_ss in transition_prob(s, a).items())
                rewards_in_s.append(reward_for_a)
            
            if rewards_in_s:
                new_value[s] = max(rewards_in_s)
                updated_states += 1

        v_policy = {}
        for state in states:
            best_action = policy_from_value(state, actions, new_value, transition_prob, reward)
            v_policy[state] = best_action
        
        if v_policy == best_policy:
            if best_count > 1:
                break # 가치 함수가 더 이상 변하지 않으면 종료
            best_count += 1
        value = new_value
    
    elapsed_time = time.time() - start_time
    return value, i + 1, elapsed_time, updated_states

In [17]:
# 감마값을 변경하며 실험
gamma_values = [0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 0.9, 0.95, 0.99, 1]
num_steps = 200
results = []

for gamma in gamma_values:
    final_value, iterations, time_taken, updates = value_iteration(mdp_tuple, steps=num_steps, gamma=gamma)
    results.append([gamma, iterations, time_taken, updates])

# 데이터프레임 생성 및 출력
df_results = pd.DataFrame(results, columns=["Gamma", "Iterations", "Time Taken (s)", "Updated States"])
print(df_results)

   Gamma  Iterations  Time Taken (s)  Updated States
0   0.10         200        0.011825            1200
1   0.20         200        0.011791            1200
2   0.30         200        0.011793            1200
3   0.40           4        0.000239              24
4   0.50           4        0.000237              24
5   0.70           4        0.000241              24
6   0.90           4        0.000236              24
7   0.95           4        0.000235              24
8   0.99           4        0.000236              24
9   1.00           4        0.000244              24
