<a href="https://colab.research.google.com/github/leorasdsouza/Secure-Key-Management-System/blob/main/Secure_Key_Management_System.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
# symmetric key distribution
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding
import os

class KDC:
    def __init__(self):
        self.client_keys = {}  # Stores client IDs and their pre-shared keys

    def register_client(self, client_id, key):
        self.client_keys[client_id] = key

    def generate_symmetric_key(self):
        return os.urandom(32)  # 256-bit AES key

    def distribute_key(self, client_id1, client_id2):
        key = self.generate_symmetric_key()
        encrypted_key1 = self.encrypt(key, self.client_keys[client_id1])
        encrypted_key2 = self.encrypt(key, self.client_keys[client_id2])
        return encrypted_key1, encrypted_key2

    def encrypt(self, data, key):
        iv = os.urandom(16)  # Initialization vector
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(data) + padder.finalize()
        return iv + encryptor.update(padded_data) + encryptor.finalize()

    def decrypt(self, encrypted_data, key):
        iv = encrypted_data[:16]  # Extract the IV from the encrypted data
        ciphertext = encrypted_data[16:]
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        padded_data = decryptor.update(ciphertext) + decryptor.finalize()
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
        return unpadder.update(padded_data) + unpadder.finalize()

# Example
kdc = KDC()

# Register clients with pre-shared keys
client1_key = os.urandom(32)
client2_key = os.urandom(32)
kdc.register_client("client1", client1_key)
kdc.register_client("client2", client2_key)

# Distribute a symmetric key between client1 and client2
encrypted_key1, encrypted_key2 = kdc.distribute_key("client1", "client2")
print("Encrypted key for client1:", encrypted_key1)
print("Encrypted key for client2:", encrypted_key2)

# Client1 decrypts the symmetric key
decrypted_key1 = kdc.decrypt(encrypted_key1, client1_key)
print("Decrypted key for client1:", decrypted_key1)

# Client2 decrypts the symmetric key
decrypted_key2 = kdc.decrypt(encrypted_key2, client2_key)
print("Decrypted key for client2:", decrypted_key2)

# Verify that both clients have the same symmetric key
assert decrypted_key1 == decrypted_key2, "Decrypted keys do not match!"
print("Decrypted keys match. Symmetric key distribution successful!")

Encrypted key for client1: b'&\xc7&\x1d3\x19wA\xe6\xd8\x14\xd5\xb8\xc7\xe3\xe6\x7f\xe2v\x00\xf1\xb1\xba\xf7\x16s\xbeJ\x8a\xdde\x8a%\x8b\xdaN\xc6\x04g}7\xc9\xf3\xbf_\x90\xc4\xdc\xc6\x96\xf4EU\x06#\xf1\x91L\x83\x03\xc2no\x7f'
Encrypted key for client2: b'<D\xabw\xd0y\xcc\xd0\xaa\x86\xca\xe7p\xc44\xc7\xc6p}\xffn\x8c\xb0\x9b\xe9(\xca\x13}7\xc7d\x13\\\xbe\xaeY\xcar-K\x92\x01(\xe8\xf6\x89\xec\xb5\xdd\xb5\xbe\x88\xd2v9\xcd\x90\x84\x87>\x8ao\xec'
Decrypted key for client1: b'}\xba\x84\x0fT\x1e\x1d\x80\x826\x196\x89\t\x9bv\xe9b\xe0\x13X\x84\xf9y\xac,Ca\xcb]y\x13'
Decrypted key for client2: b'}\xba\x84\x0fT\x1e\x1d\x80\x826\x196\x89\t\x9bv\xe9b\xe0\x13X\x84\xf9y\xac,Ca\xcb]y\x13'
Decrypted keys match. Symmetric key distribution successful!


In [8]:
# pki asymmetric encryption and revocation of certificate
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import serialization
import datetime

class CA:
    def __init__(self):
        # Generate CA's private key
        self.ca_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
        self.ca_public_key = self.ca_private_key.public_key()
        self.revoked_certs = []  # List to store revoked certificates

    def issue_certificate(self, client_public_key, client_name):
        # Create a certificate for the client
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COMMON_NAME, client_name),
        ])
        cert = x509.CertificateBuilder().subject_name(
            subject
        ).issuer_name(
            issuer
        ).public_key(
            client_public_key
        ).serial_number(
            x509.random_serial_number()
        ).not_valid_before(
            datetime.datetime.utcnow()
        ).not_valid_after(
            datetime.datetime.utcnow() + datetime.timedelta(days=365)
        ).sign(self.ca_private_key, hashes.SHA256())
        return cert

    def revoke_certificate(self, cert):
        self.revoked_certs.append(cert.serial_number)
        print(f"Certificate with serial number {cert.serial_number} revoked.")

    def is_revoked(self, cert):
        return cert.serial_number in self.revoked_certs

# Example
ca = CA()

# Client generates their own key pair
client_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
client_public_key = client_private_key.public_key()

# Client requests a certificate from the CA
client_cert = ca.issue_certificate(client_public_key, "client1")

# Serialize the certificate for storage/transmission
pem_cert = client_cert.public_bytes(serialization.Encoding.PEM)
print("Client Certificate:\n", pem_cert.decode())

# Revoke the certificate
ca.revoke_certificate(client_cert)
print("Is certificate revoked?", ca.is_revoked(client_cert))

Client Certificate:
 -----BEGIN CERTIFICATE-----
MIICsDCCAZigAwIBAgIUWIvJFj3gxcdxs3GrPIPxA0zTzbMwDQYJKoZIhvcNAQEL
BQAwEjEQMA4GA1UEAwwHY2xpZW50MTAeFw0yNTAzMTgyMDQyNThaFw0yNjAzMTgy
MDQyNThaMBIxEDAOBgNVBAMMB2NsaWVudDEwggEiMA0GCSqGSIb3DQEBAQUAA4IB
DwAwggEKAoIBAQDGL9IyR0FGyHB7WyRFdJdQhjg/+cwj49P0+BxEghOm63WEVGGR
itMLrLeTXrhDjNts4bNDOdrX1NnJPumK0ZIu9MDLX/NFePevgILk3MDLV58tKyEx
OOC3QgK9Nm1qNIn6CKSxYJbzTNbRLGSjUah/LeZST1jwaMPRcOQ5QKgGecS6D7eV
WIcrTg8CuJ7P0IlV28ECeukCDxHmzFT7B31ror44Gtcm/jDVrYffQVxnmOFFc07x
uPjE9E9U4pUqaflNofR0QuCpVKc/SEaSF+1fREFB28w83OAMqfNuwgFFBTQHrNyg
Btg1MmMQsCQwvaNH6117j+nauoI063aVpiDHAgMBAAEwDQYJKoZIhvcNAQELBQAD
ggEBAJ0tmLHrWRKluF1ijU7XRCXng/HqC244kYAoveqo59ROgxNYPAD8YUh7769Y
gHtGcnyH0L5FvHL6F+JqL1jWaz8nUCQubR64nOeHMjkfhlNfRLX6iPZLqGlPWDxz
6Gi6IzcaelMGiUFk0xaAJSobgoWAnSqRPGDHNSZpQVyNRrZXw7+gOBtsROZnYLbY
S7s1uF81sr+e3pZxKnBiUBwGbZKwN/uMfCaew6yBtTF0rYtfhNoth254peLhbvKv
7OMut7hNM4k5Lc4VssBXvLzb2NouWpjeRN9wiVAr6v7ehz6feJd4KnDQ+b53VsYD
GgMiHcFwV9S4tm4cu+jldwOKhKE=
-----END CER

In [9]:
# diffie hellman key exchange
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives import serialization

# Generate DH parameters
parameters = dh.generate_parameters(generator=2, key_size=2048)

# Generate private keys for two parties
private_key_a = parameters.generate_private_key()
private_key_b = parameters.generate_private_key()

# Generate public keys
public_key_a = private_key_a.public_key()
public_key_b = private_key_b.public_key()

# Perform key exchange
shared_key_a = private_key_a.exchange(public_key_b)
shared_key_b = private_key_b.exchange(public_key_a)

# Verify that both parties have the same shared key
assert shared_key_a == shared_key_b, "Shared keys do not match!"

# Serialize and store the shared key
with open("shared_key.key", "wb") as shared_file:
    shared_file.write(shared_key_a)

print("Diffie-Hellman key exchange completed. Shared key stored in 'shared_key.key'.")

Diffie-Hellman key exchange completed. Shared key stored in 'shared_key.key'.
