## RL Autocomplete

In this notebook I walk through my process of building a reinforcement learning agent that guesses the next letter in a string, with the idea of eventually using it for autocomplete. I chose this project because I wanted to try reinforcement learning on something that isn't a game. I know there are better ways to do autocomplete, but I'm curious whether reinforcement learning would work. The project uses a PPO model that I will build with PyTorch, with help from ChatGPT. I picked PPO because I used it in my previous project and I want to get a better understanding of how it works.

First I made the PyTorch module. Initially I was using convolutional layers in my module, but those are usually used for pattern recognition tasks like machine vision. Still, I learned about convolutional layers along the way, so it wasn't too bad. I found that most people use linear layers for this, so my module uses linear layers as well. If the module turns out to be insufficient at any point, I will update it.



In [1]:
import torch.nn as nn

class ActorCritic(nn.Module):

    def __init__(self, nb_actions):
        super().__init__()
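        # Shared trunk; the input size of 4 matches CartPole's observation vector.
        self.head = nn.Sequential(nn.Linear(4, 64), nn.Tanh(), nn.Linear(64, 64), nn.Tanh())
        # Actor head: one logit per action.
        self.actor = nn.Sequential(nn.Linear(64, nb_actions))
        # Critic head: a single state-value estimate.
        self.critic = nn.Sequential(nn.Linear(64, 1))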
    
    def forward(self, x):
        h = self.head(x)
        return self.actor(h), self.critic(h)
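
The first layer above is fixed to 4 inputs, which fits CartPole but would need to change for another observation size. As a minimal sketch of how that could look, the variant below takes the input size as a parameter. The names `ActorCriticSketch`, `obs_dim` and `hidden` are hypothetical and not part of the notebook's module; the quick shape check at the end just confirms that the actor returns one logit per action and the critic one value per state.

```python
import torch
import torch.nn as nn


class ActorCriticSketch(nn.Module):
    """Hypothetical variant of the module above with a configurable input size."""

    def __init__(self, obs_dim, nb_actions, hidden=64):
        super().__init__()
        # Shared trunk used by both heads.
        self.head = nn.Sequential(
            nn.Linear(obs_dim, hidden), nn.Tanh(),
            nn.Linear(hidden, hidden), nn.Tanh(),
        )
        self.actor = nn.Linear(hidden, nb_actions)  # action logits
        self.critic = nn.Linear(hidden, 1)          # state value

    def forward(self, x):
        h = self.head(x)
        return self.actor(h), self.critic(h)


# Shape check with a dummy batch of 8 CartPole-sized observations.
model = ActorCriticSketch(obs_dim=4, nb_actions=2)
batch = torch.zeros(8, 4)
logits, value = model(batch)
print(logits.shape, value.shape)  # torch.Size([8, 2]) torch.Size([8, 1])
```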

With the help of ChatGPT I made the PPO agent.

In [2]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from torch.distributions import Categorical


class PPOAgent:
    def __init__(self, nb_actions, device='cpu', gamma=0.99, lam=0.95, clip_eps=0.2, lr=2.5e-4, epochs=4, batch_size=64):
        self.device = device
        self.gamma = gamma
        self.lam = lam
        self.clip_eps = clip_eps
        self.epochs = epochs
        self.batch_size = batch_size

        self.model = ActorCritic(nb_actions).to(device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)

    def get_action_and_value(self, state):
        logits, value = self.model(state)
        dist = Categorical(logits=logits)
        action = dist.sample()
        log_prob = dist.log_prob(action)
        return action, log_prob, value.squeeze(-1)

    def compute_gae(self, rewards, values, dones, next_value):
        values = np.append(values, next_value)
        gae = 0
        returns = []
        for t in reversed(range(len(rewards))):
            delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
            gae = delta + self.gamma * self.lam * (1 - dones[t]) * gae
            returns.insert(0, gae + values[t])
        return returns

    def ppo_update(self, states, actions, log_probs, returns, values):
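        # returns = GAE advantage + value, so subtracting the values recovers the advantages.
        advantages = returns - values
        # Normalize the advantages to keep the size of the policy update stable.
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)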

        for _ in range(self.epochs):
            indices = np.arange(len(states))
            np.random.shuffle(indices)

            for i in range(0, len(states), self.batch_size):
                idx = indices[i:i+self.batch_size]
                # Use explicit dtypes so the NumPy defaults (float64, platform int) don't leak into the loss.
                s_batch = torch.tensor(states[idx], dtype=torch.float32).to(self.device)
                a_batch = torch.tensor(actions[idx], dtype=torch.long).to(self.device)
                old_log_probs_batch = torch.tensor(log_probs[idx], dtype=torch.float32).to(self.device)
                ret_batch = torch.tensor(returns[idx], dtype=torch.float32).to(self.device)
                adv_batch = torch.tensor(advantages[idx], dtype=torch.float32).to(self.device)

                logits, value = self.model(s_batch)
                dist = Categorical(logits=logits)
                entropy = dist.entropy().mean()
                new_log_probs = dist.log_prob(a_batch)

                ratio = (new_log_probs - old_log_probs_batch).exp()
                surr1 = ratio * adv_batch
                surr2 = torch.clamp(ratio, 1.0 - self.clip_eps, 1.0 + self.clip_eps) * adv_batch

                policy_loss = -torch.min(surr1, surr2).mean()
                value_loss = F.mse_loss(value.squeeze(-1), ret_batch)

                loss = policy_loss + 0.5 * value_loss - 0.01 * entropy

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()
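
To get a better feel for what `compute_gae` does, here is a small standalone sketch of the same recursion in NumPy, run on a toy three-step episode. The function name `gae_returns` and the toy numbers are made up for illustration. With `gamma=1.0` and `lam=1.0` the result is easy to check by hand, because the returns are then just the sum of the remaining rewards.

```python
import numpy as np


def gae_returns(rewards, values, dones, next_value, gamma=0.99, lam=0.95):
    """Return (advantages, returns) for one rollout using the GAE recursion."""
    values = np.append(np.asarray(values, dtype=np.float64), next_value)
    advantages = np.zeros(len(rewards))
    gae = 0.0
    for t in reversed(range(len(rewards))):
        not_done = 1.0 - float(dones[t])  # 0 at the end of an episode, so nothing is bootstrapped
        delta = rewards[t] + gamma * values[t + 1] * not_done - values[t]
        gae = delta + gamma * lam * not_done * gae
        advantages[t] = gae
    returns = advantages + values[:-1]
    return advantages, returns


# Toy episode: reward 1 per step, constant value estimate 0.5, episode ends on the last step.
rewards = [1.0, 1.0, 1.0]
values = [0.5, 0.5, 0.5]
dones = [False, False, True]
advantages, returns = gae_returns(rewards, values, dones, next_value=0.0, gamma=1.0, lam=1.0)
print(advantages)  # [2.5 1.5 0.5]
print(returns)     # [3. 2. 1.]
```

The advantages say how much better each step turned out than the critic predicted, and `returns - values` gives the advantages back, which is what `ppo_update` relies on.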

Before starting on the autocomplete agent, I wanted to test whether the PPO agent works at all. It would be annoying not to know whether a bug was caused by my agent or by my training setup. I created a script that loads a Gymnasium environment and trains the PPO agent on it. An interesting thing to note in the script is that the agent is not trained after every step, but only after it has collected a batch of experiences (in the script below, one full episode). This is because of the way PPO works: first the agent collects a number of experiences using the current policy, and then it is trained on those experiences. Unlike Q-learning with a replay buffer, these experiences are discarded after training.

In [3]:
import gymnasium as gym
from PPO import PPOAgent
import numpy as np
import torch

env = gym.make('CartPole-v1')
agent = PPOAgent(nb_actions=env.action_space.n)

max_episodes = 10000
max_steps = 500

for episode in range(max_episodes):
    state, _ = env.reset()
    states, actions, log_probs, rewards, dones, values = [], [], [], [], [], []

    # Collect one episode of experience with the current policy.
    for _ in range(max_steps):
        state_tensor = torch.from_numpy(state).float().unsqueeze(0)
        action_tensor, log_prob, value = agent.get_action_and_value(state_tensor)
        action = int(action_tensor)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        states.append(state)
        actions.append(action)
        log_probs.append(log_prob.item())
        rewards.append(reward)
        dones.append(done)
        values.append(value.item())

        state = next_state
        if done:
            break

    next_state_tensor = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
    _, _, next_value = agent.get_action_and_value(next_state_tensor)
    returns = agent.compute_gae(rewards, values, dones, next_value.item())

    agent.ppo_update(
        np.array(states),
        np.array(actions),
        np.array(log_probs),
        np.array(returns),
        np.array(values)
    )

    total_reward = sum(rewards)
    print(f"Episode {episode + 1}: Total Reward = {total_reward}")

Episode 1: Total Reward = 14.0
Episode 2: Total Reward = 20.0
Episode 3: Total Reward = 26.0
Episode 4: Total Reward = 24.0
Episode 5: Total Reward = 27.0
Episode 6: Total Reward = 23.0
Episode 7: Total Reward = 35.0
Episode 8: Total Reward = 19.0
Episode 9: Total Reward = 14.0
Episode 10: Total Reward = 29.0
Episode 11: Total Reward = 19.0
Episode 12: Total Reward = 12.0
Episode 13: Total Reward = 11.0
Episode 14: Total Reward = 14.0
Episode 15: Total Reward = 21.0
Episode 16: Total Reward = 22.0
Episode 17: Total Reward = 25.0
Episode 18: Total Reward = 38.0
Episode 19: Total Reward = 30.0
Episode 20: Total Reward = 17.0
Episode 21: Total Reward = 32.0
Episode 22: Total Reward = 9.0
Episode 23: Total Reward = 17.0
Episode 24: Total Reward = 15.0
Episode 25: Total Reward = 11.0
Episode 26: Total Reward = 23.0
Episode 27: Total Reward = 10.0
Episode 28: Total Reward = 22.0
Episode 29: Total Reward = 12.0
Episode 30: Total Reward = 16.0
Episode 31: Total Reward = 31.0
Episode 32: Total 

Looking at the reward, you can see that the agent gets properly trained. Now I can start on the autocomplete. While looking for a good dataset on Kaggle, I found an English word frequency [dataset](https://www.kaggle.com/datasets/rtatman/english-word-frequency/data) containing the 333,333 most frequently used words according to the Google Web Trillion Word Corpus. I want to remove all single-letter words from the dataset, since most of them are probably accidental searches and there is no point in training the agent on single letters when there is no second letter to predict.

In [None]:
import csv

# newline='' is what the csv module recommends when opening files for csv.reader.
with open('dataset.csv', mode='r', newline='') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        print(row)

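
Removing the single-letter words could look roughly like the sketch below. It assumes the file has a header row followed by `word,count` rows, which I have not verified against the actual dataset, and it uses a tiny in-memory sample with made-up counts instead of the real file. The helper name `multi_letter_rows` is hypothetical.

```python
import csv
import io


def multi_letter_rows(rows):
    """Yield (word, count) pairs, skipping words shorter than two characters."""
    for word, count in rows:
        if len(word) >= 2:
            yield word, int(count)


# Stand-in for the real file: assumed layout is a header row, then word,count rows.
sample = io.StringIO("word,count\nthe,100\na,90\nof,80\ni,70\n")
reader = csv.reader(sample)
next(reader)  # skip the header row
words = list(multi_letter_rows(reader))
print(words)  # [('the', 100), ('of', 80)]
```

For the real dataset, the `io.StringIO` sample would be replaced by the opened file object.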