In [1]:
import math
import random 
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple, Dict, Any

%run SortDet.ipynb


In [2]:

# Placer for SortDet. The API (capacity, used, free, insert, insert_any) is required by FKPT because bucket capacities are dynamic.
class SortDetPlacer:
    """Capacity-tracking adapter that lets a SortDet instance serve as a placer.

    The placer owns the half-open slice ``array[start:end]`` and exposes
    ``capacity`` / ``used`` / ``free`` together with ``insert`` and the
    blind ``insert_any`` fallback.
    """

    def __init__(self, array, start, end):
        # Backing array and the slice boundaries this placer is responsible for.
        self.A = array
        self.start = start
        self.end = end
        # Slot accounting: fixed slice size and number of elements counted so far.
        self._cap = end - start
        self._used = 0
        self.sd = SortDet(array, start, end, label="SortDet")

    @property
    def capacity(self) -> int:
        """Size of the slice, i.e. ``end - start`` at construction time."""
        return self._cap

    @property
    def used(self) -> int:
        """Number of elements counted as stored through this placer."""
        return self._used

    @property
    def free(self) -> int:
        """Capacity minus the number of counted elements."""
        return self._cap - self.used

    def insert(self, x: float) -> bool:
        """Hand ``x`` to SortDet and count it only when SortDet reports success.

        Returns whatever ``SortDet.insert`` returned.
        """
        accepted = self.sd.insert(x)
        if not accepted:
            return accepted
        self._used += 1
        return accepted

    def insert_any(self, x: float) -> bool:
        """Blind fill for the emergency option.

        Writes ``x`` into the leftmost ``None`` cell of the slice, ignoring
        order. Returns ``False`` when the slice has no ``None`` cell.
        """
        cells = self.A
        slot = next(
            (p for p in range(self.start, self.end) if cells[p] is None),
            None,
        )
        if slot is None:
            return False
        cells[slot] = x
        self._used += 1
        self._note_blind_fill(slot)
        return True

    def _note_blind_fill(self, pos: int) -> None:
        """Bump SortDet's fill counter for the internal range containing ``pos``.

        Does nothing unless the SortDet object has both ``subarrays`` and
        ``subarray_fill`` attributes.
        """
        sd = self.sd
        if not (hasattr(sd, "subarrays") and hasattr(sd, "subarray_fill")):
            return
        for j, (lo, hi) in enumerate(sd.subarrays):
            if lo <= pos < hi:
                sd.subarray_fill[j] += 1
                return

# New data structures
# Bucket: wrapper around an array slice [start:end]; capacity and insert operations are managed through a placer.
@dataclass
class Bucket:
    """A slice ``[start, end)`` of the shared array together with its placer.

    Occupancy is read from the placer; the capacity comes from the slice
    boundaries stored here.
    """

    # First index of the slice (inclusive).
    start: int
    # End index of the slice (exclusive).
    end: int
    # Object providing ``used`` and ``insert``; ``insert_any``, ``A`` and
    # ``_used`` are used when present.
    placer: Any

    def capacity(self) -> int:
        """Return the number of cells in the slice."""
        return self.end - self.start

    def used(self) -> int:
        """Return the occupancy reported by the placer."""
        return self.placer.used

    def free(self) -> int:
        """Return capacity minus the placer-reported occupancy."""
        return self.capacity() - self.used()

    def insert(self, x: float) -> bool:
        """Forward ``x`` to the placer's regular insert."""
        return self.placer.insert(x)

    def insert_any(self, x: float) -> bool:
        """Fallback placement that ignores ordering.

        Uses the placer's own ``insert_any`` when it has one. Otherwise the
        leftmost ``None`` cell of the slice in ``placer.A`` is filled directly
        and ``placer._used`` is bumped if that attribute exists. Returns
        ``False`` when the placer exposes no array or the slice has no
        ``None`` cell.
        """
        placer = self.placer
        if hasattr(placer, "insert_any"):
            return placer.insert_any(x)

        cells = getattr(placer, "A", None)
        if cells is None:
            return False

        for pos in range(self.start, self.end):
            if cells[pos] is not None:
                continue
            cells[pos] = x
            if hasattr(placer, "_used"):
                placer._used += 1
            return True
        return False

# A Subarray is one A_i of the FKPT algorithm and consists of K buckets.
class Subarray:
    """One subarray A_i of the FKPT algorithm, made of K consecutive buckets.

    The subarray covers ``array[start:start + length]``. Buckets are created
    by one of the two ``build_*`` methods; every bucket gets its own placer
    from ``placer_factory(array, bucket_start, bucket_end)``.
    """

    def __init__(self, name: str, start: int, length: int, K: int,
                 array: List[Optional[float]],
                 placer_factory: Callable[[List[Optional[float]], int, int], Any]):
        self.name = name
        self.start = start
        self.length = length
        self.K = K
        self.array = array
        self.placer_factory = placer_factory
        self.buckets: List[Bucket] = []
        # Number of elements placed into this subarray (maintained by callers).
        self.placed_count: int = 0

    def _lay_out_buckets(self, sizes: List[int]) -> int:
        """Rebuild ``self.buckets`` back to back with the given sizes.

        Returns the end offset of the last bucket.
        """
        cursor = self.start
        self.buckets = []
        for size in sizes:
            stop = cursor + size
            placer = self.placer_factory(self.array, cursor, stop)
            self.buckets.append(Bucket(cursor, stop, placer))
            cursor = stop
        return cursor

    def build_uniform_buckets(self) -> None:
        """Build the phase-1 layout: K buckets whose sizes differ by at most one.

        The first ``length % K`` buckets receive the extra cell.
        """
        base, rem = divmod(self.length, self.K)
        self._lay_out_buckets(
            [base + (1 if i < rem else 0) for i in range(self.K)]
        )

    def build_balanced_buckets_from_prev(self, prev: "Subarray", C_i: float) -> None:
        """Build the layout for a phase i >= 2 from the state of ``prev``.

        Bucket j of this subarray is paired with buckets ``2j`` and ``2j + 1``
        of ``prev``. The target per pair is an even share of
        ``prev.length + self.length - prev.placed_count``; the free cells
        already available in the paired ``prev`` buckets are subtracted
        (floored at zero) to get this bucket's own capacity.

        Args:
            prev: The previous subarray; must hold at least ``2 * K`` buckets.
            C_i: Not used by this method; kept so existing callers keep working.

        Raises:
            AssertionError: if the final sizes do not sum to ``self.length``.
        """
        K = self.K
        placed_prev = prev.placed_count

        # Total capacity to spread over the K merged buckets.
        total_target = prev.length + self.length - placed_prev
        base, extra = divmod(total_target, K)

        # Free cells of each bucket pair (2j, 2j+1) in the previous subarray.
        free_pairs = [
            prev.buckets[2 * j].free() + prev.buckets[2 * j + 1].free()
            for j in range(K)
        ]
        # Target per merged bucket; the first `extra` buckets get one more.
        targets = [base + (1 if j < extra else 0) for j in range(K)]
        caps = [max(0, t - f) for t, f in zip(targets, free_pairs)]

        # Clamping at zero can make the sum deviate from self.length.
        diff = self.length - sum(caps)
        if diff > 0:
            # Hand out the missing cells one at a time, round-robin from
            # bucket 0. (The former "slack" pass could never select a bucket:
            # (target - free) - max(0, target - free) is never positive.)
            for step in range(diff):
                caps[step % K] += 1
        elif diff < 0:
            # Remove the surplus greedily from left to right; a bucket may be
            # emptied completely before the next one is touched. The surplus
            # never exceeds sum(caps) for a non-negative length, so a single
            # pass is enough.
            surplus = -diff
            for j in range(K):
                if surplus == 0:
                    break
                take = min(caps[j], surplus)
                caps[j] -= take
                surplus -= take

        stop = self._lay_out_buckets(caps)

        # Sanity check: the buckets must tile the subarray exactly.
        assert stop - self.start == self.length, f"Bucket sizes do not sum to subarray length in {self.name}"
    


In [6]:
# PolylogOnlineSorter, following Fotakis et al.:

# c_end controls the size of the final B stage,
# placer_factory encapsulates the local placement strategy,
# power_of_two enables the "neighbour" heuristic (i and i^1) -> IGNORE (a variant that we do not analyse),
# strict_phases -> ignore,
# smooth_emergency -> ignore (another old variant),
# emergency_dir: direction of the emergency fill -> ignore, it is irrelevant.
class PolylogOnlineSorter:
    def __init__(self, n: int, c_end: int = 5,
                 placer_factory: Callable[[List[Optional[float]], int, int], Any] = None,
                 rng: Optional[random.Random] = None, 
                power_of_two: bool = False, 
                strict_phases: bool = True,            
                 smooth_emergency: bool = False,        
                 emergency_dir: str = "ltr"):
        assert n > 0
        self.n = n
        self.c_end = c_end  # constant for final tail size (e.g., 5 instead of 100)
        self.power_of_two = power_of_two
        self.rng = rng or random.Random(12345)

        self.strict_phases = strict_phases
        self.smooth_emergency = smooth_emergency
        self.emergency_dir = emergency_dir if emergency_dir in ("ltr","rtl") else "ltr"
        
        self.A: List[Optional[float]] = [None] * n
        self.subarrays: List[Subarray] = []  # A1..Ak
        self.N: Dict[int, int] = {}  # N_{i}: elements placed in A_i before its first overflow
        self.l = self._compute_l()
        # Placer factory
        #self.placer_factory = placer_factory or (lambda arr, s, e: SimpleAadv(arr, s, e))
        self.placer_factory = placer_factory or (lambda arr, s, e: SortDetPlacer(arr, s, e))
        # Final tail (B)
        self.B_start: Optional[int] = None
        self.B_end: Optional[int] = None
        self.B_placer: Optional[Any] = None
        # Phase index of last created subarray (1-based)
        self.current_phase: int = 0
        # >>> Instrumentierung: Logs pro neu gebauter Phase (>=2)
        self.construction_log: List[Dict[str, Any]] = []
        self._pending_overflow_context: Optional[Dict[str, Any]] = None

        self.phase_state: str = "BUCKETS"
        self.backyard_full: bool = False


#Hilfsfunktionen 
    #Für K1 = 2^l 
    def _compute_l(self) -> int:
        log2n = math.log2(self.n)
        L2sq = (log2n ** 2)
        upper = self.n / (2 * L2sq)
        ell = max(1, int(math.floor(math.log2(upper))))
        lower = self.n / (4 * L2sq)
        if (2 ** ell) <= lower:
            ell += 1
        return ell

    #für Randfälle
    @staticmethod
    def _bucket_index(x: float, K: int) -> int:
        if x < 0.0:
            x = 0.0
        if x > 1.0:
            x = 1.0
        return min(int(x * K), K - 1)

    #Nur für Power of 2-> IGNORIEREN
    @staticmethod
    def _neighbor_idx(idx: int) -> int:
        # flippt nur das niederwertige Bit → (2j) <-> (2j+1)
        return idx ^ 1

    #Noch nicht an Phasen A1..Ai vergebener Rest des Arrays
    def _remaining_unallocated(self) -> int:
        if self.subarrays:
            end = self.subarrays[-1].start + self.subarrays[-1].length
        else:
            end = 0
        return self.n - end
    #Abbruchkriterium: wenn Rest <= c_end·(log2 n)^2, wechsle in finale Phase B
    def _end_threshold(self) -> int:
        return int(math.ceil(self.c_end * (math.log2(self.n) ** 2)))

    # Baut A1 (erste Phase): Länge n/2, K1=2^l, gleichmäßig partitionierte Buckets
    def _ensure_phase1(self) -> None:
        if self.current_phase >= 1:
            return
        # Build A1
        A1_len = self.n // 2
        K1 = 2 ** self.l
        A1 = Subarray("A1", start=0, length=A1_len, K=K1, array=self.A, placer_factory=self.placer_factory)
        A1.build_uniform_buckets()
        self.subarrays.append(A1)
        self.current_phase = 1
    
    #Versucht, A_i+1 zu erzeugen, stoppt bei Abbruchkriterien und wechselt dann in B
    def _create_next_phase(self) -> bool:
        i = self.current_phase
        if i >= self.l:
            self._activate_final_tail()
            return False
        if self._remaining_unallocated() <= self._end_threshold():
            self._activate_final_tail()
            return False
        Ai_len = self.n // (2 ** (i + 1))
        start = self.subarrays[-1].start + self.subarrays[-1].length
        K_next = self.subarrays[-1].K // 2
        A_prev = self.subarrays[-1]
        if i not in self.N:
            self.N[i] = A_prev.placed_count
        C_next = (A_prev.length + Ai_len - self.N[i]) / K_next
        #Snapshot fürs Debuggen: komplette Bucket-Belegung in A_prev vor Bau von A_i+1
        prev_snapshot = [{
            "idx": j,
            "used": A_prev.buckets[j].used(),
            "free": A_prev.buckets[j].free(),
            "capacity": A_prev.buckets[j].capacity(),
        } for j in range(A_prev.K)]
        free_pairs = []
        for j in range(K_next):
            free_pair = A_prev.buckets[2*j].free() + A_prev.buckets[2*j+1].free()
            free_pairs.append(free_pair)
        #Baue A_i+1
        A_next = Subarray(f"A{i+1}", start=start, length=Ai_len, K=K_next,
                          array=self.A, placer_factory=self.placer_factory)
        A_next.build_balanced_buckets_from_prev(prev=A_prev, C_i=C_next)
        built_caps_Ai = [b.end - b.start for b in A_next.buckets]
        self.subarrays.append(A_next)
        self.current_phase += 1
        #Merkt sich Auslöser des Phasenbaus (t, x, Phase/Bucket) für Debugging
        ctx = self._pending_overflow_context or {}
        self._pending_overflow_context = None  # reset
        # Log-Eintrag für Debugging
        self.construction_log.append({
            "phase_built": i+1,                 
            "trigger_t": ctx.get("t"),
            "trigger_x": ctx.get("x"),
            "overflow_prev_phase": ctx.get("prev_phase"),
            "overflow_prev_bucket_idx": ctx.get("prev_bucket_idx"),
            "prev_K": ctx.get("prev_K", A_prev.K),
            "N_i": self.N[i],
            "K_next": K_next,
            "Ai_len": Ai_len,
            "Ci": C_next,
            "A_prev_snapshot": prev_snapshot, # komplette Belegung von A_i zum Bauzeitpunkt
            "free_pairs": free_pairs,  #Summe der freien Plätze pro (2j,2j+1)
            "built_caps_Ai": built_caps_Ai,  #gewählte Kapazitäten in A_i+1
            "po2": self.power_of_two,
        })
        return True

    #Aktiviert die finale Endphase B direkt hinter der letzten gebauten Phase Ai
    def _activate_final_tail(self) -> None:
        if self.B_start is not None:
            return  # already active
        start = self.subarrays[-1].start + self.subarrays[-1].length if self.subarrays else 0
        self.B_start = start
        self.B_end = self.n
        self.B_placer = self.placer_factory(self.A, self.B_start, self.B_end)
    # Versucht x im Backyard B zu platzieren
    def _try_place_in_backyard(self, x: float) -> bool:
        return (self.B_placer is not None) and self.B_placer.insert(x)
    #Ist True, wenn in allen bestehenden Phasen (A1..Ai) keine freie Zellen mehr existieren
    def _all_buckets_full(self) -> bool:
        for S in self.subarrays:
            for b in S.buckets:
                if b.free() > 0:
                    return False
        return True

    # Emergency fill: writes x into the first free cell
    def _emergency_backfill(self, x: float) -> bool:
        phase_iter = (self.subarrays if self.emergency_dir == "ltr" else reversed(self.subarrays))
        for S in phase_iter:
            bucket_iter = (S.buckets if self.emergency_dir == "ltr" else reversed(S.buckets))
            for b in bucket_iter:
                if b.free() > 0:
                    if b.insert_any(x):
                        S.placed_count += 1
                        return True
        return False

    # API: processes the stream in three strict stages: BUCKETS, BACKYARD, EMERGENCY
    def run(self, stream: List[float]) -> Dict[str, Any]:
        """Place every element of ``stream`` into the array and return a report.

        Each element is handled according to ``self.phase_state``:

        * ``"BUCKETS"``: try the target bucket of every existing phase in
          order A1..Ai. If none accepts the element, record the overflow and
          try to build the next phase; when no phase can be built, the final
          tail B is activated and the state switches to ``"BACKYARD"``.
        * ``"BACKYARD"``: insert through the tail placer; when that fails the
          state switches to ``"EMERGENCY"``.
        * ``"EMERGENCY"``: blind backfill into any free bucket cell; when that
          fails as well, processing stops and ``success`` is ``False``.

        The state lives on the instance, so it carries over between calls.

        NOTE(review): ``strict_phases`` and ``smooth_emergency`` are not read
        here; the guard that used them is commented out.
        NOTE(review): ``overflow_phase`` is overwritten on every overflow, so
        the ``"overflow_phase_first"`` report entry holds the most recent
        overflowing phase -- confirm whether the first one was intended.

        Args:
            stream: Elements to place, processed in order.

        Returns:
            A dict with the configuration (``n``, ``ell``, ``c_end``,
            ``power_of_two``), the ``success`` flag, the number of phases,
            the final-tail summary (or ``None``) and per-phase statistics.
        """
        self._ensure_phase1()
        success = True
        overflow_phase = None
        # t is the 1-based position of x in the stream (used for debug logging).
        for t, x in enumerate(stream, start=1):
            placed = False
            # Strict stages
            # (disabled guard: if self.strict_phases and not self.smooth_emergency:)
            if self.phase_state == "BUCKETS":
                # (1) regular placement (may include building a new phase)
                while not placed:
                    # Try to place x in one of the existing phases
                    for i in range(self.current_phase):
                        S = self.subarrays[i]
                        K = S.K
                        # power-of-two variant (legacy: IGNORE!)
                        if self.power_of_two and K >= 2:
                            idx0 = self._bucket_index(x, K)
                            idx1 = self._neighbor_idx(idx0)
                            b0, b1 = S.buckets[idx0], S.buckets[idx1]

                            # Score = (free fraction, free cells); higher is tried first.
                            def _score(b):
                                cap, f = b.capacity(), b.free()
                                return (f / cap if cap > 0 else 0.0, f)
                            for _, b in sorted([(_score(b0), b0), (_score(b1), b1)],
                                                key=lambda t: t[0], reverse=True):
                                if b.insert(x):
                                    S.placed_count += 1
                                    placed = True
                                    break
                            if placed:
                                break
                        else:
                            idx = self._bucket_index(x, K)
                            if S.buckets[idx].insert(x):
                                S.placed_count += 1
                                placed = True
                                break

                    if placed:
                        break
                    # No room in the current phases -> overflow in the last phase
                    overflow_phase = self.current_phase
                    # Freeze N_i at the first overflow of phase i.
                    if overflow_phase not in self.N:
                        self.N[overflow_phase] = self.subarrays[-1].placed_count

                    # Remember what triggered the build; consumed by _create_next_phase.
                    self._pending_overflow_context = {
                        "t": t, "x": x, "prev_phase": overflow_phase,
                        "prev_K": self.subarrays[-1].K,
                        "prev_bucket_idx": self._bucket_index(x, self.subarrays[-1].K),
                    }
                    created = self._create_next_phase()
                    if not created:
                        # No new phase can be created -> activate the backyard and switch state
                        if self.B_placer is None:
                            self._activate_final_tail()
                        self.phase_state = "BACKYARD"
                        break
                # x was already placed during the BUCKETS stage
                if placed:
                    continue

            if self.phase_state == "BACKYARD":
                # (2) backyard first
                if self._try_place_in_backyard(x):
                    placed = True
                else:
                    # Backyard full -> EMERGENCY
                    self.backyard_full = True
                    self.phase_state = "EMERGENCY"

            if self.phase_state == "EMERGENCY" and not placed:
                # (3) emergency fill in the configured direction
                if not self._emergency_backfill(x):
                    success = False
                placed = True

            if not success:
                break

        # For debugging: per-phase summary
        phases_info = []
        for i, S in enumerate(self.subarrays, start=1):
            cap_list = [b.capacity() for b in S.buckets]
            used_list = [b.used() for b in S.buckets]
            phases_info.append({
                "phase": i,
                "name": S.name,
                "K": S.K,
                "length": S.length,
                "placed_in_subarray": S.placed_count,
                "bucket_capacity_sum": sum(cap_list),
                "bucket_used_sum": sum(used_list),
            })

        report = {
            "n": self.n,
            "ell": self.l,
            "c_end": self.c_end,
            "success": success,
            "overflow_phase_first": overflow_phase,
            "num_phases": self.current_phase,
            "final_tail": None if self.B_start is None else {
                "start": self.B_start,
                "end": self.B_end,
                "capacity": (self.B_end - self.B_start),
                "used": 0 if self.B_placer is None else self.B_placer.used,
            },
            "phases": phases_info,
            "power_of_two": self.power_of_two,
        }
        return report
        
