In [16]:
import random
import numpy as np
import pandas as pd
import simpy
import gymnasium as gym
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
import math


import logging
logging.basicConfig(filename='manufacturing_RC_env7.log', level=logging.INFO, 
                    format='%(asctime)s:%(levelname)s:%(message)s', filemode='w')


# Constants and assumptions
NUM_MACHINES = 6
BUFFER_CAPACITY = 10
MEAN_PROCESSING_TIME = 83.70
MEAN_STARTUP_TIME = 30
MTBF = 3600
MTTR = 30
MEAN_PART_ARRIVAL_TIME = 20


# Power consumption for each state (in kW)
POWER_STANDBY = 0.1
POWER_IDLE = 9.3
POWER_STARTUP = 10
POWER_BUSY = 15
POWER_FAILED = 0.1


# Power consumption rates (kW)
POWER_CONSUMPTION = {
    'busy': 15,
    'idle': 9.3,
    'startup': 10,
    'standby': 0,
    'failed': 0
}

print("Imports and constants are defined correctly.")

Imports and constants are defined correctly.


In [17]:
class ManufacturingEnv(gym.Env):
    def __init__(self):
        super(ManufacturingEnv, self).__init__()
        self.action_space = gym.spaces.Discrete(NUM_MACHINES + 1)
        self.observation_space = gym.spaces.MultiDiscrete([BUFFER_CAPACITY + 1] + [5] * NUM_MACHINES)
        self.env = simpy.Environment()
        self.buffer = simpy.Store(self.env, capacity=BUFFER_CAPACITY)
        self.max_parts = 0
        self.produced_parts=0
        self.Capacity=0 
        self.data=[]
        self.CC=0
        self.machines_energy=[0]
        self.energy=0
        self.AON_produced=0
        self.produced=0

        self.machines = [{
            'id': i,
            'state': 'idle',  # Start machines in standby state
            'total_energy': 0,
            'time_in_states': {'busy': 0, 'idle': 0, 'startup': 0, 'standby': 0, 'failed': 0}, 
            'cumprocessingtime': 0, 'state_energy_consumption':0,
            'failuretime': random.expovariate(1.0 / MTBF)
        } for i in range(NUM_MACHINES)]

    
    def collect_data(self, machine):
       
     state_energy = 0
     for state, time in machine['time_in_states'].items():
        time_hours=0
        if state==machine['state']:
          time_hours = time
          time_hours=time_hours
          state_energy += POWER_CONSUMPTION[state] * time_hours
          self.data.append({
         'time': self.env.now,
         'machine_id': machine['id'],
         'state': machine['state'],
         'time_in_preocess': time_hours,'state_energy':machine['state_energy_consumption'],
         'total_state_energy':state_energy})


    def machine_process(self, machine):

        yield self.env.timeout(1)
     
        while True:

            logging.info(f'----------------------------------------------')
            logging.info(f'Process just started, env time: {self.env.now}, machine id {machine['id']} is {machine['state']}')
            processing_time = random.expovariate(1.0 / MEAN_PROCESSING_TIME)
            #failure_time = random.expovariate(1.0 / MTBF) 

            if machine['state'] == 'idle':

                machine['time_in_states']['idle'] = 0.99 

                #if self.CC==0:  #####################################################################
                     
                machine['state_energy_consumption']= POWER_CONSUMPTION['idle']
                self.collect_data(machine)
                  
                yield self.env.timeout(0.99)

            elif machine['state'] == 'standby':

                machine['time_in_states']['standby'] = 0.99

                #if self.CC==0:#############################################################################
                 
                machine['state_energy_consumption']= POWER_CONSUMPTION['standby']
                self.collect_data(machine)

                yield self.env.timeout(0.99)               

            elif machine['state']=='startup':
                
                startup_time=random.expovariate(1.0 /MEAN_STARTUP_TIME)
                machine['time_in_states']['startup'] = startup_time
                machine['state_energy_consumption']=POWER_CONSUMPTION['startup']
                self.collect_data(machine)

                yield self.env.timeout(startup_time) 
                machine['state'] = 'busy'


            elif machine['state'] == 'busy':

                if machine['cumprocessingtime']+processing_time <= machine['failuretime']:
                    machine['cumprocessingtime']=processing_time+machine['cumprocessingtime']
                    machine['time_in_states']['busy'] = processing_time
                    
                    yield self.buffer.get()

                    self.produced_parts=self.max_parts- len(self.buffer.items)
                    self.produced=self.produced+1 
                    machine['state_energy_consumption']=POWER_CONSUMPTION['busy']
                    self.collect_data(machine)

                    yield self.env.timeout(processing_time)
                    machine['state'] = 'idle'

                else:
                    failure_time_remaining = machine['failuretime'] - machine['cumprocessingtime']
                    machine['time_in_states']['busy'] = failure_time_remaining

                    yield self.buffer.get() 

                    machine['state_energy_consumption']=POWER_CONSUMPTION['busy']
                    machine['cumprocessingtime']=0
                    self.collect_data(machine)

                    yield self.env.timeout(failure_time_remaining)
                    machine['state'] = 'failed'

                    repair_time = random.expovariate(1.0 / MTTR)
                    machine['time_in_states']['failed'] = repair_time 
                    yield self.env.timeout(repair_time)
                    self.collect_data(machine)
                    machine['state'] ='idle'
        

    def part_arrival(self):

        yield self.env.timeout(1)

        while True:
          
          if len(self.buffer.items)<BUFFER_CAPACITY:
             yield self.env.timeout(np.random.exponential(MEAN_PART_ARRIVAL_TIME))
             logging.info(f'Part just arrived, env time: {self.env.now}')
             yield self.buffer.put(f"{self.env.now}") 
             self.produced_parts=0
             self.Capacity= self.Capacity+1
             self.max_parts = len(self.buffer.items)
             self.AON_produced=self.AON_produced+1

          else:
           self.max_parts = len(self.buffer.items)
           yield self.env.timeout(1)
               

    def sim_step(self):

        yield self.env.timeout(1)

        while True:

              #random_action = random.randint(0, NUM_MACHINES)
              random_action = 6
              logging.info(f'Control just activated, action: {random_action}, env time: {self.env.now}, buffer:{len(self.buffer.items)}')
            
              self.step(random_action)

              yield self.env.timeout(10)
              
              logging.info(f'----------------------------------------------')
              current_machines_energy, info=self.get_state()

              self.machines_energy.append(current_machines_energy)
              current_machines_energy= self.machines_energy[-1]
              previous_machines_energy= self.machines_energy[-2]
              total_reward=self.calculate_reward(current_machines_energy, previous_machines_energy)

              logging.info(f'state info before control: {info}, env time: {self.env.now}')
              logging.info(f'----------------------------------------------')
                           
 
    def step(self, action):

        active_machines = action
        self.CC=0
       
        for machine in self.machines:
            
            logging.info(f'Machine under investigation: {machine["id"]} and the state {machine["state"]}, env time: {self.env.now}, capacity: {self.Capacity}')
            
            if machine['state'] == 'idle' and active_machines > 0 and self.Capacity>0: 
                self.CC=1
                machine['state'] = 'busy'
                active_machines -= 1
                self.Capacity -= 1
                logging.info(f"Machine id {machine['id']} mode changed from idle to {machine['state']}, actions left: {active_machines}, env time: {self.env.now}")
            
            elif machine['state'] == 'idle' and active_machines > 0 and self.Capacity==0: 
                self.CC=1
                machine['state'] = 'standby'
                logging.info(f"Machine id {machine['id']} mode changed from idle to {machine['state']}, actions left: {active_machines}, env time: {self.env.now}")
                
            elif machine['state'] == 'idle' and active_machines == 0:
                self.CC=1
                machine['state'] = 'standby'
                logging.info(f"Machine id {machine['id']} mode changed from idle to {machine['state']}, actions left: {active_machines}, env time: {self.env.now}") 

            elif machine['state'] == 'standby' and active_machines > 0 and self.Capacity>0:
                self.CC=1
                machine['state'] = 'startup'
                active_machines -= 1
                self.Capacity -= 1
                logging.info(f"Machine id {machine['id']} mode changed from standby to {machine['state']}, actions left: {active_machines}, env time: {self.env.now}")

        #return self.get_state()


    def get_state(self):

        parts_in_buffer = len(self.buffer.items)

        self.energy=0

        for machine in self.machines:
          
          logging.info(f"energy consumption of machine id {machine['id']} is {machine['total_energy']}, env time: {self.env.now}")
          self.energy += machine['total_energy'] 

        logging.info(f"total energy consumption {self.energy}, env time: {self.env.now}")

        machine_states = [machine['state'] for machine in self.machines]
        return self.energy, np.array([parts_in_buffer] + machine_states)
    

####################################################################
    def consumption_calculation(self):

        while True:

         for machine in self.machines:

            machine['total_energy']=machine['total_energy']+machine['state_energy_consumption']
    
         yield self.env.timeout(0.99)

#######################################################################


    def calculate_reward(self, current_machines_energy, previous_machines_energy):

        logging.info(f"produced parts {self.produced}, max_parts {self.AON_produced}")
        productivity_reward = np.exp((self.produced) / (self.AON_produced+0.000000001))/math.e
        consumption_reward = np.exp(-0.01*(current_machines_energy -(previous_machines_energy+0.000000001 ))) 
        total_reward = 0.5 * productivity_reward + 0.5 * consumption_reward
        logging.info(f"consumption reward is {consumption_reward}, productivity reward is {productivity_reward}, env time: {self.env.now}")

        return total_reward


    @staticmethod
    def LogHandel():
        for handler in logging.root.handlers[:]:
            handler.close()
            logging.root.removeHandler(handler)



In [18]:
# Create the environment
MEnv = ManufacturingEnv()


# Start machine processes
for machine in MEnv.machines: 
    MEnv.env.process(MEnv.machine_process(machine))

    
# Start part arrival process
MEnv.env.process(MEnv.part_arrival())


# Start the sim_step process
MEnv.env.process(MEnv.sim_step())

# Start the consumption calculation
MEnv.env.process(MEnv.consumption_calculation())


# Run the simulation
MEnv.env.run(until=10000)


# Data collection
Data= pd.DataFrame(MEnv.data)

Data.to_excel("Data.xlsx", sheet_name="Data", index=False)

Data_selected_0= Data[Data['machine_id']==0]
Data_selected_1= Data[Data['machine_id']==1]
Data_selected_2= Data[Data['machine_id']==2]
Data_selected_3= Data[Data['machine_id']==3]
Data_selected_4= Data[Data['machine_id']==4]
Data_selected_5= Data[Data['machine_id']==5]


# Log handler
MEnv.LogHandel()