# CSCI 2470 Final Project

## Load Dependencies

In [None]:
# !pip install gymnasium
# !pip install pysr

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Collecting pysr
  Downloading pysr-1.1.0-py3-none-any.whl.metadata (54 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.6/54.6 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
Collecting juliacall==0.9.23 (from pysr)
  Downloading juliacall-0.9.23-py3-none-any.whl.metadata (4.6 kB)
Collecting juliapkg~=0.1.8 (from juliacall==0.9.23->pysr)
  Downloading juliapkg-0.1.15-py3-none-any.w

## Global Parameters

There is a limit on how many points an environment remembers, and a limit on the places where the agent can query `{0,...,100}`

In [None]:
max_point_count = 10
num_actions = 2*max_point_count # half of action space is for answering

## Generate Terms

Let's use `c` to denote constants (e.g., `1` and `2` and `π` and `e`) and `x` to denote variables/parameters (e.g., `x`).

A term/expression `t` is one of the following

* `c`, constants,
* `x`, variable reference,
* `sin(t)`
* `t + t`
* `t - t`
* `t × t`

*Note: we denote terms/expressions with `t` rather than `e` because people might misread `e` as the Euler's number.*

### Generate Terms with an Expected Size

We can size terms by the following function:

```
size(c)       = 1
size(x)       = 1
size(⊖ t)     = 1 + size t
size(t1 ⊕ t2) = 1 + size t1 + size t2
```

We want to make sure that the expected size of a random term `S = E[size(t)]` is bound

So we need to solve the following

|                 constraints                  |
| :------------------------------------------: |
| S = p₀ × 1 + p₁ × (1 + S) + p₂ × (1 + 2 × S) |
|               p₀ + p₁ + p₂ = 1               |
|           pᵢ ≥ 0, ∀ i ∈ {1, 2, 3}            |

which reduces to

|        constraints         |
| :------------------------: |
|  1/S ​≤ p₀ ​≤ (1 + 1/S) / 2  |
|      p₁ = 1-2×p₀+1/S​​       |
|       p₂ = p₀ - 1/S        |

This system is under-constraint. So we will pick `p₀` uniformly randomly.

### More Considerations

We further adjust our generator in the following ways:

* We generate constants from the standard normal distribution.
* We round generated constants to two digits for the sake of readability.
* We use `sympy.simplify` to normalize the generated term to avoid generating, for example, both `x * x` and `x²`.

In [None]:
import sympy as sp
import numpy as np

the_x = sp.symbols("x")

def make_term(expected_size = 2, max_depth = 5):
    assert expected_size >= 1, f"the expected size must be at least 1, given {expected_size}"
    assert max_depth >= 1, f"the expected size must be at least 1, given {max_depth}"

    import random
    term0_maker = lambda depth: np.random.choice([
        lambda: round(np.random.normal(), ndigits=2),
        lambda: the_x # We want x to appear more often than numbers
    ], p = [0.2, 0.8])()
    term1_maker = lambda depth: np.random.choice([
        lambda: sp.sin(new_term(depth)),
        # lambda: sp.exp(new_term(depth)),
    ])()
    term2_maker = lambda depth: np.random.choice([
        lambda: new_term(depth) + new_term(depth),
        lambda: new_term(depth) - new_term(depth),
        lambda: new_term(depth) * new_term(depth),
    ])()

    S = expected_size
    p0_lower = 1/S
    p0_upper = (1 + 1/S) / 2

    def new_term(depth):
        if depth >= max_depth:
            maker = term0_maker
        else:
            p0 = np.random.uniform(p0_lower, p0_upper)
            p1 = 1 - 2*p0 + 1/S
            p2 = p0 - 1/S
            assert p0 >= 0, f"p0 is {p0}"
            assert p1 >= 0, f"p1 is {p1}"
            assert p2 >= 0, f"p2 is {p2}"
            assert 0.9 <= p0 + p1 + p2 <= 1.1, f"p0, p1, p2 is {(p0, p1, p2)}; They sum up to {p0 + p1 + p2}"

            maker = random.choices(
                [
                    term0_maker,
                    term1_maker,
                    term2_maker
                ],
                weights=[ p0, p1, p2 ],
                k=1
            )[0]
        return maker(depth + 1)
    return new_term(0)

### Test the Generator

In [None]:
make_term(3)

x

Generate 20 terms

In [None]:
[
    make_term(3) for _ in range(20)
]

[sin(sin(x)),
 x*sin(x),
 x,
 x,
 sin(x),
 -0.21,
 -0.362473457456449,
 -sin(x),
 x,
 -2.13,
 x - sin(x - sin(x)),
 0.307960934385481,
 0.87,
 2*x,
 x,
 1.29,
 x + sin(x),
 sin(sin(sin(x))),
 x,
 sin(x)]

### Build a set of terms

In [None]:
the_terms = []

the_terms_strs = set()
while len(the_terms) < 100:
    term = make_term(3)
    term_str = str(term)
    if term_str not in the_terms_strs:
        the_terms_strs.add(term_str)
        the_terms.append(term)
np.random.choice(the_terms, size = 5)

array([sin(x + sin(x)), 0, x + 1.89, -0.0499375858243268, x + 0.88],
      dtype=object)

## The Answer Model (PySR-based)

is provided by PySR.

In [None]:
def symbolic_regression(x, y):

    # suppress some warning messages
    import warnings
    warnings.filterwarnings(
        "ignore",
        message="Note: it looks like you are running in Jupyter.*"
    )

    from pysr import PySRRegressor

    model = PySRRegressor(
        verbosity=0,
        maxsize=20,
        niterations=5,  # < Increase me for better results
        binary_operators=["+", "*"],
        unary_operators=[
            "cos",
            "exp",
            "sin",
        ],
        elementwise_loss="loss(prediction, target) = (prediction - target)^2",
        # ^ Custom loss function (julia syntax)
    )

    model.fit(x[..., np.newaxis], y)
    return model.sympy()

Example usage:

In [None]:
# import numpy as np
# x = 2 * np.random.randn(100)
# y = 2.5382 * np.cos(x) + x ** 2 - 0.5
# e = symbolic_regression(x, y)
# print(e)
# numerical_func = sp.lambdify('x0', e, modules=["numpy"])
# numerical_func(0)

## The Environment

In our case:

* The **observation** is a list of 2D points.
* The **state** includes the observation and the underlying function.
* The **action** is either "answer" or "query" at a certain place.

The following `gym` definition (adapted from [an example from gymnasium's official document](https://gymnasium.farama.org/introduction/create_custom_env/)) makes these two points explicit.

### Compute the reward by L2

#### L2

In [None]:
def L2_distance_uniform(f1, f2, a=-1, b=1, N=1000):
    """
    Approximate the L2 distance between two sympy expressions expr1 and expr2
    over a bounded interval [a, b] using Monte Carlo uniform sampling.

    Parameters
    ----------
    var : sympy.Symbol, optional
        The variable in the expressions. Defaults to sympy.Symbol("x").
    a : float, optional
        Lower bound of the interval for sampling. Defaults to -1.
    b : float, optional
        Upper bound of the interval for sampling. Defaults to 1.
    N : int, optional
        Number of sample points to use for the approximation. Default is 1000.

    Returns
    -------
    float
        Approximate L2 distance between the two functions over [a, b].
    """
    if a >= b:
        raise ValueError("Lower bound 'a' must be less than upper bound 'b'.")

    # Define a uniform sampler over [a, b]
    sampler = lambda size: np.random.uniform(a, b, size)

    # Sample points
    X = sampler(N)  # shape: (N,)

    # Evaluate functions at sampled points
    y1 = f1(X)
    y2 = f2(X)

    # Compute mean square difference
    msd = np.mean((y1 - y2) ** 2)

    # Scale by the interval length for accurate L2 norm approximation
    interval_length = b - a
    L2_distance = np.sqrt(msd) * np.sqrt(interval_length)

    return L2_distance

In [None]:
L2_distance_uniform(lambda x: x, lambda x: x + 1)

1.4142135623730951

In [None]:
L2_distance_uniform(lambda x: x, lambda x: x)

0.0

In [None]:
L2_distance_uniform(lambda x: x, lambda x: np.sin(x))

0.08453134990477468

#### Reward

The distance is in `[0, +inf)`. We want to make sure the reward is in `[0, 1]` so we can give reward `0` when the model times out, and reward `1` when the answer is perfect.

We tried `1 / (1 + distance)` but this reward is too permissive for not exactly right answer. The current reward is `1 / exp(distance)`.

In [None]:
def compute_reward(expr, known_points):

    # the true answer
    f1 = sp.lambdify(the_x, expr)

    # the guessed answer
    e = symbolic_regression(
        np.array([x for x, y in known_points]),
        np.array([y for x, y in known_points])
    )
    f2 = sp.lambdify('x0', e, modules=["numpy"])

    d = L2_distance_uniform(f1, f2)

    # map distance [0, +inf) to [0, 1]
    return 1 / (1 + d), e
    # return 1 / np.exp(d), e

In [None]:
# compute_reward(the_x, [(0, 0), (1, 1), (2, 2)])

In [None]:
# compute_reward(the_x, [(0, 1), (1, 2), (2, 3)])

### Encode States

A state is a list of 2d points; The list length is not greater than max_point_count.
We want to convert it to an array of shape (max_point_count, 2) by filling in zeros.

In [None]:
import tensorflow as tf

def encode_state(state):
    assert len(state) <= max_point_count, f"len(state) is {len(state)}, which exceeds the limit {max_point_count}"

    state = tf.convert_to_tensor(state, dtype=float)

    # add noise to avoid keep querying the same thing
    # state += tf.random.normal(shape=state.shape, stddev=0.1)

    target_shape = (max_point_count, 2)

    # Calculate the padding
    pad_rows = target_shape[0] - tf.shape(state)[0]
    padding = [[0, pad_rows], [0, 0]]  # Pad rows only, no padding for columns

    # Pad the state
    return tf.pad(state, padding)

In [None]:
encode_state([
    (1, 2),
    (3, 4),
])[:10]

<tf.Tensor: shape=(10, 2), dtype=float32, numpy=
array([[1., 2.],
       [3., 4.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.],
       [0., 0.]], dtype=float32)>

### The Environment `gym` Definition

In [None]:
from typing import Optional
import gymnasium as gym

class MyEnv(gym.Env):

    def __init__(self, term = None):
        from collections import defaultdict

        self.observation_space = gym.spaces.Sequence(
              gym.spaces.Box(
              low=float('-inf'), high=float('inf'),
              shape=(2,),
              dtype=np.float32
          )
        )

        self.action_space = gym.spaces.Discrete(num_actions)

        if term is None:
            self.term = make_term(3)
        else:
            self.term = term

        self.time = 0
        self.known_points = None

    def _query(self, new_x):
        new_x += np.random.randn()
        new_y = sp.lambdify(the_x, self.term)(new_x)

        self.time += 1
        self.known_points.append(np.array([new_x, new_y], dtype=float))

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

        if options is not None and 'term' in options:
            self.term = options['term']
        else:
            self.term = np.random.choice(the_terms)

        self.time = 0
        self.known_points = []

        self._query(0)

        observation = tuple(self.known_points)
        info = { "term": self.term }

        return observation, info


    def step(self, action):

        if action >= max_point_count:
            terminated = True
            reward, e = compute_reward(self.term, self.known_points)
            action = f"answered {e}"
        elif self.time + 1 < max_point_count:
            self._query(action)
            terminated = False
            reward = - float(1/max_point_count) * (1 + self.time)
            action = f"queried {action}"
        else:
            self.time += 1
            terminated = True
            reward = - float(1/max_point_count) * (1 + self.time)
            action = f"timeout"

        observation = tuple(self.known_points)
        truncated = False
        info = { "action": action }

        return observation, reward, terminated, truncated, info

gym.register(
    id="gymnasium_env/LittleScientist",
    entry_point=MyEnv,
)

### Test the Environment

In [None]:
env = gym.make('gymnasium_env/LittleScientist')
env.observation_space

Sequence(Box(-inf, inf, (2,), float32), stack=False)

In [None]:
env.action_space

Discrete(20)

In [None]:
env.reset()

  logger.warn(f"{pre} is not within the observation space.")


((array([-0.33186597, -1.04      ]),), {'term': -1.04})

In [None]:
env.action_space.sample()

8

In [None]:
env.observation_space.sample()

(array([-1.7134843, -0.8774723], dtype=float32),
 array([ 0.8758127, -1.503819 ], dtype=float32),
 array([-0.65272534, -0.17953993], dtype=float32),
 array([-0.3111068 , -0.11345543], dtype=float32),
 array([-0.09398746, -1.1925148 ], dtype=float32),
 array([ 1.0039654 , -0.31497705], dtype=float32),
 array([1.4074302, 1.2093539], dtype=float32),
 array([-0.2551157,  1.1390036], dtype=float32),
 array([-3.142295  ,  0.97203666], dtype=float32),
 array([0.81900615, 1.3895038 ], dtype=float32),
 array([-0.41725042, -0.31643704], dtype=float32),
 array([ 0.8656851 , -0.33373335], dtype=float32),
 array([1.3987029, 0.358785 ], dtype=float32),
 array([0.42660344, 0.05870843], dtype=float32),
 array([-1.6319885 , -0.86893797], dtype=float32))

In [None]:
gym.make('gymnasium_env/LittleScientist').reset(options={'term': the_x})

((array([-0.43751824, -0.43751824]),), {'term': x})

In [None]:
gym.make('gymnasium_env/LittleScientist').reset(options={'term': the_x})

((array([1.21716053, 1.21716053]),), {'term': x})

## Model(s)

The model is a dictionary from action to reward.

The action space is 100, where 0 is giving an answer, and all other points mean querying. Making `0` the query action is fine because the env always queries `0` when reset.

In [None]:
import tensorflow.keras as keras
import tensorflow.keras.layers as layers

def make_model():
    # Network defined by the Deepmind paper
    return keras.Sequential(
        [
            layers.Input((max_point_count, 2)),
            layers.Flatten(),
            layers.Dense(512, activation="relu"),
            layers.Dense(256, activation="relu"),
            layers.Dense(num_actions, activation="linear"),
        ]
    )

# The first model makes the predictions for Q-values which are used to
# make a action.
model = make_model()
# Build a target model for the prediction of future rewards.
# The weights of a target model get updated every update_target_network steps thus when the
# loss between the Q-values is calculated the target Q-value is stable.
model_target = make_model()

## Example Episode(s) Before Training

In [None]:
def make_episode(model, term=None):
    state, info = env.reset(options = term and { 'term': term })
    # print("info", info)
    term = info['term']
    print(f"The true answer is {term}")

    done = False
    for _ in range(50):
        action = model.predict(np.array([encode_state(state)]), verbose=0)[0]
        action = np.argmax(action)
        state, reward, done, _, info = env.step(action)
        print(info['action'], reward)
        if done:
            break

make_episode(model, term=the_x)

The true answer is x
queried 5 -0.30000000000000004
answered x0 - 1.3425961e-8 0.9999999810128243


## Training

This section is heavily inspired by

https://keras.io/examples/rl/deep_q_network_breakout/

We changed many places to fit our setting.

### Set Up

In [None]:
import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers

import gymnasium as gym
import numpy as np
import tensorflow as tf

# Configuration parameters for the whole setup
gamma = 0.99  # Discount factor for past rewards
epsilon = 0.2
# epsilon = 1.0  # Epsilon greedy parameter
# epsilon_min = 0.1  # Minimum epsilon greedy parameter
# epsilon_max = 1.0  # Maximum epsilon greedy parameter
# epsilon_interval = (
#     epsilon_max - epsilon_min
# )  # Rate at which to reduce chance of random action being taken
batch_size = 32  # Size of batch taken from replay buffer
max_steps_per_episode = 50
max_episodes = 20  # Limit training episodes, will run until solved if smaller than 1

env = gym.make('gymnasium_env/LittleScientist')

# Experience replay buffers
action_history = []
state_history = []
state_next_history = []
rewards_history = []
done_history = []
episode_reward_history = []
running_reward = 0
episode_count = 0
frame_count = 0
# Maximum replay length
max_memory_length = 1000
# Train the model after 4 actions
update_after_actions = 4
# How often to update the target network
update_target_network = 200
# Using huber loss for stability
loss_function = keras.losses.Huber()

### The Training Loop

In [None]:
optimizer = keras.optimizers.Adam()

while True:
    observation, _ = env.reset()
    state = np.array(observation)
    episode_reward = 0

    for timestep in range(1, max_steps_per_episode):
        frame_count += 1

        # Use epsilon-greedy for exploration
        if epsilon > np.random.rand(1)[0]:
            # Take random action
            action = np.random.choice(num_actions)
        else:
            # Predict action Q-values
            # From environment state
            state_tensor = keras.ops.convert_to_tensor(encode_state(state))
            state_tensor = keras.ops.expand_dims(state_tensor, 0)
            action_probs = model(state_tensor, training=False)
            # Take best action
            action = keras.ops.argmax(action_probs[0]).numpy()

        # Apply the sampled action in our environment
        state_next, reward, done, _, _ = env.step(action)
        state_next = np.array(state_next)

        episode_reward += reward

        # Save actions and states in replay buffer
        action_history.append(action)
        state_history.append(state)
        state_next_history.append(state_next)
        done_history.append(done)
        rewards_history.append(reward)
        state = state_next

        # Update every fourth frame and once batch size is over 32
        if frame_count % update_after_actions == 0 and len(done_history) > batch_size:
            # Get indices of samples for replay buffers
            indices = np.random.choice(range(len(done_history)), size=batch_size)

            # Using list comprehension to sample from replay buffer
            state_sample = np.array([encode_state(state_history[i]) for i in indices])
            state_next_sample = np.array([encode_state(state_next_history[i]) for i in indices])
            rewards_sample = [rewards_history[i] for i in indices]
            action_sample = [action_history[i] for i in indices]
            done_sample = keras.ops.convert_to_tensor(
                [float(done_history[i]) for i in indices]
            )

            # Build the updated Q-values for the sampled future states
            # Use the target model for stability
            future_rewards = model_target.predict(state_next_sample, verbose=0)
            # Q value = reward + discount factor * expected future reward
            updated_q_values = rewards_sample + gamma * keras.ops.amax(
                future_rewards, axis=1
            )

            # If final frame set the last value to -1
            updated_q_values = updated_q_values * (1 - done_sample) - done_sample

            # Create a mask so we only calculate loss on the updated Q-values
            masks = keras.ops.one_hot(action_sample, num_actions)

            with tf.GradientTape() as tape:
                # Train the model on the states and updated Q-values
                q_values = model(state_sample)

                # Apply the masks to the Q-values to get the Q-value for action taken
                q_action = keras.ops.sum(keras.ops.multiply(q_values, masks), axis=1)
                # Calculate loss between new Q-value and old Q-value
                loss = loss_function(updated_q_values, q_action)

            # Backpropagation
            grads = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if frame_count % update_target_network == 0:
            # update the the target network with new weights
            model_target.set_weights(model.get_weights())
            # Log details
            template = "running reward: {:.2f} at episode {}, frame count {}"
            print(template.format(running_reward, episode_count, frame_count))

        # Limit the state and reward history
        if len(rewards_history) > max_memory_length:
            del rewards_history[:1]
            del state_history[:1]
            del state_next_history[:1]
            del action_history[:1]
            del done_history[:1]

        if done:
            break

    # Update running reward to check condition for solving
    episode_reward_history.append(episode_reward)
    if len(episode_reward_history) > 100:
        del episode_reward_history[:1]
    running_reward = np.mean(episode_reward_history)

    episode_count += 1

    if running_reward > 0 and episode_count >= 10:  # Condition to consider the task solved
        print("Solved at episode {}!".format(episode_count))
        break

    if (
        max_episodes > 0 and episode_count >= max_episodes
    ):  # Maximum number of episodes reached
        print("Stopped at episode {}!".format(episode_count))
        break

  logger.warn(f"{pre} is not within the observation space.")
  logger.warn(f"{pre} is not within the observation space.")


Solved at episode 10!


## Example Episode(s) After Training

In [None]:
make_episode(model, term=the_x)

The true answer is x
queried 5 -0.30000000000000004
answered x0 - 1.1221864e-8 0.9999999841298879


In [None]:
make_episode(model, term=the_x*the_x)

The true answer is x**2
queried 7 -0.30000000000000004
answered x0*x0 1.0


In [None]:
make_episode(model, term=the_x*sp.sin(the_x))

The true answer is x*sin(x)
queried 7 -0.30000000000000004
answered x0*sin(x0) 1.0


###Testing

In [None]:
test_functions = [
    ("Linear Function", the_x + 1),
    ("Quadratic Function", the_x**2),
    ("Sine Function", sp.sin(the_x)),
    ("Polynomial Function", 2*the_x**2+the_x-1),
    ("combinition Function", sp.sin(the_x**2)),
]


In [None]:
print(test_functions[0][0])
make_episode(model, term=test_functions[0][1])

Linear Function
The true answer is x + 1
queried 5 -0.30000000000000004
answered x0 + 1.0 1.0


In [None]:
print(test_functions[1][0])
make_episode(model, term=test_functions[1][1])

Quadratic Function
The true answer is x**2
queried 7 -0.30000000000000004
answered x0*x0 1.0


In [None]:
print(test_functions[2][0])
make_episode(model, term=test_functions[2][1])

Sine Function
The true answer is sin(x)
queried 5 -0.30000000000000004
answered x0*1.854735e-8 + sin(x0) 0.9999999850749357


In [None]:
print(test_functions[3][0])
make_episode(model, term=test_functions[3][1])

Polynomial Function
The true answer is 2*x**2 + x - 1
queried 7 -0.30000000000000004
answered x0*(x0 + x0) + x0 - 1.0 1.0


In [None]:
print(test_functions[4][0])
make_episode(model, term=test_functions[4][1])

combinition Function
The true answer is sin(x**2)
queried 5 -0.30000000000000004
answered sin(x0*x0) 1.0


## References

* https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html
* https://keras.io/examples/rl/deep_q_network_breakout/

## END