In [None]:
# Run in terminal: sudo -u brokerregistry /bin/python3 /home/sht/github/brokerRegistry/api/api.py

In [None]:
# TEST: Imports
from seccom import Seccom
import datetime, secrets, binascii, os
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHMAC
from Cryptodome.Cipher import PKCS1_OAEP

In [None]:
# REQ: Peer Keys
# B Keys
ec_private_key_b = ec.generate_private_key(ec.SECP256R1(), default_backend())
ec_public_key_b = ec_private_key_b.public_key()
ec_public_key_b_hex = ec_public_key_b.public_bytes(encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
).hex()
# Generate a new RSA private key
rsa_private_key_b = rsa.generate_private_key(
    public_exponent=65537,
    key_size=2048,
    backend=default_backend()
)
# Define the subject of the certificate
subject = x509.Name([
    x509.NameAttribute(NameOID.COUNTRY_NAME, "DE"),
    x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, ""),
    x509.NameAttribute(NameOID.LOCALITY_NAME, ""),
    x509.NameAttribute(NameOID.ORGANIZATION_NAME, ""),
    x509.NameAttribute(NameOID.COMMON_NAME, "0.0.0.0"),
])
# Create the certificate
cert_b = (
    x509.CertificateBuilder()
    .subject_name(subject)
    .issuer_name(subject)
    .public_key(rsa_private_key_b.public_key())
    .serial_number(x509.random_serial_number())
    .not_valid_before(datetime.datetime.utcnow())
    .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365))
    .add_extension(
        x509.BasicConstraints(ca=False, path_length=None), critical=True,
    )
    .add_extension(
        x509.SubjectAlternativeName([x509.DNSName("127.0.0.1")]), critical=False,
    )
    .sign(rsa_private_key_b, hashes.SHA256(), default_backend())
)
# Print the private key and certificate in PEM format
private_key_pem_b = rsa_private_key_b.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.TraditionalOpenSSL,
    encryption_algorithm=serialization.NoEncryption()
)
cert_pem_b = cert_b.public_bytes(encoding=serialization.Encoding.PEM)

In [None]:
# TEST: Initialization
seccom = Seccom()

In [None]:
# TEST: ECDHE
resultEcdhe = seccom.ecdh(ec_public_key_b_hex)

In [None]:
# TEST: RSA Exchange
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(bytes.fromhex(resultEcdhe['sharedSecret'])),
                modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
encPKRSA = encryptor.update(cert_pem_b)
resultRsaE = seccom.rsa(encPKRSA.hex(), iv.hex(), resultEcdhe['sharedSecret'])

In [None]:
# TEST: Encrypt and Decrypt
inputPlaintext="abc123!)?"
resultCipherText = seccom.encrypt(inputPlaintext.encode('utf-8'), cert_pem_b, resultEcdhe['sharedSecret'])
seccom.rsa_private_key = rsa_private_key_b
resultDec_plaintext = seccom.decrypt(resultCipherText['ciphertext'], resultCipherText['iv'], resultEcdhe['sharedSecret'])

In [None]:
# RESULTS:
print('Output ECDHE Handling:')
print(resultEcdhe)
print('\nOutput RSA Handling:')
print(resultRsaE)
print('\nPlaintext Pre-Ciphering:')
print(inputPlaintext)
print('\nCiphertext:')
print(resultCipherText)
print('\nDecrypted Plaintext:')
print(resultDec_plaintext.decode('utf-8'))

In [None]:
# REQ: Both 'Parties' for Handshake
sec1 = Seccom()
sec2 = Seccom()

In [None]:
# TEST: Initiate Handshake on Peer (ECDHE)
req11 = sec1.ec_public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
).hex()
req12 = sec2.ec_public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
).hex()

resp11 = sec2.ecdh(req11)
resp12 = sec1.ecdh(req12)

In [None]:
# TEST: Finalize Handshake (RSAE)
req21 = {}
req21['iv'] = os.urandom(16)
cipher = Cipher(algorithms.AES(bytes.fromhex(resp11['sharedSecret'])),
                modes.CBC(req21['iv']), backend=default_backend())
encryptor = cipher.encryptor()
req21['encPKRSA'] = encryptor.update(sec1.cert_pem)
req22 = {}
req22['iv'] = os.urandom(16)
cipher = Cipher(algorithms.AES(bytes.fromhex(resp11['sharedSecret'])),
                modes.CBC(req22['iv']), backend=default_backend())
encryptor = cipher.encryptor()
req22['encPKRSB'] = encryptor.update(sec2.cert_pem)

resp21 = sec2.rsa(req21['encPKRSA'].hex(), req21['iv'].hex(), resp12['sharedSecret'])
resp22 = sec1.rsa(req22['encPKRSB'].hex(), req22['iv'].hex(), resp11['sharedSecret'])

res21 = sec1.decrypt(resp21['payload']['ciphertext'], resp21['payload']['iv'], resp11['sharedSecret'])
res22 = sec2.decrypt(resp22['payload']['ciphertext'], resp22['payload']['iv'], resp11['sharedSecret'])

In [None]:
# RESULT: Handshake
print('Clear-text Key 1:')
print(sec1.cert.public_bytes(encoding=serialization.Encoding.PEM))
print('Clear-text Key 2:')
print(sec2.cert.public_bytes(encoding=serialization.Encoding.PEM))
print('Request Initial Key 1:')
print(req11)
print('Request Initial Key 2:')
print(req12)
print('Peer Response for First Request 1:')
print(resp11)
print('Peer Response for First Request 2:')
print(resp12)
print('Rsa Request 1:')
print(req21)
print('Rsa Request 2:')
print(req22)
print('Rsa Responese 1:')
print(resp21)
print('Rsa Responese 2:')
print(resp22)
print('Rsa Responese Deciphered 1:')
print(res21)
print('Rsa Responese Deciphered 2:')
print(res22)
print('Val 1')
print(str(sec1.cert.public_bytes(encoding=serialization.Encoding.PEM) == res22))
print('Val 2')
print(str(sec2.cert.public_bytes(encoding=serialization.Encoding.PEM) == res21))

In [None]:
#IMPORTS, CONSTANTS, TEST-FUNCTION:
import requests, json, string, random

apiTokenValue = 'LlLvXURqNHrp75iyz9Riwen4DJ2QcfDK78sCw4tgjgPF4gsFcIJuPbQfrUMm'
userTokenValue = 'hmLHka03PSxsE73bKnD4xNXlPmy6UF7oiVNHANEvxAZOCsXz0J9DX82JjyBR'
creatorTokenValue = 'q98lD6P6s4jQWO8yMUZalvQZqZ2pQq3QHZLnwQKhXPhocrVYIuWTEYBOJta9'

def testRequest(methode: str, url: str, headers: dict, cookies: dict or None, body: dict) -> dict or Exception:
    resp = {}
    if methode == 'POST':
        try:
            response = requests.post(url, headers=headers, cookies=cookies, data=json.dumps(body), verify=False)
            if response.status_code == 200:
                resp['payload']: dict = json.loads(response.text)
                resp['csrfToken'] = response.cookies['X-CSRFToken']
                resp['sessionId'] = response.cookies['sessionId']
            else:
                raise Exception('Request failed with status code ' + str(response.status_code))
        except:
            raise Exception('Request failed without status code ')
    elif methode == 'GET':
        try:
            response = requests.get(url, headers=headers, cookies=cookies, data=json.dumps(body), verify=False)
            if response.status_code == 200:
                resp['payload']: dict = json.loads(response.text)
                resp['csrfToken'] = response.cookies['X-CSRFToken']
                resp['sessionId'] = response.cookies['sessionId']
            else:
                raise Exception('Request failed with status code ' + str(response.status_code))
        except:
            raise Exception('Request failed without status code ')

    return resp

def createId(length: int) -> str:
    letters     = string.ascii_lowercase
    result_str  = ''.join(random.choice(letters) for i in range(length))
    return result_str

In [None]:
# TEST: Ecdhe Request
sec3 = Seccom()
#sec4 = Seccom()

ecPkHex = sec3.ec_public_key.public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo
).hex()

resp0 = {}
try:
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'ecdhPublic': ecPkHex}
    resp0 = testRequest('POST', 'https://0.0.0.0:22432/handshake', headers, None, body)
except:
    raise Exception('Request failed with/without status code ')

localEcdhe = sec3.ecdh(str(resp0['payload']['key']))
sharedSecret = localEcdhe['sharedSecret']
valEcdhe = localEcdhe['payload']['digest'] == resp0['payload']['digest']

In [None]:
# TEST: Rsae Request
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(bytes.fromhex(sharedSecret)),
                modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
encSymRsaPk = encryptor.update(sec3.cert_pem)

resp1 = {}
try:
    cookies = {'sessionId': resp0['sessionId'], 'X-CSRFToken': resp0['csrfToken']}
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'iv': iv.hex(), 'key': encSymRsaPk.hex()}
    resp1 = testRequest('GET', 'https://0.0.0.0:22432/handshake', headers, cookies, body)
except:
    raise Exception('Request failed with/without status code ')

peer_rsa_crt = sec3.decrypt(resp1['payload']['ciphertext'], resp1['payload']['iv'], sharedSecret)

In [None]:
# TEST: Distribution Request
resp2 = {}
try:
    channelName = createId(5)
    channelName_hex_string = hex(int.from_bytes(channelName.encode(), 'big'))
    peer_hex_string = peer_rsa_crt.hex()
    encRes = sec3.encrypt(bytes.fromhex(channelName_hex_string[2:]), bytes.fromhex(peer_hex_string), sharedSecret)
    encIv = encRes['iv']
    encText = encRes['ciphertext']
    cookies = {
        'sessionId': resp1['sessionId'],
        'X-CSRFToken': resp1['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'userToken': userTokenValue, 'iv': encIv, 'channelName': encText}
    resp2 = testRequest('POST', 'https://0.0.0.0:22432/distribution', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: API Ca Creation Request
resp3 = {}
try:
    cookies = {
        'sessionId': resp2['sessionId'],
        'X-CSRFToken': resp2['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp3 = testRequest('POST', 'https://0.0.0.0:22432/cert/api/ca', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: API Ca Request
resp4 = {}
try:
    cookies = {
        'sessionId': resp3['sessionId'],
        'X-CSRFToken': resp3['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp4 = testRequest('GET', 'https://0.0.0.0:22432/cert/api/ca', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: API Broker Crt Creation Request
resp6 = {}
try:
    cookies = {
        'sessionId': resp4['sessionId'],
        'X-CSRFToken': resp4['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp6 = testRequest('POST', 'https://0.0.0.0:22432/cert/api/broker', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: API Client Crt Creation Request
resp7 = {}
try:
    cookies = {
        'sessionId': resp6['sessionId'],
        'X-CSRFToken': resp6['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp7 = testRequest('POST', 'https://0.0.0.0:22432/cert/api/client', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: CAM Ca Creation Request
resp8 = {}
try:
    cookies = {
        'sessionId': resp7['sessionId'],
        'X-CSRFToken': resp7['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp8 = testRequest('POST', 'https://0.0.0.0:22432/cert/cam/ca', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: CAM Ca Request
resp9 = {}
try:
    cookies = {
        'sessionId': resp8['sessionId'],
        'X-CSRFToken': resp8['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp9 = testRequest('GET', 'https://0.0.0.0:22432/cert/cam/ca', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')
print(resp9)
print(sec3.decrypt(resp9['payload']['caCrt']['ciphertext'], resp9['payload']['caCrt']['iv'], sharedSecret))
print(sec3.decrypt(resp9['payload']['caKey']['ciphertext'], resp9['payload']['caKey']['iv'], sharedSecret))

In [None]:
# TEST: CAM Broker Crt Creation Request
resp10 = {}
try:
    cookies = {
        'sessionId': resp9['sessionId'],
        'X-CSRFToken': resp9['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp10 = testRequest('POST', 'https://0.0.0.0:22432/cert/cam/broker', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: Als Creation Request
resp11 = {}
try:
    cookies = {
        'sessionId': resp10['sessionId'],
        'X-CSRFToken': resp10['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp11 = testRequest('POST', 'https://0.0.0.0:22432/als', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: Als Distribution Request
resp12 = {}
try:
    cookies = {
        'sessionId': resp11['sessionId'],
        'X-CSRFToken': resp11['csrfToken']
    }
    headers = {'Content-Type': 'application/json'}
    body = {'apiToken': apiTokenValue, 'creatorToken': creatorTokenValue}
    resp12 = testRequest('GET', 'https://0.0.0.0:22432/als', headers, cookies, body)
except Exception as e:
    print(e)
    raise Exception('Request failed with/without status code ')

In [None]:
# TEST: Deauth

In [None]:
print('RESULTS ECDHE TEST')
print('Server Response:')
print(resp0)
print('Server Response Validation:')
print(valEcdhe)
print('\n\nRESULTS RSA TEST')
print('Server Response:')
print(resp1)
print('Peer Cert:')
print(str(peer_rsa_crt))
print('Peer Cert Hex:')
print(str(peer_rsa_crt.hex()))
print('\n\nRESULTS DISTRIBUTION TEST')
print('Server Response:')
print(resp2)
print('\n\nRESULTS API CA CREATION TEST')
print('Server Response:')
print(resp3)
print('\n\nRESULTS API CA DISTRIBUTION TEST')
print('\nServer Response:')
print(resp4)
print(sec3.decrypt(resp4['payload']['caCrt']['ciphertext'], resp4['payload']['caCrt']['iv'], sharedSecret))
print(sec3.decrypt(resp4['payload']['caKey']['ciphertext'], resp4['payload']['caKey']['iv'], sharedSecret))
print('\n\nRESULTS API BROKER CREATION TEST')
print('Server Response:')
print(resp6)
print(sec3.decrypt(resp6['payload']['caCrt']['ciphertext'], resp6['payload']['caCrt']['iv'], sharedSecret))
print(sec3.decrypt(resp6['payload']['caKey']['ciphertext'], resp6['payload']['caKey']['iv'], sharedSecret))
print('\n\nRESULTS API CLIENT CREATION TEST')
print('Server Response:')
print(resp7)
print(sec3.decrypt(resp7['payload']['caCrt']['ciphertext'], resp7['payload']['caCrt']['iv'], sharedSecret))
print(sec3.decrypt(resp7['payload']['caKey']['ciphertext'], resp7['payload']['caKey']['iv'], sharedSecret))
print('\n\nRESULTS CAM CA CREATION TEST')
print('Server Response:')
print(resp8)
print('\n\nRESULTS CAM CA DISTRIBUTION TEST')
print('Server Response:')
print(resp9)
print(sec3.decrypt(resp9['payload']['caCrt']['ciphertext'], resp9['payload']['caCrt']['iv'], sharedSecret))
print(sec3.decrypt(resp9['payload']['caKey']['ciphertext'], resp9['payload']['caKey']['iv'], sharedSecret))
print('\n\nRESULTS API BROKER CREATION TEST')
print('Server Response:')
print(resp10)
print(sec3.decrypt(resp10['payload']['caCrt']['ciphertext'], resp10['payload']['caCrt']['iv'], sharedSecret))
print(sec3.decrypt(resp10['payload']['caKey']['ciphertext'], resp10['payload']['caKey']['iv'], sharedSecret))
print('\n\nRESULTS API CLIENT CREATION TEST')
print('Server Response:')
print(resp11)
print('\n\nRESULTS ALS KEY DISTRIBUTION TEST')
print('Server Response:')
print(resp12)