In [3]:
import torch
import gymnasium as gym
from gymnasium import spaces
from collections import deque
import random
import sympy
from sympy import symbols, simplify
import numpy as np
import torch.nn as nn
import torch.optim as optim
import pandas as pd

# Define symbols for scoring
a, b, c, d = symbols('a b c d')

class SymbolicMathEnv(gym.Env):
    def __init__(self):
        super(SymbolicMathEnv, self).__init__()
        self.action_space = spaces.Discrete(6 * 4 + 2 * 3)  # 30 actions
        self.observation_space = spaces.Box(low=-1, high=1, shape=(6 * 4 + 2 * 3,), dtype=int)
        self.reset()
        self.varvector = np.array([a, b, c, d])
        print("Environment initialized.")

    def reset(self):
        self.vectors = np.zeros((6, 4), dtype=int)
        self.answer_vectors = np.zeros((2, 3), dtype=int)
        return self._get_observation()

    def _get_observation(self):
        return np.concatenate((self.vectors.flatten(), self.answer_vectors.flatten()))

    def step(self, action):
        if action < 24:  # Modifying first 6 vectors of 4 elements
            vector_index = action // 4
            element_index = action % 4
            self.vectors[vector_index, element_index] = self._toggle_value(self.vectors[vector_index, element_index])
        else:  # Modifying last 2 vectors of 3 elements
            action -= 24
            vector_index = action // 3 + 6
            element_index = action % 3
            self.answer_vectors[vector_index - 6, element_index] = self._toggle_value(self.answer_vectors[vector_index - 6, element_index])

        total_score, real_g, imaginary_g = self._calculate_reward()
        done = self._is_done()
        reward = self._calculate_custom_reward(total_score, real_g, imaginary_g)  # Pass real_g and imaginary_g
        return self._get_observation(), reward, done, {'real_g': real_g, 'imaginary_g': imaginary_g}

    def _toggle_value(self, value):
        if value == 1:
            return -1
        elif value == -1:
            return 0
        else:
            return 1

    def _calculate_reward(self):
        vector0, vector1, vector2, vector3, vector4, vector5 = self.vectors
        answer0, answer1 = self.answer_vectors

        element_0 = np.dot(vector0, self.varvector)
        element_1 = np.dot(vector1, self.varvector)
        element_2 = np.dot(vector2, self.varvector)
        element_3 = np.dot(vector3, self.varvector)
        element_4 = np.dot(vector4, self.varvector)
        element_5 = np.dot(vector5, self.varvector)

        product_vec = np.array([element_0 * element_1, element_2 * element_3, element_4 * element_5])

        real = (a * c - b * d)
        imaginary = (a * d + b * c)

        real_gr = np.dot(answer0, product_vec)
        imaginary_gr = np.dot(answer1, product_vec)

        real_g = simplify(real_gr)
        imaginary_g = simplify(imaginary_gr)

        # Calculate the score based on the conditions specified
        if real_g == real and imaginary_g == imaginary:
            total_score = 0
        else:
            # Count the number of terms in (imaginary_g - imaginary) + (real_g - real)
            N = len(imaginary_g.as_ordered_terms()) + len(real_g.as_ordered_terms())
            total_score = 1 / N if N > 0 else 1  # Avoid division by zero

        return total_score, real_g, imaginary_g  # Return additional values

    def _calculate_custom_reward(self, total_score, real_g, imaginary_g):
        # Basic reward structure that rewards better as total_score approaches 0
        reward = 1.0 / (1 + abs(total_score))  # Higher reward as total_score approaches 0

        # Penalize the model if real_g or imaginary_g is 0
        if real_g == 0 or imaginary_g == 0:
            reward = -1.0  # Set a negative reward if either is zero

        return max(reward, -1.0)  # Ensure reward does not go above 1 or below -1

    def _is_done(self):
        return False  # You could implement a termination condition if needed

class DQN(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class ReplayBuffer:
    def __init__(self, max_size):
        self.buffer = deque(maxlen=max_size)

    def add(self, experience):
        self.buffer.append(experience)

    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)

    def size(self):
        return len(self.buffer)

def train_dqn(model, target_model, replay_buffer, batch_size, gamma):
    if replay_buffer.size() < batch_size:
        return None

    states, actions, rewards, next_states, dones = zip(*replay_buffer.sample(batch_size))
    
    states = torch.FloatTensor(np.array(states))  # Convert to numpy array first
    actions = torch.LongTensor(actions)
    rewards = torch.FloatTensor(rewards)
    next_states = torch.FloatTensor(np.array(next_states))  # Convert to numpy array first
    dones = torch.FloatTensor(dones)

    q_values = model(states).gather(1, actions.unsqueeze(1)).squeeze()
    next_q_values = target_model(next_states).max(1)[0]
    target_q_values = rewards + (gamma * next_q_values * (1 - dones))

    loss = nn.functional.mse_loss(q_values, target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss.item()  # Return the loss for tracking

# Create the environment and model
env = SymbolicMathEnv()
model = DQN(input_dim=30, output_dim=30)  # Adjust input_dim to 30
target_model = DQN(input_dim=30, output_dim=30)  # Create the target model
target_model.load_state_dict(model.state_dict())
target_model.eval()  # Set target model to evaluation mode
optimizer = optim.Adam(model.parameters(), lr=0.001)
replay_buffer = ReplayBuffer(max_size=10000)

# Initialize a list to collect results
results = []  # Ensure results is initialized here

num_episodes = 250
gamma = 0.99
epsilon = 1.0
epsilon_decay = 0.995
min_epsilon = 0.01
batch_size = 64
update_target_every = 10  # Update target model every 10 episodes

for episode in range(num_episodes):
    print(f"Starting episode {episode + 1}")
    state = env.reset()
    done = False
    total_reward = 0
    steps = 0  # Count steps for the episode

    while not done and steps < 50:  # Limit episode to 50 steps
        if np.random.rand() < epsilon:
            action = env.action_space.sample()  # Explore
        else:
            with torch.no_grad():
                action = model(torch.FloatTensor(state)).argmax().item()  # Exploit

        next_state, reward, done, info = env.step(action)
        replay_buffer.add((state, action, reward, next_state, done))
        loss = train_dqn(model, target_model, replay_buffer, batch_size, gamma)

        # Log the results for the current step
        results.append({
            'Episode': episode,
            'Real_g': info['real_g'],
            'Imaginary_g': info['imaginary_g'],
            'Total_Score': reward  # Use the reward as total score
        })

        total_reward += reward
        state = next_state
        steps += 1

        if loss is not None:
            print(f"Episode {episode + 1}, Step {steps}, Loss: {loss:.4f}")

    # Print results after each episode
    print(f"Episode {episode + 1}: Real_g = {info['real_g']}, Imaginary_g = {info['imaginary_g']}, Total Reward = {total_reward}")

    # Update the target model every few episodes
    if episode % update_target_every == 0:
        target_model.load_state_dict(model.state_dict())
        print(f"Target model updated at episode {episode + 1}")

    # Update epsilon
    epsilon = max(min_epsilon, epsilon * epsilon_decay)  # Ensure epsilon decays properly

# After training, convert results to a DataFrame
results_df = pd.DataFrame(results)

# Sort the DataFrame by the absolute values


Environment initialized.
Starting episode 1
Episode 1: Real_g = -(a - b + c)*(a + b - d) + (a + b - c)*(a + b + c - d) - (a + b + c)*(a + c - d), Imaginary_g = -(a - b + c)*(a + b - d), Total Reward = -1.4249999999999934
Target model updated at episode 1
Starting episode 2
Episode 2, Step 14, Loss: 0.8044
Episode 2, Step 15, Loss: 0.7618
Episode 2, Step 16, Loss: 0.7364
Episode 2, Step 17, Loss: 0.7021
Episode 2, Step 18, Loss: 0.6707
Episode 2, Step 19, Loss: 0.6458
Episode 2, Step 20, Loss: 0.6172
Episode 2, Step 21, Loss: 0.5808
Episode 2, Step 22, Loss: 0.5579
Episode 2, Step 23, Loss: 0.5390
Episode 2, Step 24, Loss: 0.4942
Episode 2, Step 25, Loss: 0.4744
Episode 2, Step 26, Loss: 0.4279
Episode 2, Step 27, Loss: 0.3903
Episode 2, Step 28, Loss: 0.3648
Episode 2, Step 29, Loss: 0.3340
Episode 2, Step 30, Loss: 0.3257
Episode 2, Step 31, Loss: 0.3307
Episode 2, Step 32, Loss: 0.3376
Episode 2, Step 33, Loss: 0.2776
Episode 2, Step 34, Loss: 0.2558
Episode 2, Step 35, Loss: 0.2899


In [7]:
# After training, convert results to a DataFrame
results_df = pd.DataFrame(results)

# Filter for non-negative Total_Score
non_negative_scores_df = results_df[results_df['Total_Score'] >= 0]

# Sort the DataFrame by Total_Score
lowest_non_negative_scores_df = non_negative_scores_df.nsmallest(10, 'Total_Score')

# Print the table including Total Reward
print("Top 10 Lowest Non-Negative Total Scores:")
print(lowest_non_negative_scores_df[['Episode', 'Total_Score', 'Real_g', 'Imaginary_g']])


Top 10 Lowest Non-Negative Total Scores:
     Episode  Total_Score                   Real_g              Imaginary_g
62         1     0.666667          (a + b)*(a + c)                d*(b - c)
63         1     0.666667          (a + b)*(a + c)               d*(-b + c)
134        2     0.666667  (b - c)*(a + b + c + d)  (b - c)*(a + b + c + d)
135        2     0.666667  (b - c)*(a + b + c - d)  (b - c)*(a + b + c - d)
171        3     0.666667               b*(-c + d)                b*(c - d)
174        3     0.666667         -(b + d)*(c - d)         -(b + d)*(c - d)
240        4     0.666667          (a + c)*(b + d)     (a - b)*(-a + b + d)
257        5     0.666667                      a*b                      a*b
258        5     0.666667                      a*b                      a*b
259        5     0.666667                      a*b                      a*b
