# Testing cache memory benefits using Jay's research algorithm

## Jay's functions

In [1]:
import math
import numpy as np
import pennylane as qml
from pennylane.pauli import PauliWord

In [2]:
# Generate permutations:


def generate_combs(num_variables, required_sum):
    """Generate all combinations of values for k variables {a1, a2, ..., ak}
    such that sum_i(ai) = p

    Args:
        num_variables (int): The number of variables in the sum (k)
        required_sum (int): The required sum of the variables (p)

    Returns:
        List[List[int]]: A list containg lists, where each sub-list (of length k)
            contains the integer values assigned to each variable for that combination.

    """
    if num_variables == 0:
        return []

    if required_sum == 0:
        return [[0] * num_variables]

    if num_variables == 1:
        return [[required_sum]]

    if num_variables == 2:
        return [
            [i, required_sum - i] for i in range(required_sum + 1)
        ]  # [[0,p], [1,p-1], ..., [p-1,1], [p,0]]

    master_lst = []
    for i in range(required_sum + 1):
        for sub_perms in generate_combs(num_variables - 1, required_sum - i):
            master_lst.append([i] + sub_perms)

    return master_lst


def pprint(master_lst):
    for i in master_lst:
        print(f"{len(i)} variables: {i}", f"sum: {sum(i)}")

In [3]:
# Commutator functions:


def Ad(A, B, alpha):
    """Recursive commutator"""
    if alpha == 0:
        return B

    if alpha == 1:
        return qml.comm(A, B, pauli=True)

    return qml.comm(A, Ad(A, B, alpha - 1), pauli=True)

In [4]:
# Testing + Performance:

# A = qml.PauliZ(0)
# B = qml.PauliX(0)

# for alpha in range(5):
#     print(Ad(A,B,alpha))

## Expected outcome:
##   alpha = 0 -->  B = X
##   alpha = 1 -->  [A,B] = [Z,X] = i*Y - (-i*Y) = 2i*Y
##   alpha = 2 -->  [A,[A,B]] = 2i*[Z,Y] = 2i*(-i*X - (i*X)) = 4*X
##   alpha = 3 -->  [A,[A,[A,B]]] = 4*[Z,X] = 4*(2i*Y) = 8i*Y
##   alpha = 4 -->  [A,[A,[A,[A,B]]]] = 8i*[Z,Y] = 8i*(-2i*X) = 16*X

In [5]:
def f_coeffs(alphas_lst, a_lst):
    a_nu = a_lst[0]
    product = a_nu
    for a_j, alpha_j in zip(a_lst[1:], alphas_lst):
        product *= ((a_j) ** alpha_j) / math.factorial(alpha_j)

    return product


def nested_commutator_operator(a_lst, H_lst, p):
    q = len(a_lst)
    final_operator = qml.pauli.PauliSentence()

    for nu in range(1, q):  # nu = [1, 2, ..., q-1]
        nu_index = nu - 1
        combinations_alpha_bar_equals_p = generate_combs(q - nu, p)

        for alphas_lst in combinations_alpha_bar_equals_p:
            H_nu = H_lst[nu_index]
            f_nu = f_coeffs(alphas_lst, a_lst[nu_index:])

            nested_commutator = H_nu
            for H, alpha in zip(H_lst[nu_index + 1 :], alphas_lst):
                nested_commutator = Ad(H, nested_commutator, alpha)
            final_operator += f_nu * nested_commutator

    q_index = q - 1
    a_q, H_q = (a_lst[q_index], H_lst[q_index])

    #     final_operator += a_q * H_q  # nu = q term from the sum
    return final_operator


def error_operator(H, s, product_formula_h_lst, product_formula_a_lst, p):
    exp_minus_ihs = qml.exp(H, -1j * s)
    nested_comm_op = nested_commutator_operator(product_formula_a_lst, product_formula_h_lst, p)

    e_minus_ihs_mat = qml.matrix(exp_minus_ihs)
    nested_comm_op_mat = nested_comm_op.to_mat(wire_order=exp_minus_ihs.wires)
    return ((-1j * s) ** (p + 1)) / (p + 1) * e_minus_ihs_mat @ nested_comm_op_mat


def explicit_subtraction(H, s, h_lst, a_lst):
    exp_minus_ihs = qml.exp(H, -1j * s)
    #     print("e^(-isH) = ", exp_minus_ihs)
    e_minus_ihs_mat = qml.matrix(exp_minus_ihs)

    W_s = qml.prod(*[qml.exp(op, -1j * s * a) for op, a in zip(h_lst, a_lst)])
    #     print("W(s) = ", W_s, "\n")
    W_s_mat = qml.matrix(W_s, wire_order=exp_minus_ihs.wires)

    return e_minus_ihs_mat - W_s_mat

In [6]:
# Trotter product formula to general form:


def _scalar(order):
    """Compute the scalar used in the recursive expression.

    Args:
        order (int): order of Trotter product (assume order is an even integer > 2).

    Returns
    """
    root = 1 / (order - 1)
    return (4 - 4**root) ** -1


def _recursive_expression(order, ops, scalar_t):
    """Generate a list of operations and coefficients using the
    recursive expression which defines the Trotter product.

    Args:
        order (int): the order of the Trotter expansion
        ops (Iterable(~.Operators)): a list of terms in the Hamiltonian

    Returns:
        list: the approximation as product of exponentials of the Hamiltonian terms
    """
    num_ops = len(ops)

    if order == 1:
        return ops, [1 * scalar_t] * num_ops

    if order == 2:
        return ops + ops[::-1], [0.5 * scalar_t] * (2 * num_ops)

    scalar_1 = _scalar(order)
    scalar_2 = 1 - 4 * scalar_1

    ops_lst_1, coeff_lst_1 = _recursive_expression(order - 2, ops, scalar_1 * scalar_t)
    ops_lst_2, coeff_lst_2 = _recursive_expression(order - 2, ops, scalar_2 * scalar_t)

    return (2 * ops_lst_1) + ops_lst_2 + (2 * ops_lst_1), (2 * coeff_lst_1) + coeff_lst_2 + (
        2 * coeff_lst_1
    )


def flatten_trotter(ops, order, reps=1):
    ops, coeffs = _recursive_expression(order, ops, 1 / reps)
    return ops * reps, coeffs * reps

In [7]:
# Testing + Performance:

# # ---- 2nd order product formula = e^-isX/2 * e^-isY * e^-isX/2 ---- :
# H = qml.sum(qml.PauliX(0), qml.PauliY(0)) # H = X + Y

# p = 2
# a_lst = [1/2, 1, 1/2]
# h_op_lst =  [qml.PauliX(0), qml.PauliY(0), qml.PauliX(0)]
# label = "2nd Ord, H=X+Y"


# #  ---- 2nd order product formula = e^-isX/2 * e^-isY * e^-isX/2 ---- :
# H = qml.sum(qml.PauliX(0), qml.PauliY(0), qml.PauliZ(0)) # H = X + Y + Z

# p = 2
# a_lst = [1/2, 1/2, 1, 1/2, 1/2]
# h_op_lst =  [qml.PauliX(0), qml.PauliY(0), qml.PauliZ(0), qml.PauliY(0), qml.PauliX(0)]
# label = "2nd Ord, H=X+Y+Z"


# # ---- 2nd order product formula = e^-isX/2 * e^-isY * e^-isX/2 ---- :
# H = qml.sum(qml.PauliX(0), qml.PauliY(0)) # H = X + Y
# import numpy as np

# p = 4
# # r = 1/(4 - 4**(1/3))
# r = np.power(4, np.float64(1/3))
# r = 1 / (4 - r)

# a_lst = [r/2, r, r, r, (1-3*r)/2, (1-4*r), (1-3*r)/2, r, r, r, r/2]
# h_op_lst =  [
#     qml.PauliX(0),
#     qml.PauliY(0),
#     qml.PauliX(0),
#     qml.PauliY(0),
#     qml.PauliX(0),
#     qml.PauliY(0),
#     qml.PauliX(0),
#     qml.PauliY(0),
#     qml.PauliX(0),
#     qml.PauliY(0),
#     qml.PauliX(0),
# ]
# label = "4th Ord, H=X+Y"

# H = qml.sum(
#     qml.PauliX(0),
#     qml.PauliY(0),
#     qml.PauliZ(0),
#     qml.prod(qml.PauliX(0), qml.PauliZ(1)),
# )

# s = 0.1

# print(nested_commutator_operator(a_lst, h_lst, p), "\n\n")  # everything inside the double sum in eq 5

# print("Error Operator LHS:\n", explicit_subtraction(H,s,h_op_lst,a_lst), "\n\n")  # e^-iHs - W(s) (LHS of eq 5)
# print("Error Operator RHS:\n", error_operator(H,s,h_lst,a_lst, p), "\n\n")        # (RHS of eq 5)

In [8]:
# import numpy as np
# import matplotlib.pyplot as plt

# times = [0.000001, 1e-7, 1e-8]
# spectral_norm_error = []

# PauliWord._commutator.cache_clear()

# for s in times:
#    print(f"s = {s}")
#    m1 = explicit_subtraction(H, s, h_op_lst, a_lst)
#    m2 = error_operator(H, s, h_lst, a_lst, p)
#    vals, vects = np.linalg.eig(m1 + m2)
#    spectral_norm_error.append(max(abs(vals)))

In [9]:
# plt.title("Varying evolution time (s)")
# plt.xscale("log")
# plt.yscale("log")
# lt.xlabel("time (s)")
# lt.ylabel("spectral norm error")

# for i in range(2):
#    slope = math.log10(spectral_norm_error[i]) - math.log10(spectral_norm_error[i + 1])
#    print(f"slope: {slope}")

# plt.plot(times, spectral_norm_error, "--*", label=label)

# plt.legend()
# plt.show()

## Benchmarking cache memory usage and speed-up

### Step 1: choose the test parameters

In [10]:
import platform
import os
import time
import re
import pandas as pd
from datetime import datetime

# Set the test parameters
H = qml.sum(
    qml.prod(
        qml.PauliX(0),
        qml.PauliZ(1),
        qml.PauliZ(2),
        qml.PauliX(3),
        qml.PauliX(4),
        qml.PauliY(5),
        qml.PauliX(6),
    ),
    qml.prod(
        qml.PauliZ(0),
        qml.PauliX(1),
        qml.PauliY(2),
        qml.PauliY(3),
        qml.PauliY(4),
        qml.PauliX(5),
        qml.PauliX(6),
    ),
    qml.prod(
        qml.PauliY(0),
        qml.PauliZ(1),
        qml.PauliX(2),
        qml.PauliY(3),
        qml.PauliX(4),
        qml.PauliZ(5),
        qml.PauliY(6),
    ),
)
evol_time = 1e-6
orders = [1, 2]
lru_cache_enabled = hasattr(PauliWord._commutator, "cache_info")

In [11]:
# Get the current date and time
now = datetime.now()
day_month_year = now.strftime("%d_%m_%Y")
hour_minute_second = now.strftime("%H_%M_%S")

# Create the directory if it doesn't exist
directory = os.path.join(f"tests_results/{day_month_year}")
if not os.path.exists(directory):
    os.makedirs(directory)

# Retrieve CPU information
cpu_info = platform.processor()

# Strings to print at the top of the text file
header_strings = [
    f"LRU cache enabled: {lru_cache_enabled}\n",
    f"Test performed on processor: {cpu_info}\n",
    f"Hamiltonian: {H.pauli_rep}\n",
    f"Evolution time: {evol_time}\n",
]

lru_cache_test_data = {
    "time_(s)": [],
    "spectral_norm_error": [],
}

if lru_cache_enabled:
    lru_cache_test_data["max_cache_size"] = []
    lru_cache_test_data["cache_size"] = []
    lru_cache_test_data["cache_hits"] = []

index = []

df_data = pd.DataFrame(lru_cache_test_data, index=index)

df_data.index.name = "order"

### Step 2: run the test

In [12]:
def test_start(H, evol_time, orders, lru_cache_enabled):

    test_data = {
        "spectral_norm_errors": [],
        "execution_times": [],
        "orders": orders,
    }

    if lru_cache_enabled:
        test_data["max_cache_sizes"] = []
        test_data["cache_sizes"] = []
        test_data["cache_hits"] = []

    for order in test_data["orders"]:
        print(f"current order: {order}")

        start_time = time.time()

        h_op_lst, a_lst = flatten_trotter(H.operands, order)
        h_lst = [op.pauli_rep for op in h_op_lst]
        m1 = explicit_subtraction(H, evol_time, h_op_lst, a_lst)
        m2 = error_operator(H, evol_time, h_lst, a_lst, order)
        vals, vects = np.linalg.eig(m1 + m2)
        test_data["spectral_norm_errors"].append(max(abs(vals)))

        end_time = time.time()

        execution_time = end_time - start_time
        test_data["execution_times"].append(execution_time)
        print("Execution time:", execution_time, "seconds")

        if lru_cache_enabled:
            cache_info = str(PauliWord._commutator.cache_info())
            hits = int(re.search(r"hits=(\d+)", cache_info).group(1))
            currsize = int(re.search(r"currsize=(\d+)", cache_info).group(1))
            maxsize = int(re.search(r"maxsize=(\d+)", cache_info).group(1))
            # for completess, although `max_cache_size` is always the same
            test_data["max_cache_sizes"].append(maxsize)
            test_data["cache_sizes"].append(currsize)
            test_data["cache_hits"].append(hits)

    return test_data


if lru_cache_enabled:
    PauliWord._commutator.cache_clear()

print("Test started...")
test_data = test_start(H, evol_time, orders, lru_cache_enabled)
print("Test completed\n")

df_data["time_(s)"] = test_data["execution_times"]
df_data["spectral_norm_error"] = test_data["spectral_norm_errors"]

if lru_cache_enabled:

    df_data["max_cache_size"] = test_data["max_cache_sizes"]
    df_data["cache_size"] = test_data["cache_sizes"]
    df_data["cache_hits"] = test_data["cache_hits"]

df_data.index = test_data["orders"]
df_data.index.name = "order"

Test started...
current order: 1
Execution time: 0.04824090003967285 seconds
current order: 2
Execution time: 0.04253888130187988 seconds
Test completed



In [13]:
df_data

Unnamed: 0_level_0,time_(s),spectral_norm_error,max_cache_size,cache_size,cache_hits
order,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1,0.048241,2e-12,128,3,0
2,0.042539,1.110225e-16,128,12,35


### Step 3: write the test result to a text file

In [14]:
file_path = os.path.join(directory, f"test_time_{hour_minute_second}.txt")
with open(file_path, "w") as file:
    for header_string in header_strings:
        file.write(header_string + "\n")
    file.write(df_data.to_string())

print("test results stored at:", file_path)

test results stored at: tests_results/22_02_2024/test_time_13_24_08.txt


## Testing with Snakeviz

In [15]:
import glob

In [16]:
%load_ext snakeviz

In [17]:
# Set the test parameters
H = qml.sum(
    qml.prod(
        qml.PauliX(0),
        qml.PauliZ(1),
        qml.PauliZ(2),
        qml.PauliX(3),
        qml.PauliX(4),
        qml.PauliY(5),
        qml.PauliX(6),
    ),
    qml.prod(
        qml.PauliZ(0),
        qml.PauliX(1),
        qml.PauliY(2),
        qml.PauliY(3),
        qml.PauliY(4),
        qml.PauliX(5),
        qml.PauliX(6),
    ),
    qml.prod(
        qml.PauliY(0),
        qml.PauliZ(1),
        qml.PauliX(2),
        qml.PauliY(3),
        qml.PauliX(4),
        qml.PauliZ(5),
        qml.PauliY(6),
    ),
)
evol_time = 1e-6
orders = [4]
lru_cache_enabled = hasattr(PauliWord._commutator, "cache_info")

In [20]:
%%snakeviz
print("Test started...")
test_data = test_start(H, evol_time, orders, lru_cache_enabled)
print("Test completed\n")

Test started...
current order: 4
Execution time: 9.988414287567139 seconds
Test completed

 
*** Profile stats marshalled to file '/tmp/tmpa5m5ktsn'.
Embedding SnakeViz in this document...
<function display at 0x7f08ce7be940>
