In [2]:
import asyncio
import ascon

async def handle_emitter(key_init_receiver, nonce_init_receiver, reader, writer):
    """Receive one length-prefixed Ascon-128 message, authenticate it and print it.

    Wire format (as produced by ``send_message``): the ASCII decimal length of
    the ciphertext followed by ``_``, then that many ciphertext bytes. The
    length prefix, including the trailing ``_``, is also passed to the AEAD as
    associated data.

    Args:
        key_init_receiver: String hashed with Ascon-Xof into the 16-byte key.
        nonce_init_receiver: String hashed with Ascon-Xof into the 16-byte nonce.
        reader: Stream reader of the accepted connection.
        writer: Stream writer of the accepted connection; always closed on exit.
    """
    try:
        # Read the associated data: the ciphertext length terminated by '_'.
        associateddata = await reader.readuntil(b'_')
        size = int(associateddata.decode()[:-1])

        # Read exactly the announced number of ciphertext bytes.
        ciphertext = await reader.readexactly(size)

        # Derive key and nonce from the receiver's seed strings.
        key = ascon.hash(key_init_receiver.encode(), variant="Ascon-Xof", hashlength=16)
        nonce = ascon.hash(nonce_init_receiver.encode(), variant="Ascon-Xof", hashlength=16)

        # Decrypt; a None result is treated as an authentication failure.
        plaintext = ascon.decrypt(key, nonce, associateddata, ciphertext, variant="Ascon-128")

        if plaintext is None:
            print("Autentication failed. Message compromised.")
        else:
            print(f"Message: {plaintext.decode()}")
    finally:
        # Close on every path, including truncated or malformed input that
        # raises above, so the connection is never left open.
        writer.close()
    
#receiver
async def receiver(key_init_receiver, nonce_init_receiver):
    """Listen on 127.0.0.1:8888 and hand every connection to ``handle_emitter``.

    Args:
        key_init_receiver: Key seed string forwarded to ``handle_emitter``.
        nonce_init_receiver: Nonce seed string forwarded to ``handle_emitter``.

    Runs until the surrounding task is cancelled.
    """
    def on_connection(reader, writer):
        # Bind the seed strings; reader and writer are supplied per connection.
        return handle_emitter(key_init_receiver, nonce_init_receiver, reader, writer)

    listener = await asyncio.start_server(on_connection, host='127.0.0.1', port=8888)

    print("Receiver ready...\n")

    async with listener:
        await listener.serve_forever()


#Emitter
async def send_message(key_init_emitter, nonce_init_emitter):
    """Prompt for a message, encrypt it with Ascon-128 and send it to the receiver.

    The message sent is ``<length>_<ciphertext>``, where ``<length>`` is the
    ASCII decimal ciphertext length. The ``<length>_`` prefix is also used as
    the AEAD associated data.

    Args:
        key_init_emitter: String hashed with Ascon-Xof into the 16-byte key.
        nonce_init_emitter: String hashed with Ascon-Xof into the 16-byte nonce.

    NOTE(review): key and nonce are derived deterministically from the seed
    strings, so every message sent with the same seeds reuses the same nonce.
    """
    # Only the write side is used; nothing is read back from the receiver.
    _, writer = await asyncio.open_connection('127.0.0.1', 8888)
    try:
        plaintext = input("Enter message: ").encode()

        # Associated data = announced ciphertext length plus '_' terminator.
        # The +16 presumably accounts for the Ascon-128 tag appended by encrypt.
        associateddata = (str(len(plaintext) + 16) + '_').encode()

        # Derive key and nonce from the emitter's seed strings.
        key = ascon.hash(key_init_emitter.encode(), variant="Ascon-Xof", hashlength=16)
        nonce = ascon.hash(nonce_init_emitter.encode(), variant="Ascon-Xof", hashlength=16)

        # Encrypt the text, authenticating the length prefix as well.
        ciphertext = ascon.encrypt(
            key, nonce, associateddata, plaintext, variant="Ascon-128")

        # Send the prefix followed by the ciphertext.
        writer.write(associateddata + ciphertext)
        await writer.drain()
    finally:
        # Close the connection even if input, encryption or sending failed.
        writer.close()
        await writer.wait_closed()
    


In [3]:
# Receiver
key_init_receive = "27082001"
nonce_init_receive = "12071972"

# Start the server in the background. The task is kept in a named variable so
# it is strongly referenced and can be stopped later with receiver_task.cancel().
receiver_task = asyncio.create_task(receiver(key_init_receive, nonce_init_receive))
receiver_task

<Task pending name='Task-5' coro=<receiver() running at /tmp/ipykernel_38206/4097190614.py:28>>

Receiver ready...



In [4]:
#Emitter, caso certo

key_init_emitter = "27082001"
nonce_init_emitter = "12071972"

async def emitter():
        
    await send_message

# Envia a mensagem
await send_message( key_init_emitter, nonce_init_emitter)


Message: sdghjkl


In [4]:
# Emitter, wrong key: the key seed differs from the receiver's "27082001"

key_init_emitter = "27082002"
nonce_init_emitter = "12071972"

# Send the message; the receiver is expected to report an authentication failure
await send_message(key_init_emitter, nonce_init_emitter) 


Autentication failed. Message compromised.


In [5]:
#Emitter, nonce errado

key_init_emitter = "270801"
nonce_init_emitter = "12071973"

# Envia a mensagem
await send_message(key_init_emitter, nonce_init_emitter) 


Autentication failed. Message compromised.


# EX2

O modo de uma cifra por blocos contém pelo menos 3 fases

    - A preparação da mensagem num conjunto de blocos do mesmo tamanho (inclui o padding) e a geração das chaves, IVs, nonces ou tweaks que sejam necessários ao processamento.
    - O processamento individual dos blocos com a informação fornecida pela 1ª fase.
    - A re-combinação dos criptogramas individuais dos vários blocos, construindo o criptograma final da mensagem.

In [1]:
import sys
import struct, os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.x448 import X448PrivateKey, X448PublicKey
from cryptography.hazmat.primitives.asymmetric.ed448 import Ed448PrivateKey, Ed448PublicKey
from cryptography.exceptions import InvalidSignature 
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
import math
import asyncio


#receiver
async def receiver_ex2():
    """Listen on 127.0.0.1:8888 and hand every connection to ``handle_emitter_ex2``.

    Runs until the surrounding task is cancelled.
    """
    def on_connection(reader, writer):
        # Look the handler up at connection time, as the original lambda did.
        return handle_emitter_ex2(reader, writer)

    listener = await asyncio.start_server(on_connection, host='127.0.0.1', port=8888)

    print("Receiver ready...\n")

    async with listener:
        await listener.serve_forever()
        
        
async def handle_emitter_ex2(reader, writer):
    """Run the receiver side of one EX2 session and print the decrypted message.

    Steps, in order:
      1. Send a freshly generated X448 public key (56 bytes).
      2. Read the emitter's X448 public key (56 bytes) and Ed448 verification
         key (57 bytes).
      3. Derive a 32-byte key from the X448 shared secret with HKDF-SHA256.
      4. Read the Ed448 signature (114 bytes), the 16-byte IV, the header
         ``+<blocks>+<padding>_`` and ``blocks * 64`` bytes of ciphertext.
      5. Verify the signature over IV + header + ciphertext, then decrypt each
         64-byte block with ChaCha20 using the tweaks IV, IV+1, IV+2, ...

    Args:
        reader: Stream reader of the accepted connection.
        writer: Stream writer of the accepted connection; always closed on exit.

    NOTE(review): the verification key is received over the same connection
    as the message, so the signature only ties the message to whoever sent
    that key; confirm whether peer authentication is expected here.
    """
    try:
        # X448 key exchange: send our public key, then read the emitter's keys.
        private_key = X448PrivateKey.generate()
        public_key = private_key.public_key()

        writer.write(public_key.public_bytes_raw())
        await writer.drain()

        public_key_emitter = X448PublicKey.from_public_bytes(await reader.readexactly(56))
        verification_key = Ed448PublicKey.from_public_bytes(await reader.readexactly(57))

        # Derive the symmetric key from the shared secret.
        shared_key = private_key.exchange(public_key_emitter)
        key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=None,
        ).derive(shared_key)

        # Read the signature.
        signature = await reader.readexactly(114)

        # The IV is 16 raw random bytes and may itself contain b'_', so it is
        # read by length; only the textual header is read up to the delimiter.
        iv = await reader.readexactly(16)
        header = await reader.readuntil(b'_')
        ad = iv + header

        # header is b'+<blocks>+<padding>_'; splitting on '+' yields ['', blocks, padding].
        fields = header.decode()[:-1].split('+')
        num_blocos = int(fields[1])
        padding = int(fields[2])
        size = num_blocos * 64

        # Read the ciphertext.
        ciphertext = await reader.readexactly(size)

        try:
            verification_key.verify(signature, ad + ciphertext)
        except InvalidSignature:
            print("Assinatura inválida.")
            return

        decrypted_text = b""

        tweak = iv
        for i in range(num_blocos):
            bloco = ciphertext[i * 64:(i + 1) * 64]
            if i == num_blocos - 1:
                # Drop the zero padding the emitter appended to the last block.
                bloco = bloco[:64 - padding]

            cipher = Cipher(algorithms.ChaCha20(key, tweak), mode=None, backend=default_backend())
            decryptor = cipher.decryptor()
            decrypted_text += decryptor.update(bloco) + decryptor.finalize()

            tweak = incrementar_tweak(tweak)

        print(decrypted_text.decode())
    finally:
        # Close on every path: success, invalid signature, or malformed input.
        writer.close()
        
#Emitter
async def send_message_ex2():
    """Run the emitter side of one EX2 session: key agreement, encryption, signing.

    Steps, in order:
      1. Read the receiver's X448 public key (56 bytes).
      2. Send a freshly generated X448 public key followed by a freshly
         generated Ed448 verification key.
      3. Derive a 32-byte key from the X448 shared secret with HKDF-SHA256.
      4. Prompt for a message, split it into zero-padded 64-byte blocks and
         encrypt each block with ChaCha20 using the tweaks IV, IV+1, IV+2, ...
      5. Send ``signature + IV + '+<blocks>+<padding>' + '_' + ciphertext``,
         where the Ed448 signature covers everything after it.
    """
    # Establish the connection.
    reader, writer = await asyncio.open_connection('127.0.0.1', 8888)
    try:
        # Ed448 signing key pair for this session.
        ed_private_key = Ed448PrivateKey.generate()
        verification_key = ed_private_key.public_key()

        # Read the receiver's X448 public key.
        public_key_receiver = X448PublicKey.from_public_bytes(await reader.readexactly(56))

        # Generate our X448 key pair and send both public keys.
        private_key = X448PrivateKey.generate()
        public_key = private_key.public_key()

        writer.write(public_key.public_bytes_raw() + verification_key.public_bytes_raw())
        await writer.drain()

        # Derive the symmetric key from the shared secret.
        shared_key = private_key.exchange(public_key_receiver)
        key = HKDF(
            algorithm=hashes.SHA256(),
            length=32,
            salt=None,
            info=None,
        ).derive(shared_key)

        message = input("Enter message: ").encode()

        blocos, pad = divide_em_blocos_com_padding(message, 64)

        iv = os.urandom(16)

        ciphertext = b""

        tweak = iv
        for bloco in blocos:
            # Each block is encrypted under its own tweak.
            cipher = Cipher(algorithms.ChaCha20(key, tweak), mode=None, backend=default_backend())
            encryptor = cipher.encryptor()
            ciphertext += encryptor.update(bloco) + encryptor.finalize()

            tweak = incrementar_tweak(tweak)

        # Header: IV, number of blocks and number of padding bytes.
        associated_data = iv + b'+' + str(len(blocos)).encode() + b'+' + str(pad).encode()

        payload = associated_data + b'_' + ciphertext

        # Sign header and ciphertext together; the signature is sent first.
        signature = ed_private_key.sign(payload)

        writer.write(signature + payload)
        await writer.drain()
    finally:
        # Close the connection even if the handshake, input or sending failed.
        writer.close()
        await writer.wait_closed()
    

def divide_em_blocos_com_padding(mensagem, tamanho_bloco):
    """Split ``mensagem`` into blocks of ``tamanho_bloco`` bytes, zero-padding the last.

    Args:
        mensagem: Byte string to split.
        tamanho_bloco: Size of every returned block.

    Returns:
        A tuple ``(blocks, padding)``: the list of blocks and the number of
        ``\\x00`` bytes appended to the final block (0 when none were needed
        or when the message is empty).
    """
    total = math.ceil(len(mensagem) / tamanho_bloco)
    pedacos = [
        mensagem[k * tamanho_bloco:(k + 1) * tamanho_bloco]
        for k in range(total)
    ]

    # Only the final slice can come up short; fill it with zero bytes.
    em_falta = 0
    if pedacos and len(pedacos[-1]) < tamanho_bloco:
        em_falta = tamanho_bloco - len(pedacos[-1])
        pedacos[-1] += b'\x00' * em_falta

    return pedacos, em_falta

def incrementar_tweak(tweak):
    """Return ``tweak + 1`` as a 16-byte big-endian value, wrapping at 2**128.

    Args:
        tweak: Bytes interpreted as an unsigned big-endian integer.

    Returns:
        The incremented value encoded as exactly 16 bytes.
    """
    # Decode the tweak as an unsigned big-endian integer.
    valor = int.from_bytes(tweak, byteorder='big')

    # Reduce modulo 2**128 so an all-0xFF tweak wraps to zero instead of
    # making to_bytes raise OverflowError.
    novo_valor = (valor + 1) % (1 << 128)

    # Encode the new value back into a 16-byte tweak.
    return novo_valor.to_bytes(16, byteorder='big')


In [2]:
# Start the EX2 server in the background. The task is kept in a named variable
# so it is strongly referenced and can be stopped with receiver_ex2_task.cancel().
receiver_ex2_task = asyncio.create_task(receiver_ex2())
receiver_ex2_task

<Task pending name='Task-5' coro=<receiver_ex2() running at /tmp/ipykernel_23930/3946752333.py:15>>

Receiver ready...



In [3]:
async def emitter():
        
    await send_message

# Envia a mensagem
await send_message_ex2()


associated_dataassociated_dataassociated_dataassociated_dataassociated_dataassociated_data
