### Step 1: Import Required Libraries

In [1]:
import gymnasium as gym
import numpy as np
import optuna
import torch
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.logger import configure
from stable_baselines3.common.vec_env import DummyVecEnv
import matplotlib.pyplot as plt

### Step 2: Define Environment

In [2]:

class IbuprofenEnv(gym.Env):
    def __init__(self, normalize=False):
        super(IbuprofenEnv, self).__init__()

        # Define the action space and observation space
        self.action_space = gym.spaces.Discrete(5)
        self.observation_space = gym.spaces.Box(low=0, high=100, shape=(1,), dtype=np.float32)

        # Pharmacokinetics parameters
        self.therapeutic_range = (10, 50)
        self.half_life = 2.0
        self.clearance_rate = 0.693 / self.half_life
        self.time_step_hours = 1
        self.bioavailability = 0.9
        self.volume_of_distribution = 0.15
        self.max_steps = 24
        self.current_step = 0
        self.plasma_concentration = 0.0
        self.state_buffer = []
        self.time_in_therapeutic_range = 0  # Track time in the therapeutic range
        self.normalize = normalize

    def reset(self, seed=None, **kwargs):
        super().reset(seed=seed)
        self.current_step = 0
        self.plasma_concentration = 0.0
        self.state_buffer = []
        self.time_in_therapeutic_range = 0  # Reset the counter for each episode
        state = np.array([self.plasma_concentration], dtype=np.float32)
        return self._normalize(state), {}

    def step(self, action):
        dose_mg = action * 200
        absorbed_mg = dose_mg * self.bioavailability
        absorbed_concentration = absorbed_mg / (self.volume_of_distribution * 70)
        self.plasma_concentration += absorbed_concentration
        self.plasma_concentration *= np.exp(-self.clearance_rate * self.time_step_hours)

        state = np.array([self.plasma_concentration], dtype=np.float32)
        normalized_state = self._normalize(state)

        self.state_buffer.append(self.plasma_concentration)

        # Track time in therapeutic range
        if self.therapeutic_range[0] <= self.plasma_concentration <= self.therapeutic_range[1]:
            self.time_in_therapeutic_range += 1  # Increment if in the therapeutic range

        # Reward shaping logic
        if self.therapeutic_range[0] <= self.plasma_concentration <= self.therapeutic_range[1]:
            reward = 10  # Positive reward for being in the therapeutic range
        else:
            if self.plasma_concentration < self.therapeutic_range[0]:
                reward = -5 - (self.therapeutic_range[0] - self.plasma_concentration) * 0.5
            elif self.plasma_concentration > self.therapeutic_range[1]:
                reward = -5 - (self.plasma_concentration - self.therapeutic_range[1]) * 0.5

        # Add a heavy penalty for toxic concentrations
        if self.plasma_concentration > 100:
            reward -= 15  # Severe penalty for toxic levels

        self.current_step += 1
        done = self.current_step >= self.max_steps
        truncated = False  # For this environment, there's no specific truncation condition
        info = {"plasma_concentration": self.plasma_concentration}  # Add plasma_concentration to the info dictionary

        return normalized_state, reward, done, truncated, info

    def _normalize(self, state):
        if self.normalize and len(self.state_buffer) > 1:
            mean = np.mean(self.state_buffer)
            std = np.std(self.state_buffer) + 1e-8
            return (state - mean) / std
        return state




In [3]:
class RewardLoggingCallback(BaseCallback):
    def __init__(self):
        super(RewardLoggingCallback, self).__init__()
        self.episode_rewards = []
        self.current_episode_reward = 0

    def _on_step(self) -> bool:
        # Accumulate reward for the current step
        self.current_episode_reward += self.locals["rewards"][0]

        # If the episode ends, log the reward
        if self.locals["dones"][0]:
            self.episode_rewards.append(self.current_episode_reward)
            self.current_episode_reward = 0  # Reset for the next episode
        return True

In [4]:
class CustomLoggingCallback(BaseCallback):
    def __init__(self, writer, log_interval=10):
        super(CustomLoggingCallback, self).__init__()
        self.writer = writer
        self.log_interval = log_interval
        self.episode_rewards = []
        self.current_episode_reward = 0
        self.time_in_therapeutic_range = 0

    def _on_step(self) -> bool:
        # Track rewards per episode
        self.current_episode_reward += self.locals["rewards"][0]

        # Track time in therapeutic range
        if self.locals["infos"][0]["plasma_concentration"] >= self.env.therapeutic_range[0] and \
                self.locals["infos"][0]["plasma_concentration"] <= self.env.therapeutic_range[1]:
            self.time_in_therapeutic_range += 1

        # Log on every log_interval
        if self.n_calls % self.log_interval == 0:
            # Log policy loss, value loss, and entropy loss
            policy_loss = self.locals["losses"].get("policy_loss", 0)
            value_loss = self.locals["losses"].get("value_loss", 0)
            entropy_loss = self.locals["losses"].get("entropy_loss", 0)

            self.writer.add_scalar("Loss/Policy", policy_loss, self.n_calls)
            self.writer.add_scalar("Loss/Value", value_loss, self.n_calls)
            self.writer.add_scalar("Loss/Entropy", entropy_loss, self.n_calls)
            self.writer.add_scalar("Metrics/Time_in_Therapeutic_Range", self.time_in_therapeutic_range, self.n_calls)

        # If the episode ends, log the total reward for that episode
        if self.locals["dones"][0]:
            self.episode_rewards.append(self.current_episode_reward)
            self.current_episode_reward = 0  # Reset for next episode

        return True

### Step 5: Optimization of Hyperparameters

In [5]:
def optimize_ppo(trial):
    # Set up the environment
    env = DummyVecEnv([lambda: IbuprofenEnv(normalize=True)])

    # Hyperparameter tuning using Optuna
    lr = trial.suggest_float("learning_rate", 1e-5, 1e-3, log=True)
    gamma = trial.suggest_float("gamma", 0.90, 0.99)
    n_epochs = trial.suggest_int("n_epochs", 3, 10)
    ent_coef = trial.suggest_float("ent_coef", 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_int("batch_size", 32, 512, step=32)
    n_steps = trial.suggest_int("n_steps", 64, 2048, step=64)
    gae_lambda = trial.suggest_float("gae_lambda", 0.8, 0.99)
    clip_range = trial.suggest_float("clip_range", 0.1, 0.3)

    model = PPO(
        "MlpPolicy",
        env,
        learning_rate=lr,
        gamma=gamma,
        n_epochs=n_epochs,
        ent_coef=ent_coef,
        batch_size=batch_size,
        n_steps=n_steps,
        gae_lambda=gae_lambda,
        clip_range=clip_range,
        verbose=0,
    )

    # Train the model
    model.learn(total_timesteps=24000)

    # Evaluate the model
    rewards = []
    for _ in range(100):
        obs = env.reset()
        total_reward = 0
        done = False
        while not done:
            action, _ = model.predict(obs, deterministic=True)
            obs, reward, done, _ = env.step(action)
            total_reward += reward
        rewards.append(total_reward)

    return np.mean(rewards)


### Step 6: Perform Optimization

In [6]:
# Running the Optuna optimization
study = optuna.create_study(direction="maximize")
study.optimize(optimize_ppo, n_trials=100)

# Extracting best hyperparameters
best_params = study.best_params
print("Best Parameters:", best_params)

[I 2024-12-03 23:20:21,088] A new study created in memory with name: no-name-a86dd5da-2e39-4682-8930-03861da90fc5
We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=832 and n_envs=1)
[I 2024-12-03 23:20:49,098] Trial 0 finished with value: -240.0 and parameters: {'learning_rate': 1.032587895743112e-05, 'gamma': 0.930133318656932, 'n_epochs': 6, 'ent_coef': 0.006661960318899696, 'batch_size': 96, 'n_steps': 832, 'gae_lambda': 0.9234313806855804, 'clip_range': 0.2596781596057674}. Best is trial 0 with value: -240.0.
We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=960 and n_envs=1)
[I 2024-12-03 23:21:09,921] Trial 1 finished with value: -240.0 and parameters: {'learning_rate': 0.00019340424619755764, 'gamma': 0.9313684450677775, 'n_epochs': 8, 'ent_coef': 0.0001407091492971411, 'batch_size': 224, 'n_steps': 960, 'gae_lambda': 0.8987620593371276, 'clip_range': 0.22854361856702}. Best is trial 0 with value: -240

Best Parameters: {'learning_rate': 0.0008137948579639748, 'gamma': 0.9495267567320966, 'n_epochs': 9, 'ent_coef': 0.0013798549030065812, 'batch_size': 288, 'n_steps': 1664, 'gae_lambda': 0.8236677947525577, 'clip_range': 0.2963629220913797}


### Step 7: Train the agent with best hyperparameters

In [7]:
# Set up environment
env = DummyVecEnv([lambda: IbuprofenEnv(normalize=True)])

best_params = study.best_params

# Set up PPO model with hyperparameters
final_model = PPO(
    "MlpPolicy",
    env,
    learning_rate=best_params["learning_rate"],
    gamma=best_params["gamma"],
    n_epochs=best_params["n_epochs"],
    ent_coef=best_params["ent_coef"],
    batch_size=best_params["batch_size"],
    n_steps=best_params["n_steps"],
    gae_lambda=best_params["gae_lambda"],
    clip_range=best_params["clip_range"],
    verbose=1,
)

# Instantiate the custom callback
callback = CustomLoggingCallback()

# Train the model with the custom callback
final_model.learn(total_timesteps=1000, callback=callback)

Using cpu device


TypeError: CustomLoggingCallback.__init__() missing 1 required positional argument: 'writer'

### Step 8: Plot training and evaluation results

In [None]:
# Plot the loss curves
plt.figure(figsize=(12, 6))
plt.subplot(3, 1, 1)
plt.plot(callback.policy_losses, label='Policy Loss')
plt.title('Policy Loss')
plt.xlabel('Steps')
plt.ylabel('Loss')
plt.grid()

### Step 9: Evaluation 

In [None]:
plt.figure(figsize=(12,6))
plt.plot(callback.value_losses, label='Value Loss', color='orange')
plt.title('Value Loss')
plt.xlabel('Steps')
plt.ylabel('Loss')
plt.grid()
plt.show()


In [None]:
plt.figure(figsize=(12,6))
plt.plot(callback.entropy_losses, label='Entropy Loss', color='green')
plt.title('Entropy Loss')
plt.xlabel('Steps')
plt.ylabel('Loss')
plt.grid()


### Step 10: Visualize Optuna Results

In [None]:
optuna.visualization.plot_optimization_history(study).show()


In [None]:
optuna.visualization.plot_param_importances(study).show()

other visualization

In [None]:
therapeutic_range_times = []

for trajectory in plasma_concentration_trajectories:
    time_in_range = sum(env.therapeutic_range[0] <= c <= env.therapeutic_range[1] for c in trajectory)
    therapeutic_range_times.append(time_in_range / len(trajectory))

avg_time_in_therapeutic_range = np.mean(therapeutic_range_times)
print(f"Average Time in Therapeutic Range: {avg_time_in_therapeutic_range * 100:.2f}%")


In [None]:

toxic_level_exceedances = sum(c > 100 for trajectory in plasma_concentration_trajectories for c in trajectory)
print(f"Toxic Level Exceedances Across All Episodes: {toxic_level_exceedances}")

survival_rate = sum(1 for trajectory in plasma_concentration_trajectories if max(trajectory) <= 100) / len(plasma_concentration_trajectories)
print(f"Survival Rate: {survival_rate * 100:.2f}%")

In [None]:
plt.figure(figsize=(12, 8))
plt.imshow(plasma_concentration_trajectories, aspect='auto', cmap='viridis', interpolation='nearest')
plt.colorbar(label="Plasma Concentration (mg/L)")
plt.xlabel("Time Steps")
plt.ylabel("Episode")
plt.title("Plasma Concentration Heatmap Across Episodes")
plt.show()


In [None]:

cumulative_rewards = np.cumsum(reward_history)

plt.figure(figsize=(12, 6))
plt.plot(cumulative_rewards)
plt.xlabel("Episode")
plt.ylabel("Cumulative Reward")
plt.title("Cumulative Reward Trend")
plt.grid()
plt.show()

In [None]:
action_rewards = {a: [] for a in range(env.action_space.n)}

for trajectory, rewards in zip(plasma_concentration_trajectories, evaluation_rewards):
    for action, reward in zip(trajectory, rewards):
        action_rewards[action].append(reward)

plt.figure(figsize=(12, 6))
for action, rewards in action_rewards.items():
    plt.hist(rewards, bins=30, alpha=0.6, label=f"Action {action}")
plt.xlabel("Reward")
plt.ylabel("Frequency")
plt.title("Reward Distribution Per Action")
plt.legend()
plt.grid()
plt.show()

In [None]:
#optimal policy evaluation
action_frequencies = [np.argmax(agent.policy(torch.tensor(state, dtype=torch.float32).unsqueeze(0)).detach().numpy())
                      for state in states]
plt.figure(figsize=(12, 6))
plt.hist(action_frequencies, bins=env.action_space.n, alpha=0.7)
plt.xlabel("Action")
plt.ylabel("Frequency")
plt.title("Action Selection Frequency in Evaluation")
plt.grid()
plt.show()


In [None]:
#learning stability
rolling_avg_rewards = np.convolve(reward_history, np.ones(50)/50, mode='valid')
plt.figure(figsize=(12, 6))
plt.plot(rolling_avg_rewards)
plt.xlabel("Episode")
plt.ylabel("Rolling Average Reward")
plt.title("Learning Stability: Rolling Average of Rewards")
plt.grid()
plt.show()