In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize

# Define the function to optimize (correlation mask example)
target = np.array([1, 0, -1, 0, 1])  # Example target pattern


def objective_function(mask, noise_scale=1/50, rng=None):
    """Return the negated peak correlation between ``mask`` and a noisy target.

    Uniform noise in ``[0, noise_scale)`` is added to the module-level
    ``target`` on every call, so repeated calls with the same ``mask`` return
    slightly different values unless ``noise_scale`` is 0 or the random source
    is seeded.

    Parameters
    ----------
    mask : array_like
        Candidate 1-D correlation mask. When it has the same length as
        ``target``, ``mode='valid'`` yields a single correlation value.
    noise_scale : float, optional
        Upper bound of the uniform noise added to each target sample.
        Defaults to ``1/50``; pass ``0`` for a deterministic evaluation.
    rng : numpy.random.Generator, optional
        Source of the noise. When omitted, the legacy global NumPy RNG
        (``np.random.rand``) is used, so ``np.random.seed`` controls it.

    Returns
    -------
    float
        ``-max(correlation)``; minimizing it maximizes the correlation.

    Notes
    -----
    The correlation is linear in ``mask``, so scaling ``mask`` scales the
    returned value by the same factor: without a constraint on the mask
    magnitude this objective has no finite minimum.
    """
    n_samples = len(target)
    if rng is None:
        uniform = np.random.rand(n_samples)
    else:
        uniform = rng.random(n_samples)

    # Example correlation computation (replace with actual computation)
    target_to_correlate = target + uniform * noise_scale
    correlation = np.correlate(mask, target_to_correlate, mode='valid')
    return -np.max(correlation)  # Maximize correlation by minimizing its negative

# Track optimization progress
class OptimizationTracker:
    """Record the parameter vector reported at each optimizer iteration.

    Pass ``tracker.callback`` as the ``callback`` argument of an optimizer;
    afterwards ``tracker.history`` holds one array per reported iterate, in
    call order.
    """

    def __init__(self):
        # One entry per callback invocation.
        self.history = []

    def callback(self, xk):
        """Append a snapshot of the current parameter vector ``xk``.

        A copy is stored rather than the object itself: if the caller later
        reuses or mutates the array it handed in, entries already recorded
        must not change with it.
        """
        self.history.append(np.copy(xk))

# Seed the legacy global NumPy RNG so that the random initial guess (and any
# noise drawn through np.random inside the objective) is reproducible on a
# fresh "Restart & Run All".
SEED = 42
np.random.seed(SEED)

# Initialize tracker
tracker = OptimizationTracker()

# Initial guess: a random mask with one entry per target sample
initial_guess = np.random.rand(len(target))

# Perform optimization using Nelder-Mead
result = minimize(
    objective_function,
    initial_guess,
    method='Nelder-Mead',
    callback=tracker.callback,  # records the iterate reported at each step
    options={'disp': True}
)

# Print optimization results
print("\nOptimization Result:")
print(result)

# Extract data for plotting
history = np.array(tracker.history)
# The objective is re-evaluated at every recorded iterate; these are fresh
# evaluations, not the values the optimizer computed during the run.
z_vals = np.array([objective_function(x) for x in history])

# Scale the optimized mask by its largest magnitude so it is comparable with
# the target. Dividing by the plain maximum would flip the sign of the whole
# curve when every entry is negative and would divide by zero for a zero max.
peak = np.max(np.abs(result.x))
normalized_mask = result.x / peak if peak > 0 else result.x

# Plot the optimized mask and the evolution of the objective function
fig, (ax_mask, ax_objective) = plt.subplots(2, 1)

ax_mask.plot(normalized_mask, label="optimized")
ax_mask.plot(target, label="og")
ax_mask.set_title("Optimized Mask")
ax_mask.set_xlabel("Index")
ax_mask.set_ylabel("Value")
ax_mask.legend()
ax_mask.grid(True)

ax_objective.plot(z_vals, marker='o', label="Objective Value")
ax_objective.set_title("Objective Function Evolution")
ax_objective.set_xlabel("Iteration")
ax_objective.set_ylabel("Objective Function Value")
ax_objective.legend()
ax_objective.grid(True)

fig.tight_layout()
plt.show()


In [None]:
import ray
from ray.rllib.env.env_context import EnvContext
from ray.tune.registry import register_env
from ray.rllib.algorithms.ppo import PPO
from gymnasium import Env, spaces


# Define a custom environment inheriting from gymnasium.Env
class CustomEnv(Env):
    """One-dimensional walk with a reward for reaching the positive boundary.

    The state is an integer position starting at 0. Action ``1`` moves the
    position by +1 and any other action by -1. The episode terminates once
    ``abs(state)`` reaches ``GOAL_DISTANCE``; the reward is 1 when it
    terminates on the positive side and 0 otherwise.
    """

    # Distance from the origin at which an episode terminates.
    GOAL_DISTANCE = 5

    def __init__(self, config: EnvContext):
        """Create the spaces and the initial state; ``config`` is not read."""
        super().__init__()
        # Define action and observation spaces
        self.action_space = spaces.Discrete(2)  # Actions: 0 or 1
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(1,), dtype=np.float32
        )

        self.state = 0
        self.done = False

    def _observation(self):
        """Return the current state as an array matching ``observation_space``."""
        # The space is declared float32; a bare np.array([int]) would be int64
        # and would not be a member of the declared space.
        return np.array([self.state], dtype=np.float32)

    def reset(self, *, seed=None, options=None):
        """Reset the position to 0 and return ``(observation, info)``."""
        super().reset(seed=seed)
        self.state = 0
        self.done = False
        return self._observation(), {}

    def step(self, action):
        """Advance one step.

        Returns ``(observation, reward, terminated, truncated, info)``.

        Raises
        ------
        ValueError
            If called after the episode has terminated and before ``reset``.
        """
        if self.done:
            raise ValueError("Step called after environment is done")

        self.state += 1 if action == 1 else -1
        terminated = abs(self.state) >= self.GOAL_DISTANCE
        # Remember termination so the guard above actually fires on misuse.
        self.done = terminated
        truncated = False
        reward = 1.0 if terminated and self.state > 0 else 0.0

        return self._observation(), reward, terminated, truncated, {}

    def close(self):
        """Close the environment."""
        pass

from ray.rllib.algorithms.callbacks import DefaultCallbacks

class CustomMetrics(DefaultCallbacks):
    """Callbacks that report the total reward of every finished episode."""

    def on_episode_end(self, *, worker, base_env, policies, episode, **kwargs):
        """Print ``episode.total_reward`` when an episode ends.

        Only ``episode`` is read; the remaining keyword arguments are accepted
        to match the callback signature and are otherwise ignored.
        """
        total_reward = episode.total_reward
        message = f"Episode finished! Total reward: {total_reward}"
        print(message)

# Number of PPO training iterations to run
NUM_TRAINING_ITERATIONS = 10

# Initialize Ray
ray.init(ignore_reinit_error=True)

try:
    # Register the custom environment
    register_env("custom_env", lambda env_config: CustomEnv(env_config))

    # Configuration for the RLlib PPO trainer
    config = {
        "env": "custom_env",
        "env_config": {},
        "framework": "torch",
        "callbacks": CustomMetrics,
    }

    # Initialize and train PPO
    trainer = PPO(config=config)
    try:
        # Training loop; `result` keeps the value returned by the last
        # train() call so it can be inspected afterwards.
        for i in range(NUM_TRAINING_ITERATIONS):
            result = trainer.train()
    finally:
        # Release the trainer's resources even if an iteration raised.
        trainer.stop()
finally:
    # Always shut Ray down so a failed run does not leave it running.
    ray.shutdown()


In [None]:
# Show which top-level entries `result` contains
result_keys = result.keys()
print(result_keys)