In [None]:
import numpy as np
import gym
from gym import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import CheckpointCallback
import pandas as pd

# Seed NumPy's global RNG: FactoryEnergyEnv draws its random initial state and
# its per-step noise from np.random, so this makes those draws repeatable.
np.random.seed(42)

## Factory Energy Environment Definition

In [None]:
class FactoryEnergyEnv(gym.Env):
    """Custom Gym environment for Factory I/O energy optimization.

    Each step the agent picks one of eight discrete actions. The action shifts
    load onto the conveyor and off the pusher/robot (or the other way round);
    machine power follows the loads, and the reward penalizes total power and
    a load sum that deviates from 1.5.

    State / observation layout (10 float32 values):
        0 conveyor_power   1 pusher_power   2 robot_power   3 total_power
        4 conveyor_load    5 pusher_load    6 robot_load
        7 timestamp (step index)   8 energy_consumed (cumulative)
        9 last_reward

    This class follows the classic Gym API: ``reset()`` returns only the
    observation and ``step()`` returns ``(obs, reward, done, info)``.
    """

    metadata = {'render.modes': ['human']}

    # Ceiling applied to the (noisy) power readings.
    _MAX_POWER = 100.0
    # Standard deviation of the Gaussian noise added to the power readings.
    _POWER_NOISE_STD = 0.5
    # Converts a power reading into energy per step (kWh according to the
    # original comments; the 0.1 presumably encodes a 0.1 s step — confirm).
    _ENERGY_FACTOR = 0.1 / 3600
    # Reward shaping constants.
    _POWER_REWARD_SCALE = 40.0
    _BALANCE_WEIGHT = 0.1
    _BALANCE_TARGET = 1.5

    def __init__(self, machine_config, max_steps=3600):
        """Create the environment.

        Args:
            machine_config: Factory description; stored on the instance but
                not otherwise read by this class.
            max_steps: Number of steps after which ``step()`` reports
                ``done=True``.
        """
        super().__init__()

        self.machine_config = machine_config
        self.max_steps = max_steps
        self.current_step = 0

        # Define action and observation spaces (the observation bounds depend
        # on max_steps, so it must be assigned before this point).
        self.action_space = self.define_hybrid_action_space()
        self.observation_space = self.define_observation_space()

        # Initialize state
        self.state = None
        self.episode_reward = 0
        self.episode_energy = 0

    def define_hybrid_action_space(self):
        """Define action space: discrete choices for machine sequencing."""
        # Actions: which machines to prioritize (0-7 represents different combinations)
        return spaces.Discrete(8)

    def define_observation_space(self):
        """Define per-dimension bounds for the 10-element observation.

        A single [0, 100] box does not describe the state: the timestamp grows
        up to ``max_steps`` and the last reward is never positive. The bounds
        below follow from the clipping and formulas used in ``step()``.
        """
        peak = self._MAX_POWER
        # Upper bound on cumulative energy: peak power on every step.
        max_energy = peak * self._ENERGY_FACTOR * self.max_steps
        # Worst reward: peak power plus the largest load imbalance. The three
        # loads each stay in [0.1, 1.0], so their sum is within 1.5 of the target.
        min_reward = -(peak / self._POWER_REWARD_SCALE) - self._BALANCE_WEIGHT * 1.5

        low = np.array(
            [0, 0, 0, 0, 0, 0, 0, 0, 0, min_reward], dtype=np.float32
        )
        high = np.array(
            [peak, peak, peak, peak, 1, 1, 1, self.max_steps, max_energy, 0],
            dtype=np.float32,
        )
        return spaces.Box(low=low, high=high, dtype=np.float32)

    def reset(self):
        """Reset environment to a random initial state.

        Returns:
            A copy of the initial observation (float32 array of shape (10,)).
        """
        self.current_step = 0
        self.episode_reward = 0
        self.episode_energy = 0

        # Initialize state with random values
        self.state = np.array([
            np.random.uniform(5, 15),     # conveyor_power
            np.random.uniform(2, 8),      # pusher_power
            np.random.uniform(5, 12),     # robot_power
            0,                             # total_power (will be calculated)
            np.random.uniform(0.3, 0.9),  # conveyor_load
            np.random.uniform(0.2, 0.8),  # pusher_load
            np.random.uniform(0.3, 0.9),  # robot_load
            0,                             # timestamp
            0,                             # energy_consumed
            0                              # last_reward
        ], dtype=np.float32)

        self.state[3] = self.state[0] + self.state[1] + self.state[2]  # total_power
        # Return a copy so the caller's observation is not mutated by step().
        return self.state.copy()

    def step(self, action):
        """Execute one step in the environment.

        Args:
            action: Discrete action in 0..7. A size-1 NumPy array (as returned
                by ``model.predict``) is accepted as well.

        Returns:
            Tuple ``(observation, reward, done, info)``. ``observation`` is a
            copy of the internal state, ``reward`` is a Python float, ``done``
            becomes True once ``max_steps`` steps have been taken, and ``info``
            carries the current power, cumulative energy, episode reward and
            the three loads.
        """
        self.current_step += 1

        # Collapse 0-d / size-1 arrays to a plain number before doing arithmetic.
        action_value = np.asarray(action).item()

        # Modify loads based on action
        load_adjustment = (action_value - 3.5) * 0.05  # Actions 0-7 map to -0.175 to +0.175

        self.state[4] = np.clip(self.state[4] + load_adjustment, 0.1, 1.0)  # conveyor_load
        self.state[5] = np.clip(self.state[5] - load_adjustment * 0.5, 0.1, 1.0)  # pusher_load
        self.state[6] = np.clip(self.state[6] - load_adjustment * 0.7, 0.1, 1.0)  # robot_load

        # Calculate power based on loads
        self.state[0] = 15 * self.state[4]  # conveyor_power
        self.state[1] = 8 * self.state[5]   # pusher_power
        self.state[2] = 12 * self.state[6]  # robot_power
        self.state[3] = self.state[0] + self.state[1] + self.state[2]  # total_power

        # Add measurement noise to the four power readings only. Noising the
        # whole vector would also perturb the loads (which carry over to the
        # next step) and the cumulative energy counter, whose per-step
        # increment is orders of magnitude smaller than the noise.
        self.state[:4] += np.random.normal(0.0, self._POWER_NOISE_STD, size=4)
        np.clip(self.state[:4], 0.0, self._MAX_POWER, out=self.state[:4])

        # Update timing and energy bookkeeping
        energy_step = float(self.state[3]) * self._ENERGY_FACTOR
        self.state[7] = self.current_step  # timestamp
        self.state[8] += energy_step       # energy_consumed (cumulative)

        # Calculate reward (lower energy is better)
        energy_reward = -float(self.state[3]) / self._POWER_REWARD_SCALE
        load_sum = float(np.sum(self.state[4:7]))
        load_penalty = -self._BALANCE_WEIGHT * abs(load_sum - self._BALANCE_TARGET)  # Encourage balanced loads
        reward = float(energy_reward + load_penalty)

        self.state[9] = reward
        self.episode_reward += reward
        self.episode_energy += energy_step

        # Check if episode is done
        done = self.current_step >= self.max_steps

        info = {
            'total_power': float(self.state[3]),
            'energy_consumed': float(self.state[8]),
            'episode_reward': float(self.episode_reward),
            'conveyor_load': float(self.state[4]),
            'pusher_load': float(self.state[5]),
            'robot_load': float(self.state[6])
        }

        return self.state.copy(), reward, done, info

    def calculate_power_consumption(self, actions):
        """Return the current total power reading.

        ``actions`` is accepted for interface compatibility but not used.
        """
        return float(self.state[3])

    def calculate_reward(self, total_power, actions):
        """Return the energy part of the reward for ``total_power``.

        This omits the load-balance penalty that ``step()`` adds on top.
        ``actions`` is accepted for interface compatibility but not used.
        """
        return -total_power / 40

    def render(self, mode='human'):
        """Print the current step, power, cumulative energy and last reward."""
        print(f"Step {self.current_step}: Power={self.state[3]:.2f}kW, Energy={self.state[8]:.4f}kWh, Reward={self.state[9]:.4f}")

print("FactoryEnergyEnv class defined.")

## Environment Configuration

In [None]:
# Machine groups in the factory; passed to the environment as `machine_config`.
factory_specs = dict(
    conveyors=dict(count=14, max_power=28),
    pushers=dict(count=4, max_power=12),
    pick_place=dict(count=3, max_power=12),
)

# Standalone environment instance with 3600-step episodes.
env = FactoryEnergyEnv(machine_config=factory_specs, max_steps=3600)

# Sanity check: a fresh reset should yield the initial observation vector.
initial_state = env.reset()
print(f"Initial state shape: {initial_state.shape}")
print(f"Initial state: {initial_state}")

## Model Setup

In [None]:
# Create vectorized environment for stable-baselines3 (a single environment
# instance wrapped in a DummyVecEnv).
vec_env = DummyVecEnv([lambda: FactoryEnergyEnv(factory_specs, max_steps=3600)])

# Define checkpoint callback: save_freq is counted in callback steps, which
# with one environment equals environment timesteps.
checkpoint_callback = CheckpointCallback(
    save_freq=10000,
    save_path='./checkpoints/',
    name_prefix='rl_model'
)

# Create PPO model
model = PPO(
    "MlpPolicy",
    vec_env,
    verbose=1,
    learning_rate=3e-4,
    batch_size=64,
    n_steps=2048,
    n_epochs=10,
    gamma=0.99,
    gae_lambda=0.95,
    # Seeding NumPy alone does not make training repeatable: the policy
    # initialization and action sampling use their own RNGs. Passing the same
    # value used for np.random.seed at the top of the notebook seeds them too.
    seed=42
)

print("PPO model created. Ready for training.")
print(f"Policy network: {model.policy}")

## Train the Model

In [None]:
# Train for 100,000 timesteps
total_timesteps = 100000
print(f"Starting training for {total_timesteps} timesteps...")

model.learn(
    total_timesteps=total_timesteps,
    callback=checkpoint_callback,
    # For PPO, log_interval counts rollout iterations (n_steps * n_envs
    # timesteps each), not timesteps. 100,000 timesteps at 2,048 steps per
    # rollout is only about 49 iterations, so an interval of 100 would never
    # emit a training log; 10 reports progress a handful of times per run.
    log_interval=10
)

print("\nTraining complete!")

## Save the Trained Model

In [None]:
import os

# Resolve the output location first: a `models` directory next to the
# notebook's parent folder, and the model file stem inside it.
models_dir = os.path.join('..', 'models')
model_path = os.path.join(models_dir, 'trained_rl_model')

# Make sure the directory exists, then write the trained model there.
os.makedirs(models_dir, exist_ok=True)
model.save(model_path)

print(f"Model saved to {model_path}")

## Test the Trained Model

In [None]:
# Test the trained model on one short episode
N_TEST_STEPS = 100
test_env = FactoryEnergyEnv(factory_specs, max_steps=N_TEST_STEPS)
obs = test_env.reset()

episode_rewards = []
power_trace = []  # total power reported at every step, for the min-power summary

for step in range(N_TEST_STEPS):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, info = test_env.step(action)
    episode_rewards.append(reward)
    power_trace.append(info['total_power'])

    if done:
        break

# info['energy_consumed'] is already a running total, so adding it up over the
# steps would count early steps many times. The environment keeps the episode
# total itself; read it once at the end.
total_energy = test_env.episode_energy

print("Test Results:")
print(f"  Steps completed: {step + 1}")
print(f"  Total energy consumed: {total_energy:.4f} kWh")
print(f"  Average reward per step: {np.mean(episode_rewards):.4f}")
# Minimum over the whole episode, not just the final reading.
print(f"  Min power (kW): {min(power_trace):.2f}")