In [62]:
import numpy as np
from scipy.integrate import solve_ivp
import synthetic_data
# import tensorflow as tf
import torch

# Notebook progress marker: printed once the imports above have finished.
print("done importing")

class TrapEnvironment:
    """Grid environment in which an agent places traps among predators and prey.

    The state is a pair of ``(n, m)`` arrays, returned in the order
    ``(predator_density, prey_density)`` by both ``reset`` and ``step``.
    The dynamics come from the project-local ``synthetic_data`` module and are
    integrated with ``scipy.integrate.solve_ivp``.

    Args:
        num_traps: Stored on the instance; not read anywhere else in this class.
        max_steps: Number of ``step`` calls after which an episode is done.
    """

    def __init__(self, num_traps=20, max_steps=100):
        # Initialize environment parameters
        self.num_traps = num_traps
        self.n = 25
        self.m = 25
        self.predator_density = None
        self.prey_density = None
        # Number of one-time-unit sub-intervals simulated per call to step()
        self.trap_replacement_rate = 10
        # Number of evaluation points per simulated time unit
        self.pts_per_sec = 100
        self.len_traj = 50
        self.current_step = 0
        self.max_steps = max_steps
        self.reset()

    def reset(self):
        """Start a new episode and return ``(predator_density, prey_density)``.

        The step counter is cleared here so that the ``max_steps`` limit
        applies per episode rather than across the lifetime of the object.
        """
        self.current_step = 0
        # default params for initial generation num_traj=200,len_traj=50, pts_per_sec=100, save_loc='../Data/val.npy', prey_range=(1, 5), predator_range=(1, 3)
        # NOTE(review): assumes generate() yields exactly
        # n * m * 2 * pts_per_sec * len_traj values -- confirm against synthetic_data.
        data_init = synthetic_data.generate().reshape(
            self.n, self.m, 2, self.pts_per_sec * self.len_traj
        )
        # Only the final time sample of each trajectory seeds the episode.
        self.prey_density = data_init[:, :, 0, -1]
        self.predator_density = data_init[:, :, 1, -1]
        return self.predator_density, self.prey_density

    def step(self, action):
        """Place traps, advance the simulation and report the outcome.

        Args:
            action: Iterable of ``(i, j)`` grid coordinates where traps are placed.

        Returns:
            ``(predator_density, prey_density, reward, done)`` where the
            densities are ``(n, m)`` arrays taken at the end of the simulated
            window, ``reward`` is the negated sum of the predator density and
            ``done`` is True once no predators remain or ``max_steps`` calls
            have been made in this episode.
        """
        grid_shape = (self.n, self.m, 2)
        traj_shape = grid_shape + (self.pts_per_sec,)
        t_eval = np.linspace(0, 1, self.pts_per_sec)

        # Initial condition for the trap solver: channel 0 carries the current
        # predator density (previously left at zero on the first sub-interval,
        # which discarded the state), channel 1 carries the traps.
        y0 = np.zeros(grid_shape)
        y0[:, :, 0] = self.predator_density
        for i, j in action:
            # Debug trace of the chosen trap location.
            print(i)
            print(j)
            y0[i, j, 1] = 10  # place those traps at each cell
        y0 = y0.flatten()

        for _ in range(self.trap_replacement_rate):
            # Trap solver: only the final time sample is carried forward.
            sol = solve_ivp(
                synthetic_data.spatial_dynamics_traps,
                y0=y0,
                t_span=[0, 1],
                t_eval=t_eval,
                args=(self.n, self.m),
            )
            sol_use = sol.y.reshape(traj_shape)
            pred_after_traps = sol_use[:, :, 0, -1]
            trap_after = sol_use[:, :, 1, -1]

            # Predator/prey solver: channel 0 is prey, channel 1 is the
            # predator density left over after trapping.
            y_prey = np.zeros(grid_shape)
            y_prey[:, :, 0] = self.prey_density
            y_prey[:, :, 1] = pred_after_traps
            sol_prey = solve_ivp(
                synthetic_data.spatial_dynamics,
                y0=y_prey.flatten(),
                t_span=[0, 1],
                t_eval=t_eval,
                args=(self.n, self.m),
            )
            sol_prey_use = sol_prey.y.reshape(traj_shape)
            # Copy the final samples so the full trajectory can be released.
            self.prey_density = sol_prey_use[:, :, 0, -1].copy()
            self.predator_density = sol_prey_use[:, :, 1, -1].copy()

            # Initial condition for the next run of the trap solver.
            y0 = np.zeros(grid_shape)
            y0[:, :, 0] = self.predator_density
            y0[:, :, 1] = trap_after
            y0 = y0.flatten()

        # Reward is higher (less negative) the fewer predators remain.
        reward = -np.sum(self.predator_density)

        # Count this step before testing the limit so an episode lasts exactly
        # max_steps calls.
        self.current_step += 1
        predator_sum_zero = np.sum(self.predator_density) == 0
        max_steps_reached = self.current_step >= self.max_steps
        done = bool(predator_sum_zero or max_steps_reached)

        return self.predator_density, self.prey_density, reward, done

# Define the Q-network used by the DQN agent
class QNetwork(torch.nn.Module):
    """Fully connected network mapping a state vector to one value per action.

    Two hidden layers of 64 units with ReLU activations are followed by a
    linear output layer of width ``num_actions``.
    """

    def __init__(self, num_actions, input_size):
        super().__init__()
        hidden_units = 64
        self.dense1 = torch.nn.Linear(input_size, hidden_units)
        self.dense2 = torch.nn.Linear(hidden_units, hidden_units)
        self.output_layer = torch.nn.Linear(hidden_units, num_actions)

    def forward(self, state):
        """Return the raw (unactivated) output-layer values for ``state``."""
        hidden = state
        for layer in (self.dense1, self.dense2):
            hidden = torch.relu(layer(hidden))
        return self.output_layer(hidden)

# Simple Q-learning algorithm with experience replay
class QLearningAgent:
    """Epsilon-greedy Q-learning agent that picks one grid cell per decision.

    The Q-network has one output per cell of an ``n`` x ``m`` grid; output
    index ``k`` corresponds to cell ``(k // m, k % m)``.

    Args:
        m: Number of grid columns (second coordinate of an action).
        n: Number of grid rows (first coordinate of an action).
        input_size: Length of the flattened state vector fed to the network.
        epsilon: Probability of choosing a random cell instead of the greedy one.
        gamma: Discount factor applied to the best next-state Q-value.
        lr: Learning rate of the Adam optimizer.
    """

    def __init__(self, m, n, input_size, epsilon=0.1, gamma=0.9, lr=0.001):
        self.num_actions = m * n
        self.q_network = QNetwork(self.num_actions, input_size)
        self.optimizer = torch.optim.Adam(self.q_network.parameters(), lr=lr)
        # Replay memory of (state, action, reward, next_state, done) tuples
        self.memory = []
        self.m = m
        self.n = n
        self.epsilon = epsilon
        self.gamma = gamma

    def _action_index(self, action):
        """Return the Q-network output index for ``action``.

        ``action`` is either an integer index or a list whose first element is
        an ``(i, j)`` cell, as produced by ``select_action``.
        """
        if isinstance(action, (int, np.integer)):
            return int(action)
        i, j = action[0]
        return int(i) * self.m + int(j)

    def select_action(self, state):
        """Return a one-element list ``[(i, j)]`` chosen epsilon-greedily."""
        if np.random.rand() < self.epsilon:
            # Explore: randomly select a trap location
            return [(np.random.randint(self.n), np.random.randint(self.m))]

        # Exploit: select the cell with the highest Q-value. No gradient is
        # needed for action selection.
        state_tensor = torch.as_tensor(state, dtype=torch.float32)
        with torch.no_grad():
            q_values = self.q_network(state_tensor)
        action_index = torch.argmax(q_values).item()
        return [divmod(action_index, self.m)]

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def experience_replay(self, batch_size=32):
        """Run one optimisation step on a random minibatch from the memory.

        Does nothing until the memory holds at least ``batch_size`` transitions.
        The regression target equals the current Q-values except at the taken
        action, where it is ``reward`` for terminal transitions and
        ``reward + gamma * max(Q(next_state))`` otherwise.
        """
        if batch_size <= 0 or len(self.memory) < batch_size:
            return

        minibatch = np.random.choice(len(self.memory), batch_size, replace=False)
        batch = [self.memory[idx] for idx in minibatch]
        states, actions, rewards, next_states, dones = zip(*batch)

        states_tensor = torch.as_tensor(np.asarray(states, dtype=np.float32))
        next_states_tensor = torch.as_tensor(np.asarray(next_states, dtype=np.float32))
        rewards_tensor = torch.as_tensor(np.asarray(rewards, dtype=np.float32))
        done_mask = torch.as_tensor(np.asarray(dones, dtype=bool))
        # Flat output index of each taken action. Indexing the Q-vector with
        # the raw [(i, j)] list would overwrite entries i and j instead.
        action_idx = torch.as_tensor(
            [self._action_index(a) for a in actions], dtype=torch.long
        )
        rows = torch.arange(len(batch))

        # Targets are constants with respect to the optimisation step.
        with torch.no_grad():
            targets = self.q_network(states_tensor).clone()
            best_next = self.q_network(next_states_tensor).max(dim=1).values
            bootstrapped = rewards_tensor + self.gamma * best_next
            targets[rows, action_idx] = torch.where(
                done_mask, rewards_tensor, bootstrapped
            )

        self.optimizer.zero_grad()
        q_values = self.q_network(states_tensor)
        loss = torch.nn.functional.mse_loss(q_values, targets)
        loss.backward()
        self.optimizer.step()

        
# Training loop
def train_agent(env, agent, num_episodes=1000):
    """Run ``num_episodes`` episodes of act / remember / replay.

    Each episode starts from ``env.reset()`` and continues until ``env.step``
    reports done. The observation given to the agent is the flattened
    concatenation of the two density grids, in the order the environment
    returns them. A summary line is printed after every episode.
    """
    for episode in range(num_episodes):
        observation = env.reset()
        total_reward = 0
        finished = False
        while not finished:
            # The reset value is a pair of grids; later values are already flat.
            observation = np.array(observation).flatten()
            print(observation.shape)
            chosen = agent.select_action(observation)
            first_grid, second_grid, reward, finished = env.step(chosen)
            flat_parts = (first_grid.flatten(), second_grid.flatten())
            successor = np.concatenate(flat_parts)
            agent.remember(observation, chosen, reward, successor, finished)
            agent.experience_replay()
            total_reward += reward
            observation = successor
        print(f"Episode {episode + 1}/{num_episodes}, Total Reward: {total_reward}")

done importing


In [63]:
# Build the environment first; the agent is sized from its grid dimensions
# (two density channels per grid cell).
env = TrapEnvironment()
agent = QLearningAgent(m=env.m, n=env.n, input_size=2 * env.n * env.m)

# Run training with the default number of episodes
train_agent(env, agent)

(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
20
0
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
0
23
(1250,)
5
7
(1250,)
0
23
(1250,)
6
4
