In [None]:
import os
import json
import requests
import pandas as pd
import plotly.graph_objects as go
from dataclasses import dataclass, field

from eth2spec.utils import bls
from eth2spec.utils.ssz.ssz_typing import Bytes32, uint8, uint32, uint64
from eth2spec.capella.mainnet import (
    ValidatorIndex, BeaconState, Epoch, DomainType, BeaconBlock, uint_to_bytes, get_seed,
    process_slots, hash, DOMAIN_BEACON_PROPOSER, SLOTS_PER_EPOCH, get_current_epoch, 
    get_active_validator_indices, compute_proposer_index, SignedBeaconBlock, state_transition,
    compute_signing_root, get_domain, hash_tree_root, SLOTS_PER_HISTORICAL_ROOT, Slot,
    get_beacon_proposer_index, get_randao_mix, EPOCHS_PER_HISTORICAL_VECTOR, xor,
    MIN_SEED_LOOKAHEAD, MAX_EFFECTIVE_BALANCE, compute_shuffled_index
)

In [None]:
def get_and_save_state(slot, force=False):
    """Download the beacon state at `slot` from the local node and save it as JSON.

    The state is written to ``state_at_slot_{slot}.json`` in the current
    working directory. If that file already exists the download is skipped,
    unless `force` is true, in which case the file is overwritten.

    Args:
        slot: Slot identifier interpolated into the debug-states URL and
            into the output file name.
        force: When true, re-download even if the file already exists.

    Raises:
        requests.HTTPError: If the node answers with a 4xx/5xx status.
    """
    path = f'state_at_slot_{slot}.json'
    # Skip the (large) download when a saved copy is present and no overwrite is requested.
    if os.path.isfile(path) and not force:
        print("state already saved")
        return
    print("retrieving state", end="\r")
    url = f"http://localhost:5052/eth/v2/debug/beacon/states/{slot}"
    response = requests.get(url)
    # Fail before touching the disk: otherwise an error body would be saved
    # as the "state" and every later call would report "state already saved".
    response.raise_for_status()
    res = response.json()
    with open(path, 'wt') as file:
        json.dump(res, file, indent='  ')
    print(f"state for slot {slot:,} successfully loaded")

def get_beacon_block(slot):
    """Fetch the beacon block at `slot` from the local node.

    Args:
        slot: Block identifier interpolated into the ``/eth/v2/beacon/blocks/``
            URL.

    Returns:
        The result of ``BeaconBlock.from_obj`` applied to the
        ``data.message`` part of the JSON response.

    Raises:
        requests.HTTPError: If the node answers with a 4xx/5xx status
            (instead of a later, less informative ``KeyError`` on "data").
    """
    url = f"http://localhost:5052/eth/v2/beacon/blocks/{slot}"
    response = requests.get(url)
    # Surface HTTP failures explicitly; an error body has no "data" key.
    response.raise_for_status()
    block = response.json()
    return BeaconBlock.from_obj(block["data"]["message"])

def load_state_from_json(slot, force=False):
    """Return the beacon state for `slot`, reusing the global `state` if it matches.

    When a module-level ``state`` exists, `force` is false and its ``slot``
    equals `slot`, that object is returned as is. Otherwise the state is read
    from ``state_at_slot_{slot}.json`` and built with ``BeaconState.from_obj``.
    The function itself never assigns the global; the caller does.
    """
    global state
    reuse_allowed = not force and "state" in globals()
    if reuse_allowed and state.slot == slot:
        print("state already loaded")
        return state
    print("loading state...", end="\r")
    # The saved file wraps the state under the "data" key.
    with open(f'state_at_slot_{slot}.json', 'r') as handle:
        payload = json.load(handle)
    loaded = BeaconState.from_obj(payload["data"])
    print(f"state for slot {loaded.slot} successfully loaded")
    return loaded

def load_lido_validator_ids(indices=None):
    """Load the Lido validator ids from ``lido_validators.csv``.

    The first line of the file is skipped as a header; every other non-blank
    line is parsed as an integer id.

    Args:
        indices: Optional iterable of validator indices. When given, only ids
            that also appear in `indices` are kept and the resulting share of
            `indices` is printed.

    Returns:
        A set of validator ids.
    """
    print("loading lido validators...", end="\r")
    with open("lido_validators.csv", 'r') as f:
        lido_validators = {int(line) for line in f.read().splitlines()[1:] if line.strip()}
    _indices = None
    if indices is not None:
        _indices = set(indices)
        lido_validators = lido_validators & _indices
    print(f"validators for lido loaded ({len(lido_validators)})")
    # The share is only defined relative to a non-empty reference set; without
    # one (default call, or empty `indices`) there is nothing to divide by.
    if _indices:
        print(f"share: ({len(lido_validators)/len(_indices)*100:,.2f}%)")
    return lido_validators

def get_my_validator_ids(indices, share):
    """Pick an evenly spaced subset of `indices` covering roughly `share` of them.

    Every ``floor(1 / share)``-th element of `indices` is selected, starting
    with the first one.

    Args:
        indices: Indexable sequence of validator indices.
        share: Desired fraction of validators, in the interval (0, 1].

    Returns:
        A set with the selected validator indices.

    Raises:
        ValueError: If `share` is not in (0, 1].
    """
    if not 0 < share <= 1:
        raise ValueError(f"share must be in (0, 1], got {share!r}")
    # Rounding before truncating absorbs binary floating point noise. The old
    # form `100 // (share * 100)` gave 9 for share=0.1 (0.1 * 100 is
    # 10.000000000000002), i.e. an ~11% sample instead of 10%.
    step = int(round(1 / share, 9))
    return {indices[i] for i in range(0, len(indices), step)}

In [None]:
def compute_proposer_index_custom(state_validators, indices, seed):
    """Select a proposer from `indices` using `seed`.

    Candidates are visited in the order given by ``compute_shuffled_index``.
    A candidate is accepted when its effective balance, scaled by the maximum
    byte value, is at least ``MAX_EFFECTIVE_BALANCE`` times a byte taken from
    ``hash(seed + counter)``; otherwise the next candidate is tried.

    Args:
        state_validators: Container indexed by validator index whose items
            expose ``effective_balance``.
        indices: Non-empty sequence of candidate validator indices.
        seed: Bytes used for shuffling and for the random byte.

    Returns:
        The first accepted candidate index.
    """
    assert len(indices) > 0
    max_random_byte = 2**8 - 1
    candidate_count = uint64(len(indices))
    attempt = uint64(0)
    while True:
        shuffled_position = compute_shuffled_index(attempt % candidate_count, candidate_count, seed)
        candidate_index = indices[shuffled_position]
        # One hash yields 32 random bytes; a new hash is taken every 32 attempts.
        digest = hash(seed + uint_to_bytes(uint64(attempt // 32)))
        random_byte = digest[attempt % 32]
        effective_balance = state_validators[candidate_index].effective_balance
        if effective_balance * max_random_byte < MAX_EFFECTIVE_BALANCE * random_byte:
            attempt += 1
            continue
        return candidate_index

def get_seed_custom(randao_mixes, epoch):
    """Derive the proposer seed for `epoch` from a list of RANDAO mixes.

    The mix is read at position
    ``(epoch + EPOCHS_PER_HISTORICAL_VECTOR - MIN_SEED_LOOKAHEAD - 1)
    % EPOCHS_PER_HISTORICAL_VECTOR`` and hashed together with
    ``DOMAIN_BEACON_PROPOSER`` and the serialized epoch.
    """
    source_epoch = Epoch(epoch + EPOCHS_PER_HISTORICAL_VECTOR - MIN_SEED_LOOKAHEAD - 1)
    position = source_epoch % EPOCHS_PER_HISTORICAL_VECTOR
    preimage = DOMAIN_BEACON_PROPOSER + uint_to_bytes(epoch) + randao_mixes[position]
    return hash(preimage)

def get_proposers_for_my_state(randao_mixes, current_epoch, state_validators, indices):
    """Compute the proposer of every slot in epoch ``current_epoch + 2``.

    The epoch seed comes from ``get_seed_custom``; each slot gets its own seed
    ``hash(epoch_seed + slot)`` which is passed to
    ``compute_proposer_index_custom``.

    Returns:
        A dict mapping each slot of that epoch to the selected proposer index.
    """
    target_epoch = current_epoch + 2
    epoch_seed = get_seed_custom(randao_mixes, target_epoch)
    first_slot = target_epoch * SLOTS_PER_EPOCH
    end_slot = (target_epoch + 1) * SLOTS_PER_EPOCH
    return {
        slot: compute_proposer_index_custom(
            state_validators,
            indices,
            hash(epoch_seed + uint_to_bytes(uint64(slot))),
        )
        for slot in range(first_slot, end_slot)
    }

def get_my_proposer_count_in_next_epoch(my_validators, all_next_ep):
    """Count how many values of `all_next_ep` are contained in `my_validators`."""
    hits = 0
    for proposer in all_next_ep.values():
        if proposer in my_validators:
            hits += 1
    return hits

def get_my_proposers_in_next_epoch(my_validators, all_next_ep, epoch):
    """Map the slots of epoch ``epoch + 2`` to my proposer, or None.

    Slots are numbered consecutively from the first slot of epoch
    ``epoch + 2``, following the iteration order of `all_next_ep`. A slot maps
    to its proposer when that proposer is in `my_validators`, else to None.
    """
    start_slot = (epoch + 2) * SLOTS_PER_EPOCH
    mine = {}
    for offset, proposer in enumerate(all_next_ep.values()):
        if proposer in my_validators:
            mine[start_slot + offset] = proposer
        else:
            mine[start_slot + offset] = None
    return mine

def check_validators_in_next_epoch(
    randao_mixes, epoch, state_validators, indices, my_validators, results
):
    """Evaluate one RANDAO outcome and record it on `results`.

    Computes all proposers via ``get_proposers_for_my_state``, derives which of
    them are in `my_validators` and how many, then stores the three values on
    `results` as ``all_props``, ``my_props`` and ``my_share`` and prints the
    share.
    """
    proposers = get_proposers_for_my_state(randao_mixes, epoch, state_validators, indices)
    own_proposers = get_my_proposers_in_next_epoch(my_validators, proposers, epoch)
    own_count = get_my_proposer_count_in_next_epoch(my_validators, proposers)

    results.all_props = proposers
    results.my_props = own_proposers
    results.my_share = own_count
    print(f"share of my proposers vs. total: {own_count}/32")

def fast_randao_mixing_custom(randao_mixes, epoch, randao_reveal):
    """XOR the hash of `randao_reveal` into the mix slot belonging to `epoch`.

    The entry at ``epoch % EPOCHS_PER_HISTORICAL_VECTOR`` is updated in place
    and the same `randao_mixes` object is returned.
    """
    position = epoch % EPOCHS_PER_HISTORICAL_VECTOR
    current_mix = randao_mixes[position]
    randao_mixes[position] = xor(current_mix, hash(randao_reveal))
    return randao_mixes

def simulate(
    randao_mixes, epoch, slot, state_validators, indices, my_validators, strategy, simulation_results
):
    """Replay one propose/miss strategy and store the resulting proposer outcome.

    `strategy` is a string such as ``"( 1 0 1 )"``. Its k-th digit (1-based)
    refers to slot ``slot + k``: for a ``1`` the block at that slot is fetched
    and its ``randao_reveal`` is mixed into `randao_mixes`; any other value
    leaves the mixes untouched. Afterwards the proposers are evaluated with
    ``check_validators_in_next_epoch`` and the ``Results`` object is stored in
    ``simulation_results.results`` under the next integer key.
    """
    print("=================================================")
    print(f"strategy: {strategy}")
    choices = [int(token) for token in strategy.strip("() ").split()]
    outcome = Results()
    for offset, choice in enumerate(choices, start=1):
        # Only a proposed block (choice 1) contributes its reveal to the mix.
        if choice != 1:
            continue
        block = get_beacon_block(slot + offset)
        randao_mixes = fast_randao_mixing_custom(
            randao_mixes, epoch, block.body.randao_reveal
        )
    check_validators_in_next_epoch(
        randao_mixes, epoch, state_validators, indices, my_validators, outcome
    )
    next_key = len(simulation_results.results)
    simulation_results.results[next_key] = outcome

# Data class to store results
@dataclass
class Results:
    """Outcome of one simulated strategy, filled in by check_validators_in_next_epoch."""
    # slot -> proposer index, as returned by get_proposers_for_my_state
    all_props: dict = field(default_factory=dict)
    # slot -> proposer index if it is one of "my" validators, else None
    my_props: dict = field(default_factory=dict)
    # number of slots in all_props whose proposer is one of "my" validators
    my_share: int = 0

# Data class to store simulation results
@dataclass
class SimulationResults:
    """Collects the Results of all simulated strategies."""
    # running integer key (0, 1, 2, ...) -> Results; simulate() appends under len(results)
    results: dict = field(default_factory=dict)


In [None]:
# Preview of the tail-slot table: ordered by slot, re-indexed, rows with any NaN removed.
(
    pd.read_csv("validators_subsequent_tails.csv")
    .sort_values("slot")
    .reset_index(drop=True)
    .dropna()
)

## Summary Table of tail-end slots

In [None]:
# Load the tail-slot table ordered by slot (rows with NaN are kept at this point).
df = pd.read_csv("validators_subsequent_tails.csv").sort_values("slot").reset_index(drop=True)#.dropna(subset=["slot_prev_prev"])
# Empty result frame: one column per "slot*" column of df, one row per distinct validator.
# NOTE(review): the index is given as a list wrapping the array of unique validators,
# not the array itself - confirm this is intended (it affects the column name that
# reset_index() produces further down).
summary = pd.DataFrame(columns=[i for i in df if i.startswith("slot")], index=[df["validator"].unique()])
# Walk the columns from last to first, skipping everything that is not a "slot*" column.
for i in df.columns[::-1]:
    if not i.startswith("slot"):
        continue
    # Per validator: number of rows that have no NaN in any column still present in df.
    _df = df.dropna().groupby("validator")[i].count()
    for vali, count in zip(_df.index.tolist(), _df.values.tolist()):
        summary.loc[vali, (i)] = count
    # Drop the column just counted so the dropna() of the next iteration ignores it.
    # This mutates df in place: re-running the cell without reloading gives different results.
    df.drop(i, inplace=True, axis=1)
# Rank by the "slot_2" column (assumes the CSV provides it - TODO confirm); missing counts become 0.
summary = summary.sort_values(["slot_2"], ascending=False).fillna(0)
# Keep only validators with a positive "slot_2" count and render the table as markdown.
summary = summary[summary["slot_2"] > 0].reset_index()
print(summary.to_markdown(index=False))

## Simulation

In [None]:
# Start from the state just before Lido had 8 subsequent tail-slots at epoch 149818.
state = load_state_from_json(5915831, force=False)
simulation_results = SimulationResults()
# All 256 propose/miss patterns for 8 slots, rendered like "( 0 1 0 1 0 1 0 1 )".
strategies = ["( " + " ".join(format(i, '08b')) + " )" for i in range(256)]
epoch = get_current_epoch(state)
state_validators = state.validators
indices = get_active_validator_indices(state, epoch+2)
lido_validators = load_lido_validator_ids(indices)
for strategy in strategies:
    # Every strategy starts from fresh copies of the mixes and the slot.
    randao_mixes = state.randao_mixes.copy()
    slot = state.slot.copy()
    simulate(
        randao_mixes=randao_mixes,
        epoch=epoch,
        slot=slot,
        state_validators=state_validators,
        indices=indices,
        my_validators=lido_validators,
        strategy=strategy,
        simulation_results=simulation_results,
    )

## Plot it

In [None]:
# One bar per strategy showing its raw outcome; the last strategy is highlighted.
x_values = list(simulation_results.results.keys())
y_values = [simulation_results.results[idx].my_share for idx in range(len(x_values))]
colors = ['#4682b4'] * len(x_values)
colors[-1] = '#3A0B61'
fig = go.Figure(
    data=[
        go.Bar(
            x=x_values,
            y=y_values,
            text=y_values,
            textposition='auto',
            marker_color=colors,
        )
    ]
)

# Dashed horizontal reference line at the outcome of the last strategy.
fig.add_shape(
    type="line",
    xref="paper",
    yref="y",
    x0=0,
    x1=1,
    y0=y_values[-1],
    y1=y_values[-1],
    line={"color": "Grey", "width": 2, "dash": "dash"},
)

fig.add_annotation(
    x=x_values[-1],
    y=y_values[-1],
    text="What Lido<br>actually did",
    showarrow=True,
    arrowhead=1,
    arrowsize=1,
    arrowwidth=2,
    arrowcolor="#636363",
    ax=-50,
    ay=-150,
)

fig.update_layout(
    title_text='Possibilities having 8 tail-end slots',
    xaxis_title='possibilities',
    yaxis_title='slots in next but one epoch',
    plot_bgcolor="#ffffff",
    xaxis={
        "showgrid": True,
        "gridwidth": 1,
        "gridcolor": 'LightGrey',
        "title_font": {"size": 18},
    },
    yaxis={
        "showgrid": True,
        "gridwidth": 1,
        "gridcolor": 'LightGrey',
        "title_font": {"size": 18},
    },
    font={"size": 16},
    width=900,
    height=400,
    margin={"l": 0, "r": 0, "t": 50, "b": 0},
)
fig.write_image("randao_manipulation.png")
fig.show()

## Adapt it for missed slots during manipulation

In [None]:
# Adjusted outcome = raw outcome minus the number of "0" choices in the strategy string.
adapted = {
    key: result.my_share - strategies[key].count("0")
    for key, result in simulation_results.results.items()
}

# Collect the strategies whose adjusted outcome is the best one or one below it.
strategy_ranking = pd.DataFrame(columns=["strategy", "outcome", "adjusted outcome"])
best_adjusted = max(adapted.values(), default=None)
for key, adjusted in adapted.items():
    if adjusted == best_adjusted or adjusted == best_adjusted - 1:
        strategy_ranking.loc[
            len(strategy_ranking), ("strategy", "outcome", "adjusted outcome")
        ] = strategies[key].strip("()"), simulation_results.results[key].my_share, adjusted
strategy_ranking = strategy_ranking.sort_values("adjusted outcome", ascending=False)
print(strategy_ranking.to_markdown(index=None))

## Plot it again

In [None]:
# Adjusted outcome = raw outcome minus the number of "0" choices in the strategy string.
adapted = {
    key: result.my_share - strategies[key].count("0")
    for key, result in simulation_results.results.items()
}

# One bar per strategy showing its adjusted outcome; the last strategy is highlighted.
x_values = list(simulation_results.results.keys())
y_values = [adapted[idx] for idx in range(len(x_values))]
colors = ['#4682b4'] * len(x_values)
colors[-1] = '#3A0B61'
fig = go.Figure(
    data=[
        go.Bar(
            x=x_values,
            y=y_values,
            text=y_values,
            textposition='auto',
            marker_color=colors,
        )
    ]
)

# Dashed horizontal reference line at the raw outcome of the last strategy.
fig.add_shape(
    type="line",
    xref="paper",
    yref="y",
    x0=0,
    x1=1,
    y0=simulation_results.results[len(simulation_results.results) - 1].my_share,
    y1=simulation_results.results[len(simulation_results.results) - 1].my_share,
    line={"color": "Grey", "width": 2, "dash": "dash"},
)

fig.add_annotation(
    x=x_values[-1],
    y=y_values[-1],
    text="What Lido<br>actually did",
    showarrow=True,
    arrowhead=1,
    arrowsize=1,
    arrowwidth=2,
    arrowcolor="#636363",
    ax=-50,
    ay=-100,
)

fig.update_layout(
    title_text='Possibilities having 8 tail-end slots (adjusted for missed slots during manipulation)',
    xaxis_title='possibilities',
    yaxis_title='slots in next but one epoch',
    plot_bgcolor="#ffffff",
    xaxis={
        "showgrid": True,
        "gridwidth": 1,
        "gridcolor": 'LightGrey',
        "title_font": {"size": 18},
    },
    yaxis={
        "showgrid": True,
        "gridwidth": 1,
        "gridcolor": 'LightGrey',
        "title_font": {"size": 16},
    },
    font={"size": 16},
    width=900,
    height=400,
    margin={"l": 0, "r": 0, "t": 50, "b": 0},
)

fig.write_image("randao_manipulation_adjusted.png")
fig.show()

## Getting a certain slot

In [None]:
# For each share, run all 4 two-slot strategies and count, per slot position of the
# target epoch, in how many strategies the proposer is one of the sampled validators.
slot_count_list = []
for share in [0.1, 0.25]:
    state = load_state_from_json(5915831, force=False)
    simulation_results2 = SimulationResults()
    strategies2 = ["( " + " ".join(format(i, '02b')) + " )" for i in range(2**2)]
    epoch = get_current_epoch(state)
    state_validators = state.validators
    indices = get_active_validator_indices(state, epoch+2)
    # The name is reused from the Lido run; here it holds the evenly sampled validator set.
    lido_validators = get_my_validator_ids(indices, share)
    for strategy in strategies2:
        # Every strategy starts from fresh copies of the mixes and the slot.
        randao_mixes = state.randao_mixes.copy()
        slot = state.slot.copy()
        simulate(
            randao_mixes=randao_mixes,
            epoch=epoch,
            slot=slot,
            state_validators=state_validators,
            indices=indices,
            my_validators=lido_validators,
            strategy=strategy,
            simulation_results=simulation_results2,
        )
    slot_count = dict.fromkeys(range(32), 0)
    for outcome in simulation_results2.results.values():
        proposer_pairs = zip(outcome.all_props.values(), outcome.my_props.values())
        for position, (proposer, own_proposer) in enumerate(proposer_pairs):
            # my_props holds None where the proposer is not one of the sampled validators.
            if proposer == own_proposer:
                slot_count[position] += 1
    slot_count_list.append(slot_count)

In [None]:
# Per-slot success counts for the first and second entry of slot_count_list.
x_values = [*slot_count_list[0]]
y_values = [*slot_count_list[0].values()]
x_values2 = [*slot_count_list[1]]
y_values2 = [*slot_count_list[1].values()]

# Grouped bar chart with one trace per share.
fig = go.Figure(
    data=[
        go.Bar(x=x_values, y=y_values, name='10% share', marker_color='#4682b4'),
        go.Bar(x=x_values2, y=y_values2, name='25% share'),
    ]
)

fig.update_layout(
    barmode='group',
    title_text='Getting a specific slot with 2 tail-end slots',
    xaxis_title='slot in epoch n+2',
    yaxis_title='nr of strategies with success',
    plot_bgcolor="#ffffff",
    xaxis={
        "tickvals": [*range(32)],
        "showgrid": True,
        "gridwidth": 1,
        "gridcolor": 'LightGrey',
        "title_font": {"size": 18},
    },
    yaxis={
        "showgrid": True,
        "gridwidth": 1,
        "gridcolor": 'LightGrey',
        "title_font": {"size": 18},
    },
    font={"size": 16},
    width=900,
    height=400,
    margin={"l": 0, "r": 0, "t": 50, "b": 0},
)

# Save and show the figure
fig.write_image("specific_slot.png")
fig.show()