In [1]:
from custom_sim_RL import ProcessSimulation

trialname = 'RLtrial'
sim = ProcessSimulation(trialname)

runtime = 3600
sim.setup_run(runtime_cryst=runtime)
result = sim.output()

print(result)

Could not find GLIMDA.


Final Run Statistics: --- 

 Number of steps                                 : 936
 Number of function evaluations                  : 1441
 Number of Jacobian*vector evaluations           : 1520
 Number of function eval. due to Jacobian eval.  : 1441
 Number of error test failures                   : 13
 Number of nonlinear iterations                  : 1437
 Number of nonlinear convergence failures        : 0

Solver options:

 Solver                   : CVode
 Linear multistep method  : BDF
 Nonlinear solver         : Newton
 Linear solver type       : SPGMR
 Maximal order            : 5
 Tolerances (absolute)    : 1e-06
 Tolerances (relative)    : 1e-06

Simulation interval    : 0.0 - 3600.0 seconds.
Elapsed simulation time: 0.6658561000000001 seconds.
(38.75052236687746, 67.4877117296064, 104.96721691478905, 0.981166687251344)


In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import numpy as np
from gymnasium import Env, spaces
from custom_sim_RL import ProcessSimulation  # My class

### Custom Gym Environment for Crystallization ###
class CrystallizationEnv(Env):
    def __init__(self, sim_time=3600, step_time=300):
        super(CrystallizationEnv, self).__init__()
        self.sim_time = sim_time
        self.step_time = step_time
        self.n_steps = sim_time // step_time

        self.temp_min = 280.0
        self.temp_max = 330.0
        self.temp_rate = 0.5 / 60  # K/s

        self.action_space = spaces.Discrete(3)  # 0: cool, 1: hold, 2: heat
        self.observation_space = spaces.Box(
            low=np.array([0.0, self.temp_min]),
            high=np.array([self.sim_time, self.temp_max]),
            dtype=np.float32
        )

        self.reset()

    def step(self, action):
        if action == 0:
            self.temp_slope -= self.temp_rate
        elif action == 2:
            self.temp_slope += self.temp_rate

        self.t += self.step_time
        self.time_points.append(self.t)

        new_temp = self.temp_program[-1][1] + self.temp_slope * self.step_time
        new_temp = np.clip(new_temp, self.temp_min, self.temp_max)
        self.temp_program.append([self.t, new_temp])

        done = self.t >= self.sim_time
        reward = 0.0

        if done:
            temp_prog_np = np.array(self.temp_program)
            sim = ProcessSimulation(trialname="rl_trial")
            sim.setup_run(temp_program=temp_prog_np, runtime_cryst=self.sim_time)
            D10, D50, D90, span = sim.output()
            reward = D50 - 2.0 * span  # Custom reward function
            obs = np.array([self.t, new_temp], dtype=np.float32)
            return obs, reward, True, False, {"D50": D50, "span": span}

        obs = np.array([self.t, new_temp], dtype=np.float32)
        return obs, reward, False, False, {}

    def reset(self, seed=None, options=None):
        self.t = 0.0
        self.temp_slope = 0.0
        self.temp_program = [[0.0, 323.15]]
        self.time_points = [0.0]
        return np.array([self.t, 323.15], dtype=np.float32), {}



### Policy Network ###
class PolicyNetwork(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(PolicyNetwork, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, output_dim),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        return self.fc(x)



### Training Loop ###
def train():
    env = CrystallizationEnv(sim_time=1800, step_time=300)  # Use shorter runs for faster training
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.n

    policy = PolicyNetwork(obs_dim, act_dim)
    optimizer = optim.Adam(policy.parameters(), lr=1e-2)
    gamma = 0.99

    best_reward = -np.inf
    best_policy = None

    for episode in range(50):  # Start with 50 episodes for quick iteration
        state, _ = env.reset()
        log_probs = []
        rewards = []

        done = False
        while not done:
            state_tensor = torch.tensor(state, dtype=torch.float32)
            probs = policy(state_tensor)
            dist = Categorical(probs)
            action = dist.sample()

            log_probs.append(dist.log_prob(action))
            state, reward, done, truncated, info = env.step(action.item())
            rewards.append(reward if done else 0.0)  # reward only at end

        # Discounted returns
        returns = []
        G = 0
        for r in reversed(rewards):
            G = r + gamma * G
            returns.insert(0, G)
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)

        # Policy gradient update
        loss = sum(-log_prob * Gt for log_prob, Gt in zip(log_probs, returns))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_reward = sum(rewards)
        print(f"Episode {episode}, Total Reward: {total_reward:.2f}, D50: {info.get('D50', 0):.1f}, Span: {info.get('span', 0):.2f}")

        # Save best model
        if total_reward > best_reward:
            best_reward = total_reward
            best_policy = policy.state_dict()

    print("Training complete. Best reward:", best_reward)
    if best_policy:
        torch.save(best_policy, "best_crystallization_policy.pth")



if __name__ == "__main__":
    train()


  gym.logger.warn(
  gym.logger.warn(


UnboundLocalError: local variable 'temp_init' referenced before assignment