<a href="https://colab.research.google.com/github/microprediction/martingale/blob/main/StatsChallenge.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Collecting git+https://github.com/microprediction/martingale.git
  Cloning https://github.com/microprediction/martingale.git to /tmp/pip-req-build-th6lk813
  Running command git clone --filter=blob:none --quiet https://github.com/microprediction/martingale.git /tmp/pip-req-build-th6lk813
  Resolved https://github.com/microprediction/martingale.git to commit a18efb5d63d27962852d5b6ad429875d23933b0d
[31mERROR: git+https://github.com/microprediction/martingale.git does not appear to be a Python project: neither 'setup.py' nor 'pyproject.toml' found.[0m[31m
[0m

In [15]:
# Instruction text that opens the prompt sent to the LLM; challenge_prompt()
# appends the NowcastExample skeleton to it.
# NOTE(review): the text below advertises `update(x:float)`, whereas the
# NowcastExample skeleton in challenge_prompt() declares `update(self, x, dt)`
# -- confirm which signature is intended.
PREAMBLE = """ # You are being asked to write a class with the following methods:
    #
    #  get_mean(self):        Returns an estimate of a latent variable
    #  update(x:float):       Assimilate information from a noisy observation x
    #
    #  The approach you write should work reasonably well for the following examples:
    #
    #            - Brownian motion observed with noise
    #            - Brownian motion observed with serially correlated noise
    #            - Time-subordinated brownian motion observed with serially correlated noise
    #            - Time-subordinated brownian motion observed with serially correlated noise and occasional large outliers
    #
    #  You should make no assumptions about the scale of the process or the ratio of fluctuations
    #  caused by Brownian motion to those caused by the noise process.
    #
    #  The construction should not receive any arguments. Everything must be learned on the fly. You should try to learn
    #  as much as possible about the process as quickly as possible in order to create a robust nowcast that works well
    #  whether we see 100 observations, 5000 observations, or 100,000 observations.
    #
    #  The class must be called "Nowcast"
"""

### Prompt
(Initial prompt to LLM)

In [16]:
def challenge_prompt():
    """
    Build the full challenge prompt.

    Returns:
        The module-level PREAMBLE string immediately followed by the source
        text of the NowcastExample skeleton class.
    """
    # Skeleton class text handed to the LLM as a template of the interface.
    example_skeleton = """class NowcastExample:
    def __init__(self):
       # Initialize state as you see fit here
       self.prev_x = None

    def update(self, x:float, dt:float):
       # Upon receiving value x dt ms after the previous value, update the state
       self.prev_x = x

    def get_mean(self):
       # Provide a nowcast of the mean of the anchor state
       return self.prev_x
"""
    return PREAMBLE + example_skeleton

# Evaluate the prompt builder; as the final expression of the notebook cell,
# its return value is what the cell displays.
challenge_prompt()


' # You are being asked to write a class with the following methods:\n    #\n    #  get_mean(self):        Returns an estimate of a latent variable\n    #  update(x:float):       Assimilate information from a noisy observation x \n    #\n    #  The approach you write should work reasonably well for the following examples:\n    #\n    #            - Brownian motion observed with noise\n    #            - Brownian motion observed with serially correlated noise\n    #            - Time-subordinated brownian motion observed with serially correlated noise\n    #            - Time-subordinated brownian motion observed with serially correlated noise and occasional large outliers\n    #\n    #  You should make no assumptions about the scale of the process or the ratio of fluctuations\n    #  caused by Brownian motion to those caused by the noise process. \n    #\n    #  The construction should not receive any arguments. Everything must be learned on the fly. You should try to learn\n    #  as 

## Put your actual nowcast example here:
(Which might be the response from the LLM, or something you write)

In [17]:
class Nowcast:
    """
    A robust, adaptive nowcaster for a latent state observed with:
      - Brownian motion
      - Potentially serially correlated noise
      - Occasional large outliers
      - Unknown scale

    Everything is learned on the fly. The time interval dt is used in
    modeling the variance growth of the Brownian increment; it defaults
    to 1.0 so that ``update(x)`` alone is a valid call.

    Example usage:
    -------------
    nc = Nowcast()
    for x in data:
        # dt may be omitted when observations are equally spaced
        nc.update(x, dt=1.0)
        estimate = nc.get_mean()
        # do something with estimate
    """

    def __init__(self):
        # -- Adaptive State Estimates --

        # 1) Latent state estimate (overwritten by the first observation):
        self.x_hat = 0.0
        self.has_first_obs = False

        # 2) AR(1) noise estimate:
        self.e_hat = 0.0
        # AR coefficient (alpha): e_{k+1} = alpha*e_k + w_k
        self.alpha = 0.5

        # -- Covariances / Variances (initial guesses, adapted in update) --
        self.sigma_x_sq = 1.0   # Variance of Brownian increments per unit time
        self.sigma_w_sq = 1.0   # Variance of noise innovations w_k
        self.sigma_out_sq = 1.0 # Additional robust outlier penalty

        # -- For robust weighting of outliers
        self.kappa = 2.5  # Controls how strongly we clamp large residuals

        # Accumulated dt of the updates seen so far; written by update() but
        # not read anywhere else in this class.
        self.t_prev = 0.0

    def update(self, x: float, dt: float = 1.0) -> None:
        """
        Assimilate a new observation x, after dt time has passed since the last observation.

        The key idea is to treat:
          - The latent state as a random walk (Brownian motion).
          - Observations as x = x_true + e, where e is AR(1) noise with occasional outliers.
        We adaptively learn alpha, sigma_x_sq, sigma_w_sq, etc.

        Args:
            x: The noisy observation. NaN and +/-inf are ignored entirely so
               that one bad reading cannot turn every later estimate into NaN.
            dt: Time elapsed since the previous observation (default 1.0).
        """
        # Reject non-finite observations (x != x is true only for NaN).
        if x != x or abs(x) == float("inf"):
            return

        if not self.has_first_obs:
            # First observation: initialize filter
            self.x_hat = x
            self.has_first_obs = True
            self.t_prev = dt
            return

        # 1) Predicted AR(1) noise
        self.e_hat = self.alpha * self.e_hat

        # 2) Compute residual = observed - predicted
        residual = x - (self.x_hat + self.e_hat)

        # 3) Robust weighting for outliers
        #    If the residual is large relative to typical noise scale, reduce its influence.
        #    The weight is 1 up to kappa scale units and decays as kappa/|z| beyond.
        residual_scale = max((self.sigma_x_sq + self.sigma_w_sq)**0.5, 1e-9)
        norm_res = abs(residual) / residual_scale
        robust_weight = min(1.0, self.kappa / max(norm_res, 1e-9))

        # 4) Compute a scalar "Kalman-like" gain K
        #    We'll define measurement variance ~ sigma_out_sq
        #    We'll define process variance ~ sigma_x_sq*dt + sigma_w_sq
        #    Then we clamp by the robust weight.
        P_x = self.sigma_x_sq * dt
        P_e = self.sigma_w_sq
        var_meas = P_x + P_e + self.sigma_out_sq
        K = ((P_x + P_e) / var_meas) * robust_weight

        # 5) State update: the gained residual is split 80/20 between the
        #    latent state and the noise estimate.
        alpha_x = 0.8
        alpha_e = 0.2
        self.x_hat += alpha_x * K * residual
        self.e_hat += alpha_e * K * residual

        # 6) Online updates of parameters
        #    (a) alpha: we do a naive gradient step based on e_{k+1} ~ alpha*e_k
        # NOTE(review): both est_e and self.e_hat are post-update quantities
        # of the current step, so this ratio is not a lag-1 comparison --
        # confirm that this is the intended estimator.
        lr_alpha = 0.001
        est_e = x - self.x_hat  # new implied noise
        if abs(self.e_hat) > 1e-9:
            new_alpha_est = est_e / self.e_hat
            # clamp alpha to [0, 1)
            new_alpha_est = max(0.0, min(0.99, new_alpha_est))
            self.alpha = (1 - lr_alpha)*self.alpha + lr_alpha*new_alpha_est

        #    (b) sigma_x_sq: measure the magnitude of "update" to x_hat over dt
        lr_var = 0.001
        increment = K * residual
        incr_per_dt = increment / max(dt, 1e-9)  # floor avoids division by zero
        est_sigma_x_sq = incr_per_dt**2
        self.sigma_x_sq = (1 - lr_var)*self.sigma_x_sq + lr_var*est_sigma_x_sq

        #    (c) sigma_w_sq: measure the magnitude of "noise" increment in e_hat
        #        (e_hat minus alpha times its value before the step-5 correction)
        noise_innov = self.e_hat - (self.alpha * (self.e_hat - alpha_e*K*residual))
        est_sigma_w_sq = noise_innov**2
        self.sigma_w_sq = (1 - lr_var)*self.sigma_w_sq + lr_var*est_sigma_w_sq

        #    (d) sigma_out_sq: measure residual magnitude
        est_sigma_out_sq = residual**2
        self.sigma_out_sq = (1 - lr_var)*self.sigma_out_sq + lr_var*est_sigma_out_sq

        self.t_prev += dt

    def get_mean(self):
        """Return the current latent-state estimate, or None before any observation."""
        if not self.has_first_obs:
            return None
        return self.x_hat


### User runs the following cell
(The functions would be imported from elsewhere so as not to clutter the notebook)

In [23]:
def instantiation_feedback():
    """
    Check that Nowcast can be constructed with no arguments.

    Returns:
        None when instantiation succeeds; otherwise a prompt string for the
        LLM that embeds the formatted traceback of the failure.
    """
    import traceback

    try:
        Nowcast()
    except Exception:
        # Deliberately broad: any construction failure becomes LLM feedback.
        tb_str = traceback.format_exc()
        return (
            f"An error occurred while trying to instantiate nowcast:\n{tb_str}\n"
            "Please investigate the stack trace and suggest a fix."
        )
    return None


def unit_test_feedback():
    """
    Run the nowcaster on a short synthetic series.

    Creates an instance of Nowcast and feeds it 100 observations of a
    Gaussian random walk with additive Gaussian noise, calling update and
    get_mean after each one.

    Returns:
        None when the run completes without an exception; otherwise a prompt
        string for the LLM that embeds the formatted traceback.
    """
    import traceback

    import numpy as np

    try:
        # Instantiate inside the try so construction failures are reported too.
        nc = Nowcast()
        example_xs = np.cumsum(np.random.randn(100)) + np.random.randn(100)
        for x in example_xs:
            # Unit spacing between the synthetic observations.
            nc.update(x, dt=1.0)
            nc.get_mean()
    except Exception:
        # Deliberately broad: any runtime failure becomes LLM feedback.
        tb_str = traceback.format_exc()
        return (
            f"An error occurred while running unit_test_nowcast:\n{tb_str}\n"
            "Please investigate the stack trace and suggest a fix."
        )
    return None

def statistical_test_feedback():
    """Placeholder for the statistical test stage; always returns None."""
    return


def live_test_feedback():
    """Placeholder for the live test stage; always returns None."""
    return


# Run the feedback stages in order. `or` short-circuits, so `feedback` is the
# first truthy result and later stages are not called once one is found; if
# every stage returns a falsy value, it is the last stage's result.
feedback = instantiation_feedback() or unit_test_feedback() or statistical_test_feedback() or live_test_feedback()

print(feedback)



None
