In [1]:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import (Encoding, PrivateFormat, NoEncryption)
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import serialization
import uuid
import datetime

In [7]:
def generate_root_ca(name,password):
    """Create a self-signed root CA plus an empty certificate revocation list.

    Three PEM files are written to the current working directory:

    * ``<name>.key`` - 2048-bit RSA private key, encrypted with ``password``.
    * ``<name>.crt`` - self-signed CA certificate (CN=``name``), valid from
      one hour ago until seven days from now.
    * ``ca.crl``     - empty CRL signed by the new key. The file name is fixed
      because ``revoke_cert`` and ``verify_cert`` read exactly this path.

    The certificate's validity bounds are printed after it is written.

    Args:
        name: Common name of the CA; also used as the file-name stem.
        password: Passphrase (``str``) used to encrypt the private key file.
    """
    password = password.encode()
    backend = default_backend()
    # cryptography interprets naive datetimes as UTC, so "now" must be taken
    # in UTC; a local-time value would shift the validity window by the
    # machine's UTC offset.
    now = datetime.datetime.now(datetime.timezone.utc)
    one_hour = datetime.timedelta(hours=1)

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=backend,
    )
    # Self-signed: subject and issuer are the same name.
    ca_subject = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, name)])

    certificate = (
        x509.CertificateBuilder()
        .subject_name(ca_subject)
        .issuer_name(ca_subject)
        # Back-date by one hour to tolerate clock skew between machines.
        .not_valid_before(now - one_hour)
        .not_valid_after(now + datetime.timedelta(days=7))
        .serial_number(int(uuid.uuid4()))
        .public_key(private_key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True,
        )
        .sign(private_key=private_key, algorithm=hashes.SHA256(), backend=backend)
    )

    with open(name+".key", "wb") as f:
        f.write(private_key.private_bytes(
            Encoding.PEM,
            PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.BestAvailableEncryption(password),
        ))

    with open(name+".crt", "wb") as f:
        f.write(certificate.public_bytes(encoding=serialization.Encoding.PEM))

    print(certificate.not_valid_before)
    print(certificate.not_valid_after)

    # Publish an initial, empty CRL signed by the key generated above; the
    # in-memory key and certificate are used directly instead of re-reading
    # the files that were just written.
    cert_revocation_list = (
        x509.CertificateRevocationListBuilder()
        .issuer_name(certificate.subject)
        .last_update(now - one_hour)
        .next_update(now + datetime.timedelta(days=1) - one_hour)
        .sign(private_key=private_key, algorithm=hashes.SHA256(), backend=backend)
    )

    with open("ca.crl","wb") as f:
        f.write(cert_revocation_list.public_bytes(serialization.Encoding.PEM))
    
def generate_intermediate_ca(name,password,ca_name,ca_password):
    """Create an intermediate CA certificate signed by an existing CA.

    Reads ``<ca_name>.key`` (decrypted with ``ca_password``) and
    ``<ca_name>.crt`` from the current working directory, then writes:

    * ``<name>.key`` - 2048-bit RSA private key, encrypted with ``password``.
    * ``<name>.crt`` - CA certificate (CN=``name``) signed by the parent key,
      valid from one hour ago until seven days from now.

    The certificate's validity bounds are printed after it is written.

    Args:
        name: Common name of the intermediate CA; also the file-name stem.
        password: Passphrase (``str``) for the new private key file.
        ca_name: File-name stem of the signing CA's ``.key``/``.crt`` files.
        ca_password: Passphrase (``str``) of the signing CA's private key.
    """
    password = password.encode()
    ca_password = ca_password.encode()
    backend = default_backend()
    # cryptography interprets naive datetimes as UTC, so take "now" in UTC.
    now = datetime.datetime.now(datetime.timezone.utc)

    with open(ca_name+".key","rb") as f:
        ca_key = serialization.load_pem_private_key(
            f.read(), password=ca_password, backend=backend)
    with open(ca_name+".crt","rb") as f:
        ca_cert = x509.load_pem_x509_certificate(f.read(), backend)

    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=backend,
    )

    certificate = (
        x509.CertificateBuilder()
        .subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, name)
        ]))
        # The issuer must be the subject of the certificate whose key signs
        # this one; naming the intermediate itself here would make the
        # certificate look self-issued and break issuer/subject chaining.
        .issuer_name(ca_cert.subject)
        .not_valid_before(now - datetime.timedelta(hours=1))
        .not_valid_after(now + datetime.timedelta(days=7))
        .serial_number(int(uuid.uuid4()))
        .public_key(private_key.public_key())
        .add_extension(
            x509.BasicConstraints(ca=True, path_length=None), critical=True,
        )
        .sign(private_key=ca_key, algorithm=hashes.SHA256(), backend=backend)
    )

    with open(name+".key", "wb") as f:
        f.write(private_key.private_bytes(
            Encoding.PEM,
            PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.BestAvailableEncryption(password),
        ))

    with open(name+".crt", "wb") as f:
        f.write(certificate.public_bytes(encoding=serialization.Encoding.PEM))

    print(certificate.not_valid_before)
    print(certificate.not_valid_after)

def issue_cert(name,ica,ica_password):
    """Generate a key pair and CSR for ``name`` and have ``ica`` sign it.

    Reads ``<ica>.crt`` and ``<ica>.key`` (decrypted with ``ica_password``)
    from the current working directory, then writes:

    * ``<name>.csr`` - PEM certificate signing request (CN=``name``).
    * ``<name>.key`` - PEM RSA private key, stored WITHOUT encryption.
    * ``<name>.crt`` - PEM end-entity certificate signed by the issuing CA,
      valid from one hour ago until seven days from now.

    The request subject and the new certificate's serial number are printed.

    Args:
        name: Common name of the certificate; also the file-name stem.
        ica: File-name stem of the issuing CA's ``.crt``/``.key`` files.
        ica_password: Passphrase (``str``) of the issuing CA's private key.
    """
    ica_password = ica_password.encode()
    backend = default_backend()
    # cryptography interprets naive datetimes as UTC, so take "now" in UTC.
    now = datetime.datetime.now(datetime.timezone.utc)

    private_key = rsa.generate_private_key(
        public_exponent=65537, key_size=2048, backend=backend)

    request = (
        x509.CertificateSigningRequestBuilder()
        .subject_name(x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, name)
        ]))
        # Mark the requested certificate as a non-CA (leaf) certificate.
        .add_extension(
            x509.BasicConstraints(ca=False, path_length=None), critical=True)
        .sign(private_key, hashes.SHA256(), backend)
    )

    with open(name+".csr","wb") as f:
        f.write(request.public_bytes(Encoding.PEM))

    # NOTE: the subject's private key is written unencrypted.
    with open(name+".key","wb") as f:
        f.write(private_key.private_bytes(
            Encoding.PEM, PrivateFormat.TraditionalOpenSSL, NoEncryption()))

    with open(ica+".crt","rb") as f:
        ica_cert = x509.load_pem_x509_certificate(f.read(), backend)
    with open(ica+".key","rb") as f:
        ica_key = serialization.load_pem_private_key(
            f.read(), password=ica_password, backend=backend)

    print(request.subject)

    builder = (
        x509.CertificateBuilder()
        .subject_name(request.subject)
        .issuer_name(ica_cert.subject)
        .not_valid_before(now - datetime.timedelta(hours=1))
        .not_valid_after(now + datetime.timedelta(days=7))
        .public_key(request.public_key())
        .serial_number(int(uuid.uuid4()))
    )

    # Carry every extension of the request over into the certificate.
    for ext in request.extensions:
        builder = builder.add_extension(ext.value, ext.critical)

    certificate = builder.sign(
        private_key=ica_key, algorithm=hashes.SHA256(), backend=backend)

    print(certificate.serial_number)

    with open(name+".crt","wb") as f:
        f.write(certificate.public_bytes(serialization.Encoding.PEM))

def revoke_cert(name,password):
    """Add the certificate ``<name>.crt`` to the revocation list ``ca.crl``.

    The existing ``ca.crl`` is loaded, all of its entries are copied into a
    new CRL, the serial number of ``<name>.crt`` is appended unless it is
    already listed, and the result is signed with ``ca.key`` and written
    back over ``ca.crl``. The ``ca.key`` and ``ca.crl`` paths are fixed.

    Args:
        name: File-name stem of the certificate to revoke.
        password: Passphrase (``str``) that decrypts ``ca.key``.
    """
    password = password.encode()
    backend = default_backend()
    # cryptography interprets naive datetimes as UTC, so take "now" in UTC.
    now = datetime.datetime.now(datetime.timezone.utc)

    # cert you want to revoke
    with open(name+".crt","rb") as f:
        cert_to_revoke = x509.load_pem_x509_certificate(f.read(), backend=backend)

    with open("ca.key","rb") as f:
        ca_key = serialization.load_pem_private_key(
            f.read(), password=password, backend=backend)

    # load crl
    with open("ca.crl","rb") as f:
        crl = x509.load_pem_x509_crl(f.read(), backend=backend)

    # generate a new crl object; its last_update is the moment THIS list is
    # issued, not the timestamp inherited from the list being replaced
    builder = (
        x509.CertificateRevocationListBuilder()
        .issuer_name(crl.issuer)
        .last_update(now)
        .next_update(now + datetime.timedelta(days=1))
    )

    # add crl certificates from file to the new crl object
    for revoked in crl:
        builder = builder.add_revoked_certificate(revoked)

    # if the cert is not in the list yet, add a new revoked entry for it
    serial = cert_to_revoke.serial_number
    if crl.get_revoked_certificate_by_serial_number(serial) is None:
        revoked_cert = (
            x509.RevokedCertificateBuilder()
            .serial_number(serial)
            .revocation_date(now)
            .build(backend=backend)
        )
        builder = builder.add_revoked_certificate(revoked_cert)

    # sign and save to new crl file
    cert_revocation_list = builder.sign(
        private_key=ca_key, algorithm=hashes.SHA256(), backend=backend)

    with open("ca.crl","wb") as f:
        f.write(cert_revocation_list.public_bytes(serialization.Encoding.PEM))
        
def verify_cert(issuer,certname):
    """Check one link of a chain: is ``certname`` unrevoked and signed by ``issuer``?

    Reads ``<issuer>.crt``, ``<certname>.crt`` and ``ca.crl`` from the current
    working directory. Only two things are checked: that the certificate's
    serial number is absent from ``ca.crl``, and that its signature verifies
    under the issuer's public key with PKCS#1 v1.5 padding. The validity
    period, issuer/subject names and the CRL's own signature are NOT checked.

    Args:
        issuer: File-name stem of the issuing certificate.
        certname: File-name stem of the certificate to check.

    Returns:
        ``True`` if the certificate is not revoked and its signature verifies;
        ``False`` (after printing a message) if it is listed in the CRL.

    Raises:
        Whatever ``public_key.verify`` raises on a signature mismatch
        (``cryptography.exceptions.InvalidSignature`` per the library docs).
    """
    backend = default_backend()

    with open(issuer+".crt","rb") as f:
        issuer_cert = x509.load_pem_x509_certificate(f.read(), backend)
    with open(certname+".crt","rb") as f:
        cert_to_check = x509.load_pem_x509_certificate(f.read(), backend)
    with open("ca.crl","rb") as f:
        crl = x509.load_pem_x509_crl(f.read(), backend=backend)

    # Revocation is looked up by serial number only.
    revoked = crl.get_revoked_certificate_by_serial_number(cert_to_check.serial_number)
    if revoked is not None:
        print("Certificate is revoked!")
        return False

    # verify() returns nothing on success and raises on mismatch.
    issuer_cert.public_key().verify(
        cert_to_check.signature,
        cert_to_check.tbs_certificate_bytes,
        padding.PKCS1v15(),
        cert_to_check.signature_hash_algorithm,
    )
    return True

In [8]:
# Interactive menu: keep prompting until the user enters anything other than
# one of the listed option numbers.
# NOTE(review): passwords are read with input(), so they are echoed and end up
# in the saved notebook output; consider getpass.getpass() instead.
while True:
    choice = input(
    '''
    Please input option to continue:
    1.Generate root CA
    2.generate intermediate CA
    3.Issue a certificate
    4.Verify a certificate
    5.Revoke a certificate                                                        
    Input any other key to exit
    ''')
    #Call the respective functions based on user input
    if choice == '1':
        name = input("Enter root CA name: ")
        password = input("Enter password for encrypting root CA private key: ")
        generate_root_ca(name,password)
    elif choice == '2':
        ca_name = input("Enter root CA name: ")
        ca_password = input("Enter password for encrypting root CA private key: ")
        name = input("Enter intermediate CA name: ")
        password = input("Enter password for encrypting intermediate CA private key: ")
        generate_intermediate_ca(name,password,ca_name,ca_password)
    elif choice == '3':
        name = input("Enter certificate name: ")
        ica = input("Enter CA name: ")
        ica_password = input("Enter password for encrypting CA private key: ")
        issue_cert(name,ica,ica_password)
    elif choice == '4':
        cert_name = input("Enter certificate name: ")
        ca_name = input("Enter root CA name: ")
        ica_name = input("Enter intermediate CA name: ")
        # Walk the chain bottom-up: leaf against intermediate, then
        # intermediate against root. verify_cert raises on a bad signature
        # or unreadable file, which is reported as an invalid certificate.
        try:
            if verify_cert(ica_name,cert_name) and verify_cert(ca_name,ica_name):
                print("Valid cert!")
        # Catch Exception rather than everything, so Ctrl-C / kernel
        # interrupt still stops the loop instead of printing "Invalid cert!".
        except Exception:
            print("Invalid cert!")
    elif choice == '5':
        name = input("Enter name of certificate to be revoked: ")
        password = input("Enter password for encrypting root CA private key: ")
        revoke_cert(name,password)
        print("Certificate "+name+" has been revoked.")
    #Break the loop thus exit the program if user input does not match with any option 
    else:
        print("Thank you for using this app.")
        break


    Please input option to continue:
    1.Generate root CA
    2.generate intermediate CA
    3.Issue a certificate
    4.Verify a certificate
    5.Revoke a certificate                                                        
    Input any other key to exit
    1
Enter root CA name: ca
Enter password for encrypting root CA private key: 4010
2021-11-28 18:07:50
2021-12-05 19:07:50

    Please input option to continue:
    1.Generate root CA
    2.generate intermediate CA
    3.Issue a certificate
    4.Verify a certificate
    5.Revoke a certificate                                                        
    Input any other key to exit
    2
Enter root CA name: ca
Enter password for encrypting root CA private key: 4010
Enter intermediate CA name: ica
Enter password for encrypting intermediate CA private key: 4010
2021-11-28 18:07:57
2021-12-05 19:07:57

    Please input option to continue:
    1.Generate root CA
    2.generate intermediate CA
    3.Issue a certificate
    4.Verify a c