In [1]:
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import random, math, pandas as pd
from collections import defaultdict
import hashlib

# Constants 

In [None]:
SLOTS_PER_EPOCH = 2400
SLOT_DURATION_SEC = 1
EEP_FRACTION = 1/3
EEP_SLOT = int(SLOTS_PER_EPOCH * EEP_FRACTION)
PENALTY_FACTOR = 2.0
RECOVERY_FACTOR = 1.0
TOLERATED_BAD_RATIO = 0.02
REWARD_PER_BLOCK = 1.0
AUTO_COMPOUND = True
RNG_SEED = 42
MIN_BET_AMOUNT = 100.0

# Classes

In [None]:
@dataclass
class Wallet:
    address: str
    green: float = 0.0   # STAKE / collateral (TKG)
    red: float   = 0.0   # FEE / utility (TKR)

    def get_balance(self) -> Dict[str, float]:
        """Ritorna i saldi correnti (green=stake, red=fee)."""
        return {"green": self.green, "red": self.red}

    def deposit(self, green: float = 0.0, red: float = 0.0) -> None:
        """Ricarica bilanci (stake/fee)."""
        if green > 0: self.green += green
        if red   > 0: self.red   += red

    def transfer(self, other: "Wallet", *, green_amount: float = 0.0, red_amount: float = 0.0) -> bool:
        """
        Trasferisce token verso un altro wallet (come nel wallet reale: indichi green e red).
        Metti 0 se vuoi trasferire solo uno dei due.
        """
        if green_amount < 0 or red_amount < 0:
            return False
        if green_amount > self.green or red_amount > self.red:
            return False

        if green_amount:
            self.green -= green_amount
            other.green += green_amount
        if red_amount:
            self.red -= red_amount
            other.red += red_amount
        return (green_amount > 0) or (red_amount > 0)

    def pay_fee(self, amount: float) -> bool:
        """
        Paga una fee consumando prima RED (TKR) e solo se insufficiente usa GREEN (TKG).
        """
        if amount <= 0:
            return True
        # usa red prima
        use_red = min(self.red, amount)
        self.red -= use_red
        remaining = amount - use_red
        if remaining <= 0:
            return True
        # fallback su green
        if remaining > self.green:
            # fee non pagabile
            # ripristina red consumato? (dipende dal tuo modello; qui NON lo ripristiniamo)
            self.green = max(0.0, self.green - remaining)  # consuma quello che c'è (oppure torna False)
            return False
        self.green -= remaining
        return True

In [None]:
@dataclass
class Stakeholder(Wallet):
    display_name: Optional[str] = None

    # opzionale: solo helper per la coda del simulatore (EEP)
    def bet_op(self, to_main_id: str, amount: float, when_slot: int) -> dict:
        return {"kind": "STAKE", "from": self.address, "to": to_main_id,
                "amount": float(amount), "slot": when_slot}

In [None]:
@dataclass
class MainAddress(Wallet):
    # Stake delegato: delegante -> GREEN in stake presso il main (principal + reward composte)
    delegates: Dict[str, float] = field(default_factory=dict)

    # Snapshot usato dal consenso (impostalo all'EEP)
    active_stake: float = 0.0

    # Nuovo attributo: overflow associati
    overflows: Dict[str, "OverflowNode"] = field(default_factory=dict)

    def register_overflow(self, overflow: "OverflowNode") -> None:
        """
        Registra un overflow associato a questo main.
        Lo aggiunge al dizionario 'overflows' e aggiorna il main_id del nodo.
        """
        overflow.main_id = self.address
        self.overflows[overflow.address] = overflow

    def get_overflow_list(self) -> list:
        """Ritorna la lista degli overflow associati a questo main."""
        return list(self.overflows.values())

    def bet(self, from_address: str, amount: float) -> bool:
        """
        Riceve una delega (bet) in GREEN da uno stakeholder o dal main stesso.
        La quantità deve superare MIN_BET_AMOUNT.
        """
        if amount < MIN_BET_AMOUNT:
            return False
        self.delegates[from_address] = self.delegates.get(from_address, 0.0) + amount
        return True

    def self_bet(self, amount: float) -> bool:
        """
        Il main punta su sé stesso (auto-stake).
        Scala i suoi green e registra la delega come '_self_'.
        """
        if amount < MIN_BET_AMOUNT or amount > self.green:
            return False
        self.green -= amount
        return self.bet("_self_", amount)

    def total_delegated_now(self) -> float:
        """Somma di tutte le deleghe correnti (inclusi self-bet e reward composte)."""
        return sum(self.delegates.values())

    def snapshot_at_eep(self) -> None:
        """Congela la stake attiva per l’epoch successivo."""
        self.active_stake = self.total_delegated_now()

    def credit_reward(self, amount: float) -> None:
        """
        Distribuisce la reward proporzionalmente alle deleghe correnti,
        componendola subito nella stake di ciascun delegante.
        """
        if amount <= 0:
            return
        total = self.total_delegated_now()
        if total <= 0:
            # fallback: se nessuno ha delegato, il main incassa in green
            self.green += amount
            return

        for addr, stake_amt in list(self.delegates.items()):
            share = amount * (stake_amt / total)
            self.delegates[addr] = stake_amt + share

In [None]:
@dataclass
class OverflowNode(Wallet):
    main_id: str = ""              # MAIN a cui è collegato
    base_stake_share: float = 1.0  # quota interna se più overflow per lo stesso MAIN

    # Penalità/affidabilità
    penalty_score: float = 0.0
    p_skip: float = 0.01
    p_invalid: float = 0.00
    p_delayed: float = 0.00

    # Computati per epoch
    effective_stake: float = 0.0
    assigned_slots: int = 0

    def behavior_outcome(self, rng) -> str:
        """Esito di uno slot (stocastico) in base ai parametri di affidabilità."""
        x = rng.random()
        if x < self.p_skip:    return "skipped"
        x -= self.p_skip
        if x < self.p_invalid: return "invalid"
        x -= self.p_invalid
        if x < self.p_delayed: return "delayed"
        return "ok"

    def slash_green(self, amount: float) -> float:
        """
        Applica una penalità sul COLLATERAL (GREEN) del nodo.
        Ritorna quanto è stato effettivamente tagliato.
        """
        if amount <= 0:
            return 0.0
        taken = min(self.green, amount)
        self.green -= taken
        return taken

In [None]:
class TakamakaSim:
    def __init__(self,
                 *,
                 slots_per_epoch: int = SLOTS_PER_EPOCH,
                 slot_duration_sec: int = SLOT_DURATION_SEC,
                 eep_fraction: float = EEP_FRACTION,          # es. 1/3
                 min_slots_per_overflow: int = 0,             # vincoli opzionali
                 max_slots_per_overflow: int = 10**9,
                 penalty_factor: float = PENALTY_FACTOR,
                 recovery_factor: float = RECOVERY_FACTOR,
                 tolerated_bad_ratio: float = TOLERATED_BAD_RATIO,
                 reward_per_block: float = REWARD_PER_BLOCK,
                 seed: int = RNG_SEED,
                 slash_per_invalid: float = 0.0):             # opzionale: slashing (stake green) per blocchi invalid
        self.cfg = {
            "slots_per_epoch": int(slots_per_epoch),
            "slot_duration_sec": int(slot_duration_sec),
            "eep_slot": int(slots_per_epoch * eep_fraction),
            "min_slots_per_overflow": int(min_slots_per_overflow),
            "max_slots_per_overflow": int(max_slots_per_overflow),
            "penalty_factor": float(penalty_factor),
            "recovery_factor": float(recovery_factor),
            "tolerated_bad_ratio": float(tolerated_bad_ratio),
            "reward_per_block": float(reward_per_block),
            "seed": int(seed),
            "slash_per_invalid": float(slash_per_invalid),
        }
        self.epoch = 0

        # registri
        self.stakeholders: Dict[str, Stakeholder] = {}
        self.mains: Dict[str, MainAddress] = {}
        self.overflows: Dict[str, OverflowNode] = {}

        # log differiti (applicati agli EEP)
        # - STAKE:      {"kind":"STAKE","from":addr|"_self_","to":main_id,"amount":float,"epoch":int,"slot":int}
        # - STAKE_UNDO: {"kind":"STAKE_UNDO","from":addr,"epoch":int,"slot":int}
        self.pending_ops: List[Dict] = []

        # storico KPI
        self.history: List[Dict] = []

    # ---------------------------
    # Registrazione / indexing
    # ---------------------------
    def register_stakeholder(self, st: Stakeholder):
        self.stakeholders[st.address] = st

    def register_main(self, m: MainAddress):
        self.mains[m.address] = m

    def index_overflow(self, node: OverflowNode):
        """Indicizza un overflow (dopo che il suo main l'ha registrato)."""
        if node.main_id not in self.mains:
            raise ValueError("main_id non registrato")
        self.overflows[node.address] = node

    # ---------------------------
    # Operazioni differite (EEP)
    # ---------------------------
    def queue_bet(self, from_addr: str, to_main_id: str, amount: float, when_slot: int):
        self.pending_ops.append({
            "kind": "STAKE",
            "from": from_addr,
            "to": to_main_id,
            "amount": float(amount),
            "epoch": self.epoch,
            "slot": int(when_slot),
        })

    def queue_stake_undo(self, from_addr: str, when_slot: int):
        """Resetta tutte le puntate del delegante al prossimo EEP applicabile."""
        self.pending_ops.append({
            "kind": "STAKE_UNDO",
            "from": from_addr,
            "epoch": self.epoch,
            "slot": int(when_slot),
        })

    def _apply_ops_at_current_eep(self):
        """
        Applica le operazioni che, secondo la regola EEP, 'maturano' ORA.
        Regola (semplificata ma fedele alla logica 1/3):
          - ops dell'epoch corrente con slot <= EEP        → valgono da (t+1)
          - ops dell'epoch precedente con slot > EEP       → valgono da (t+1)
        Quindi qui le 'applichiamo' prima dello snapshot di questo EEP.
        """
        eep = self.cfg["eep_slot"]
        eligible = []
        keep = []
        for op in self.pending_ops:
            if (op["epoch"] == self.epoch and op["slot"] <= eep) or \
               (op["epoch"] == self.epoch - 1 and op["slot"] > eep):
                eligible.append(op)
            else:
                keep.append(op)

        # Ordine deterministico: prima UNDO poi STAKE; per pari slot→from→to
        def op_key(op):
            return (
                0 if op["kind"] == "STAKE_UNDO" else 1,
                op.get("epoch", -1),
                op.get("slot", -1),
                op.get("from", ""),
                op.get("to", "")
            )
        eligible.sort(key=op_key)

        for op in eligible:
            kind = op["kind"]
            if kind == "STAKE_UNDO":
                deleg_addr = op["from"]
                st = self.stakeholders.get(deleg_addr)
                if st is None:
                    continue
                for m in self.mains.values():
                    cur = m.delegates.get(deleg_addr, 0.0)
                    if cur > 0:
                        st.green += cur
                        m.delegates.pop(deleg_addr, None)

            elif kind == "STAKE":
                to_main = self.mains.get(op["to"])
                if to_main is None:
                    continue
                amount = float(op["amount"])
                if amount < MIN_BET_AMOUNT:
                    continue
                from_addr = op["from"]
                if from_addr == "_self_":
                    # self-bet: addebita sul green del main
                    if amount <= to_main.green:
                        to_main.green -= amount
                        to_main.bet("_self_", amount)
                else:
                    st = self.stakeholders.get(from_addr)
                    if st is None:
                        continue
                    if amount <= st.green:
                        st.green -= amount
                        to_main.bet(from_addr, amount)

        # lascia in coda solo le non-eligibili (matureranno più avanti)
        self.pending_ops = keep

    # ---------------------------
    # Snapshot EEP
    # ---------------------------
    def _snapshot_all_mains_at_eep(self):
        for m in self.mains.values():
            m.snapshot_at_eep()

    # ---------------------------
    # Helpers deterministici
    # ---------------------------
    @staticmethod
    def _hash_u64(*parts: str) -> int:
        """Hash deterministico → intero 64 bit (per ticket/ordinamenti)."""
        h = hashlib.sha256("::".join(parts).encode()).digest()
        return int.from_bytes(h[:8], "big", signed=False)

    def _epoch_seed_str(self) -> str:
        return f"takamaka-seed:{self.cfg['seed']}:{self.epoch}"

    # ---------------------------
    # Assegnazione slot (deterministica)
    # ---------------------------
    @staticmethod
    def _effective_from_penalty(base: float, penalty_score: float) -> float:
        return base / (1.0 + max(0.0, penalty_score))

    def _quota_counts(self, weights: Dict[str, float]) -> Dict[str, int]:
        """Quote intere con rounding deterministico sul resto."""
        S = self.cfg["slots_per_epoch"]
        total = sum(weights.values())
        if total <= 0:
            return {k: 0 for k in weights}
        expected = {k: (w / total) * S for k, w in weights.items()}
        base = {k: int(math.floor(v)) for k, v in expected.items()}
        remainder = S - sum(base.values())

        # ordina per parte frazionaria desc; tie-break deterministico
        seed = self._epoch_seed_str()
        frac = []
        for k, v in expected.items():
            frac_part = v - math.floor(v)
            tiebreak = self._hash_u64(seed, "frac", k)
            frac.append((k, frac_part, tiebreak))
        frac.sort(key=lambda x: (-x[1], x[2]))
        for i in range(remainder):
            base[frac[i][0]] += 1
        return base

    def _enforce_min_max(self, counts: Dict[str, int]) -> Dict[str, int]:
        """Applica min/max slot per overflow con redistribuzione deterministica."""
        S = self.cfg["slots_per_epoch"]
        min_s = self.cfg["min_slots_per_overflow"]
        max_s = self.cfg["max_slots_per_overflow"]

        # porta sotto-minimo al minimo
        need = 0
        for k in counts:
            if counts[k] < min_s:
                need += (min_s - counts[k])
                counts[k] = min_s

        if need > 0:
            seed = self._epoch_seed_str()
            donors = sorted(counts.keys(),
                            key=lambda k: (-counts[k], self._hash_u64(seed, "donor", k)))
            for k in donors:
                if need <= 0:
                    break
                avail = max(0, counts[k] - min_s)
                take = min(avail, need)
                counts[k] -= take
                need -= take

        # normalizza somma a S (senza scendere sotto min_s)
        total = sum(counts.values())
        if total != S:
            seed = self._epoch_seed_str()
            keys = sorted(counts.keys(), key=lambda k: self._hash_u64(seed, "norm", k))
            idx = 0
            while sum(counts.values()) < S:
                counts[keys[idx % len(keys)]] += 1
                idx += 1
            while sum(counts.values()) > S:
                k = keys[idx % len(keys)]
                if counts[k] > min_s:
                    counts[k] -= 1
                idx += 1

        # cappatura a max_s e redistribuzione dell'eccesso
        seed = self._epoch_seed_str()
        excess = 0
        for k, v in list(counts.items()):
            if v > max_s:
                excess += (v - max_s)
                counts[k] = max_s

        if excess > 0:
            receivers = sorted([k for k in counts if counts[k] < max_s],
                               key=lambda k: (counts[k], self._hash_u64(seed, "recv", k)))
            i = 0
            while excess > 0 and receivers:
                k = receivers[i % len(receivers)]
                if counts[k] < max_s:
                    counts[k] += 1
                    excess -= 1
                i += 1
            if excess > 0:
                # spill deterministico
                keys = sorted(counts.keys(), key=lambda k: self._hash_u64(seed, "spill", k))
                i = 0
                while excess > 0:
                    counts[keys[i % len(keys)]] += 1
                    excess -= 1
                    i += 1

        return counts

    def _distribute_slots(self) -> Tuple[Dict[str, int], List[str]]:
        """
        Pesi (stake effettiva) → quote intere → vincoli min/max → schedule deterministica (ticket hashing).
        effective(node) = main.active_stake * node.base_stake_share / (1 + penalty_score)
        """
        # pesi per overflow
        weights: Dict[str, float] = {}
        for node in self.overflows.values():
            main = self.mains[node.main_id]
            base = max(0.0, main.active_stake * max(0.0, node.base_stake_share))
            eff = self._effective_from_penalty(base, node.penalty_score)
            node.effective_stake = eff
            weights[node.address] = eff

        # quote + vincoli
        counts = self._quota_counts(weights)
        counts = self._enforce_min_max(counts)

        # salva assegnazioni
        for addr, cnt in counts.items():
            self.overflows[addr].assigned_slots = cnt

        # schedule deterministica: un ticket per slot del nodo
        seed = self._epoch_seed_str()
        tickets = []
        for addr, cnt in counts.items():
            for k in range(cnt):
                t = self._hash_u64(seed, "slot", addr, str(k))
                tickets.append((t, addr))
        tickets.sort(key=lambda x: x[0])
        schedule = [addr for _, addr in tickets]
        return counts, schedule

    # ---------------------------
    # Penalità e reward
    # ---------------------------
    def _apply_penalties_and_rewards(self, per_node_outcomes: Dict[str, Dict[str, int]]):
        for addr, outcome in per_node_outcomes.items():
            node = self.overflows[addr]
            main = self.mains[node.main_id]

            assigned = max(1, node.assigned_slots)
            bad = outcome.get("skipped", 0) + outcome.get("invalid", 0) + outcome.get("delayed", 0)
            ok_blocks = outcome.get("ok", 0)
            bad_ratio = bad / assigned

            # penalità di lungo periodo (progressiva)
            if bad_ratio > self.cfg["tolerated_bad_ratio"]:
                node.penalty_score += self.cfg["penalty_factor"] * (bad_ratio - self.cfg["tolerated_bad_ratio"])
            else:
                node.penalty_score = max(
                    0.0,
                    node.penalty_score - self.cfg["recovery_factor"] * (self.cfg["tolerated_bad_ratio"] - bad_ratio)
                )

            # (opzionale) slashing sul green del nodo per blocchi invalid
            spiv = self.cfg["slash_per_invalid"]
            if spiv > 0.0:
                slash = outcome.get("invalid", 0) * spiv
                if slash > 0:
                    node.slash_green(slash)

            # reward al MAIN → proportional compound sui deleganti del main
            reward = self.cfg["reward_per_block"] * ok_blocks
            main.credit_reward(reward)

    # ---------------------------
    # Simulazione epoch
    # ---------------------------
    def simulate_one_epoch(self) -> Dict:
        """
        1) Applica STAKE/STAKE_UNDO che maturano a questo EEP
        2) Snapshot stake attiva (EEP)
        3) Assegna slot (deterministico)
        4) Simula gli slot (behavior overflow)
        5) Applica penalità + reward
        """
        # 1) op all'EEP
        self._apply_ops_at_current_eep()

        # 2) snapshot all'EEP
        self._snapshot_all_mains_at_eep()

        # 3) distribuzione slot
        slots_per_node, schedule = self._distribute_slots()

        # 4) simulazione slot
        per_node_outcomes = defaultdict(lambda: {"ok": 0, "skipped": 0, "invalid": 0, "delayed": 0})
        # outcome stocastico ma ripetibile (seed fisso + epoch)
        rng_slots = random.Random(self.cfg["seed"] * 7919 + self.epoch)
        for addr in schedule:
            outcome = self.overflows[addr].behavior_outcome(rng_slots)
            per_node_outcomes[addr][outcome] += 1

        # 5) penalità + reward
        self._apply_penalties_and_rewards(per_node_outcomes)

        # KPI epoch
        kpi = {
            "epoch": self.epoch,
            "slots_per_node": dict(slots_per_node),
            "per_node_outcomes": {addr: dict(v) for addr, v in per_node_outcomes.items()},
            "main_active_stake": {m.address: m.active_stake for m in self.mains.values()},
            "main_total_delegates": {m.address: m.total_delegated_now() for m in self.mains.values()},
            "penalties": {addr: self.overflows[addr].penalty_score for addr in self.overflows},
        }
        self.history.append(kpi)
        self.epoch += 1
        return kpi

    # ---------------------------
    # Report
    # ---------------------------
    def epoch_summary_df(self, kpi: Dict) -> pd.DataFrame:
        rows = []
        for addr, slots in kpi["slots_per_node"].items():
            o = kpi["per_node_outcomes"].get(addr, {})
            node = self.overflows[addr]
            rows.append({
                "epoch": kpi["epoch"],
                "overflow": addr,
                "main": node.main_id,
                "assigned_slots": slots,
                "ok": o.get("ok", 0),
                "skipped": o.get("skipped", 0),
                "invalid": o.get("invalid", 0),
                "delayed": o.get("delayed", 0),
                "penalty_score": node.penalty_score,
                "effective_stake": node.effective_stake,
            })
        return pd.DataFrame(rows)


# Simulations