# Imports

In [1]:

import sys

if "google.colab" in sys.modules:
  %pip install QuantLib
  %pip install optax
  %pip install qiskit
  %pip install qcware

  %pip install qcware-quasar
  ! rm -rf deep-hedging
  ! git clone https://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@github.com/SnehalRaj/jpmc-qcware-deephedging deep-hedging
  ! cp -r deep-hedging/* .

In [2]:
import qiskit

import quasar
from qcware_transpile.translations.quasar.to_qiskit import translate, audit
from qiskit.compiler import assemble
import collections

from qio import loader

## Circuit constructions

- `prepare_circuit(input, params, loader_layout, layer_layout)`: This function prepares a quantum circuit for a given input and set of parameters, which can later be executed on quantum hardware. It takes two required arguments, `input` and `params`, and two optional arguments, `loader_layout` and `layer_layout`. The function first determines the number of qubits in the circuit from the length of the input array. It then creates a loader circuit by calling the `loader` function with the layout given by `loader_layout`. Next, it calls the nested helper `_get_layer_circuit()`, which builds a circuit layer from the `params` array according to `layer_layout`. The circuit layer is constructed from RBS gates created with `quasar.Gate.RBS()`: for each gate in the `rbs_idxs` array, `quasar.Gate.RBS()` is called with the negated value of the corresponding `params` element (to account for Qiskit's reversed qubit ordering) and the gate is added to the circuit layer. The loader and layer circuits are then combined into a single `quasar.Circuit` object, which is translated to a Qiskit circuit using the `translate()` function. Finally, the Qiskit circuit is optimized, measurements are added, and the function returns the resulting circuit.

- `run_circuit(circs, circuit_dim, backend_name)`: This function accepts a list of `qiskit` circuits (which must all have the same qubit count), the number of qubits in each circuit, and the backend name. The circuits are assumed to be Hamming-weight-preserving circuits used as part of quantum orthogonal layers. The function prepares a numpy array called `results` that holds the output vectors in the unary basis. Currently the function supports two families of backends, `qiskit` and `quantinuum`. Results from runs on `quantinuum`-based backends are stored using the global variables `global_number_of_circuits_executed` and `global_hardware_run_results_dict`. The function then runs the circuits on the selected backend and returns the result of the computation. If the backend is a `qiskit` one, the function uses the Qiskit Aer simulator to simulate the circuits and compute the result. If the backend is a `quantinuum` one, the function submits the circuits to the selected Quantinuum machine (for example `H1-1`, or its emulator `H1-1E`) to perform the computation. Note that this function assumes that the user has already installed the required packages for the specified backend, and that the backend is properly configured and accessible.

In [3]:
# Global counter: run_circuit adds the number of circuits it receives on every
# call; DeepHedgingBenchmark().__test_model resets it to 0 before each model.

global_number_of_circuits_executed = 0

# Global object keeping track of result
# Dumped to disk as JSON by run_circuit on the Quantinuum path
# Populated initially in DeepHedgingBenchmark().__test_model
# and with run results in run_circuit, which also increments its
# 'batch_idx' entry after every call
# keeping track of batch_idx in scan (under "Models")

global_hardware_run_results_dict = {}

In [4]:
%load_ext autoreload
%autoreload 2
import numpy as np
from qnn import _get_butterfly_idxs, _get_pyramid_idxs, _make_orthogonal_fn
# Import the Aer noise module from wherever the installed Qiskit provides it.
# Probing the import is more reliable than comparing version strings: string
# comparison is lexicographic, so e.g. '0.9.0' <= '0.37.1' evaluates to False.
try:
    import qiskit_aer.noise as noise
except ImportError:
    # fix for older versions of Qiskit, which ship Aer under qiskit.providers
    import qiskit.providers.aer.noise as noise
import json
import pickle
import time
import copy
from pathlib import Path
from tqdm import tqdm


def prepare_circuit(input, params, loader_layout='parallel', layer_layout='butterfly'):
    """Build a measured Qiskit circuit that loads ``input`` and applies one RBS layer.

    Args:
        input: 1-D sequence of values to load; its length sets the number of qubits.
        params: RBS angles of the layer (converted to a float numpy array).
        loader_layout: Forwarded to ``loader`` as its ``mode`` argument.
        layer_layout: Arrangement of the RBS gates, ``'butterfly'`` or ``'pyramid'``.

    Returns:
        The transpiled Qiskit circuit, extended with a classical register, a
        barrier and a measurement of every qubit.

    Raises:
        ValueError: If ``layer_layout`` is not one of the supported layouts.
    """
    # Fail early with a clear message instead of an UnboundLocalError further down.
    if layer_layout not in ('butterfly', 'pyramid'):
        raise ValueError(
            f"Unexpected layer layout {layer_layout!r}; expected 'butterfly' or 'pyramid'")

    num_qubits = len(input)

    def _get_layer_circuit():
        """Assemble the quasar circuit holding the RBS gates of the layer."""
        if layer_layout == 'butterfly':
            rbs_idxs = _get_butterfly_idxs(num_qubits, num_qubits)
        else:
            rbs_idxs = _get_pyramid_idxs(num_qubits, num_qubits)
        # Both the gate moments and the angles are traversed in reverse order, and
        # each angle is negated (the notebook attributes this to Qiskit's reversed
        # qubit ordering). The reversed view is computed once, outside the loops.
        reversed_angles = np.array(params).astype('float')[::-1]
        circuit_layer = quasar.Circuit()
        idx_angle = 0
        for gates_per_timestep in rbs_idxs[::-1]:
            for gate in gates_per_timestep:
                circuit_layer.add_gate(
                    quasar.Gate.RBS(theta=-reversed_angles[idx_angle]), tuple(gate))
                idx_angle += 1
        return circuit_layer

    loader_circuit = loader(np.array(input), mode=loader_layout, initial=True, controlled=False)
    layer_circuit = _get_layer_circuit()
    circuit = quasar.Circuit.join_in_time([loader_circuit, layer_circuit])
    # Translate from qcware-quasar to qiskit
    qiskit_circuit = translate(circuit)

    qiskit_circuit = qiskit.transpile(qiskit_circuit, optimization_level=3)
    # Measure every qubit into a dedicated classical register.
    c = qiskit.ClassicalRegister(num_qubits)
    qiskit_circuit.add_register(c)
    qiskit_circuit.barrier()
    qiskit_circuit.measure(qubit=range(num_qubits), cbit=c)
    return qiskit_circuit

def counter_to_dict(c):
    """Turn a pytket ``get_counts`` counter into a qiskit-style counts dict.

    Each key of ``c`` (a sequence of measured bits) becomes the string obtained
    by concatenating its elements; each value is cast to ``int``.

    Typical use:
    >>> result = backend.get_result(handle)
    >>> counts = result.get_counts(basis=BasisOrder.dlo)
    >>> counts_qiskit = counter_to_dict(counts)
    """
    return {
        ''.join(map(str, outcome)): int(n_shots)
        for outcome, n_shots in c.items()
    }

def _unary_probabilities(counts, input_size):
    """Post-select the unary bitstrings of one circuit and normalise their counts.

    Bitstrings whose Hamming weight is not 1 are discarded. The returned list
    has one entry per unary bitstring, ordered from the largest bitstring
    ('10...0') to the smallest ('0...01').
    """
    # f"{2**i:0{input_size}b}" is the zero-padded binary representation of 2**i,
    # so the dictionary starts with every unary bitstring mapped to 0.
    filtered_counts = {f"{2**i:0{input_size}b}": 0 for i in range(input_size)}
    num_postselected = 0
    for bitstring, count in counts.items():
        if sum(int(x) for x in bitstring) != 1:
            continue
        filtered_counts[bitstring] += count
        num_postselected += count
    if filtered_counts and num_postselected == 0:
        # Same exception type as the bare division, but with an actionable message.
        raise ZeroDivisionError(
            "No unary (Hamming weight 1) bitstring was measured; "
            "cannot normalise the post-selected counts")
    return [filtered_counts[k] / num_postselected for k in sorted(filtered_counts)][::-1]


def run_circuit(circs,circuit_dim, backend_name = 'quantinuum_H1-1'):
    """Run circuits on the selected backend and return post-selected unary probabilities.

    Args:
        circs: List of Qiskit circuits, each measuring ``circuit_dim`` bits.
        circuit_dim: Length of the measured bitstrings (one per qubit).
        backend_name: One of ``'qiskit_noiseless'``, ``'qiskit_noisy'``,
            ``'quantinuum_H1-1'``, ``'quantinuum_H1-1E'``, ``'quantinuum_H1-2'``
            or ``'quantinuum_H1-2E'``.

    Returns:
        numpy array of shape ``(len(circs), circuit_dim)``; row ``j`` holds the
        normalised counts of the unary bitstrings measured for ``circs[j]``.

    Raises:
        ValueError: If ``backend_name`` is not supported.
        ZeroDivisionError: If a circuit produced no unary bitstring at all.

    Side effects:
        Adds ``len(circs)`` to ``global_number_of_circuits_executed`` and
        increments ``global_hardware_run_results_dict['batch_idx']``. On the
        Quantinuum path the job handles and final results are cached under
        ``data/`` and reused on later calls; that path reads the
        ``model_type``, ``layer_type``, ``epsilon`` and ``batch_idx`` entries
        of ``global_hardware_run_results_dict``.
    """
    global global_number_of_circuits_executed
    global global_hardware_run_results_dict
    input_size = circuit_dim
    results = np.zeros((len(circs), input_size))

    global_number_of_circuits_executed += len(circs)
    num_measurements = 1000

    if "qiskit" in backend_name:
        backend = qiskit.Aer.get_backend('qasm_simulator')
        if backend_name == 'qiskit_noiseless':
            measurement = qiskit.execute(circs, backend, shots=num_measurements)
        elif backend_name == 'qiskit_noisy':
            # Error probabilities
            prob_1 = 0.001  # 1-qubit gate
            prob_2 = 0.01   # 2-qubit gate
            # Dylan's tunes error probabilities
            # prob_1 = 0  # 1-qubit gate
            # prob_2 = 3.5e-3   # 2-qubit gate

            # Depolarizing quantum errors
            error_1 = noise.depolarizing_error(prob_1, 1)
            error_2 = noise.depolarizing_error(prob_2, 2)

            # Add errors to noise model
            noise_model = noise.NoiseModel()
            noise_model.add_all_qubit_quantum_error(error_1, ['h', 'x', 'ry'])
            noise_model.add_all_qubit_quantum_error(error_2, ['cz'])

            # Get basis gates from noise model
            basis_gates = noise_model.basis_gates
            # The keyword must be `noise_model`; the former `noise_mode` typo
            # meant the noise model was never handed to the simulator.
            measurement = qiskit.execute(
                circs,
                backend,
                basis_gates=basis_gates,
                noise_model=noise_model,
                shots=num_measurements,
            )
        else:
            raise ValueError(f"Unexpected backend name {backend_name}")
        all_counts = measurement.result().get_counts()
        if isinstance(all_counts, dict):
            # get_counts() returns a bare dict when a single circuit was executed.
            all_counts = [all_counts]
    elif "quantinuum" in backend_name:
        # From docs: "Batches cannot exceed the maximum limit of 500 H-System Quantum Credits (HQCs) total"
        # Therefore batching is more or less useless on quantinuum
        from pytket.extensions.qiskit import qiskit_to_tk
        from pytket.circuit import BasisOrder
        from pytket.extensions.quantinuum import QuantinuumBackend

        outpath_stem = "_".join([
            "1115_device",
            global_hardware_run_results_dict['model_type'],
            backend_name,
            global_hardware_run_results_dict['layer_type'],
            str(global_hardware_run_results_dict['epsilon']),
            str(global_hardware_run_results_dict['batch_idx']),
        ])

        outpath_result_final = f"data/{outpath_stem}.json"
        outpath_handles = f"data/handles_{outpath_stem}.pickle"

        if Path(outpath_result_final).exists():
            # if precomputed results already present on disk, simply load
            print(f"Using precomputed counts from {outpath_result_final}")
            with open(outpath_result_final, "r") as result_file:
                all_counts = json.load(result_file)['all_counts']
        else:
            device_names = {
                "quantinuum_H1-2E": "H1-2E",
                "quantinuum_H1-2": "H1-2",
                "quantinuum_H1-1E": "H1-1E",
                "quantinuum_H1-1": "H1-1",
            }
            if backend_name not in device_names:
                raise ValueError(f"Unknown Quantinuum backend: {backend_name}")
            backend = QuantinuumBackend(device_name=device_names[backend_name])
            if Path(outpath_handles).exists():
                # if circuits already submitted, simply load from disk
                print(f"Using pickled handles from {outpath_handles}")
                # NOTE: pickle.load can execute arbitrary code; only load handle
                # files that were written by this notebook.
                with open(outpath_handles, "rb") as handles_file:
                    handles = pickle.load(handles_file)
            else:
                # otherwise, submit circuits and pickle handles
                circs_tk = [qiskit_to_tk(circ) for circ in circs]
                for idx, circ in enumerate(circs_tk):
                    circ.name = f'{outpath_stem}_{idx+1}_of_{len(circs)}'
                compiled_circuits = backend.get_compiled_circuits(circs_tk, optimisation_level=2)
                handles = backend.process_circuits(compiled_circuits, n_shots=num_measurements)
                with open(outpath_handles, "wb") as handles_file:
                    pickle.dump(handles, handles_file)
                print(f"Dumped handles to {outpath_handles}")
            # retrieve results from handles, polling each job once per second
            result_list = []

            with tqdm(total=len(handles), desc='#jobs finished') as pbar:
                for handle in handles:
                    while True:
                        status = backend.circuit_status(handle).status
                        if status.name == 'COMPLETED':
                            result = backend.get_result(handle)
                            result_list.append(copy.deepcopy(result))
                            pbar.update(1)
                            break
                        else:
                            assert status.name in ['QUEUED', 'RUNNING']
                        time.sleep(1)
            global_hardware_run_results_dict['result_list'] = [x.to_dict() for x in result_list]
            # convert from tket counts format to qiskit
            all_counts = [
                counter_to_dict(
                    result.get_counts(basis=BasisOrder.dlo)
                ) for result in result_list
            ]
            global_hardware_run_results_dict['all_counts'] = all_counts
            # dump result on disk
            with open(outpath_result_final, "w") as result_file:
                json.dump(global_hardware_run_results_dict, result_file)
    else:
        raise ValueError(f"Unexpected backend name {backend_name}")

    # Tolerate a results dict that has not been initialised yet (e.g. when a
    # simulator backend is used outside of DeepHedgingBenchmark).
    global_hardware_run_results_dict['batch_idx'] = (
        global_hardware_run_results_dict.get('batch_idx', 0) + 1)

    # Post processing
    # Discard bitstrings that do not correspond to unary encoding (not Hamming weight 1)
    for j in range(len(circs)):
        results[j] = _unary_probabilities(all_counts[j], input_size)
    return results

## ortho_linear_hardware()

This layer replaces `ortho_linear()` present in `qnn.py` and runs the same operation but on real hardware using the `run_circuit()` function defined above.

In [5]:
from qnn import *
def ortho_linear_hardware(
    n_features: int,
    layout: Union[str, List[List[Tuple[int, int]]]] = 'butterfly',
    normalize_inputs: bool = True,
    normalize_outputs: bool = True,
    normalize_stop_gradient: bool = True,
    with_scale: bool = True,
    with_bias: bool = True,
    t_init: Optional[InitializerFn] = None,
    s_init: Optional[InitializerFn] = None,
    b_init: Optional[InitializerFn] = None,
    backend_name: str = 'quantinuum_H1-1',
) -> ModuleFn:
    """ Create an orthogonal layer from a layout of RBS gates which can be executed on hardware.
    Args:
        n_features: The number of features in the output.
        layout: The layout of the RBS gates.
        normalize_inputs: Whether to normalize the inputs.
        normalize_outputs: Whether to normalize the outputs (accepted but not
            used by this implementation).
        normalize_stop_gradient: Whether to stop the gradient of the norm.
        with_scale: Whether to use a scale parameter.
        with_bias: Whether to include a bias term.
        t_init: The initializer for the angles.
        s_init: The initializer for the scale.
        b_init: The initializer for the bias.
        backend_name: Backend forwarded to `run_circuit`; the default matches
            the default of `run_circuit`.
    Returns:
        A ModuleFn wrapping the `apply_fn` / `init_fn` pair defined below.
    """
    def apply_fn(params, state, key, inputs, **kwargs):
        # Step 1: preprocess the inputs
        if layout == 'butterfly':
            circuit_dim = int(2**np.ceil(
                np.log2(max(inputs.shape[-1], n_features))))
        elif layout == 'pyramid':
            circuit_dim = max(inputs.shape[-1], n_features)
        else:
            circuit_dim = max(
                [max(idxs) for moment in layout for idxs in moment])
        if normalize_inputs:
            norm = jnp.linalg.norm(inputs, axis=-1)[..., None]
            if normalize_stop_gradient:
                norm = lax.stop_gradient(norm)
            inputs = inputs / norm
        if inputs.shape[-1] < circuit_dim:
            # Left-pad with zeros so that every input fills the whole register.
            padding = jnp.zeros(
                (*inputs.shape[:-1], circuit_dim - inputs.shape[-1]), )
            inputs = jnp.concatenate([padding, inputs], axis=-1)
        # Step 2: generate the circuits
        if isinstance(layout, str):
            # Forward the requested layout; previously it was dropped and the
            # circuit was always built with prepare_circuit's default layout.
            circuit_kwargs = {'layer_layout': layout}
        else:
            # NOTE(review): prepare_circuit only accepts named layouts, so an
            # explicit gate list still falls back to its default layout, which
            # presumably does not match the gates in `layout` - confirm.
            circuit_kwargs = {}
        out_shape = inputs.shape[:-1]+(n_features,)
        circs = [
            prepare_circuit(circuit_input, params['t'], **circuit_kwargs)
            for circuit_input in inputs.reshape(-1, circuit_dim)
        ]
        # run circuits and truncate to desired number of outputs
        outputs = jnp.array(
            run_circuit(circs, circuit_dim, backend_name=backend_name)
        )[..., -n_features:]

        outputs = outputs.reshape(out_shape)
        if with_scale:
            outputs *= params['s']
        if with_bias:
            outputs += params['b']
        return outputs, state

    def init_fn(key, inputs_shape):
        if layout == 'butterfly':
            rbs_idxs = _get_butterfly_idxs(inputs_shape[-1], n_features)
        elif layout == 'pyramid':
            rbs_idxs = _get_pyramid_idxs(inputs_shape[-1], n_features)
        else:
            rbs_idxs = layout
        # One angle per RBS gate of the layout.
        n_angles = sum(map(len, rbs_idxs))
        params, state = {}, None
        key, t_key, b_key, s_key = jax.random.split(key, 4)
        t_init_ = t_init or uniform(-np.pi, np.pi)
        t_shape = (n_angles, )
        params['t'] = t_init_(t_key, t_shape)
        if with_scale:
            s_init_ = s_init or ones()
            s_shape = (n_features, )
            params['s'] = s_init_(s_key, s_shape)
        if with_bias:
            b_init_ = b_init or zeros()
            b_shape = (n_features, )
            params['b'] = b_init_(b_key, b_shape)
        shape = inputs_shape[:-1] + (n_features, )
        return params, state, shape

    return ModuleFn(apply_fn, init=init_fn)

# Data

We begin by importing the modules from the repository. To generate test data using geometric Brownian motion, we then use the `gen_paths()` function in `data.py`. 

Note: The data generation has been commented here because we pickled a particular test split. 

In [6]:
# from models import simple_network, attention_network
from qnn import linear
from train import build_train_fn
from qnn import ortho_linear, ortho_linear_noisy
from models import simple_network, recurrent_network_hardware, lstm_network_hardware, attention_network
from loss_metrics import entropy_loss
from data import gen_paths
from utils import train_test_split, get_batches, HyperParams
import numpy as np
from tqdm import tqdm
import optax
from functools import partial 
from utils import HyperParams
# Fixed seed from which the JAX PRNG key used by the benchmark is derived.
# NOTE(review): `jax` is not imported by name in this notebook; presumably it
# is brought into scope by `from qnn import *` - confirm.
seed = 100
key = jax.random.PRNGKey(seed)
# No need to generate data as we load from disk
# hps = HyperParams(S0=100,
#                   n_steps=5,
#                   n_paths=10000,
#                   discrete_path=False,
#                   strike_price=100,
#                   epsilon=0.0,
#                   sigma=0.2,
#                   risk_free=0,
#                   dividend=0,
#                   model_type='simple',
#                   layer_type='noisy_ortho',
#                   n_features=8,
#                   n_layers=1,
#                   loss_param=1.0,
#                   batch_size=4,
#                   test_size=0.2,
#                   optimizer='adam',
#                   learning_rate=1E-3,
#                   num_epochs=100
#                   )


# Data
# S = gen_paths(hps)
# [S_train, S_test] = train_test_split([S], test_size=0.2)
# _, test_batches = get_batches(jnp.array(S_test[0]), batch_size=hps.batch_size)
# test_batch = test_batches[0]




# DeepHedgingBenchmark()

This is a class to benchmark the performance of different models and layers for deep hedging. Its main method is `__test_model()`, which performs inference using the layers and models specified in the input. Note that setting `hps.layer_type` to `'hardware_ortho'` selects `ortho_linear_hardware()`, which runs inference on a hardware backend using the `run_circuit()` function defined above. 

#### Parameters
- `key`: A random key value used for jax random splitting.
- `eps`: A list of float values representing the hedge intervals of the trained models to be evaluated.
- `layers`: A list of string values representing the layer types to be evaluated. It should only contain values from `['linear', 'ortho', 'noisy_ortho', 'hardware_ortho']`.
- `models`: A list of string values representing the model types to be evaluated. It should only contain values from `['simple', 'recurrent', 'lstm', 'attention']`.

#### Methods
- `__test_model(hps,test_batch, save_dir)`: A private method that tests the model. It takes in hyperparameters hps, a batch of paths `test_batch` to run inference on and `save_dir` which specifies where to load the saved model parameters from. The hyperparameters include layer_type, model_type, n_steps, epsilon, and num_epochs. It returns the testing loss.
- `test(test_batch)`: A method to test the models. It takes in `test_batch`, the testing data, and prints the relevant information for every configuration: the layer, the model architecture, the loss (utility) on these paths and the number of circuits executed. It also prints the output actions for each day and the terminal PnL for each path. 

In [7]:
from utils import load_params
class DeepHedgingBenchmark():
  """
  Runs the benchmark with different models / layers.

  For every (model, epsilon, layer) combination, previously saved parameters
  are loaded and the loss is evaluated on a fixed batch of paths. The losses
  are collected in `self.test_info[layer][str(eps)][model]`.
  """
  def __init__(self, key, eps,  layers, models):
    """Store the configuration and create the empty result table.

    Args:
        key: JAX PRNG key, split before the network is initialised.
        eps: List of epsilon values to evaluate.
        layers: List of layer type names to evaluate.
        models: List of model type names to evaluate.
    """
    self.__key = key
    self.__models = models
    self.__layers = layers
    self.__eps = eps
    self.test_info = {layer:{str(eps):{} for eps in self.__eps} for layer in self.__layers}

  def __test_model(self, hps, test_batch, save_dir = 'params/params_all_models_16_qubits_5_days.pkl'):
    """Evaluate one (model, layer, epsilon) configuration on `test_batch`.

    Args:
        hps: Hyperparameters; `layer_type`, `model_type`, `epsilon` and
            `n_steps` are read here, the object is also passed to the network
            builder and to `build_train_fn`.
        test_batch: Batch of paths; a trailing axis is appended before it is
            handed to the loss function.
        save_dir: Path of the file with the saved parameters.

    Returns:
        The loss returned by the loss function.

    Raises:
        ValueError: If `hps.layer_type` or `hps.model_type` is not supported.
    """
    # set up global objects for pickling circuit execution results
    global global_number_of_circuits_executed
    global global_hardware_run_results_dict
    global_number_of_circuits_executed = 0
    global_hardware_run_results_dict = {
        'model_type' : hps.model_type,
        'measurementRes' : None,
        'epsilon' : hps.epsilon,
        'backend_name' : None,
        'layer_type' : hps.layer_type,
        'batch_idx' : 0,
    }

    # Resolve the layer constructor; unknown names now fail with a clear
    # ValueError instead of an UnboundLocalError further down.
    if hps.layer_type in ['linear','linear_svb']:
      layer_func = linear
    elif hps.layer_type=='ortho':
      layer_func = ortho_linear
    elif hps.layer_type=='noisy_ortho':
      layer_func = partial(ortho_linear_noisy,noise_scale=0.01)
    elif hps.layer_type=='hardware_ortho':
      # TODO want to run this on Quantinuum device
      layer_func = ortho_linear_hardware
    else:
      raise ValueError(f"Unexpected layer type {hps.layer_type}")

    # Resolve the network architecture.
    if hps.model_type == 'simple':
      net = simple_network(hps=hps, layer_func=layer_func)
    elif hps.model_type == 'recurrent':
      net = recurrent_network_hardware(hps=hps, layer_func=layer_func)
    elif hps.model_type == 'lstm':
      net = lstm_network_hardware(hps=hps, layer_func=layer_func)
    elif hps.model_type == 'attention':
      net = attention_network(hps=hps, layer_func=layer_func)
    else:
      raise ValueError(f"Unexpected model type {hps.model_type}")

    opt = optax.adam(1E-3)
    key, init_key = jax.random.split(self.__key)
    # Only the initial state is needed; the parameters are loaded from disk.
    _, state, _ = net.init(init_key, (1, hps.n_steps, 1))
    loss_metric = entropy_loss

    # No training happens here: only the loss function is used.
    _, loss_fn = build_train_fn(hps, net, opt, loss_metric)

    train_info = load_params(save_dir)
    # The hardware layer is evaluated with the parameters saved for 'noisy_ortho'.
    layer_type = "noisy_ortho" if hps.layer_type == 'hardware_ortho' else hps.layer_type
    _, params = train_info[layer_type][str(hps.epsilon)][hps.model_type]
    loss, (state, wealths, deltas, outputs) = loss_fn(params, state, key, test_batch[...,None])
    print(f'Model = {hps.model_type} | Layer = {hps.layer_type} | EPS = {hps.epsilon}| Loss = {loss} | #circs = {global_number_of_circuits_executed}')
    print(f'Deltas = {deltas[...,0]}')
    print(f'Terminal PnL = {wealths.reshape(-1)}')
    return loss

  def test(self, inputs):
    """Evaluate every configured (model, epsilon, layer) combination on `inputs`.

    The loss of each combination is stored in
    `self.test_info[layer][str(eps)][model]`.
    """
    for model in self.__models:
      for eps in self.__eps:
        for layer in self.__layers:
          hps = HyperParams(S0=100,
                n_steps=5,
                n_paths=120000,
                discrete_path=True,
                strike_price=100,
                epsilon=eps,
                sigma=0.2,
                risk_free=0,
                dividend=0,
                model_type=model,
                layer_type=layer,
                n_features=16,
                n_layers=1,
                loss_param=1.0,
                batch_size=4,
                test_size=0.2,
                optimizer='adam',
                learning_rate=1E-3,
                num_epochs=100)
          self.test_info[layer][str(eps)][model] = self.__test_model(hps, inputs)
    



In [8]:
# Re-create the PRNG key (same seed as in the data cell above).
seed = 100
key = jax.random.PRNGKey(seed)

# Active configuration: one layer type, one epsilon and one model type.
LAYERS = ['hardware_ortho']
EPS = [  0.01]
MODELS = ['lstm']

# Alternative configuration: compare all layer types for the lstm model.
# LAYERS = ['linear','ortho','noisy_ortho','hardware_ortho']
# EPS = [ 0.01]
# MODELS = ['lstm']

# test only

# Alternative configuration: hardware layer with every model type.
# LAYERS = ['hardware_ortho']
# EPS = [ 0.01]
# MODELS = ['simple','recurrent','lstm','attention']

# Benchmark object used by the cells below.
dhb = DeepHedgingBenchmark(key=key,eps=EPS, layers=LAYERS, models=MODELS)

In [9]:
# Print numpy floats with three decimals (used for the Deltas / PnL output below).
np.set_printoptions(formatter={'float': "{0:0.3f}".format})

In [10]:
# Fixing test_batch as suggested by Snehal
# pickle.dump(test_batch, open('data/1115_test_batch_4_points.pickle', 'wb'))
# NOTE: pickle.load can execute arbitrary code; only load files from a trusted source.
# The context manager closes the file handle once the batch has been read.
with open('data/1115_test_batch_4_points.pickle', 'rb') as test_batch_file:
    test_batch = pickle.load(test_batch_file)

Non-emulator results:
5 days, 4 paths

```
Model = lstm | Layer = linear | EPS = 0.01| Loss = 2.1739442348480225 | #circs = 0
Deltas = [[0.435 -0.002 -0.187 -0.182 -0.050 -0.014]
 [0.435 0.050 0.019 -0.001 -0.027 -0.476]
 [0.435 -0.015 -0.000 -0.085 -0.038 -0.296]
 [0.435 0.000 0.019 -0.001 -0.002 -0.451]]
Terminal PnL = [-2.578 -1.225 -1.420 -2.671]
Model = lstm | Layer = ortho | EPS = 0.01| Loss = 2.1762888431549072 | #circs = 0
Deltas = [[0.435 0.001 -0.203 -0.165 -0.049 -0.019]
 [0.435 0.051 0.011 -0.006 -0.045 -0.447]
 [0.435 -0.013 0.004 -0.090 -0.025 -0.310]
 [0.435 0.000 0.010 0.001 -0.001 -0.446]]
Terminal PnL = [-2.586 -1.194 -1.439 -2.671]
Model = lstm | Layer = noisy_ortho | EPS = 0.01| Loss = 2.1889312267303467 | #circs = 0
Deltas = [[0.437 0.004 -0.213 -0.158 -0.043 -0.027]
 [0.428 0.040 0.027 -0.009 -0.022 -0.463]
 [0.432 -0.019 0.000 -0.104 -0.018 -0.291]
 [0.435 -0.000 0.029 0.010 -0.001 -0.472]]
Terminal PnL = [-2.620 -1.205 -1.435 -2.669]
```

In [11]:
dhb.test(test_batch)

Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_0.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_1.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_2.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_3.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_4.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_5.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_6.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_7.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_8.json
Using precomputed counts from data/1115_device_lstm_quantinuum_H1-1_hardware_ortho_0.01_9.json
Using precomputed counts from data/1115_device_lst

In [12]:
# dhb.test(test_batch)