In [1]:
from binascii import hexlify, unhexlify
import socket
from itertools import cycle
from noise.connection import NoiseConnection, Keypair
from base64 import b64decode

In [2]:
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('localhost', 2000))
s.listen(1)

conn, addr = s.accept()
print('Accepted connection from', addr)

Accepted connection from ('127.0.0.1', 52288)


In [3]:
id = conn.recv(5)
print(f"id [{len(id)}] {hexlify(id)}")

id [5] b'0003010000'


In [4]:
noise = NoiseConnection.from_name(b'Noise_KK_25519_AESGCM_SHA256')
noise.set_as_responder()

server_privkey = "../../../noiseprotocol/examples/noise-keys/server_key_25519"
client_pubkey = "../../../noiseprotocol/examples/noise-keys/client_key_25519.pub"

with open(server_privkey, "rb") as f:
    noise.set_keypair_from_private_bytes(Keypair.STATIC, f.read())
with open(client_pubkey, "rb") as f:
    s = f.read()
    print(f"s: [{len(s)}] {hexlify(s)}")
    b = b64decode(s)
    print(f"b: [{len(b)}] {hexlify(b)}")
    noise.set_keypair_from_public_bytes(Keypair.REMOTE_STATIC, b)
                                         
# noise.set_keypair_from_private_path(Keypair.STATIC, "../../../noiseprotocol/examples/noise-keys/server_key_25519")
# noise.set_keypair_from_private_path(Keypair.REMOTE_STATIC, "../../../noiseprotocol/examples/noise-keys/client_key_25519")
noise.set_prologue(id)

noise.start_handshake()

s: [44] b'48355a5842565652497947694f576574766d6e744c3849624a667a6731583975682f4a364a674c68786e633d'
b: [32] b'1f96570555512321a23967adbe69ed2fc21b25fce0d57f6e87f27a2602e1c677'


In [5]:
def analyze_incoming_msg(b):
    return (b[0] << 8) | b[1], b[2:]

def compose_outgoing_msg(b):
    return len(b).to_bytes(2, "big") + b

In [6]:
# Perform handshake. Break when finished
for action in cycle(['receive', 'send']):
    if noise.handshake_finished:
        break
    elif action == 'send':
        ciphertext = noise.write_message()
        print(f"ciphertext [{len(ciphertext)}] {hexlify(ciphertext)}")
        conn.sendall(compose_outgoing_msg(ciphertext))
    elif action == 'receive':
        d = conn.recv(2048)
        data_size, data = analyze_incoming_msg(d)
        print(f"data [{data_size}] {hexlify(data)}")
        plaintext = noise.read_message(data)
        print(f"plaintext [{len(plaintext)}] {hexlify(plaintext)}")
        

data [48] b'b6f7c25706eb0cd64329a1488b8dd08d3c7b6e753dc6a515db0e290329a1e400f61aa86c3b8c5d8f7ab0c50d0dc75fd2'
plaintext [0] b''
ciphertext [48] b'aa2e1845ec30cf31a24a00073748c90cb01b963e9b7c7f2f0986cdf8b3d2ba696640fb511fcef42c9c1dce431e1cb659'


In [None]:
# Endless loop "echoing" received data
while True:
    d = conn.recv(2048)
    data_size, data = analyze_incoming_msg(d)
    print(f"data [{data_size}] {hexlify(data)}")    
    if not data:
        break
    received = noise.decrypt(data)
    conn.sendall(compose_outgoing_msg(noise.encrypt(received)))