# Run the wealth distribution simulation with multiple trackers

## Game Rules

### Start

- Each player begins with a starting wealth of 1000.
- Players can enter and exit the game at any time.
- Players have a single active model that they can update at any time. (In this notebook, you create multiple local players.)

### Prediction Phase

- For each prediction round, players automatically invest a fraction of their active wealth into the pot.
- This amount is subtracted from their active wealth.
- The total pot is inflated slightly by a game-defined inflation rate.
- The model must generate predictions in under 50 milliseconds.

### Scoring & Distribution

- Once the true dove location is revealed, each prediction is scored using a likelihood function.
- The pot is then distributed proportionally based on these likelihood scores.
- More accurate predictions earn a larger share of the pot.
- Player wealth will never go below 0.
- Players can skip predictions. Doing so means they cannot lose or gain wealth, as they are not participating in prize distribution. 

### Payouts

- When a player’s wealth exceeds a defined wealth threshold of 2000, they receive a prize payout equal to 10% of their wealth.
- This payout is treated like a withdrawal: it’s subtracted from their active wealth and moved to Realized Wealth.

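See the full game rules in the [Falcon](https://hub.crunchdao.com/competitions/falcon) challenge. Note that the local simulation below models only the investment, inflation and redistribution steps; payouts are not simulated.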


In [None]:
# Standard library imports
import logging
import time
from tqdm.auto import tqdm

# Birdgame package imports
from birdgame.trackers.trackerbase import TrackerBase
from birdgame import HORIZON
from birdgame import GAME_PARAMS

from birdgame.datasources.livedata import live_data_generator
from birdgame.datasources.remotetestdata import remote_test_data_generator
from birdgame.trackers.tracker_evaluator import TrackerEvaluator

In [None]:
# Birdgame Trackers
from birdgame.examples.derived.ewmatracker import EMWAVarTracker
# from birdgame.examples.derived.autoetstracker import AutoETSsktimeTracker
# from birdgame.examples.derived.ngboosttracker import NGBoostTracker
from birdgame.examples.derived.quantileregtracker import QuantileRegressionRiverTracker

In [3]:
import numpy as np
import threading
import time
import warnings
from sktime.forecasting.ets import AutoETS
from statsmodels.tools.sm_exceptions import ConvergenceWarning

# Fitting forecasters on short series can emit these statsmodels warnings repeatedly;
# silence them so they don't flood the notebook output. (The AR/MA start-parameter
# messages presumably matter mostly for the commented-out AutoARIMA alternative.)
warnings.filterwarnings("ignore", category=ConvergenceWarning)
for _noisy_message in (
    "Non-stationary starting autoregressive parameters",
    "Non-invertible starting MA parameters found",
):
    warnings.filterwarnings("ignore", message=_noisy_message)
del _noisy_message

# Parameters
class Constants:
    """Default hyper-parameters read by ``CustomTracker`` when it is constructed."""

    MIN_SAMPLES = 5             # observation count at which the first fit happens
    TRAIN_MODEL_FREQUENCY = 50  # afterwards, refit whenever the observation count is a multiple of this
    NUM_DATA_POINTS_MAX = 20    # length of the training window (most recent observations)
    WARMUP_CUTOFF = 200         # ticks during which predict() returns None
    # Set this to True for live data streams where each `tick()` and `predict()` call must complete within ~50 ms
    USE_THREADING = True

class CustomTracker(TrackerBase):
    """
    A model that tracks the dove location using AutoETS.

    The predictive distribution is a Gaussian centred on the latest observed dove
    location. Its scale is the square root of the one-step-ahead variance forecast of an
    AutoETS model that is periodically refitted on a short window of horizon-spaced
    observations.

    Parameters
    ----------
    horizon : int
        The prediction horizon in seconds (how far into the future predictions should be made).
    min_samples : int, optional
        Observation count at which the first fit is triggered.
        Defaults to ``Constants.MIN_SAMPLES``.
    train_model_frequency : int, optional
        The frequency at which the sktime model will be retrained based on the count of
        horizon-spaced observations ingested. Defaults to ``Constants.TRAIN_MODEL_FREQUENCY``.
    num_data_points_max : int, optional
        The maximum number of data points to use for training the sktime model.
        Defaults to ``Constants.NUM_DATA_POINTS_MAX``.
    warmup_cutoff : int, optional
        The number of ticks taken to warm up the model; ``predict()`` returns None during
        this period (wealth does not change). Defaults to ``Constants.WARMUP_CUTOFF``.
    use_threading : bool, optional
        Whether to retrain the model asynchronously in a background thread.
        Important: set this to True for live data streams where each `tick()`
        and `predict()` call must complete within ~50 ms.
        When enabled, retraining happens in parallel without blocking predictions;
        call ``close()`` to stop the background thread.
        Defaults to ``Constants.USE_THREADING``.
    """

    def __init__(self, horizon=HORIZON, *, min_samples=None, train_model_frequency=None,
                 num_data_points_max=None, warmup_cutoff=None, use_threading=None):
        super().__init__(horizon)
        self.current_x = None
        self.last_observed_data = []  # Holds the last few horizon-spaced observations
        self.prev_t = 0

        # Options left as None fall back to the Constants class. It is read here, at
        # construction time, so edits to Constants apply to trackers created afterwards.
        self.min_samples = Constants.MIN_SAMPLES if min_samples is None else min_samples
        self.train_model_frequency = (
            Constants.TRAIN_MODEL_FREQUENCY if train_model_frequency is None else train_model_frequency
        )
        self.num_data_points_max = (
            Constants.NUM_DATA_POINTS_MAX if num_data_points_max is None else num_data_points_max
        )
        self.warmup_cutoff = Constants.WARMUP_CUTOFF if warmup_cutoff is None else warmup_cutoff
        self.use_threading = Constants.USE_THREADING if use_threading is None else use_threading
        self.tick_count = 0

        # Number of steps to predict: only one, because the univariate series only holds
        # values separated by at least `horizon`.
        steps = 1
        self.fh = np.arange(1, steps + 1)

        # AutoETS forecaster template (no seasonality); each retrain fits a clone of it.
        self.forecaster = AutoETS(auto=True, sp=1, information_criterion="aic")
        # or the AutoARIMA forecaster
        # self.forecaster = AutoARIMA(max_p=2, max_d=1, max_q=2, maxiter=10)

        # Placeholder scale used by predict() until the first fit completes.
        self.scale = 1e-6

        # Threading tools: the lock guards forecaster/scale swaps; the condition (sharing
        # the same lock) hands new training windows to the background worker.
        self._lock = threading.Lock()
        if self.use_threading:
            self._cond = threading.Condition(self._lock)
            self._new_data = None
            self._stop_worker = False
            self._worker_thread = threading.Thread(target=self._worker_retrain_model_async, daemon=True)
            self._worker_thread.start()

    # ------------------- Tick -------------------
    def tick(self, payload, performance_metrics=None):
        """
        Ingest a new record (payload), store it internally and update the model.

        Function signature can also look like tick(self, payload) since performance_metrics
        is an optional parameter.

        Parameters
        ----------
        payload : dict
            Must contain 'time' (int/float) and 'dove_location' (float).
        performance_metrics : dict (is optional)
            Dict containing 'wealth', 'likelihood_ewa', 'recent_likelihood_ewa'.
        """
        # # To see the performance metrics on each tick
        # print(f"performance_metrics: {performance_metrics}")

        # # Can also trigger a warmup by checking if a performance metric drops below a threshold
        # if performance_metrics['recent_likelihood_ewa'] < 1.1:
        #     self.tick_count = 0

        x = payload["dove_location"]
        t = payload["time"]
        self.add_to_quarantine(t, x)
        self.current_x = x

        # Collect and process observations only at horizon-based intervals
        if t > self.prev_t + self.horizon:
            self.last_observed_data.append(x)
            self.prev_t = t

            # First fit when the counter reaches `min_samples`, then every
            # `train_model_frequency` observations.
            # NOTE(review): `self.count` is not initialised in this class; presumably
            # TrackerBase sets it — confirm.
            if self.count == self.min_samples or (
                self.count > self.min_samples and self.count % self.train_model_frequency == 0
            ):
                # Construct 'y' as a univariate series of the most recent observations
                y = np.array(self.last_observed_data)[-self.num_data_points_max:]

                if self.use_threading:
                    # Hand the window to the background worker and return immediately
                    with self._cond:
                        self._new_data = y
                        self._cond.notify()
                else:
                    self._retrain_model_sync(y)

                # Trim history to bound memory usage on continuous live data
                self.last_observed_data = self.last_observed_data[-(self.num_data_points_max + 2):]

            self.count += 1

        self.tick_count += 1

    # ------------------- Prediction -------------------
    def predict(self):
        """
        Return a dictionary representing the best guess of the distribution,
        modeled as a Gaussian distribution.

        If the model is in the warmup period, return None.
        """
        with self._lock:
            # Check if the model is warming up
            if self.tick_count < self.warmup_cutoff or self.forecaster is None:
                return None

            # The mean of the Gaussian is the current value; a point forecast is also
            # available from 'self.forecaster.predict(fh=self.fh[-1])[0][0]'.
            loc = self.current_x
            # `_fit` only ever stores finite scales; the floor guards against zero variance.
            scale = max(self.scale, 1e-6)

        # time.sleep(0.01)  # mimic short inference delay

        # Return the prediction density
        components = {
            "density": {
                "type": "builtin",
                "name": "norm",
                "params": {"loc": loc, "scale": scale},
            },
            "weight": 1,
        }

        return {"type": "mixture", "components": [components]}

    # ------------------- Model training -------------------
    def _fit(self, y):
        """
        Fit a fresh clone of the forecaster on `y` and return ``(forecaster, scale)``.

        Raises ValueError when the variance forecast is not a finite, non-negative number,
        so callers can keep the previous model instead of emitting a NaN scale.
        """
        # Fit a clone so the currently served model is never mutated mid-fit
        # (required for asynchronous training).
        new_forecaster = self.forecaster.clone()
        new_forecaster.fit(y, fh=self.fh)
        # Variance prediction for the last forecast step
        var = new_forecaster.predict_var(fh=self.fh)
        last_var = float(var.values.flatten()[-1])
        if not np.isfinite(last_var) or last_var < 0:
            raise ValueError(f"invalid variance forecast: {last_var}")

        return new_forecaster, float(np.sqrt(last_var))

    def _retrain_model_sync(self, y):
        """Synchronous retraining; on failure the previous model and scale are kept."""
        try:
            self.forecaster, self.scale = self._fit(y)
        except Exception as exc:
            logging.warning("Sync retraining failed, keeping previous model: %s", exc)

    def _worker_retrain_model_async(self):
        """Asynchronous retraining in a background worker (runs until ``close()``)."""
        while True:
            with self._cond:
                # Wait until new data is available or a stop is requested
                while self._new_data is None and not self._stop_worker:
                    self._cond.wait()
                if self._stop_worker:
                    return
                y = self._new_data  # get the data to train on
                self._new_data = None  # clear it (so next signal is new data)

            # Train the model outside the lock (so predict() can still run).
            # A failed fit must not kill the worker: keep serving the previous model.
            try:
                new_forecaster, scale = self._fit(y)
            except Exception as exc:
                logging.warning("Async retraining failed, keeping previous model: %s", exc)
                continue

            # Swap the trained model safely
            with self._lock:
                self.forecaster = new_forecaster
                self.scale = scale

    def close(self, timeout=None):
        """
        Stop the background retraining thread and wait up to `timeout` seconds for it.

        No-op when threading is disabled.
        """
        if not self.use_threading:
            return
        with self._cond:
            self._stop_worker = True
            self._cond.notify_all()
        self._worker_thread.join(timeout)

In [7]:
# -------------------------------------------------------------------
# CONFIGURATION
# -------------------------------------------------------------------
# Data source: True streams live data, False replays remote test data.
# Do not use threading if you set LIVE_MODE = False
LIVE_MODE = True
# Passed as `max_rows` to the remote test data generator (ignored in live mode)
MAX_ROWS = None

# Wealth is only updated once the step index exceeds this delay
WARMUP_STEPS_ALL_MODELS = 500
# Log a wealth snapshot every N steps
LOG_EVERY_N_STEPS = 100

logging.basicConfig(format="[%(asctime)s] %(message)s", level=logging.INFO)

# -------------------------------------------------------------------

In [None]:
def compute_likelihood(evaluator, payload):
    """
    Feed `payload` to `evaluator` and return its most recent score.

    Any exception raised while ticking or reading the scores is logged as a warning and
    swallowed, so a single misbehaving tracker cannot stop the simulation.

    Returns
    -------
    The last entry of ``evaluator.scores``, or None when there are no scores yet or an
    error occurred.
    """
    try:
        evaluator.tick_and_predict(payload, {})
        latest = evaluator.scores[-1] if evaluator.scores else None
    except Exception as exc:
        logging.warning(f"Error during likelihood computation: {exc}")
        latest = None
    # NOTE(review): if tick_and_predict does not append a score on a given call, this
    # returns the previous step's score — confirm against TrackerEvaluator.
    return latest


def update_wealth(players, likelihoods, params):
    """
    Run one investment / inflation / redistribution round on `players`, in place.

    Only players with a usable likelihood take part in the round. A ``None`` (skipped
    prediction), a non-finite score, or a player absent from `likelihoods` means that
    player neither invests nor receives a share, so their wealth is left untouched, as
    the game rules require for skipped predictions. Filtering non-finite scores also
    stops a single NaN from turning every player's wealth into NaN.

    Parameters
    ----------
    players : dict
        Maps player name to a dict holding a mutable ``"wealth"`` entry.
    likelihoods : dict
        Maps player name to its likelihood score for this round (or None).
    params : dict
        Must contain ``"investment_fraction"`` and ``"inflation_bps"`` (basis points).
    """
    import math

    valid_likelihoods = {
        name: value
        for name, value in likelihoods.items()
        if value is not None and math.isfinite(value)
    }
    total_likelihood = sum(valid_likelihoods.values())

    if not valid_likelihoods or total_likelihood == 0:
        return

    # Investment phase: only participating players pay into the pot
    pot = 0.0
    for name in valid_likelihoods:
        player = players[name]
        investment = params["investment_fraction"] * player["wealth"]
        player["wealth"] = max(0.0, player["wealth"] - investment)
        pot += investment

    # Inflation adjustment (basis points -> multiplicative factor)
    pot *= 1 + params["inflation_bps"] / 10000.0

    # Redistribution phase: proportional to likelihood
    for name, likelihood in valid_likelihoods.items():
        share = likelihood / total_likelihood
        players[name]["wealth"] += pot * share


def run_simulation_wealth(trackers, live=LIVE_MODE, max_rows=MAX_ROWS, game_params=GAME_PARAMS,
                          warmup_steps_all_models=WARMUP_STEPS_ALL_MODELS, log_every_n_steps=LOG_EVERY_N_STEPS):
    """
    Run the live or remote wealth distribution simulation with multiple trackers.

    Parameters
    ----------
    trackers : list
        Tracker instances competing for the shared pot. Each one is wrapped in a
        ``TrackerEvaluator`` and named after its class. Repeated class names get a
        ``#2``, ``#3``, ... suffix, so no tracker is silently dropped.
    live : bool
        Stream live data when True, otherwise replay remote test data.
    max_rows : int or None
        Passed to the remote test data generator (unused in live mode).
    game_params : dict
        Must contain ``"initial_wealth"``, ``"investment_fraction"`` and ``"inflation_bps"``.
    warmup_steps_all_models : int
        Wealth is only updated once the step index exceeds this value.
    log_every_n_steps : int
        Log a wealth snapshot every N steps (on steps where wealth was updated).

    Returns
    -------
    dict
        The final ``name -> {"tracker_evaluator", "wealth"}`` mapping. It is also
        returned when the run is interrupted with Ctrl+C.
    """
    gen = live_data_generator() if live else remote_test_data_generator(max_rows=max_rows)

    # Key each evaluator by tracker class name; suffix repeats so two trackers of the
    # same class no longer overwrite each other.
    evaluators = {}
    for tracker in trackers:
        base_name = tracker.__class__.__name__
        name = base_name
        suffix = 2
        while name in evaluators:
            name = f"{base_name}#{suffix}"
            suffix += 1
        evaluators[name] = TrackerEvaluator(tracker)

    # Player setup
    players = {
        name: {
            "tracker_evaluator": evaluator,
            "wealth": game_params["initial_wealth"],
        }
        for name, evaluator in evaluators.items()
    }

    try:
        with tqdm(gen) as progress:
            for i, payload in enumerate(progress):
                likelihoods = {
                    name: compute_likelihood(p["tracker_evaluator"], payload)
                    for name, p in players.items()
                }

                # Only start wealth updates after a warmup period, and only on steps
                # where every tracker produced a score
                if i > warmup_steps_all_models and not any(v is None for v in likelihoods.values()):
                    update_wealth(players, likelihoods, game_params)

                    if i % log_every_n_steps == 0:
                        snapshot = {name: round(p["wealth"], 2) for name, p in players.items()}
                        logging.info(f"Step {i:05d} | Wealth: {snapshot}")

    except KeyboardInterrupt:
        print("Interrupted")
    finally:
        # Release the data source whether the run finishes, fails or is interrupted
        # (presumably it holds a connection in live mode).
        close_source = getattr(gen, "close", None)
        if callable(close_source):
            close_source()

    return players

In [11]:
# Compete the two example trackers against the AutoETS quickstarter
trackers = [
    EMWAVarTracker(),
    QuantileRegressionRiverTracker(),
    CustomTracker(),  # here: AutoETS quickstarter
]

run_simulation_wealth(trackers)

0it [00:00, ?it/s]

[2025-10-23 17:06:53,590] Step 00600 | Wealth: {'EMWAVarTracker': 1122.7, 'QuantileRegressionRiverTracker': 947.87, 'CustomTracker': 929.46}
[2025-10-23 17:06:56,742] Step 00700 | Wealth: {'EMWAVarTracker': 1145.28, 'QuantileRegressionRiverTracker': 866.21, 'CustomTracker': 988.57}
[2025-10-23 17:07:00,967] Step 00800 | Wealth: {'EMWAVarTracker': 1103.52, 'QuantileRegressionRiverTracker': 855.8, 'CustomTracker': 1040.77}
[2025-10-23 17:07:05,007] Step 00900 | Wealth: {'EMWAVarTracker': 1094.27, 'QuantileRegressionRiverTracker': 779.92, 'CustomTracker': 1125.93}
[2025-10-23 17:07:11,663] Step 01000 | Wealth: {'EMWAVarTracker': 1151.69, 'QuantileRegressionRiverTracker': 710.08, 'CustomTracker': 1138.38}
[2025-10-23 17:07:16,068] Step 01100 | Wealth: {'EMWAVarTracker': 1159.68, 'QuantileRegressionRiverTracker': 642.48, 'CustomTracker': 1198.03}
[2025-10-23 17:07:21,989] Step 01200 | Wealth: {'EMWAVarTracker': 1164.11, 'QuantileRegressionRiverTracker': 662.97, 'CustomTracker': 1173.13}
[20

Interrupted
