# Qubit Mover 2.0

Qubit Mover 2.0 is an updated version of the Qubit Mover project, which aims to develop a reinforcement learning agent capable of optimizing qubit distribution protocols in a quantum network. The agent will learn to navigate the network and make decisions that maximize the quantum Fisher information matrix (QFIM) for parameter estimation tasks. In addition to being able to move qubits, the agent will also have the ability to apply hadamard gates to qubits, allowing for more complex state manipulations and potentially improving the estimation accuracy.

## Import Libraries
This section imports all the required libraries for quantum information calculations, symbolic computation, reinforcement learning, and plotting. Each library is chosen for its specific capabilities:
- `itertools`, `random`, `datetime`: Standard Python utilities for combinatorics, randomness, and time-stamping.
- `sympy`: Used for symbolic mathematics, which is essential for quantum Fisher information and protocol analysis.
- `numpy`: Efficient numerical operations and array handling.
- `matplotlib`: For visualizing results and protocol performance.
- `gymnasium`, `stable_baselines3`: RL environment and agent training.
- `scipy.optimize`: For numerical root finding and least squares estimation in parameter recovery.

In [2]:
# QFIM calculations libraries
from itertools import combinations

import sympy as sp


# Estimator libraries
from scipy.optimize import root, least_squares

# Standard libraries
import numpy as np
import random
import datetime
import matplotlib.pyplot as plt

# RL libraries
import gymnasium as gym
from gymnasium import spaces
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold



## Create Helper Functions

The density matrix of each qubit will be calculated seperately and symbolically using sympy. The overall density matrix will be the tensor product of each qubit's density matrix. The QFIM will then be calculated from the overall density matrix.

### Initialize Constants and variables

In [1]:
state0=sp.Matrix(np.matrix([[1,0],[0,0]]))
x_gate=sp.Matrix(np.matrix([[0,1],[1,0]]))
h_gate=sp.Matrix(np.matrix([[1,1],[1,-1]]))/np.sqrt(2)
p1,p2,p3 = sp.symbols('p1 p2 p3') # Parameter symbols
symbols_list = [p1,p2,p3]
p1_hat,p2_hat,p3_hat = sp.symbols('p1_hat p2_hat p3_hat')

def error_1(rho):
    return(p1_hat*rho+p1*x_gate*rho*x_gate)

def error_2(rho):
    return(p2_hat*rho+p2*x_gate*rho*x_gate)

def error_3(rho):
    return(p3_hat*rho+p3*x_gate*rho*x_gate)

def hadamard(rho):
    return(h_gate*rho*h_gate)



NameError: name 'sp' is not defined

### Fisher Information Matrix Calculation

In [5]:
def calculate_QFIM(rhos,thetas=None):

    state0=sp.Matrix(np.matrix([[1,0],[0,0]]))
    x_gate=sp.Matrix(np.matrix([[0,1],[1,0]]))
    h_gate=sp.Matrix(np.matrix([[1,1],[1,-1]]))/np.sqrt(2)
    p1,p2,p3 = sp.symbols('p1 p2 p3') # Parameter symbols
    symbols_list = [p1,p2,p3]
    p1_hat,p2_hat,p3_hat = sp.symbols('p1_hat p2_hat p3_hat')


    # compute eigen values

    F=sp.zeros(len(symbols_list), len(symbols_list))
    n=len(symbols_list)

    for rho in rhos:

        lambdas=rho.eigenvals()


        subs_dict = {sp.symbols('p1_hat'): 1 - sp.symbols('p1'),
                    sp.symbols('p2_hat'): 1 - sp.symbols('p2'),
                    sp.symbols('p3_hat'): 1 - sp.symbols('p3')}
        lambdas_sub = [lam.subs(subs_dict) for lam in lambdas]


        for i in range(n):
            for j in range(n):
                s = 0
                for lam in lambdas_sub:
                    if lam == 0:
                        continue  # Skip zero eigenvalues to avoid division by zero
                    s += (1/lam) * lam.diff(symbols_list[i]) * lam.diff(symbols_list[j])
                F[i, j] += s


    if thetas is not None:
        F=F.subs({p1: thetas[0], p2: thetas[1], p3: thetas[2]})
    return F

rho1=hadamard(error_1(hadamard(error_1(state0))))
rho2=hadamard(error_2(hadamard(error_2(state0))))
rho3=error_3(hadamard(error_3(hadamard(state0))))


sp.pprint(calculate_QFIM([rho1,rho2,rho3],thetas=[0.1,0.2,0.3]))


⎡11.1111111111111   0           0        ⎤
⎢                                        ⎥
⎢       0          6.25         0        ⎥
⎢                                        ⎥
⎣       0           0    4.76190476190476⎦


In [6]:
def calculate_QCRB(F):
    return F.inv().trace()

print(calculate_QCRB(calculate_QFIM([rho1,rho2,rho3],thetas=[0.1,0.2,0.3])))

0.460000000000000


### RLA Move Interpereter

In [30]:
def nodes_to_paths(move_list):
    move_list = [x for x in move_list if x != 0]
    node_list=[((n-1)//4)+1 for n in move_list] # Removes the hadamard gates from the list
    #print(node_list)
    h_list=[((n-1)%4) for n in move_list] # Extracts the hadamard gates from the list
    #print(h_list)

    path_list=[]




    if h_list[0]==0:
        path_list.append(node_list[0])
    
    elif h_list[0]==1:
        path_list.append(4)
        path_list.append(node_list[0])

    elif h_list[0]==2:
        path_list.append(node_list[0])
        path_list.append(4)

    elif h_list[0]==3:
        path_list.append(4)
        path_list.append(node_list[0])
        path_list.append(4)

    if len(node_list)<=1: # Ensures that single node lists are handled correctly
        return -1 # Invalid single node list



    for i in range(1,len(node_list)):

        if h_list[i]==0:
            path_list.append(node_list[i])
    
        elif h_list[i]==1:
            path_list.append(4)
            path_list.append(node_list[i])

        elif h_list[i]==2:
            path_list.append(node_list[i])
            path_list.append(4)

        elif h_list[i]==3:
            path_list.append(4)
            path_list.append(node_list[i])
            path_list.append(4)


        path_list.append(node_list[i])

    path_list.pop() 

    

    return path_list

    # return filtered_list


nodes_to_paths([10,1,7,9,1,10]) 

[4, 3, 1, 1, 2, 4, 2, 3, 3, 1, 1, 4, 3]

### Convert Moves into states

In [34]:
def moves_to_gates(move_list):
    state0 = sp.Matrix([[1, 0], [0, 0]])
    temp_rho=state0 # Initialize the qubit in the 0 state
    x_gate=sp.Matrix(np.matrix([[0,1],[1,0]]))
    h_gate=sp.Matrix(np.matrix([[1,1],[1,-1]]))/np.sqrt(2)
    p1,p2,p3 = sp.symbols('p1 p2 p3') # Parameter symbols
    symbols_list = [p1,p2,p3]
    p1_hat,p2_hat,p3_hat = sp.symbols('p1_hat p2_hat p3_hat')
    
    for move in move_list:
        if move == 1:
            temp_rho=(p1_hat*temp_rho+p1*x_gate*temp_rho*x_gate)
        elif move == 2:
            temp_rho=(p2_hat*temp_rho+p2*x_gate*temp_rho*x_gate)
        elif move == 3:
            temp_rho=(p3_hat*temp_rho+p3*x_gate*temp_rho*x_gate)
        elif move == 4:
            temp_rho=h_gate*temp_rho*h_gate
        else:
            #print("Invalid move detected")
            return -10
        temp_rho=sp.Matrix(temp_rho)
    return temp_rho

print(moves_to_gates([1,2,3,4,1]))


    

Matrix([[p1*(0.5*p3*(p1*p2 + p1_hat*p2_hat) + 0.5*p3*(p1*p2_hat + p1_hat*p2) + 0.5*p3_hat*(p1*p2 + p1_hat*p2_hat) + 0.5*p3_hat*(p1*p2_hat + p1_hat*p2)) + p1_hat*(0.5*p3*(p1*p2 + p1_hat*p2_hat) + 0.5*p3*(p1*p2_hat + p1_hat*p2) + 0.5*p3_hat*(p1*p2 + p1_hat*p2_hat) + 0.5*p3_hat*(p1*p2_hat + p1_hat*p2)), p1*(-0.5*p3*(p1*p2 + p1_hat*p2_hat) + 0.5*p3*(p1*p2_hat + p1_hat*p2) + 0.5*p3_hat*(p1*p2 + p1_hat*p2_hat) - 0.5*p3_hat*(p1*p2_hat + p1_hat*p2)) + p1_hat*(-0.5*p3*(p1*p2 + p1_hat*p2_hat) + 0.5*p3*(p1*p2_hat + p1_hat*p2) + 0.5*p3_hat*(p1*p2 + p1_hat*p2_hat) - 0.5*p3_hat*(p1*p2_hat + p1_hat*p2))], [p1*(-0.5*p3*(p1*p2 + p1_hat*p2_hat) + 0.5*p3*(p1*p2_hat + p1_hat*p2) + 0.5*p3_hat*(p1*p2 + p1_hat*p2_hat) - 0.5*p3_hat*(p1*p2_hat + p1_hat*p2)) + p1_hat*(-0.5*p3*(p1*p2 + p1_hat*p2_hat) + 0.5*p3*(p1*p2_hat + p1_hat*p2) + 0.5*p3_hat*(p1*p2 + p1_hat*p2_hat) - 0.5*p3_hat*(p1*p2_hat + p1_hat*p2)), p1*(0.5*p3*(p1*p2 + p1_hat*p2_hat) + 0.5*p3*(p1*p2_hat + p1_hat*p2) + 0.5*p3_hat*(p1*p2 + p1_hat*p2_hat) +

### Create the reward function

#### Reward function to help learn the rules

In [31]:
def reward_function_easy(node_lists,thetas):
    " Reward function that gives a large negative reward for invalid paths, and otherwise rewards based on QCRB improvement"
    for node_list in node_lists:
        counter=0
        if node_list[0]==0 or node_list[1]==0 or node_list[-1]!=0:
            counter+=1
            #print("Yikes")
        
        found_zero = False
        for val in node_list:
            if val == 0:
                found_zero = True
            elif found_zero and val != 0:
                #print("Invalid: nonzero after zero")
                counter+=1  # or any large negative reward
            
    if counter>0:
        return -counter # Large negative reward for invalid input
    

    # Make sure there are an even number of hadamard gates between nodes
    path_lists=[]
    for node_list in node_lists:
        path_list=nodes_to_paths(node_list)
        if path_list == -1:
            
            #print("Invalid first move")
            return -1 # Invalid path due to invalid first move
            
        elif path_list.count(4) % 2 != 0:
            #print("Odd hadamards")
            return -.5 # Invalid path due to odd number of hadamards
        
        path_lists.append(path_list)
    return 1

#### Reward Function for protocol optimization

In [28]:
def reward(node_lists,thetas):

    #################################
    ## Make sure the protocol is valid
    #################################

    # Make sure that the first element is not 0 and the last element is 0
    for node_list in node_lists:
        if node_list[0]==0 or node_list[1]==0 or node_list[-1]!=0:
            #print("Yikes")
            return -10 # Large negative reward for invalid input
        found_zero = False
        for val in node_list:
            if val == 0:
                found_zero = True
            elif found_zero and val != 0:
                #print("Invalid: nonzero after zero")
                return -10  # or any large negative reward
            
    # Make sure there are an even number of hadamard gates between nodes
    path_lists=[]
    for node_list in node_lists:
        path_list=nodes_to_paths(node_list)
        if path_list == -1:
            
            #print("Invalid first move")
            return -10 # Invalid path due to invalid first move
            
        elif path_list.count(4) % 2 != 0:
            #print("Odd hadamards")
            return -10 # Invalid path due to odd number of hadamards
        
        path_lists.append(path_list)
   
    ################################
    ## Calculate the QFIM
    #################################
    
    rho_1=moves_to_gates(path_lists[0])
    rho_2=moves_to_gates(path_lists[1])
    rho_3=moves_to_gates(path_lists[2])
    
    
    F=calculate_QFIM([rho_1,rho_2,rho_3],thetas=thetas) # Calculate the QFIM for the given states and parameters

    #sp.pprint(F)

    try:
        qcrb= calculate_QCRB(F)
        max_qcrb = 10.0
        reward_val=1-qcrb/max_qcrb # Higher reward for lower qcrb, normalized between 0 and 1
        reward_val = np.clip(reward_val, 0, 1)  # Ensure reward is within [0, 1]

        
    except Exception as e:
        # If F is singular or not invertible, give a large negative reward
        reward_val = -10

    return float(reward_val)


#### Test the reward function

In [None]:
thetas=[0.1,0.2,0.3]
## Catch invalid moves
p1=[10,11,0,0,0,0]
p2=[6,6,0,0,0,0]
p3=[2,3,0,0,0,0]

print(nodes_to_paths(p1))
print(nodes_to_paths(p2))
print(nodes_to_paths(p3))

print(reward([p1,p2,p3],thetas))
print(reward_function_easy([p1,p2,p3],thetas))



[4, 3, 3, 4, 3, 1]
[4, 2, 4, 2]
[4, 1, 1, 4]
-10
1


## Train the RL Agent

### Create the environments

In [None]:
class QubitMoverEnv_easy(gym.Env):
    """
    Custom Environment for the Qubit Mover RL agent.
    - 3 qubits, each with a path of length 10
    - Each step, the agent selects a node (1-3) or 0 (measure) for each qubit as well as whether to apply a hadamard gate (4)
    - Episode ends after a fixed number of steps or when all qubits are measured
    """
    metadata = {"render_modes": ["human"], "render_fps": 4}

    def __init__(self):
        super().__init__()
        self.n_qubits = 3
        self.max_steps = 10
        self.thetas = np.random.uniform(0.05, 0.5, 3).tolist()

        # Each qubit can be at node 0, 1, 2, or 3 at each step
        self.action_space = spaces.MultiDiscrete([3] * self.n_qubits)


        # Observation space: qubit states + theta values
        # Qubit states: (n_qubits * max_steps), Theta: (3,)
        low = np.zeros(self.n_qubits * self.max_steps + 3)
        high = np.concatenate([np.full(self.n_qubits * self.max_steps, 13), np.ones(3)])
        self.observation_space = spaces.Box(low=low, high=high, dtype=np.float32)

        self.reset()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.thetas = np.random.uniform(0.05, 0.5, 3).tolist() 
        self.state = np.zeros((self.n_qubits, self.max_steps), dtype=np.int32)
        self.state=self.state.flatten()
        self.current_step = 0
        self.done = False
        obs = np.concatenate([self.state, np.array(self.thetas, dtype=np.float32)]).astype(np.float32)
        return obs, {}


    def step(self, action):
        if self.done:
            raise RuntimeError("Episode is done")

        # Record the action for each qubit at this step
        # Update the observation space
        for q in range(self.n_qubits):
            self.state[q * self.max_steps + self.current_step] = action[q]

        self.current_step += 1

        # If all qubits have been measured (0) or max_steps reached, episode is done
        state_2d = self.state.reshape(self.n_qubits, self.max_steps)
        all_measured = np.all(state_2d[:, self.current_step-1] == 0)

        self.done = all_measured or self.current_step >= self.max_steps

        reward_val = 0
        
        # The reward is only calculated at the end of the episode
        if self.done:
            # Convert state to list of lists for your reward function
            node_lists = [list(state_2d[q]) for q in range(self.n_qubits)]
            reward_val = reward_function_easy(node_lists, self.thetas)
            reward_val+=0.5
        else:
            if self.current_step == 1 and np.any(((np.array(action) - 1) % 4 != 0) & ((np.array(action) - 1) % 4 != 1)):

                reward_val = -1  # Or a step penalty if desired
                self.done=True

            if self.current_step == 1 and not action.all():
                
                reward_val = -1
                self.done=True



        




        self.state=self.state.flatten()
        obs = np.concatenate([self.state, np.array(self.thetas, dtype=np.float32)]).astype(np.float32)

        return obs.copy(), reward_val, self.done, False, {}

    def render(self):
        print("Step:", self.current_step)
        print("State:\n", self.state)

    def close(self):
        pass

# Example usage:
# env = QubitMoverEnv()
# obs, info = env.reset()
# done = False
# while not done:
#     action = env.action_space.sample()
#     obs, reward, done, truncated, info = env.step(action)
#     env.render()

#### Harder Environment

In [9]:
class QubitMoverEnv(gym.Env):
    """
    Custom Environment for the Qubit Mover RL agent.
    - 3 qubits, each with a path of length 10
    - Each step, the agent selects a node (1-3) or 0 (measure) for each qubit as well as whether to apply a hadamard gate (4)
    - Episode ends after a fixed number of steps or when all qubits are measured
    """
    metadata = {"render_modes": ["human"], "render_fps": 4}

    def __init__(self):
        super().__init__()
        self.n_qubits = 3
        self.max_steps = 10
        self.thetas = np.random.uniform(0.05, 0.5, 3).tolist()

        # Each qubit can be at node 0, 1, 2, or 3 at each step
        self.action_space = spaces.MultiDiscrete([13] * self.n_qubits)


        # Observation space: qubit states + theta values
        # Qubit states: (n_qubits * max_steps), Theta: (3,)
        low = np.zeros(self.n_qubits * self.max_steps + 3)
        high = np.concatenate([np.full(self.n_qubits * self.max_steps, 13), np.ones(3)])
        self.observation_space = spaces.Box(low=low, high=high, dtype=np.float32)

        self.reset()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.thetas = np.random.uniform(0.05, 0.5, 3).tolist() 
        self.state = np.zeros((self.n_qubits, self.max_steps), dtype=np.int32)
        self.state=self.state.flatten()
        self.current_step = 0
        self.done = False
        obs = np.concatenate([self.state, np.array(self.thetas, dtype=np.float32)]).astype(np.float32)
        return obs, {}


    def step(self, action):
        # action: array of length 3, each in {0,1,2,3}
        if self.done:
            raise RuntimeError("Episode is done")

        # Record the action for each qubit at this step
        # Update the observation space
        for q in range(self.n_qubits):
            self.state[q * self.max_steps + self.current_step] = action[q]

        self.current_step += 1

        # If all qubits have been measured (0) or max_steps reached, episode is done
        state_2d = self.state.reshape(self.n_qubits, self.max_steps)
        all_measured = np.all(state_2d[:, self.current_step-1] == 0)

        self.done = all_measured or self.current_step >= self.max_steps

        reward_val = 0
        
        # The reward is only calculated at the end of the episode
        if self.done:
            # Convert state to list of lists for your reward function
            node_lists = [list(state_2d[q]) for q in range(self.n_qubits)]
            reward_val = reward(node_lists, self.thetas)
        else:
            reward_val = 0  # Or a step penalty if desired

        self.state=self.state.flatten()
        obs = np.concatenate([self.state, np.array(self.thetas, dtype=np.float32)]).astype(np.float32)

        return obs.copy(), reward_val, self.done, False, {}

    def render(self):
        print("Step:", self.current_step)
        print("State:\n", self.state)

    def close(self):
        pass

# Example usage:
# env = QubitMoverEnv()
# obs, info = env.reset()
# done = False
# while not done:
#     action = env.action_space.sample()
#     obs, reward, done, truncated, info = env.step(action)
#     env.render()

#### Verify the environment works

In [52]:
from stable_baselines3.common.env_checker import check_env

# Instantiate the environment
env = QubitMoverEnv()
env_easy=QubitMoverEnv_easy()

# Check if the environment follows the Gymnasium API and is compatible with Stable Baselines3
check_env(env, warn=True)
print("Hard Environment check passed!")
check_env(env_easy, warn=True)


print("Easy Environment check passed!")

Hard Environment check passed!
Easy Environment check passed!


  gym.logger.warn(
  gym.logger.warn(


### Create the agent

In [53]:
# Create and train the RL agent using PPO

# Instantiate the environment
env_easy = QubitMoverEnv_easy()

# Create the PPO agent
model = PPO(
    "MlpPolicy",      # Use a multilayer perceptron policy
    env_easy,              # Pass your custom environment
    verbose=1,        # Print training progress
    tensorboard_log="./ppo_qubit_mover2_tensorboard/"  # Optional: log for TensorBoard
)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


### Train the agent

In [54]:
# Initialize evaluation callback
# Create a callback that stops training when mean reward > threshold

# Evaluate every 1000 steps, use stop_callback to halt training
eval_callback = EvalCallback(
    env_easy,
    eval_freq=1000,
    best_model_save_path='/best_model/',
    verbose=1
)


# Train the agent
model.learn(total_timesteps=400_000)  # You can increase timesteps for better training



timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

# Save the trained model
model.save(f"agents_2/ppo_qubit_mover_agent_{timestamp}")


# To load and use the agent later:
# model = PPO.load("ppo_qubit_mover_agent", env=env)

Logging to ./ppo_qubit_mover2_tensorboard/PPO_8
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1.81     |
|    ep_rew_mean     | -0.955   |
| time/              |          |
|    fps             | 2579     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 1.81        |
|    ep_rew_mean          | -0.955      |
| time/                   |             |
|    fps                  | 1909        |
|    iterations           | 2           |
|    time_elapsed         | 2           |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.018154606 |
|    clip_fraction        | 0.15        |
|    clip_range           | 0.2         |
|    entropy_loss         | -7.69       |
|    explained_variance 

KeyboardInterrupt: 