<a href="https://colab.research.google.com/github/physicsllama/rl-quantum/blob/main/Fitted%20Q-Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Implements fitted Q-learning

In [2]:
import numpy as np
import cmath
import random
from gym import Env
from gym.spaces import Discrete, Box, Dict
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Input, Dropout

# Defining the qubit environement

In [24]:
qubit_eps = 0.01

# Defining parts of the environment to deal with "continuous" variables (angles)
samples = 20
theta = np.linspace(0, np.pi, samples)
phi = np.linspace(0, 2*np.pi, samples)

# Fixed initial and final angles. Use theta1 only for initial testing.
theta1 = random.choice(theta)
phi1 = random.choice(phi)
theta2 = random.choice(theta)
phi2 = random.choice(phi)

final_qubit = np.array([np.cos(theta2 / 2), np.exp(1j * phi2) * np.sin(theta2 / 2)])

class QubitUnitary(Env):
    def __init__(self):
        # What we can observe: a single qubit, # characterized by 2 real angles on the Bloch sphere.
        self.observation_space = Box(low=np.array([0, 0]), 
                                     high=np.array([np.pi, 2*np.pi]), shape=(2,))
        
        # What actions we can take: choose a unitary along an axis (y or z) 
        # and a real angle of rotation around that axis.
        self.action_space = Dict({"axis": Discrete(2),
                                  "angle": Box(low=0, high=2*np.pi, shape=(1,))})
        
        #Starting qubit. Can be the same every time, or different every time.
        self.theta = theta1
        self.phi = phi1
        self.state = np.array([self.theta, self.phi])
        self.qubit = np.array([np.cos(self.theta/2), np.exp(1j*self.phi)*np.sin(self.theta/2)])
        
        # Maximum number of unitaries before resetting
        self.max_unitaries = 10
    
    def rot(self, axis, angle):
        # If axis = 0, rotate y. if axis = 1, rotate z. Return the unitary for rotation along correct axis and angle.
        if axis == 0:
          return np.array([[np.cos(angle / 2), -np.sin(angle / 2)], [np.sin(angle / 2), np.cos(angle / 2)]])
        elif axis == 1:
          return np.array([[np.exp(-1j * angle / 2), 0], [0, np.exp(1j * angle / 2)]])
        else:
          return None

    def qubit_fidelity(self, qubit1, qubit2):
      return np.dot(qubit1, np.conjugate(qubit2)) * np.dot(np.conjugate(qubit1),qubit2)

    def step(self, action):
        # Update the qubit depending on what action we took
        self.qubit = np.dot(self.rot(action["axis"], action["angle"][0]), self.qubit)

        # Update parameters and state
        self.theta = 2 * np.arccos(np.abs(self.qubit[0])-0.0001)
        self.phi = (cmath.phase(self.qubit[1]) - cmath.phase(self.qubit[0]))%(2*np.pi)
        self.state = [self.theta, self.phi]

        # Decrease unitaries remaining
        self.max_unitaries -= 1

        # Reward step. Use criterion of distance to final qubit. This part can be experimented with.
        if self.qubit_fidelity(self.qubit, final_qubit) > (1 - qubit_eps):
            reward = 10
        else:
            reward = -1
            
        # Check if we've used up all unitaries:
        if self.qubit_fidelity(self.qubit,final_qubit)>(1-qubit_eps) or self.max_unitaries<=0:
            done = True
        else:
            done = False
        # Maybe use info for rendering!
        info = {}

        return self.state, reward, done, info

    def render(self):
        pass

    def reset(self):
        # Reset qubit to initial state.
        self.theta = random.choice(theta)
        self.phi = random.choice(phi)
        #self.theta = theta1
        #self.phi = phi1
        self.state = np.array([self.theta, self.phi])
        self.qubit = np.array([np.cos(self.theta/2), np.exp(1j*self.phi)*np.sin(self.theta/2)])

        # Reset max number of unitaries
        self.max_unitaries = 10

In [25]:
env = QubitUnitary()



# Neural network for learning Q from data

In [26]:
# Define the model. Another possibility: have the model be outside, and just re-train or something like that as we expand data? (Instead of training from scratch every time.)

def train_Q(data_x, data_y):
  '''
  Trains a neural network to find the value function Q(s, a).
  data_x must be an array of length N containing tuples of the form [s1, s2, a1, a2_1, a2_2, a2_3] where a2_i is a one-hot encoding of discrete a2 variables.
  data_y must be an array of length N containing numbers Q, corresponding to each tuple.
  Test out on simple case before trying it for real!
  '''
  #data_size = np.shape(data_x)[0]

  # Normalize data. Use one-hot encoding to turn discrete axes into appropriate variables.

  '''
  #Start defining the input tensor:
  inpTensor = Input((6,))   

  #create the layers and pass them the input tensor to get the output tensor:    
  hidden1Out = Dense(units=3)(inpTensor)       
  finalOut = Dense(units=1)(hidden1Out)   

  #define the model's start and end points    
  model = Model(inpTensor,finalOut)

  '''
  # Set up neural network
  model = Sequential()
  model.add(Dense(5, input_shape=(5,), activation = 'relu')) 
  model.add(Dense(5, activation = 'relu'))
  model.add(Dropout(0.5))
  model.add(Dense(6, activation = 'relu'))
  model.add(Dropout(0.5))
  model.add(Dense(1))

  # Compile model.
  opt = tf.keras.optimizers.Adam(learning_rate=0.2) # Adjust parameters as needed. 
  model.compile(optimizer=opt, loss='mse')

  # Fit the model.
  model.fit(data_x, data_y) # Set verbose = 0 if I'm not interested in progress updates. Play with batch size, etc.

  # Make a prediction; will return model, and need to call function.predict(x) to find Q. Might be better to simply define model = function(data), and use that model to predict in main code.
  return model

# RL algorithm for Q-learning

In [27]:
full_x_data = []
for i in range(samples):
  for j in range(samples):
    for k in range(samples):
      for l in [0,1]:
          full_x_data.append(np.array([theta[i] / np.pi, phi[j] / (2.0 * np.pi), phi[k] / (2.0 * np.pi), int(l==0), int(l==1)]))

In [28]:
Q_Data = []
# Q is organized as [theta, phi, angle, axis]
Q_Full = np.zeros((samples **3 * 2,1))
Q_Matrix = np.reshape(Q_Full, (samples,samples,samples,2,1))
# Maybe class is better way to deal with Q_Data or Q_Full!

In [29]:
# Use epsilon-greedy strategy to explore space of possibilities and update Q values.
def select_action(s, epsilon):
  p = random.random()
  if p < epsilon:
    return env.action_space.sample()
  else:
    # Indices associated to s1, s2.
    s1 = int( np.floor((samples-1)*s[0]/np.pi))
    s2 = int( np.floor((samples-1)*s[1]/(2.0*np.pi)))
    # Best index from argmax of q.
    Q_Matrix = np.reshape(Q_Full, (samples,samples,samples,2,1))
    best_index = np.unravel_index(Q_Matrix[s1][s2].argmax(), Q_Matrix[s1][s2].shape)
    best_angle = phi[best_index[0]]
    best_axis = best_index[1]
    best_action = {'axis': best_axis, 'angle': [best_angle]}
    return best_action

In [30]:
# Update Q step.
def update_Q(s, a, alpha, gamma):
  # Indices corresponding to s, a.
  s1 = int( np.floor((samples-1)*s[0]/np.pi))
  if s1 >= samples or s1 <0:
    print('Disaster! Value of s1 is' + str(s1))
    print(s[0])
  s2 = int( np.floor((samples-1)*s[1]/(2.0*np.pi)))
  if s2 >= samples or s1 <0:
    print('Disaster! Value of s2 is' + str(s2))
    print(s[1])
  a1 = int( np.floor((samples-1)*a['angle'][0]/(2.0*np.pi)))
  a2 = a['axis']
  # Take step, find new indices.
  new_s, reward, done, info = env.step(a)
  new_s1 = int( np.floor((samples-1)*new_s[0]/np.pi)) 
  if new_s1 >= samples or new_s1 <0:
    print('Disaster! Value of new s1 is' + str(new_s1))
    print(new_s[0])
  new_s2 = int( np.floor((samples-1)*new_s[1]/(2.0*np.pi))) 
  if new_s2 >= samples or new_s2 <0:
    print('Disaster! Value of new s2 is' + str(new_s2))
    print(new_s[1])
  old_Q = Q_Matrix[s1][s2][a1][a2]
  Q_Matrix[s1][s2][a1][a2]=(1-alpha)*old_Q+alpha*(reward+gamma*np.max(Q_Matrix[new_s1][new_s2]))
  Q_Data.append([s[0], s[1], a['angle'][0], a2, Q_Matrix[s1][s2][a1][a2]])
  return done

# Preprocessing data for neural network

In [32]:
# Update Data set with warm-up and random exploration. Set epsilon to be close to 1.
warmup_steps = 5000
counter = 0
Q_Data = []
while warmup_steps > 0:
  env.reset()

  # Now run loop where we get to explore until done! So 10 unitaries essentially. Set reward to be 0 initially.
  Done = False

  while not Done:
    counter += 1
    s = env.state
    action = select_action(s, 0.99)
    Done = update_Q(s, action, 0.99, 0.5) # Tweak alpha and gamma here! 

  warmup_steps -= 1

print(counter)
print(np.shape(Q_Data))

48326
(48326, 5)


  result = asarray(a).shape


In [12]:
# Write data in friendly way for neural network. Normalize data, use one-hot encoding for discrete variables.
def translate_data(dat):
  training_x = []
  training_y = []
  for i in range(np.shape(dat)[0]):
    training_s1 = Q_Data[i][0] / (np.pi)
    training_s2 = Q_Data[i][1] / (2 * np.pi)
    training_a1 = Q_Data[i][2] / (2 * np.pi)
    training_a21 = int(Q_Data[i][3] == 0)
    training_a22 = int(Q_Data[i][3] == 1)
    training_x.append([training_s1, training_s2, training_a1, training_a21, training_a22])
    training_y.append(Q_Data[i][4])

  training_x = np.array(training_x)
  training_y = np.array(training_y)
  return training_x, training_y

# Training

In [33]:
burnin_runs = 1
eps = 0.99
while burnin_runs >= 0:

  epochs = 100000
  X = translate_data(Q_Data)
  mod = train_Q(X[0], X[1])
  x = np.expand_dims(full_x_data, axis=-1)
  Q_Full = mod.predict(x)
  Q_Matrix = np.reshape(Q_Full, (samples,samples,samples,2,1))
  Q_Data = []
  while epochs >= 0:
    env.reset()
    # Now run loop where we get to explore until done! So 10 unitaries essentially.
    Done = False
    while not Done:
      s = env.state
      action = select_action(s, eps)
      Done = update_Q(s, action, 0.99, 0.5) # Tweak alpha and gamma here! 

    epochs -= 1
  burnin_runs -= 1

  result = asarray(a).shape




In [34]:
# Do many loops of burnin at first! 

runs = 10
eps = 0.7
print(np.shape(Q_Data))
while runs >= 0:

  epochs = 50000
  X = translate_data(Q_Data)
  mod = train_Q(X[0], X[1])
  x = np.expand_dims(full_x_data, axis=-1)
  Q_Full = mod.predict(x)
  Q_Matrix = np.reshape(Q_Full, (samples,samples,samples,2,1))
  Q_Data = []
  while epochs >= 0:
    env.reset()
    # Now run loop where we get to explore until done! So 10 unitaries essentially.
    Done = False
    while not Done:
      s = env.state
      action = select_action(s, eps)
      Done = update_Q(s, action, 0.99, 0.5) # Tweak alpha and gamma here! 

    epochs -= 1

  eps -= 0.1
  runs -= 1

  result = asarray(a).shape


(961859, 5)


# Results

In [45]:
total = 1000
numb_yes_training = []

while total >= 0:  
  env.reset()

  Done = False
  counter = 0
 
  while not Done:
    counter += 1
    s = env.state
    #print(env.qubit)
    action = select_action(s, 0)
    st, r, Done, info = env.step(action)
    #print(env.qubit)
    #print(action)
    
  total -= 1
  
  numb_yes_training.append(counter)

success = 0
for i in numb_yes_training:
  if i <=9: #WHY IS TWO ENOUGH? 
    success += 1

print(success / 1000)

0.05
