In [None]:
This first and second part and of the code is from Qianxu, the author of the paper "Constant-overhead fault-tolerant quantum computation with reconfigurable atom arrays."

In [1]:

import re
import numpy as np
import os, sys
module_path = os.path.abspath(os.path.join('./src/'))
if module_path not in sys.path:
        sys.path.append(module_path)
from ldpc.bposd_decoder import BpOsdDecoder
from ldpc.bplsd_decoder import BpLsdDecoder
from scipy.sparse import csc_matrix
class BPOSD_Decoder():
    def __init__(self, h:np.ndarray, channel_probs:np.ndarray, max_iter:int, bp_method:str, 
                ms_scaling_factor:float, osd_method:str, osd_order:int):
        
        self.decoder = BpOsdDecoder(
                h,
                channel_probs=channel_probs,
                max_iter=max_iter,
                bp_method=bp_method,
                ms_scaling_factor=ms_scaling_factor,
                osd_method=osd_method,
                osd_order=osd_order, )
        self.h = h
    
    def decode(self, synd:np.ndarray):
        self.decoder.decode(synd)
        return self.decoder.osdw_decoding
def GenDecodingGraphs(detector_error_model:str, num_logicals:int):
    items = detector_error_model.split('\n')
    errors = [item for item in items if 'error' in item]
    detectors = [item.split()[1] for item in items if 'detector' in item and 'shift' not in item]

    combined_detectors = detectors
    combined_errors = []
    for error in errors:
        error_list = error.split()
        #error_p = float(re.findall("\d+\.\d+", error_list[0])[0])
        non_e_pattern = "\d+\.\d+"
        e_pattern = r'([\d]+\.[\d]+e-[\d]+)'
        e_matches = re.findall(e_pattern, error_list[0])
        if e_matches:
            error_p = float(e_matches[0])
        else:
            non_e_matches = re.findall("\d+\.\d+", error_list[0])
            error_p = float(non_e_matches[0])

        detectors = error_list[1:]
        flipped_logicals = [item for item in error_list if 'L' in item]
        error_dict = {'p':error_p, 'detectors':detectors, 'logicals':flipped_logicals}
        combined_errors.append(error_dict)
    
    # construct the joint check matrix
    H_joint = np.zeros([len(combined_detectors), len(combined_errors)])
    for i in range(len(combined_detectors)):
        for j in range(len(combined_errors)):
            if combined_detectors[i] in combined_errors[j]['detectors']:
                H_joint[i,j] = 1
    # construct the joint logical correction matrix
    logicals = ['L'+str(i) for i in range(num_logicals)]
    L_joint = np.zeros([len(logicals), len(combined_errors)])
    for i in range(len(logicals)):
        for j in range(len(combined_errors)):
            if logicals[i] in combined_errors[j]['logicals']:
                L_joint[i,j] = 1
    
    channel_prob_joint = [error['p'] for error in combined_errors]
        
    return H_joint, L_joint, channel_prob_joint
class Matching_Decoding():
    def __init__(self):
        self.matching = None
        self.L = None
        
    
    def from_detector_error_model(self, dem=None, num_logicals=None):
        # generate the decoding graphs
        H, L, channel_prob = GenDecodingGraphs(str(dem), num_logicals=num_logicals)
        self.L = L
        channel_prob = np.array(channel_prob)
        weights = np.log((1-channel_prob)/channel_prob)
        self.matching = pymatching.Matching.from_check_matrix(csc_matrix(H), weights=weights, faults_matrix=csc_matrix(L))       
        
    def decode_batch1(self, detector_vals):
        detector_historys = [1.0*detector_val for detector_val in detector_vals]
        logical_cors = []
        for i in range(len(detector_historys)):
            detector_values = detector_historys[i]
            detector_values =  detector_values.T%2
            detector_values = detector_values.reshape(1, -1)
            cor = self.matching.decode_batch(detector_values)
            logical_cors.append(cor)

        return np.vstack(logical_cors)
class BPOSD_Decoding():
    def __init__(self, decoder_params={'max_iter':100, 'bp_method':'min_sum', 'ms_scaling_factor':0.9, 'osd_method':"osd_e", 'osd_order':6}):
        self.decoder = None
        self.L = None
        self.decoder_params = decoder_params
    
    def from_detector_error_model(self, dem=None, num_logicals=None):
        # generate the decoding graphs
        H, L, channel_prob = GenDecodingGraphs(str(dem), num_logicals=num_logicals)
        H = H.astype(int)
        
        self.L = L.astype(int)
        
        max_iter = self.decoder_params['max_iter']
        bp_method = self.decoder_params['bp_method']
        ms_scaling_factor = self.decoder_params['ms_scaling_factor']
        osd_method = self.decoder_params['osd_method']
        osd_order = self.decoder_params['osd_order']
        self.decoder = BPOSD_Decoder(h=H, channel_probs=channel_prob,
                                    max_iter=max_iter,
                                    bp_method=bp_method,
                                    ms_scaling_factor=ms_scaling_factor,
                                    osd_method=osd_method,
                                    osd_order=osd_order)
        
    def decode_batch(self, detector_vals):
        detector_historys = [1.0*detector_val for detector_val in detector_vals]
        logical_cors = []
        for i in range(len(detector_historys)):
            detector_values = detector_historys[i]   
            cor = self.decoder.decode(detector_values)
            log_cor = self.L@cor%2
            logical_cors.append(log_cor.astype(int))

        return np.vstack(logical_cors)


  non_e_pattern = "\d+\.\d+"
  non_e_matches = re.findall("\d+\.\d+", error_list[0])


In [2]:
import numpy as np
import networkx as nx
from networkx.algorithms import bipartite
import copy
import random


def max_degree(graph):
    return max(list(dict(graph.degree).values()))

def BipartitieGraphFromCheckMat(H):
    num_checks, num_bits = H.shape
    C_nodes = list(-np.arange(1, num_checks + 1))
    V_nodes = list(np.arange(1, num_bits + 1)) # 如果是 scipy.sparse 矩阵，我们需要转换

    edges = [(-(i + 1), j + 1) for i in range(num_checks) for j in range(num_bits) if H[i][j] == 1]
    
    G = nx.Graph()
    G.add_nodes_from(C_nodes, bipartite=0)
    G.add_nodes_from(V_nodes, bipartite=1)
    G.add_edges_from(edges)
    
    return G

def best_match(graph):
    C_nodes = list({n for n, d in graph.nodes(data=True) if d["bipartite"] == 0})
    V_nodes = list(set(graph) - set(C_nodes))

    return bipartite.matching.hopcroft_karp_matching(graph, C_nodes)

# Coloration circuit
def TransformBipartiteGraph(G):
    # transform any bipartite graph to a symmetric one by adding dummy vertices and edges
    G_s = copy.deepcopy(G)
    C_nodes = list({n for n, d in G.nodes(data=True) if d["bipartite"] == 0})
    V_nodes = list(set(G) - set(C_nodes))
    
    # Suppose C_nodes all have degree Delta_c, and # V_nodes > # C_nodes
    # Add dummy vertices to C_nodes
    C_nodes_dummy = list(-np.arange((len(C_nodes) + 1), len(V_nodes) + 1))
    G_s.add_nodes_from(C_nodes_dummy, bipartite=0)
    
    # Add dummy edges between edges with degree < Delta_c
    Delta = max_degree(G_s)
#     print('max degree:', Delta)
    open_degree_nodes = copy.deepcopy(dict((node, degree) for node, degree in dict(G_s.degree()).items() if degree < Delta))
            
    while len(open_degree_nodes) > 0:       
        for node1 in list(open_degree_nodes.keys()):
            if node1 < 0:
                c_node = node1
                for node2 in list(open_degree_nodes.keys()):
                    if node2 > 0:
                        v_node = node2
                        if not G_s.has_edge(c_node, v_node):
                            G_s.add_edge(c_node, v_node)
                            
                            if open_degree_nodes[c_node] + 1 == Delta:
                                open_degree_nodes.pop(c_node)
                            else:
                                open_degree_nodes[c_node] = open_degree_nodes[c_node] + 1

                            if open_degree_nodes[v_node] + 1 == Delta:
                                open_degree_nodes.pop(v_node)
                            else:
                                open_degree_nodes[v_node] = open_degree_nodes[v_node] + 1
                            
                            break            
                        
            
    return G_s


def edge_corloring(graph):
    matches_list = []
    g = copy.deepcopy(graph)
    g_s = TransformBipartiteGraph(g)
    
    number_colors = max_degree(g_s)
    
    C_nodes = list({n for n, d in g.nodes(data=True) if d["bipartite"] == 0})
    V_nodes = list(set(g) - set(C_nodes))
    
    C_nodes_s = list({n for n, d in g_s.nodes(data=True) if d["bipartite"] == 0})
    V_nodes_s = list(set(g_s) - set(C_nodes_s))

    while len(g_s.edges()) > 0:
#         print('NEXT COLOR')
        bm=best_match(g_s)
#         print(bm)
#         matches_list.append(bm)
        
        # find the uniqe edges
        unique_match = dict((c_node, bm[c_node]) for c_node in bm if c_node in C_nodes)
        edges_list = [(c_node, bm[c_node]) for c_node in bm if c_node in C_nodes_s]
        matches_list.append(unique_match)
        
        g_s.remove_edges_from(edges_list)
#     assert len(g.edges()) == 0
        
    return matches_list

def ColorationCircuit(H):
    G = BipartitieGraphFromCheckMat(H)
    matches_list = edge_corloring(G)
    
    scheduling_list = []
    for match in matches_list:
        scheduling_list.append(dict((-c_node - 1, match[c_node] - 1) for c_node in match if (match[c_node] - 1) in list(np.where(H[-c_node - 1,:] == 1)[0])))
    
    return scheduling_list


# Random circuit
def RandomCircuit(H):
    # Obtain a random scheduling 
    rand_scheduling_seed = 30000
    num_checks, num_bits = H.shape
    max_stab_w = max([int(np.sum(H[i,:])) for i in range(num_checks)])
    scheduling_list = [list(np.where(H[ancilla_index,:] == 1)[0]) for ancilla_index in range(num_checks)]
    [random.Random(i + rand_scheduling_seed).shuffle(scheduling_list[i]) for i in range(len(scheduling_list))]
    
    schedulings = []
    for time_step in range(max_stab_w):
        scheduling = {}
        for ancilla_index in range(num_checks):
            if len(scheduling_list[ancilla_index]) >= time_step + 1:
                scheduling[ancilla_index] = scheduling_list[ancilla_index][time_step]
        schedulings.append(scheduling)
    return schedulings



# ColorProductCircuit
def QubitIndexToPos(q_index, n_C, n_V):
    if q_index <= n_V**2 - 1:
        i = q_index//n_V
        j = q_index - i*n_V
        return i, j + n_C
    else:
        q_index -= n_V**2
        i = q_index//n_C
        j = q_index - i*n_C
        return i + n_V, j   
    
def XcheckIndexToPos(X_index, n_C, n_V):
    i = X_index//n_V
    j = X_index - i*n_V
    
    return n_V + i, n_C + j

def ZcheckIndexToPos(Z_index, n_C, n_V):
    i = Z_index//n_C
    j = Z_index - i*n_C
    
    return i, j

def GetPosToQubitIndexMap(n_C, n_V):
    n_q = (n_C)**2 + (n_V)**2
    map = {}
    for i in range(n_q):
        q_pos = QubitIndexToPos(i, n_C, n_V)
        map[q_pos] = i
    return map

def GetPosToZCheckIndexMap(n_C, n_V):
    n_Z = n_V*n_C
    map = {}
    for i in range(n_Z):
        q_pos = ZcheckIndexToPos(i, n_C, n_V)
        map[q_pos] = i
    return map

def GetPosToXCheckIndexMap(n_C, n_V):
    n_X = n_C*n_V
    map = {}
    for i in range(n_X):
        q_pos = XcheckIndexToPos(i, n_C, n_V)
        map[q_pos] = i
    return map


def ClassicalCheckFromQuantumCheck(h, check_type):
    n = h.shape[1]
    n0 = int(np.sqrt(n/25))
    n_C, n_V = int(3*n0), int(4*n0)
    
    q_pos_index_map = GetPosToQubitIndexMap(n_C, n_V)
    Z_pos_index_map = GetPosToZCheckIndexMap(n_C, n_V)
    X_pos_index_map = GetPosToXCheckIndexMap(n_C, n_V)
        
    if check_type == 'Z':
        row_Zchecks = [Z_pos_index_map[0, i] for i in range(n_C)]
        row_qubits = [q_pos_index_map[0, i + n_C] for i in range(n_V)]

        H = np.zeros([len(row_Zchecks), len(row_qubits)])

        for i in range(len(row_Zchecks)):
            for j in range(len(row_qubits)):
                H[i,j] = h[row_Zchecks[i], row_qubits[j]]
    else:
        column_Xchecks = [X_pos_index_map[i + n_V, n_C] for i in range(n_C)]
        columm_qubits = [q_pos_index_map[i, n_C] for i in range(n_V)]

        H = np.zeros([len(column_Xchecks), len(columm_qubits)])

        for i in range(len(column_Xchecks)):
            for j in range(len(columm_qubits)):
                H[i,j] = h[column_Xchecks[i], columm_qubits[j]]
    return H

def ColorProductCircuit(q_h, check_type):
    assert check_type in ['X', 'Z'], f'check_type should be either X or Z'
    n = q_h.shape[1]
    n0 = np.sqrt(n/25)
    n_C, n_V = int(3*n0), int(4*n0)
    
    q_pos_index_map = GetPosToQubitIndexMap(n_C, n_V)
    Z_pos_index_map = GetPosToZCheckIndexMap(n_C, n_V)
    X_pos_index_map = GetPosToXCheckIndexMap(n_C, n_V)
    
    c_h = ClassicalCheckFromQuantumCheck(q_h, check_type)
    classical_coloration_scheduling = ColorationCircuit(c_h)
    
    scheduling = []
    
    if check_type == 'Z':
        # add the vertical connections
        for c in classical_coloration_scheduling:
            v_c = {}
            for c_index in list(c.keys()):
                q_y = c_index + n_V
                Z_y = c[c_index]
                for q_x, Z_x in zip(range(n_C), range(n_C)):
                    q_pos = q_y, q_x
                    q_index = q_pos_index_map[q_pos]
                    Z_pos = Z_y, Z_x
                    Z_index = Z_pos_index_map[Z_pos]
                    v_c[Z_index] = q_index
            scheduling.append(v_c)

            h_c = {}
            for c_index in list(c.keys()):
                Z_x = c_index 
                q_x = c[c_index] + n_C
                for q_y, Z_y in zip(range(n_V), range(n_V)):
                    q_pos = q_y, q_x
                    q_index = q_pos_index_map[q_pos]
                    Z_pos = Z_y, Z_x
                    Z_index = Z_pos_index_map[Z_pos]
                    h_c[Z_index] = q_index
            scheduling.append(h_c)
            
    else:
        for c in classical_coloration_scheduling:
            v_c = {}
            for c_index in list(c.keys()):
                X_y = c_index + n_V
                q_y = c[c_index]
                for q_x, X_x in zip(n_C + np.arange(n_V), n_C + np.arange(n_V)):
                    q_pos = q_y, q_x
                    q_index = q_pos_index_map[q_pos]
                    X_pos = X_y, X_x
                    X_index = X_pos_index_map[X_pos]
                    v_c[X_index] = q_index
            scheduling.append(v_c)

            h_c = {}
            for c_index in list(c.keys()):
                q_x = c_index 
                X_x = c[c_index] + n_C
                for q_y, X_y in zip(n_V + np.arange(n_C), n_V + np.arange(n_C)):
                    q_pos = q_y, q_x
                    q_index = q_pos_index_map[q_pos]
                    X_pos = X_y, X_x
                    X_index = X_pos_index_map[X_pos]
                    h_c[X_index] = q_index
            scheduling.append(h_c)
    return scheduling   

In [3]:
from sympy import *
from sympy import Matrix, zeros
from sympy import eye
import numpy as np
import matplotlib.pyplot as plt
import math
from sympy import symbols, Poly
#import seaborn as sns
#from sympy.polys.galoistools import gf_gcd, gf_div, gf_gcdex
#from sympy.polys.domains import ZZ
#from numpy.random import SeedSequence, default_rng
#import multiprocessing
####矩阵在二元域进行运算
def mod2(matrix):
    """Reduce a matrix to modulo 2"""
    return np.mod(matrix, 2)
#矩阵进行行和列交换
def generate_repetition_code_check_matrix(n):
    # 校验矩阵的大小为 (n-1) x n
    H = np.zeros((n-1, n), dtype=int)
    
    for i in range(n-1):
        H[i, i] = 1
        H[i, i+1] = 1
    
    return H
def swap_columns(matrix, col1, col2):
    matrix[:, [col1, col2]] = matrix[:, [col2, col1]]
    return matrix
def swap_rows(matrix, row1, row2):
    matrix[[row1,row2],:] = matrix[[row2,row1],:]
    return matrix
#矩阵化成阶梯型
def gf2_rref(matrix):
    """Compute the reduced row echelon form (RREF) of a matrix in GF(2)"""
    matrix = mod2(matrix)
    rows, cols = matrix.shape
    row, col = 0, 0

    while row < rows and col < cols:
        if matrix[row, col] == 0:
            for r in range(row + 1, rows):
                if matrix[r, col] == 1:
                    matrix[[row, r]] = matrix[[r, row]]
                    break
        if matrix[row, col] == 1:
            for r in range(rows):
                if r != row and matrix[r, col] == 1:
                    matrix[r] = mod2(matrix[r] + matrix[row])
            row += 1
        col += 1
    return matrix

#矩阵化成(I P)\pi 型
def create_identity_at_top(matrix):
    """Ensure the first k rows and k columns form an identity matrix using column swaps"""
   
    matrix = gf2_rref(matrix)
    rows, cols = matrix.shape
    P = np.eye(cols)
    
    for i in range(rows):
        if matrix[i, i] != 1:
            for j in range(i + 1, cols):
                if matrix[i, j] == 1:
                    swap_columns(matrix, i, j)
                    swap_rows(P,i, j)#交换矩阵\pi
                    break
    return matrix,P
#Calculate the kernel of the matrix (P^T I)based on (I P)

def kernel_gf2(matrix):
    """Compute the kernel of a matrix in GF(2)"""
    matrix_rref = gf2_rref(matrix)
    rows, cols = matrix_rref.shape
    pivot_cols = []

    for r in range(rows):
        for c in range(cols):
            if matrix_rref[r, c] == 1:
                pivot_cols.append(c)
                break

    free_vars = [c for c in range(cols) if c not in pivot_cols]
    kernel_basis = []
    #print(free_vars)
    for free_var in free_vars:
        basis_vector = np.zeros(cols, dtype=int)
        basis_vector[free_var] = 1
        for pivot_col in pivot_cols:
            row = pivot_cols.index(pivot_col)
            if matrix_rref[row, free_var] == 1:
                basis_vector[pivot_col] = 1
        kernel_basis.append(basis_vector)

    return np.array(kernel_basis).T
#circulant
def polynomial_to_circulant(poly, l):
    
    x = symbols('x')

   
    if not isinstance(poly, Poly):
        poly = Poly(poly, x,domain=ZZ)

    
    coeffs = poly.all_coeffs()
    coeffs.reverse()
    circulant_matrix = np.zeros((l, l), dtype=int)
    for i in range(l):
        a1 = np.pad( coeffs, (0, l - len( coeffs)), 'constant')
        circulant_matrix[i] = row = np.roll(a1, i)

    return circulant_matrix
def create_identity_at_topH_X(matrix):
    """Ensure the first k rows and k columns form an identity matrix using column swaps"""
   
    matrix = gf2_rref(matrix)
    rows, cols = matrix.shape
    P = np.eye(cols)
    
    for i in range(rows):
        if matrix[i, i] != 1:
            for j in range(i + 1, cols):
                if matrix[i, j] == 1:
                    swap_columns(matrix, i, j)
                    swap_rows(P,i, j)
                    break
    return matrix,P
def gf2_rrefH_Z(matrix,r_X):
    """Compute the reduced row echelon form (RREF) of a matrix in GF(2)"""

    matrix = mod2(matrix)
    rows, cols = matrix.shape
    row, col = 0, r_X

    while row < rows and col < cols:
        if matrix[row, col] == 0:
            for r in range(row + 1, rows):
                if matrix[r, col] == 1:
                    matrix[[row, r]] = matrix[[r, row]]
                    break
        if matrix[row, col] == 1:
            for r in range(rows):
                if r != row and matrix[r, col] == 1:
                    matrix[r] = mod2(matrix[r] + matrix[row])
            row += 1
        col += 1
    return matrix
def create_identity_at_topH_Z(matrix,r_X):
    """Ensure the first k rows and k columns form an identity matrix using column swaps"""
    
    matrix = gf2_rrefH_Z(matrix,r_X)
    rows, cols = matrix.shape
    P1 = np.eye(cols)
    for i in range(rows):
        if matrix[i, r_X] != 1:
            for j in range(r_X , cols):
                if matrix[i, j] == 1:
                    swap_columns(matrix, r_X + i, j)
                    swap_rows(P1,r_X+i, j)
                    break
    rows, cols = P1.shape
    
    return matrix,P1
def CSS_code_Logical(H_X,H_Z):
    r_X = H_X.shape[0]
    r_Z = H_Z.shape[0]
    n = H_X.shape[1]
    H_X1,P = create_identity_at_topH_X(H_X)
    
    H_Z1 = H_Z @ P.T
    H_Z1,P1 = create_identity_at_topH_Z(H_Z1,r_X)
    
    H_X1 = H_X1@P1.T
    
    k = n-r_Z-r_X
    
    I_k = np.eye(k)
    A_2 = H_X1[:,-k:]
    C_2 = H_Z1[:,-k:]

    O_k_r_Z= np.zeros((k, r_Z), dtype=int)
    O_k_r_X= np.zeros((k, r_X), dtype=int)
    J_X = np.hstack((O_k_r_X,C_2.T, I_k)) 
    J_Z =  np.hstack((A_2.T,O_k_r_Z , I_k)) 
    I_s = np.hstack((np.zeros((k, n-k),dtype=int), I_k))

    J_X = J_X@P1@P%2
    J_Z = J_Z@P1@P%2
    I_s = I_s@P1@P%2
    return J_X,J_Z,I_s
def Glue_code(J_Z_A,H_X):
    non_zero_cols = [col_idx for col_idx in range(J_Z_A.shape[1]) if not all(row[col_idx] == 0 for row in J_Z_A)]#n_G
    H_G1=H_X[:,non_zero_cols]
    

    non_zero_row = [row_idx for row_idx in range(H_G1.shape[0]) if not np.all(H_G1[row_idx] == 0)]
    H_G=H_G1[non_zero_row,: ]#H_N
    #生成S
    num_rows = len(non_zero_cols)#r_N
    num_cols = H_X.shape[1]
    result_matrix = np.zeros((num_rows, num_cols), dtype=int)
    for row_idx, col_idx in enumerate(non_zero_cols):
        result_matrix[row_idx, col_idx] = 1
    S_matrix = result_matrix
    #生成T矩阵
    num_cols = len(non_zero_row)#
    num_rows = H_X.shape[0] #r_Z
    T_matrix = np.zeros((num_rows, num_cols), dtype=int)
    for col_idx, row_idx in enumerate(non_zero_row):
        T_matrix[row_idx, col_idx] = 1    
    
    return H_G,T_matrix,S_matrix,non_zero_cols
#生成重复码的check matrix


def deformed_code(H_X,H_Z,J_Z,D):
    J_Z = J_Z
    H_X = H_X
    H_Z = H_Z
    H_G,T_matrix,S_matrix= Glue_code(J_Z,H_X)
    H=generate_repetition_code_check_matrix(D)
    H_X_M,H_Z_M=hypergraph_product_code(H_G, H.T)
    vector1 = np.zeros(D-1, dtype=int)
    vector3 = np.zeros(D, dtype=int)
    vector3[0] = 1
    vector2 = np.zeros((H_X.shape[0],H_G.shape[1]), dtype=int)
    T_matrix = np.hstack((np.kron(vector1,vector2), np.kron(vector3, T_matrix)))%2
    H_X1=np.hstack((H_X, T_matrix))
    O_rGD=np.zeros((H_G.shape[0]*(D-1),H_X.shape[1]), dtype=int)
    H_X_M=np.hstack((O_rGD, H_X_M))
    H_X_deformed = np.vstack((H_X1,H_X_M))
    #print(S_matrix.shape[0])
    vector31=vector3.T
    vector31 = vector3.reshape(-1, 1)
    #print(vector31.shape[0])
    S_matrix = np.kron(vector31,S_matrix)%2
    #print(S_matrix.shape[0])
    #print(H_Z_M.shape[0])
    H_Z_M = np.hstack((S_matrix, H_Z_M ))
    O_nGD=np.zeros((H_Z.shape[0],H_G.shape[1]*(D-1)+H_G.shape[0]*(D)), dtype=int)
    H_Z1=np.hstack((H_Z, O_nGD))
    H_Z_deformed = np.vstack((H_Z1,H_Z_M))
    return H_X_deformed,H_Z_deformed,H_Z_M,H_X_M,H_Z1,H_X1
def deformed_code_surface_code(H_X,H_Z,J_Z,D):
    J_Z = J_Z
    H_X = H_X
    H_Z = H_Z
    H_G,T_matrix,S_matrix= Glue_code(J_Z,H_X)
    O1 = np.zeros((H_X.shape[0],H_X.shape[1]))
    HX1 = np.hstack((H_X, O1))
    J_Z1=np.hstack((J_Z, np.zeros((J_Z.shape[0],H_X.shape[1]))))
    J_Z2=np.hstack((np.zeros((J_Z.shape[0],H_X.shape[1])),J_Z))
    J_Z_def = np.vstack((J_Z1,J_Z2))
    HX11 = np.hstack((HX1, T_matrix))
    HX2 = np.hstack((np.zeros((H_X.shape[0],H_X.shape[1])),H_X))
    HX22 = np.hstack((HX2, T_matrix))
    hx = np.vstack((HX1,HX2))
    J_Z_A= np.hstack((J_Z, J_Z))
    J_Z_defM = np.hstack((J_Z_def,np.zeros((J_Z_def.shape[0],T_matrix.shape[1]))))
    HZ1 = np.hstack((H_Z, np.zeros((H_Z.shape[0],H_Z.shape[1]))))
    HZ2 = np.hstack((np.zeros((H_Z.shape[0],H_Z.shape[1])),H_Z))   
    hz = np.vstack((HZ1,HZ2))
    HZ11 = np.hstack((hz,np.zeros((hz.shape[0],H_G.shape[0]))))
    H_X_deformed = np.vstack((HX11,HX22))
    S_matrix = np.hstack((S_matrix,S_matrix))
    S_matrix = np.hstack((S_matrix,H_G.T))
    H_Z_deformed =  np.vstack((HZ11,S_matrix))
    return H_X_deformed,H_Z_deformed,hx,hz,J_Z_def,J_Z_A,J_Z_defM
def generate_matrix(HX, HXM,J_X,J_X_M,J_X_A,d):
    # 创建一个零矩阵
    m1,n1 = HX.shape
    print(HX.shape)
    m2,n2 = HXM.shape
    print(HXM.shape)
    rows = 2*m1+(d-1)*m2
    cols = (d+1)*n2+d*m2
    A = np.zeros((rows, cols), dtype=object)
    B = np.zeros((J_X.shape[0], cols), dtype=object)
    C = np.zeros((J_X_A.shape[0], cols), dtype=object)
    # 填充矩阵
    A[:m1, :n1] = HX
    B[:, :n1] = J_X
    C[:, :n1] = J_X_A    
    A[:m1, n2:n2+m1] = np.eye(m1).astype(int)
    C[:, n2+m1:n2+m2] = np.ones(m2-m1).astype(int)    
    A[m1+(d-1)*m2:, -n1:] = HX
    B[:, -n1:] = J_X
    A[m1+(d-1)*m2:,(d-1)*(n2+m2)+n2:(d-1)*(n2+m2)+m1+n2 ] = np.eye(m1).astype(int)

    for i in range(2, d + 1):
        A[m1+(i-2)*m2:m1+(i-1)*m2, (i-2)*(n2+m2)+n2:(i-1)*(n2+m2)] =  np.eye(m2).astype(int)
        A[m1+(i-2)*m2:m1+(i-1)*m2, (i-1)*(n2+m2):(i-1)*(n2+m2)+n2] = HXM
        B[:, (i-1)*(n2+m2):(i-1)*(n2+m2)+n2]=J_X_M
        A[m1+(i-2)*m2:m1+(i-1)*m2, (i-1)*(n2+m2)+n2:(i-1)*(n2+m2)+m2+n2] = np.eye(m2).astype(int)
    
    M = np.vstack((B,C))
    
    return A,M
def hypergraph_product_code1(H1, H2):
    # 获取矩阵的形状
    m1, n1 = H1.shape
    m2, n2 = H2.shape

    # 构造单位矩阵
    I1 = np.eye(n1, dtype=int)
    I2 = np.eye(n2, dtype=int)
    I3 = np.eye(m1, dtype=int)
    I4 = np.eye(m2, dtype=int)
    vectors = np.zeros(n2,dtype=int)
    vectors[0]=1
    # 构造 H_X
    HZ = np.hstack((np.kron(I2,H1)%2, np.kron(H2.T,I3)%2))
    LX = np.hstack((np.kron(vectors,np.ones(n1,dtype=int))%2, np.zeros(m2*m1,dtype=int)%2))
    LX = np.atleast_2d(LX)    
    # 构造 H_Z
    HX = np.hstack((np.kron(H2,I1)%2, np.kron(I4,H1.T)%2))
    
    return HX, HZ,LX
def hypergraph_product_code(H1, H2):
    # 获取矩阵的形状
    m1, n1 = H1.shape
    m2, n2 = H2.shape
    
    # 构造单位矩阵
    I1 = np.eye(n1, dtype=int)
    I2 = np.eye(n2, dtype=int)
    I3 = np.eye(m1, dtype=int)
    I4 = np.eye(m2, dtype=int)
    
    # 构造 H_X
    HX = np.hstack((np.kron(I2,H1)%2, np.kron(H2.T,I3)%2))
    
    # 构造 H_Z
    HZ = np.hstack((np.kron(H2,I1)%2, np.kron(I4,H1.T)%2))
    
    return HX, HZ
def extract_logical_qubit_positions(I_s):
    """
    提取 I_s 中每行非零元素的列索引
    :param I_s: 二进制矩阵，维度为 (k, n)
    :return: 列表，每个元素是逻辑比特对应的物理比特位置列表
    """
    logical_positions = []
    for i in range(I_s.shape[0]):
        non_zero = np.where(I_s[i] == 1)[0].tolist()
        logical_positions.append(non_zero)
    return logical_positions
def first_nonzero_per_row(matrix):
    """
    返回矩阵每一行第一个非零元素的列索引
    输入: 二维列表或 NumPy 数组
    输出: 列表，元素为各行的第一个非零列索引（全零行返回 -1）
    """
    arr = np.asarray(matrix)
    mask = (arr != 0) 
    
    has_nonzero = np.any(mask, axis=1) 
    
    # 对每行找到第一个 True 的位置，全零行设为 -1
    indices = np.where(has_nonzero, np.argmax(mask, axis=1), -1)
    return indices.tolist()
def block_diagonal(*matrices):
    for mat in matrices:
        if not isinstance(mat, np.ndarray):
            raise TypeError("所有输入必须是 NumPy 数组")
        if mat.ndim != 2:
            raise ValueError("输入矩阵必须是二维的")
    
    # 计算总维度
    rows = sum(m.shape[0] for m in matrices)
    cols = sum(m.shape[1] for m in matrices)
    
    # 初始化全零矩阵
    block_matrix = np.zeros((rows, cols), dtype=matrices[0].dtype)
    
    # 定位指针
    row_ptr, col_ptr = 0, 0
    
    # 填充矩阵
    for mat in matrices:
        r, c = mat.shape
        block_matrix[row_ptr:row_ptr+r, col_ptr:col_ptr+c] = mat
        row_ptr += r
        col_ptr += c
    
    return block_matrix
def deformed_code_magic(H_X_1,H_Z_1,J_Z_1,J_Z,I_s,H_X_2,H_Z_2,J_Z_2,D):
    J_Z_1_0 = np.atleast_2d(J_Z_1[0])
    J_Z_2 = np.atleast_2d(J_Z_2)
    
    # 生成对应的零数组，保持二维结构
    zeros_2d = np.zeros((1, H_X_2.shape[1]), dtype=int)
    
    JZ1JZ1 = np.concatenate([J_Z_1_0, J_Z_2, zeros_2d], axis=1)
    JZ2JZ2 = np.concatenate([np.atleast_2d(J_Z_1[1]), 
                            np.zeros((1, H_X_2.shape[1]), dtype=int), 
                            J_Z_2], axis=1)

    H_G1,T1_matrix,S1_matrix,non_zero_cols1 = Glue_code(J_Z_1,H_X_1)
    H_G2,T2_matrix,S2_matrix,non_zero_cols2 = Glue_code(J_Z_2,H_X_2)# surface code
    S1_matrix1 = np.hstack((S1_matrix, np.zeros((S1_matrix.shape[0],H_X_2.shape[1]),dtype=int), np.zeros((S1_matrix.shape[0],H_X_2.shape[1]),dtype=int) ))
    S2_matrix2 = np.hstack((np.zeros((S2_matrix.shape[0],H_X_1.shape[1]),dtype=int),S2_matrix,np.zeros((S2_matrix.shape[0],H_X_2.shape[1]),dtype=int)))
    S3_matrix3 = np.hstack((np.zeros((S2_matrix.shape[0],H_X_1.shape[1]),dtype=int),np.zeros((S2_matrix.shape[0],H_X_2.shape[1]),dtype=int),S2_matrix ))
    H_G1_T = np.hstack((H_G1.T,np.zeros((S1_matrix.shape[0],T2_matrix.shape[1]),dtype=int),np.zeros((S1_matrix.shape[0],T2_matrix.shape[1]),dtype=int)))
    H_G2_T = np.hstack((np.zeros((S2_matrix.shape[0],T1_matrix.shape[1]),dtype=int),H_G2.T,np.zeros((S2_matrix.shape[0],T2_matrix.shape[1]),dtype=int)))
    H_G3_T = np.hstack((np.zeros((S2_matrix.shape[0],T1_matrix.shape[1]),dtype=int),np.zeros((S2_matrix.shape[0],T2_matrix.shape[1]),dtype=int),H_G2.T))
    vector1 = np.zeros(H_G1_T.shape[0],dtype=int)
    T_matrix= block_diagonal(T1_matrix,T2_matrix,T2_matrix)
    J_Z = block_diagonal(J_Z,J_Z_2,J_Z_2)
    H_X = block_diagonal(H_X_1,H_X_2,H_X_2)
    H_Z = block_diagonal(H_Z_1,H_Z_2,H_Z_2)
    result = extract_logical_qubit_positions(I_s)
    vector1[non_zero_cols1.index(result[-2][0])] = 1
    vector2 = np.zeros(H_G1_T.shape[0],dtype=int)
    vector2[non_zero_cols1.index(result[-1][0])] = 1  
    vector3 = np.zeros(H_G2_T.shape[0],dtype=int)
    vector3[0] = 1
    vector1 = vector1.reshape(-1, 1)
    vector2 = vector2.reshape(-1, 1)
    vector3 = vector3.reshape(-1, 1)    
    H_G1_T = np.hstack((H_G1_T,vector1,vector2))
    
    H_G2_T = np.hstack((H_G2_T,vector3,np.zeros((H_G2_T.shape[0]),dtype=int).reshape(-1, 1)))
    H_G3_T = np.hstack((H_G3_T,np.zeros((H_G2_T.shape[0]),dtype=int).reshape(-1, 1),vector3))
    
    H_G = np.vstack((H_G1_T,H_G2_T,H_G3_T)).T
    S_matrix = np.vstack((S1_matrix1,S2_matrix2,S3_matrix3))
    JZ1JZ1 = np.kron(np.ones(D,dtype=int),JZ1JZ1@S_matrix.T ).astype(int)
    JZ2JZ2 = np.kron(np.ones(D,dtype=int),JZ2JZ2@S_matrix.T ).astype(int)   
    H=generate_repetition_code_check_matrix(D)

    anciall = np.zeros((H_X.shape[0],2),dtype=int)
    T_matrix = np.hstack((T_matrix,anciall))%2
    H_X_M,H_Z_M=hypergraph_product_code(H_G, H.T)

    vector1 = np.zeros(D-1, dtype=int)
    vector3 = np.zeros(D, dtype=int)
    vector3[0] = 1
    vector2 = np.zeros((H_X.shape[0],H_G.shape[1]), dtype=int)
    T_matrix = np.hstack((np.kron(vector1,vector2), np.kron(vector3, T_matrix)))%2
    
    H_X1=np.hstack((H_X, T_matrix))
    O_rGD=np.zeros((H_G.shape[0]*(D-1),H_X.shape[1]), dtype=int)
    H_X_M=np.hstack((O_rGD, H_X_M))
    H_X_deformed = np.vstack((H_X1,H_X_M))
    #print(S_matrix.shape[0])
    vector31=vector3.T
    vector31 = vector3.reshape(-1, 1)

    S_matrix = np.kron(vector31,S_matrix)%2
    #print(S_matrix.shape[0])
    #print(H_Z_M.shape[0])
    H_Z_M = np.hstack((S_matrix, H_Z_M ))
    O_nGD=np.zeros((H_Z.shape[0],H_G.shape[1]*(D-1)+H_G.shape[0]*(D)), dtype=int)
    H_Z1=np.hstack((H_Z, O_nGD))
    H_Z_deformed = np.vstack((H_Z1,H_Z_M))
 
    return H_X_deformed,H_Z_deformed,JZ1JZ1,JZ2JZ2,J_Z,H_X,H_Z
def validate_check_matrix(check_matrix):
    # 如果是稀疏矩阵，先转换为密集矩阵
    dense_matrix = check_matrix.toarray() if isinstance(check_matrix, csc_matrix) else check_matrix
    for col_idx in range(dense_matrix.shape[1]):
        if np.sum(dense_matrix[:, col_idx]) > 2:
            print(f"第 {col_idx} 列有超过两个1.")
            # 根据需要处理错误，比如抛出异常
            raise ValueError(f"第 {col_idx} 列包含超过两个1.")
    return check_matrix  # 如果没有问题，返回原矩阵
from scipy import sparse
from ldpc.codes import rep_code
import pymatching 
from scipy.sparse import csc_matrix
import numpy as np
from bposd.hgp import hgp
def remove_zero_rows(matrix):
    """
    删除矩阵中所有全零行
    输入: 二维列表或 NumPy 数组
    输出: 删除全零行后的 NumPy 数组（保留原始顺序）
    """
    arr = np.array(matrix)  # 转换为 NumPy 数组
    non_zero_mask = np.any(arr != 0, axis=1)  # 标记非零行
    return arr[non_zero_mask]

def cyclic_shift(l):
    return np.roll(np.eye(l, dtype=int), shift=1, axis=1)

def construct_ldpc(ell, m, code):
    # 生成基础矩阵
    x = np.kron(cyclic_shift(ell), np.eye(m, dtype=int))
    y = np.kron(np.eye(ell, dtype=int), cyclic_shift(m))
    
    # 构造A和B矩阵
    A = (np.linalg.matrix_power(x, code[0]) + np.linalg.matrix_power(y, code[1]) + np.linalg.matrix_power(y, code[2])) % 2
    B = (np.linalg.matrix_power(y, code[3]) + np.linalg.matrix_power(x, code[4]) + np.linalg.matrix_power(x, code[5])) % 2
    
    # 校验正交性
    assert np.all((A @ B + B @ A) % 2 == 0), "Orthogonality failed!"
    
    # 构建Hx和Hz
    Hx = np.hstack([A, B]).astype(int)
    Hz = np.hstack([B.T, A.T]).astype(int)
    return Hx, Hz



In [4]:
import numpy as np
import matplotlib.pyplot as plt
import random
import copy
import time
from typing import Union
import collections

out_count_observable_error_combos: Union[None, collections.Counter]  # 移除 [str]

import bposd.hgp

from bposd.hgp import hgp
import multiprocessing as mp
from ldpc.codes import ring_code
import stim
import re
import sys
sys.path.append("./src/")
import pymatching
import sinter
from typing import *
import re
def QECCircuit_OneStage(H_Z,H_X,J_X_1,J_X,I_s,hz,hx,lx, num_rep, circuit_error_params, p,circuit_type,D):
        
        # give out \bar{H_X} and \bar{H_Z} based on (H_X，H_Z)
        H_Z_deformed,H_X_deformed,JX1JX1,JX2JX2,J_X,hz,hx = deformed_code_magic(H_Z,H_X,J_X_1,J_X,I_s,hz,hx,lx,D) 
        M = H_Z_deformed@H_X_deformed.T%2 
        print(remove_zero_rows(M))# Verify whether the matrices commute.
        if circuit_type == 'coloration':
           scheduling_X = ColorationCircuit(hx)
           scheduling_Z = ColorationCircuit(hz)
############################################################################################### Initialization
        H_X = hx
        H_Z = hz
        data_indices = list(np.arange(0, np.shape(H_X)[1]))
        data_indicesancilla = list(np.arange(np.shape(H_X)[1], np.shape(H_X_deformed)[1]))
        n = len(data_indices) + len(data_indicesancilla)
        n_Z_ancilla, n_X_ancilla = np.shape(H_Z)[0], np.shape(H_X)[0]
        n_Z_ancillaM, n_X_ancillaM =  np.shape(H_Z_deformed)[0]- np.shape(H_Z)[0], np.shape(H_X_deformed)[0]- np.shape(H_X)[0]

        Z_ancilla_indices = list(np.arange(n, n + n_Z_ancilla))
        Z_ancilla_indicesM = list(np.arange(n + n_Z_ancilla ,
                                        n + n_Z_ancilla  + n_Z_ancillaM))
        X_ancilla_indices = list(np.arange(n + n_Z_ancilla+ n_Z_ancillaM, n + n_Z_ancilla+ n_Z_ancillaM + n_X_ancilla))
        X_ancilla_indicesM = list(np.arange(n + n_Z_ancilla + n_Z_ancillaM + n_X_ancilla,
                                        n + n_Z_ancilla+ n_Z_ancillaM + n_X_ancilla + n_X_ancillaM))
        circuit_init = stim.Circuit()
        circuit_init.append("RX", data_indices)
        circuit_init.append("R", X_ancilla_indices )
        circuit_init.append("R",  Z_ancilla_indices)
        circuit_init.append("R", data_indicesancilla)
        
        circuit_init.append("R", X_ancilla_indicesM )
        circuit_init.append("R",  Z_ancilla_indicesM)
        circuit_stab_meas = stim.Circuit()
        circuit_stab_meas.append("H", X_ancilla_indices)
        
        circuit_stab_meas.append("TICK")
        
        for time_step in range(len(scheduling_X)):
            for j in scheduling_X[time_step]:
#                 
                X_ancilla_index = X_ancilla_indices[j]
                data_index = scheduling_X[time_step][j]
                circuit_stab_meas.append("CX", [X_ancilla_index, data_index])
            circuit_stab_meas.append("TICK")
        circuit_stab_meas.append("TICK")
        
        for time_step in range(len(scheduling_Z)):
            for j in scheduling_Z[time_step]:
#                 supported_data_qubits = list(np.where(hz[Z_ancilla_index - n,:] == 1)[0])
                Z_ancilla_index = Z_ancilla_indices[j]
                data_index = scheduling_Z[time_step][j]
                # data_index = supported_data_qubits[i]
                circuit_stab_meas.append("CX", [data_index, Z_ancilla_index])
            circuit_stab_meas.append("TICK")
        circuit_stab_meas.append("TICK")
        circuit_stab_meas.append("H", X_ancilla_indices)
        circuit_stab_meas.append("MR", Z_ancilla_indices )
        circuit_stab_meas.append("MR", X_ancilla_indices)


################################################################################## d_T parity check measurement of both the memory code and surface code
        error_params = {"p_i": circuit_error_params["p_i"]*p, "p_state_p": circuit_error_params["p_state_p"]*p, 
        "p_m": circuit_error_params["p_m"]*p, "p_CX":circuit_error_params["p_CX"]*p, 
        "p_idling_gate": circuit_error_params["p_idling_gate"]*p,"p_H" : circuit_error_params["p_H"]*p,"p_deg" : circuit_error_params["p_deg"]*p}
        circuit_stab_meas_Hx1 = stim.Circuit()

        circuit_stab_meas_Hx1.append("H", X_ancilla_indices)
        circuit_stab_meas_Hx1.append("DEPOLARIZE1", X_ancilla_indices, (error_params['p_state_p']))
    #circuit_stab_meas_Hx1.append("DEPOLARIZE1", X_ancilla_indices, (error_params['p_H'])) # Add the state preparation error # Add the idling errors on the data qubits during the preparation for X ancillas
        circuit_stab_meas_Hx1.append("TICK")
        circuit_stab_meas_Hx1.append("DEPOLARIZE1", data_indices , (error_params['p_idling_gate'])) 
    # Apply CX gates for the X stabilizers
        for time_step in range(len(scheduling_X)):
        # add idling errors for all the qubits during the ancilla shuffling
            idling_qubits = data_indices + X_ancilla_indices
            idling_data_indices = list(copy.deepcopy(data_indices))
        #circuit_stab_meas_Hx1.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate'])) 
            for j in scheduling_X[time_step]:
    #                 supported_data_qubits = list(np.where(hx[X_ancilla_index - n - n_Z_ancilla,:] == 1)[0])
                X_ancilla_index = X_ancilla_indices[j]
                data_index = scheduling_X[time_step][j]
            # data_index = supported_data_qubits[i]
                circuit_stab_meas_Hx1.append("CX", [X_ancilla_index, data_index])
                if data_index in idling_data_indices:
                     idling_data_indices.pop(idling_data_indices.index(data_index))
            circuit_stab_meas_Hx1.append("DEPOLARIZE1", idling_data_indices, (error_params['p_idling_gate'])) # idling errors for qubits that are not being checked
            circuit_stab_meas_Hx1.append("TICK")

    # meausure the Z ancillas
        circuit_stab_meas_Hx1.append("DEPOLARIZE1", Z_ancilla_indices, (error_params['p_state_p'])) # Add the state preparation error
    # circuit_stab_meas_Hx1.append("DEPOLARIZE1", data_indices, (error_params['p_i'])) # Add the idling errors on the data qubits during the preparation for Z ancillas
    
        circuit_stab_meas_Hx1.append("DEPOLARIZE1",  data_indices , (error_params['p_idling_gate']))
        circuit_stab_meas_Hx1.append("TICK")
    # Appy CX gates for the Z stabilziers
        for time_step in range(len(scheduling_Z)):
              idling_qubits = data_indices + Z_ancilla_indices
              idling_data_indices = list(copy.deepcopy(data_indices))
        #circuit_stab_meas_Hx1.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate']))
              for j in scheduling_Z[time_step]:
    #                 supported_data_qubits = list(np.where(hz[Z_ancilla_index - n,:] == 1)[0])
                  Z_ancilla_index = Z_ancilla_indices[j]
                  data_index = scheduling_Z[time_step][j]
            # data_index = supported_data_qubits[i]
                  circuit_stab_meas_Hx1.append("CX", [data_index, Z_ancilla_index])
                  if data_index in idling_data_indices:
                         idling_data_indices.pop(idling_data_indices.index(data_index))
              circuit_stab_meas_Hx1.append("DEPOLARIZE1", idling_data_indices, (error_params['p_idling_gate'])) # idling errors for qubits that are not being checked
              circuit_stab_meas_Hx1.append("TICK")

    # Measure the ancillas
        
        circuit_stab_meas_Hx1.append("H", X_ancilla_indices)
        circuit_stab_meas_Hx1.append("TICK")
        circuit_stab_meas_Hx1.append("DEPOLARIZE1",  data_indices , (error_params['p_idling_gate']))

        circuit_stab_meas_Hx1.append("DEPOLARIZE1",  Z_ancilla_indices +X_ancilla_indices, (error_params['p_m'])) # Add the measurement error
        circuit_stab_meas_Hx1.append("TICK")
        circuit_stab_meas_Hx1.append("MR", Z_ancilla_indices + X_ancilla_indices)
        circuit_stab_meas_Hx1.append("SHIFT_COORDS", [], (1))
        for i in range(len(X_ancilla_indices)):
                 circuit_stab_meas_Hx1.append("DETECTOR", [stim.target_rec(- len(X_ancilla_indices) + i)], (0))
        circuit_stab_meas_Hx1.append("TICK")
        circuit_stab_meas_Hx2 = stim.Circuit()
    
        circuit_stab_meas_Hx2.append("H", X_ancilla_indices)
        circuit_stab_meas_Hx2.append("DEPOLARIZE1", X_ancilla_indices, (error_params['p_state_p'])) 
    #circuit_stab_meas_Hx2.append("DEPOLARIZE1", X_ancilla_indices, (error_params['p_H'])) # Add the state preparation error
    #ircuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices, (error_params['p_i'])) # Add the idling errors on the data qubits during the preparation for X ancillas
        circuit_stab_meas_Hx2.append("TICK")
        circuit_stab_meas_Hx2.append("DEPOLARIZE1", data_indices , (error_params['p_idling_gate']))
    # Apply CX gates for the X stabilizers
        for time_step in range(len(scheduling_X)):
                     idling_qubits = data_indices + X_ancilla_indices
        #circuit_stab_meas_Hx2.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate']))
                     idling_data_indices = list(copy.deepcopy(data_indices))
                     for j in scheduling_X[time_step]:
    #                 supported_data_qubits = list(np.where(hx[X_ancilla_index - n - n_Z_ancilla,:] == 1)[0])
                           X_ancilla_index = X_ancilla_indices[j]
                           data_index = scheduling_X[time_step][j]
            # data_index = supported_data_qubits[i]
                           circuit_stab_meas_Hx2.append("CX", [X_ancilla_index, data_index])
                           if data_index in idling_data_indices:
                                  idling_data_indices.pop(idling_data_indices.index(data_index))
                     circuit_stab_meas_Hx2.append("DEPOLARIZE1", idling_data_indices, (error_params['p_idling_gate'])) # idling errors for qubits that are not being checked
                     circuit_stab_meas_Hx2.append("TICK")
        circuit_stab_meas_Hx2.append("DEPOLARIZE1", Z_ancilla_indices, (error_params['p_state_p'])) # Add the state preparation error
        circuit_stab_meas_Hx2.append("DEPOLARIZE1", data_indices, (error_params['p_idling_gate'])) # Add the idling errors on the data qubits during the preparation for Z ancillas
        circuit_stab_meas_Hx2.append("TICK")
        for time_step in range(len(scheduling_Z)):
                  idling_qubits = data_indices + Z_ancilla_indices
        #circuit_stab_meas_Hx2.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate']))
                  idling_data_indices = list(copy.deepcopy(data_indices))
                  for j in scheduling_Z[time_step]:
    #                 supported_data_qubits = list(np.where(hz[Z_ancilla_index - n,:] == 1)[0])
                       Z_ancilla_index = Z_ancilla_indices[j]
                       data_index = scheduling_Z[time_step][j]
            # data_index = supported_data_qubits[i]
                       circuit_stab_meas_Hx2.append("CX", [data_index, Z_ancilla_index])
                       if data_index in idling_data_indices:
                                 idling_data_indices.pop(idling_data_indices.index(data_index))
                  circuit_stab_meas_Hx2.append("DEPOLARIZE1", idling_data_indices, (error_params['p_idling_gate'])) # idling errors for qubits that are not being checked
                  circuit_stab_meas_Hx2.append("TICK")        
    # Measure the ancillas
        circuit_stab_meas_Hx2.append("H", X_ancilla_indices)
        circuit_stab_meas_Hx2.append("TICK")
        circuit_stab_meas_Hx2.append("DEPOLARIZE1", data_indices , (error_params['p_idling_gate'])) 
    #circuit_stab_meas_Hx2.append("DEPOLARIZE1",  X_ancilla_indices, (error_params['p_H']))
        circuit_stab_meas_Hx2.append("DEPOLARIZE1",  Z_ancilla_indices +X_ancilla_indices, (error_params['p_m']))# Add the measurement error
        circuit_stab_meas_Hx2.append("TICK")
        circuit_stab_meas_Hx2.append("MR", Z_ancilla_indices + X_ancilla_indices)
    #circuit_stab_meas_Hx2.append("SHIFT_COORDS", [], (1))
        for i in range(len(X_ancilla_indices)):
            circuit_stab_meas_Hx2.append("DETECTOR", [stim.target_rec(- len(X_ancilla_indices) + i), 
                                        stim.target_rec(- len(X_ancilla_indices) + i - len(Z_ancilla_indices) - len(X_ancilla_indices))], (0))
        circuit_stab_meas_Hx2.append("TICK")


        circuit_stab_meas_repHxHx = circuit_stab_meas_Hx1 + (num_rep - 1)*circuit_stab_meas_Hx2
######################################################################################################## d_T parity check measurement of  the deformed code
        if circuit_type == 'coloration':
            scheduling_X_deform = ColorationCircuit(H_X_deformed)
            scheduling_Z_deform = ColorationCircuit(H_Z_deformed)
        circuit_stab_meas_rep1 = stim.Circuit()
#         circuit_stab_meas_rep1.append("DEPOLARIZE1", data_indices, (pz)) # for debug
        # measurement the X ancillas
        # # Initialize the X ancillas to the + state
        #circuit_stab_meas_rep1.append("DEPOLARIZE1", data_indicesancilla , (error_params['p_deg'])) 
        circuit_stab_meas_rep1.append("DEPOLARIZE1", data_indicesancilla, (error_params['p_state_p']))
        circuit_stab_meas_rep1.append("TICK")
        circuit_stab_meas_rep1.append("H", X_ancilla_indices+X_ancilla_indicesM)
        circuit_stab_meas_rep1.append("DEPOLARIZE1", X_ancilla_indices+X_ancilla_indicesM, (error_params['p_state_p']))
        
        circuit_stab_meas_rep1.append("TICK") # Add the state preparation error
        circuit_stab_meas_rep1.append("DEPOLARIZE1", data_indices, (error_params['p_deg'])) # Add the idling errors on the data qubits during the preparation for X ancillas
        circuit_stab_meas_rep1.append("TICK")
        # Apply CX gates for the X stabilizers
        #circuit_stab_meas_rep1.append("DEPOLARIZE1", data_indices + data_indicesancilla , (error_params['p_idling_gate']))#+ X_ancilla_indices+X_ancilla_indicesM
        for time_step in range(len(scheduling_X_deform)):
            # add idling errors for all the qubits during the ancilla shuffling
            idling_qubits = data_indices + data_indicesancilla + X_ancilla_indices+X_ancilla_indicesM
            idling_data_indices = list(copy.deepcopy(data_indices+data_indicesancilla))
            #circuit_stab_meas_rep1.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate'])) 
            for j in scheduling_X_deform[time_step]:
#               
                X_ancilla_index = (X_ancilla_indices+X_ancilla_indicesM)[j]
                data_index = scheduling_X_deform[time_step][j]
                
                circuit_stab_meas_rep1.append("CX", [X_ancilla_index, data_index])
                if data_index in idling_data_indices:
                    idling_data_indices.pop(idling_data_indices.index(data_index))
            circuit_stab_meas_rep1.append("DEPOLARIZE1", idling_data_indices, (error_params['p_deg'])) # idling errors for qubits that are not being checked
            circuit_stab_meas_rep1.append("TICK")
        circuit_stab_meas_rep1.append("DEPOLARIZE1", data_indices+ data_indicesancilla , (error_params['p_deg']))#idling errors for qubits that are not being Measurement
        circuit_stab_meas_rep1.append("DEPOLARIZE1", Z_ancilla_indices+Z_ancilla_indicesM, (error_params['p_state_p'])) # Add the state preparation error
        circuit_stab_meas_rep1.append("TICK")
        #circuit_stab_meas_rep1.append("DEPOLARIZE1", data_indices+ data_indicesancilla + Z_ancilla_indices+Z_ancilla_indicesM, (error_params['p_deg']))
        for time_step in range(len(scheduling_Z_deform)):
            idling_qubits = data_indices+ data_indicesancilla + Z_ancilla_indices+Z_ancilla_indicesM
            idling_data_indices = list(copy.deepcopy(data_indices+data_indicesancilla))
            #circuit_stab_meas_rep1.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate']))
            for j in scheduling_Z_deform[time_step]:
#                 
                Z_ancilla_index = (Z_ancilla_indices+Z_ancilla_indicesM)[j]
                data_index = scheduling_Z_deform[time_step][j]
                
                circuit_stab_meas_rep1.append("CX", [data_index, Z_ancilla_index])
                if data_index in idling_data_indices:
                    idling_data_indices.pop(idling_data_indices.index(data_index))
            circuit_stab_meas_rep1.append("DEPOLARIZE1", idling_data_indices, (error_params['p_deg'])) # idling errors for qubits that are not being checked
            circuit_stab_meas_rep1.append("TICK")
        circuit_stab_meas_rep1.append("DEPOLARIZE1", data_indices+data_indicesancilla, (error_params['p_deg']))
        # Measure the ancillas
        circuit_stab_meas_rep1.append("H", X_ancilla_indices+ X_ancilla_indicesM)
        circuit_stab_meas_rep1.append("TICK")
        circuit_stab_meas_rep1.append("DEPOLARIZE1", X_ancilla_indices+ X_ancilla_indicesM+Z_ancilla_indices+Z_ancilla_indicesM , (error_params['p_m']))

        circuit_stab_meas_rep1.append("TICK")
        #circuit_stab_meas_rep1.append("DEPOLARIZE1",  X_ancilla_indices+ X_ancilla_indicesM, (3/2*error_params['p_m'])) # Add the measurement error
        #circuit_final_mea.append("DEPOLARIZE1", data_indices, (error_params['p_i'])) # Add the idling errors on the data qubits during the measurement of X ancillas
        circuit_stab_meas_rep1.append("MR", Z_ancilla_indices+Z_ancilla_indicesM )
        circuit_stab_meas_rep1.append("MR", X_ancilla_indices+X_ancilla_indicesM)
        #circuit_stab_meas_rep1.append("RX", X_ancilla_indices+X_ancilla_indicesM)
        circuit_stab_meas_rep1.append("TICK")
        #circuit_stab_meas_rep1.append("SHIFT_COORDS", [], (1))
        for i in range(len(X_ancilla_indices)):
            circuit_stab_meas_rep1.append("DETECTOR", [stim.target_rec(- len(X_ancilla_indices)-len(X_ancilla_indicesM) + i),
                                                       stim.target_rec(- len(X_ancilla_indices)-len(Z_ancilla_indices)-len(Z_ancilla_indicesM)-len(X_ancilla_indices)-len(X_ancilla_indicesM) + i)], (0))

        logical_X_qubit_indicesm1 = list(np.where(JX1JX1[0,:] == 1)[0])
        circuit_stab_meas_rep1.append("OBSERVABLE_INCLUDE", 
                               [stim.target_rec(-len(X_ancilla_indicesM) + data_index1)for data_index1 in logical_X_qubit_indicesm1],
                               (0))
        logical_X_qubit_indicesm2 = list(np.where(JX2JX2[0,:] == 1)[0])
        circuit_stab_meas_rep1.append("OBSERVABLE_INCLUDE", 
                               [stim.target_rec(-len(X_ancilla_indicesM) + data_index1)for data_index1 in logical_X_qubit_indicesm2],
                               (1))    
        circuit_stab_meas_rep1.append("TICK")
        # rep with difference detectors
        circuit_stab_meas_rep2 = stim.Circuit()
        #circuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices+data_indicesancilla , (error_params['p_m']))
        # measurement the X ancillas
        # # Initialize the X ancillas to the + state
#         circuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices, (pz)) # for debug
        #circuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices + data_indicesancilla, (error_params['p_deg']))
        circuit_stab_meas_rep2.append("DEPOLARIZE1", X_ancilla_indices+X_ancilla_indicesM, (error_params['p_state_p']))
        circuit_stab_meas_rep2.append("TICK")# Add the state preparation error
        circuit_stab_meas_rep2.append("H", X_ancilla_indices+X_ancilla_indicesM)
        circuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices + data_indicesancilla, (error_params['p_deg'])) 
        # circuit_final_mea.append("DEPOLARIZE1", data_indices, (error_params['p_i'])) # Add the idling errors on the data qubits during the preparation for X ancillas
        circuit_stab_meas_rep2.append("TICK")
        #circuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices + data_indicesancilla + X_ancilla_indices+X_ancilla_indicesM, (error_params['p_idling_gate']))
        # Apply CX gates for the X stabilizers
        for time_step in range(len(scheduling_X_deform)):
            # add idling errors for all the qubits during the ancilla shuffling
            idling_qubits = data_indices + data_indicesancilla + X_ancilla_indices+X_ancilla_indicesM
            idling_data_indices = list(copy.deepcopy(data_indices+data_indicesancilla))
            #circuit_stab_meas_rep2.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate'])) 
            for j in scheduling_X_deform[time_step]:
#                 supported_data_qubits = list(np.where(hx[X_ancilla_index - n - n_Z_ancilla,:] == 1)[0])
                X_ancilla_index = (X_ancilla_indices+X_ancilla_indicesM)[j]
                data_index = scheduling_X_deform[time_step][j]
                # data_index = supported_data_qubits[i]
                circuit_stab_meas_rep2.append("CX", [X_ancilla_index, data_index])
                if data_index in idling_data_indices:
                    idling_data_indices.pop(idling_data_indices.index(data_index))
            circuit_stab_meas_rep2.append("DEPOLARIZE1", idling_data_indices, (error_params['p_deg'])) # idling errors for qubits that are not being checked
            circuit_stab_meas_rep2.append("TICK")
        circuit_stab_meas_rep2.append("DEPOLARIZE1", Z_ancilla_indices+Z_ancilla_indicesM, (error_params['p_state_p'])) # Add the state preparation error
        circuit_stab_meas_rep2.append("TICK")
        circuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices+ data_indicesancilla , (error_params['p_deg']))
        for time_step in range(len(scheduling_Z_deform)):
            idling_qubits = data_indices+ data_indicesancilla + Z_ancilla_indices+Z_ancilla_indicesM
            idling_data_indices = list(copy.deepcopy(data_indices+data_indicesancilla))
            #circuit_stab_meas_rep2.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate']))
            for j in scheduling_Z_deform[time_step]:
#                 supported_data_qubits = list(np.where(hz[Z_ancilla_index - n,:] == 1)[0])
                Z_ancilla_index = (Z_ancilla_indices+Z_ancilla_indicesM)[j]
                data_index = scheduling_Z_deform[time_step][j]
                # data_index = supported_data_qubits[i]
                circuit_stab_meas_rep2.append("CX", [data_index, Z_ancilla_index])
                if data_index in idling_data_indices:
                    idling_data_indices.pop(idling_data_indices.index(data_index))
            circuit_stab_meas_rep2.append("DEPOLARIZE1", idling_data_indices, (error_params['p_deg'])) # idling errors for qubits that are not being checked
            circuit_stab_meas_rep2.append("TICK")
        circuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices+ data_indicesancilla , (error_params['p_deg']))
        # Measure the ancillas
        circuit_stab_meas_rep2.append("TICK")
        circuit_stab_meas_rep2.append("H", X_ancilla_indices+ X_ancilla_indicesM)
        circuit_stab_meas_rep2.append("DEPOLARIZE1", Z_ancilla_indices+Z_ancilla_indicesM+X_ancilla_indices+ X_ancilla_indicesM, (error_params['p_m'])) # Add the measurement error
        # circuit_final_mea.append("DEPOLARIZE1", data_indices, (error_params['p_i'])) # Add the idling errors on the data qubits during the measurement of X ancillas
        circuit_stab_meas_rep2.append("TICK")
        circuit_stab_meas_rep2.append("MR", Z_ancilla_indices+Z_ancilla_indicesM )
        circuit_stab_meas_rep2.append("MR", X_ancilla_indices+X_ancilla_indicesM)
        #circuit_stab_meas_rep1.append("RX", X_ancilla_indices+X_ancilla_indicesM)
        circuit_stab_meas_rep2.append("SHIFT_COORDS", [], (1))   
        for i in range(len(X_ancilla_indices)+len(X_ancilla_indicesM)):
            circuit_stab_meas_rep2.append("DETECTOR", [stim.target_rec(- len(X_ancilla_indices)-len(X_ancilla_indicesM) + i), 
                                            stim.target_rec(- len(X_ancilla_indices)-len(X_ancilla_indicesM) + i - len(Z_ancilla_indices) -len(Z_ancilla_indicesM)- len(X_ancilla_indices)-len(X_ancilla_indicesM))], (0))
        circuit_stab_meas_rep2.append("TICK")
###########################################################################################d_T parity check measurement of both the memory code and surface code
        circuit_stab_meas_Hx3 = stim.Circuit()
        circuit_stab_meas_Hx3.append("DEPOLARIZE1",  data_indicesancilla, (error_params['p_m']))
        circuit_stab_meas_Hx3.append("MR", data_indicesancilla)   
        circuit_stab_meas_Hx3.append("TICK")
    # measurement the X ancillas
    # # Initialize the X ancillas to the + state
        circuit_stab_meas_Hx3.append("H", X_ancilla_indices)
        circuit_stab_meas_Hx3.append("DEPOLARIZE1", X_ancilla_indices, (error_params['p_state_p'])) 


        

    #circuit_stab_meas_Hx3.append("DEPOLARIZE1", X_ancilla_indices, (error_params['p_H'])) # Add the state preparation error
    #ircuit_stab_meas_rep2.append("DEPOLARIZE1", data_indices, (error_params['p_i'])) # Add the idling errors on the data qubits during the preparation for X ancillas
        circuit_stab_meas_Hx3.append("TICK")
        circuit_stab_meas_Hx3.append("DEPOLARIZE1", data_indices , (error_params['p_idling_gate']))
    # Apply CX gates for the X stabilizers
        for time_step in range(len(scheduling_X)):
                     idling_qubits = data_indices + X_ancilla_indices
        #circuit_stab_meas_Hx3.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate']))
                     idling_data_indices = list(copy.deepcopy(data_indices))
                     for j in scheduling_X[time_step]:
    #                 supported_data_qubits = list(np.where(hx[X_ancilla_index - n - n_Z_ancilla,:] == 1)[0])
                           X_ancilla_index = X_ancilla_indices[j]
                           data_index = scheduling_X[time_step][j]
            # data_index = supported_data_qubits[i]
                           circuit_stab_meas_Hx3.append("CX", [X_ancilla_index, data_index])
                           if data_index in idling_data_indices:
                                  idling_data_indices.pop(idling_data_indices.index(data_index))
                     circuit_stab_meas_Hx3.append("DEPOLARIZE1", idling_data_indices, (error_params['p_idling_gate'])) # idling errors for qubits that are not being checked
                     circuit_stab_meas_Hx3.append("TICK")

    # meausure the Z ancillas
    ## initialize the Z ancillas
        circuit_stab_meas_Hx3.append("DEPOLARIZE1", Z_ancilla_indices, (error_params['p_state_p'])) # Add the state preparation error
        circuit_stab_meas_Hx3.append("DEPOLARIZE1", data_indices, (error_params['p_idling_gate'])) # Add the idling errors on the data qubits during the preparation for Z ancillas
        circuit_stab_meas_Hx3.append("TICK")
    # Appy CX gates for the Z stabilziers
    #circuit_stab_meas_Hx3.append("DEPOLARIZE1",  data_indices + Z_ancilla_indices, (error_params['p_idling_gate']))
        for time_step in range(len(scheduling_Z)):
                  idling_qubits = data_indices + Z_ancilla_indices
        #circuit_stab_meas_Hx3.append("DEPOLARIZE1", idling_qubits, (error_params['p_idling_gate']))
                  idling_data_indices = list(copy.deepcopy(data_indices))
                  for j in scheduling_Z[time_step]:
    #                 supported_data_qubits = list(np.where(hz[Z_ancilla_index - n,:] == 1)[0])
                       Z_ancilla_index = Z_ancilla_indices[j]
                       data_index = scheduling_Z[time_step][j]
            # data_index = supported_data_qubits[i]
                       circuit_stab_meas_Hx3.append("CX", [data_index, Z_ancilla_index])
                       if data_index in idling_data_indices:
                                 idling_data_indices.pop(idling_data_indices.index(data_index))
                  circuit_stab_meas_Hx3.append("DEPOLARIZE1", idling_data_indices, (error_params['p_idling_gate'])) # idling errors for qubits that are not being checked
                  circuit_stab_meas_Hx3.append("TICK")
         
        circuit_stab_meas_Hx3.append("DEPOLARIZE1", data_indices , (error_params['p_idling_gate']))
    # Measure the ancillas
        circuit_stab_meas_Hx3.append("H", X_ancilla_indices)
    #circuit_stab_meas_Hx3.append("DEPOLARIZE1",  X_ancilla_indices, (error_params['p_H']))
        circuit_stab_meas_Hx3.append("DEPOLARIZE1",  Z_ancilla_indices +X_ancilla_indices, (error_params['p_m']))# Add the measurement error
        circuit_stab_meas_Hx3.append("TICK")
    # circuit_stab_meas_Hx3.append("DEPOLARIZE1", data_indices, (error_params['p_i'])) # Add the idling errors on the data qubits during the measurement of X ancillas
        circuit_stab_meas_Hx3.append("MR", Z_ancilla_indices + X_ancilla_indices)
    #circuit_stab_meas_Hx3.append("SHIFT_COORDS", [], (1))
        for i in range(len(X_ancilla_indices)):
            circuit_stab_meas_Hx3.append("DETECTOR", [stim.target_rec(- len(X_ancilla_indices) + i), 
                                        stim.target_rec(  i- len(X_ancilla_indices) - len(Z_ancilla_indices)-len(data_indicesancilla) - len(X_ancilla_indices)-len(X_ancilla_indicesM) )], (0))
        circuit_stab_meas_Hx3.append("TICK")


############################################################################### a transversal readout
        circuit_final_meas_f = stim.Circuit()
#         circuit_final_meas_f.append("DEPOLARIZE1", data_indices, (1*pz)) # for debug
        circuit_final_meas_f.append("DEPOLARIZE1",  data_indices, (error_params['p_m'])) # Add the measurement error
        
        circuit_final_meas_f.append("MX", data_indices)
        circuit_final_meas_f.append("TICK")
        circuit_final_meas_f.append("SHIFT_COORDS", [], (1))
        # Obtain the syndroms
        for i in range(len(X_ancilla_indices)):
            supported_data_indices = list(np.where(hx[i,:] == 1)[0])
            rec_indices = []
            for data_index in supported_data_indices:
                rec_indices.append(- len(data_indices) + data_index)
            rec_indices.append(- len(X_ancilla_indices)+ i - len(data_indices))
            circuit_final_meas_f.append("Detector", [stim.target_rec(rec_index) for rec_index in rec_indices], (0))
        # Obtain the logical measurements result
        for i in range(len(J_X)):
            logical_X_qubit_indices = list(np.where(J_X[i,:] == 1)[0])
            
            circuit_final_meas_f.append("OBSERVABLE_INCLUDE", 
                               [stim.target_rec(- len(data_indices) + data_index) for data_index in logical_X_qubit_indices],
                               (i+2)) 
        noisy_circuit =circuit_stab_meas_repHxHx+ circuit_stab_meas_rep1+(num_rep-1)*circuit_stab_meas_rep2 +circuit_stab_meas_Hx3+(num_rep-1)*circuit_stab_meas_Hx2
        circuit_stea =  circuit_init +circuit_stab_meas
        noisy_circuit1 = AddCXError(noisy_circuit, 'DEPOLARIZE2(%f)' % error_params["p_CX"])
        noisy_circuitall = circuit_stea+noisy_circuit1 + circuit_final_meas_f

    
        return noisy_circuitall   
def AddCXError(circuit:stim.Circuit, error_instruction:str) -> stim.Circuit:
    circuit_str = str(circuit)    
    ## Find all the unique cx instructions
    cx_instructions = re.findall('CX.*\n', circuit_str)
    unique_cx_instructions = list(set(cx_instructions))
    unique_cx_instructions
    
    ## Add gate errors after each cx instruction
    for cx_ins in unique_cx_instructions:
        circuit_str = circuit_str.replace(cx_ins, cx_ins + 
                                      cx_ins.replace('CX', error_instruction))
    
    modified_circuit = stim.Circuit(circuit_str)
    

    return modified_circuit


In [None]:
import multiprocessing as mp
import numpy as np
import time
import bposd
from ldpc.codes import ring_code
from bposd.hgp import hgp
from bposd import bposd_decoder
import multiprocessing as mp
from bposd.css import css_code
import sys
sys.path.append('./src/')
from scipy import sparse
from ldpc.codes import rep_code
from pymatching import Matching
import time
import secrets
def GenFaultHyperGraph1(detector_error_model:str, num_logicals:int):
    # if the error model string starts with "repeat" block, remove it
    
    items = detector_error_model.split('\n')
    errors = [item for item in items if 'error' in item]
    detectors = [item for item in items if 'detector' in item and 'shift' not in item]
    shifts_indices = np.where(np.array(items) == 'shift_detectors(1) 0')[0] - len(errors) + 1
    num_detectors_each_cycle = []
    for i in range(len(shifts_indices)):
        if i == 0:
            None
        else:
            num_detectors_each_cycle.append(shifts_indices[i] - shifts_indices[i - 1] - 1)
    num_detectors_each_cycle.append(len(detectors) - np.sum(num_detectors_each_cycle))

    layered_dectectors = []
    layered_dectectors.append(detectors[:num_detectors_each_cycle[0]]) # first layer
    layered_dectectors.append(detectors[-num_detectors_each_cycle[-1]:]) # last layer
    
    layered_dectectors = [[dectector_str.split()[1] for dectector_str in dectectors_each_layer] for dectectors_each_layer in layered_dectectors]
    layered_errors = [[] for i in range(2)]
    for error in errors:
        error_list = error.split()
        #error_p = float(re.findall("\d+\.\d+", error_list[0])[0])
        non_e_pattern = "\d+\.\d+"
        e_pattern = r'([\d]+\.[\d]+e-[\d]+)'
        e_matches = re.findall(e_pattern, error_list[0])
        if e_matches:
            error_p = float(e_matches[0])
        else:
            non_e_matches = re.findall("\d+\.\d+", error_list[0])
            error_p = float(non_e_matches[0])

        detectors = error_list[1:]
        
        flipped_logicals = [item for item in error_list if 'L' in item]
        occupied_layers = [j for j in range(len(layered_dectectors)) if set(detectors).intersection(set(layered_dectectors[j]))]
        
        if occupied_layers != []:
            layer = occupied_layers[0]
            detectors = list(set(detectors).intersection(layered_dectectors[layer]))
            error_dict = {'layer':layer, 'p':error_p, 'detectors':detectors, 'logicals':flipped_logicals}
            layered_errors[layer].append(error_dict)
    
    # obtain the check matrix for each layer
    H_list = []
    for error_each_layer, detector_each_layer in zip(layered_errors, layered_dectectors):
        H_each_layer = np.zeros([len(detector_each_layer), len(error_each_layer)])
        for i in range(len(detector_each_layer)):
            for j in range(len(error_each_layer)):
                if detector_each_layer[i] in error_each_layer[j]['detectors']:
                    H_each_layer[i,j] = 1
        H_list.append(H_each_layer)
    
    # obtain the channel probability for each layer
    channel_prob_list = [[error['p'] for error in error_each_layer] for error_each_layer in layered_errors]

    # obtain the matrix for the flipped logicals for each layer
    L_list = []
    logicals = ['L'+str(i) for i in range(num_logicals)]
    for error_each_layer in layered_errors:
        L_each_layer = np.zeros([len(logicals), len(error_each_layer)])
        for i in range(len(logicals)):
            for j in range(len(error_each_layer)):
                if logicals[i] in error_each_layer[j]['logicals']:
                    L_each_layer[i,j] = 1
        L_list.append(L_each_layer)
    
#     return layered_errors, layered_dectectors 
    return H_list, L_list, channel_prob_list
def hypergraph_product_code1(H1, H2):
    # 获取矩阵的形状
    m1, n1 = H1.shape
    m2, n2 = H2.shape

    # 构造单位矩阵
    I1 = np.eye(n1, dtype=int)
    I2 = np.eye(n2, dtype=int)
    I3 = np.eye(m1, dtype=int)
    I4 = np.eye(m2, dtype=int)
    vectors = np.zeros(n2,dtype=int)
    vectors[0]=1
    # 构造 H_X
    HZ = np.hstack((np.kron(I2,H1)%2, np.kron(H2.T,I3)%2))
    LX = np.hstack((np.kron(vectors,np.ones(n1,dtype=int))%2, np.zeros(m2*m1,dtype=int)%2))
    LX = np.atleast_2d(LX)    
    # 构造 H_Z
    HX = np.hstack((np.kron(H2,I1)%2, np.kron(I4,H1.T)%2))
    
    return HX, HZ,LX
# LFR function
def LFR(logical_vals):
    failures = [1 * logical.any() for logical in logical_vals]
    LFP = np.sum(failures) / 100
    return LFP
Lall = [4]

def generate_channel_matrix(m, length, channel_probs, seed):
    """
    生成一个 m行×length列 的0/1矩阵，其中第j列的每个元素为1的概率是 channel_probs[j]
    
    参数:
        m: 行数
        length: 列数（需与channel_probs长度一致）
        channel_probs: 概率向量，每个元素为0~1之间的浮点数
        seed: 随机数种子（默认为None，表示不设置种子）
    
    返回:
        binary_matrix: m×length的二值矩阵（0或1）
    """
    assert len(channel_probs) == length, "channel_probs的长度必须等于length"
    
    # 设置随机数种子
    
    np.random.seed(seed)
    
    # 生成均匀分布随机矩阵，并通过广播与channel_probs逐列比较
    rand_matrix = np.random.rand(m, length)
    binary_matrix = (rand_matrix < channel_probs).astype(int)
    return binary_matrix
circuit_error_params = {"p_i": 0, "p_state_p": 1, "p_m": 1, "p_CX": 1, "p_idling_gate": 1,"p_H": 0,"p_deg": 1}
eval_logical_type = 'Z'
# circuit_type = 'colorproduct'
circuit_type = 'coloration'
def process_sample_batch(num_qubits,eval_p,h,L,num_rep,channel_probs, circuit_error_params, num_samples, start_index, batch_size,circuit_type,D=1,seed=None):
    """
    处理一批采样任务，每个批次执行 `batch_size` 次采样。
    """

    #matching_decoding= Matching_Decoding()
    #matching_decoding.from_detector_error_model(dem, num_logicals)
    #logical_correction = matching_decoding.decode_batch1(detector_vals)
    #corrected_logical_vals = (logical_vals + logical_correction) % 2
    seed= secrets.randbelow(2**32)
    decoder_params = {'max_iter':int(1000), 'bp_method': 'min_sum', 
                      'ms_scaling_factor': 0.9, 'osd_method': "osd_cs", 'osd_order': 5}
    max_iter = decoder_params['max_iter']
    bp_method =decoder_params['bp_method']
    ms_scaling_factor = decoder_params['ms_scaling_factor']
    osd_method = decoder_params['osd_method']
    osd_order = decoder_params['osd_order']
    bposd_decoding =BpOsdDecoder(
                    h,
                    channel_probs=channel_probs,
                    max_iter=max_iter,
                    bp_method=bp_method,
                    ms_scaling_factor=ms_scaling_factor,
                    osd_method=osd_method,
                    osd_order=osd_order, )  


 

    sample = generate_channel_matrix(batch_size,h.shape[1], channel_probs,seed)
    case3_count =0
    case2_count =0 
    case1_count =0
    case4_count =0
    case5_count =0
    case6_count = 0  
    case7_count = 0  
    case8_count = 0   
    case9_count = 0   
    case10_count = 0  
    case11_count = 0  
    case12_count = 0  
    case13_count = 0

    for i in range(sample.shape[0]):

       vector = sample[i,:]
       vector = np.array(vector)
       syn = h@vector.T%2


       
       error_estimate = bposd_decoding.decode(syn.T)
       corrected_vector = (vector + error_estimate) % 2
       predicted_observables = (corrected_vector @ L.T) % 2
       obs = predicted_observables
       if (obs[-3]==0) and (obs[5]==1):
            case1_count += 1
       if (obs[-4]==0) and (obs[4]==1):
            case2_count += 1
       if((obs[-4]==0) and (obs[4]==1)) and ((obs[-3]==0) and (obs[5]==1)):
       #ocZ_j - ocZ_j \cap ocM_j and noZ_j
            case4_count += 1
       if (obs[-4]+obs[4])%2== 1 :#ocZ_j - ocZ_j \cap ocM_j and noZ_j
            case5_count += 1
        #############j = 2
       if (obs[-3]+obs[5])%2== 1:#(Z_j)-(Z_j)\cap(oc_j)
            case6_count += 1        
       ####################
       if ((obs[-4]+obs[4])%2== 1) and ((obs[-3]+obs[5])%2 == 1):
            case9_count += 1
    # 计算三种情况的比率（分母根据实际样本量调整）
    total = sample.shape[0]
    return [
        case1_count/total,
        case2_count/total,
        case3_count/total,
        case4_count/total,
        case5_count/total,
        case6_count/total,  
        case7_count/total,   
        case8_count/total,  
        case9_count/total,   
        case10_count/total,  
        case11_count/total,
        case12_count/total,
        case13_count/total
    ]

    

def parallel_sampling(num_qubit,eval_p, H,L,num_rep,channel_prob, circuit_error_params, circuit_type, D, num_samples, batch_size, num_processes=None):
    """
    对单个 `eval_p` 进行 8000 次采样，并行化每次采样任务。
    """
    num_samples =100000
    batch_size =2000
    num_batches = num_samples // batch_size
    


    
    with mp.Pool(50) as pool:
        results = pool.starmap(process_sample_batch, 
                              [(num_qubit,eval_p, H,L,num_rep,channel_prob, circuit_error_params,num_samples , i * batch_size, batch_size,circuit_type, D) 
                               for i in range(num_batches)])
    
    all_case = [[] for _ in range(13)]
    for r in results:
        for i in range(13):
            all_case[i].append(r[i])
    
    # 合并结果
    LFR_cases = [np.mean(case) for case in all_case]
    return LFR_cases
eval_ps = np.array([0.0017])
#eval_ps = np.array([0.009])

#### different code
##############################################

ell, m = 15, 3
code = [9, 1, 2, 0, 2, 7]  # 需验证参数
H_X, H_Z = construct_ldpc(ell, m, code)

H_X = H_X.astype(np.uint8) 
H_Z = H_Z.astype(np.uint8)

H_X1= gf2_rref(H_X)
H_X1 = remove_zero_rows(H_X1)

H_Z1= gf2_rref(H_Z)
H_Z1 = remove_zero_rows(H_Z1)
LX,LZ,I_s = CSS_code_Logical(H_X1,H_Z1)
LX = LX.astype(np.uint8)

L =10
J_X_1 = LX[-2:,]
##############################

h = rep_code(2)
h1 = h.toarray()
D=10
num_rep = 10
surface_codehx,surface_codehz,surface_codelx =hypergraph_product_code1(H1=h1,H2=h1)   
hx=surface_codehx
hz=surface_codehz
lx=surface_codelx
hx=hx.astype(np.uint8)
hz=hz.astype(np.uint8)

num_logicals = LX.shape[0]+4
num_qubit = LX.shape[1]+2*lx.shape[1]
eval_pm = 0.001 
qec_circuit = QECCircuit_OneStage(H_Z,H_X,J_X_1,LX,I_s,hz,hx,lx,num_rep, circuit_error_params, eval_pm,circuit_type,D)

dem = qec_circuit.detector_error_model(flatten_loops=True)


H, LX, channel_prob = GenDecodingGraphs(str(dem), num_logicals=num_logicals)
H_List,L_List,channel_prob_List = GenFaultHyperGraph1(str(dem), num_logicals=num_logicals)
H = H.astype(np.uint8)
print(H.shape)
H_List = np.array(H_List[0])
L_List = np.array(L_List[0])
LX = LX.astype(np.uint8)
m = -L_List.shape[1]+LX.shape[1]
L_List = np.hstack((L_List,np.zeros((L_List.shape[0],m),dtype=int)))
L_ocZ = (L_List [-4:,:]).astype(np.uint8)

LX = np.vstack((L_ocZ,LX))
channel_prob = np.array(channel_prob) 
LFPPP = [[] for _ in range(13)]

start_time = time.time()
for eval_p in eval_ps:
   
    LFR_all_results = [[] for _ in range(13)]
    for L in Lall:

      channel_probs = (eval_p / eval_pm) * channel_prob
      channel_probs = list(channel_probs)
      LFR_cases= parallel_sampling(num_qubit,eval_p,H,LX,num_rep,channel_probs,circuit_error_params, circuit_type, D, num_samples=100000, batch_size=2000, num_processes=mp.cpu_count())    
      for i in range(13):
            LFR_all_results[i].append(LFR_cases[i])
    
    # 存储到全局结果
    for i in range(13):
        LFPPP[i].append(LFR_all_results[i])
elapsed_time = time.time() - start_time
print(elapsed_time)
for i in range(13):
    print(f"Case {i+1} Results:")
    print(LFPPP[i])
LFPPP = [[] for _ in range(13)]
eval_ps = np.array([0.001])
for eval_p in eval_ps:
   
    LFR_all_results = [[] for _ in range(13)]
    for L in Lall:

      channel_probs = (eval_p / eval_pm) * channel_prob
      channel_probs = list(channel_probs)
      LFR_cases= parallel_sampling(num_qubit,eval_p,H,LX,num_rep,channel_probs,circuit_error_params, circuit_type, D, num_samples=100000, batch_size=2000, num_processes=mp.cpu_count())    
      for i in range(13):
            LFR_all_results[i].append(LFR_cases[i])
    
    for i in range(13):
        LFPPP[i].append(LFR_all_results[i])

for i in range(13):
    print(f"Case {i+1} Results:")
    print(LFPPP[i])
LFPPP = [[] for _ in range(13)]
eval_ps = np.array([0.001,0.0012,0.0015,0.0017,0.002,0.0023,0.0025,0.003])
for eval_p in eval_ps:
   
    LFR_all_results = [[] for _ in range(13)]
    for L in Lall:

      channel_probs = (eval_p / eval_pm) * channel_prob
      channel_probs = list(channel_probs)
      LFR_cases= parallel_sampling(num_qubit,eval_p,H,LX,num_rep,channel_probs,circuit_error_params, circuit_type, D, num_samples=100000, batch_size=2000, num_processes=mp.cpu_count())    
      for i in range(13):
            LFR_all_results[i].append(LFR_cases[i])
    
    for i in range(13):
        LFPPP[i].append(LFR_all_results[i])

for i in range(13):
    print(f"Case {i+1} Results:")
    print(LFPPP[i])

  non_e_pattern = "\d+\.\d+"
  non_e_matches = re.findall("\d+\.\d+", error_list[0])


我好帥
yanzheng
[]
461
49
171
(3229, 26714)
10893.674940347672
Case 1 Results:
[[0.24594000000000002]]
Case 2 Results:
[[0.24929]]
Case 3 Results:
[[0.0]]
Case 4 Results:
[[0.061180000000000005]]
Case 5 Results:
[[0.25366999999999995]]
Case 6 Results:
[[0.25031]]
Case 7 Results:
[[0.0]]
Case 8 Results:
[[0.0]]
Case 9 Results:
[[0.06369000000000001]]
Case 10 Results:
[[0.0]]
Case 11 Results:
[[0.0]]
Case 12 Results:
[[0.0]]
Case 13 Results:
[[0.0]]


In [None]:
different code
#################################################
x = symbols('x')
n =24
k = 6

a_x = 1 + x**2+x**8 + x**15 
b_x = 1 + x**2+x**12+ x**17
A = polynomial_to_circulant(a_x, n)
B = polynomial_to_circulant(b_x, n)

H_X= np.hstack((A,B))

H_X1= gf2_rref(H_X)
H_X1 = remove_zero_rows(H_X1)
H_Z= np.hstack((B.T,A.T))
H_Z1= gf2_rref(H_Z)
H_Z1 = remove_zero_rows(H_Z1)
LX,LZ,I_s = CSS_code_Logical(H_X1,H_Z1)
J_X_1 = LX[:2,]
#####################################
#################
H_Z = block_diagonal(hz,hz)
H_X = block_diagonal(hx,hx)
H_X1= gf2_rref(H_X)
H_X1 = remove_zero_rows(H_X1)
H_Z1= gf2_rref(H_Z)
H_Z1 = remove_zero_rows(H_Z1)
LX,LZ,I_s = CSS_code_Logical(H_X1,H_Z1)

J_X_1 = LX[:2,]

################
#################################################
x = symbols('x')
n =24
k = 6

a_x = 1 + x**2+x**8 + x**15 
b_x = 1 + x**2+x**12+ x**17
A = polynomial_to_circulant(a_x, n)
B = polynomial_to_circulant(b_x, n)

H_X= np.hstack((A,B))

H_X1= gf2_rref(H_X)
H_X1 = remove_zero_rows(H_X1)
H_Z= np.hstack((B.T,A.T))
H_Z1= gf2_rref(H_Z)
H_Z1 = remove_zero_rows(H_Z1)
LX,LZ,I_s = CSS_code_Logical(H_X1,H_Z1)
J_X_1 = LX[:2,]
#####################################