<a href="https://colab.research.google.com/github/noswad/Python/blob/master/RCMAB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install optuna

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import optuna
import joblib

class RCMAB:
    """Contextual bandit that scores actions with a small neural reward model.

    A three-layer MLP maps a context vector to one predicted reward per
    action. Action selection combines those predictions (scaled by ``kappa``)
    with the square of a draw from a per-action normal distribution.

    NOTE(review): ``mu`` and ``sigma`` are initialised in ``__init__`` but are
    never updated anywhere in this class, so the sampled term is currently
    plain N(0, 1) noise -- confirm whether a posterior update is missing.
    """

    def __init__(self, context_dim, action_dim, hidden_dim, kappa):
        """Build the reward network, the sampling parameters and the optimizer.

        Args:
            context_dim: Number of features in a context vector.
            action_dim: Number of selectable actions (network output size).
            hidden_dim: Width of the two hidden layers.
            kappa: Weight applied to the predicted rewards in
                ``select_action``.
        """
        self.context_dim = context_dim
        self.action_dim = action_dim
        self.hidden_dim = hidden_dim  # kept for parity with the other arguments
        self.kappa = kappa

        # Reward model: context -> one predicted reward per action.
        self.model = nn.Sequential(
            nn.Linear(context_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

        # Per-action mean / standard deviation used by ``sample_parameters``.
        self.mu = np.zeros(action_dim)
        self.sigma = np.ones(action_dim)
        # Observed (context, action, reward) tuples.
        self.memory = []

        self.optimizer = optim.Adam(self.model.parameters(), lr=0.01)

    @staticmethod
    def _as_float_tensor(values):
        """Convert ``values`` to a float32 tensor without re-wrapping tensors."""
        return torch.as_tensor(values, dtype=torch.float32)

    def predict(self, context):
        """Return the predicted reward of every action as a NumPy array.

        Args:
            context: Context features accepted by ``torch.as_tensor``.
        """
        with torch.no_grad():  # inference only, no autograd graph needed
            return self.model(self._as_float_tensor(context)).numpy()

    def sample_parameters(self):
        """Draw one value per action from ``Normal(mu, sigma)``."""
        return np.random.normal(self.mu, self.sigma)

    def select_action(self, context):
        """Pick the action with the highest score for ``context``.

        The score is ``kappa * predicted_reward - sampled_parameter ** 2``.
        ``np.argmax`` is called without an axis, so the method is meant for a
        single context vector; a batch would be arg-maxed over the flattened
        score array.
        """
        predicted_rewards = self.predict(context)
        sampled_parameters = self.sample_parameters()
        mv_values = self.kappa * predicted_rewards - np.square(sampled_parameters)
        return np.argmax(mv_values)

    def _fit_step(self, context, action, reward):
        """Run one optimizer step on the squared error of a single sample."""
        target = torch.tensor(float(reward), dtype=torch.float32)

        self.optimizer.zero_grad()
        # Only the output of the action that was actually taken is supervised.
        prediction = self.model(self._as_float_tensor(context))[action]
        loss = (prediction - target).pow(2)
        loss.backward()
        self.optimizer.step()

    def update(self, context, action, reward):
        """Record one observation and take one gradient step on it.

        Args:
            context: Context features of the observation.
            action: Index of the action that was taken.
            reward: Scalar reward that was observed.
        """
        self.memory.append((context, action, reward))
        self._fit_step(context, action, reward)

    def train(self, contexts, actions, rewards, epochs=100):
        """Fit the reward model on a batch of logged observations.

        Each sample is stored in ``memory`` once, not once per epoch, so the
        buffer no longer fills up with ``epochs`` copies of the same data.

        Args:
            contexts: Iterable of context vectors.
            actions: Iterable of action indices, aligned with ``contexts``.
            rewards: Iterable of scalar rewards, aligned with ``contexts``.
            epochs: Number of passes over the samples.
        """
        # Materialise once so one-shot iterators survive several epochs.
        samples = list(zip(contexts, actions, rewards))
        if not samples or epochs <= 0:
            return

        self.memory.extend(samples)
        for _ in range(epochs):
            for context, action, reward in samples:
                self._fit_step(context, action, reward)

    def evaluate(self, contexts, actions, rewards):
        """Return the mean squared error over the given observations.

        The mean is taken over the samples actually evaluated, so unsized
        iterables are accepted.

        Returns:
            The mean squared error as a float, or ``nan`` when there is
            nothing to evaluate.
        """
        total_loss = 0.0
        sample_count = 0
        with torch.no_grad():
            for context, action, reward in zip(contexts, actions, rewards):
                target = torch.tensor(float(reward), dtype=torch.float32)
                prediction = self.model(self._as_float_tensor(context))[action]
                total_loss += (prediction - target).pow(2).item()
                sample_count += 1

        if sample_count == 0:
            return float("nan")
        return total_loss / sample_count

    def save_model(self, path):
        """Write the network weights (``state_dict``) to ``path``."""
        torch.save(self.model.state_dict(), path)

    def load_model(self, path):
        """Load network weights from ``path`` and switch to eval mode."""
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from a trusted source.
        self.model.load_state_dict(torch.load(path))
        self.model.eval()

# Define the context, action and reward functions (only the context helper is defined below)
def get_current_context():
    """Return the current user's context features.

    The values are hard-coded placeholders: an age and a gender.
    """
    user_features = dict(age=25, gender="male")
    return user_features

# Build a synthetic dataset of random contexts, actions and rewards.
context_dim = 5
action_dim = 3
data_size = 100
contexts = np.random.rand(data_size, context_dim)
actions = np.random.randint(0, action_dim, size=data_size)
rewards = np.random.rand(data_size)

# Split contexts, actions and rewards in a single call. train_test_split
# shuffles the rows, so slicing `actions` by position afterwards would pair
# every context/reward with an unrelated action.
X_train, X_test, a_train, a_test, y_train, y_test = train_test_split(
    contexts, actions, rewards, test_size=0.2, random_state=42
)


# Hyper-parameter search with Optuna.
def objective(trial):
    """Train an RCMAB with the trial's hyper-parameters and return its test loss."""
    hidden_dim = trial.suggest_int('hidden_dim', 5, 50)
    kappa = trial.suggest_float('kappa', 0.1, 5.0)

    model = RCMAB(context_dim, action_dim, hidden_dim, kappa)
    model.train(X_train, a_train, y_train, epochs=100)
    loss = model.evaluate(X_test, a_test, y_test)

    return loss


study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)

# Best hyper-parameters found by the study.
best_hidden_dim = study.best_params['hidden_dim']
best_kappa = study.best_params['kappa']
print(f"Best hidden_dim: {best_hidden_dim}, Best kappa: {best_kappa}")

# Retrain a fresh model with the best hyper-parameters.
best_model = RCMAB(context_dim, action_dim, best_hidden_dim, best_kappa)
best_model.train(X_train, a_train, y_train, epochs=100)
loss = best_model.evaluate(X_test, a_test, y_test)
print(f"Evaluation loss: {loss}")

# Save the trained weights.
model_path = 'best_rcmab_model.pth'
best_model.save_model(model_path)
print(f"Model saved to {model_path}")

# Reload the weights into a new instance and evaluate it on the same test split.
loaded_model = RCMAB(context_dim, action_dim, best_hidden_dim, best_kappa)
loaded_model.load_model(model_path)
loaded_loss = loaded_model.evaluate(X_test, a_test, y_test)
print(f"Loaded model evaluation loss: {loaded_loss}")