<a href="https://colab.research.google.com/github/user-1221/home-pa-algo/blob/main/algo_1.3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



# Home-PA Scheduling Prototype

This notebook hosts a simplified allocator that scores suggestions by combining their `need` and `importance`, filters by proximity to available gaps, searches for the least-travel permutation, and assigns time blocks while respecting splitting rules.



## Workflow Overview

1. Evaluate each suggestion with `score = need + importance` (values are clamped to `[0, 1]`).
2. Filter to suggestions close enough to at least one gap boundary (start or end).
3. Greedily keep the highest scoring suggestions whose **minimum block** (`base_duration`) fits within the total free minutes.
4. Enumerate all feasible orderings (up to a configurable limit) and pick the one that minimises travel from the first gap's start location to the final gap's end location.
5. While embedding suggestions, guarantee at least one base block, then stretch splittable items toward their maximum multiples when extra time remains; drop the lowest-score suggestion and retry if a layout fails.



In [None]:
from __future__ import annotations

import itertools
import math
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Sequence, Tuple

Coordinate = Tuple[float, float]


def clamp01(value: float) -> float:
    """Clamp a floating-point value into the inclusive range [0, 1]."""
    if value < 0.0:
        return 0.0
    if value > 1.0:
        return 1.0
    return value


def euclidean_distance(a: Coordinate, b: Coordinate) -> float:
    """Return the Euclidean distance between two 2D coordinates."""
    return math.hypot(a[0] - b[0], a[1] - b[1])


@dataclass(frozen=True)
class Gap:
    """Represents an available time window (gap) defined by its boundary events."""

    gap_id: str
    duration: float  # minutes
    start_location: Coordinate
    end_location: Coordinate

    def __post_init__(self) -> None:
        if self.duration <= 0:
            raise ValueError(f"Gap '{self.gap_id}' duration must be positive.")


@dataclass
class Suggestion:
    """
    A suggestion that may be slotted into the schedule.

    Each suggestion specifies a `base_duration` (minimum commitment) and a
    maximum number of multiples (`max_multiples`). When splittable, the
    allocator will try to add additional base blocks—up to the maximum—after the
    guaranteed first block has been placed.
    """

    suggestion_id: str
    need: float
    importance: float
    base_duration: float
    max_multiples: int
    location: Coordinate
    splittable: bool = True

    score: float = field(init=False)
    min_duration: float = field(init=False)
    max_duration: float = field(init=False)

    def __post_init__(self) -> None:
        if self.base_duration <= 0:
            raise ValueError(
                f"Suggestion '{self.suggestion_id}' base_duration must be positive."
            )
        if self.max_multiples < 1:
            raise ValueError(
                f"Suggestion '{self.suggestion_id}' max_multiples must be >= 1."
            )
        if not self.splittable and self.max_multiples != 1:
            raise ValueError(
                f"Non-splittable suggestion '{self.suggestion_id}' must set max_multiples to 1."
            )

        object.__setattr__(self, "score", clamp01(self.need) + clamp01(self.importance))
        object.__setattr__(self, "min_duration", self.base_duration)
        object.__setattr__(self, "max_duration", self.base_duration * self.max_multiples)

    @property
    def max_blocks(self) -> int:
        return self.max_multiples


@dataclass
class ScheduledBlock:
    """Represents a scheduled slice of a suggestion embedded inside a gap."""

    suggestion_id: str
    gap_id: str
    start_offset: float  # minutes from the gap start
    duration: float
    location: Coordinate


@dataclass
class ScheduleResult:
    """Container for the overall scheduling outcome."""

    ordered_suggestions: List[Suggestion]
    scheduled_blocks: List[ScheduledBlock]
    travel_cost: float
    dropped_suggestions: List[Suggestion]
    unused_gap_time: float
    permutations_evaluated: int
    allocated_minutes: Dict[str, float]

    def is_feasible(self) -> bool:
        return bool(self.scheduled_blocks)

    def blocks_used(self, suggestion: Suggestion) -> float:
        minutes = self.allocated_minutes.get(suggestion.suggestion_id, 0.0)
        if suggestion.base_duration == 0:
            return 0.0
        return minutes / suggestion.base_duration


def total_gap_minutes(gaps: Sequence[Gap]) -> float:
    """Total available minutes across all gaps."""
    return sum(gap.duration for gap in gaps)


def suggestion_is_near_gap(
    suggestion: Suggestion, gap: Gap, max_distance: float
) -> bool:
    """Return True when the suggestion lies close to either boundary of a gap."""
    distance_to_start = euclidean_distance(suggestion.location, gap.start_location)
    distance_to_end = euclidean_distance(suggestion.location, gap.end_location)
    return min(distance_to_start, distance_to_end) <= max_distance


def suggestion_is_close_to_any_gap(
    suggestion: Suggestion, gaps: Sequence[Gap], max_distance: float
) -> bool:
    """Check whether the suggestion is within `max_distance` of a gap boundary."""
    if not gaps:
        return False
    return any(suggestion_is_near_gap(suggestion, gap, max_distance) for gap in gaps)


def greedy_select_candidates(
    suggestions: Sequence[Suggestion],
    gaps: Sequence[Gap],
    max_distance: float,
    *,
    tolerance: float = 1e-6,
) -> List[Suggestion]:
    """
    Greedily select high-score suggestions whose minimum commitment fits.

    Filtering rules:
    - Suggestions must lie within `max_distance` of at least one gap boundary.
    - Suggestions are considered in score-descending order.
    - A suggestion is included only if its base block can fit into the
      remaining global capacity (sum of gap durations minus already selected).
    """
    capacity = total_gap_minutes(gaps)
    remaining_capacity = capacity

    nearby_suggestions = [
        s for s in suggestions if suggestion_is_close_to_any_gap(s, gaps, max_distance)
    ]
    sorted_candidates = sorted(
        nearby_suggestions, key=lambda s: s.score, reverse=True
    )

    selected: List[Suggestion] = []
    for suggestion in sorted_candidates:
        required = suggestion.min_duration
        if required <= remaining_capacity + tolerance:
            selected.append(suggestion)
            remaining_capacity -= required
    return selected


def enumerate_best_order(
    suggestions: Sequence[Suggestion],
    start_location: Optional[Coordinate],
    end_location: Optional[Coordinate],
    *,
    permutation_limit: int = 8,
) -> Tuple[List[Suggestion], float, int]:
    """
    Examine all feasible orderings (up to factorial explosion) and return the
    sequence with minimum travel cost.
    """
    n = len(suggestions)
    if n == 0:
        return [], 0.0, 0

    if n > permutation_limit:
        raise ValueError(
            f"Permutation search limited to {permutation_limit} suggestions, "
            f"received {n}. Drop lower-score items before ordering."
        )

    best_order: Optional[Tuple[Suggestion, ...]] = None
    best_cost = math.inf
    permutations_checked = 0

    for permutation in itertools.permutations(suggestions):
        permutations_checked += 1
        travel_cost = 0.0
        previous = start_location

        for suggestion in permutation:
            if previous is not None:
                travel_cost += euclidean_distance(previous, suggestion.location)
            previous = suggestion.location

        if previous is not None and end_location is not None:
            travel_cost += euclidean_distance(previous, end_location)

        if travel_cost < best_cost:
            best_cost = travel_cost
            best_order = permutation

    assert best_order is not None
    return list(best_order), best_cost, permutations_checked


def assign_order_to_gaps(
    ordered_suggestions: Sequence[Suggestion],
    gaps: Sequence[Gap],
    max_distance: float,
    *,
    tolerance: float = 1e-6,
) -> Optional[Tuple[List[ScheduledBlock], Dict[str, int]]]:
    """
    Attempt to embed suggestions, guaranteeing one base block and stretching up to max.
    """
    if not ordered_suggestions:
        return [], { }

    schedule: List[ScheduledBlock] = []
    gap_usage = {gap.gap_id: 0.0 for gap in gaps}
    allocated_blocks = {s.suggestion_id: 0 for s in ordered_suggestions}

    gap_index = 0
    current_cursor: Optional[Coordinate] = None
    gap_has_blocks = False

    def current_gap() -> Optional[Gap]:
        if gap_index >= len(gaps):
            return None
        return gaps[gap_index]

    def ensure_gap_exit_ok(gap: Gap, cursor: Coordinate) -> bool:
        return euclidean_distance(cursor, gap.end_location) <= max_distance + tolerance

    def shift_to_next_gap() -> bool:
        nonlocal gap_index, current_cursor, gap_has_blocks
        gap = current_gap()
        if gap is None:
            return False
        if gap_has_blocks:
            cursor = current_cursor if current_cursor is not None else gap.start_location
            if not ensure_gap_exit_ok(gap, cursor):
                return False
        gap_index += 1
        current_cursor = None
        gap_has_blocks = False
        return gap_index < len(gaps)

    def allocate_block(suggestion: Suggestion, blocks_to_schedule: int) -> bool:
        nonlocal gap_index, current_cursor, gap_has_blocks
        while True:
            gap = current_gap()
            if gap is None:
                return False

            if current_cursor is None:
                current_cursor = gap.start_location
                gap_has_blocks = False

            used = gap_usage[gap.gap_id]
            remaining = gap.duration - used
            if remaining <= tolerance:
                if not shift_to_next_gap():
                    return False
                continue

            distance = euclidean_distance(current_cursor, suggestion.location)
            if distance > max_distance:
                if not shift_to_next_gap():
                    return False
                continue

            duration = suggestion.base_duration * blocks_to_schedule
            if duration > remaining + tolerance:
                if not shift_to_next_gap():
                    return False
                continue

            start_offset = used
            schedule.append(
                ScheduledBlock(
                    suggestion_id=suggestion.suggestion_id,
                    gap_id=gap.gap_id,
                    start_offset=start_offset,
                    duration=duration,
                    location=suggestion.location,
                )
            )
            gap_usage[gap.gap_id] += duration
            current_cursor = suggestion.location
            gap_has_blocks = True
            return True

    for suggestion in ordered_suggestions:
        min_blocks = 1
        max_blocks = suggestion.max_blocks

        while allocated_blocks[suggestion.suggestion_id] < max_blocks:
            blocks_needed = 1 if suggestion.splittable else max_blocks - allocated_blocks[suggestion.suggestion_id]
            if not allocate_block(suggestion, blocks_needed):
                if allocated_blocks[suggestion.suggestion_id] >= min_blocks:
                    break
                return None
            allocated_blocks[suggestion.suggestion_id] += blocks_needed

        if allocated_blocks[suggestion.suggestion_id] < min_blocks:
            return None

    gap = current_gap()
    if gap is not None and gap_has_blocks:
        cursor = current_cursor if current_cursor is not None else gap.start_location
        if not ensure_gap_exit_ok(gap, cursor):
            return None

    return schedule, allocated_blocks


def schedule_suggestions(
    suggestions: Sequence[Suggestion],
    gaps: Sequence[Gap],
    *,
    max_distance: float = 3.0,
    permutation_limit: int = 8,
    tolerance: float = 1e-6,
) -> ScheduleResult:
    """High-level orchestration for the scheduler."""
    gap_list = list(gaps)
    suggestion_list = list(suggestions)

    total_capacity = total_gap_minutes(gap_list)
    if not gap_list or total_capacity <= tolerance:
        return ScheduleResult(
            ordered_suggestions=[],
            scheduled_blocks=[],
            travel_cost=0.0,
            dropped_suggestions=list(suggestion_list),
            unused_gap_time=0.0,
            permutations_evaluated=0,
            allocated_minutes={},
        )

    selected = greedy_select_candidates(
        suggestion_list, gap_list, max_distance, tolerance=tolerance
    )
    dropped: List[Suggestion] = [
        suggestion for suggestion in suggestion_list if suggestion not in selected
    ]

    permutations_checked_total = 0
    while selected:
        selected.sort(key=lambda s: s.score, reverse=True)

        if len(selected) > permutation_limit:
            dropped_candidate = selected.pop()
            dropped.append(dropped_candidate)
            continue

        start_location = gap_list[0].start_location if gap_list else None
        end_location = gap_list[-1].end_location if gap_list else None

        try:
            order, travel_cost, permutations_checked = enumerate_best_order(
                selected,
                start_location=start_location,
                end_location=end_location,
                permutation_limit=permutation_limit,
            )
        except ValueError:
            dropped_candidate = selected.pop()
            dropped.append(dropped_candidate)
            continue

        permutations_checked_total += permutations_checked

        assignment = assign_order_to_gaps(
            order, gap_list, max_distance, tolerance=tolerance
        )
        if assignment is not None:
            schedule, allocated_blocks = assignment
            allocated_minutes: Dict[str, float] = {}
            for block in schedule:
                allocated_minutes.setdefault(block.suggestion_id, 0.0)
                allocated_minutes[block.suggestion_id] += block.duration
            for suggestion in order:
                allocated_minutes.setdefault(suggestion.suggestion_id, 0.0)

            used_time = sum(block.duration for block in schedule)
            unused_time = max(0.0, total_capacity - used_time)
            return ScheduleResult(
                ordered_suggestions=order,
                scheduled_blocks=schedule,
                travel_cost=travel_cost,
                dropped_suggestions=dropped,
                unused_gap_time=unused_time,
                permutations_evaluated=permutations_checked_total,
                allocated_minutes=allocated_minutes,
            )

        dropped_candidate = selected.pop()
        dropped.append(dropped_candidate)

    return ScheduleResult(
        ordered_suggestions=[],
        scheduled_blocks=[],
        travel_cost=0.0,
        dropped_suggestions=dropped,
        unused_gap_time=total_capacity,
        permutations_evaluated=permutations_checked_total,
        allocated_minutes={},
    )


def format_schedule(result: ScheduleResult) -> str:
    """Render a human-friendly multi-line summary of the schedule."""
    if not result.scheduled_blocks:
        return "No feasible schedule could be generated."

    lines = [
        "Final schedule:",
        "----------------",
    ]

    for block in result.scheduled_blocks:
        lines.append(
            f"- Gap {block.gap_id}: {block.suggestion_id} "
            f"({block.duration:.0f} min starting at +{block.start_offset:.0f} min)"
        )

    lines.append("")
    lines.append("Per-suggestion totals:")
    for suggestion in result.ordered_suggestions:
        allocated = result.allocated_minutes.get(suggestion.suggestion_id, 0.0)
        lines.append(
            f"  {suggestion.suggestion_id}: {allocated:.0f}/{suggestion.max_duration:.0f} min "
            f"(base {suggestion.base_duration:.0f})"
        )

    lines.append("")
    lines.append(f"Travel cost: {result.travel_cost:.2f} grid units.")
    lines.append(
        f"Dropped suggestions: {', '.join(s.suggestion_id for s in result.dropped_suggestions) or 'None'}"
    )
    lines.append(f"Unused gap time: {result.unused_gap_time:.0f} minutes.")
    lines.append(
        f"Permutations evaluated: {result.permutations_evaluated}"
    )
    return "\n".join(lines)



In [None]:
sample_suggestions = [
    Suggestion(
        suggestion_id="exercise",
        need=0.85,
        importance=0.9,
        base_duration=30,
        max_multiples=2,
        location=(2.6, 3.0),
        splittable=True,
    ),
    Suggestion(
        suggestion_id="meal_prep",
        need=0.9,
        importance=0.8,
        base_duration=60,
        max_multiples=1,
        location=(2.0, 2.3),
        splittable=False,
    ),
    Suggestion(
        suggestion_id="call_mom",
        need=0.65,
        importance=0.95,
        base_duration=30,
        max_multiples=1,
        location=(3.6, 2.8),
        splittable=False,
    ),
    Suggestion(
        suggestion_id="deep_work",
        need=0.7,
        importance=0.8,
        base_duration=30,
        max_multiples=3,
        location=(4.6, 4.2),
        splittable=True,
    ),
    Suggestion(
        suggestion_id="groceries",
        need=0.7,
        importance=0.55,
        base_duration=45,
        max_multiples=1,
        location=(8.5, 7.6),
        splittable=False,
    ),
    Suggestion(
        suggestion_id="meditation",
        need=0.5,
        importance=0.7,
        base_duration=20,
        max_multiples=2,
        location=(3.3, 3.0),
        splittable=True,
    ),
    Suggestion(
        suggestion_id="language_practice",
        need=0.45,
        importance=0.75,
        base_duration=25,
        max_multiples=2,
        location=(5.8, 5.8),
        splittable=True,
    ),
    Suggestion(
        suggestion_id="cleaning",
        need=0.55,
        importance=0.6,
        base_duration=15,
        max_multiples=2,
        location=(7.2, 6.9),
        splittable=True,
    ),
    Suggestion(
        suggestion_id="read_book",
        need=0.4,
        importance=0.7,
        base_duration=20,
        max_multiples=2,
        location=(6.6, 6.5),
        splittable=True,
    ),
]

sample_gaps = [
    Gap(gap_id="gap_1", duration=180, start_location=(1.2, 1.8), end_location=(3.6, 3.1)),
    Gap(gap_id="gap_2", duration=70, start_location=(3.8, 3.2), end_location=(4.0, 3.5)),
    Gap(gap_id="gap_3", duration=90, start_location=(6.8, 6.4), end_location=(9.0, 8.5)),
]

result = schedule_suggestions(
    sample_suggestions,
    sample_gaps,
    max_distance=3.6,
    permutation_limit=8,
)
print(format_schedule(result))

print("\nOrdered evaluation scores:")
for suggestion in result.ordered_suggestions:
    allocated = result.allocated_minutes.get(suggestion.suggestion_id, 0.0)
    print(
        f"- {suggestion.suggestion_id}: score {suggestion.score:.2f}, "
        f"allocated {allocated:.0f}/{suggestion.max_duration:.0f} min"
    )

print("\nDropped suggestions with scores:")
for suggestion in result.dropped_suggestions:
    print(f"- {suggestion.suggestion_id}: score {suggestion.score:.2f}")

