# PyTorch qGAN Implementation

Description

adapted from [PyTorch GAN](https://github.com/eriklindernoren/PyTorch-GAN/blob/master/implementations/gan/gan.py)

In [115]:
# Necessary imports

import numpy as np
from scipy.stats import entropy
from typing import Union, List, Optional, Iterable

from torch import Tensor, stack, reshape, manual_seed, log, clamp
from torch.utils.data import DataLoader
from torch.autograd import Variable
from torch.optim import Adam
import torch.nn as nn
import torch.nn.functional as F


from qiskit import Aer, QuantumCircuit
from qiskit.utils import QuantumInstance, algorithm_globals
from qiskit.opflow import Gradient, StateFn
from qiskit.circuit.library import TwoLocal
from qiskit.circuit import ParameterExpression, ParameterVector
from qiskit_machine_learning.neural_networks import CircuitQNN
from qiskit_machine_learning.connectors import TorchConnector
from qiskit_machine_learning.datasets.dataset_helper import discretize_and_truncate

# Set seed for random generators
algorithm_globals.random_seed = 42
np.random.seed = 42
manual_seed(42)

<torch._C.Generator at 0x7fe7909fa310>

### Load training data

For testing purposes, we decide for a 2D multivariate normal distribution.
Each dimension is represented by 2 qubits.

In [116]:
data_dim = [3, 3]

training_data = np.random.default_rng().multivariate_normal(mean=[0., 0.], cov=[[1, 0], [0, 1]], size=1000, check_valid='warn',
                                                        tol=1e-8, method='svd')
# Define minimal and maximal values for the training data
bounds_min = np.percentile(training_data, 5, axis=0)
bounds_max = np.percentile(training_data, 95, axis=0)
bounds = []
for i, _ in enumerate(bounds_min):
    bounds.append([bounds_min[i], bounds_max[i]])

# Pre-processing, i.e., gridding
(training_data,
data_grid,
grid_elements,
prob_data ) = discretize_and_truncate(
training_data,
np.array(bounds),
data_dim,
return_data_grid_elements=True,
return_prob=True,
prob_non_zero=True,
)

# Define the training batch size
batch_size = 300
dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True, drop_last=True)

### Specify Backend

In [117]:
# declare quantum instance
backend = Aer.get_backend('aer_simulator')
qi = QuantumInstance(backend, shots = batch_size)

### Definition of quantum generator and the respective gradient

In [118]:
def generator_(qnn: QuantumCircuit,
               parameters: Union[ParameterVector, ParameterExpression, List[ParameterExpression]]) -> TorchConnector:
    """
    Args:
        qnn: Quantum neural network ansatz given as a quantum circuit.
        parameters: The parameters of the quantum neural network which are trained.
    Returns:
        Quantum neural network compatible with PyTorch
    """
    circuit_qnn = CircuitQNN(qnn, input_params=[], weight_params = parameters,
                             quantum_instance=qi, sampling=True, sparse=False,
                             input_gradients=True, interpret=lambda x: grid_elements[x])
    # We use the Qiskit TorchConnector to ensure compatibility with PyTorch
    return TorchConnector(circuit_qnn)

def generator_grad(qnn: QuantumCircuit,
                   parameters: Union[ParameterVector, ParameterExpression, List[ParameterExpression]],
                   param_values: Iterable,
                   grad_method: str = 'param_shift') -> Iterable:
    """
    Custom generator gradient
    Args:
        qnn: Quantum neural network ansatz given as a quantum circuit.
        parameters: The parameters of the quantum neural network which are trained.
        param_values: The current values of the quantum neural network parameters.
        grad_method: Method used to compute the gradients {'param_shift', 'lin_comb', 'fin_diff'}
    Returns:
        List of gradients for the sampling probabilities of the quantum neural network.
    """
    grad = Gradient(grad_method=grad_method).gradient_wrapper(StateFn(qnn), parameters, backend=qi)
    grad_values = grad(param_values)
    return grad_values.tolist()

### Definition of classical discriminator

In [119]:
class Discriminator(nn.Module):
    def __init__(self):
        super(Discriminator, self).__init__()

        self.Linear_in = nn.Linear(len(data_dim), 20)
        self.Leaky_ReLU = nn.LeakyReLU(0.2, inplace=True)
        # self.Linear50 = nn.Linear(50, 20)
        self.Linear20 = nn.Linear(20, 1)
        self.Sigmoid = nn.Sigmoid()

    def forward(self, input: Tensor) -> Tensor:
        x = self.Linear_in(input)
        x = self.Leaky_ReLU(x)
        # x = self.Linear50(x)
        # x = self.Leaky_ReLU(x)
        x = self.Linear20(x)
        x = self.Sigmoid(x)
        return x

### Definition of the quantum neural network ansatz

In [120]:
qnn = QuantumCircuit(sum(data_dim))
qnn.h(qnn.qubits)
ansatz = TwoLocal(sum(data_dim), "ry", "cx", reps=2, entanglement="circular")
qnn.compose(ansatz, inplace=True)

In [121]:
# discriminator = Discriminator()

In [122]:
# d_params = discriminator.

In [123]:
# nn.init.zeros_(d_params)

### Definition of the loss functions

In [124]:
# Loss function
g_loss_fun = nn.BCELoss()
d_loss_fun = nn.BCELoss()

### Evaluation of custom gradients for the generator BCE loss function

In [125]:
def g_loss_fun_grad(qnn: QuantumCircuit,
                    parameters: Union[ParameterVector, ParameterExpression, List[ParameterExpression]],
                    param_values: Iterable,
                    discriminator_: nn.Module,
                    grad_method: str = 'param_shift') -> Iterable:
    """
    Custom gradient of the generator loss function considering the custom gradients of the quantum generator
    Args:
        qnn: Quantum neural network ansatz given as a quantum circuit.
        parameters: The parameters of the quantum neural network which are trained.
        param_values: The current values of the quantum neural network parameters.
        discriminator_: Classical neural network representing the discriminator.
        grad_method: Method used to compute the gradients {'param_shift', 'lin_comb', 'fin_diff'}
    Returns:
        List of gradient values, i.e., the gradients of the loss function w.r.t. the quantum neural network parameters
    """
    grads = generator_grad(qnn, parameters, param_values, grad_method = grad_method)
    loss_grad = ()
    for j, grad in enumerate(grads):
        cx = grad[0].tocoo()
        input = []
        target = []
        weight = []
        for index, prob_grad in zip(cx.col, cx.data):
            input.append(grid_elements[index])
            target.append([1.])
            weight.append([prob_grad])
        bce_loss_grad = F.binary_cross_entropy(discriminator_(Tensor(input)), Tensor(target), weight=Tensor(weight))
        loss_grad += (bce_loss_grad, )
    loss_grad = stack(loss_grad)
    return loss_grad

### Definition of relative entropy as benchmarking metric

In [133]:
def get_rel_entr(gen_data) -> float:
    """Get relative entropy between target and trained distribution"""
    prob_gen = np.zeros(len(grid_elements))
    for j, item in enumerate(grid_elements):
        for gen_item in gen_data.detach().numpy():
            if np.allclose(np.round(gen_item, 6), np.round(item, 6), rtol=1e-5):
                prob_gen[j] += 1
    prob_gen=prob_gen/len(gen_data)
    prob_gen = [1e-8 if x == 0 else x for x in prob_gen]
    rel_entr = entropy(prob_gen, prob_data)
    return rel_entr

### Definition of the optimizers

In [134]:
# Initialize generator and discriminator
generator = generator_(qnn, ansatz.parameters)
discriminator = Discriminator()

lr=0.01 #learning rate
b1=0.9 #first momentum parameter
b2=0.999 #second momentum parameter
n_epochs=100 #number of training epochs

#optimizer for the generator
optimizer_G = Adam(generator.parameters(), lr=lr, betas=(b1, b2))
#optimizer for the discriminator
optimizer_D = Adam(discriminator.parameters(), lr=lr, betas=(b1, b2))

In [135]:
# TODO discriminator.parameters() move to testing

### Training

In [136]:
rel_entr_list = []

for epoch in range(n_epochs):
    rel_entr = []
    for i, data in enumerate(dataloader):
        # Adversarial ground truths
        valid = Variable(Tensor(data.size(0), 1).fill_(1.0), requires_grad=False)
        fake = Variable(Tensor(data.size(0), 1).fill_(0.0), requires_grad=False)

        # Configure input
        real_data = Variable(data.type(Tensor))
        # Generate a batch of images
        gen_data = generator()

        # Evaluate Relative Entropy
        rel_entr.append(get_rel_entr(gen_data))

        # ---------------------
        #  Train Discriminator
        # ---------------------
        optimizer_D.zero_grad()

        # Measure discriminator's ability to classify real from generated samples
        disc_data = discriminator(real_data)
        real_loss = d_loss_fun(disc_data, valid)
        fake_loss = d_loss_fun(discriminator(gen_data), fake)  # (discriminator(gen_data).detach(), fake)
        d_loss = (real_loss + fake_loss) / 2

        # for name, param in discriminator.state_dict().items():
        #     print(name, param.detach())

        # print('d loss ', d_loss)

        d_loss.backward(retain_graph=True)
        optimizer_D.step()

        # -----------------
        #  Train Generator
        # -----------------

        optimizer_G.zero_grad()

        # # Loss measures generator's ability to fool the discriminator
        g_loss = g_loss_fun(discriminator(gen_data), valid)
        g_loss.retain_grad = True
        g_loss_grad = g_loss_fun_grad(qnn, ansatz.ordered_parameters, generator.weight.data.numpy(), discriminator)

        g_loss.backward(retain_graph=True)
        for j, param in enumerate(generator.parameters()):
            param.grad = g_loss_grad
        optimizer_G.step()


        print(
            "[Epoch %d/%d] [Batch %d/%d] [D loss: %f] [G loss: %f]"
            % (epoch, n_epochs, i, len(dataloader), d_loss.item(), g_loss.item())
        )

        # batches_done = epoch * len(dataloader) + i
        # if batches_done % optimizer_G.sample_interval == 0:
        #     #TODO: Do something like storing, printing or relative entropy evaluation
        #     pass

    rel_entr_list.append(np.mean(rel_entr))

[Epoch 0/100] [Batch 0/2] [D loss: 0.714448] [G loss: 0.928033]
[Epoch 0/100] [Batch 1/2] [D loss: 0.706379] [G loss: 0.900403]
[Epoch 1/100] [Batch 0/2] [D loss: 0.701542] [G loss: 0.860606]
[Epoch 1/100] [Batch 1/2] [D loss: 0.695915] [G loss: 0.829850]
[Epoch 2/100] [Batch 0/2] [D loss: 0.695938] [G loss: 0.792841]
[Epoch 2/100] [Batch 1/2] [D loss: 0.683551] [G loss: 0.785370]
[Epoch 3/100] [Batch 0/2] [D loss: 0.687643] [G loss: 0.749159]
[Epoch 3/100] [Batch 1/2] [D loss: 0.688523] [G loss: 0.718854]
[Epoch 4/100] [Batch 0/2] [D loss: 0.689570] [G loss: 0.693075]
[Epoch 4/100] [Batch 1/2] [D loss: 0.690001] [G loss: 0.683160]
[Epoch 5/100] [Batch 0/2] [D loss: 0.676524] [G loss: 0.690667]
[Epoch 5/100] [Batch 1/2] [D loss: 0.688309] [G loss: 0.662591]
[Epoch 6/100] [Batch 0/2] [D loss: 0.690404] [G loss: 0.649922]
[Epoch 6/100] [Batch 1/2] [D loss: 0.678711] [G loss: 0.664482]
[Epoch 7/100] [Batch 0/2] [D loss: 0.677886] [G loss: 0.664639]
[Epoch 7/100] [Batch 1/2] [D loss: 0.677

In [138]:
print('Relative entropy ', rel_entr_list)

Relative entropy  [1.712349361153411, 1.722884709742737, 1.8838426106715145, 1.7601374477520324, 1.7627355670205012, 1.6091215156891399, 1.7466653725276133, 1.5988352159301735, 1.6353539159728574, 1.3420077218204036, 1.8984818120714726, 1.3864323378254242, 1.2883422149745736, 1.3675463970821973, 1.5198449178797258, 1.245582986736179, 1.264990207520007, 1.2122216957553764, 1.2627542370372962, 1.1659676771217167, 1.2983495878869935, 1.0956322038294535, 1.1213118812840048, 1.0537963658350087, 1.1128482298071751, 1.0716683157514575, 1.0379317870397275, 1.1368614220038378, 1.1306008315862761, 1.1902810696074613, 1.0774867320821304, 1.0427759778439984, 1.1025904307460834, 1.0397652189508277, 0.9782199413039465, 0.948812078230151, 0.9405030161344059, 0.9507497549949461, 0.9246165329049216, 0.9313139484692454, 0.9039420936925577, 0.9832687933294664, 0.9681168778001941, 0.9226553255154323, 0.8963672096421159, 0.9684335415086522, 0.9682423820414322, 0.9681959147930519, 0.9886157462190738, 0.9700

In [132]:
# len(prob_data)
# data_grid
grid_elements

[[-1.6184392680888893, -1.5402153632625417],
 [-1.6184392680888893, -1.0891895102937574],
 [-1.6184392680888893, -0.638163657324973],
 [-1.6184392680888893, -0.18713780435618865],
 [-1.6184392680888893, 0.2638880486125956],
 [-1.6184392680888893, 0.7149139015813799],
 [-1.6184392680888893, 1.1659397545501644],
 [-1.6184392680888893, 1.6169656075189487],
 [-1.1450048106382602, -1.5402153632625417],
 [-1.1450048106382602, -1.0891895102937574],
 [-1.1450048106382602, -0.638163657324973],
 [-1.1450048106382602, -0.18713780435618865],
 [-1.1450048106382602, 0.2638880486125956],
 [-1.1450048106382602, 0.7149139015813799],
 [-1.1450048106382602, 1.1659397545501644],
 [-1.1450048106382602, 1.6169656075189487],
 [-0.6715703531876313, -1.5402153632625417],
 [-0.6715703531876313, -1.0891895102937574],
 [-0.6715703531876313, -0.638163657324973],
 [-0.6715703531876313, -0.18713780435618865],
 [-0.6715703531876313, 0.2638880486125956],
 [-0.6715703531876313, 0.7149139015813799],
 [-0.671570353187631

Alternative approach
class LegendrePolynomial3(torch.autograd.Function):
    """
    We can implement our own custom autograd Functions by subclassing
    torch.autograd.Function and implementing the forward and backward passes
    which operate on Tensors.
    """

    @staticmethod
    def forward(ctx, input):
        """
        In the forward pass we receive a Tensor containing the input and return
        a Tensor containing the output. ctx is a context object that can be used
        to stash information for backward computation. You can cache arbitrary
        objects for use in the backward pass using the ctx.save_for_backward method.
        """
        ctx.save_for_backward(input)
        return 0.5 * (5 * input ** 3 - 3 * input)

    @staticmethod
    def backward(ctx, grad_output):
        """
        In the backward pass we receive a Tensor containing the gradient of the loss
        with respect to the output, and we need to compute the gradient of the loss
        with respect to the input.
        """
        input, = ctx.saved_tensors
        return grad_output * 1.5 * (5 * input ** 2 - 1)
