In [1]:
#Importing read-sm-files.py
import ReadSMFIles
from Resource import Resource
from Task import Task
from Schedule import Schedule
import random
from typing import List, Tuple, Callable, Optional
import numpy as np
import matplotlib.pyplot as plt

#reading j30.sm/j301_1.sm file
sm_file = ReadSMFIles.SMFileParser.parse_sm_file("j30.sm/j301_1.sm")

print(sm_file)


(   renewable  nonrenewable  doubly_constrained
0          4             0                   0, {'pronr': 1, 'jobs': 30, 'rel_date': 0, 'duedate': 38, 'tardcost': 26, 'MPM_Time': 38},     jobnr  modes  successors_count    successors
0       1      1                 3     [2, 3, 4]
1       2      1                 3   [6, 11, 15]
2       3      1                 3    [7, 8, 13]
3       4      1                 3    [5, 9, 10]
4       5      1                 1          [20]
5       6      1                 1          [30]
6       7      1                 1          [27]
7       8      1                 3  [12, 19, 27]
8       9      1                 1          [14]
9      10      1                 2      [16, 25]
10     11      1                 2      [20, 26]
11     12      1                 1          [14]
12     13      1                 2      [17, 18]
13     14      1                 1          [17]
14     15      1                 1          [25]
15     16      1                

In [2]:
#Creating resources
r1 = int(sm_file[4].R1[0])
r2 = int(sm_file[4].R2[0])
r3 = int(sm_file[4].R3[0])
r4 = int(sm_file[4].R4[0])

R1 = Resource('R1', r1)
R2 = Resource('R2', r2)
R3 = Resource('R3', r3)
R4 = Resource('R4', r4)

resources = [R1, R2, R3, R4]

print([resource.name for resource in resources], [resource.per_period_availability for resource in resources])


['R1', 'R2', 'R3', 'R4'] [12, 13, 4, 12]


In [3]:
#Creating jobs
jobs_enumerate = sm_file[3].jobnr
jobs_duration = sm_file[3].duration
jobs_resources = sm_file[3].resources
jobs_successors = sm_file[2].successors

jobs = [None for _ in jobs_enumerate]

for i in jobs_enumerate:
    jobs[i - 1] = Task(str(i), jobs_duration[i - 1])

for i in range(len(resources)):
    for j in range(len(jobs)):
        jobs[j].add_renewable_resource(resources[i], jobs_resources[j][i])

for i in range(len(jobs)):
    successors = jobs_successors[i]
    for j in successors:
        jobs[i].add_sucessor(jobs[j - 1])
    
# jobs = jobs[1:-1]

for job in jobs:
    print(job.name, [sucessor.name for sucessor in job.predecessors])


1 []
2 ['1']
3 ['1']
4 ['1']
5 ['4']
6 ['2']
7 ['3']
8 ['3']
9 ['4']
10 ['4']
11 ['2']
12 ['8']
13 ['3']
14 ['9', '12']
15 ['2']
16 ['10']
17 ['13', '14']
18 ['13']
19 ['8']
20 ['5', '11', '18']
21 ['16']
22 ['16', '17', '18']
23 ['20', '22']
24 ['19', '23']
25 ['10', '15', '20']
26 ['11']
27 ['7', '8']
28 ['21', '27']
29 ['19']
30 ['6', '24', '25']
31 ['26', '28']
32 ['29', '30', '31']



# ACO for RCPSP

In [4]:
import numpy as np
import random
from collections import defaultdict

class RCPSP:
    def __init__(self, file_path):
        self.parse_psplib(file_path)

    def parse_psplib(self, file_path):
        with open(file_path, 'r') as f:
            lines = f.readlines()

        self.n_activities = int(lines[0].strip())
        self.resources = []
        self.durations = {}
        self.successors = defaultdict(list)
        self.resource_requirements = defaultdict(list)
        reading_mode = None

        for line in lines[1:]:
            if "PRECEDENCE RELATIONS" in line:
                reading_mode = "precedence"
                continue
            elif "REQUESTS/DURATIONS" in line:
                reading_mode = "durations"
                continue
            elif "RESOURCEAVAILABILITIES" in line:
                self.resources = list(map(int, line.strip().split()))
                break  # Assumes rest isn't needed

            if reading_mode == "precedence":
                parts = list(map(int, line.strip().split()))
                if parts:
                    act = parts[0]
                    succs = parts[2:]
                    self.successors[act].extend(succs)
            elif reading_mode == "durations":
                parts = list(map(int, line.strip().split()))
                if parts:
                    act = parts[0]
                    duration = parts[1]
                    requirements = parts[2:]
                    self.durations[act] = duration
                    self.resource_requirements[act] = requirements

    def is_feasible(self, schedule):
        # Minimal check for feasibility, assumes Serial SGS used elsewhere
        return True


class AntColonyRCPSP:
    def __init__(self, rcpsp, n_ants=10, alpha=1, beta=2, rho=0.1, iterations=50):
        self.rcpsp = rcpsp
        self.n_ants = n_ants
        self.alpha = alpha
        self.beta = beta
        self.rho = rho
        self.iterations = iterations
        self.pheromones = np.ones((rcpsp.n_activities + 1, rcpsp.n_activities + 1))
        self.best_schedule = None
        self.best_makespan = float('inf')

    def _heuristic(self, from_act, to_act):
        return 1.0 / (self.rcpsp.durations.get(to_act, 1) + 1)

    def _probabilities(self, current, available):
        pheromone = np.array([self.pheromones[current][j] for j in available])
        heuristic = np.array([self._heuristic(current, j) for j in available])
        product = (pheromone ** self.alpha) * (heuristic ** self.beta)
        total = np.sum(product)
        return product / total if total > 0 else np.ones(len(available)) / len(available)

    def _construct_solution(self):
        schedule = []
        available = [1]  # Start with the dummy start node
        visited = set()

        while available:
            current = schedule[-1] if schedule else 1
            probs = self._probabilities(current, available)
            next_act = random.choices(available, weights=probs, k=1)[0]
            schedule.append(next_act)
            visited.add(next_act)
            available.remove(next_act)
            for succ in self.rcpsp.successors.get(next_act, []):
                if all(pred in visited for pred in self._get_predecessors(succ)) and succ not in visited:
                    available.append(succ)
        return schedule

    def _get_predecessors(self, act):
        return [i for i, succs in self.rcpsp.successors.items() if act in succs]

    def _evaluate(self, schedule):
        # Simplified serial SGS
        current_time = 0
        end_times = {}
        for act in schedule:
            earliest_start = max([end_times.get(p, 0) for p in self._get_predecessors(act)], default=0)
            end_times[act] = earliest_start + self.rcpsp.durations[act]
            current_time = max(current_time, end_times[act])
        return current_time

    def _update_pheromones(self, solutions):
        self.pheromones *= (1 - self.rho)
        for schedule, makespan in solutions:
            for i in range(len(schedule) - 1):
                self.pheromones[schedule[i]][schedule[i+1]] += 1.0 / makespan

    def run(self):
        for _ in range(self.iterations):
            solutions = []
            for _ in range(self.n_ants):
                schedule = self._construct_solution()
                makespan = self._evaluate(schedule)
                solutions.append((schedule, makespan))
                if makespan < self.best_makespan:
                    self.best_makespan = makespan
                    self.best_schedule = schedule
            self._update_pheromones(solutions)
        return self.best_schedule, self.best_makespan


# --- Usage Example ---
rcpsp = RCPSP(r"rcpsp-optimization-genetic-algorithm-main\j30.sm\j301_1.sm")
aco = AntColonyRCPSP(rcpsp, iterations=100, n_ants=20)
best_schedule, best_makespan = aco.run()
print("Best schedule:", best_schedule)
print("Best makespan:", best_makespan)


FileNotFoundError: [Errno 2] No such file or directory: 'j301_1.sm'