In [12]:
import socket
import ssl
import json
import time
import jsonrpcclient
from jsonrpcserver import method, dispatch
import base64
import os

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PublicKey, X25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PublicKey, Ed25519PrivateKey
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import serialization

In [28]:
HOST, PORT = "localhost", 23340

# Create a socket (SOCK_STREAM means a TCP socket)
context = ssl.create_default_context()
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.load_verify_locations(cafile="C:/Users/mcope/OneDrive - stu.scau.edu.cn/School Works/Computer Network/QhitChat/Certificate/server.pem")

@method
def ping_pong():
    return "pong from client"

def construct_payload(content: str):
    payload=bytes(str(content)+"\n", encoding="utf8")
    return len(payload).to_bytes(4, "big")+payload

In [29]:
# client invoke test.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 10000, 3000))

    # wrap with TLS layer to add security.
    sock = context.wrap_socket(sock, server_hostname="QhitChat-Server")

    # Initiate request
    sock.sendall(construct_payload(jsonrpcclient.request("Ping")))

    # Receive response from server
    received = sock.recv(1024)
    print(received)
    l, result = int.from_bytes(received[0:4], "big"), jsonrpcclient.parse(json.loads(str(received[7:], "utf8")))
    print(result)

b'\x00\x00\x00,\xef\xbb\xbf{"jsonrpc":"2.0","id":19,"result":"Pong"}'
Ok(result='Pong', id=19)


In [30]:
# server invoke test.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 10000, 3000))

    # wrap with TLS layer to add security.
    sock = context.wrap_socket(sock, server_hostname="QhitChat-Server")

    # Initiate request
    sock.sendall(construct_payload(jsonrpcclient.request("PingPong")))

    # Receive response from server
    received = sock.recv(1024)
    print(str(received[7:], "utf8"))

    # Respond server RPC call
    response = dispatch(str(received[7:], "utf8"))
    print(str(response))
    if response.wanted:
        sock.sendall(construct_payload(str(response)))
    
    received = sock.recv(1024)
    l, result = int.from_bytes(received[0:4], "big"), jsonrpcclient.parse(json.loads(str(received[7:], "utf8")))
    print(result)

{"jsonrpc":"2.0","id":2,"method":"ping_pong","params":[]}
{"jsonrpc": "2.0", "result": "pong from client", "id": 2}
Ok(result='Pong from server', id=20)


In [None]:
HOST, PORT = "localhost", 23340

# Create a socket (SOCK_STREAM means a TCP socket)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    sock.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 10000, 3000))

    start_time=time.time()
    count=0
    while True:
        # Authenticate server by selecting a random nonuce and encrypts it with server public key.
        nonce = os.urandom(128)

        # Generate a private key for use in the exchange.
        private_key = X25519PrivateKey.generate()

        # Connect to server and send data
        payload=bytes(str(jsonrpcclient.request("ExchangePublicKey", [str(base64.b64encode(private_key.public_key().public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)), encoding="utf8"),
                                                                      str(base64.b64encode())]))+"\n", encoding="utf8")
        sock.sendall(len(payload).to_bytes(4, "big")+payload)

        # Receive data from the server
        received = sock.recv(1024)
        l, result = int.from_bytes(received[0:4], "big"), jsonrpcclient.parse(json.loads(str(received[7:], "utf8")))

        # print("Received: {}".format(received))

        peer_public_key = X25519PublicKey.from_public_bytes(base64.b64decode(result.result))
        shared_key = private_key.exchange(peer_public_key)

        # Perform key derivation.
        derived_key = HKDF(
            algorithm=hashes.SHA512(),
            length=32,
            salt=None,
            info=b'handshake data',
        ).derive(shared_key)
        # print(base64.b64encode(derived_key))
        
        count+=1
        if time.time()-start_time>5:
            print("{}requests/s".format(count/(time.time()-start_time)))
            start_time=time.time()
            count=0

319.7990850474785requests/s
409.27051439938356requests/s
392.19457662452595requests/s
471.01870350958205requests/s
410.53853135164053requests/s
472.833945178688requests/s
406.5328503707083requests/s
440.88546638389306requests/s
470.4621020059094requests/s
473.22377262301717requests/s
446.65434202932653requests/s
472.537179867865requests/s
451.8080009029366requests/s
422.69091857665364requests/s
470.8940621287764requests/s
432.71346357983333requests/s


KeyboardInterrupt: 

In [72]:
# Server private key
server_private_key = Ed25519PrivateKey.generate()
server_exchange_private_key = X25519PrivateKey.generate()

# Client private key
client_exchange_private_key = X25519PrivateKey.generate()

# Client generate nonce for authentication.
client_nonce = os.urandom(128)
payload=client_nonce+client_exchange_private_key.public_key().public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)

# Server accept nonce and public key
accept_client_nonce=payload[0:128]
accept_client_public_key=payload[128:]

# Server generate server nounce, append its public key for key exchange, sign it with its own private key and send it back.
server_nonce = os.urandom(128)
payload=client_nonce+server_nonce+server_exchange_private_key.public_key().public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)
signature=server_private_key.sign(payload)

# Cilent verify if payload is sent from server.
if not server_private_key.public_key().verify(signature, payload): # Check if the message is from server or not by verifying signature
    accept_client_nonce=payload[0:128]
    print(accept_client_nonce==client_nonce)
    accept_server_nonce=payload[128:256]
    accept_server_public_key=X25519PublicKey.from_public_bytes(payload[256:])
    client_shared_key = private_key.exchange(accept_server_public_key)
    # Perform key derivation.
    derived_key = HKDF(
        algorithm=hashes.SHA512(),
        length=32,
        salt=None,
        info=b'handshake data',
    ).derive(client_shared_key)
    chacha = ChaCha20Poly1305(derived_key)
    payload = chacha.encrypt(accept_client_nonce, "hello", )

True


In [None]:
import OpenSSL
key = OpenSSL.crypto.PKey()
key.generate_key( OpenSSL.crypto.TYPE_RSA, 1024 )
cert = OpenSSL.crypto.X509()
cert.set_serial_number(0)
cert.get_subject().CN = "me"
cert.set_issuer( cert.get_subject() )
cert.gmtime_adj_notBefore( 0 )
cert.gmtime_adj_notAfter( 10*365*24*60*60 )
cert.set_pubkey( key )
cert.sign( key, 'md5' )
open( "certificate.cer", 'w' ).write( 
  OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_PEM, cert ) )
open( "private_key.pem", 'w' ).write( 
  OpenSSL.crypto.dump_privatekey( OpenSSL.crypto.FILETYPE_PEM, key ) )
p12 = OpenSSL.crypto.PKCS12()
p12.set_privatekey( key )
p12.set_certificate( cert )
open( "d:/container.pfx", 'wb' ).write( p12.export() )

In [2]:
import socket
import ssl

# SET VARIABLES
packet, reply = "<packet>SOME_DATA</packet>", ""
HOST, PORT = '127.0.0.1', 23340

# CREATE SOCKET
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)

# WRAP SOCKET
wrappedSocket = ssl.wrap_socket(sock)

# CONNECT AND PRINT REPLY
wrappedSocket.connect((HOST, PORT))
wrappedSocket.send(packet)
print(wrappedSocket.recv(1280))

# CLOSE SOCKET CONNECTION
wrappedSocket.close()

TypeError: a bytes-like object is required, not 'str'