In [1]:
import numpy as np
import ortools
from ortools.sat.python import cp_model
from enum import Enum


In [None]:

class Satellite:
    def __init__(self, name, memory_capacity, battery_level):
        self.name = name
        self.memory_capacity = memory_capacity
        self.memory_used = 0
        self.battery_level = battery_level
        self.tasks_queue = []
        self.current_task = None

    def can_perform_task(self, task):

        if self.battery_level >= task.battery_required and self.memory_used + task.memory_required <= self.memory_capacity:
            return True
        
        return False
    
    def add_task(self, task):
        if self.memory_used + task.memory_required <= self.memory_capacity:
            self.tasks_queue.append(task)

    def set_memory_capacity(self, memory_capacity):
        self.memory_capacity = memory_capacity


class TaskType(Enum):
    IMAGE_CAPTURE = "Image Capture"
    DATA_TRANSMISSION = "Data Transmission"
    LENS_CALIBRATION = "Lens Calibration"
    

class Task:
    def __init__(self, task_id, task_type: TaskType, duration, priority=0, location=None, memory_required=0, battery_required=0):
        self.task_id = task_id
        self.task_type = task_type
        self.duration = duration
        self.priority = priority
        self.location = location
        self.memory_required = memory_required
        self.battery_required = battery_required

    
class Track:
    def __init__(self, start_time, duration, zones):
        self.start_time = start_time
        self.duration = duration
        self.zones = zones  # List of zones the satellite will pass over

    def get_zone_by_name(self, name):
        for zone in self.zones:
            if zone.name == name:
                return zone
        return None
    
    
class Zone:
    def __init__(self, name, coordinates, visibility_window, has_ground_station=False):
        self.name = name 
        self.coordinates = coordinates  # GPS coordinates (latitude, longitude)e
        self.visibility_window = visibility_window  # Time window when the zone is visible (start_time, end_time)
        self.has_ground_station  = has_ground_station  # True if the zone has a ground station for data transmission

    def is_visible(self, current_time):
        start, end = self.visibility_window
        return start <= current_time <= end
    

In [None]:
class SatelliteScheduler:
    def __init__(self, satellite, images_to_capture, track):
        self.satellite = satellite
        self.images_to_capture = images_to_capture
        self.track = track
        self.model = cp_model.CpModel()
        self.task_starts = {}
        self.task_done = {}
        self.tasks = []
        self.solver = cp_model.CpSolver()
        self.execution_log = []

    def solve(self):
        # Create variables for task start times and completion
        for task_id, task in self.images_to_capture.items():
            self.task_starts[task_id] = self.model.NewIntVar(0, self.track.duration, f"start_{task_id}")
            self.task_done[task_id] = self.model.NewBoolVar(f"done_{task_id}")
            self.tasks.append((task_id, task))

        # Add constraints for task execution
        for task_id, task in self.images_to_capture.items():
            # Ensure task can only start when the zone is visible
            zone = self.track.get_zone_by_name(task.location)
            if zone:
                start, end = zone.visibility_window
                self.model.Add(self.task_starts[task_id] >= start).OnlyEnforceIf(self.task_done[task_id])
                self.model.Add(self.task_starts[task_id] + task.duration <= end).OnlyEnforceIf(self.task_done[task_id])

            # Ensure satellite has enough memory and battery for the task
            self.model.Add(task.memory_required <= self.satellite.memory_capacity).OnlyEnforceIf(self.task_done[task_id])
            self.model.Add(task.battery_required <= self.satellite.battery_level).OnlyEnforceIf(self.task_done[task_id])

            # Calibration task must precede image capture
            if task.task_type == TaskType.IMAGE_CAPTURE:
                calibration_task_id = f"calibration_{task_id}"
                calibration_task = Task(
                    task_id=calibration_task_id,
                    task_type=TaskType.LENS_CALIBRATION,
                    duration=2,
                    memory_required=0,
                    battery_required=2,
                )
                self.task_starts[calibration_task_id] = self.model.NewIntVar(0, self.track.duration, f"start_{calibration_task_id}")
                self.task_done[calibration_task_id] = self.model.NewBoolVar(f"done_{calibration_task_id}")
                self.tasks.append((calibration_task_id, calibration_task))

                self.model.Add(self.task_starts[task_id] >= self.task_starts[calibration_task_id] + calibration_task.duration).OnlyEnforceIf(self.task_done[task_id])

        # Ensure data transmission if memory is full
        for zone in self.track.zones:
            if zone.has_ground_station:
                transmission_task_id = f"transmission_{zone.name}"
                transmission_task = Task(
                    task_id=transmission_task_id,
                    task_type=TaskType.DATA_TRANSMISSION,
                    duration=3,
                    memory_required=0,
                    battery_required=5,
                )
                self.task_starts[transmission_task_id] = self.model.NewIntVar(0, self.track.duration, f"start_{transmission_task_id}")
                self.task_done[transmission_task_id] = self.model.NewBoolVar(f"done_{transmission_task_id}")
                self.tasks.append((transmission_task_id, transmission_task))

                self.model.Add(self.satellite.memory_used >= self.satellite.memory_capacity).OnlyEnforceIf(self.task_done[transmission_task_id])
                self.model.Add(self.task_starts[transmission_task_id] >= zone.visibility_window[0]).OnlyEnforceIf(self.task_done[transmission_task_id])
                self.model.Add(self.task_starts[transmission_task_id] + transmission_task.duration <= zone.visibility_window[1]).OnlyEnforceIf(self.task_done[transmission_task_id])



        # Objective: Maximize the priority of completed image capture tasks
        self.model.Maximize(
            sum(task.priority * self.task_done[task_id] for task_id, task in self.images_to_capture.items())
        )

        # Solve the model
        status = self.solver.Solve(self.model)

        # Log execution results
        if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
            for task_id, task in self.tasks:
                if self.solver.Value(self.task_done[task_id]):
                    start_time = self.solver.Value(self.task_starts[task_id])
                    self.execution_log.append(f"Task {task_id} ({task.task_type.value}) executed at time {start_time}.")
                else:
                    self.execution_log.append(f"Task {task_id} ({task.task_type.value}) could not be executed.")
        else:
            self.execution_log.append("No feasible solution found.")

    def print_execution_log(self):
        for log_entry in self.execution_log:
            print(log_entry)


SyntaxError: unmatched ')' (2751650649.py, line 47)

In [20]:
# Satellite :
satellite = Satellite(name="Sat-1", memory_capacity=10, battery_level=100)

# Images à capturer :
images_to_capture = {
    "image1": Task(task_id="image1", task_type=TaskType.IMAGE_CAPTURE, duration=5, priority=10, location="Zone A", memory_required=10, battery_required=5),
    "image2": Task(task_id="image2", task_type=TaskType.IMAGE_CAPTURE, duration=5, priority=20, location="Zone B", memory_required=10, battery_required=5),
    "image3": Task(task_id="image3", task_type=TaskType.IMAGE_CAPTURE, duration=5, priority=15, location="Zone C", memory_required=10, battery_required=5),
    "image4": Task(task_id="image4", task_type=TaskType.IMAGE_CAPTURE, duration=5, priority=5, location="Zone D", memory_required=10, battery_required=5),
    "image5": Task(task_id="image5", task_type=TaskType.IMAGE_CAPTURE, duration=5, priority=25, location="Zone E", memory_required=10, battery_required=5),
}

# Zones :
zones = [
    Zone(name="Zone A", coordinates=(10, 20), visibility_window=(0, 30), has_ground_station=True),
    Zone(name="Zone B", coordinates=(15, 25), visibility_window=(5, 35), has_ground_station=False),
    Zone(name="Zone C", coordinates=(20, 30), visibility_window=(10, 40), has_ground_station=True),
    Zone(name="Zone D", coordinates=(25, 35), visibility_window=(15, 45), has_ground_station=False),
    Zone(name="Zone E", coordinates=(30, 40), visibility_window=(20, 50), has_ground_station=True),
]

# Track :
track = Track(start_time=0, duration=60, zones=zones)

scheduler = SatelliteScheduler(satellite, images_to_capture, track)
scheduler.solve()
scheduler.print_execution_log()


Task image1 (Image Capture) executed at time 2.
Task image2 (Image Capture) executed at time 5.
Task image3 (Image Capture) executed at time 10.
Task image4 (Image Capture) executed at time 15.
Task image5 (Image Capture) executed at time 20.
Task calibration_image1 (Lens Calibration) could not be executed.
Task calibration_image2 (Lens Calibration) could not be executed.
Task calibration_image3 (Lens Calibration) could not be executed.
Task calibration_image4 (Lens Calibration) could not be executed.
Task calibration_image5 (Lens Calibration) could not be executed.
Task transmission_Zone A (Data Transmission) could not be executed.
Task transmission_Zone C (Data Transmission) could not be executed.
Task transmission_Zone E (Data Transmission) could not be executed.
