# Direct Access API - Getting Started
Once you have been granted access to the Direct Access API, follow the instructions below to verify that all of the functionality works as expected.

In [None]:
# Install python dependencies
! pip install "qiskit>=1.2.2"
! pip install "qiskit_ibm_runtime>=0.30.0"
! pip install qiskit_qasm3_import
! pip install boto3
! pip install matplotlib
! pip install pylatexenc
! pip install numpy

In [None]:
# Getting started
import requests
import json
import boto3

In [None]:
# S3 related parameters: credentials, endpoint and bucket that are passed to the
# boto3 S3 client and to the presigned-URL requests further down.
# Replace every "YOUR..." placeholder with your own value before running.
aws_access_key_id = "YOUR_AWS_ACCESS_KEY_ID"
aws_secret_access_key = "YOUR_AWS_SECRET_ACCESS_KEY"
# NOTE(review): the URL after the dash looks like an example endpoint - confirm and
# replace the whole string with the endpoint URL only.
s3_endpoint_url = "YOUR_S3_ENDPOINT - https://s3.us-east.cloud-object-storage.appdomain.cloud"
s3_bucket = "YOUR BUCKET"

# Direct Access API endpoint (prefix of every /v1/... request URL built in this notebook)
base_url = "http://localhost:8290"

In [None]:
# Run with daa_sim (Qiskit Aer)? If this is True, the example circuit is built with
# 7 qubits instead of the backend's num_qubits.
# Set this to False if you run on a real device.
use_daa_sim = True

## Get access token (IAM based authentication)

In [None]:
import datetime as dt

iam_apikey = "YOUR_IAM_API_KEY"
iam_endpoint_url = "https://iam.cloud.ibm.com"
service_crn = "YOUR_PROVISIONED_INSTANCE - crn:v1:..."

# Use this cell if IAM access token authentication is enabled.
iam_headers = {
    "content-type": "application/x-www-form-urlencoded",
    "accept": "application/json",
}

# Form fields of the token request. The API key must be the plain string:
# writing it as {iam_apikey} would build a set, not the key itself.
request_payload = {
    "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
    "apikey": iam_apikey,
}

get_token_url = f"{iam_endpoint_url}/identity/token"
token_response = requests.post(get_token_url, data=request_payload, headers=iam_headers)
# Stop here with the HTTP error instead of failing later with a KeyError on "access_token".
token_response.raise_for_status()
resp_json = token_response.json()

# Create the HTTP headers for subsequent API calls
access_token = resp_json["access_token"]
token_type = resp_json["token_type"]
now = dt.datetime.now(dt.timezone.utc)  # UTC time at which the token was obtained
headers = {
    "Authorization": f"{token_type} {access_token}",
    "Service-CRN": service_crn,
}
# NOTE: this prints the access token; clear the cell output before sharing the notebook.
print(json.dumps(headers, indent=2))

## (Deprecated) Get access token (AppId based authentication)

In [None]:
import datetime as dt

appid_client_id = "YOUR_APPID_CLIENT_ID"
appid_secret = "YOUR_APPID_SECRET"

# (Deprecated) Use this cell only if AppId access token authentication is enabled.
# The client id / secret pair is passed to the token endpoint as the request's `auth`.
get_token_url = f"{base_url}/v1/token"
token_response = requests.post(
    get_token_url,
    data={},
    auth=(appid_client_id, appid_secret),
)
resp_json = token_response.json()

# Pull the token fields out of the response and build the headers for later API calls.
# Note that this replaces any `headers` created by the IAM cell above.
access_token = resp_json["access_token"]
token_type = resp_json["token_type"]
now = dt.datetime.now(dt.timezone.utc)
headers = {"Authorization": "{} {}".format(token_type, access_token)}
print(json.dumps(headers, indent=2))

## Listing supported backends

In [None]:
# Ask the Direct Access API for the list of backends
backends_url = f"{base_url}/v1/backends"
backends_response = requests.get(backends_url, headers=headers)
if backends_response.status_code != 200:
    # Anything other than 200: dump the raw response object for inspection
    print(vars(backends_response))
else:
    print(json.dumps(backends_response.json(), indent=4))

## Run a job

During the testing phase you can run up to 5 concurrent jobs. To submit more jobs, you must first delete completed jobs using the DELETE /jobs API; otherwise you will get a 429 Too Many Requests error.

### 0. Select a Quantum Backend

In [None]:
# Backend to use: this name goes into the configuration/properties URLs and the job payload.
backend_name = "fake_brisbane"

### 1. Get backend configuration

In [None]:
from qiskit_ibm_runtime.models import BackendConfiguration

# Fetch the configuration of the selected backend and convert it into a
# BackendConfiguration object (needed later to build the transpiler target).
backend_config_url = f"{base_url}/v1/backends/{backend_name}/configuration"
backend_config_resp = requests.get(backend_config_url, headers=headers)
if backend_config_resp.status_code == 200:
    backend_config_json = backend_config_resp.json()
    print(json.dumps(backend_config_json, indent=4))
    backend_config = BackendConfiguration.from_dict(backend_config_json)
    print(backend_config)
else:
    # Show the raw response, then stop: without a configuration `backend_config`
    # would be undefined (or stale from an earlier run) in the cells below.
    print(backend_config_resp.__dict__)
    raise RuntimeError(
        f"Failed to get configuration of backend {backend_name!r}: "
        f"HTTP {backend_config_resp.status_code}"
    )

### 2. Get backend properties

In [None]:
from qiskit_ibm_runtime.models import BackendProperties

# Fetch the properties of the selected backend and convert them into a
# BackendProperties object (needed later to build the transpiler target).
backend_props_url = f"{base_url}/v1/backends/{backend_name}/properties"
backend_props_resp = requests.get(backend_props_url, headers=headers)
if backend_props_resp.status_code == 200:
    backend_props_json = backend_props_resp.json()
    print(json.dumps(backend_props_json, indent=4))
    backend_props = BackendProperties.from_dict(backend_props_json)
    print(backend_props)
else:
    # Show the raw response, then stop: without properties `backend_props`
    # would be undefined (or stale from an earlier run) in the cells below.
    print(backend_props_resp.__dict__)
    raise RuntimeError(
        f"Failed to get properties of backend {backend_name!r}: "
        f"HTTP {backend_props_resp.status_code}"
    )

### 3. Set up Circuit and Transpile 

Set up a simple circuit for SamplerV2 and transpile it.

In [None]:
# Setup circuit and transpile
import numpy as np
from qiskit.circuit.library import IQP
from qiskit.quantum_info import random_hermitian
from qiskit.transpiler.preset_passmanagers import generate_preset_pass_manager
from qiskit_ibm_runtime.utils.backend_converter import convert_to_target

# Create a simple circuit - the SamplerV2 example introduced on the
# "Get started with primitives" page:
# https://docs.quantum.ibm.com/guides/get-started-with-primitives#get-started-with-sampler
# With daa_sim the circuit is limited to 7 qubits; otherwise it uses every qubit of the backend.
num_qubits = 7 if use_daa_sim else backend_config.num_qubits
# Fixed seed, so the same matrix (and therefore the same circuit) is generated on every run.
mat = np.real(random_hermitian(num_qubits, seed=1234))
circuit = IQP(mat)
circuit.measure_all()

# Generate the transpiler target from the backend configuration & properties
target = convert_to_target(backend_config, backend_props)
pm = generate_preset_pass_manager(
    optimization_level=1,
    target=target,
)
isa_circuit = pm.run(circuit)
# Last expression of the cell: renders the transpiled circuit without idle wires
isa_circuit.draw("mpl", idle_wires=False)

### 4. Create input to SamplerV2 primitive
Create the primitive input according to the SamplerV2 schema (https://github.com/Qiskit/ibm-quantum-schemas/blob/main/schemas/sampler_v2_schema.json)
and serialize it. The EstimatorV2 schema is also available here: https://github.com/Qiskit/ibm-quantum-schemas/blob/main/schemas/estimator_v2_schema.json

In [None]:
from qiskit import qasm3
from qiskit_ibm_runtime.utils import RuntimeEncoder

# Default shot count, stored at the top level of the sampler input
shots = 10000

# Serialize the transpiled circuit as an OpenQASM 3 program
qasm3_str = qasm3.dumps(
    isa_circuit,
    experimental=qasm3.ExperimentalFeatures.SWITCH_CASE_V1,
    allow_aliasing=True,
    disable_constants=True,
)

# Sampler job input: a single pub that contains only the QASM3 program
input_json = dict(
    pubs=[[qasm3_str]],
    version=2,
    support_qiskit=False,
    shots=shots,
    options={},
)

# Compact form is what gets uploaded; the indented form is only for display
primitive_input = json.dumps(input_json, cls=RuntimeEncoder)
print(json.dumps(input_json, cls=RuntimeEncoder, indent=2))

### 5. Upload primitive input to S3 storage and construct payload to run job

In [None]:
# Initialize S3 client
expiration = 86400  # 1 day
cos = boto3.client("s3",
                   aws_access_key_id=aws_access_key_id,
                   aws_secret_access_key=aws_secret_access_key,
                   endpoint_url=s3_endpoint_url)

In [None]:
import uuid

# The job ID is a random UUID (version 4) in its canonical string form
job_id = str(uuid.uuid4())
print(f"Job ID: {job_id}")

In [None]:
# Generate presigned URL to upload job input to S3
# Presigned PUT URL through which the job input is uploaded to S3
input_key_name = f"params_{job_id}"
input_put_signed_url = cos.generate_presigned_url(
    "put_object",
    Params=dict(Bucket=s3_bucket, Key=input_key_name),
    ExpiresIn=expiration,
    HttpMethod="PUT",
)
print(input_put_signed_url)

In [None]:
# Upload job input to S3
# PUT the serialized primitive input to its presigned URL and show the HTTP status
http_response = requests.put(input_put_signed_url, data=primitive_input)
print(http_response.status_code)

In [None]:
# Generate presigned URL for getting job input from S3
# Presigned GET URL for the uploaded job input; it is placed in the job payload below
input_get_signed_url = cos.generate_presigned_url(
    "get_object",
    Params=dict(Bucket=s3_bucket, Key=input_key_name),
    ExpiresIn=expiration,
    HttpMethod="GET",
)
print(input_get_signed_url)

In [None]:
# Generate presigned URL for results
# Presigned PUT URL under which the job results will be stored
results_key_name = f"results_{job_id}"
results_put_signed_url = cos.generate_presigned_url(
    "put_object",
    Params=dict(Bucket=s3_bucket, Key=results_key_name),
    ExpiresIn=expiration,
    HttpMethod="PUT",
)
print(results_put_signed_url)

In [None]:
# Generate presigned URL for logs
# Presigned PUT URL under which the job logs will be stored
logs_key_name = f"logs_{job_id}"
logs_put_signed_url = cos.generate_presigned_url(
    "put_object",
    Params=dict(Bucket=s3_bucket, Key=logs_key_name),
    ExpiresIn=expiration,
    HttpMethod="PUT",
)
print(logs_put_signed_url)

In [None]:
# Create job input
# Job payload: which backend and program to run, plus one S3 location
# (a presigned URL) for each of the job's input, results and logs.
job_input = {
    "backend": backend_name,
    "id": job_id,
    "log_level": "info",
    "program_id": "sampler",
    "timeout_secs": 10000,
    "storage": {
        section: {"type": "s3_compatible", "presigned_url": signed_url}
        for section, signed_url in (
            ("input", input_get_signed_url),
            ("results", results_put_signed_url),
            ("logs", logs_put_signed_url),
        )
    },
}

print(json.dumps(job_input, indent=2))

### 6. Submit job
Once the job completes, the results and logs will be uploaded to the S3 compatible object storage using the provided presigned URLs.

In [None]:
# Submit a job
# POST the job payload; this cell treats HTTP 204 as a successful submission
run_job_url = f"{base_url}/v1/jobs"
run_job_response = requests.post(
    run_job_url,
    headers=headers,
    data=json.dumps(job_input),
)
if run_job_response.status_code != 204:
    # Show the raw response object so the failure can be inspected
    print(vars(run_job_response))
else:
    print("Succeeded.")

## Check job status
The GET /v1/jobs API can be used to check the status of submitted jobs. You can call the API repeatedly to find out when a job has completed.

In [None]:
# Get job status
# List the jobs together with their current status
get_jobs_url = f"{base_url}/v1/jobs"
get_jobs_response = requests.get(get_jobs_url, headers=headers)
if get_jobs_response.status_code != 200:
    print(vars(get_jobs_response))
else:
    print(json.dumps(get_jobs_response.json(), indent=4))

## Cancel job (Optional)
This POST /v1/jobs/{job_id}/cancel API can be used to cancel a job when the user requests cancellation or when the client scheduler wants to cancel a long running job that seems stuck.

In [None]:
# Cancel job
# Request cancellation of the job; this cell treats HTTP 204 as success
cancel_job_url = f"{base_url}/v1/jobs/{job_id}/cancel"
cancel_job_response = requests.post(cancel_job_url, headers=headers, data={})
if cancel_job_response.status_code != 204:
    print(vars(cancel_job_response))
else:
    print("Job cancelled successfully.")

## Download the results from S3

In [None]:
# Generate presigned URL for getting result
# Presigned GET URL for reading the results object of this job
results_key_name = f"results_{job_id}"
results_get_signed_url = cos.generate_presigned_url(
    "get_object",
    Params=dict(Bucket=s3_bucket, Key=results_key_name),
    ExpiresIn=expiration,
    HttpMethod="GET",
)
print(results_get_signed_url)

In [None]:
# Downloading result from S3 by using presigned URL.
# Download the result from S3 by using the presigned URL.
http_response = requests.get(results_get_signed_url)
print(http_response.status_code)
if http_response.status_code == 200:
    print(json.dumps(http_response.json(), cls=RuntimeEncoder, indent=2))
else:
    # A non-200 response (e.g. the results object does not exist yet) may not be JSON,
    # so show the raw body instead of failing while decoding it.
    print(http_response.text)

## Download the logs from S3 (Optional)

In [None]:
# Generate presigned URL for getting logs
# Presigned GET URL for reading the logs object of this job
logs_key_name = f"logs_{job_id}"
logs_get_signed_url = cos.generate_presigned_url(
    "get_object",
    Params=dict(Bucket=s3_bucket, Key=logs_key_name),
    ExpiresIn=expiration,
    HttpMethod="GET",
)
print(logs_get_signed_url)

In [None]:
# Downloading logs from S3 by using presigned URL.
# Fetch the logs through the presigned URL; print the HTTP status, then the body as text
http_response = requests.get(logs_get_signed_url)
print(http_response.status_code)
print(http_response.text)

## Delete job
This DELETE /v1/jobs/{job_id} API can be used to delete completed jobs. A job has to be in a terminal state (terminal statuses: Completed, Failed, Cancelled) before it can be deleted. If necessary, use the cancel API to cancel the job first and then delete it.

In [None]:
# Delete job
# Delete the job; this cell treats HTTP 204 as success
delete_job_url = f"{base_url}/v1/jobs/{job_id}"
delete_job_response = requests.delete(delete_job_url, headers=headers)
if delete_job_response.status_code != 204:
    print(vars(delete_job_response))
else:
    print("Succeeded.")

In [None]:
# Really deleted ? - Get jobs again
# Was it really deleted? List the jobs once more to check
get_jobs_url = f"{base_url}/v1/jobs"
get_jobs_response = requests.get(get_jobs_url, headers=headers)
if get_jobs_response.status_code != 200:
    print(vars(get_jobs_response))
else:
    print(json.dumps(get_jobs_response.json(), indent=4))

End of Notebook