In [75]:
import json
import os
import subprocess
import warnings

import requests
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
# from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.x509.oid import NameOID

from gen_ids import get_EMAID

In [2]:
# Source of EMAIDs for newly issued certificates: issue_certs() pulls the next
# ID from this object with next(), so get_EMAID() presumably returns a
# generator/iterator (it is defined in gen_ids, not shown here — confirm).
emaid = get_EMAID()

In [4]:
# Request headers shared by every DigiCert IoT API call in this notebook.
# The API key is read from the DIGICERT_API_KEY environment variable so the
# secret never lives in the notebook source or its saved outputs. Any key that
# was previously stored in this cell should be treated as leaked and rotated.
_api_key = os.environ.get('DIGICERT_API_KEY', '')
if not _api_key:
    # Warn instead of raising so the notebook still loads; API calls made
    # without a key are expected to fail authentication.
    warnings.warn("DIGICERT_API_KEY is not set; DigiCert API requests will be sent without an API key.")
headers = {'accept': 'application/json', 'X-API-KEY': _api_key, 'Content-Type': 'application/json'}

In [111]:
def get_cert(serial):
    """Fetch one certificate from the DigiCert IoT API by serial number.

    Args:
        serial: Certificate serial number, sent as the ``serial_number``
            query parameter.

    Returns:
        The ``body`` of the first matching record, parsed from PEM into an
        ``x509.Certificate``.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the API does not answer within the timeout.
        Exception: If the response holds no record for ``serial``.
    """
    res = requests.get(
        'https://demo.one.digicert.com/iot/api/v1/certificate/',
        headers=headers,
        params={"serial_number": serial},
        timeout=30,  # seconds; without it a stalled connection blocks the kernel forever
    )
    # Surface HTTP errors (bad key, server error) directly rather than as a
    # KeyError / JSON decode error further down.
    res.raise_for_status()
    records = res.json().get('records') or []
    if not records:
        raise Exception(f"Certificate with serial {serial} couldn't be found.")
    return x509.load_pem_x509_certificate(records[0]['body'].encode())

In [122]:
def get_certChain(serial):
    """Fetch a certificate and its chain from the DigiCert IoT API.

    Args:
        serial: Certificate serial number, sent as the ``serial_number``
            query parameter.

    Returns:
        A list of ``x509.Certificate`` objects: first the certificate parsed
        from the first record's ``body``, then one certificate per entry of
        that record's ``chain`` (parsed from each entry's ``blob``), in the
        order the API returned them.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the API does not answer within the timeout.
        Exception: If the response holds no record for ``serial``.
    """
    res = requests.get(
        'https://demo.one.digicert.com/iot/api/v1/certificate/',
        headers=headers,
        params={"serial_number": serial},
        timeout=30,  # seconds; without it a stalled connection blocks the kernel forever
    )
    # Surface HTTP errors (bad key, server error) directly rather than as a
    # KeyError / JSON decode error further down.
    res.raise_for_status()
    records = res.json().get('records') or []
    if not records:
        raise Exception(f"Certificate with serial {serial} couldn't be found.")
    record = records[0]
    certs = [x509.load_pem_x509_certificate(record['body'].encode())]
    certs.extend(
        x509.load_pem_x509_certificate(link['blob'].encode())
        for link in record['chain']
    )
    return certs

In [117]:
def download_cert(serial, encoding = 'all'):
    """Look up a certificate by serial with get_cert() and write it out with save_cert().

    ``encoding`` is forwarded unchanged to save_cert(). Returns nothing.
    """
    save_cert(get_cert(serial), encoding=encoding)

In [118]:
def download_certs(serial, encoding = 'all'):
    """Look up a certificate and its chain with get_certChain() and write them out with save_certs().

    ``encoding`` is forwarded unchanged to save_certs(). Returns nothing.
    """
    save_certs(get_certChain(serial), encoding=encoding)

In [105]:
def list_certs(cert_type = "contract", status = "all", offset = 0, limit = 20, return_list = False):
    """Print a one-line summary of certificates issued under an enrollment profile.

    Args:
        cert_type: ``"contract"`` or ``"secc"``; selects the enrollment
            profile ID used in the query.
        status: ``"ISSUED"``, ``"REVOKED"`` or ``"EXPIRED"`` to filter by
            status; any other value (e.g. the default ``"all"``) sends no
            status filter.
        offset: Sent as the ``offset`` query parameter; also added to the
            printed certificate numbers.
        limit: Sent as the ``limit`` query parameter.
        return_list: When true, return the decoded JSON response.

    Returns:
        The decoded JSON response if ``return_list`` is true, otherwise
        ``None``. Also ``None`` (after printing a message) when ``cert_type``
        is not recognised.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the API does not answer within the timeout.
    """
    profile_ids = {
        "contract": "IOT_79f09602-de08-4355-a265-08a581d84143",
        "secc": "IOT_f2286f88-4cfa-4d99-be24-998dd1a10e60",
    }
    enrollment_profile_id = profile_ids.get(cert_type)
    if enrollment_profile_id is None:
        print("Wrong certificate type. It should be either 'contract' or 'secc'.")
        return None

    params = {
        "enrollment_method": "API",
        "enrollment_profile_id": enrollment_profile_id,
        "certificate_type": "x509",
        "offset": offset,
        "limit": limit,
    }
    if status in ["ISSUED", "REVOKED", "EXPIRED"]:
        params["status"] = status

    r = requests.get(
        'https://demo.one.digicert.com/iot/api/v1/certificate/',
        headers=headers,
        params=params,
        timeout=30,  # seconds; without it a stalled connection blocks the kernel forever
    )
    # Surface HTTP errors directly rather than as a KeyError on 'records'.
    r.raise_for_status()
    d = r.json()
    # Numbering starts at offset + 1 so it stays continuous across pages.
    for number, record in enumerate(d['records'], start=offset + 1):
        print(f"Cert {number}, CN: {record['common_name']}, Serial: {record['serial_number']}, Status: {record['status']}")
    if return_list:
        return d

In [57]:
def issue_certs(
    create_csr_script="/home/sahabul/Sandia/switchev_iso15118/iso15118/shared/pki/iso15118_2/create_csr.sh",
    csr_dir="csrs",
):
    """Create a CSR for the next EMAID and have DigiCert issue certificates for it.

    Takes the next value from the module-level ``emaid`` iterator, runs the
    CSR script as ``<create_csr_script> -e <EMAID>``, reads
    ``<csr_dir>/<EMAID>.csr`` and posts its contents to the DigiCert IoT
    certificate endpoint.

    Args:
        create_csr_script: Path of the CSR-generating script. It is executed
            directly, so it must be executable.
        csr_dir: Directory the CSR file is read from (assumed to be where the
            script writes it — confirm against the script).

    Returns:
        The certificates parsed from the ``pem`` field of the response, as
        returned by ``x509.load_pem_x509_certificates``.

    Raises:
        subprocess.CalledProcessError: If the CSR script exits non-zero.
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the API does not answer within the timeout.
        Exception: If the response's ``result`` is not ``'SUCCESS'``.
    """
    next_emaid = next(emaid)
    # Argument list instead of a shell string: the EMAID is never parsed by a
    # shell. check=True stops here if the script fails, so a missing or stale
    # CSR file is never submitted.
    subprocess.run([create_csr_script, "-e", next_emaid], check=True)
    with open(f"{csr_dir}/{next_emaid}.csr", "r") as f:
        csr = f.read()
    payload = {"csr": csr, "enrollment_profile_id": "IOT_79f09602-de08-4355-a265-08a581d84143"}
    res = requests.post(
        'https://demo.one.digicert.com/iot/api/v1/certificate/',
        headers=headers,
        data=json.dumps(payload),
        timeout=30,  # seconds; without it a stalled connection blocks the kernel forever
    )
    # Surface HTTP errors directly rather than as a KeyError on 'result'.
    res.raise_for_status()
    result = res.json()
    if result.get('result') != 'SUCCESS':
        raise Exception("Certificate issue request is not successful.")
    return x509.load_pem_x509_certificates(result['pem'].encode())

In [126]:
def save_cert(cert, encoding = 'all'):
    """Write a certificate to ``<common name>.pem`` and/or ``<common name>.der``.

    The file name is the value of the first COMMON_NAME attribute of the
    certificate subject, used verbatim and opened relative to the current
    working directory.

    Args:
        cert: Certificate object exposing ``subject`` and ``public_bytes``
            (e.g. an ``x509.Certificate``).
        encoding: ``'pem'``, ``'der'``, or ``'all'`` to write both files.

    Raises:
        ValueError: If ``encoding`` is not one of the values above.
            Previously such a value silently wrote nothing.
    """
    # Validate before touching the certificate so a typo such as 'DER' fails
    # loudly instead of producing no file.
    if encoding not in ('all', 'pem', 'der'):
        raise ValueError(f"Unsupported encoding {encoding!r}; expected 'all', 'pem' or 'der'.")

    name = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
    wanted = ('pem', 'der') if encoding == 'all' else (encoding,)
    for fmt in wanted:
        target = serialization.Encoding.PEM if fmt == 'pem' else serialization.Encoding.DER
        with open(f"{name}.{fmt}", "wb") as outfile:
            outfile.write(cert.public_bytes(target))

In [127]:
def save_certs(certs, encoding = 'all'):
    """Write every certificate in ``certs`` to disk by calling save_cert() on each, in order.

    ``encoding`` is forwarded unchanged to each save_cert() call. Returns nothing.
    """
    for certificate in certs:
        save_cert(certificate, encoding=encoding)

In [58]:
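# Uncomment to request a new certificate from DigiCert for the next EMAID
# and save the returned certificates to disk:
# certs = issue_certs()
# save_certs(certs)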

In [108]:
# List certificates under the "contract" enrollment profile with the default
# status/offset/limit; list_certs prints one summary line per record.
list_certs(cert_type="contract")

Cert 1, CN: USEMAC00000001, Serial: 76F8365B420D67AEB3F7E8AC00E0BB844E54D7FE, Status: ISSUED
Cert 2, CN: EMAID_ID1, Serial: 316AFB1CF0EF5CCF630BD71BEB54E36A56F1A195, Status: ISSUED


In [None]:
# Fetch the certificate with this serial (Cert 1, CN USEMAC00000001, in the
# listing output above) and save it DER-encoded only; save_cert names the file
# after the certificate's subject common name.
download_cert('76F8365B420D67AEB3F7E8AC00E0BB844E54D7FE', encoding = 'der')