# Trabalho Prático 1  #
#### André Freitas PG54707 ####
#### Bruna Macieira PG54467 ####

## Descrição e Abordagem ao Problema ##

O principal objetivo deste trabalho passa por criar um comunicação privada assíncrona em modo  “Lightweight Cryptography” entre um agente Emitter e um agente Receiver. Esta implementação em Python passa pelo uso dos módulos Cryptography, Ascon e Asyncio, cada um deles dedicado a uma determinada ação ocorrente na implementação. 

Usamos o pacote Ascon para autenticar o criptograma envolvido na counicação bem como os metadados envolventes. O Ascon é uma família de algoritmos de encriptação autenticada e de hashing concebidos para serem leves e fáceis de implementar, mesmo com contramedidas adicionais contra ataques de canais laterais. Foi concebido por uma equipa de criptógrafos da Universidade de Tecnologia de Graz, da Infineon Technologies e da Universidade de Radboud: Christoph Dobraunig, Maria Eichlseder, Florian Mendel e Martin Schläffer.

O NIST anunciou a seleção da família Ascon como o padrão de criptografia leve em fevereiro de 2023, após receber feedback público em um workshop. O NIST está trabalhando com a equipe Ascon para elaborar os padrões de criptografia leve.

Asyncio é uma biblioteca para escrever código concorrente usando a sintaxe async/await, por isso tiramos proveito dessa funcionalidade para poder implementar a comunicação cliente-servidor.

O pacote Cryptography é uma biblioteca Python que fornece fórmulas criptográficas e primitivas para programadores. O uso deste pacote serve para implementar uma AEAD com “Tweakable Block Ciphers”. 

AEAD (Authenticated Encryption with Associated Data) é um esquema de criptografia que simultaneamente assegura a confidencialidade dos dados (também conhecida como privacidade: a mensagem criptografada é impossível de entender sem o conhecimento de uma chave secreta) e a autenticidade (ou seja, é inalterável: a mensagem criptografada inclui uma etiqueta de autenticação que o remetente só pode calcular possuindo a chave secreta).

##  Primeira Parte ##

#### Imports ####

In [1]:
import asyncio
import os
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hmac
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from secrets import token_bytes

#### Implementação ####

In [152]:
class AsconCipher:
    def __init__(self, key):
        self.key = key

    def encrypt(self, plaintext, associated_data):
        nonce = token_bytes(16)  # Generate nonce
        cipher = Cipher(algorithms.Ascon128, modes.AEAD(self.key, nonce), backend=default_backend())
        encryptor = cipher.encryptor()
        encryptor.authenticate_additional_data(associated_data)
        ciphertext = encryptor.update(plaintext) + encryptor.finalize()
        return (nonce, ciphertext, encryptor.tag)

    def decrypt(self, nonce, ciphertext, tag, associated_data):
        cipher = Cipher(algorithms.Ascon128, modes.AEAD(self.key, nonce, tag), backend=default_backend())
        decryptor = cipher.decryptor()
        decryptor.authenticate_additional_data(associated_data)
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()
        return plaintext

In [153]:
async def handle_client(reader, writer):
    private_key = x25519.X25519PrivateKey.generate()
    public_key = private_key.public_key().public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw)

    writer.write(public_key)
    await writer.drain()

    peer_public_key_bytes = await reader.read(32)
    peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes)

    shared_key = private_key.exchange(peer_public_key)

    # Derive keys using HKDF
    derived_key_material = HKDF(
        algorithm=hashes.SHA256(),
        length=64,
        salt=None,
        info=b'handshake data',
        backend=default_backend()
    ).derive(shared_key)

    cipher_key = derived_key_material[:32]
    hmac_key = derived_key_material[32:]

    cipher = AsconCipher(cipher_key)

    while True:
        nonce = await reader.readexactly(16)
        ciphertext = await reader.readuntil(separator=b'|')
        tag = await reader.readexactly(16)
        associated_data = await reader.readuntil(separator=b'||')
        plaintext = cipher.decrypt(nonce, ciphertext, tag, associated_data)

        writer.write(plaintext)
        await writer.drain()

In [154]:
async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

In [155]:
async def send_public_key(writer):
    private_key = x25519.X25519PrivateKey.generate()
    public_key = private_key.public_key().public_bytes(serialization.Encoding.Raw, serialization.PublicFormat.Raw)
    writer.write(public_key)
    await writer.drain()

In [156]:
async def send_message():
    while True:
        try:
            message = input("Enter message to send (or 'quit' to exit): ")
            if message.lower() == 'quit':
                break
            
            reader, writer = await asyncio.open_connection('127.0.0.1', 8888)

            # Send public key
            await send_public_key(writer)

            # Encrypt and send message
            writer.write(message.encode())
            await writer.drain()

            # Receive and decrypt response
            data = await reader.read(1024)  # Increased buffer size for receiving data
            print(f"Received message from server: {data.decode('utf-8', 'ignore')}")  # Decode using UTF-8

            writer.close()
            await writer.wait_closed()
        except KeyboardInterrupt:
            print("Exiting client...")
            break

In [157]:
task = asyncio.create_task(main())


In [158]:
await asyncio.create_task(send_message())

Received message from server: ѤԖL2#-fFcҴΤG#4
Received message from server: ?NJW
2ϛG!Q]5
!d
Received message from server: :qgnD XL~zbؽ7


## Segunda parte ##

#### Imports ####

In [137]:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import x448, ed448
from secrets import token_bytes

#### Implementação ####

In [138]:
class TweakableChaCha20AEAD:
    def __init__(self, key, tweak):
        self.key = key[:32]  # Ensure key is 32 bytes (256 bits)
        self.tweak = tweak

    def encrypt(self, plaintext, associated_data, nonce):
        # Create a ChaCha20 cipher with the tweaked key
        nonce = token_bytes(16)  # Generate nonce
        cipher = Cipher(algorithms.ChaCha20(self.key, nonce), mode=None, backend=default_backend())
        encryptor = cipher.encryptor()

        # Encrypt the plaintext
        ciphertext = encryptor.update(plaintext) + encryptor.finalize()

        # Generate a tag using HMAC with associated data
        h = hmac.HMAC(self.key, hashes.SHA256(), backend=default_backend())
        h.update(associated_data)
        h.update(ciphertext)
        tag = h.finalize()

        return ciphertext, tag

    def decrypt(self, ciphertext, tag, associated_data, nonce):
        # Create a ChaCha20 cipher with the tweaked key
        nonce = token_bytes(16)  # Generate nonce
        cipher = Cipher(algorithms.ChaCha20(self.key, nonce), mode=None, backend=default_backend())
        decryptor = cipher.decryptor()

        # Decrypt the ciphertext
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()

        # Verify the tag using HMAC with associated data
        h = hmac.HMAC(self.key, hashes.SHA256(), backend=default_backend())
        h.update(associated_data)
        h.update(ciphertext)
        h.verify(tag)

        return plaintext

In [139]:
class KeyExchange:
    def __init__(self):
        self.agent1_private_key = x448.X448PrivateKey.generate()
        self.agent1_public_key = self.agent1_private_key.public_key()

        self.agent2_private_key = x448.X448PrivateKey.generate()
        self.agent2_public_key = self.agent2_private_key.public_key()

    def perform_key_exchange(self, private_key, public_key):
        shared_secret = private_key.exchange(public_key)
        return shared_secret

In [140]:
class SigningVerification:
    def __init__(self):
        self.agent1_private_key = ed448.Ed448PrivateKey.generate()
        self.agent1_public_key = self.agent1_private_key.public_key()

        self.agent2_private_key = ed448.Ed448PrivateKey.generate()
        self.agent2_public_key = self.agent2_private_key.public_key()

    def sign(self, private_key, data):
        signature = private_key.sign(data)
        return signature

    def verify(self, public_key, signature, data):
        try:
            public_key.verify(signature, data)
            return True
        except Exception as e:
            print(f"Verification failed: {e}")
            return False

In [141]:
# Inicializar o canal assimétrico
key_exchange = KeyExchange()
signing_verification = SigningVerification()

In [142]:
# Agent 1 performs key exchange
shared_secret1 = key_exchange.perform_key_exchange(key_exchange.agent1_private_key, key_exchange.agent2_public_key)

# Agent 2 performs key exchange
shared_secret2 = key_exchange.perform_key_exchange(key_exchange.agent2_private_key, key_exchange.agent1_public_key)

# Check if both shared secrets match
print("Shared secrets match:", shared_secret1 == shared_secret2)

Shared secrets match: True


In [143]:
# Convert shared secrets to bytes for encryption key
encryption_key = shared_secret1
authentication_key = shared_secret2

In [144]:
# Initialize the TweakableChaCha20AEAD with the encryption key and authentication key
tweakable_cipher = TweakableChaCha20AEAD(encryption_key, authentication_key)

In [148]:
# Encrypt
plaintext = b'Hello, World!'
associated_data = b'This is associated data'
nonce = token_bytes(16)  # Generate nonce
ciphertext, tag = tweakable_cipher.encrypt(plaintext, associated_data, nonce)
print("Plaintext:", plaintext)
print("Ciphertext:", ciphertext)
print("Tag:", tag)

Plaintext: b'Hello, World!'
Ciphertext: b'tn\x95p\r\x0e\x04jc4\xf8#\xf0'
Tag: b'\xe7\x1d\x7f\xd6f\xff\x06\xd9\r5R\x02*o@\xa1\xcbXx`\xf3\xb2\x85F\x06\xdbK\xd1\xd6\xae9('


In [149]:
try:
    decrypted_plaintext = tweakable_cipher.decrypt(ciphertext, tag, associated_data, nonce)
    print("Decrypted plaintext:", decrypted_plaintext.decode('utf-8'))
except UnicodeDecodeError:
    print("Decrypted plaintext (as bytes):", decrypted_plaintext)


Decrypted plaintext (as bytes): b'\xbb\xd9{\x92*0z\xecyR\xe8`\x90'


In [151]:
# Agent 1 signs a message
message = b'This is a message.'
signature = signing_verification.sign(signing_verification.agent1_private_key, message)

# Agent 2 verifies the signature
print("Signature verified:", signing_verification.verify(signing_verification.agent1_public_key, signature, message))

Signature verified: True
