In [5]:
# =========================================================
# DQN (오프라인 학습, retracing 경고 제거 버전)
# - CSV: s0, action, reward, s2_0, done
# - 고정 배치 크기 + @tf.function(reduce_retracing=True) 훈련 루프
# =========================================================
import os, csv, random
import numpy as np
import tensorflow as tf
from tensorflow import keras

# -----------------------
# 설정
# -----------------------
CSV_PATH        = "toy_transitions.csv"   # offline transition dataset (written by make_toy_csv, read by load_csv)
ROLLOUT_CSV     = "policy_rollout.csv"    # default output file of rollout()
SEED            = 42                      # seed applied to numpy, random and TensorFlow below
GAMMA           = 0.99                    # discount factor used in the TD target
LR              = 1e-3                    # Adam learning rate
BATCH_SIZE      = 128           # fixed batch size; train_step's input_signature is built from this value
EPOCH_STEPS     = 2000                    # gradient steps per epoch
TARGET_SYNC     = 250                     # copy online weights into the target network every this many steps
HIDDEN          = 64                      # units in each hidden Dense layer
NOISE_STD       = 0.1                     # std-dev of the Gaussian noise added in env_step
MAX_EP_LEN      = 20                      # maximum number of steps per episode
DATA_EPISODES   = 4000                    # episodes simulated when generating the dataset
INIT_STATE_MIN  = -5.0                    # lower bound of the uniform initial state
INIT_STATE_MAX  =  5.0                    # upper bound of the uniform initial state

# Seed every random source used in this cell so runs are repeatable.
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)
tf.config.run_functions_eagerly(False)  # keep tf.function in graph mode (optional)

# -----------------------
# 장난감 MDP
# -----------------------
def env_step(x, a):
    """Advance the toy 1-D MDP by one step.

    The next state is ``x + a`` plus Gaussian noise with standard deviation
    ``NOISE_STD``. The reward is the negative distance of the next state from
    the origin, and the episode is flagged done once that distance exceeds 6.0.

    NOTE(review): every reward is <= 0 and ``done`` ends the accumulation, so
    leaving the region quickly may score better than hovering near the
    origin -- confirm this is the intended task.

    Returns:
        Tuple ``(next_state, reward, done)``.
    """
    noise = np.random.normal(0.0, NOISE_STD)
    next_state = x + float(a) + noise
    distance = abs(next_state)
    return next_state, -distance, distance > 6.0

def env_reset():
    """Draw an initial state uniformly between INIT_STATE_MIN and INIT_STATE_MAX."""
    initial_state = np.random.uniform(low=INIT_STATE_MIN, high=INIT_STATE_MAX)
    return initial_state

# -----------------------
# 1) CSV 생성
# -----------------------
def make_toy_csv(path=CSV_PATH, episodes=DATA_EPISODES):
    """Generate the offline dataset by running a uniformly random policy.

    Each episode starts from ``env_reset()`` and runs for at most
    ``MAX_EP_LEN`` steps, stopping early when the environment reports
    ``done``. Every transition is written as one CSV row.

    Args:
        path: Destination CSV file (opened in write mode, so it is overwritten).
        episodes: Number of episodes to simulate.
    """
    columns = ["s0", "action", "reward", "s2_0", "done"]
    with open(path, "w", newline="") as out_file:
        writer = csv.writer(out_file)
        writer.writerow(columns)
        for _ in range(episodes):
            state = env_reset()
            steps_taken, finished = 0, False
            while steps_taken < MAX_EP_LEN and not finished:
                # Behaviour policy: pick -1 or +1 with equal probability.
                action = random.choice([-1, 1])
                next_state, reward, finished = env_step(state, action)
                writer.writerow([state, action, reward, next_state, int(finished)])
                state = next_state
                steps_taken += 1
    print(f"[CSV] Saved transitions to: {path}")

# -----------------------
# 2) CSV 로드
# -----------------------
def load_csv(path=CSV_PATH):
    """Load offline transitions from a CSV file into training arrays.

    The file must contain one header row followed by numeric rows with the
    columns ``s0, action, reward, s2_0, done``, where ``action`` is -1 or +1.

    Args:
        path: CSV file to read.

    Returns:
        Tuple ``(s, a_idx, r, s2, d)`` where
        ``s`` and ``s2`` are (N, 1) float32 states,
        ``a_idx`` is (N,) int32 with -1 mapped to 0 and +1 mapped to 1,
        ``r`` is (N,) float32 rewards and
        ``d`` is (N,) float32 done flags.

    Raises:
        ValueError: If the file has no data rows or fewer than five columns.
    """
    # ndmin=2 keeps a file holding a single transition two-dimensional;
    # without it loadtxt returns a 1-D array and the column slices below fail.
    data = np.loadtxt(path, delimiter=",", skiprows=1, ndmin=2)
    if data.size == 0 or data.shape[1] < 5:
        raise ValueError(
            f"{path}: expected at least one data row with 5 columns "
            f"(s0, action, reward, s2_0, done), got array of shape {data.shape}"
        )
    s  = data[:, 0:1].astype(np.float32)  # (N,1)
    a  = data[:, 1].astype(np.int32)      # (N,)
    r  = data[:, 2].astype(np.float32)    # (N,)
    s2 = data[:, 3:4].astype(np.float32)  # (N,1)
    d  = data[:, 4].astype(np.float32)    # (N,)
    a_idx = ((a + 1) // 2).astype(np.int32)  # -1->0, +1->1
    return s, a_idx, r, s2, d

# -----------------------
# Q 네트워크
# -----------------------
def make_qnet():
    """Build and compile the Q-network.

    Architecture: one scalar state input, two ReLU Dense layers of ``HIDDEN``
    units each, and a linear Dense layer with two outputs (one Q-value per
    action). The model is compiled with Adam at learning rate ``LR`` and a
    Huber loss.

    Returns:
        The compiled Keras model.
    """
    state_in = keras.layers.Input(shape=(1,))
    features = state_in
    for _ in range(2):
        features = keras.layers.Dense(HIDDEN, activation="relu")(features)
    q_values = keras.layers.Dense(2, activation="linear")(features)

    network = keras.Model(inputs=state_in, outputs=q_values)
    network.compile(
        optimizer=keras.optimizers.Adam(LR),
        loss=keras.losses.Huber(),
    )
    return network

# -----------------------
# 배치 샘플링 (항상 고정 크기 반환)
# -----------------------
def minibatch_sampler(N, batch_size=BATCH_SIZE):
    """Return ``batch_size`` row indices drawn independently and uniformly from ``[0, N)``.

    The result always has shape ``(batch_size,)``, so indices may repeat.
    """
    return np.random.randint(low=0, high=N, size=(batch_size,))

# -----------------------
# 오프라인 DQN 학습 (커스텀 train_step)
# -----------------------
def offline_dqn_train(path=CSV_PATH, epochs=5):
    """Train a DQN from the offline transitions stored in a CSV file.

    Two identical Q-networks are created: ``online`` is optimized, ``target``
    provides the bootstrap values and is overwritten with the online weights
    every ``TARGET_SYNC`` gradient steps. Each epoch performs ``EPOCH_STEPS``
    gradient steps on minibatches of ``BATCH_SIZE`` randomly sampled
    transitions and prints the mean step loss.

    Args:
        path: CSV file of transitions, as read by ``load_csv``.
        epochs: Number of epochs to run.

    Returns:
        The trained online Keras model.
    """
    s, a, r, s2, d = load_csv(path)
    N = s.shape[0]

    online = make_qnet()
    target = make_qnet()
    # Start both networks from the same weights.
    target.set_weights(online.get_weights())

    # -------- custom training step (fixed signature) --------
    @tf.function(
        reduce_retracing=True,
        input_signature=[
            tf.TensorSpec(shape=(BATCH_SIZE, 1), dtype=tf.float32),  # sb
            tf.TensorSpec(shape=(BATCH_SIZE,),   dtype=tf.int32),    # ab
            tf.TensorSpec(shape=(BATCH_SIZE,),   dtype=tf.float32),  # rb
            tf.TensorSpec(shape=(BATCH_SIZE, 1), dtype=tf.float32),  # s2b
            tf.TensorSpec(shape=(BATCH_SIZE,),   dtype=tf.float32),  # db
        ],
    )
    def train_step(sb, ab, rb, s2b, db):
        # Q_target(s2, .) -- computed outside the tape, so no gradient flows into the target
        q_next = target(s2b, training=False)                  # (B,2)
        max_next = tf.reduce_max(q_next, axis=1)              # (B,)
        # TD target; (1 - done) removes the bootstrap term on terminal transitions
        y = rb + (1.0 - db) * tf.constant(GAMMA, tf.float32) * max_next  # (B,)

        with tf.GradientTape() as tape:
            q_curr = online(sb, training=True)                # (B,2)
            # keep only the Q-value of the action that was actually taken
            idx = tf.stack([tf.range(tf.shape(ab)[0], dtype=tf.int32), ab], axis=1)  # (B,2)
            q_sel = tf.gather_nd(q_curr, idx)                 # (B,)
            # Huber loss
            loss = tf.keras.losses.huber(y_true=y, y_pred=q_sel)

        grads = tape.gradient(loss, online.trainable_variables)
        online.optimizer.apply_gradients(zip(grads, online.trainable_variables))
        return tf.reduce_mean(loss)

    # ---------------------------------------------------

    global_step = 0
    for ep in range(1, epochs + 1):
        losses = []
        for _ in range(EPOCH_STEPS):
            # Sample a fixed-size minibatch of transition indices.
            idx = minibatch_sampler(N, BATCH_SIZE)
            sb  = s[idx]
            ab  = a[idx]
            rb  = r[idx]
            s2b = s2[idx]
            db  = d[idx]

            # numpy -> tf.Tensor (fixed dtype/shape, matching the input_signature)
            sb  = tf.convert_to_tensor(sb,  dtype=tf.float32)   # (B,1)
            ab  = tf.convert_to_tensor(ab,  dtype=tf.int32)     # (B,)
            rb  = tf.convert_to_tensor(rb,  dtype=tf.float32)   # (B,)
            s2b = tf.convert_to_tensor(s2b, dtype=tf.float32)   # (B,1)
            db  = tf.convert_to_tensor(db,  dtype=tf.float32)   # (B,)

            loss = float(train_step(sb, ab, rb, s2b, db).numpy())
            losses.append(loss)

            global_step += 1
            # Periodically refresh the target network from the online network.
            if global_step % TARGET_SYNC == 0:
                target.set_weights(online.get_weights())

        print(f"[Train] epoch {ep}/{epochs} | steps {global_step} | loss {np.mean(losses):.6f}")

    return online

# -----------------------
# 정책 롤아웃 (평가)
# -----------------------
def greedy_action_from_q(qvec):
    """Map a vector of Q-values to the greedy action.

    Returns -1 when the largest Q-value sits at index 0, otherwise +1.
    Ties resolve to the first maximum, as ``np.argmax`` does.
    """
    if int(np.argmax(qvec)) == 0:
        return -1
    return +1

def rollout(policy_model, episodes=20, path=ROLLOUT_CSV):
    """Run the greedy policy of ``policy_model`` and log every step to CSV.

    Each episode starts from ``env_reset()`` and lasts at most ``MAX_EP_LEN``
    steps, ending early when the environment reports ``done``.

    Args:
        policy_model: Keras Q-network taking a (1, 1) float32 state batch and
            producing one row of two Q-values.
        episodes: Number of episodes to simulate.
        path: Destination CSV file (opened in write mode, so it is overwritten).
    """
    header = ["episode", "t", "x", "action", "x2", "reward", "done"]
    with open(path, "w", newline="") as f:
        w = csv.writer(f)
        w.writerow(header)
        for ep in range(1, episodes + 1):
            x = env_reset()
            for t in range(1, MAX_EP_LEN + 1):
                state = np.array([[x]], dtype=np.float32)  # (1,1)
                # Call the model directly rather than model.predict():
                # predict() is built for large batched inputs and adds
                # substantial per-call overhead when used once per step.
                q = np.asarray(policy_model(state, training=False))[0]
                a = greedy_action_from_q(q)
                x2, r, done = env_step(x, a)
                w.writerow([ep, t, x, a, x2, r, int(done)])
                x = x2
                if done:
                    break
    print(f"[Rollout] Saved policy rollouts to: {path}")

# -----------------------
# 메인
# -----------------------
def main():
    """Run the whole pipeline: dataset, training, evaluation rollout, model export.

    An existing file at ``CSV_PATH`` is reused as-is; the dataset is only
    generated when that file is missing.
    """
    dataset_ready = os.path.exists(CSV_PATH)
    if not dataset_ready:
        make_toy_csv(CSV_PATH, episodes=DATA_EPISODES)

    trained_policy = offline_dqn_train(CSV_PATH, epochs=8)
    rollout(trained_policy, episodes=50, path=ROLLOUT_CSV)
    trained_policy.save("dqn_toy_model.keras")
    print("[Done] model saved as dqn_toy_model.keras")

# Run the full pipeline when this cell (or script) is executed directly.
if __name__ == "__main__":
    main()


[CSV] Saved transitions to: toy_transitions.csv
[Train] epoch 1/8 | steps 2000 | loss 0.374068
[Train] epoch 2/8 | steps 4000 | loss 0.531465
[Train] epoch 3/8 | steps 6000 | loss 0.571463
[Train] epoch 4/8 | steps 8000 | loss 0.184641
[Train] epoch 5/8 | steps 10000 | loss 0.085903
[Train] epoch 6/8 | steps 12000 | loss 0.081002
[Train] epoch 7/8 | steps 14000 | loss 0.077639
[Train] epoch 8/8 | steps 16000 | loss 0.075379
[Rollout] Saved policy rollouts to: policy_rollout.csv
[Done] model saved as dqn_toy_model.keras


In [10]:
# =========================================================
# Saved DQN Model Loader & Rollout (for toy MDP)
# - dqn_toy_model.keras 불러와서 정책 실행
# - 결과를 policy_rollout_from_saved.csv 로 저장
# - 단일 상태/배치 상태에 대한 Q값, 행동 추론 함수 포함
# =========================================================
# pip install tensorflow

import csv
import numpy as np
from tensorflow import keras

# ----- Toy MDP settings (same as before) -----
NOISE_STD = 0.1  # std-dev of the Gaussian noise added in env_step
INIT_STATE_MIN, INIT_STATE_MAX = -5.0, 5.0  # bounds of the uniform initial state
MAX_EP_LEN = 20  # maximum number of steps per episode

def env_step(x, a):
    """One environment step: x (real), a in {-1, +1} -> (x', r, done).

    The next state is ``x + a`` plus Gaussian noise with standard deviation
    ``NOISE_STD``.

    NOTE(review): every reward is <= 0 and ``done`` ends the accumulation, so
    leaving the region quickly may score better than hovering near the
    origin -- confirm this is the intended task.
    """
    noise = np.random.normal(0.0, NOISE_STD)
    next_state = x + float(a) + noise
    distance = abs(next_state)
    reward = -distance           # closer to the origin (0) means higher reward
    left_region = distance > 6.0  # the episode ends once the state leaves the region
    return next_state, reward, left_region

def env_reset():
    """Draw an initial state uniformly between INIT_STATE_MIN and INIT_STATE_MAX."""
    start = np.random.uniform(low=INIT_STATE_MIN, high=INIT_STATE_MAX)
    return start

# ----- 행동 매핑 -----
# Q네트워크 출력은 길이 2 벡터: index 0 -> action -1, index 1 -> action +1
def idx_to_action(a_idx: int) -> int:
    """Convert a Q-network output index to an action: 0 -> -1, anything else -> +1."""
    if a_idx == 0:
        return -1
    return +1

def action_to_idx(a: int) -> int:
    """Convert an action to its Q-network output index: -1 -> 0, anything else -> 1."""
    if a == -1:
        return 0
    return 1

# ----- 모델 불러오기 -----
def load_policy_model(path="dqn_toy_model.keras"):
    """Load and return the saved Keras model stored at ``path``."""
    return keras.models.load_model(path)

# ----- 단일 상태에서 Q값/행동 추론 -----
def q_values_for_state(model, x: float):
    """Return Q(s, .) for a single scalar state.

    Args:
        model: Keras Q-network; called on a (1, 1) float32 batch.
        x: Scalar state.

    Returns:
        numpy array holding the first (only) output row -- length 2 for the
        toy Q-network (index 0 -> action -1, index 1 -> action +1).
    """
    s = np.array([[x]], dtype=np.float32)  # (1,1)
    # Call the model directly rather than model.predict(): predict() is built
    # for large batched inputs and adds substantial per-call overhead, and
    # this function runs once per environment step during rollouts.
    q = np.asarray(model(s, training=False))[0]  # (2,)
    return q

def greedy_action(model, x: float) -> int:
    """Return the greedy action (-1 or +1) of ``model`` for scalar state ``x``."""
    best_idx = int(np.argmax(q_values_for_state(model, x)))
    return idx_to_action(best_idx)

# ----- 배치 상태에서 Q/행동 추론 -----
def q_values_for_states(model, xs):
    """Batched Q-values: xs of shape (N,) or (N,1) -> Q of shape (N,2).

    The states are converted to a float32 column vector and passed to
    ``model.predict`` in one call.
    """
    state_column = np.asarray(xs, dtype=np.float32).reshape(-1, 1)
    return model.predict(state_column, verbose=0)  # (N,2)

def greedy_actions(model, xs):
    """Batched greedy policy: xs (N,) -> actions (N,) with values in {-1, +1}."""
    best_idx = np.argmax(q_values_for_states(model, xs), axis=1)  # (N,)
    return np.where(best_idx == 0, -1, +1)                        # (N,)

# ----- 정책 롤아웃 & CSV 저장 -----
def rollout_to_csv(model, episodes=20, out_csv="policy_rollout_from_saved.csv"):
    """Roll out the greedy policy of ``model`` and write every step to a CSV file.

    Each episode starts from ``env_reset()`` and runs for at most
    ``MAX_EP_LEN`` steps, stopping early when the environment reports ``done``.

    Args:
        model: Q-network passed to ``greedy_action``.
        episodes: Number of episodes to simulate.
        out_csv: Destination CSV file (opened in write mode, so it is overwritten).
    """
    columns = ["episode", "t", "x", "action", "x2", "reward", "done"]
    with open(out_csv, "w", newline="") as out_file:
        writer = csv.writer(out_file)
        writer.writerow(columns)
        for episode_id in range(1, episodes + 1):
            state = env_reset()
            step, finished = 1, False
            while step <= MAX_EP_LEN and not finished:
                action = greedy_action(model, state)
                next_state, reward, finished = env_step(state, action)
                writer.writerow(
                    [episode_id, step, state, action, next_state, reward, int(finished)]
                )
                state = next_state
                step += 1
    print(f"[Saved] Rollout to: {out_csv}")

# ----- 데모 -----
# Demo: load the saved model, query it for one state and for a batch of
# states, then write greedy rollouts to CSV. The names assigned here
# (model, x0, q, a, xs, qs, acts) become notebook globals.
if __name__ == "__main__":
    model = load_policy_model("dqn_toy_model.keras")

    # 1) Single-state inference example
    x0 = 3.0
    q = q_values_for_state(model, x0)
    a = greedy_action(model, x0)
    print(f"State {x0:.2f} -> Q={q} -> greedy action={a}")

    # 2) Batched-state inference example
    xs = np.linspace(-5, 5, num=11)  # -5,-4,...,5
    qs = q_values_for_states(model, xs)
    acts = greedy_actions(model, xs)
    print("xs:", xs)
    print("Q(xs):\n", qs)
    print("greedy actions:", acts)

    # 3) Save policy rollouts to CSV
    rollout_to_csv(model, episodes=50, out_csv="policy_rollout_from_saved.csv")


State 3.00 -> Q=[-23.015701 -18.664534] -> greedy action=1
xs: [-5. -4. -3. -2. -1.  0.  1.  2.  3.  4.  5.]
Q(xs):
 [[ -8.60376  -18.175146]
 [-14.46262  -21.285158]
 [-18.480413 -23.066622]
 [-21.44772  -23.476557]
 [-23.136992 -23.361061]
 [-23.66525  -23.828676]
 [-23.532907 -22.914711]
 [-23.79907  -21.384521]
 [-23.015703 -18.664534]
 [-21.457558 -14.85654 ]
 [-18.816319  -9.753853]]
greedy actions: [-1 -1 -1 -1 -1 -1  1  1  1  1  1]
[Saved] Rollout to: policy_rollout_from_saved.csv


In [13]:
from tensorflow import keras
import numpy as np

# Load the saved policy network; this rebinds the notebook-global `model`.
model = keras.models.load_model("dqn_toy_model.keras")

def greedy_action(x):
    """Greedy action (-1 or +1) for scalar state ``x`` using the notebook-global ``model``.

    NOTE(review): this shares its name with the two-argument
    ``greedy_action(model, x)`` defined in the loader cell and used by
    ``rollout_to_csv``; running this cell afterwards replaces that function --
    consider giving this helper a distinct name.
    """
    state = np.array([[x]], dtype=np.float32)
    q_row = model.predict(state, verbose=0)[0]
    if np.argmax(q_row) == 0:
        return -1
    return +1

# Print the greedy action for a few probe states on both sides of the origin.
for x in [-3.0, -1.0, 0.0, 1.0, 3.0]:
    print(x, greedy_action(x))


-3.0 -1
-1.0 -1
0.0 -1
1.0 1
3.0 1


In [14]:
import csv, numpy as np

# Summarize per-episode return and length from a rollout CSV.
rollout_csv = "policy_rollout.csv"  # or policy_rollout_from_saved.csv
returns = []  # total reward of each episode, in file order
lengths = []  # number of rows (steps) of each episode
# Running accumulators for the episode currently being read.
cur_ep, cur_ret, cur_len = None, 0.0, 0

with open(rollout_csv, newline="") as f:
    r = csv.DictReader(f)
    for row in r:
        ep = int(row["episode"])
        if cur_ep is None:
            cur_ep = ep
        # The episode id changed: store the finished episode and start a new one.
        # This relies on all rows of one episode being contiguous in the file.
        if ep != cur_ep:
            returns.append(cur_ret)
            lengths.append(cur_len)
            cur_ep, cur_ret, cur_len = ep, 0.0, 0
        cur_ret += float(row["reward"])
        cur_len += 1
    # Store the last episode (skipped when the file had no data rows).
    if cur_ep is not None:
        returns.append(cur_ret)
        lengths.append(cur_len)

# NOTE(review): with zero episodes np.mean yields nan and np.max/np.min raise
# ValueError -- add a guard if empty rollout files are possible.
print(f"Episodes: {len(returns)}")
print(f"Avg return: {np.mean(returns):.3f} ± {np.std(returns):.3f}")
print(f"Avg length: {np.mean(lengths):.2f} steps")
print(f"Best return: {np.max(returns):.3f}  |  Worst return: {np.min(returns):.3f}")


Episodes: 50
Avg return: -20.192 ± 5.069
Avg length: 4.24 steps
Best return: -11.022  |  Worst return: -28.118
