### Importing packages

In [None]:
import numpy as np
import pandas as pd
from collections import defaultdict
from typing import List, Dict, Tuple
from pulp import *

import warnings
warnings.filterwarnings("ignore")

### Set of input data and variables

In [None]:
# Set of all Satellites
S = ["s1", "s2"] 

# Set of all Ground Stations
G = ["g1", "g2"]

# Set of all Time Windows in 'minutes' assuming evenly spaced from TW1 to TW8
TW = {
    ("s1", "g1", "s1g1") : (1, 4),
    ("s1", "g2", "s1g2") : (3, 5),
    ("s2", "g1", "s2g1") : (2, 7),
    ("s2", "g2", "s2g2") : (6, 8)
}

# Set of all downloading times in 'minutes' for each satellite (Assumed values)
Z = {
    "s1" : 1.8,
    "s2" : 2.7
}

# Objective function weights
W1 = 0.5

### Decision Variables

In [None]:
assignment = LpVariable.dicts(
    "x", 
    [(s, g, s+g) for s in S for g in G],
    lowBound=0,
    cat="Binary"
)

processing_times = LpVariable.dicts(
    "p", 
    [(s, g, s+g) for s in S for g in G],
    lowBound=0,
    cat="Continuous"
)

satellite_setup_times = LpVariable.dicts(
    "Tau_s", 
    [(s, g1, g2) for s in S for g1, g2 in zip(G, G[1:])],
    lowBound=1,
    cat="Continuous"
)

ground_station_setup_times = LpVariable.dicts(
    "Tau_g", 
    [(g, s1, s2) for g in G for s1, s2 in zip(S, S[1:])],
    lowBound=1,
    cat="Continuous"
)

### Objective Function

In [None]:
model = LpProblem("Maximize Processing Time", LpMaximize)

model += (
    W1*(lpSum(processing_times)) + 
    (1-W1)*round(sum([Z[s] for s in S]), 1)
)

### Constraints

#### C1: Bounds for total download time for each satellite

In [None]:
for s in S:
    model+= Z[s] <= lpSum(
        [processing_times[(s, g, s+g)] for g in G]
    )

#### C2: Bounds for start and download processing times 

In [None]:
for i in assignment.keys():
    model += processing_times[i] <= (
        assignment[i] * (max(TW[i]) - min(TW[i]))
    )

#### C3: A satellite interacts with only 1 ground station at a time

In [None]:
for s in S:
    model += lpSum([assignment[(s, g, s+g)] for g in G]) == 1

#### C4: A ground station interacts with only 1 satellite at a time

In [None]:
for g in G:
    model += lpSum([assignment[(s, g, s+g)] for s in S]) == 1

#### C5: Bounds on satellite setup & processing time before the next schedule

In [None]:
for s in S:
    for g in G:
        kes = (s, g, s+g)
        next_gs = kes[1][0] + str(int(kes[1][1])+1)
        if next_gs in G:
            kes_2 = kes[0], next_gs, kes[0]+next_gs
            model += (
                (
                    min(TW[kes]) +
                    processing_times[kes] +
                    satellite_setup_times[kes[0], kes[1], next_gs]
                ) <= min(TW[kes_2])
            )

#### C6: Bounds on ground station setup & processing time before the next schedule

In [None]:
for g in G:
    for s in S:
        kes = (s, g, s+g)
        next_sat = kes[0][0] + str(int(kes[0][1])+1)
        
        if next_sat in S:
            kes_2 = next_sat, kes[1], next_sat+kes[1]
            model += (
                min(TW[kes]) +
                processing_times[kes] +
                ground_station_setup_times[kes[1], kes[0], next_sat]
            ) <= max(TW[next_sat, kes[1], next_sat+kes[1]])

### Wrap all steps in a function

In [None]:
def download_interval_schedule(
    S: List[str], 
    G: List[str], 
    TW: Dict[Tuple[str, str, str], Tuple[int, int]], 
    Z: Dict[str, float], 
    W1:float
):
    """
    Optimized Schedule of satellite interaction
    with ground stations within specified time windows
    
    Parameters:
    - S (list): List of satellite indices
    - G (list): List of ground station indices
    - TW (dict): Dictionary of all satellite-ground station time windows
    - Z (dict): Dictionary of all download times for each satellite
    - W1 (float): Weight of objective function coefficients

    Returns:
    - LpProblem: A PuLP LpProblem instance representing the optimization model

    """
    # 1. Decision variables
    ## 1.1. Binary assignment variable if satellite 's' interacts with ground station 'g'
    ## within time window TW
    assignment = LpVariable.dicts(
        "x",
        [(s, g, s+g) for s in S for g in G],
        lowBound=0,
        cat="Binary"
    )
    
    ## 1.2. Processing time when satellite 's' interacts with ground station 'g'
    ## within time window TW
    processing_times = LpVariable.dicts(
        "p",
        [(s, g, s+g) for s in S for g in G],
        lowBound=0,
        cat="Continuous"
    )
    
    ## 1.3. Satellite setup times from one ground station to the next
    satellite_setup_times = LpVariable.dicts(
        "Tau_s", 
        [(s, g1, g2) for s in S for g1, g2 in zip(G, G[1:])],
        lowBound=1,
        cat="Continuous"
    )
    
    ## 1.4. Ground station setup times from one satellite to the next
    ground_station_setup_times = LpVariable.dicts(
        "Tau_g", 
        [(g, s1, s2) for g in G for s1, s2 in zip(S, S[1:])],
        lowBound=1,
        cat="Continuous"
    )
    
    # 2. Objective Function
    model = LpProblem("Maximize Processing Time", LpMaximize)
    
    model += (
        W1*(lpSum(processing_times)) + 
        (1-W1)*round(sum([Z[s] for s in S]), 1)
    )
    
    
    # 3. Constraints
    ## 3.1. Bounds for total download time for each satellite
    for s in S:
        model+= Z[s] <= lpSum(
            [processing_times[(s, g, s+g)] for g in G]
        )
    
    ## 3.2. Bounds for start and download processing times
    for i in assignment.keys():
        model += processing_times[i] <= (
            assignment[i] * (max(TW[i]) - min(TW[i]))
        )
        
    ## 3.3. A satellite interacts with only 1 ground station at a time
    for s in S:
        model += lpSum([assignment[(s, g, s+g)] for g in G]) == 1
    
    ## 3.4. A ground station interacts with only 1 satellite at a time
    for g in G:
        model += lpSum([assignment[(s, g, s+g)] for s in S]) == 1
    
    ## 3.5. Bounds on satellite setup & processing time before the next schedule
    for s in S:
        for g in G:
            kes = (s, g, s+g)
            next_gs = kes[1][0] + str(int(kes[1][1])+1)
            if next_gs in G:
                kes_2 = kes[0], next_gs, kes[0]+next_gs
                model += (
                    (
                        min(TW[kes]) +
                        processing_times[kes] +
                        satellite_setup_times[
                            kes[0], kes[1], next_gs
                        ]
                    ) <= min(TW[kes_2])
                )
                
    ## 3.6. Bounds on ground station setup & processing time before the next schedule
    for g in G:
        for s in S:
            kes = (s, g, s+g)
            next_sat = kes[0][0] + str(int(kes[0][1])+1)

            if next_sat in S:
                kes_2 = next_sat, kes[1], next_sat+kes[1]
                model += (
                    (
                        min(TW[kes]) +
                        processing_times[kes] +
                        ground_station_setup_times[
                            kes[1], kes[0], next_sat
                        ]
                    ) <= min(TW[kes_2])
                )
                
    
    # 4. Solve model (Using default solver)
    model.solve()
            
    return model

### Function demonstration

In [None]:
model = download_interval_schedule(
    S=["s1", "s2"], 
    
    G=["g1", "g2"], 
    
    TW={
        ("s1", "g1", "s1g1") : (1, 4),
        ("s1", "g2", "s1g2") : (3, 5),
        ("s2", "g1", "s2g1") : (2, 7),
        ("s2", "g2", "s2g2") : (6, 8)
    },
    
    Z={
        "s1": 2.0, 
        "s2": 3.0
    },
    
    W1=0.5
)

print("Model Status:", "\n",LpStatus[model.status], "Solution")

print(
    "\nMaximum task completion time (Processing + Download):", "\n", 
    value(model.objective),
    "minutes"
)

print("\nAssignment Variables:")
for v in model.variablesDict():
    if "x_"in v and model.variablesDict()[v].varValue>0:
        print(" ", v, "=", model.variablesDict()[v].varValue)

print("\nProcessing Times:")
for v in model.variablesDict():
    if "p_"in v and model.variablesDict()[v].varValue>0:
        print(
            " ", v, "=", 
            model.variablesDict()[v].varValue,
            "minutes"
        )

print("\nSatellite-Ground Station Setup Times:")
for v in model.variablesDict():
    if "Tau_s"in v and model.variablesDict()[v].varValue>0:
        print(
            " ", v, "=", 
            model.variablesDict()[v].varValue,
            "minutes"
        )

print("\nGround Station-Satellite Setup Times:")
for v in model.variablesDict():
    if "Tau_g"in v and model.variablesDict()[v].varValue>0:
        print(
            " ", v, "=", 
            model.variablesDict()[v].varValue,
            "minutes"
        )
        
print("\nProposed Optimal Schedule:")
for s in S:
    for g in G:
        if model.variablesDict()[
                "x_('{}',_'{}',_'{}')".format(s, g, s+g)
            ].varValue==1:
            print(
                " Satellite '{}' to interact with Ground Station '{}'".format(s, g),
                "within Time Window",
                TW[(s, g, s+g)],
                "with a processing time of {} minutes".format(
                    model.variablesDict()[
                        "p_('{}',_'{}',_'{}')".format(s, g, s+g)
                    ].varValue
                )
            )