# Risk-Prioritized Debris Selection

## Goal
Maximize number of collected satellites under a total cost budget B. Each target requires an independent mission from the depot (first row = service vehicle). Cost = Euclidean distance in km (SGP4 positions in TEME). 0–1 knapsack.

## Solvers (3 sections)
1. **Greedy local search** – Only accept swaps that improve the solution (positive Δ). Gradient-descent style; can get stuck in local minima.
2. **Simulated annealing** – Accept worse swaps with probability exp(-ΔE/T) to escape local minima.
3. **Quantum annealing** – QUBO solved on D-Wave (or classical fallback when unavailable).

## Data Sources
- **TLE CSV**: `leo_debris_sample.csv` (line1, line2). First row = depot.
- **Epoch**: Single reference time (TLE epoch or user-provided).

---
## Setup: Load TLE Data & Compute Costs

In [None]:
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta

DATA_PATH = Path("leo_debris_sample.csv")
df = pd.read_csv(DATA_PATH)

depot_row = df.iloc[[0]]
targets_df = df.iloc[1:].reset_index(drop=True)

print(f"Depot: {depot_row['name'].values[0]} (norad {depot_row['norad_id'].values[0]})")
print(f"Targets: {len(targets_df)} objects")
targets_df.head()

In [None]:
from sgp4.api import Satrec

def epoch_str_to_jd(epoch_str: str):
    e = float(epoch_str)
    yy = int(e // 1000)
    doy = e % 1000
    year = 2000 + yy if yy < 57 else 1900 + yy
    base = datetime(year, 1, 1) + timedelta(days=doy - 1)
    jd_total = base.toordinal() + 1721424.5 + (base.hour*3600 + base.minute*60 + base.second) / 86400.0
    return int(jd_total), jd_total - int(jd_total)

def tle_to_position(line1: str, line2: str, jd: int, fr: float) -> np.ndarray:
    sat = Satrec.twoline2rv(line1, line2)
    e, r, _ = sat.sgp4(jd, fr)
    return np.array(r) if e == 0 else np.full(3, np.nan)

REF_EPOCH = depot_row["epoch"].values[0]
jd, fr = epoch_str_to_jd(str(REF_EPOCH))
r_depot = tle_to_position(depot_row["line1"].values[0], depot_row["line2"].values[0], jd, fr)

n_targets = len(targets_df)
costs = np.zeros(n_targets)
for i in range(n_targets):
    r = tle_to_position(targets_df.iloc[i]["line1"], targets_df.iloc[i]["line2"], jd, fr)
    costs[i] = np.linalg.norm(r_depot - r)

BUDGET_KM = 50_000.0
print(f"Costs (km): min={costs.min():.1f}, max={costs.max():.1f}, budget={BUDGET_KM}")

---
## Section 1: Greedy Local Search (Gradient-Descent Style)

Only accept swaps that **improve** the solution (positive Δ). Neighborhood: add item, remove item, swap in/out. Iterate until no improving move exists. Gets stuck in local minima.

In [None]:
def obj(x: np.ndarray, costs: np.ndarray) -> tuple:
    """Return (n_items, -total_cost) for lexicographic max."""
    sel = x == 1
    n = sel.sum()
    c = costs[sel].sum()
    return (n, -c)

def greedy_local_search(costs: np.ndarray, budget: float, x_init: np.ndarray = None, max_iters: int = 10_000) -> np.ndarray:
    n = len(costs)
    if x_init is None:
        order = np.argsort(costs)
        cum = np.cumsum(costs[order])
        k = np.searchsorted(cum, budget, side="right")
        x = np.zeros(n, dtype=int)
        x[order[:k]] = 1
    else:
        x = x_init.copy()

    best_obj = obj(x, costs)
    for _ in range(max_iters):
        sel = x == 1
        unsel = ~sel
        total = costs[sel].sum()
        best_candidate, best_candidate_obj = None, best_obj

        # Add: try each unselected that fits
        for i in np.where(unsel)[0]:
            if total + costs[i] <= budget:
                x_new = x.copy()
                x_new[i] = 1
                o = obj(x_new, costs)
                if o > best_candidate_obj:
                    best_candidate, best_candidate_obj = x_new, o

        # Remove: try each selected (frees budget)
        for i in np.where(sel)[0]:
            x_new = x.copy()
            x_new[i] = 0
            o = obj(x_new, costs)
            if o > best_candidate_obj:
                best_candidate, best_candidate_obj = x_new, o

        # Swap: remove i, add j when feasible
        for i in np.where(sel)[0]:
            for j in np.where(unsel)[0]:
                if total - costs[i] + costs[j] <= budget:
                    x_new = x.copy()
                    x_new[i], x_new[j] = 0, 1
                    o = obj(x_new, costs)
                    if o > best_candidate_obj:
                        best_candidate, best_candidate_obj = x_new, o

        if best_candidate is None or best_candidate_obj <= best_obj:
            break
        x, best_obj = best_candidate, best_candidate_obj

    return x

x_greedy = greedy_local_search(costs, BUDGET_KM)
sel_greedy = np.where(x_greedy == 1)[0]
cost_greedy = costs[sel_greedy].sum()
print(f"Section 1 — Greedy local search: {len(sel_greedy)} targets, cost = {cost_greedy:.1f} km")

---
## Section 2: Simulated Annealing

Same neighborhood (add/remove/swap). Accept **worse** moves with probability exp(-ΔE/T) to escape local minima. T decreases over time.

In [None]:
def simulated_annealing(costs: np.ndarray, budget: float, T_init: float = 10.0, T_min: float = 0.01,
                        cooling: float = 0.9995, max_iters: int = 50_000, seed: int = 42) -> np.ndarray:
    rng = np.random.default_rng(seed)
    n = len(costs)
    order = np.argsort(costs)
    cum = np.cumsum(costs[order])
    k = np.searchsorted(cum, budget, side="right")
    x = np.zeros(n, dtype=int)
    x[order[:k]] = 1

    def energy(xx):
        sel = xx == 1
        if costs[sel].sum() > budget:
            return 1e9  # infeasible penalty
        return -sel.sum()  # minimize negative count = maximize count

    E = energy(x)
    best_x, best_E = x.copy(), E
    T = T_init

    for it in range(max_iters):
        # Random move: add, remove, or swap
        sel = x == 1
        unsel = ~sel
        total = costs[sel].sum()

        move = rng.integers(0, 3)
        x_new = x.copy()

        if move == 0 and np.any(unsel):  # add
            j = rng.choice(np.where(unsel)[0])
            if total + costs[j] <= budget:
                x_new[j] = 1
        elif move == 1 and np.any(sel):  # remove
            i = rng.choice(np.where(sel)[0])
            x_new[i] = 0
        elif move == 2 and np.any(sel) and np.any(unsel):  # swap
            i = rng.choice(np.where(sel)[0])
            j = rng.choice(np.where(unsel)[0])
            if total - costs[i] + costs[j] <= budget:
                x_new[i], x_new[j] = 0, 1

        E_new = energy(x_new)
        dE = E_new - E
        if dE <= 0 or rng.random() < np.exp(-dE / T):
            x, E = x_new, E_new
            if E < best_E:
                best_x, best_E = x.copy(), E

        T = max(T_min, T * cooling)

    return best_x

x_sa = simulated_annealing(costs, BUDGET_KM)
sel_sa = np.where(x_sa == 1)[0]
cost_sa = costs[sel_sa].sum()
print(f"Section 2 — Simulated annealing: {len(sel_sa)} targets, cost = {cost_sa:.1f} km")

---
## Section 3: Quantum Annealing

QUBO: H = -Σ x_i + λ(Σ c_i x_i - B)². Solve on D-Wave quantum annealer when available; otherwise classical simulated annealing (neal) as fallback.

In [None]:
def build_knapsack_qubo(costs: np.ndarray, budget: float, penalty_strength: float = None) -> dict:
    n = len(costs)
    lam = penalty_strength if penalty_strength is not None else max(1.0, 2.0 / (np.min(costs) ** 2))
    Q = {}
    for i in range(n):
        Q[(i, i)] = -1.0 - 2 * lam * budget * costs[i] + lam * costs[i] ** 2
    for i in range(n):
        for j in range(i + 1, n):
            Q[(i, j)] = 2 * lam * costs[i] * costs[j]
    return Q

Q = build_knapsack_qubo(costs, BUDGET_KM)
x_qa = None

# Try D-Wave quantum annealer (requires dwave-cloud-client + config)
try:
    from dwave.system import DWaveSampler, EmbeddingComposite
    sampler = EmbeddingComposite(DWaveSampler())
    sampleset = sampler.sample_qubo(Q, num_reads=100)
    best = sampleset.first.sample
    x_qa = np.array([best[i] for i in range(len(costs))])
    print("Section 3 — Quantum annealing (D-Wave)")
except Exception as e:
    print(f"D-Wave unavailable: {e}")
    # Classical fallback: neal
    try:
        import neal
        sampleset = neal.SimulatedAnnealingSampler().sample_qubo(Q, num_reads=100, seed=42)
        best = sampleset.first.sample
        x_qa = np.array([best[i] for i in range(len(costs))])
        print("Section 3 — Classical fallback (neal)")
    except ImportError:
        print("neal not installed. pip install dwave-neal")

if x_qa is not None:
    sel_qa = np.where(x_qa == 1)[0]
    cost_qa = costs[sel_qa].sum()
    feasible = cost_qa <= BUDGET_KM
    print(f"  {len(sel_qa)} targets, cost = {cost_qa:.1f} km, feasible = {feasible}")

---
## Results Summary

In [None]:
OUTPUT_DIR = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)

summary = [
    ("1. Greedy local search", len(sel_greedy), cost_greedy, cost_greedy <= BUDGET_KM),
    ("2. Simulated annealing", len(sel_sa), cost_sa, cost_sa <= BUDGET_KM),
]
if x_qa is not None:
    summary.append(("3. Quantum/classical QUBO", len(sel_qa), cost_qa, cost_qa <= BUDGET_KM))

for name, n, c, ok in summary:
    print(f"{name}: {n} targets, {c:.1f} km, feasible={ok}")

pd.DataFrame({"index": range(n_targets), "norad_id": targets_df["norad_id"], "name": targets_df["name"], "cost_km": costs}).to_csv(OUTPUT_DIR / "target_costs.csv", index=False)
pd.DataFrame({"method": [s[0] for s in summary], "n_targets": [s[1] for s in summary], "cost_km": [s[2] for s in summary]}).to_csv(OUTPUT_DIR / "results_summary.csv", index=False)
print(f"\nSaved to {OUTPUT_DIR}/: target_costs.csv, results_summary.csv")