# First Method

- Author: Osama Abdelaal
- Date: 2023-06-30     
Make sure snnTorch and Gym are installed before running the cells below.

In [None]:
!pip install snntorch

In [None]:
import gym
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (16, 10)

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
torch.manual_seed(0)

import base64, io

# For visualization
from gym.wrappers.monitoring import video_recorder
from IPython.display import HTML
from IPython import display
import glob

torch.manual_seed(0)
plt.rcParams['figure.figsize'] = (16, 10)

class S_Policy(nn.Module):
    """Two-layer spiking policy network built from snnTorch leaky neurons.

    The network is ``Linear -> Leaky -> Linear -> Leaky`` and is evaluated for
    a single simulation step per call, with fresh membrane state each time.

    Args:
        num_inputs (int): Size of the observation vector.
        num_hidden (int): Number of hidden units.
        num_outputs (int): Number of discrete actions.
    """

    def __init__(self, num_inputs=4, num_hidden=32, num_outputs=2):
        super().__init__()
        beta = 0.95  # `beta` argument shared by both snn.Leaky layers

        # Initialize layers
        self.fc1 = nn.Linear(num_inputs, num_hidden)
        self.lif1 = snn.Leaky(beta=beta)
        self.fc2 = nn.Linear(num_hidden, num_outputs)
        self.lif2 = snn.Leaky(beta=beta)

    def forward(self, x):
        """Run one simulation step.

        Args:
            x (Tensor): Batch of observations; the last dimension must match
                ``num_inputs``.

        Returns:
            tuple[Tensor, Tensor]: Output-layer spikes and membrane potentials,
            each with a leading time dimension of length 1.
        """
        # Hidden states are re-initialised on every call, so no membrane
        # potential carries over between environment steps.
        mem1 = self.lif1.init_leaky()
        mem2 = self.lif2.init_leaky()

        cur1 = self.fc1(x)
        spk1, mem1 = self.lif1(cur1, mem1)
        cur2 = self.fc2(spk1)
        spk2, mem2 = self.lif2(cur2, mem2)

        # Only one step is simulated; keep the leading time axis that callers
        # (and the previous torch.stack-based implementation) rely on.
        return spk2.unsqueeze(0), mem2.unsqueeze(0)

    def act(self, state, temperature=1.0):
        """Sample an action for ``state``.

        Args:
            state: Current observation (NumPy array or array-like of floats).
            temperature (float, optional): Divisor applied to the output spikes
                before the softmax. Values above 1.0 flatten the distribution
                (more exploration); values below 1.0 sharpen it.

        Returns:
            tuple[int, Tensor]: The sampled action and its log-probability
            (shape ``(1,)``, still attached to the autograd graph).
        """
        # Use the device the weights actually live on instead of a module-level
        # `device` global, which may be undefined or differ from the model's.
        model_device = next(self.parameters()).device
        state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0).to(model_device)

        # The network emits output spikes, not probabilities; they are used as
        # logits for the softmax below.
        spikes, _ = self.forward(state)
        logits = spikes.squeeze(0)  # drop the length-1 time axis

        action_dist = Categorical(F.softmax(logits / temperature, dim=-1))
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)


In [None]:
import gym
import numpy as np
from collections import deque
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from torch.utils.data import Dataset, DataLoader
import snntorch as snn
import snntorch.functional as SF
from snntorch import utils
from snntorch import backprop
import torch
import torch.nn as nn

class RL_Dataset(Dataset):
    """Transition dataset that flattens each step into one feature vector.

    Every item is ``(features, action)`` where ``features`` is the
    concatenation ``[state, reward, next_state, done]`` as a float tensor and
    ``action`` is a long tensor used as the target.
    """

    def __init__(self, states, actions, rewards, next_states, dones):
        self.states, self.actions = states, actions
        self.rewards = rewards
        self.next_states, self.dones = next_states, dones

    def __len__(self):
        # One sample per stored state.
        return len(self.states)

    def __getitem__(self, idx):
        def as_float(value):
            # Copy the stored value into a float tensor.
            return torch.tensor(value).float()

        # Scalars (reward, done) get a leading axis so they can be
        # concatenated with the state vectors.
        pieces = (
            as_float(self.states[idx]),
            as_float(self.rewards[idx]).unsqueeze(0),
            as_float(self.next_states[idx]),
            as_float(self.dones[idx]).unsqueeze(0),
        )
        features = torch.cat(pieces)

        # The action taken is the supervised target (assumed to be an integer).
        label = torch.tensor(self.actions[idx]).long()
        return features, label




# Helpers that hide the difference between the two Gym step/reset APIs
def _reset_env(env, seed=0):
    """Reset ``env`` with ``seed`` and return only the observation.

    Older Gym releases return the observation alone; newer ones return an
    ``(observation, info)`` pair.
    """
    result = env.reset(seed=seed)
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        return result[0]
    return result


def _step_env(env, action):
    """Step ``env`` and return ``(next_state, reward, done)`` for either Gym API."""
    result = env.step(action)
    if len(result) == 5:
        next_state, reward, terminated, truncated, _ = result
        return next_state, reward, bool(terminated or truncated)
    next_state, reward, done, _ = result
    return next_state, reward, done


# Function to train the policy using reinforce
def reinforce(policy,BP_policy, policy_optimizer, snn_optimizer, n_episodes=1000, max_t=100, gamma=1.0, print_every=100):
    """
    Train a policy with REINFORCE while fitting a second spiking network with BPTT.

    After every episode the policy receives one policy-gradient update, and
    `BP_policy` is trained with `snntorch.backprop.BPTT` on all transitions
    collected so far (features from `RL_Dataset`, target = action taken).

    NOTE: this function reads the module-level globals `env` (the Gym
    environment) and `device` (passed to BPTT); both must exist before calling.

    Parameters:
    policy (S_Policy): The policy to train; must provide `act(state)`.
    BP_policy (nn.Module): Spiking network trained with BPTT on the collected transitions.
    policy_optimizer (torch.optim.Optimizer): Optimizer for `policy`.
    snn_optimizer (torch.optim.Optimizer): Optimizer handed to BPTT for `BP_policy`.
    n_episodes (int, optional): The maximum number of training episodes. Default is 1000.
    max_t (int, optional): The maximum number of timesteps per episode. Default is 100.
        With the default, an episode score cannot reach the 500.0 "solved" threshold below.
    gamma (float, optional): The discount factor. Default is 1.0.
    print_every (int, optional): How often to print average score. Default is 100.

    Returns:
    scores (List[float]): A list of scores from each episode of the training. The score is the total reward obtained in the episode.
    """

    # Most recent 100 episode scores, used for the running average
    scores_deque = deque(maxlen=100)

    # All episode scores
    scores = []

    # Transitions accumulated over *all* episodes; the BPTT dataset is rebuilt
    # from them after every episode.
    states, actions, rewards, next_states, dones = [], [], [], [], []

    loss_fn = SF.mse_count_loss()
    reg_fn = SF.l1_rate_sparsity()

    for i_episode in range(1, n_episodes+1):
        saved_log_probs = []   # log-probability of each chosen action
        episode_rewards = []   # reward of each step

        # NOTE(review): the environment is re-seeded with 0 on every episode,
        # so episodes presumably all start from the same state - confirm intended.
        state = _reset_env(env, seed=0)

        # Collect trajectory
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)

            state_, reward, done = _step_env(env, action)
            episode_rewards.append(reward)

            # Store experience
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            next_states.append(state_)
            dones.append(done)

            if done:
                break
            state = state_

        episode_score = sum(episode_rewards)
        scores_deque.append(episode_score)
        scores.append(episode_score)

        # Total discounted return of the episode; every step is weighted by it
        R = sum(gamma**i * r for i, r in enumerate(episode_rewards))

        # Gradient ascent on return == gradient descent on the negated objective
        policy_loss = torch.cat([-log_prob * R for log_prob in saved_log_probs]).sum()

        # Policy-gradient step first: the episode's autograd graph is consumed
        # and released here, so no retain_graph is needed and nothing BPTT does
        # afterwards can invalidate it.
        policy_optimizer.zero_grad()
        policy_loss.backward()
        policy_optimizer.step()

        # Backpropagate through time on every transition gathered so far
        dataset = RL_Dataset(states, actions, rewards, next_states, dones)
        dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
        loss = backprop.BPTT(BP_policy, dataloader, optimizer=snn_optimizer,
                             criterion=loss_fn, num_steps=1, time_var=False,
                             regularization=reg_fn, device=device)

        # Print current average score every 'print_every' episodes
        if i_episode % print_every == 0:
            # float() accepts a Python number as well as any one-element tensor
            print('Episode {}\tAverage Score: {:.2f}\tSNN Loss Score: {:.2f}'.format(i_episode,
                                                                                     np.mean(scores_deque),
                                                                                     float(loss)))

        # Stop if the environment is solved
        if np.mean(scores_deque)>=500.0:
            print('Environment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode-100, np.mean(scores_deque)))
            break

    # Return all episode scores
    return scores

In [None]:
# Main
import snntorch as snn
import snntorch.functional as SF
from snntorch import utils
from snntorch import backprop
import torch
import torch.nn as nn

env = gym.make('CartPole-v1')
torch.manual_seed(0)

# `env` and `device` are read as globals by reinforce() and S_Policy.act()
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Spiking policy trained with REINFORCE; it must live on the same device as
# the states that act() sends to it.
s_policy = S_Policy().to(device)
policy_optimizer = optim.Adam(s_policy.parameters(), lr=1e-2)

# Auxiliary spiking network trained with BPTT. Its 10 inputs match the
# RL_Dataset feature vector: state (4) + reward (1) + next_state (4) + done (1).
lif1 = snn.Leaky(beta=0.9, init_hidden=True)
lif2 = snn.Leaky(beta=0.9, init_hidden=True, output=True)

BP_net = nn.Sequential(nn.Flatten(),
                    nn.Linear(10,500),
                    lif1,
                    nn.Linear(500, 2),
                    lif2).to(device)

# The optimizer must be created from the very BP_net instance that is passed
# to reinforce(); rebuilding the network afterwards would leave the optimizer
# updating a discarded copy.
snn_optimizer = optim.Adam(BP_net.parameters(), lr=1e-3)  # Notice the different learning rate

scores = reinforce(s_policy, BP_net, policy_optimizer, snn_optimizer)


# Second Method

- Author: Osama Abdelaal
- Date: 2023-06-30     
Make sure snnTorch and Gym are installed before running the cells below.

In [None]:
# Description: Policy network for the SNN agent
import torch
import torch.nn as nn

torch.manual_seed(0)

import base64, io

# For visualization
from gym.wrappers.monitoring import video_recorder
from IPython.display import HTML
from IPython import display
import glob

torch.manual_seed(0)


class S_Policy(nn.Module):
    """Two-layer spiking policy network built from snnTorch leaky neurons.

    The network is ``Linear -> Leaky -> Linear -> Leaky`` and is evaluated for
    a single simulation step per call, with fresh membrane state each time.

    Args:
        num_inputs (int): Size of the observation vector.
        num_hidden (int): Number of hidden units.
        num_outputs (int): Number of discrete actions.
        device (torch.device | str | None): Device the network runs on.
            Defaults to CPU. Inputs are moved to this device in ``forward``.
    """

    def __init__(self, num_inputs=4, num_hidden=32, num_outputs=2, device=None):
        super().__init__()
        beta = 0.95  # `beta` argument shared by both snn.Leaky layers
        self.device = torch.device("cpu") if device is None else torch.device(device)

        # Initialize layers
        self.fc1 = nn.Linear(num_inputs, num_hidden)
        self.lif1 = snn.Leaky(beta=beta)
        self.fc2 = nn.Linear(num_hidden, num_outputs)
        self.lif2 = snn.Leaky(beta=beta)

        # Keep the weights on the same device that forward() moves inputs to.
        # NOTE: calling .to(...) on the module later does not update
        # self.device; pass `device` here instead.
        self.to(self.device)

    def forward(self, x):
        """Run one simulation step.

        Args:
            x (Tensor): Batch of observations; the last dimension must match
                ``num_inputs``.

        Returns:
            tuple[Tensor, Tensor]: Output-layer spikes and membrane potentials,
            each with a leading time dimension of length 1.
        """
        x = x.to(self.device)

        # Hidden states are re-initialised on every call, so no membrane
        # potential carries over between calls.
        mem1 = self.lif1.init_leaky()
        mem2 = self.lif2.init_leaky()

        cur1 = self.fc1(x)
        spk1, mem1 = self.lif1(cur1, mem1)
        cur2 = self.fc2(spk1)
        spk2, mem2 = self.lif2(cur2, mem2)

        # Only one step is simulated; keep the leading time axis that callers
        # (and the previous torch.stack-based implementation) rely on.
        return spk2.unsqueeze(0), mem2.unsqueeze(0)

    def act(self, state, temperature=1.0):
        """Sample an action for ``state``.

        Args:
            state: Current observation as a NumPy array.
            temperature (float, optional): Divisor applied to the output spikes
                before the softmax. Values above 1.0 flatten the distribution
                (more exploration); values below 1.0 sharpen it.

        Returns:
            tuple[int, Tensor]: The sampled action and its log-probability
            (shape ``(1,)``, still attached to the autograd graph).
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(self.device)

        # The network emits output spikes, not probabilities; they are used as
        # logits for the softmax below.
        spikes, _ = self.forward(state)
        logits = spikes.squeeze(0)  # drop the length-1 time axis

        action_dist = Categorical(F.softmax(logits / temperature, dim=-1))
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)

import numpy as np
from collections import deque
import torch.nn.functional as F
from torch.distributions import Categorical
from torch.utils.data import Dataset, DataLoader
import snntorch as snn
import snntorch.functional as SF
from snntorch import backprop
import torch
import torch.nn as nn

class RL_Dataset(Dataset):
    """Dataset of ``(state, action)`` pairs collected from the environment.

    Each item is the state as a float tensor and the action taken in that
    state as a long tensor (the supervised target).
    """

    def __init__(self, states, actions):
        self.states, self.actions = states, actions

    def __len__(self):
        # One sample per stored state.
        return len(self.states)

    def __getitem__(self, idx):
        observation = torch.tensor(self.states[idx]).float()
        # The action is assumed to be an integer class index.
        label = torch.tensor(self.actions[idx]).long()
        return observation, label




# Helpers that hide the difference between the two Gym step/reset APIs
def _reset_env(env, seed=0):
    """Reset ``env`` with ``seed`` and return only the observation.

    Older Gym releases return the observation alone; newer ones return an
    ``(observation, info)`` pair.
    """
    result = env.reset(seed=seed)
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        return result[0]
    return result


def _step_env(env, action):
    """Step ``env`` and return ``(next_state, reward, done)`` for either Gym API."""
    result = env.step(action)
    if len(result) == 5:
        next_state, reward, terminated, truncated, _ = result
        return next_state, reward, bool(terminated or truncated)
    next_state, reward, done, _ = result
    return next_state, reward, done


# Function to train the policy using reinforce
def reinforce(policy,BP_policy, policy_optimizer, snn_optimizer,
              env, device, n_episodes=500, max_t=10, gamma=1.0, print_every=100):
    """
    Train a policy with REINFORCE while fitting a spiking network with BPTT.

    After every episode the policy receives one policy-gradient update, and
    `BP_policy` is trained with `snntorch.backprop.BPTT` on all (state, action)
    pairs collected so far.

    Parameters:
    policy (S_Policy): The policy to train; must provide `act(state)`.
    BP_policy (nn.Module): Spiking network trained with BPTT on the collected pairs.
    policy_optimizer (torch.optim.Optimizer): Optimizer for `policy`.
    snn_optimizer (torch.optim.Optimizer): Optimizer handed to BPTT for `BP_policy`.
    env: Gym environment to interact with.
    device (torch.device): Device passed to BPTT.
    n_episodes (int, optional): The maximum number of training episodes. Default is 500.
    max_t (int, optional): The maximum number of timesteps per episode; also used as the
        number of BPTT simulation steps. Default is 10. With the default, an episode
        score cannot reach the 500.0 "solved" threshold below.
    gamma (float, optional): The discount factor. Default is 1.0.
    print_every (int, optional): How often to print average score. Default is 100.

    Returns:
    scores (List[float]): A list of scores from each episode of the training. The score is the total reward obtained in the episode.
    """

    # Most recent 100 episode scores, used for the running average
    scores_deque = deque(maxlen=100)

    # All episode scores
    scores = []

    # (state, action) pairs accumulated over *all* episodes; the BPTT dataset
    # is rebuilt from them after every episode.
    states = []
    actions = []

    loss_fn = SF.mse_count_loss()
    reg_fn = SF.l1_rate_sparsity()

    for i_episode in range(1, n_episodes+1):
        saved_log_probs = []   # log-probability of each chosen action
        episode_rewards = []   # reward of each step

        # NOTE(review): the environment is re-seeded with 0 on every episode,
        # so episodes presumably all start from the same state - confirm intended.
        state = _reset_env(env, seed=0)

        # Collect trajectory
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)

            state_, reward, done = _step_env(env, action)
            episode_rewards.append(reward)

            # Store experience
            states.append(state)
            actions.append(action)

            state = state_
            if done:
                break

        episode_score = sum(episode_rewards)
        scores_deque.append(episode_score)
        scores.append(episode_score)

        # Total discounted return of the episode; every step is weighted by it
        R = sum(gamma**i * r for i, r in enumerate(episode_rewards))

        # Gradient ascent on return == gradient descent on the negated objective
        policy_loss = torch.cat([-log_prob * R for log_prob in saved_log_probs]).sum()

        # Policy-gradient step first: the episode's autograd graph is consumed
        # here, before BPTT can update any parameters. This keeps backward()
        # valid even when BP_policy shares parameters with policy.
        policy_optimizer.zero_grad()
        policy_loss.backward()
        policy_optimizer.step()

        # Backpropagate through time on every pair gathered so far
        # NOTE(review): num_steps is tied to max_t (episode length cap) - confirm intended.
        dataset = RL_Dataset(states, actions)
        dataloader = DataLoader(dataset, batch_size=128, shuffle=True)
        loss = backprop.BPTT(BP_policy, dataloader, optimizer=snn_optimizer,
                             criterion=loss_fn, num_steps=max_t, time_var=False,
                             regularization=reg_fn, device=device)

        # Print current average score every 'print_every' episodes
        if i_episode % print_every == 0:
            # float() accepts a Python number as well as any one-element tensor
            print('Episode {}\tAverage Score: {:.2f}\tSNN Loss Score: {:.2f}\tPolicy Loss Score: {:.2f}'.format(i_episode,
                                                                                                                np.mean(scores_deque),
                                                                                                                float(loss),
                                                                                                                policy_loss.item()))

        # Stop if the environment is solved
        if np.mean(scores_deque)>=500.0:
            print('Environment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode-100, np.mean(scores_deque)))
            break

    # Return all episode scores
    return scores

In [None]:
# Main
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

#from s_policy import *
import torch
import torch.nn as nn
import torch.optim as optim
import gym

# Script entry for the second method: build the environment, the spiking
# policy and its optimizers, then run REINFORCE training on CPU.
env = gym.make('CartPole-v1')
torch.manual_seed(0)

device = torch.device("cpu")

# Leaky neurons for BP_net; init_hidden=True lets them sit inside nn.Sequential
lif1 = snn.Leaky(beta=0.9, init_hidden=True)
lif2 = snn.Leaky(beta=0.9, init_hidden=True, output=True)

s_policy = S_Policy()
policy_optimizer = optim.Adam(s_policy.parameters(), lr=1e-2)

# NOTE(review): BP_net is constructed here but is not passed to reinforce()
# below, which receives s_policy as both `policy` and `BP_policy`. Its first
# layer also expects 10 input features, whereas the RL_Dataset defined for this
# method returns only the state - confirm which network was meant to be used.
BP_net = nn.Sequential(nn.Flatten(),
                    nn.Linear(10,32),
                    lif1,
                    nn.Linear(32, 2),
                    lif2).to(device)

# NOTE(review): this optimizer holds BP_net's parameters, not s_policy's, so
# it does not update the network that reinforce() hands to BPTT - confirm intended.
snn_optimizer = optim.Adam(BP_net.parameters(), lr=1e-3)  # Notice the different learning rate


# Train; returns the list of per-episode scores
scores = reinforce(s_policy, s_policy, policy_optimizer, snn_optimizer, env, device)
