In [1]:
import numpy as np
import inspyred as ins
from pylab import *
import sys
from inspyred import ec, benchmarks

## Custom Parser

In [37]:
from job import Job
from activity import Activity
from operation import Operation
from machine import Machine
import re
import os

def parse(path):
    with open(os.path.join(os.getcwd(), path, "r")) as data:
        total_jobs, total_machines, max_operations = re.findall("\S+", data.readline())
        number_total_jobs, number_total_machines, number_max_operations = int(total_jobs), int(total_machines), int(float(
			max_operations))
		
        jobs_list = []
        #current job id
        id_job = 1

        for key, line in enumerate(data):
			
            if key >= number_total_jobs:
                break
			
            # Split data with multiple spaces as separator
            parsed_line = re.findall('\S+', line)
			# Current job
            job = Job(id_job)
			# Current activity's id
            id_activity = 1
			# Current item of the parsed line
            i = 1

            while i < len(parsed_line):
				# Total number of operations for the activity
                number_operations = int(parsed_line[i])
				# Current activity
                activity = Activity(job, id_activity)
                for id_operation in range(1, number_operations + 1):
                    activity.add_operation(Operation(id_operation, int(parsed_line[i + 2 * id_operation - 1]),
                                int(parsed_line[i + 2 * id_operation])))

                job.add_activity(activity)
                i += 1 + 2 * number_operations
                id_activity += 1

                jobs_list.append(job)
                id_job += 1

	# Machines
    machines_list = []
    for id_machine in range(1, number_total_machines + 1):
        machines_list.append(Machine(id_machine, number_max_operations))

    return jobs_list, machines_list, number_max_operations

ModuleNotFoundError: No module named 'job'

In [15]:
path = r"C:\Users\saadr\Downloads\TextData.zip\Monaldo\Fjsp\Job_Data\Barnes\Text\mt01c1"

In [18]:
parse(path)

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\saadr\\Downloads\\TextData.zip\\Monaldo\\Fjsp\\Job_Data\\Barnes\\Text\\mt01c1\\r'

In the first line there are (at least) 2 numbers: the first is the number of jobs and the second the number of machines (the 3rd is not necessary, it is the average number of machines per operation)

Every row represents one job: the first number is the number of operations of that job, the second number (let's say k>=1) is the number of machines that can process the first operation; then according to k, there are k pairs of numbers (machine,processing time) that specify which are the machines and the processing times; then the data for the second operation and so on...

test= [6   6   1   
6   1   3   1   1   1   3   1   2   6   1   4   7   1   6   3   1   5   6   
6   1   2   8   1   3   5   1   5   10  1   6   10  1   1   10  1   4   4   
6   1   3   5   1   4   4   1   6   8   1   1   9   1   2   1   1   5   7   
6   1   2   5   1   1   5   1   3   5   1   4   3   1   5   8   1   6   9   
6   1   3   9   1   2   3   1   5   5   1   6   4   1   1   3   1   4   1   
6   1   2   3   1   4   3   1   6   9   1   1   10  1   5   4   1   3   1]

In [25]:
test= [6   6   1   
6   1   3   1   1   1   3   1   2   6   1   4   7   1   6   3   1   5   6   
6   1   2   8   1   3   5   1   5   10  1   6   10  1   1   10  1   4   4   
6   1   3   5   1   4   4   1   6   8   1   1   9   1   2   1   1   5   7   
6   1   2   5   1   1   5   1   3   5   1   4   3   1   5   8   1   6   9   
6   1   3   9   1   2   3   1   5   5   1   6   4   1   1   3   1   4   1   
6   1   2   3   1   4   3   1   6   9   1   1   10  1   5   4   1   3   1]

SyntaxError: invalid syntax. Perhaps you forgot a comma? (1993158878.py, line 1)

In [None]:
m,n = test.shape

In [None]:
jobs

## Class Jobs

In [36]:
class Jobs:
	def __init__(self, id_job):
		self.__id_job = id_job
		self.__activities_to_be_done = []
		self.__activities_done = []

	# Display the job nicer
	def __str__(self):
		output = ""

		for activity in self.__activities_to_be_done:
			output += str(activity) + "\n"

		for activity in self.__activities_done:
			output += str(activity) + "\n"

		return output

	# Return the job's id
	@property
	def id_job(self):
		return self.__id_job

	# Add an activity to the job
	def add_activity(self, activity):
		self.__activities_to_be_done.append(activity)

	# Return if the job is done
	@property
	def is_done(self):
		return len(self.activities_to_be_done) == 0

	# Return the activities yet to be done
	@property
	def activities_to_be_done(self):
		return self.__activities_to_be_done

	# Return the activities already done
	@property
	def activities_done(self):
		return self.__activities_done

	# Method an activity calls to signal it's done
	def activity_is_done(self, activity):
		if not activity.is_done:
			raise EnvironmentError("This activity is not done")
		self.__activities_to_be_done = list(
			filter(lambda element: element.id_activity != activity.id_activity, self.__activities_to_be_done))
		self.__activities_done.append(activity)

	# Return the current activity that need to be processe
	@property
	def current_activity(self):
		if len(self.activities_to_be_done) == 0:
			raise EnvironmentError("All activities are already done")
		return self.__activities_to_be_done[0]

	@property
	def remaining_shop_time(self):
		return sum(map(lambda activity: activity.shop_time, self.activities_to_be_done))

	@property
	def total_shop_time(self):
		return sum(map(lambda activity: activity.shop_time, self.activities_to_be_done + self.activities_done))

	def check_if_previous_activity_is_done(self, activity_id):
		if activity_id == 1:
			return True
		for activity in self.__activities_done:
			if activity.id_activity == activity_id - 1:
				return True
		return False

	def get_activity(self, id_activity):
		for activity in self.__activities_to_be_done:
			if activity.id_activity == id_activity:
				return activity


## Class Machine

In [20]:
class Machine:
	def __init__(self, id_machine, max_operations):
		self.__id_machine = id_machine
		self.__is_working = False
		self.__operations_done = []
		self.__processed_operations = []
		self.__max_operations = max_operations
		self.__current_time = 0
		self.__available_places = [i for i in range(max_operations)]

	# Return the machine's id
	@property
	def id_machine(self):
		return self.__id_machine

	# Return the operations done by the machine
	@property
	def operations_done(self):
		return self.__operations_done

	# Return if the machine is working at max capacity
	def is_working_at_max_capacity(self):
		return len(self.__processed_operations) == self.__max_operations

	# Add an operation to the treatment list of the machine
	def add_operation(self, activity, operation):
		if self.is_working_at_max_capacity():
			raise EnvironmentError("Machine already working at max capacity")
		if operation.id_machine != self.__id_machine:
			raise EnvironmentError("Operation assigned to the wrong machine")

		operation.time = self.__current_time
		operation.is_pending = True
		operation.place_of_arrival = self.__available_places.pop(0)

		self.__processed_operations.append((activity, operation))

	# Method to simulate a work process during one unit of time
	def work(self):
		self.__current_time += 1
		for activity, operation in self.__processed_operations:
			if operation.time + operation.duration <= self.__current_time:
				self.__processed_operations = list(filter(lambda element: not (
						element[0].id_job == activity.id_job and element[0].id_activity == activity.id_activity and
						element[1].id_operation == operation.id_operation), self.__processed_operations))
				self.__available_places.append(operation.place_of_arrival)
				activity.terminate_operation(operation)
				self.__operations_done.append(operation)


## Class Operations

In [28]:
class Operation:
	def __init__(self, id_operation, id_machine, duration):
		self.__id_operation = id_operation
		self.__duration = duration
		self.__id_machine = id_machine
		self.__time = None
		self.__is_pending = False
		self.__place_of_arrival = None

	# Display the operation nicer
	def __str__(self):
		output = "Operation n°" + str(self.__id_operation) + " -> Machine: " + str(
			self.__id_machine) + ", Duration: " + str(self.__duration)

		if not (self.__time is None):
			output += ", Started at time " + str(self.__time)

		return output

	# Return the operation's id
	@property
	def id_operation(self):
		return self.__id_operation

	# Return if an operation is done at time t
	def is_done(self, t):
		return not (self.__time is None) and self.__time + self.__duration <= t

	# Return if a machine is already treating the operation
	@property
	def is_pending(self):
		return self.__is_pending

	# Set the pending status
	@is_pending.setter
	def is_pending(self, value):
		self.__is_pending = value

	# Return the slot of the machine allocated for the treatment of the operation
	@property
	def place_of_arrival(self):
		return self.__place_of_arrival

	# Set the slot of the machine allocated for the treatment of the operation
	@place_of_arrival.setter
	def place_of_arrival(self, value):
		self.__place_of_arrival = value

	# Return the machine's id who has to do the operation
	@property
	def id_machine(self):
		return self.__id_machine

	# Return the operation's duration
	@property
	def duration(self):
		return self.__duration

	# Return at which time the operation started or None
	@property
	def time(self):
		return self.__time

	# Set the time at which the operation started
	@time.setter
	def time(self, value):
		if value < 0:
			raise ValueError("Time < 0 is not possible")
		self.__time = value


## Class Activity

In [23]:
class Activity:
	def __init__(self, job, id_activity):
		self.__job = job
		self.__id_activity = id_activity
		self.__operations_to_be_done = []
		self.__operation_done = None

	# Display the activity nicer
	def __str__(self):
		output = "Job n°" + str(self.id_job) + " Activity n°" + str(self.__id_activity) + "\n"

		output += "Operations to be done\n"
		for operation in self.__operations_to_be_done:
			output += str(operation) + "\n"

		output += "Operation done\n"
		output += str(self.__operation_done) + "\n"

		return output

	# Return the job's id of the activity
	@property
	def id_job(self):
		return self.__job.id_job

	# Return the activity's id
	@property
	def id_activity(self):
		return self.__id_activity

	# Add an operation to the activity
	def add_operation(self, operation):
		self.__operations_to_be_done.append(operation)

	# Return if the activity is done
	@property
	def is_done(self):
		return not (self.__operation_done is None)

	# Return the list of all the operations yet to be done
	@property
	def next_operations(self):
		return self.__operations_to_be_done

	# Return the shortest operation available
	@property
	def shortest_operation(self):
		candidate_operation = None
		for operation in self.__operations_to_be_done:
			if candidate_operation is None or operation.duration < candidate_operation.duration:
				candidate_operation = operation
		return operation

	# Return the list of all the operations already done
	@property
	def operation_done(self):
		return self.__operation_done

	# Allow a machine to say to an activity that it finished an operation
	def terminate_operation(self, operation):
		# Remove the operation from the list of the operations yet to be done
		self.__operations_to_be_done = list(
			filter(lambda element: element.id_operation != operation.id_operation, self.__operations_to_be_done))
		# Append the operation to the list of the operations already done
		self.__operation_done = operation
		self.__job.activity_is_done(self)

	@property
	def shop_time(self):
		return self.operation_done.duration if self.is_done else max(self.__operations_to_be_done,
																	 key=lambda operation: operation.duration)

	@property
	def is_feasible(self):
		return self.__job.check_if_previous_activity_is_done(self.__id_activity)

	@property
	def is_pending(self):
		return len(list(filter(lambda element: element.is_pending, self.__operations_to_be_done))) > 0

	def get_operation(self, id_operation):
		for operation in self.__operations_to_be_done:
			if operation.id_operation == id_operation:
				return operation


## Class Scheduler

In [29]:
import sys

from colorama import init
from termcolor import colored


class Scheduler:
	def __init__(self, machines, max_operations, jobs):
		init()  # Init colorama for color display
		self.__original_stdout = sys.stdout
		self.__machines = machines
		self.__jobs_to_be_done = jobs
		self.__jobs_done = []
		self.__max_operations = max_operations

	# Run the scheduler with an heuristic
	def run(self, heuristic, verbose=True):
		# Disable print if verbose is False
		if not verbose:
			sys.stdout = None

		current_step = 0

		while len(self.__jobs_to_be_done) > 0:
			current_step += 1

			best_candidates = heuristic(self.__jobs_to_be_done, self.__max_operations, current_step)
			for id_machine, candidates in best_candidates.items():
				machine = self.__machines[id_machine - 1]
				for activity, operation in candidates:
					if not (machine.is_working_at_max_capacity() or activity.is_pending):
						machine.add_operation(activity, operation)

			for machine in self.__machines:
				machine.work()

			for job in self.__jobs_to_be_done:
				if job.is_done:
					self.__jobs_to_be_done = list(
						filter(lambda element: element.id_job != job.id_job, self.__jobs_to_be_done))
					self.__jobs_done.append(job)

		print(colored("[SCHEDULER]", "green"), "Done in " + str(current_step) + " units of time")

		# Reenable stdout
		if not verbose:
			sys.stdout = self.__original_stdout

		return current_step


## Class Drawer

In [43]:
import random
import os


class Drawer:
	@staticmethod
	def draw_schedule(number_machines, max_operations, jobs, filename=None):
		import matplotlib.pyplot as plt
		import matplotlib.patches as patches

		# Vertical space between operation
		vertical_space = 1
		# Vertical height of an operation
		vertical_height = 2

		# Dictionary of the operations done, the key correspond to the machine id
		operations_done = {}
		for job in jobs:
			for activity in job.activities_done:
				# Add all done operations
				operation = activity.operation_done
				# If it's the first operation add on the machine, initialize the list
				if operations_done.get(operation.id_machine) is None:
					list_operations = []
				# Else, get the previously added operations
				else:
					list_operations = operations_done.get(operation.id_machine)

				# Append the current operation with its job's id and activity's id
				list_operations.append((job.id_job, activity.id_activity, operation))
				# Update the dictionary
				operations_done.update({operation.id_machine: list_operations})

		# Define random colors for jobs
		colors = ['#%06X' % random.randint(0, 256 ** 3 - 1) for _ in range(len(jobs))]

		# Draw
		plt.clf()
		plot = plt.subplot()

		for id_machine, list_operations in operations_done.items():
			for id_job, id_activity, operation in list_operations:
				# X coord corresponds to the operation's time
				# Y coord corresponds to the order of the operation
				# according to its machine's id, its place of arrival and the max operations allowed simultaneously
				x, y = operation.time, 1 + id_machine * max_operations * (
						vertical_space + vertical_height) + operation.place_of_arrival * (
							   vertical_space + vertical_height)
				# Plot a rectangle with a width of the operation's duration
				plot.add_patch(
					patches.Rectangle(
						(x, y),
						operation.duration,
						vertical_height,
						facecolor=colors[id_job - 1]
					)
				)

		# Display the machines' number as the y-axis legend
		plt.yticks([1 + (i + 1) * max_operations * (vertical_space + vertical_height) + (
				max_operations * (vertical_height + vertical_space) - vertical_space) / 2 for i in
					range(number_machines)], ["machine " + str(i + 1) for i in range(number_machines)])
		# Auto-scale to see all the operations
		plot.autoscale()

		# Display a rectangle with the color and the job's id as the x-axis legend
		handles = []
		for id_job, color in enumerate(colors):
			handles.append(patches.Patch(color=color, label='job ' + str(id_job + 1)))
		plt.legend(handles=handles)

		# Show the schedule order
		plt.show()
		# Saving the scheduler order
		if not (filename is None):
			plt.savefig(os.path.join("output", filename), bbox_inches='tight')

	# Plot a 2d graph
	@staticmethod
	def plot2d(filename, xdata, ydata, title, xlabel, ylabel, approximate=False, min_degree=2, max_degree=8):
		import matplotlib.pyplot as plt
		plt.clf()
		plot = plt.subplot()
		plot.set_title(title)
		plot.set_xlabel(xlabel)
		plot.set_ylabel(ylabel)
		plot.plot(xdata, ydata, 'o' if approximate else '-')
		
		if approximate:
			import numpy as np
			from scipy.interpolate import UnivariateSpline
			from colorama import init
			from termcolor import colored
			init()
			
			# Compute an interval of 150 points
			x = np.linspace(xdata[0], xdata[-1], 150)
			
			# Compute a spline to measure the error between the approximation and the data set
			spline = UnivariateSpline(xdata, ydata)
			y_spline = spline(x)
			plot.plot(x, y_spline, 'C0')
			
			# Compute different polynomial approximations
			legends = ["Original data", "Spline approximation"]
			# Save best polynomial approximation
			best_degree = best_coefficients = best_residual = best_y_poly = None
			
			# Computing different polynomial approximations
			for degree in range(min_degree, max_degree + 1):
				# Find a polynomial to fit the spline
				coefficients = np.polyfit(xdata, ydata, degree)
				poly = np.poly1d(coefficients)
				y_poly = poly(x)
				# Compute the residual (the error)
				residual = np.linalg.norm(y_spline - y_poly, 2)
				# Display current polynomial approximation residual
				print(colored("[DRAWER]", "magenta"), "Polynomial approximation of degree", str(degree),
					  "-> Residual =", residual)
				# Checking if it's a better approximation
				if best_residual is None or residual < best_residual:
					best_degree, best_coefficients, best_residual, best_y_poly = degree, coefficients, residual, y_poly
					
			# Displaying best polynomial found
			legends.append("Approximation of degree " + str(best_degree))
			print(colored("[DRAWER]", "magenta"), "Best approximation found is a polynomial of degree",
				  str(best_degree))
			print("\t", "Coefficients:", best_coefficients)
			print("\t", "Residual:", best_residual)
			# Plotting resulting polynomial
			plot.plot(x, best_y_poly, '--')
			plot.legend(legends)
			
		plot.autoscale()
		plt.show()
		if not (filename is None):
			plt.savefig(os.path.join("output", filename), bbox_inches='tight')

	# Plot a 3d graph
	@staticmethod
	def plot3d(filename, xdata, ydata, zdata, title, xlabel, ylabel, zlabel):
		from mpl_toolkits.mplot3d import Axes3D
		import matplotlib.pyplot as plt

		plt.clf()
		fig = plt.figure()
		plot = fig.gca(projection='3d')
		plot.set_title(title)
		plot.set_xlabel(xlabel)
		plot.set_ylabel(ylabel)
		plot.set_zlabel(zlabel)
		plot.scatter(xdata, ydata, zdata, c='b', marker='o')
		plot.autoscale()
		plt.show()
		if not (filename is None):
			plt.savefig(os.path.join("output", filename), bbox_inches='tight')


## Class GeneticScheduler

In [44]:
import copy
import sys
import random

from deap import base
from deap import creator


from colorama import init
from termcolor import colored


class GeneticScheduler:
	def __init__(self, machines, jobs):
		init()  # Init colorama for color display
		self.__original_stdout = sys.stdout
		self.__toolbox = base.Toolbox()
		self.__machines = machines
		self.__jobs = jobs

	# Constraint order
	@staticmethod
	def constraint_order_respected(individual):
		list = [(activity.id_job, activity.id_activity) for (activity, _) in individual]
		for key, (id_job, id_activity) in enumerate(list):
			if id_activity == 1:
				continue
			elif not list.index((id_job, id_activity - 1)) < key:
				return False
		return True

	# Initialize an individual for the genetic algorithm
	def init_individual(self, ind_class, size):
		temp_jobs_list = copy.deepcopy(self.__jobs)
		temp_machines_list = copy.deepcopy(self.__machines)

		# Run the scheduler
		s = Scheduler(temp_machines_list, 1, temp_jobs_list)
		s.run(Heuristics.random_operation_choice, verbose=False)

		# Retriving all the activities and the operation done
		list_activities = []
		for temp_job in temp_jobs_list:
			for temp_activity in temp_job.activities_done:
				activity = self.__jobs[temp_activity.id_job - 1].get_activity(temp_activity.id_activity)
				operation = activity.get_operation(temp_activity.operation_done.id_operation)
				list_activities.append((temp_activity.operation_done.time, activity, operation))
		# Ordering activities by time
		list_activities = sorted(list_activities, key=lambda x: x[0])
		individual = [(activity, operation) for (_, activity, operation) in list_activities]
		del temp_jobs_list, temp_machines_list
		return ind_class(individual)

	# Initialize a population
	def init_population(self, total_population):
		return [self.__toolbox.individual() for _ in range(total_population)]

	# Compute the time an individual take
	def compute_time(self, individual):
		# List matching the activities to the time it takes place
		list_time = []
		# Operation schedule on machines indexed by machines' id
		schedule = {}
		for machine in self.__machines:
			schedule.update({machine.id_machine: []})
		# Operation done indexed by job's id
		operations_done = {}
		for job in self.__jobs:
			operations_done.update({job.id_job: []})

		# For each item in individual, we compute the actual time at which the operation considered start
		for activity, operation in individual:
			# Get at which time the previous operation is done
			time_last_operation, last_operation_job = operations_done.get(activity.id_job)[-1] if len(
				operations_done.get(activity.id_job)) > 0 else (0, None)
			time_last_machine, last_operation_machine = schedule.get(operation.id_machine)[-1] if len(
				schedule.get(operation.id_machine)) > 0 else (0, None)

			if last_operation_machine is None and last_operation_job is None:
				time = 0
			elif last_operation_machine is None:
				time = time_last_operation + last_operation_job.duration
			elif last_operation_job is None:
				time = time_last_machine + last_operation_machine.duration
			else:
				time = max(time_last_machine + last_operation_machine.duration,
						   time_last_operation + last_operation_job.duration)

			list_time.append(time)

			operations_done.update({activity.id_job: operations_done.get(activity.id_job) + [(time, operation)]})
			schedule.update({operation.id_machine: schedule.get(operation.id_machine) + [(time, operation)]})

		# We compute the total time we need to process all the jobs
		total_time = 0
		for machine in self.__machines:
			if len(schedule.get(machine.id_machine)) > 0:
				time, operation = schedule.get(machine.id_machine)[-1]
				if time + operation.duration > total_time:
					total_time = time + operation.duration

		return total_time, list_time

	# Evaluate the fitness for an individual, in our case it means compute the total time an individual take
	def evaluate_individual(self, individual):
		return self.compute_time(individual)[0],

	# Create a mutant based on an individual
	# In our case it means select another operation within an activity with multiple choices for an operation
	@staticmethod
	def mutate_individual(individual):
		# Select the possible candidates, meaning the activities with multiple choices for an operation
		candidates = list(filter(lambda element: len(element[0].next_operations) > 1, individual))
		# If some candidates have been found, mutate a random one
		if len(candidates) > 0:
			mutant_activity, previous_operation = candidates[random.randint(0, len(candidates) - 1)]
			id_mutant_activity = [element[0] for element in individual].index(mutant_activity)
			mutant_operation = previous_operation
			while mutant_operation.id_operation == previous_operation.id_operation:
				mutant_operation = mutant_activity.next_operations[
					random.randint(0, len(mutant_activity.next_operations) - 1)]
			individual[id_mutant_activity] = (mutant_activity, mutant_operation)
		# Remove the previous fitness value because it is deprecated
		del individual.fitness.values
		# Return the mutant
		return individual

	# Permute an individual
	# In our case it means select an activity and permute it with another
	# It needs to meet some constraint to be efficient:
	#	You can't move an activity before or after another one from the same job
	@staticmethod
	def compute_bounds(permutation, considered_index):
		considered_activity, _ = permutation[considered_index]
		min_index = key = 0
		max_index = len(permutation) - 1
		while key < max_index:
			activity, _ = permutation[key]
			if activity.id_job == considered_activity.id_job:
				if min_index < key < considered_index:
					min_index = key
				if considered_index < key < max_index:
					max_index = key
			key += 1
		return min_index, max_index

	def permute_individual(self, individual):
		permutation_possible = False
		considered_index = considered_permutation_index = 0
		while not permutation_possible:
			considered_index = min_index = max_index = 0
			# Loop until we can make some moves, i.e. when max_index - min_index > 2
			while max_index - min_index <= 2:
				considered_index = random.randint(0, len(individual) - 1)
				min_index, max_index = self.compute_bounds(individual, considered_index)

			# Select a random activity within those bounds (excluded) to permute with
			considered_permutation_index = random.randint(min_index + 1, max_index - 1)
			min_index_permutation, max_index_permutation = self.compute_bounds(individual,
																			   considered_permutation_index)
			if min_index_permutation < considered_index < max_index_permutation:
				permutation_possible = considered_index != considered_permutation_index

		# A possible permutation has been found
		individual[considered_index], individual[considered_permutation_index] = individual[
																					 considered_permutation_index], \
																				 individual[considered_index]
		return individual

	# Move an activity inside the scheduler (different than swapping)
	def move_individual(self, individual):
		considered_index = min_index = max_index = 0
		# Loop until we can make some moves, i.e. when max_index - min_index > 2
		while max_index - min_index <= 2:
			considered_index = random.randint(0, len(individual) - 1)
			min_index, max_index = self.compute_bounds(individual, considered_index)
		# Loop until we find a different index to move to
		new_index = random.randint(min_index + 1, max_index - 1)
		while considered_index == new_index:
			new_index = random.randint(min_index + 1, max_index - 1)
		# Move the activity inside the scheduler
		individual.insert(new_index, individual.pop(considered_index))
		return individual

	def evolve_individual(self, individual, mutation_probability, permutation_probability, move_probability):
		future_individual = copy.deepcopy(individual)
		if random.randint(0, 100) < mutation_probability:
			future_individual = self.mutate_individual(future_individual)
		if random.randint(0, 100) < permutation_probability:
			future_individual = self.permute_individual(future_individual)
		if random.randint(0, 100) < move_probability:
			future_individual = self.move_individual(future_individual)
		return future_individual

	# Run a tournament between individuals within a population to get some of them
	@staticmethod
	def run_tournament(population, total=10):
		# Because you can't have a bigger population as a result of the tournament, we assert that constraint
		assert total <= len(population)
		new_population = []
		while len(new_population) < total:
			first_individual = population[random.randint(0, len(population) - 1)]
			second_individual = population[random.randint(0, len(population) - 1)]
			if first_individual.fitness.values[0] < second_individual.fitness.values[0]:
				new_population.append(first_individual)
				population.remove(first_individual)
			else:
				new_population.append(second_individual)
				population.remove(second_individual)
		del population
		return new_population

	# Simulate the individual with the machines
	def run_simulation(self, individual):
		total_time, list_time = self.compute_time(individual)
		for key, (individual_activity, individual_operation) in enumerate(individual):
			activity = self.__jobs[individual_activity.id_job - 1].get_activity(individual_activity.id_activity)
			operation = activity.get_operation(individual_operation.id_operation)
			operation.time = list_time[key]
			operation.place_of_arrival = 0
			activity.terminate_operation(operation)
		return total_time

	# Run the genetic scheduler
	def run_genetic(self, total_population=10, max_generation=100, verbose=False):
		assert total_population > 0, max_generation > 0
		# Disable print if verbose is False
		if not verbose:
			sys.stdout = None

		creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
		creator.create("Individual", list, fitness=creator.FitnessMin)

		self.__toolbox.register("individual", self.init_individual, creator.Individual, size=1)
		self.__toolbox.register("mutate", self.mutate_individual)
		self.__toolbox.register("permute", self.permute_individual)
		self.__toolbox.register("evaluate", self.evaluate_individual)

		print(colored("[GENETIC]", "cyan"), "Generating population")
		population = self.init_population(total_population)

		best = population[0]
		best.fitness.values = self.evaluate_individual(best)
		print(colored("[GENETIC]", "cyan"), "Starting evolution for", max_generation, "generations")
		for current_generation in range(max_generation):
			# Generate mutation and permutation probabilities for the next generation
			mutation_probability = random.randint(0, 100)
			permutation_probability = random.randint(0, 100)
			move_probability = random.randint(0, 100)
			# Evolve the population
			print(colored("[GENETIC]", "cyan"), "Evolving to generation", current_generation + 1)
			mutants = list(set([random.randint(0, total_population - 1) for _ in
								range(random.randint(1, total_population))]))
			print(colored("[GENETIC]", "cyan"), "For this generation,", len(mutants), "individual(s) will mutate")
			for key in mutants:
				individual = population[key]
				population.append(
					self.evolve_individual(individual, mutation_probability, permutation_probability, move_probability))
			# Evaluate the entire population
			fitnesses = list(map(self.evaluate_individual, population))
			for ind, fit in zip(population, fitnesses):
				ind.fitness.values = fit
				if best.fitness.values[0] > ind.fitness.values[0]:
					print(colored("[GENETIC]", "cyan"), "A better individual has been found. New best time = ",
						  ind.fitness.values[0])
					best = copy.deepcopy(ind)
			population = self.run_tournament(population, total=total_population)

		print(colored("[GENETIC]", "cyan"), "Evolution finished")
		if self.constraint_order_respected(best):
			print(colored("[GENETIC]", "cyan"), "Best time found equals", best.fitness.values[0])
			print(colored("[GENETIC]", "cyan"), "Simulating work on machines")
			total_time = self.run_simulation(best)
			print(colored("[GENETIC]", "cyan"), "Simulation finished")
			print(colored("[GENETIC]", "cyan"), "Genetic scheduler finished")
		else:
			print(colored("[GENETIC]", "cyan"), "The individual doesn't match the constraint order")

		# Reenable stdout
		if not verbose:
			sys.stdout = self.__original_stdout

		return total_time


## Class EvaluateSolutions:

In [45]:

from colorama import init
from termcolor import colored

import os
import copy
import timeit


class EvaluateSolutions:
	def __init__(self, path):
		init()  # Init colorama for color display
		self.__path = path
		self.__files_list = {}

	def run(self, population_size=100, max_generation=200):
		results = {}
		print(colored("[EVALUATION]", "red"), "Population size =", population_size, "& Max generation =",
			  max_generation)
		for (path, _, filenames) in os.walk(self.__path):
			for filename in filenames:
				jobs_list, machines_list, _ = parse(os.path.join(path, filename))
				print(colored("[EVALUATION]", "red"), "Running evaluation for", filename)
				time_results = []
				computation_times = []
				for i in range(1, 6):
					print(colored("[EVALUATION]", "red"), "Starting instance", i)
					temp_machines_list, temp_jobs_list = copy.deepcopy(machines_list), copy.deepcopy(
						jobs_list)
					start = timeit.default_timer()	
					s = GeneticScheduler(temp_machines_list, temp_jobs_list)
					time = s.run_genetic(total_population=population_size, max_generation=max_generation, verbose=False)
					stop = timeit.default_timer()
					print(colored("[EVALUATION]", "red"), "Done in", time, "units of time,", stop - start, "seconds")
					time_results.append(time)
					computation_times.append(stop - start)
					del s, start, stop, time, temp_jobs_list, temp_machines_list
				average_time = sum(time_results) / float(len(time_results))
				computation_time = sum(computation_times) / float(len(computation_times))
				print(colored("[EVALUATION]", "red"), "Average time found for", filename, ":", average_time)
				results.update({filename: (average_time, computation_time)})
				del time_results, computation_times

			print(colored("[EVALUATION]", "red"), "Resulting average time for files in", self.__path)
			for filename, (average_time, computation_time) in results.items():
				print("\t", filename, "- Average time =", average_time, "for an average computation time of",
					  computation_time, "seconds")


## Class Heuristics

In [46]:
class Heuristics:
	# When a choice between multiple operations is available, always pick the first one
	@staticmethod
	def select_first_operation(jobs_to_be_done, max_operations, _):
		best_candidates = {}

		for job in jobs_to_be_done:
			current_activity = job.current_activity
			best_operation = current_activity.shortest_operation

			if best_candidates.get(best_operation.id_machine) is None:
				best_candidates.update({best_operation.id_machine: [(current_activity, best_operation)]})
			elif len(best_candidates.get(best_operation.id_machine)) < max_operations:
				best_candidates.get(best_operation.id_machine).append((current_activity, best_operation))
			else:
				list_operations = best_candidates.get(best_operation.id_machine)

				for key, (_, operation) in enumerate(list_operations):
					if operation.duration < best_operation.duration:
						list_operations.pop(key)
						break

				if len(list_operations) < max_operations:
					list_operations.append((current_activity, best_operation))

		return best_candidates

	# LEPT rule
	@staticmethod
	def longest_expected_processing_time_first(jobs_to_be_done, max_operations, current_time):
		pass

	# Shortest slack per remaining operations
	# S/RO = [(Due date - Today’s date) - Total shop time remaining] / Number of operations remaining
	@staticmethod
	def shortest_slack_per_remaining_operations(jobs_to_be_done, max_operations, current_time):
		pass

	# Highest critical ratios
	# CR = Processing Time / (Due Date – Current Time)
	@staticmethod
	def highest_critical_ratios(jobs_to_be_done, max_operations, current_time):
		best_candidates = {}
		critical_ratios = {}
		assignment = {}

		for job in jobs_to_be_done:
			current_activity = job.current_activity

			# Compute critical ratio for each operation for an activity
			for operation in current_activity.next_operations:
				critical_ratio = operation.duration / (job.total_shop_time - current_time)
				critical_ratios.update({job.id_job: (current_activity, operation, critical_ratio)})

			for id_job, current_activity, operation, critical_ratio in critical_ratios.items():
				if assignment.get(operation.id_machine) is None:
					assignment.update({operation.id_machine: (current_activity, operation, critical_ratio)})

				elif len(assignment.get(operation.id_machine)) < max_operations:
					list_operations = assignment.get(operation.id_machine)
					list_operations.append((current_activity, operation, critical_ratio))
					best_candidates.update({operation.id_machine: list_operations})

	# TODO: end that

	# Assign randomly jobs to machine
	@staticmethod
	def random_operation_choice(jobs_to_be_done, max_operations, _):
		import random
		best_candidates = {}
		dict_operations = {}

		for job in jobs_to_be_done:
			current_activity = job.current_activity
			for operation in current_activity.next_operations:
				if dict_operations.get(operation.id_machine) is None:
					dict_operations.update({operation.id_machine: [(current_activity, operation)]})
				else:
					dict_operations.get(operation.id_machine).append((current_activity, operation))

		for machine, list_operations in dict_operations.items():
			best_candidates.update({machine: list(
				set([list_operations[random.randint(0, len(list_operations) - 1)] for _ in range(max_operations)]))})

		return best_candidates

	## Creation of Machine assignment and operation sequence lists (need improvement)
	##
	@staticmethod
	def initialisation_list(jobs_to_be_done):
		machine_assignment = []
		operation_sequence = []
		for job in jobs_to_be_done:
			for activity in job.activities_to_be_done:
				operation_sequence.append(job.id_job)
				machine_assignment.append(activity.next_operations[0].id_machine)
		print("machine assignment :")
		for machine in machine_assignment:
			print(str(machine))
		print("operation sequence :")
		for operation in operation_sequence:
			print(operation)


## Class Benchmarks

In [47]:
from colorama import init
from termcolor import colored

import copy
import numpy as np
import timeit


class Benchmarks:
	def __init__(self, path, start=0, stop=4, samples=20):
		init()  # Init colorama for color display
		self.__size = list(set(np.logspace(start, stop, num=samples, dtype=np.int)))
		self.__name = path.split('/')[-1].split('.')[0]
		self.__jobs_list, self.__machines_list, self.__number_max_operations = parse(path)

	# Benchmarks the genetic scheduler when we increase total population
	def population(self, max_generation=100):
		benchmarks_population = []

		print(colored("[BENCHMARKS]", "yellow"), "Gathering computation time for different population sizes")
		for size in self.__size:
			print(colored("[BENCHMARKS]", "yellow"), "Current population size =", size)
			start = timeit.default_timer()
			temp_machines_list, temp_jobs_list = copy.deepcopy(self.__machines_list), copy.deepcopy(self.__jobs_list)
			s = GeneticScheduler(temp_machines_list, temp_jobs_list)
			total_time = s.run_genetic(total_population=size, max_generation=max_generation, verbose=False)
			stop = timeit.default_timer()
			print(colored("[BENCHMARKS]", "yellow"), "Done in", stop - start, "seconds")
			benchmarks_population.append((size, max_generation, stop - start, total_time))
			del s, temp_machines_list, temp_jobs_list
		print(colored("[BENCHMARKS]", "yellow"), "Gathering for different population sizes completed")

		Drawer.plot2d(self.__name + "_benchmarks_population", [element[0] for element in benchmarks_population],
					  [element[2] for element in benchmarks_population],
					  "Time as a function of population size for " + self.__name + " (" + str(
						  max_generation) + " generations)", "Population size", "Time (in seconds)", approximate=True)

		return benchmarks_population

	# Benchmarks the genetic scheduler when we increase max generation
	def generation(self, population_size=100):
		benchmarks_generation = []

		print(colored("[BENCHMARKS]", "yellow"), "Gathering computation time for different generation numbers")
		for size in self.__size:
			print(colored("[BENCHMARKS]", "yellow"), "Current max generation =", size)
			start = timeit.default_timer()
			temp_machines_list, temp_jobs_list = copy.deepcopy(self.__machines_list), copy.deepcopy(
				self.__jobs_list)
			s = GeneticScheduler(temp_machines_list, temp_jobs_list)
			total_time = s.run_genetic(total_population=population_size, max_generation=size, verbose=False)
			stop = timeit.default_timer()
			print(colored("[BENCHMARKS]", "yellow"), "Done in", stop - start, "seconds")
			benchmarks_generation.append((population_size, size, stop - start, total_time))
			del s, temp_machines_list, temp_jobs_list
		print(colored("[BENCHMARKS]", "yellow"), "Gathering for different population sizes completed")

		Drawer.plot2d(self.__name + "_benchmarks_generation", [element[1] for element in benchmarks_generation],
					  [element[2] for element in benchmarks_generation],
					  "Time as a function of max generation for " + self.__name + " (" + str(
						  population_size) + " individuals)", "Max generation", "Time (in seconds)")

		return benchmarks_generation

	# Benchmarks the genetic scheduler for different couples of population size and max generation
	def population_and_generation(self):
		import itertools
		benchmarks_population_and_generation = []
		params = itertools.product(self.__size, repeat=2)
		print(colored("[BENCHMARKS]", "yellow"),
			  "Gathering times for different couples of population size and max generation")
		for population, generation in params:
			print(colored("[BENCHMARKS]", "yellow"), "Current population size =", population, ", max generation =",
				  generation)
			start = timeit.default_timer()
			temp_machines_list, temp_jobs_list = copy.deepcopy(self.__machines_list), copy.deepcopy(
				self.__jobs_list)
			s = GeneticScheduler(temp_machines_list, temp_jobs_list)
			total_time = s.run_genetic(total_population=population, max_generation=generation, verbose=False)
			stop = timeit.default_timer()
			print(colored("[BENCHMARKS]", "yellow"), "Done in", stop - start, "seconds")
			benchmarks_population_and_generation.append((population, generation, stop - start, total_time))
			del s, temp_machines_list, temp_jobs_list
		print(colored("[BENCHMARKS]", "yellow"), "Gathering for different couples completed")

		# Plot graph with solution time as Z axis
		Drawer.plot3d(self.__name + "_benchmarks_generation_with_solution_time",
					  [element[0] for element in benchmarks_population_and_generation],
					  [element[1] for element in benchmarks_population_and_generation],
					  [element[3] for element in benchmarks_population_and_generation],
					  "Best time found as a function of population size and max generation", "Population size",
					  "Max generation", "Total time")

		# Plot graph with computation time as Z axis
		Drawer.plot3d(self.__name + "_benchmarks_generation_with_computation_time",
					  [element[0] for element in benchmarks_population_and_generation],
					  [element[1] for element in benchmarks_population_and_generation],
					  [element[2] for element in benchmarks_population_and_generation],
					  "Computation time as a function of population size and max generation", "Population size",
					  "Max generation", "Computation time")

		return benchmarks_population_and_generation

	# Run all the benchmarks
	def run(self):
		# Run benchmark with constant max generation
		benchmark_population = self.population()

		# Run benchmark with constant population size
		benchmark_generation = self.generation()

		# Run benchmark with changing population size and max generation
		benchmark_population_and_generation = self.population_and_generation()

		return benchmark_population, benchmark_generation, benchmark_population_and_generation


In [None]:
from activity import Activity
from operation import Operation
from machine import Machine

import os
import re


def parse(path):
	with open(os.path.join(os.getcwd(), path), "r") as data:
		total_jobs, total_machines, max_operations = re.findall('\S+', data.readline())
		number_total_jobs, number_total_machines, number_max_operations = int(total_jobs), int(total_machines), int(float(
			max_operations))
		jobs_list = []
		# Current job's id
		id_job = 1

		for key, line in enumerate(data):
			if key >= number_total_jobs:
				break
			# Split data with multiple spaces as separator
			parsed_line = re.findall('\S+', line)
			# Current job
			job = Job(id_job)
			# Current activity's id
			id_activity = 1
			# Current item of the parsed line
			i = 1

			while i < len(parsed_line):
				# Total number of operations for the activity
				number_operations = int(parsed_line[i])
				# Current activity
				activity = Activity(job, id_activity)
				for id_operation in range(1, number_operations + 1):
					activity.add_operation(Operation(id_operation, int(parsed_line[i + 2 * id_operation - 1]),
													 int(parsed_line[i + 2 * id_operation])))

				job.add_activity(activity)
				i += 1 + 2 * number_operations
				id_activity += 1

			jobs_list.append(job)
			id_job += 1

	# Machines
	machines_list = []
	for id_machine in range(1, number_total_machines + 1):
		machines_list.append(Machine(id_machine, number_max_operations))

	return jobs_list, machines_list, number_max_operations


ModuleNotFoundError: No module named 'job'

## Main.py

In [49]:
import copy
import os
import sys
import timeit
import warnings


warnings.simplefilter('ignore', RuntimeWarning)

path = "data/test.fjs" if len(sys.argv) == 1 else sys.argv[1]
jobs_list, machines_list, number_max_operations = parse(path)
number_total_machines = len(machines_list)
number_total_jobs = len(jobs_list)

print("Scheduler launched with the following parameters:")
print('\t', number_total_jobs, "jobs")
print('\t', number_total_machines, "machine(s)")
print('\t', "Machine(s) can process", str(number_max_operations), "operation(s) at the same time")
print("\n")

loop = True
while loop:
	temp_jobs_list = copy.deepcopy(jobs_list)
	temp_machines_list = copy.deepcopy(machines_list)
	print(30 * "-", "MENU", 30 * "-")
	print("1. Scheduler with an heuristic")
	print("2. Genetic Scheduler")
	print("3. Benchmarks")
	print("4. Run an evaluation of the solutions")
	print("5. Exit")
	print(66 * "-")

	choice = input("Enter your choice [1-5]: ")
	if choice == "1":
		heuristic = None
		while heuristic is None:
			print("Heuristics availables:")
			print("\t", "1. When an activity has a multiple choice for the operations, choose the shortest one")
			print("\t", "2. Assign operations to machines randomly")
			heuristic_choice = input("Enter your choice [1-2]: ")
			if heuristic_choice == "1":
				heuristic = Heuristics.select_first_operation
			elif heuristic_choice == "2":
				heuristic = Heuristics.random_operation_choice
			else:
				input("Wrong option selection. Enter any key to try again...")

		start = timeit.default_timer()
		s = Scheduler(temp_machines_list, number_max_operations, temp_jobs_list)
		s.run(heuristic)
		stop = timeit.default_timer()
		print("Finished in " + str(stop - start) + " seconds")

		draw = input("Draw the schedule ? [Y/N, default=Y] ")
		if draw == "n" or draw == "N":
			continue
		else:
			Drawer.draw_schedule(number_total_machines, number_max_operations, temp_jobs_list,
								 filename="output_scheduler.png")
		del s
	elif choice == "2":
		string = input("Total population [default=20] ")
		try:
			total_population = int(string)
		except ValueError:
			total_population = 20
		string = input("Max generation [default=400] ")
		try:
			max_generation = int(string)
		except ValueError:
			max_generation = 400
		start = timeit.default_timer()
		s = GeneticScheduler(temp_machines_list, temp_jobs_list)
		s.run_genetic(total_population=total_population, max_generation=max_generation, verbose=True)
		stop = timeit.default_timer()
		print("Finished in " + str(stop - start) + " seconds")

		draw = input("Draw the schedule ? [Y/N, default=Y] ")
		if draw == "n" or draw == "N":
			continue
		else:
			Drawer.draw_schedule(number_total_machines, 1, temp_jobs_list, filename="output_genetic.png")
		del s
	elif choice == "3":
		b = Benchmarks(path)
		b.run()
		del b
	elif choice == "4":
		path_data_set = None
		while path_data_set is None:
			print("Data set availables:")
			print("\t", "1. Barnes")
			print("\t", "2. Brandimarte")
			print("\t", "3. Dauzere")
			print("\t", "4. Hurink - edata")
			print("\t", "5. Hurink - rdata")
			print("\t", "6. Hurink - sdata")
			print("\t", "7. Hurink - vdata")
			data_set_choice = input("Enter your choice [1-7]: ")
			if data_set_choice == "1":
				path_data_set = os.path.join("data", "Barnes", "Text")
			elif data_set_choice == "2":
				path_data_set = os.path.join("data", "Brandimarte_Data", "Text")	
			elif data_set_choice == "3":
				path_data_set = os.path.join("data", "Dauzere_Data", "Text")
			elif data_set_choice == "4":
				path_data_set = os.path.join("data", "Hurink_Data", "Text", "edata")
			elif data_set_choice == "5":
				path_data_set = os.path.join("data", "Hurink_Data", "Text", "rdata")
			elif data_set_choice == "6":
				path_data_set = os.path.join("data", "Hurink_Data", "Text", "sdata")
			elif data_set_choice == "7":
				path_data_set = os.path.join("data", "Hurink_Data", "Text", "vdata")
			else:
				input("Wrong option selection. Enter any key to try again...")
		print(os.path.dirname(__file__))
		e = EvaluateSolutions(path_data_set)
		e.run()
		del e
	elif choice == "5":
		loop = False
	else:
		input("Wrong option selection. Enter any key to try again...")

	del temp_jobs_list, temp_machines_list


OSError: [Errno 22] Invalid argument: 'c:\\Users\\saadr\\BIO-Inspire\\bio-ai\\--f=c:\\Users\\saadr\\AppData\\Roaming\\jupyter\\runtime\\kernel-v3f6b21393770184fb26fd234b12ec018c705f1adf.json\\r'