In [37]:
from datetime import datetime, timedelta
import heapq
from typing import List, Set, Optional, Tuple


class Task:
    """
    Represents a task that can be scheduled with required resources and workers.

    Attributes:
        task_id (int): Unique identifier for the task.
        name (str): Name or title of the task.
        priority (int): Task priority, higher value means higher priority.
        duration (int): Duration of the task in hours.
        required_skills (set): Set of skills required to complete the task.
        due_date (datetime): The due date of the task.
        dependencies (list): List of task IDs that must be completed before this task can start.
        assigned_worker (Worker): The worker assigned to the task (if any).
        start_time (datetime): The time when the task starts.
        end_time (datetime): The time when the task ends.
        assigned_resource (Resource): The resource assigned to the task (if any).
    """

    def __init__(self, task_id: int, name: str, priority: int, duration: int, 
                 required_skills: List[str], due_date: datetime, dependencies: Optional[List[int]] = None):
        """
        Initializes a Task object.

        Args:
            task_id (int): Unique identifier for the task.
            name (str): Name or title of the task.
            priority (int): Task priority.
            duration (int): Task duration in hours.
            required_skills (list): List of skills required for the task.
            due_date (datetime): Due date of the task.
            dependencies (list, optional): List of task IDs that need to be completed before this task. Defaults to None.
        """
        self.task_id: int = task_id
        self.name: str = name
        self.priority: int = priority
        self.duration: int = duration
        self.required_skills: Set[str] = set(required_skills)
        self.due_date: datetime = due_date
        self.dependencies: List[int] = dependencies if dependencies else []
        self.assigned_worker: Optional[Worker] = None
        self.start_time: Optional[datetime] = None
        self.end_time: Optional[datetime] = None
        self.assigned_resource: Optional[Resource] = None


class Worker:
    """
    Represents a worker who can be assigned tasks based on their skills and availability.

    Attributes:
        worker_id (int): Unique identifier for the worker.
        name (str): Name of the worker.
        skills (set): Set of skills possessed by the worker.
        availability (list): List of time intervals (start_time, end_time) when the worker is available.
    """

    def __init__(self, worker_id: int, name: str, skills: List[str], availability: List[Tuple[datetime, datetime]]):
        """
        Initializes a Worker object.

        Args:
            worker_id (int): Unique identifier for the worker.
            name (str): Name of the worker.
            skills (list): List of skills the worker possesses.
            availability (list): List of time intervals the worker is available.
        """
        self.worker_id: int = worker_id
        self.name: str = name
        self.skills: Set[str] = set(skills)
        self.availability: List[Tuple[datetime, datetime]] = sorted(availability)

    def is_available(self, start_time: datetime, end_time: datetime) -> int:
        """
        Checks if the worker is available for a task during the given time window.

        Args:
            start_time (datetime): The proposed start time of the task.
            end_time (datetime): The proposed end time of the task.

        Returns:
            int: The index of the available slot if available, otherwise -1.
        """
        for i, (available_start, available_end) in enumerate(self.availability):
            if available_start <= start_time and available_end >= end_time:
                return i  # Return index of the available slot
        return -1

    def allocate_time(self, start_time: datetime, end_time: datetime, slot_index: int):
        """
        Adjusts the worker's availability after assigning a task to them.

        Args:
            start_time (datetime): The start time of the task.
            end_time (datetime): The end time of the task.
            slot_index (int): The index of the available slot being allocated.
        """
        available_start, available_end = self.availability.pop(slot_index)
        if available_start < start_time:
            self.availability.append((available_start, start_time))
        if available_end > end_time:
            self.availability.append((end_time, available_end))
        self.availability.sort()


class Resource:
    """
    Represents a resource (e.g., machinery or equipment) that can be allocated to tasks.

    Attributes:
        resource_id (int): Unique identifier for the resource.
        name (str): Name of the resource.
        capacity (int): Maximum number of concurrent tasks the resource can handle.
        availability (list): List of time intervals when the resource is available.
    """

    def __init__(self, resource_id: int, name: str, capacity: int):
        """
        Initializes a Resource object.

        Args:
            resource_id (int): Unique identifier for the resource.
            name (str): Name of the resource.
            capacity (int): Capacity of the resource in terms of the number of tasks it can handle concurrently.
        """
        self.resource_id: int = resource_id
        self.name: str = name
        self.capacity: int = capacity
        self.availability: List[Tuple[datetime, datetime, int]] = []

    def is_available(self, start_time: datetime, end_time: datetime) -> bool:
        """
        Checks if the resource is available for a task during the given time window.

        Args:
            start_time (datetime): The proposed start time of the task.
            end_time (datetime): The proposed end time of the task.

        Returns:
            bool: True if the resource is available, otherwise False.
        """
        if not self.availability:
            return True
        for res_start, res_end, allocated in self.availability:
            if not (res_end <= start_time or res_start >= end_time) and allocated >= self.capacity:
                return False
        return True

    def allocate(self, start_time: datetime, end_time: datetime):
        """
        Allocates the resource to a task during the specified time window.

        Args:
            start_time (datetime): The start time of the task.
            end_time (datetime): The end time of the task.
        """
        self.availability.append((start_time, end_time, 1))
        self.availability.sort()


class TaskScheduler:
    """
    Schedules tasks, assigns workers and resources based on task priority, availability, and dependencies.

    Attributes:
        tasks (list): List of tasks to be scheduled.
        workers (list): List of workers available for task assignments.
        resources (list): List of resources available for task assignments.
        scheduled_tasks (list): List of tasks that have been scheduled with workers and resources.
    """

    def __init__(self):
        """
        Initializes a TaskScheduler object.
        """
        self.tasks: List[Tuple[int, Task]] = []
        self.workers: List[Worker] = []
        self.resources: List[Resource] = []
        self.scheduled_tasks: List[Task] = []

    def add_task(self, task: Task):
        """
        Adds a task to the scheduler.

        Args:
            task (Task): The task to be added to the scheduler.
        """
        heapq.heappush(self.tasks, (-task.priority, task))  # Max heap based on priority

    def add_worker(self, worker: Worker):
        """
        Adds a worker to the scheduler.

        Args:
            worker (Worker): The worker to be added to the scheduler.
        """
        self.workers.append(worker)

    def add_resource(self, resource: Resource):
        """
        Adds a resource to the scheduler.

        Args:
            resource (Resource): The resource to be added to the scheduler.
        """
        self.resources.append(resource)

    def schedule_tasks(self) -> List[Task]:
        """
        Schedules tasks by finding available workers and resources for each task, considering task dependencies.

        Returns:
            List[Task]: A list of scheduled tasks, with workers and resources assigned.
        """
        pending_tasks: dict[int, Task] = {}  # Track tasks with unsatisfied dependencies
        finished_tasks: dict[int, datetime] = {}  # Track completed tasks with end times

        while self.tasks:
            _, task = heapq.heappop(self.tasks)

            # Ensure dependencies are completed
            if any(dep not in finished_tasks for dep in task.dependencies):
                pending_tasks[task.task_id] = task
                continue

            # Calculate the earliest possible start time
            earliest_start = max(datetime.now(), *[finished_tasks.get(dep, datetime.min) for dep in task.dependencies] or [datetime.now()])
            end_time = earliest_start + timedelta(hours=task.duration)

            # Find the earliest worker who is available and has the right skills
            for worker in self.workers:
                slot_index = worker.is_available(earliest_start, end_time)

                if slot_index != -1 and worker.skills >= task.required_skills:
                    # Find an available resource if required
                    for resource in self.resources:
                        if resource.is_available(earliest_start, end_time):
                            # Assign worker and resource to task
                            task.assigned_worker = worker
                            task.start_time = earliest_start
                            task.end_time = end_time
                            task.assigned_resource = resource

                            worker.allocate_time(earliest_start, end_time, slot_index)
                            resource.allocate(earliest_start, end_time)

                            self.scheduled_tasks.append(task)
                            finished_tasks[task.task_id] = task.end_time  # Track when this task is done
                            break
                    break

        return self.scheduled_tasks



# Example Usage
task1 = Task(1, "Data Processing", 3, 4, ["Python"], datetime(2025, 2, 15))
task2 = Task(2, "Report Generation", 2, 2, ["SQL"], datetime(2025, 2, 16), dependencies=[1])

worker1 = Worker(1, "Alice", ["Python", "SQL"], [(datetime(2025, 2, 10), datetime(2025, 2, 20))])

resource1 = Resource(1, "Server A", capacity=2)

scheduler = TaskScheduler()
scheduler.add_task(task1)
scheduler.add_task(task2)
scheduler.add_worker(worker1)
scheduler.add_resource(resource1)

scheduled = scheduler.schedule_tasks()

if scheduled:
    for task in scheduled:
        print(f"Task {task.name} assigned to {task.assigned_worker.name} "
              f"using {task.assigned_resource.name} from {task.start_time} to {task.end_time}")
else:
    print("No tasks were scheduled.")

Task Data Processing assigned to Alice using Server A from 2025-02-10 22:02:34.088547 to 2025-02-11 02:02:34.088547
Task Report Generation assigned to Alice using Server A from 2025-02-11 02:02:34.088547 to 2025-02-11 04:02:34.088547


# Construction industry example

In [36]:
# Tasks
task1 = Task(1, "Foundation Construction", 5, 40, ["Masonry"], datetime(2024, 5, 1))
task2 = Task(2, "Structural Framing", 4, 30, ["Carpentry"], datetime(2024, 5, 10))
task3 = Task(3, "Electrical Wiring", 3, 15, ["Electrician"], datetime(2024, 6, 1), dependencies=[2])
task4 = Task(4, "Interior Finishing", 2, 20, ["Interior Design"], datetime(2024, 6, 10), dependencies=[3])

# Workers
worker1 = Worker(1, "John", ["Masonry"], [(datetime(2024, 5, 1), datetime(2024, 5, 20))])
worker2 = Worker(2, "Paul", ["Carpentry"], [(datetime(2024, 5, 10), datetime(2024, 6, 5))])
worker3 = Worker(3, "Alice", ["Electrician"], [(datetime(2024, 6, 1), datetime(2024, 6, 15))])

# Resources
resource1 = Resource(1, "Excavator", capacity=1)
resource2 = Resource(2, "Cranes", capacity=2)

# Scheduler
scheduler = TaskScheduler()
scheduler.add_task(task1)
scheduler.add_task(task2)
scheduler.add_task(task3)
scheduler.add_task(task4)
scheduler.add_worker(worker1)
scheduler.add_worker(worker2)
scheduler.add_worker(worker3)
scheduler.add_resource(resource1)
scheduler.add_resource(resource2)

scheduled_tasks = scheduler.schedule_tasks()

# Output scheduled tasks
for task in scheduled_tasks:
    print(f"Task {task.name} assigned to {task.assigned_worker.name} "
          f"using {task.assigned_resource.name} from {task.start_time} to {task.end_time}")

# Software development example

In [22]:
# Tasks
task1 = Task(1, "User Authentication Implementation", 5, 20, ["Backend"], datetime(2025, 3, 1))
task2 = Task(2, "UI Design", 4, 15, ["Frontend"], datetime(2025, 3, 5))
task3 = Task(3, "Backend API Development", 6, 25, ["Backend"], datetime(2025, 3, 10))
task4 = Task(4, "Unit Testing", 3, 10, ["QA"], datetime(2025, 3, 20), dependencies=[3])
task5 = Task(5, "Deployment", 2, 8, ["DevOps"], datetime(2025, 3, 25), dependencies=[4])

# Workers
worker1 = Worker(1, "Alice", ["Backend"], [(datetime(2025, 3, 1), datetime(2025, 3, 15))])
worker2 = Worker(2, "Bob", ["Frontend"], [(datetime(2025, 3, 5), datetime(2025, 3, 15))])
worker3 = Worker(3, "Charlie", ["QA"], [(datetime(2025, 3, 20), datetime(2025, 3, 25))])
worker4 = Worker(4, "Diana", ["DevOps"], [(datetime(2025, 3, 25), datetime(2025, 3, 30))])

# Resources
resource1 = Resource(1, "Test Server", capacity=1)
resource2 = Resource(2, "Production Server", capacity=1)

# Scheduler
scheduler = TaskScheduler()
scheduler.add_task(task1)
scheduler.add_task(task2)
scheduler.add_task(task3)
scheduler.add_task(task4)
scheduler.add_task(task5)
scheduler.add_worker(worker1)
scheduler.add_worker(worker2)
scheduler.add_worker(worker3)
scheduler.add_worker(worker4)
scheduler.add_resource(resource1)
scheduler.add_resource(resource2)

scheduled_tasks = scheduler.schedule_tasks()

# Output scheduled tasks
for task in scheduled_tasks:
    print(f"Task {task.name} assigned to {task.assigned_worker.name} "
          f"using {task.assigned_resource.name} from {task.start_time} to {task.end_time}")


# Event planning

In [21]:
# Tasks
task1 = Task(1, "Venue Setup", 3, 10, ["Event Coordination"], datetime(2025, 5, 1))
task2 = Task(2, "Catering Setup", 2, 8, ["Catering"], datetime(2025, 5, 1))
task3 = Task(3, "Guest Registration", 2, 6, ["Registration"], datetime(2025, 5, 2), dependencies=[1])
task4 = Task(4, "Security Setup", 1, 5, ["Security"], datetime(2025, 5, 2), dependencies=[1])

# Workers
worker1 = Worker(1, "Eve", ["Event Coordination"], [(datetime(2025, 5, 1), datetime(2025, 5, 2))])
worker2 = Worker(2, "Frank", ["Catering"], [(datetime(2025, 5, 1), datetime(2025, 5, 2))])
worker3 = Worker(3, "Grace", ["Registration"], [(datetime(2025, 5, 2), datetime(2025, 5, 3))])
worker4 = Worker(4, "Hank", ["Security"], [(datetime(2025, 5, 2), datetime(2025, 5, 3))])

# Resources
resource1 = Resource(1, "Event Venue", capacity=1)
resource2 = Resource(2, "Catering Equipment", capacity=1)
resource3 = Resource(3, "Sound System", capacity=1)

# Scheduler
scheduler = TaskScheduler()
scheduler.add_task(task1)
scheduler.add_task(task2)
scheduler.add_task(task3)
scheduler.add_task(task4)
scheduler.add_worker(worker1)
scheduler.add_worker(worker2)
scheduler.add_worker(worker3)
scheduler.add_worker(worker4)
scheduler.add_resource(resource1)
scheduler.add_resource(resource2)
scheduler.add_resource(resource3)

scheduled_tasks = scheduler.schedule_tasks()

# Output scheduled tasks
for task in scheduled_tasks:
    print(f"Task {task.name} assigned to {task.assigned_worker.name} "
          f"using {task.assigned_resource.name} from {task.start_time} to {task.end_time}")


TypeError: '<' not supported between instances of 'Task' and 'Task'