In [9]:
import numpy as np
import matplotlib.pyplot as plt

from qiskit.circuit import QuantumCircuit, QuantumRegister, Parameter, ParameterVector, ParameterExpression
from qiskit.circuit.library import TwoLocal

import qiskit as qk
from qiskit_aer.primitives import EstimatorV2 as AerEstimator

import qiskit_machine_learning as qml
from qiskit_machine_learning.neural_networks import EstimatorQNN
from qiskit_machine_learning.connectors import TorchConnector

import torch 
from torch import Tensor
from torch.nn import MSELoss
from torch.optim import LBFGS, SGD, Adam, RMSprop

import gymnasium as gym


In [10]:
#Better anumation in Jupyter NBK
import matplotlib as mpl
import matplotlib.animation as animation
mpl.rc('animation', html='jshtml')

def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat=False, interval=40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = animation.FuncAnimation(
        fig, update_scene, fargs=(frames, patch),
        frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim

### Rx(θ) Gate

The **rotation around the X-axis** (Rx) gate is defined as:

$$
R_x(\theta) =
\begin{bmatrix}
\cos\left(\frac{\theta}{2}\right) & -i\sin\left(\frac{\theta}{2}\right) \\
-i\sin\left(\frac{\theta}{2}\right) & \cos\left(\frac{\theta}{2}\right)
\end{bmatrix}
$$

It performs a rotation of the qubit state vector by an angle **θ** about the **X-axis** of the Bloch sphere.


In [11]:
#Encoding Circuit (EWncoding classical information onto a quantum register)
def encoding_circuit(inputs, num_qubits = 4, *args):
    qc = QuantumCircuit(num_qubits)
    for i in range(len(inputs)):
        qc.rx(inputs[i],i)

    return qc

In [12]:
#Parameterized Quantum Circuit : An alternative to DQN, to estimate the Q-value (reward), based on the (State, Action) pair
def parametrized_circuit(num_qubits = 4, reuploading = False, reps = 2, insert_barriers = True, meas = False):
    qr = qk.QuantumRegister(num_qubits, 'qr')
    qc = qk.QuantumCircuit(qr)
    
    if meas:
        qr = qk.QuantumRegister(num_qubits, 'qr')
        cr = qk.ClassicalRegister(num_qubits, 'cr')
        qc = qk.QuantumCircuit(qr,cr)
    
    
    if not reuploading:
        
        # Define a vector containg Inputs as parameters (*not* to be optimized)
        inputs = qk.circuit.ParameterVector('x', num_qubits)
                
        # Encode classical input data
        qc.compose(encoding_circuit(inputs, num_qubits = num_qubits), inplace = True)
        if insert_barriers: qc.barrier()
        
        # Variational circuit
        qc.compose(TwoLocal(num_qubits, ['ry', 'rz'], 'cz', 'circular', 
               reps=reps, insert_barriers= insert_barriers, 
               skip_final_rotation_layer = True), inplace = True)
        if insert_barriers: qc.barrier()
        
        # Add final measurements
        if meas: qc.measure(qr,cr)
        
    elif reuploading:
        
        # Define a vector containg Inputs as parameters (*not* to be optimized)
        inputs = qk.circuit.ParameterVector('x', num_qubits)
                
        # Define a vector containng variational parameters
        θ = qk.circuit.ParameterVector('θ', 2 * num_qubits * reps)
        
        # Iterate for a number of repetitions
        for rep in range(reps):

            # Encode classical input data
            qc.compose(encoding_circuit(inputs, num_qubits = num_qubits), inplace = True)
            if insert_barriers: qc.barrier()
                
            # Variational circuit (does the same as TwoLocal from Qiskit)
            for qubit in range(num_qubits):
                qc.ry(θ[qubit + 2*num_qubits*(rep)], qubit)
                qc.rz(θ[qubit + 2*num_qubits*(rep) + num_qubits], qubit)
            if insert_barriers: qc.barrier()
                
            # Add entanglers (this code is for a circular entangler)
            qc.cz(qr[-1], qr[0])
            for qubit in range(num_qubits-1):
                qc.cz(qr[qubit], qr[qubit+1])
            if insert_barriers: qc.barrier()
                        
        # Add final measurements
        if meas: qc.measure(qr,cr)
        
    return qc

In [13]:
num_qubits = 4
qc = parametrized_circuit(num_qubits = num_qubits, 
                          reuploading = True, 
                          reps = 6)
X = list(qc.parameters)[: num_qubits]
params = list(qc.parameters)[num_qubits:]
qc.draw()

In [14]:
#PyTorch Connector
estimator = AerEstimator() 

qnn = EstimatorQNN(
    circuit=qc,
    input_params=X,
    weight_params=params,
    estimator=estimator
)
# Connect to PyTorch
initial_weights = (2*np.random.rand(qnn.num_weights) - 1)
quantum_nn = TorchConnector(qnn, initial_weights)

No gradient function provided, creating a gradient function. If your Estimator requires transpilation, please provide a pass manager.


In [15]:
class encoding_layer(torch.nn.Module):
    def __init__(self, num_qubits = 4):
        super().__init__()
        
        # Define weights for the layer
        weights = torch.Tensor(num_qubits)
        self.weights = torch.nn.Parameter(weights)
        torch.nn.init.uniform_(self.weights, -1, 1) 
    
        
    def forward(self, x):
        """Forward step, as explained above."""
        
        if not isinstance(x, Tensor):
            x = Tensor(x)
        
        x = self.weights * x
        x = torch.atan(x)
                
        return x

In [16]:
class exp_val_layer(torch.nn.Module):
    def __init__(self, action_space = 2):
        super().__init__()
        
        # Define the weights for the layer
        weights = torch.Tensor(action_space)
        self.weights = torch.nn.Parameter(weights)
        torch.nn.init.uniform_(self.weights, 35, 40) # <-- Initialization strategy (heuristic choice)
        
        # Masks that map the vector of probabilities to <Z_0*Z_1> and <Z_2*Z_3>
        self.mask_ZZ_12 = torch.tensor([1.,-1.,-1.,1.,1.,-1.,-1.,1.,1.,-1.,-1.,1.,1.,-1.,-1.,1.], requires_grad = False)
        self.mask_ZZ_34 = torch.tensor([-1.,-1.,-1.,-1.,1.,1.,1.,1.,-1.,-1.,-1.,-1.,1.,1.,1.,1.], requires_grad = False)
        
    def forward(self, x):
        """Forward step, as described above."""
        
        expval_ZZ_12 = self.mask_ZZ_12 * x
        expval_ZZ_34 = self.mask_ZZ_34 * x
        
        # Single sample
        if len(x.shape) == 1:
            expval_ZZ_12 = torch.sum(expval_ZZ_12)
            expval_ZZ_34 = torch.sum(expval_ZZ_34)
            out = torch.cat((expval_ZZ_12.unsqueeze(0), expval_ZZ_34.unsqueeze(0)))
        
        # Batch of samples
        else:
            expval_ZZ_12 = torch.sum(expval_ZZ_12, dim = 1, keepdim = True)
            expval_ZZ_34 = torch.sum(expval_ZZ_34, dim = 1, keepdim = True)
            out = torch.cat((expval_ZZ_12, expval_ZZ_34), 1)
                
        return self.weights * ((out + 1.) / 2.)

In [17]:
encoding = encoding_layer()

# Classical trainable postprocessing
exp_val = exp_val_layer()

# Stack the classical and quantum layers together 
model = torch.nn.Sequential(encoding, 
                            quantum_nn, 
                            exp_val)

model.state_dict()

OrderedDict([('0.weights', tensor([0.3173, 0.4480, 0.0538, 0.6332])),
             ('1.weight',
              tensor([ 0.8481, -0.8442, -0.8235,  0.9780,  0.1273, -0.4580, -0.0693, -0.6563,
                      -0.2601, -0.8433, -0.6814,  0.8737, -0.5132, -0.2454, -0.1320, -0.7731,
                      -0.5694,  0.4126,  0.2250, -0.4415,  0.2136,  0.0754,  0.9722,  0.7280,
                       0.9328, -0.7745,  0.6456, -0.3321, -0.3205,  0.1676, -0.8346, -0.1223,
                      -0.9597, -0.4486,  0.1689,  0.0298,  0.2180,  0.8072, -0.2345, -0.9390,
                      -0.2795,  0.5678,  0.3898,  0.0498,  0.1794, -0.3097,  0.1085, -0.5412])),
             ('1._weights',
              tensor([ 0.8481, -0.8442, -0.8235,  0.9780,  0.1273, -0.4580, -0.0693, -0.6563,
                      -0.2601, -0.8433, -0.6814,  0.8737, -0.5132, -0.2454, -0.1320, -0.7731,
                      -0.5694,  0.4126,  0.2250, -0.4415,  0.2136,  0.0754,  0.9722,  0.7280,
                       0.93

In [18]:
env = gym.make("CartPole-v1")
input_shape = [4] # == env.observation_space.shape
n_outputs = 2 # == env.action_space.n

In [19]:
from collections import deque

replay_memory = deque(maxlen=2000)

In [21]:
batch_size = 16
discount_rate = 0.99
optimizer = Adam(model.parameters(), lr=1e-2)
loss_fn = MSELoss()

In [32]:
def epsilon_greedy_policy(state, epsilon=0):
    """Manages the transition from the *exploration* to *exploitation* phase"""
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs)
    else:
        with torch.no_grad():
            Q_values = model(Tensor(state)).numpy()
        return np.argmax(Q_values[0])
    
def sample_experiences(batch_size):
    """Sample some past experiences from the replay memory"""
    indices = np.random.randint(len(replay_memory), size=batch_size)
    batch = [replay_memory[index] for index in indices]
    # Extract each field separately, handling arrays and scalars appropriately
    # Ensure states and next_states are consistently shaped numpy arrays
    states_list = []
    next_states_list = []
    for experience in batch:
        # Convert state to numpy array, handling tuples or nested structures
        state_val = experience[0]
        if isinstance(state_val, tuple):
            state_val = state_val[0]  # If it's a tuple, take the first element
        state = np.asarray(state_val, dtype=np.float32).flatten()
        
        # Convert next_state to numpy array, handling tuples or nested structures
        next_state_val = experience[3]
        if isinstance(next_state_val, tuple):
            next_state_val = next_state_val[0]  # If it's a tuple, take the first element
        next_state = np.asarray(next_state_val, dtype=np.float32).flatten()
        
        states_list.append(state)
        next_states_list.append(next_state)
    states = np.stack(states_list)
    actions = np.array([experience[1] for experience in batch])
    rewards = np.array([experience[2] for experience in batch], dtype=np.float32)
    next_states = np.stack(next_states_list)
    dones = np.array([experience[4] for experience in batch], dtype=np.bool_)
    return states, actions, rewards, next_states, dones

def play_one_step(env, state, epsilon):
    """Perform one action in the environment and register the state of the system"""
    action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated  # Gymnasium returns terminated and truncated separately
    replay_memory.append((state, action, reward, next_state, done))
    return next_state, reward, done, info

def sequential_training_step(batch_size):
    """
    Actual training routine. Implements the Deep Q-Learning algorithm.
    
    This implementation evaluates individual losses sequentially instead of using batches. 
    This is due to an issue in the TorchConnector, which yields vanishing gradients if it 
    is called with a batch of data (see https://github.com/Qiskit/qiskit-machine-learning/issues/100).
    
    Use this training for the quantum model. If using the classical model, you can use indifferently 
    this implementation or the batched one below. 
    """
    
    # Sample past experiences 
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones = experiences
    
    # Evaluates Target Q-values
    with torch.no_grad():
        next_Q_values = model(Tensor(next_states)).numpy()
    max_next_Q_values = np.max(next_Q_values, axis=1)
    target_Q_values = (rewards + (1 - dones) * discount_rate * max_next_Q_values)
    
    # Accumulate Loss sequentially (if batching data, gradients of the parameters are vanishing)
    loss = 0.
    for j, state in enumerate(states):
        single_Q_value = model(Tensor(state))
        Q_value = single_Q_value[actions[j]]
        loss += (target_Q_values[j] - Q_value)**2
    
    # Evaluate the gradients and update the parameters 
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

def training_step(batch_size):
    """
    This is exactly the same function as sequential_training_step, except that it 
    evaluates loss with batch of data, instead of using a for loop. 
    
    Can use this if training the classical model.
    """
    
    # Sample past experiences
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones = experiences
    
    # Evaluate Target Q-values
    with torch.no_grad():
        next_Q_values = model(Tensor(next_states)).numpy()
    max_next_Q_values = np.max(next_Q_values, axis=1)
    target_Q_values = (rewards +
                       (1 - dones) * discount_rate * max_next_Q_values)
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = torch.nn.functional.one_hot(Tensor(actions).long(), n_outputs)
    
    # Evaluate the loss
    all_Q_values = model(Tensor(states))
    Q_values = torch.sum(all_Q_values * mask, dim=1, keepdims=True)
    loss = loss_fn(Tensor(target_Q_values), Q_values)
    
    # Evaluate the gradients and update the parameters 
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


In [33]:
rewards = [] 
best_score = 0

# We let the agent train for 2000 episodes
for episode in range(2000):
    
    # Run enviroment simulation
    obs, info = env.reset()  # Gymnasium returns (observation, info)
    
    # 200 is the target score for considering the environment solved
    for step in range(200):
        
        # Manages the transition from exploration to exploitation
        epsilon = max(1 - episode / 1500, 0.01)
        obs, reward, done, info = play_one_step(env, obs, epsilon)
        if done:
            break
    rewards.append(step)
    
    # Saving best agent
    if step >= best_score:
        # torch.save(model.state_dict(), './new_model_best_weights.pth') # Save best weights
        best_score = step
        
    print("\rEpisode: {}, Steps : {}, eps: {:.3f}".format(episode, step + 1, epsilon), end="")
    
    # Start training only after some exploration experiences  
    if episode > 20:
        sequential_training_step(batch_size)

Episode: 25, Steps : 14, eps: 0.983

KeyboardInterrupt: 

### Post Training Analysis

In [None]:
plt.figure(figsize=(8, 4))
plt.plot(rewards)
plt.xlabel("Episode", fontsize=14)
plt.ylabel("Sum of rewards", fontsize=14)
plt.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme()

# This file contains the sum of rewards from a previous successfull training run
rewards_history = np.loadtxt("training_rewards2.txt") + 1

cmap = plt.get_cmap('tab20c')

fig = plt.figure(figsize=(8,5))
plt.axhline([200], ls = 'dashed', c=cmap(9))
plt.text(-50,190, s='Max reward', c=cmap(8))

plt.text(-50,100, s='Exploration\nphase', c=cmap(12))
plt.text(1100,100, s='Exploitation\nphase', c=cmap(12))

plt.plot(rewards_history, c = cmap(4))
plt.xlabel("Episodes")
plt.ylabel("Final reward")

plt.tight_layout()
plt.show()

In [None]:
env.seed(42)

state = env.reset()

frames = []
for step in range(200):
    action = epsilon_greedy_policy(state)
    state, reward, done, info = env.step(action)
    if done:
        print("End at step:", step)
        break
    img = env.render(mode="rgb_array")
    frames.append(img)
    
plot_animation(frames)