## Deep Q-learning 

For more complex problems, a tabular representation over a discrete state-action space may not be feasible. Instead we use a function approximator

$$Q_\theta(s,a)$$

Where $\theta$ are the function parameters.



We now seek to minimise the loss function

$$ L(\theta) = \mathbb{E}\left[(y_k - Q_\theta(x_k, a_k))^2\right] $$

where

$$y_k = c_k + \lambda \min_a Q_\theta(x_{k+1}, a)$$

Therefore each time we perform our minimisation algorithm, we require a minibatch of quartets

$$e_k = (x_k, a_k, c_k, x_{k+1})$$

Like the Q-learning described above, this is an off-policy method. However, training can be unstable because the target $y_k$ depends on the same parameters $\theta$ that are being updated, hence we may use

$$y_k = c_k + \lambda \min_a Q_{\theta'}(x_{k+1}, a)$$

where $Q_{\theta'}(x_{k}, a)$ tracks $Q_{\theta}(x_{k}, a)$ by periodically setting $\theta' = \theta$.

In [None]:
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader
from rdkit import Chem
from rdkit.Chem import AllChem

import tqdm
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from typing import List, Optional, Tuple


class Quartets(Dataset):
    """Skeleton dataset intended to serve training quartets.

    The return annotation of ``__getitem__`` is a 4-tuple of float tensors,
    presumably the quartet ``(x_k, a_k, c_k, x_{k+1})`` described in the
    accompanying notes -- TODO confirm once the loader is written.

    The class is not implemented yet: construction and item access raise
    ``NotImplementedError`` instead of failing to parse or silently handing
    ``None`` to a ``DataLoader``.
    """

    def __init__(self, csv_file: str, smiles_col="SMILES", target_col="Solubility"):
        """Load the quartets from ``csv_file`` (not implemented yet).

        Args:
            csv_file: Path of the CSV file to read.
            smiles_col: Column name; NOTE(review): the ``"SMILES"`` default
                looks like a leftover from another dataset -- confirm.
            target_col: Column name; NOTE(review): the ``"Solubility"``
                default looks like a leftover from another dataset -- confirm.

        Raises:
            NotImplementedError: Always, until the loading logic is written.
        """
        # TODO: read ``csv_file`` and populate ``self.features``, which
        # ``__len__`` relies on.
        raise NotImplementedError(
            "Quartets.__init__ is a stub: load the data and set self.features"
        )

    def __len__(self) -> int:
        """Return the number of stored items, i.e. ``len(self.features)``."""
        return len(self.features)

    def __getitem__(self, idx) -> Tuple[torch.FloatTensor, torch.FloatTensor, torch.FloatTensor, torch.FloatTensor]:
        """Return the item at position ``idx`` (not implemented yet).

        Raises:
            NotImplementedError: Always, until item access is written.
        """
        # TODO: build and return the 4-tuple of tensors for ``idx``.
        raise NotImplementedError(
            "Quartets.__getitem__ is a stub: return the 4-tuple of tensors for idx"
        )


class Qfunc(nn.Module):
    """Fully connected network that maps an ``input_size``-wide vector to one value.

    The hidden widths are ``input_size // 2``, ``input_size // 4`` and
    ``input_size // 8``. Every hidden ``Linear`` layer is followed by ``ReLU``
    and ``Dropout(0.2)``; the final ``Linear`` layer has a single output.
    """

    def __init__(self, input_size: int = 2048):
        """Build the layer stack.

        Args:
            input_size: Width of the input vector.
        """
        super().__init__()

        widths = [input_size, input_size // 2, input_size // 4, input_size // 8]

        stack = []
        # One (Linear, ReLU, Dropout) triple per consecutive pair of widths.
        for fan_in, fan_out in zip(widths, widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.ReLU())
            stack.append(nn.Dropout(0.2))
        # Output head: a single unit on top of the narrowest hidden layer.
        stack.append(nn.Linear(widths[-1], 1))

        self.model = nn.Sequential(*stack)

    def forward(self, x) -> torch.Tensor:
        """Run ``x`` through the layer stack and return the result."""
        out = self.model(x)
        return out


class Trainer:
    """Minimal supervised training loop around a model, a loss and an optimizer.

    NOTE(review): ``train_epoch`` unpacks every batch as a
    ``(features, targets)`` pair, whereas ``Quartets.__getitem__`` in this file
    is annotated as returning a 4-tuple, so that dataset cannot be fed to this
    trainer as-is.
    """

    def __init__(
        self,
        model: nn.Module,
        criterion: nn.Module,
        optimizer: optim.Optimizer,
        device: torch.device,
    ) -> None:
        """Store the training components and move the model to ``device``.

        Args:
            model: Network to train.
            criterion: Loss module called as ``criterion(outputs, targets)``.
            optimizer: Optimizer that updates the model parameters.
            device: Device the model and every batch are moved to.
        """
        self.model = model.to(device)
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device
        # Mean loss of each completed epoch, in order.
        self.train_losses: List[float] = []

    def train_epoch(self, train_loader: DataLoader) -> float:
        """Run one optimisation pass over ``train_loader``.

        Args:
            train_loader: Loader yielding ``(features, targets)`` batches.

        Returns:
            The mean of the per-batch losses.

        Raises:
            ZeroDivisionError: If ``train_loader`` yields no batches.
        """
        self.model.train()
        total_loss: float = 0.0

        for features, targets in train_loader:
            features, targets = features.to(self.device), targets.to(self.device)

            self.optimizer.zero_grad()
            # Drop only the trailing dimension. A bare ``squeeze()`` would also
            # drop the batch dimension when a batch holds a single sample,
            # leaving a 0-d tensor whose shape no longer matches the targets.
            outputs = self.model(features).squeeze(-1)
            loss = self.criterion(outputs, targets)
            loss.backward()
            self.optimizer.step()

            total_loss += loss.item()

        return total_loss / len(train_loader)

    @staticmethod
    def calculate_r2(pred: torch.Tensor, true: torch.Tensor) -> float:
        """Return the coefficient of determination ``1 - SS_res / SS_tot``.

        Args:
            pred: Predicted values.
            true: Reference values.

        Returns:
            The R^2 score as a Python float. There is no guard for
            ``SS_tot == 0`` (all values of ``true`` identical), so the result
            is not finite in that case.
        """
        ss_tot = torch.sum((true - true.mean()) ** 2)
        ss_res = torch.sum((true - pred) ** 2)
        r2 = 1 - (ss_res / ss_tot)
        return r2.item()

    def train(
        self,
        train_loader: DataLoader,
        epochs: int
    ) -> None:
        """Train for ``epochs`` epochs, recording and printing each mean loss.

        Args:
            train_loader: Loader passed to ``train_epoch`` on every epoch.
            epochs: Number of passes over ``train_loader``.
        """
        for epoch in tqdm.tqdm(range(epochs)):
            train_loss = self.train_epoch(train_loader)

            self.train_losses.append(train_loss)

            print(f"Epoch {epoch+1}/{epochs}:")
            print(f"Train Loss: {train_loss:.4f}")

    def plot_training_history(self) -> None:
        """Plot the recorded per-epoch training losses with matplotlib."""
        plt.figure(figsize=(12, 4))

        plt.plot(self.train_losses, label="Train Loss")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.legend()
        plt.tight_layout()
        plt.show()


def main():
    """Wire up the data loader, Q-network and trainer, train, then plot the loss.

    The dataset is still a placeholder (``...``) and has to be replaced with a
    real ``Dataset`` before this function can run.
    """
    # Prefer the GPU when one is available.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # TODO: replace the placeholder with the actual dataset.
    dataset = ...
    train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

    # Network, loss and optimizer.
    q_net = Qfunc()
    loss_fn = nn.MSELoss()
    adam = torch.optim.Adam(q_net.parameters(), lr=1e-4)

    # Fit for a fixed number of epochs and show the loss curve afterwards.
    trainer = Trainer(q_net, loss_fn, adam, device)
    trainer.train(train_loader, epochs=100)
    trainer.plot_training_history()

In [None]:
class DeepQLearning:
    """Skeleton of a deep Q-learning agent built around a ``Qfunc`` network.

    Only the constructor does real work; ``policy`` and ``train`` are
    placeholders that currently return ``None``.
    """

    def __init__(self, env, q_model: Qfunc):
        """Keep references to the environment and the Q-network.

        Args:
            env: Environment object; it is only stored here.
            q_model: Q-function approximator; it is only stored here.
        """
        self.env = env
        # Epsilon bounds; presumably the exploration rate is meant to move
        # from the initial to the final value during training -- TODO confirm.
        self.epsilon_init, self.epsilon_final = 0.5, 0.0

        self.q_model = q_model

    def policy(self, state, step_total):
        """Choose an action for ``state`` (placeholder, returns ``None``)."""
        # TODO: Epsilon-greedy action selection
        return None

    def train(self, episodes):
        """Train the Q-network (placeholder, returns ``None``)."""
        # TODO: add method to train NN using
        return None

## Actor-critic Methods

The above method has a notable problem: how can we guarantee performance in the presence of $\min_a$?

This is where we now introduce another neural network $\Pi_w$ that determines the suitable action given a state. $y_k$ is now given by 

$$y_k = c_k + \lambda Q_{\theta}(x_{k+1}, \Pi_w(x_{k+1}))$$

and the function to minimise for the actor is given by 

$$L_\Pi(w) = \mathbb{E}\left[Q_{\theta}(x, \Pi_w(x))\right] $$
