# Task 4.2 Demonstrate Running Circuits on Real Hardware

## Objective 1: Execute on Target Hardware

### Overview

Executing quantum circuits on real hardware involves several key steps:

1. **Authentication**: Connecting to the IBM Quantum Runtime service
2. **Backend Selection**: Choosing an available quantum processor
3. **Circuit Preparation**: Transpiling circuits for specific hardware constraints
4. **Job Submission**: Sending circuits for execution
5. **Result Retrieval**: Obtaining and analyzing measurement outcomes

In [22]:
# Import necessary libraries
from qiskit import QuantumCircuit
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
from qiskit_ibm_runtime import EstimatorV2 as Estimator
from qiskit.quantum_info import SparsePauliOp
from qiskit.circuit import Parameter
import numpy as np

<span style="color:red"> **Running the cell below will use around 10-15 seconds from your IBM account credit**
</span>

In [None]:
# Example:  Running a Quantum Circuit on Real Hardware

# --- STEP 1: Authentication ---
# If an account is already saved, this loads it
service = QiskitRuntimeService()
# To pass credentials explicitly instead, uncomment the line below and
# replace <instance_name> with your instance name and <token> with your IBM Quantum API token
# service = QiskitRuntimeService(channel="ibm_cloud", instance="<instance_name>", token="<token>")

   
# --- STEP 2: Backend Selection ---
# Choose the least busy available backend to minimize queue time with real hardware only
backend = service.least_busy(operational=True, simulator=False)
print(f"Using backend: {backend.name}")
print(f"Backend properties: {backend.num_qubits} qubits")

# --- STEP 3: Circuit Preparation ---
# Create a simple Bell state circuit
qc = QuantumCircuit(2, 2)
qc.h(0)                   
qc.cx(0, 1) 
qc.measure([0, 1], [0, 1])

# Display the circuit
qc.draw('mpl')

# Transpile the circuit for the specific hardware backend.
# The pass manager maps the circuit to the backend's native gate set and qubit connectivity,
# then optimizes it (optimization_level ranges from 0 to 3, where 3 optimizes most heavily)
pm = generate_preset_pass_manager(backend=backend, optimization_level=3)
transpiled_circuit = pm.run(qc)

# Define the observable we want to measure (ZZ correlation for example)
ZZ = SparsePauliOp.from_list([("ZZ", 1)])

# --- STEP 4: Job Submission ---
# Create an Estimator instance and run the job
estimator = Estimator(mode=backend)

# Map observables to the transpiled circuit layout
observables = [ZZ.apply_layout(transpiled_circuit.layout)]

# Submit the job
job = estimator.run([(transpiled_circuit, observables)])
print(f"\nJob ID: {job.job_id()}")
print("Job submitted successfully! Check the IBM Quantum dashboard for status.")

In [None]:
# --- STEP 5: Result Retrieval ---
result = job.result()
print(f"Expectation value: {result[0].data.evs}")

## Objective 2: Understanding Primitives

Primitives are foundational quantum computing operations that abstract away hardware-specific details. They provide standardized interfaces for common quantum tasks.

### Key Primitives:


**1. Sampler**:
- **Used for**: Sampling measurement outcomes from quantum circuits
- **Output**: Counts or probability distributions

**2. Estimator**:
- **Used for**: Calculating expectation values of observables
- **Output**: Expectation value ± standard error
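
The two outputs are related: an expectation value of a Pauli observable can be estimated from sampled counts. The following is a minimal sketch of that relationship for `ZZ` on two qubits, not a description of how the runtime Estimator works internally (which may add error mitigation). The helper name `zz_expectation` and the counts are hypothetical, chosen only for illustration.

```python
import math

def zz_expectation(counts):
    """Estimate <ZZ> and its standard error from two-qubit measurement counts."""
    shots = sum(counts.values())
    # A bitstring contributes +1 when it has even parity ("00", "11"), otherwise -1.
    signed_total = sum(
        (1 if bitstring.count("1") % 2 == 0 else -1) * n
        for bitstring, n in counts.items()
    )
    ev = signed_total / shots
    # Each shot yields +1 or -1, so the per-shot variance is 1 - ev**2.
    std_error = math.sqrt(max(0.0, 1 - ev**2) / shots)
    return ev, std_error

# Hypothetical counts for a noisy Bell state (illustrative numbers, not measured data)
counts = {"00": 480, "11": 470, "01": 30, "10": 20}
ev, err = zz_expectation(counts)
print(f"<ZZ> = {ev:.3f} ± {err:.3f}")
```

For an ideal Bell state only `00` and `11` appear, so the estimate is exactly 1 with zero standard error.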

### Primitive Implementations:

**Base Classes**:
- `BaseSamplerV2`: Base class for samplers; all sampler implementations inherit from it
- `BaseEstimatorV2`: Base class for estimators; all estimator implementations inherit from it

**V2 Primitives**:
- `SamplerV2`: Latest sampler with enhanced capabilities
- `EstimatorV2`: Latest estimator with improved performance and features

**Alternatives**:
- `StatevectorSampler`: Simulator-based sampler using state vector simulation
- `StatevectorEstimator`: Simulator-based estimator using state vector simulation
- `BackendSamplerV2`: Sampler implementation that runs circuits through a given backend's `run()` method
- `BackendEstimatorV2`: Estimator implementation that runs circuits through a given backend's `run()` method


<span style="color:red"> **Running the cell below will use around 5-10 seconds from your IBM account credit** </span>

In [None]:
# Example: Using the Sampler Primitive 
# The Sampler primitive executes quantum circuits and returns measurement statistics.

from qiskit_ibm_runtime import SamplerV2 as Sampler

# Create a Sampler instance connected to our backend
sampler = Sampler(mode=backend)

# The 'shots' parameter determines how many times to execute the circuit
shots = 1024

# Submit the sampling job with the transpiled Bell-state circuit
job = sampler.run([transpiled_circuit], shots=shots)

print(f"Sampler Job ID: {job.job_id()}")
print(f"Requested shots: {shots}")

In [None]:
# To retrieve results:
result = job.result()
counts = result[0].data.c.get_counts()
print(f"\nMeasurement counts: {counts}")

## Objective 3: Primitive Inputs and Outputs

### Inputs

#### Sampler Input Structure

Each Sampler **Pub** (Primitive Unified Bloc) accepts:
1. **Quantum Circuit**: The circuit to execute (should include measurements for qubits that will be sampled)
2. **Parameters**: Parameter values for parameterized circuits (needed only if the circuit is parameterized)
3. **Shots**: Number of circuit executions (optional)

#### Estimator Input Structure

Each Estimator **Pub**  accepts:
1. **Quantum Circuit**: The circuit to execute (must be transpiled for the target backend)
2. **Observables**: One or more observables to measure (as `Pauli`, `SparsePauliOp`, `PauliList` or `str` objects)
3. **Parameters**: Parameter values for parameterized circuits (optional)
4. **Precision**: Target precision for the expectation value estimates (optional)
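
To get a feel for what a precision target costs, the sketch below uses the statistical rule of thumb that the standard error of a ±1-valued observable is at most `1/sqrt(shots)`. It assumes a single Pauli observable and no error mitigation; how the runtime actually converts a precision into shots is not specified here, and `shots_for_precision` is a hypothetical helper.

```python
import math

def shots_for_precision(precision):
    """Smallest shot count for which 1/sqrt(shots) <= precision."""
    if precision <= 0:
        raise ValueError("precision must be positive")
    return math.ceil(1 / precision**2)

for p in (0.25, 0.0625, 0.015625):
    print(f"precision {p}: about {shots_for_precision(p)} shots")
```

Halving the target precision roughly quadruples the number of shots, which is why tight precision targets consume much more hardware time.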



<span style="color:red"> **Running the cell below will use around 5 minutes from your IBM account credit**
</span>

In [None]:
# Example 1: Using the Estimator Primitive with Multiple Parameters and Observables
# --- Create a parameterized circuit ---
circuit = QuantumCircuit(2)
circuit.h(0)                     
circuit.cx(0, 1)                 
circuit.ry(Parameter("a"), 0)    # Parameterized rotation Y
circuit.rz(Parameter("b"), 0)    # Parameterized rotation Z
circuit.cx(0, 1)                 
circuit.h(0)

# Display the parameterized circuit
print("Parameterized Circuit:")
circuit.draw('mpl')

# --- Transpile for hardware ---
pm = generate_preset_pass_manager(optimization_level=1, backend=backend)
transpiled_circuit = pm.run(circuit)
layout = transpiled_circuit.layout

# --- Define parameter sweep ---
# Create 100 different parameter combinations
# Shape: (100, 2) where 100 is number of parameter sets, 2 is parameters per set
params = np.vstack([
    np.linspace(-np.pi, np.pi, 100),        # Parameter 'a' values
    np.linspace(-4 * np.pi, 4 * np.pi, 100) # Parameter 'b' values
]).T

print(f"\nParameter array shape: {params.shape}")
print(f"First 5 parameter sets:\n{params[:5]}")

# --- Define multiple observables ---
# measure three different observables for each parameter set
# Shape: (3, 1) where 3 is number of observable sets, 1 is observables per set
observables = [
    [SparsePauliOp(["XX", "IY"], [0.5, 0.5])],   # Linear combination of XX and IY
    [SparsePauliOp("XX")],                       # XX
    [SparsePauliOp("IY")],                       # IY
]

# Apply the same layout as the transpiled circuit
observables = [
    [observable.apply_layout(layout) for observable in observable_set]
    for observable_set in observables
]

print(f"\nNumber of observable sets: {len(observables)}")
print(f"Observables per set: {len(observables[0])}")

# --- Create the Estimator Pub ---
# Total calculations: 1 circuit  × 3 observables × 100 parameter sets = 300 expectation values
estimator_pub = (transpiled_circuit, observables, params)

# --- Execute with Estimator ---
estimator = Estimator(mode=backend)
job = estimator.run([estimator_pub])

print(f"\nJob submitted! Job ID: {job.job_id()}")

In [None]:

# --- Process Results ---
result = job.result()
print(f"\nResult shape: {result[0].data.evs.shape}")  # Should be (3, 100)
print(f"First expectation values: {result[0].data.evs[:, :5]}")

#### Broadcasting Rules

Broadcasting allows you to efficiently compute expectation values for multiple parameter sets and observables in a single primitive call. The rules follow NumPy broadcasting conventions.

- Input arrays do not need to have the same number of dimensions.
   - The resulting array has as many dimensions as the input array with the most dimensions.
   - The size of each dimension is the largest size of the corresponding dimension.
   - Missing dimensions are assumed to have size one.
- Shape comparisons start with the rightmost dimension and continue to the left.
- Two dimensions are compatible if their sizes are equal or if one of them is 1.


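In Qiskit:
- Parameter values are provided as array-likes whose last axis indexes the circuit's m parameters, so an n × m array holds n parameter sets
- Observables are provided as array-likes of any shape, for example a k × 1 array

The two are then combined according to standard broadcasting rules.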
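
The rules can be checked without any quantum code. The following is a minimal pure-Python sketch of the shape calculation; `broadcast_shape` is a hypothetical helper written for illustration (NumPy offers the same calculation as `np.broadcast_shapes`). The shapes passed in are the parameter-set shape (with the per-parameter axis already dropped) and the observables shape.

```python
from itertools import zip_longest

def broadcast_shape(*shapes):
    """Return the broadcast shape of the given shapes, or raise ValueError."""
    result = []
    # Walk the dimensions right to left; missing dimensions count as size 1.
    for dims in zip_longest(*(reversed(s) for s in shapes), fillvalue=1):
        sizes = {d for d in dims if d != 1}
        if len(sizes) > 1:
            raise ValueError(f"incompatible shapes: {shapes}")
        result.append(sizes.pop() if sizes else 1)
    return tuple(reversed(result))

print(broadcast_shape((5,), ()))            # 5 parameter sets, one observable
print(broadcast_shape((5,), (5,)))          # zip: pairs are matched element by element
print(broadcast_shape((6,), (4, 1)))        # outer product
print(broadcast_shape((3, 6), (2, 3, 1)))   # N-D generalization
```

Shapes such as `(4,)` and `(6,)` are incompatible because neither size is 1, so the helper raises a `ValueError` for them.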

##### Examples

In [None]:
from qiskit_aer import AerSimulator

simulator = AerSimulator()

# Parameterized 1-qubit circuit
theta = Parameter("θ")
qc = QuantumCircuit(1)
qc.rx(theta, 0)

# Estimator primitive
estimator = Estimator(mode=simulator)

pm = generate_preset_pass_manager(optimization_level=1, backend=simulator)
transpiled_circuit = pm.run(qc)
transpiled_circuit.draw('mpl')


In [None]:
# Broadcasting Example 1: Single observable and parameter set
#Parameter values:   (5 × 1)
#Observables:        (1 × 1)
#--------------------------------
#Output:             (5 × 1)

parameter_values = [[v] for v in np.linspace(0, np.pi, 5)]
observable = SparsePauliOp("Z")
estimator_pub = (transpiled_circuit, observable, parameter_values)
job = estimator.run([estimator_pub])
result = job.result()
print(result[0].data.evs)
print(f"Output shape: {result[0].data.evs.shape}")

In [None]:
# Broadcasting Example 2: Zip Parameters and Observables
#Parameter values:   (5 × 1)
#Observables:        (5 × 1)
#--------------------------------
#Output:             (5 × 1)

parameter_values = [[v] for v in np.linspace(0, np.pi, 5)]

observables = [
    SparsePauliOp("Z"),
    SparsePauliOp("X"),
    SparsePauliOp("Y"),
    SparsePauliOp("Z"),
    SparsePauliOp("X"),
]

estimator_pub = (transpiled_circuit, observables, parameter_values)
job = estimator.run([estimator_pub])
result = job.result()
print(result[0].data.evs)
print(f"Output shape: {result[0].data.evs.shape}")

In [None]:
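# Broadcasting Example 3: Outer Product (Full Broadcast)
# Parameter values:   6 sets of 1 parameter, broadcast shape (6,)
# Observables:        (4 × 1)
#--------------------------------
# Output:             (4 × 6)

# Each inner list holds the value of the circuit's single parameter θ
parameter_values = [[v] for v in np.linspace(0, np.pi, 6)]

# Nest each observable in its own list to give the observables array shape (4, 1)
observables = [
    [SparsePauliOp("Z")],
    [SparsePauliOp("X")],
    [SparsePauliOp("Y")],
    [SparsePauliOp("I")],
]

estimator_pub = (transpiled_circuit, observables, parameter_values)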
job = estimator.run([estimator_pub])
result = job.result()
print(result[0].data.evs)
print(f"Output shape: {result[0].data.evs.shape}")


In [None]:
# Broadcasting Example 4: N-D Generalization (Multiple Observable Arrays)
# Parameter values:    (3 × 6) sets of 1 parameter
# 2 observable arrays: (3 × 1) each, stacked to (2 × 3 × 1)
#--------------------------------
# Output:              (2 × 3 × 6)

# The trailing axis of length 1 indexes the circuit's single parameter θ
parameter_values = np.array([
    np.linspace(0, np.pi, 6),
    np.linspace(np.pi, 2*np.pi, 6),
    np.linspace(2*np.pi, 3*np.pi, 6),
])[..., np.newaxis]

# Nest each observable in its own list so that each array has shape (3, 1)
observables_a = [
    [SparsePauliOp("Z")],
    [SparsePauliOp("X")],
    [SparsePauliOp("Y")],
]

observables_b = [
    [SparsePauliOp("X")],
    [SparsePauliOp("Y")],
    [SparsePauliOp("Z")],
]

estimator_pub = (transpiled_circuit, [observables_a, observables_b], parameter_values)

job = estimator.run([estimator_pub])
result = job.result()
print(result[0].data.evs)
print(f"Output shape: {result[0].data.evs.shape}")




### Outputs


The object returned by `job.result()` is a `PrimitiveResult`, a list-like container of `PubResult` objects. Each element corresponds to one PUB submitted to the primitive's `run` method, in the same order.

#### Sampler Output Structure:

```
└── PrimitiveResult
    ├── PubResult[0]
    │   ├── metadata
    │   └── data  ## In the form of a DataBin object
    │       ├── NAME_OF_CLASSICAL_REGISTER
    │       │   └── BitArray of count data (default is 'meas')
    │       │
    │       └── NAME_OF_ANOTHER_CLASSICAL_REGISTER
    │           └── BitArray of count data (exists only if more than one
    │                 ClassicalRegister was specified in the circuit)
    ├── PubResult[1]
    │   ├── metadata
    │   └── data  ## In the form of a DataBin object
    │       └── NAME_OF_CLASSICAL_REGISTER
    │           └── BitArray of count data for second pub
    ├── ...
    ├── ...
    └── ...
```
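
A `BitArray` does not store bitstrings as text; each shot is packed into bytes, which is why the raw array for a 10-bit register has 2 bytes per shot. The sketch below illustrates that packing in plain Python, assuming big-endian byte order. The helpers `pack_bitstring` and `unpack_bytes` are hypothetical and are not part of Qiskit.

```python
def pack_bitstring(bitstring):
    """Pack a bitstring into big-endian bytes, padding the high bits with zeros."""
    num_bytes = (len(bitstring) + 7) // 8
    return list(int(bitstring, 2).to_bytes(num_bytes, "big"))

def unpack_bytes(data, num_bits):
    """Recover the bitstring of length num_bits from packed bytes."""
    value = int.from_bytes(bytes(data), "big")
    return format(value, f"0{num_bits}b")

packed = pack_bitstring("1111111111")   # one 10-bit shot
print(packed)
print(unpack_bytes(packed, 10))
```

In practice `get_counts()` performs this conversion, so the packed form only matters when inspecting the raw array directly.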

#### Estimator Output Structure:

```
└── PrimitiveResult
    ├── PubResult[0]
    │   ├── metadata
    │   └── data  ## In the form of a DataBin object
    │       ├── evs
    │       │   └── List of estimated expectation values in the shape
    │       │         specified by the first pub
    │       └── stds
    │           └── List of calculated standard deviations in the
    │                 same shape as above
    ├── PubResult[1]
    │   ├── metadata
    │   └── data  ## In the form of a DataBin object
    │       ├── evs
    │       │   └── List of estimated expectation values in the shape
    │       │         specified by the second pub
    │       └── stds
    │           └── List of calculated standard deviations in the
    │                 same shape as above
    ├── ...
    ├── ...
    └── ...
```

<span style="color:red"> **Running the cell below will use around 5 seconds from your IBM account credit**
</span>

In [None]:
# Example 2: Processing Results

from qiskit_ibm_runtime import SamplerV2 as Sampler

# --- Create a 10-qubit GHZ circuit ---
# GHZ states are maximally entangled states: (|0...0⟩ + |1...1⟩)/√2
circuit = QuantumCircuit(10)
circuit.h(0)                        
circuit.cx(range(0, 9), range(1, 10)) 
circuit.measure_all()                

print("GHZ State Circuit (10 qubits):")
circuit.draw('mpl', fold=-1)  # Don't fold the circuit display

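# --- Transpile for hardware ---
# Rebuild the pass manager for the hardware backend (the previous `pm` targeted the simulator)
pm = generate_preset_pass_manager(optimization_level=1, backend=backend)
transpiled_circuit = pm.run(circuit)
sampler_pub = (transpiled_circuit,)
# --- Execute with Sampler ---
sampler = Sampler(mode=backend)
job = sampler.run([sampler_pub], shots=1000)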
print(f"Sampler Job ID: {job.job_id()}")

In [None]:
result = job.result()

print("\n=== SAMPLER RESULTS ANALYSIS ===\n")

# 1. Access the data bin
data = result[0].data
print(f"DataBin contents: {data}")
print(f"Available registers: {list(data.keys())}\n")

# 2. Access the BitArray
# 'meas' is the default classical register name created by measure_all()
bit_array = data.meas
print(f"BitArray type: {type(bit_array)}")
print(f"BitArray shape: {bit_array.array.shape}")

# 3. View raw measurement data
for i in range(min(10, bit_array.array.shape[0])):
    print(f"   Shot {i}: {bit_array.array[i]}")

# 4. Convert to counts dictionary
counts = bit_array.get_counts()
sorted_counts = sorted(counts.items(), key=lambda x: x[1], reverse=True)
print(f"\nMeasurement counts (all outcomes):")
for outcome, count in sorted_counts:
    print(f"   {outcome}: {count} shots ({count/1000*100:.1f}%)")

In [None]:
# optionally, convert away from the native BitArray format to a dictionary format
counts = data.meas.get_counts()
print(f"Counts: {counts}")
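
Once the counts are available as a dictionary, simple quality checks need only plain Python. The sketch below computes the fraction of shots that landed on the two ideal GHZ outcomes (all zeros or all ones). This is a rough indicator only, not a state fidelity, because it says nothing about the coherence between the two outcomes. The helper name and the counts are hypothetical.

```python
def ghz_outcome_fraction(counts, num_qubits):
    """Fraction of shots that landed on all-zeros or all-ones."""
    shots = sum(counts.values())
    ideal = ("0" * num_qubits, "1" * num_qubits)
    return sum(counts.get(bitstring, 0) for bitstring in ideal) / shots

# Hypothetical 10-qubit counts (illustrative numbers, not measured data)
example_counts = {
    "0000000000": 430,
    "1111111111": 410,
    "0000000001": 90,
    "1011111111": 70,
}
fraction = ghz_outcome_fraction(example_counts, 10)
print(f"Shots on ideal GHZ outcomes: {fraction:.1%}")
```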

## Objective 4: Run Jobs in Sessions

Sessions provide an efficient way to run multiple related jobs on the same quantum hardware. They maintain context between jobs, which can reduce overhead and improve performance for iterative algorithms.


A **Session** is a context that:
1. **Reserves hardware time** for multiple jobs
2. **Maintains circuit context** between jobs
3. **Reduces overhead** for iterative algorithms
4. **Ensures consistent hardware conditions** for related jobs

**Session Lifecycle:**
1. **Open**: Create a session with a specific backend
2. **Use**: Run multiple jobs within the session
3. **Close**: Explicitly close or let timeout

**Key Parameters:**
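- **Maximum TTL (Time To Live)**: Maximum session duration, set with `max_time` (default: 8 hours)
- **Interactive TTL**: Idle window (1 minute, not configurable) after which a session with no queued jobs is temporarily deactivated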


<span style="color:red"> **Note: Open Plan users cannot run jobs in session execution mode, so the code blocks in the rest of the notebook will fail on the Open Plan.** </span>

### Open a session

A session can be opened by instantiating the `Session` class directly or by using it as a context manager (`with Session(...)`).

In [None]:
from qiskit_ibm_runtime import (
    Session,
    SamplerV2 as Sampler,
    EstimatorV2 as Estimator,
)

session = Session(backend=backend)
estimator = Estimator(mode=session)
sampler = Sampler(mode=session)
# Close the session because no context manager was used.
session.close()

In [None]:
from qiskit_ibm_runtime import (
    Session,
    SamplerV2 as Sampler,
    EstimatorV2 as Estimator,
)
 
backend = service.least_busy(operational=True, simulator=False)
with Session(backend=backend):
    estimator = Estimator()
    sampler = Sampler()

### Session Length




Similar to `Batch`, a session's maximum TTL can be set with `max_time`. The timer starts when the session starts. When the maximum TTL is reached, the session closes: any job that is already running finishes, but all queued jobs fail.

| Plan Type | Maximum Session TTL |
|-----------|---------------------|
| Premium Plans | 8 hours |

Additionally, there is an interactive TTL of 1 minute that cannot be configured. If no session jobs are queued within that window, the session is temporarily deactivated.
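
The interplay of the two timers can be pictured with a toy model. The sketch below is purely illustrative: `session_state` and its state names are hypothetical and not part of `qiskit-ibm-runtime`; the defaults reuse the 8-hour maximum TTL and 1-minute interactive TTL quoted in this section.

```python
def session_state(elapsed_s, idle_s, max_ttl_s=8 * 3600, interactive_ttl_s=60):
    """Toy model of the two session timers (not a qiskit-ibm-runtime API)."""
    if elapsed_s >= max_ttl_s:
        return "closed"      # maximum TTL reached
    if idle_s >= interactive_ttl_s:
        return "inactive"    # no jobs queued within the interactive window
    return "active"

print(session_state(elapsed_s=120, idle_s=10))
print(session_state(elapsed_s=120, idle_s=75))
print(session_state(elapsed_s=8 * 3600, idle_s=0))
```

The maximum TTL is checked first because it ends the session regardless of activity, whereas the interactive TTL only deactivates it temporarily.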

### End a session

A session is closed when any of the following happens:

* The maximum TTL is reached
* The session is manually canceled or closed using `session.close()`
* The session is used in a context manager, which closes it automatically when the context ends

Once a session is closed, it no longer accepts new jobs; jobs that were already submitted are still executed.


In [None]:
session = Session(backend=backend)
 
estimator = Estimator(mode=session)
sampler = Sampler(mode=session)
# estimator_pub and sampler_pub must be built from circuits transpiled for `backend`
job1 = estimator.run([estimator_pub])
job2 = sampler.run([sampler_pub])
print(f"Result1: {job1.result()}")
print(f"Result2: {job2.result()}")
 
# Manually close the session. Running and queued jobs will run to completion.
session.close()

### Check Session status

Use `session.status()` to check the session status. It can be one of the following:

* Pending
* In Progress, accepting new jobs
* In Progress, not accepting new jobs: submitted jobs will be completed, after which the session is closed
* Closed

### Determine Session details

In [None]:
from qiskit_ibm_runtime import (
    QiskitRuntimeService,
    Session,
    EstimatorV2 as Estimator,
)
  
with Session(backend=backend) as session:
    print(session.details())

### Usage Patterns

Sessions are most useful for algorithms that require frequent communication between classical and quantum resources. For example:

* Running an iterative workload that uses SciPy to minimize a cost function
* Running VQE algorithms
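
The shape of such an iterative workload can be sketched without hardware. In the example below, `expectation_z` is a stand-in for an Estimator call: it returns the exact value ⟨Z⟩ = cos(θ) for the state RX(θ)|0⟩. In a real session each call would be a job whose result is needed before the next parameters can be chosen. Plain gradient descent with the parameter-shift rule is used here as an assumed optimizer; the function names are hypothetical.

```python
import math

def expectation_z(theta):
    """Stand-in for an Estimator call: <Z> after RX(theta) on |0> is cos(theta)."""
    return math.cos(theta)

def parameter_shift_gradient(cost, theta):
    """Gradient obtained from two cost evaluations at theta + pi/2 and theta - pi/2."""
    return (cost(theta + math.pi / 2) - cost(theta - math.pi / 2)) / 2

theta = 0.5
learning_rate = 0.4
for step in range(50):
    # Each iteration depends on results from the previous one.
    gradient = parameter_shift_gradient(expectation_z, theta)
    theta -= learning_rate * gradient

print(f"theta = {theta:.4f}, cost = {expectation_z(theta):.4f}")
```

Because every step waits on the previous result, re-queuing each job separately would add queue time per iteration; keeping the jobs in one session avoids that.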

---

## Practice Questions

---

**1) Consider the following code that creates a batch with specific time limits**

```
import time
from datetime import datetime

from qiskit_ibm_runtime import Batch, SamplerV2 as Sampler

# Create a batch with 30 minute max_time
with Batch(backend=backend, max_time="30m") as batch:
    sampler = Sampler()
    
    # Submit first job immediately
    job1 = sampler.run([circuit1], shots=1000)
    print(f"Job1 submitted at: {datetime.now()}")
    
    # Wait 2 minutes before submitting second job
    time.sleep(120)
    job2 = sampler.run([circuit2], shots=1000)
    print(f"Job2 submitted at: {datetime.now()}")
    
    # Get results
    result1 = job1.result()
    result2 = job2.result()
```

Which statement is correct?

A) The max_time="30m" parameter means the batch will stay open for exactly 30 minutes from creation, regardless of job activity.

B) If no jobs are submitted for 1 minute after job2 completes, the batch will enter a "temporarily deactivated" state due to the interactive TTL.

C) The interactive TTL (1 minute) is configurable and can be extended using an additional parameter like interactive_ttl="5m".

D) If job2 takes 29 minutes to complete, the batch will immediately close when finished, even though 1 minute remains of the 30-minute max_time.

**Answer:**
<details> <br/>

B) If no jobs are submitted for 1 minute after job2 completes, the batch will enter a "temporarily deactivated" state due to the interactive TTL

</details>

---

**2) In which scenario is session mode most appropriate?**

A) Iterative workloads where results from one job influence the next

B) Submitting many independent jobs with no shared context

C) Running a single circuit once

D) When minimizing cost is the top priority

**Answer:**
<details> <br/>
A) Iterative workloads where results from one job influence the next

Session mode is designed for iterative or interactive workloads
</details>

---

**3) What broadcasting rule is applied in the following example?**

```
params = [[0.1], [0.2], [0.3]]

observables = [
    SparsePauliOp("Z"),
    SparsePauliOp("X"),
    SparsePauliOp("Y"),
]

job = estimator.run([(transpiled_qc, observables, params)])
```

A) Single-observable broadcast

B) Outer product broadcast

C) Zip

D) Invalid input — raises an error

**Answer:**
<details> <br/>
C) Zip

The parameter values hold 3 sets of 1 parameter and the observables list has length 3, so the two shapes match and Qiskit pairs them element by element (zip).
</details>

---

**4) Which of the following code patterns demonstrates the recommended way to use Session mode for an iterative algorithm?**

```
# Pattern A
session = Session(backend=backend)
estimator = Estimator(mode=session)
for i in range(10):
    job = estimator.run([(circuit, observables, params[i])])
    result = job.result()
    # Process result and update params
session.close()

# Pattern B
with Session(backend=backend) as session:
    estimator = Estimator()
    for i in range(10):
        job = estimator.run([(circuit, observables, params[i])])
        result = job.result()
        # Process result and update params

# Pattern C
jobs = []
for i in range(10):
    session = Session(backend=backend)
    estimator = Estimator(mode=session)
    job = estimator.run([(circuit, observables, params[i])])
    jobs.append(job)
    session.close()
results = [job.result() for job in jobs]
```

Which pattern is most efficient and follows best practices?

A) Pattern A - Explicit session creation and closure

B) Pattern B - Using a context manager (with statement)

C) Pattern C - Creating a new session for each iteration

D) All patterns are equally good

**Answer:**
<details> <br/>
B) Pattern B - Using a context manager 

Context manager automatically handles session cleanup (closing) when the block exits, even if an error occurs
</details>

---