# Pilot-Quantum 

In [None]:
import os, time, sys
sys.path.insert(0, os.path.abspath('../../..'))
import socket
import getpass
import datetime

# Python Dask and Data stack
import numpy as np
import pandas as pd
import distributed, dask
import dask.dataframe as dd
import dask.array as da
import dask.bag as db

# Pilot-Quantum
import pilot.streaming

import logging
logging.basicConfig(level=logging.WARNING)

# Qiskit
from qiskit import QuantumCircuit, transpile, execute, Aer
from qiskit_aer import AerSimulator # for GPU
from qiskit.primitives import Estimator
from qiskit_aer.primitives import Estimator as AirEstimator
from qiskit_benchmark import run_graph, generate_data

logging.getLogger('qiskit').setLevel(logging.INFO)
logging.getLogger('qiskit.transpiler').setLevel(logging.WARN)
logging.getLogger('stevedore.extension').setLevel(logging.INFO)

## Start Pilot

In [None]:
RESOURCE_URL_HPC = "slurm://localhost"
WORKING_DIRECTORY = os.path.join(os.environ["PSCRATCH"], "work")


def start_pilot(number_cores):
    """Start a Dask pilot via the configured resource URL and connect to it.

    Args:
        number_cores: Value passed as ``number_cores`` in the pilot
            compute description.

    Returns:
        A ``(dask_pilot, dask_client)`` tuple: the pilot object returned by
        ``PilotComputeService.create_pilot`` and a ``distributed.Client``
        connected to the ``master_url`` reported by the pilot.
    """
    # For a local (non-SLURM) run, the alternatives previously noted here were:
    #   "resource": "ssh://{}@localhost".format(getpass.getuser())
    #   "working_directory": os.path.join(os.path.expanduser("~"), "work")
    #   "number_of_nodes" / "cores_per_node" instead of "number_cores"
    pilot_compute_description_dask = {
        "resource": RESOURCE_URL_HPC,
        "working_directory": WORKING_DIRECTORY,
        "number_cores": number_cores,
        "queue": "normal",
        "walltime": 5,
        "type": "dask",
        "project": "m4408",
        "os_ssh_keyfile": "~/.ssh/nersc",
        "scheduler_script_commands": ["#SBATCH --constraint=cpu"],
    }

    dask_pilot = pilot.streaming.PilotComputeService.create_pilot(
        pilot_compute_description_dask
    )
    # Block until the pilot reports that it is ready.
    dask_pilot.wait()
    # Query the pilot details once and reuse them; the original called
    # get_details() twice and threw the first result away.
    pilot_details = dask_pilot.get_details()
    dask_client = distributed.Client(pilot_details["master_url"])
    return dask_pilot, dask_client

In [None]:
# Bring up a one-core pilot and run a trivial map over the cluster as a
# smoke test; the gathered squares are the cell's displayed output.
dask_pilot, dask_client = start_pilot(1)
dask_client.scheduler_info()
dask_client.gather(
    dask_client.map(lambda value: value * value, range(10))
)

In [None]:
# Cancel the pilot held in `dask_pilot` once the smoke test is done.
dask_pilot.cancel()

## Test different quantum workloads with Pilot

In [None]:
# run_graph(depth_of_recursion=1,
#         num_qubits=2,
#         n_entries=1,
#         circuit_depth=1,
#         size_of_observable=1,
#         n_backends=1)

In [None]:
# Generate the circuits and observables used by the estimator cells below
# (20 qubits, circuit depth 5).
circuits, observables = generate_data(
    depth_of_recursion=1,  # number of circuits and observables
    num_qubits=20,
    n_entries=1,  # number of circuits and observables => same as depth_of_recursion
    circuit_depth=5,
    size_of_observable=1,
)
# for c in circuits: c.save_statevector()

In [None]:
# Draw the first generated circuit; the drawing is the cell's output.
circuits[0].draw()

In [None]:

# Display the first generated observable as the cell's output.
observables[0]

Run with "vanilla" Estimator

In [None]:
# Evaluate all circuit/observable pairs with the Estimator imported from
# qiskit.primitives and block until the result is available.
Estimator().run(circuits, observables).result()

Run with Aer Estimator (without GPU)

In [None]:
# Aer estimator configured for statevector simulation on the CPU.
options = dict(method="statevector", device="CPU")
estimator = AirEstimator(backend_options=options)
estimator.run(circuits, observables).result()


Run with GPU-enabled Estimator

In [None]:
# Aer estimator configured for statevector simulation on the GPU with
# cuStateVec enabled. Alternative option sets that were tried previously:
#   options = {"shots": 1024}
#   options = {"backend": simulator}
options = dict(
    method="statevector",
    device="GPU",
    cuStateVec_enable=True,
    shots=None,
)
estimator = AirEstimator(backend_options=options)
estimator.run(circuits, observables).result()

# simulator = Aer.get_backend('aer_simulator_statevector_gpu', shots=None)
# simulator = AerSimulator(method='statevector', device='GPU', cuStateVec_enable=True)
# result = execute(circuits, simulator).result()
# print(result)
# statevector = result.get_statevector()

#circ = transpile(circuits)
# Execute the circuit and get the statevector
#result = execute(circuits, simulator, shots=0).result()
#statevector = result.get_statevector()

# Create an Estimator object
# estimator = AirEstimator()

# Estimate the expectation value of the observable
# expectation_value = 

### Dask Implementation

In [None]:
# Create a list of (circuit, observable) tuples. The list is materialized so
# that `circuits_observables` is not left as an exhausted one-shot zip iterator
# after the bag has been built, and so the cell can be inspected or re-used.
circuits_observables = list(zip(circuits, observables))
# Wrap the pairs in a Dask bag so they can be processed in parallel.
circuit_bag = db.from_sequence(circuits_observables)

In [None]:
# Run one Estimator per (circuit, observable) element of the bag and collect
# the results; the list of results is the cell's output.
circuit_bag.map(
    lambda pair: Estimator().run(pair[0], pair[1]).result()
).compute()

In [None]:
# Cancel the pilot held in `dask_pilot` after the Dask experiments.
dask_pilot.cancel()

## Micro Benchmark 

In [None]:
# Micro benchmark: for each core count, start a pilot, generate the workload,
# run it through a Dask bag, stop the pilot, and record the timings.
num_qubits = 2
n_entries = 1024
results = []
run_timestamp = datetime.datetime.now()
RESULT_FILE = "pilot-quantum-summary-" + run_timestamp.strftime("%Y%m%d-%H%M%S") + ".csv"

for number_cores in [1]:
    start = time.time()
    dask_pilot, dask_client = start_pilot(number_cores)
    try:
        circuits, observables = generate_data(
            depth_of_recursion=1,  # number of circuits and observables
            num_qubits=num_qubits,
            n_entries=n_entries,  # number of circuits and observables => same as depth_of_recursion
            circuit_depth=1,
            size_of_observable=1,
        )
        # NOTE: end_pilot is taken after generate_data(), so the reported
        # "Pilot start" figure also includes the data generation time.
        end_pilot = time.time()
        circuits_observables = list(zip(circuits, observables))
        circuit_bag = db.from_sequence(circuits_observables)
        circuit_bag.map(
            lambda circ_obs: Estimator().run(circ_obs[0], circ_obs[1]).result()
        ).compute()
        end_compute = time.time()
    finally:
        # Always release the pilot, even if data generation or compute fails,
        # so no job is left running on the cluster.
        dask_pilot.cancel()
    end_stop_pilot = time.time()
    # write data to file
    result_string = "Number Cores: {} Number Qubits: {} Number Circuits {} Pilot start: {}, Total: {}, Compute: {}s".format(number_cores, num_qubits, n_entries, end_pilot-start, end_stop_pilot-start, end_compute-end_pilot)
    results.append(result_string)
    # file.write() requires a str; passing the list raised TypeError. The file
    # is rewritten each iteration with all results so far, one per line.
    with open(RESULT_FILE, "w") as f:
        f.write("\n".join(results) + "\n")
    time.sleep(60)

In [None]:
# Print the list of benchmark summary strings collected in `results`.
print(results)