In [None]:
# 라이브러리 임포트 
from qiskit import QuantumCircuit
from qiskit.quantum_info import Clifford
from qiskit.quantum_info.random import random_clifford
from qiskit.synthesis.clifford import synth_clifford_greedy

import gymnasium
from gymnasium import spaces

from torch import nn
import torch

import numpy as np

from ray.rllib.algorithms.ppo import PPO
import ray
from ray import tune
from ray.tune.registry import register_env
from ray.train import Checkpoint

In [None]:
# Clifford Circuit Synthesis 환경 정의
class ENV_v1(gymnasium.Env):
  """Gymnasium environment for Clifford circuit synthesis with a difficulty curriculum.

  Each episode holds a batch of `batch_size` target circuits. The agent appends
  H, S and nearest-neighbour CX gates to the current target circuit; the target
  counts as synthesized when the first 2*n_qubits columns of the tableau of the
  combined circuit equal the identity matrix. The environment then moves on to
  the next target, and the episode ends after the last target of the batch.

  While `difficulty < max_diff`, targets are built from `difficulty` random
  H/S/CX gates; otherwise they are drawn with `random_clifford`. The difficulty
  is raised by one when an episode ends with at least `threshold` successes.

  Observation: (2*n_qubits, 2*n_qubits) integer matrix with entries in {0, 1}.
  Action: MultiDiscrete([3, n_qubits, 2]) = (gate type, qubit, CX direction).
  """

  def __init__(self, n_qubits):
    """Create the spaces, open the three log files and generate the first batch.

    Args:
      n_qubits: number of qubits of every target circuit.
    """
    self.n_qubits = n_qubits
    self.max_diff = 1024   # difficulty cap; at or above it, targets are random Cliffords
    self.batch_size = 100  # number of target circuits generated per episode
    self.threshold = 70    # successes per episode required to raise the difficulty
    self.difficulty = 1    # number of random gates used to build each target
    self.T = 100           # maximum number of gates (steps) spent on one target
    self.curr_diff_epi = 0 # only ever reset to 0 when the difficulty is raised

    self.result = []
    self.target_gate_set = ['h','s','cx']
    # action = (gate type, qubit the gate acts on / CX control,
    #           CX direction: 0 -> target is control-1, 1 -> target is control+1)
    self.action_space = spaces.MultiDiscrete([len(self.target_gate_set)] + [self.n_qubits] + [2])
    self.observation_space = spaces.Box(low=0, high=1, shape=(2*n_qubits, 2*n_qubits), dtype=int)

    # Opened in 'w' mode: existing files with these names are truncated.
    # They are released again in close().
    self.logger = open("your_logging_file1",'w')
    self.logger2 = open("your_logging_file2",'w')
    self.logger3 = open("your_logging_file3",'w')
    self.reset()

  def reset(self, *, seed=None, options=None):
    """Start a new episode: clear the statistics and generate a fresh batch of targets.

    Args:
      seed: optional seed forwarded to `gymnasium.Env.reset`; all random draws
        below use `self.np_random`, so a seeded reset is reproducible.
      options: accepted for API compatibility; not used.

    Returns:
      (observation of the first target, empty info dict)
    """
    super().reset(seed=seed)

    n_gate_types = len(self.target_gate_set)
    # Per-target [h, s, cx] counts of the target itself and of the gates the agent added.
    self.init_gate_count = [[0] * n_gate_types for _ in range(self.batch_size)]
    self.gate_count = [[0] * n_gate_types for _ in range(self.batch_size)]
    self.batch_op = []
    self.current_idx = 0
    self.current_steps = 0
    self.success_num = 0
    self.gate_seq = []
    self.init_depth = []
    self.old_cx = 0        # summed CX count of the solved targets
    self.new_cx = 0        # summed CX count the agent used on solved targets
    self.dp = 0            # summed depth of the combined circuits of solved targets
    self.dp_old = 0        # summed depth of the solved targets (baseline)
    self.syn_gate_cnt = 0  # total number of gates the agent used on solved targets

    if self.difficulty < self.max_diff:
      for i in range(self.batch_size):
        qc = self._random_target(self.init_gate_count[i])
        self.batch_op.append(qc)
        self.init_depth.append(qc.depth())
    else:
      for i in range(self.batch_size):
        # Convert once and reuse the circuit for storage, depth and gate counts.
        target = random_clifford(self.n_qubits, seed=self.np_random).to_circuit()
        self.batch_op.append(target)
        self.init_depth.append(target.depth())
        # Only the CX count is tracked here; a circuit without CX counts as 0
        # (previously this left the count unbound or carried over a stale value).
        self.init_gate_count[i][2] = target.count_ops().get('cx', 0)

    return self._observation(), {}

  def _random_target(self, counts):
    """Build one target of `self.difficulty` random gates and tally them in `counts`."""
    qc = QuantumCircuit(self.n_qubits, self.n_qubits)
    # A CX needs a neighbouring qubit, so a single-qubit register only draws H and S
    # (the former rejection loop could never terminate in that case).
    n_choices = len(self.target_gate_set) if self.n_qubits > 1 else 2
    last = self.n_qubits - 1
    for _ in range(self.difficulty):
      choose = int(self.np_random.integers(0, n_choices))
      control_bit = int(self.np_random.integers(0, self.n_qubits))
      if choose == 0:
        qc.h(control_bit)
      elif choose == 1:
        qc.s(control_bit)
      else:
        # Edge qubits have a single neighbour; interior qubits pick a side uniformly.
        if control_bit == 0:
          direction = 1
        elif control_bit == last:
          direction = 0
        else:
          direction = int(self.np_random.integers(0, 2))
        qc.cx(control_bit, control_bit + 1 if direction == 1 else control_bit - 1)
      counts[choose] += 1
    return qc

  def _observation(self, idx=None):
    """Return the first 2*n_qubits tableau columns of target `idx` (default: current) as ints."""
    if idx is None:
      idx = self.current_idx
    return Clifford(self.batch_op[idx]).tableau[:, 0:2*self.n_qubits].astype(int)

  def step(self, action):
    """Append one gate to the current target circuit and score the result.

    Args:
      action: (gate type, qubit, direction). Gate type 0/1/2 selects H/S/CX. For
        CX, `qubit` is the control and `direction` picks the target
        (0 -> qubit-1, 1 -> qubit+1); a CX pointing outside the register adds
        no gate but still consumes a step.

    Returns:
      (observation, reward, terminated, truncated=False, empty info dict)
    """
    idx_before = self.current_idx
    self.mat = self.batch_op[idx_before]  # same object: gates are appended in place
    counts = self.gate_count[idx_before]
    IDENTITY = np.eye(2*self.n_qubits)
    self.old_close = np.linalg.norm(self._observation() - IDENTITY)

    gate, qubit, direction = int(action[0]), int(action[1]), int(action[2])
    if gate == 0:
      self.mat.h(qubit)
      counts[0] += 1
      self.gate_seq.append((0, qubit))
    elif gate == 1:
      self.mat.s(qubit)
      counts[1] += 1
      self.gate_seq.append((1, qubit))
    elif gate == 2:
      if direction == 0:
        target = qubit - 1
      elif direction == 1:
        target = qubit + 1
      else:
        target = -1
      if 0 <= target < self.n_qubits:
        self.mat.cx(qubit, target)
        counts[2] += 1
        self.gate_seq.append((2, qubit, target))

    self.current_steps += 1
    done = False
    reward = 0

    obs = self._observation()
    # Frobenius distance to the identity. Stored as `new_close`, not `close`:
    # an attribute named `close` would shadow the Env.close() method.
    self.new_close = np.linalg.norm(obs - IDENTITY)
    gate_diff = np.array(counts)
    penalties = np.array([-0.1,-0.1,-1])
    gate_penalty = (gate_diff * penalties).sum() / 10.0  # <= 0, grows with gate count
    reward += gate_penalty
    if self.new_close < self.old_close:
      reward += 30

    solved = self.new_close == 0
    if solved:
      # NOTE(review): `30 - gate_penalty` grows with the number of gates used,
      # which looks opposite to the per-step penalty above - confirm intent.
      reward += (self.T - self.current_steps) * (30 - gate_penalty)
      self.old_cx += self.init_gate_count[idx_before][2]
      self.new_cx += counts[2]
      # NOTE(review): this is the depth of target + appended gates together.
      self.dp += self.mat.depth()
      self.syn_gate_cnt += sum(counts)
      self.dp_old += self.init_depth[idx_before]
      self.success_num += 1

    # Move on after a success or once the T-step budget is used up. `>=` keeps
    # the budget at exactly T steps, so the success bonus factor
    # (T - current_steps) can no longer become negative.
    if solved or self.current_steps >= self.T:
      self.current_steps = 0
      self.gate_seq = []
      if self.current_idx < self.batch_size - 1:
        self.current_idx += 1
      else:
        done = True

    if done:
      self.logger.write(f"ratio : {self.success_num / self.batch_size}, diff : {self.difficulty}\n")
      self.logger.flush()
      self.logger2.write(f"target: {self.old_cx}, synthesis: {self.new_cx}\n")
      self.logger2.flush()
      self.logger3.write(f"depth: {self.dp_old} -> {self.dp}, total_gate: {self.syn_gate_cnt} \n")
      self.logger3.flush()

      # Curriculum: enough successes in this episode raise the difficulty.
      if self.success_num >= self.threshold and self.difficulty < self.max_diff:
        self.curr_diff_epi = 0
        self.difficulty += 1

    # The post-action tableau is still valid unless we switched to the next target.
    if self.current_idx != idx_before:
      obs = self._observation()
    return obs, reward, done, False, {}

  def render(self, mode='human'):
    """Return `self.result` (initialised to an empty list in `__init__`)."""
    return self.result

  def close(self):
    """Close the three log files opened in `__init__`; safe to call repeatedly."""
    for handle in (self.logger, self.logger2, self.logger3):
      if not handle.closed:
        handle.close()

In [None]:
# 강화학습 시작
def env_creator1(env_config):
  """Environment factory passed to `register_env`.

  Args:
    env_config: mapping of environment options; an optional "n_qubits" entry
      selects the register size. May be empty or None.

  Returns:
    A new `ENV_v1` instance (2 qubits unless "n_qubits" is given).
  """
  n_qubits = (env_config or {}).get("n_qubits", 2)
  return ENV_v1(n_qubits=n_qubits)

# Register the environment factory under the id used by config["env"] below.
# `env_creator1` is the factory defined in this cell; `env_creator2` does not
# exist and raised a NameError.
register_env("Q-v2", env_creator1)

ray.init(num_cpus = 4, ignore_reinit_error=True)

# PPO configuration passed to tune.run.
config = {
  "env": "Q-v2",
  "num_workers": 0,
  "framework": "torch",
  "sgd_minibatch_size": 10,      # SGD minibatch size within each epoch
  "num_sgd_iter": 5,             # number of SGD epochs per training iteration
  "lr": 0.0001,                  # learning rate
  "gamma": 0.99,                 # discount factor
  "lambda": 0.95,                # GAE lambda; closer to 1 weights long-term returns more
  "clip_param": 0.1,             # PPO surrogate clipping parameter
  "model":{
    "fcnet_hiddens": [1000,1000],
    "fcnet_activation": "relu",
    "fcnet_weights_initializer": "xavier",
    # output channel, kernel, stride
    # NOTE(review): RLlib documents conv_filters as a list of
    # [out_channels, kernel, stride] entries; this is a single flat triple.
    # Confirm whether it is used at all for this 2-D observation.
    "conv_filters": [1024, [4,4], 2],
    "conv_activation": "relu",
  },
}

In [None]:
# Train PPO on the registered environment through Ray Tune and keep the
# returned analysis object for later inspection.
analysis = tune.run(
  PPO,
  config=config,
  # stopping criterion: 10000 training iterations
  stop={"training_iteration": 10000},
  # checkpoint every 10 iterations and once more when training ends
  checkpoint_freq=10,
  checkpoint_at_end=True,
  verbose=2,
)