# Bivariate Bicycle Code

In this notebook, I'll implement the simulation tool of the example BB codes in "High-threshold and low-overhead fault-tolerant quantum memory" by Bravyi, Sergey, et al. 
These are the configurations:

In [1]:
import mwpf as mwpf
mwpf.Visualizer.embed()

In [2]:
from dataclasses import dataclass
import numpy as np
import stim
from galois.typing import ArrayLike
from itertools import chain
from galois import GF2

x = 'x'
y = 'y'
BBExpr = list[tuple[str, int]]
gf2_zeros = lambda shape: GF2(np.zeros(shape, dtype=np.uint8))
gf2_eye = lambda N: GF2(np.eye(N, dtype=np.uint8))

def x_matrix(l: int, m: int, exp: int):
    S = gf2_zeros(shape=(l, l))
    for i in range(l):
        S[i, (i+exp) % l] = 1
    return np.kron(S, gf2_eye(m))

def y_matrix(l: int, m: int, exp: int):
    S = gf2_zeros(shape=(m, m))
    for i in range(m):
        S[i, (i+exp) % m] = 1
    return np.kron(gf2_eye(l), S)


@dataclass
class BBConfig:
    nkd: tuple[int, int, int]
    l: int
    m: int
    A: BBExpr
    B: BBExpr
    unsure_d: bool = False

    def __post_init__(self):
        # sanity check
        n, k, d = self.nkd
        assert n == 2 * self.l * self.m
        for bb_expr in [self.A, self.B]:
            for var, exp in bb_expr:
                assert var in [x, y]
                assert exp >= 0
        # construct the matrices
        lm = self.l * self.m
        def matrix_of(bb_expr: BBExpr) -> ArrayLike:
            matrix = gf2_zeros(shape=(lm, lm))
            for var, exp in bb_expr:
                matrix += (x_matrix if var == x else y_matrix)(self.l, self.m, exp)
            return matrix
        self.matrix_A = matrix_of(self.A)
        self.matrix_B = matrix_of(self.B)
        assert (self.matrix_A @ self.matrix_B == self.matrix_B @ self.matrix_A).all()
        self.H_X = np.concatenate((self.matrix_A, self.matrix_B), axis=1)
        self.H_Z = np.concatenate((self.matrix_B.T, self.matrix_A.T), axis=1)
        assert (self.H_X @ self.H_Z.T == 0).all()
        # assert k value
        assert k_of(self.matrix_A, self.matrix_B) == k
        assert 2 * lm - np.linalg.matrix_rank(self.H_X) - np.linalg.matrix_rank(self.H_Z) == k
        # construct logical observables to check logical errors
        self.logical_operators = find_logical_operators(self.H_X, self.H_Z)
        assert len(self.logical_operators) == k
        self.L_X, self.L_Z = construct_logical_checks(self.logical_operators)
        assert (self.H_X @ self.L_Z.T == 0).all()
        assert (self.H_Z @ self.L_X.T == 0).all()

    @property
    def n(self) -> int:
        return 2 * self.l * self.m

    def __str__(self) -> str:
        n, k, d = self.nkd
        str_of = lambda bb_expr: "+".join(["1" if exp==0 else (var if exp==1 else f"{var}^{{{exp}}}") for (var, exp) in bb_expr])
        d_str = ('\\le' if self.unsure_d else '') + str(d)
        return f"[[{n}, {k}, {d_str}]]: l={self.l}, m={self.m}, A={str_of(self.A)}, B={str_of(self.B)}"

def find_logical_operators(H_X: ArrayLike, H_Z: ArrayLike) -> list[stim.PauliString]:
    stabilizers = []
    for t, matrix in [("X", H_X), ("Z", H_Z)]:
        for line in matrix:
            stabilizers.append(stim.PauliString([t if e == 1 else "I" for e in line]))
    # https://quantumcomputing.stackexchange.com/questions/37812/how-to-find-a-set-of-independent-logical-operators-for-a-stabilizer-code-with-st
    completed_tableau = stim.Tableau.from_stabilizers(
        stabilizers,
        allow_redundant=True,
        allow_underconstrained=True,
    )
    observables = []
    for k in range(len(completed_tableau))[::-1]:
        z = completed_tableau.z_output(k)
        if z in stabilizers:
            break
        x = completed_tableau.x_output(k)
        observables.append((x, z))
    return observables

def construct_logical_checks(logical_operators: list[stim.PauliString]) -> tuple[ArrayLike, ArrayLike]:
    height = len(logical_operators)
    width = len(logical_operators[0][0])
    L_X = gf2_zeros(shape=(height, width))
    L_Z = gf2_zeros(shape=(height, width))
    for idx, (l_x, l_z) in enumerate(logical_operators):
        for t, l, L in [("X", l_x, L_X), ("Z", l_z, L_Z)]:
            indices = l.pauli_indices(t)
            assert indices == l.pauli_indices(), "should not include other Pauli types"
            L[idx][indices] = 1
    return L_X, L_Z

def mwpf_matrix_of(matrix: ArrayLike) -> mwpf.TightMatrix:
    mwpf_matrix = mwpf.TightMatrix()
    for i in range(matrix.shape[1]):
        mwpf_matrix.add_tight_variable(i)
    for line_idx, line in enumerate(matrix):
        mwpf_matrix.add_constraint(line_idx, np.nonzero(line)[0], False)
    return mwpf_matrix

def k_of(matrix_A: ArrayLike, matrix_B: ArrayLike) -> int:
    gf2 = mwpf.TightMatrix()
    for i in range(matrix_A.shape[1]):
        gf2.add_tight_variable(i)
    for line_idx, line in enumerate(chain(matrix_A, matrix_B)):
        gf2.add_constraint(line_idx, np.nonzero(line)[0], False)
    echelon = mwpf.EchelonMatrix(gf2)
    print()
    return 2 * (matrix_A.shape[1] - echelon.rows)

configs = [
    BBConfig(nkd=(72, 12, 6), l=6, m=6, A = [(x, 3), (y, 1), (y, 2)], B = [(y, 3), (x, 1), (x, 2)]),
    BBConfig(nkd=(90, 8, 10), l=15, m=3, A = [(x, 9), (y, 1), (y, 2)], B = [(x, 0), (x, 2), (x, 7)]),
    BBConfig(nkd=(108, 8, 10), l=9, m=6, A = [(x, 3), (y, 1), (y, 2)], B = [(y, 3), (x, 1), (x, 2)]),
    BBConfig(nkd=(144, 12, 12), l=12, m=6, A = [(x, 3), (y, 1), (y, 2)], B = [(y, 3), (x, 1), (x, 2)]),
    BBConfig(nkd=(288, 12, 18), l=12, m=12, A = [(x, 3), (y, 2), (y, 7)], B = [(y, 3), (x, 1), (x, 2)]),
    # BBConfig(nkd=(360, 12, 24), unsure_d=True, l=30, m=6, A = [(x, 9), (y, 1), (y, 2)], B = [(y, 3), (x, 25), (x, 26)]),
    # BBConfig(nkd=(756, 16, 34), unsure_d=True, l=21, m=18, A = [(x, 3), (y, 10), (y, 17)], B = [(y, 5), (x, 3), (x, 19)]),
]

# print them in the notebook
from IPython.display import display, Math

for config in configs:
    display(Math(str(config)))

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

<IPython.core.display.Math object>

In [8]:
class MWPFDecoder:
    """ mwpf decoder for independent bit-flip and phase-flip noise model """
    def __init__(self, config: BBConfig, is_z: bool = False, cluster_node_limit: int | None = None, timeout: float | None = None):
        self.config = config
        H = config.H_Z if is_z else config.H_X
        vertex_num = H.shape[0]
        weighted_edges = []
        for edge_index in range(H.shape[1]):
            weighted_edges.append(mwpf.HyperEdge(np.nonzero(H[:,edge_index])[0], 1))
        self.initializer = mwpf.SolverInitializer(vertex_num, weighted_edges)
        self.positions = [mwpf.VisualizePosition(i, j, 0) for i in range(config.l) for j in range(config.m)]
        self.visualizer = mwpf.Visualizer(positions=self.positions)
        config = {}
        if cluster_node_limit is not None:
            config["cluster_node_limit"] = cluster_node_limit
        if timeout is not None:
            config["timeout"] = timeout
        self.solver = mwpf.Solver(self.initializer, config)
        # self.solver.show(self.positions)
    def decode(self, syndrome: ArrayLike) -> ArrayLike:
        syndrome = mwpf.SyndromePattern(np.nonzero(syndrome)[0])
        self.solver.clear()
        self.solver.solve(syndrome)
        correction = gf2_zeros(config.n)
        correction[self.solver.subgraph()] = 1
        return correction

MWPFDecoder(configs[0])

{}


<__main__.MWPFDecoder at 0x13f041410>

In [4]:
class Sample:
    config: BBConfig
    x_err: ArrayLike
    z_err: ArrayLike
    def __init__(self, config: BBConfig, p: float, depolarize: bool = True):
        self.config = config
        self.x_err, self.z_err = generate_error_vec(config.n, p, depolarize)
        self.x_syndrome = config.H_X @ self.z_err
        self.z_syndrome = config.H_Z @ self.x_err
    def is_logical_x_err(self, x_correction: ArrayLike) -> bool:
        x_residual = (x_correction + self.x_err).T
        assert (self.config.H_Z @ x_residual == 0).all()
        return (self.config.L_Z @ x_residual).any()
    def is_logical_z_err(self, z_correction: ArrayLike) -> bool:
        z_residual = (z_correction + self.z_err).T
        assert (self.config.H_X @ z_residual == 0).all()
        return (self.config.L_X @ z_residual).any()
    def is_logical_err(self, x_correction: ArrayLike, z_correction: ArrayLike) -> bool:
        return self.is_logical_x_err(x_correction) or self.is_logical_z_err(z_correction)

def generate_error_vec(n: int, p: float, depolarize: bool) -> tuple[ArrayLike, ArrayLike]:
    rng = np.random.default_rng()
    if depolarize:
        a = rng.random(n)
        x_err = a < p/3
        z_err = (a >= p/3) & (a < 2*p/3)
        y_err = (a >= 2*p/3) & (a < p)
        return GF2(np.array(x_err + y_err, dtype=np.uint8)), GF2(np.array(z_err + y_err, dtype=np.uint8))
    else:
        x_err = rng.random(n) < p
        z_err = rng.random(n) < p
        return GF2(np.array(x_err, dtype=np.uint8)), GF2(np.array(z_err, dtype=np.uint8))

In [26]:
# from abc import ABC, abstractmethod
from typing import Callable
import array
import time
from tqdm.notebook import tqdm

class MonteCarloSampler:
    def __init__(self, sampler: Callable[[], Sample], is_logical_error: Callable[[Sample], tuple[bool, float]]):
        self.sampler = sampler
        sample = sampler()
        self.config = sample.config
        self.is_logical_error = is_logical_error
        self.decoding_time = array.array('d', [])
    def run(self, max_n: int = 1_000_000, max_failed: int = 10_000, max_time: float = 3600):
        pbar1 = tqdm(total=max_n)
        pbar2 = tqdm(total=max_failed)
        count = 0
        failed = 0
        for _ in range(max_n):
            pbar1.update(1)
            sample = self.sampler()
            is_logical_error, decoding_time = self.is_logical_error(sample)
            self.decoding_time.append(decoding_time)
            count += 1
            if is_logical_error:
                pbar2.update(1)
                failed += 1
                if failed >= max_failed:
                    break

config = configs[0]
p = 0.01
cluster_node_limit = 10000
decode_z = False
x_decoder = MWPFDecoder(config, is_z=False, cluster_node_limit=cluster_node_limit, timeout=3600)
z_decoder = MWPFDecoder(config, is_z=True, cluster_node_limit=cluster_node_limit, timeout=3600)
def is_logical_error(sample: Sample) -> tuple[bool, float]:
    start = time.time()
    z_correction = x_decoder.decode(sample.x_syndrome)
    if decode_z:
        x_correction = z_decoder.decode(sample.z_syndrome)
    else:
        x_correction = sample.x_err
    duration = time.time() - start
    is_error = sample.is_logical_err(x_correction, z_correction)
    return is_error, duration
sampler = MonteCarloSampler(lambda: Sample(config, p=p, depolarize=False), is_logical_error)
sampler.run()

{'cluster_node_limit': 10000, 'timeout': 3600}
{'cluster_node_limit': 10000, 'timeout': 3600}


  0%|          | 0/1000000 [00:00<?, ?it/s]

  0%|          | 0/10000 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [24]:
config = configs[0]
p = 0.01
cluster_node_limit = 0
decode_z = False
x_decoder = MWPFDecoder(config, is_z=False, cluster_node_limit=cluster_node_limit, timeout=0)
z_decoder = MWPFDecoder(config, is_z=True, cluster_node_limit=cluster_node_limit, timeout=0)
def is_logical_error(sample: Sample) -> tuple[bool, float]:
    start = time.time()
    z_correction = x_decoder.decode(sample.x_syndrome)
    if decode_z:
        x_correction = z_decoder.decode(sample.z_syndrome)
    else:
        x_correction = sample.x_err
    duration = time.time() - start
    is_error = sample.is_logical_err(x_correction, z_correction)
    return is_error, duration
sampler = MonteCarloSampler(lambda: Sample(config, p=p, depolarize=False), is_logical_error)
sampler.run()

{'cluster_node_limit': 0, 'timeout': 0}
{'cluster_node_limit': 0, 'timeout': 0}


  0%|          | 0/1000000 [00:00<?, ?it/s]

  0%|          | 0/10000 [00:00<?, ?it/s]

In [25]:
configs[1]

BBConfig(nkd=(90, 8, 10), l=15, m=3, A=[('x', 9), ('y', 1), ('y', 2)], B=[('x', 0), ('x', 2), ('x', 7)], unsure_d=False)

In [30]:
config = configs[1]
p = 0.01
cluster_node_limit = 10000
decode_z = True
x_decoder = MWPFDecoder(config, is_z=False, cluster_node_limit=cluster_node_limit, timeout=0)
z_decoder = MWPFDecoder(config, is_z=True, cluster_node_limit=cluster_node_limit, timeout=0)
def is_logical_error(sample: Sample) -> tuple[bool, float]:
    start = time.time()
    z_correction = x_decoder.decode(sample.x_syndrome)
    if decode_z:
        x_correction = z_decoder.decode(sample.z_syndrome)
    else:
        x_correction = sample.x_err
    duration = time.time() - start
    is_error = sample.is_logical_err(x_correction, z_correction)
    return is_error, duration
sampler = MonteCarloSampler(lambda: Sample(config, p=p, depolarize=True), is_logical_error)
sampler.run()

"""I should not use decoders trained for independent noise to decode depolarizing noise! Let's use a fair comparison instead"""

{'cluster_node_limit': 10000, 'timeout': 0}
{'cluster_node_limit': 10000, 'timeout': 0}


  0%|          | 0/1000000 [00:00<?, ?it/s]

  0%|          | 0/10000 [00:00<?, ?it/s]

Exception in thread Thread-6:
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
    self.run()
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/site-packages/tqdm/_monitor.py", line 69, in run
    instances = self.get_instances()
                ^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/site-packages/tqdm/_monitor.py", line 49, in get_instances
    return [i for i in self.tqdm_cls._instances.copy()
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/_weakrefset.py", line 96, in copy
    return self.__class__(self)
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/_weakrefset.py", line 51, in __init__
    self.update(data)
  File "/opt/homebrew/Caskroom/miniconda/base/lib/python3.11/_weakrefset.py", line 123, in update
    for element in ot