# Team Project 
## Members
- Hyeonseo Park (19102090)
- Younghwan Pahn (19102091)
### Subject 
Digital signature proxy in a cloud environment

In [1]:
%/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip
%pip install pi-heaan numpy pandas pickle
%%python3 -m pip install cryptography



UsageError: Line magic function `%/Library/Developer/CommandLineTools/usr/bin/python3` not found.


In [2]:
import piheaan as heaan
import json
from piheaan.math import sort
from piheaan.math import approx # for piheaan math function
import numpy as np
import pandas as pd
import pickle
from os import urandom
import os
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.asymmetric import padding as asymmetric_padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes


## User classes for the simulation
### Purpose
- Key Management
- Encrypt Secure Information
- Decrypt Secure Information

In [3]:
class User:
    """Base participant of the simulation.

    On construction a HEAAN context is created, a HEAAN secret key and the
    common public keys are generated and written to a directory named after
    the user id, and an RSA-2048 key pair is generated and kept in memory.
    """

    def __init__(self, _id):
        """Create the HEAAN context/keys (saved under ``_id``/) and the RSA key pair."""
        # HEAAN context built from the FGa preset, then made bootstrappable.
        self.context = heaan.make_context(heaan.ParameterPreset.FGa)
        heaan.make_bootstrappable(self.context)
        self.id = _id

        # The key directory is simply the user id (relative to the working directory).
        key_dir = _id
        secret_key = heaan.SecretKey(self.context)
        os.makedirs(key_dir, mode=0o775, exist_ok=True)
        secret_key.save(key_dir + "/secretkey.bin")

        # Public (common) keys are derived from the secret key and stored alongside it.
        generator = heaan.KeyGenerator(self.context, secret_key)
        generator.gen_common_keys()
        generator.save(key_dir + "/")

        # RSA key pair; the public half is derived from the private key.
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend(),
        )
        self.public_key = self.private_key.public_key()

    def readHeaanKeyPair(self, _id):
        """Load the HEAAN keys stored under ``./<_id>/`` and build the HE helpers.

        Returns:
            tuple: (secret key, key pack, encryptor, decryptor, context, evaluator).
        """
        key_dir = "./" + _id
        secret_key = heaan.SecretKey(self.context, key_dir + "/secretkey.bin")

        key_pack = heaan.KeyPack(self.context, key_dir + "/")
        key_pack.load_enc_key()
        key_pack.load_mult_key()

        encryptor = heaan.Encryptor(self.context)
        decryptor = heaan.Decryptor(self.context)
        evaluator = heaan.HomEvaluator(self.context, key_pack)
        return secret_key, key_pack, encryptor, decryptor, self.context, evaluator

    def encryptData(self, plainTexts):
        """RSA-OAEP encrypt each string in ``plainTexts`` with this user's public key.

        Returns:
            list[bytes]: one ciphertext per input string, in input order.
        """
        oaep = asymmetric_padding.OAEP(
            mgf=asymmetric_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        return [self.public_key.encrypt(text.encode(), oaep) for text in plainTexts]

    def decryptData(self, cipherTexts):
        """RSA-OAEP decrypt each ciphertext with this user's private key.

        Returns:
            list[str]: the decoded plaintext strings, in input order.
        """
        oaep = asymmetric_padding.OAEP(
            mgf=asymmetric_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        return [self.private_key.decrypt(blob, oaep).decode() for blob in cipherTexts]

    def decryptData_SymmetricKey(self, key, cipherText):
        """AES-CBC decrypt ``cipherText`` (16-byte IV followed by the payload).

        The PKCS7 padding is removed and the result is decoded to ``str``.
        """
        # The sender prepends the IV, so the first 16 bytes are not payload.
        iv, payload = cipherText[:16], cipherText[16:]
        aes = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()).decryptor()
        padded = aes.update(payload) + aes.finalize()

        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
        return (unpadder.update(padded) + unpadder.finalize()).decode()

class Holder(User):
    """A user who owns identity information and two random 32-byte symmetric keys."""

    def __init__(self, _id, email, name):
        """Set up the base user, then store ``[_id, email, name]`` and the symmetric keys."""
        super().__init__(_id)
        self.holder_info = [_id, email, name]
        # Two independent random 32-byte keys.
        self.symmetricKey = urandom(32)
        self.secondSymmetricKey = urandom(32)

    def encryptData_SymmetricKey(self, key, plainText):
        """AES-CBC encrypt the string ``plainText`` under ``key``.

        Returns:
            bytes: a fresh random 16-byte IV followed by the PKCS7-padded ciphertext.
        """
        iv = urandom(16)
        aes = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()).encryptor()

        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded = padder.update(plainText.encode()) + padder.finalize()

        # The IV travels in front of the ciphertext so the receiver can recover it.
        return iv + (aes.update(padded) + aes.finalize())

    def encryptData_OtherKey(self, key, plainText, isBytes):
        """RSA-OAEP encrypt ``plainText`` with another party's public ``key``.

        ``plainText`` is used as-is when ``isBytes`` is truthy, otherwise it is
        encoded to bytes first.
        """
        if isBytes:
            payload = plainText
        else:
            payload = plainText.encode()
        oaep = asymmetric_padding.OAEP(
            mgf=asymmetric_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        return key.encrypt(payload, oaep)
    

class CA(User):
    """Certificate authority; works on single values and returns raw bytes."""

    def __init__(self, _id):
        """Initialise exactly like the base user."""
        super().__init__(_id)

    def decryptData(self, cipherText):
        """RSA-OAEP decrypt one ciphertext with the CA's private key; returns bytes."""
        oaep = asymmetric_padding.OAEP(
            mgf=asymmetric_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        return self.private_key.decrypt(cipherText, oaep)

    def encryptData(self, plainText, isBytes):
        """RSA-OAEP encrypt one value with the CA's own public key.

        ``plainText`` is used as-is when ``isBytes`` is truthy, otherwise it is
        encoded to bytes first.
        """
        if isBytes:
            payload = plainText
        else:
            payload = plainText.encode()
        oaep = asymmetric_padding.OAEP(
            mgf=asymmetric_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        return self.public_key.encrypt(payload, oaep)
    
class Verifier(User):
    """Party that checks a holder's claim; decrypts single values to raw bytes."""

    def __init__(self, _id):
        """Initialise exactly like the base user."""
        super().__init__(_id)

    def encryptData_OtherKey(self, key, plainTexts, isBytes):
        """RSA-OAEP encrypt every item of ``plainTexts`` with the public ``key``.

        Items are used as-is when ``isBytes`` is truthy, otherwise each one is
        encoded to bytes first.

        Returns:
            list[bytes]: one ciphertext per item, in input order.
        """
        oaep = asymmetric_padding.OAEP(
            mgf=asymmetric_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        return [
            key.encrypt(item if isBytes else item.encode(), oaep)
            for item in plainTexts
        ]

    def decryptData(self, cipherText):
        """RSA-OAEP decrypt one ciphertext with the verifier's private key; returns bytes."""
        oaep = asymmetric_padding.OAEP(
            mgf=asymmetric_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )
        return self.private_key.decrypt(cipherText, oaep)



## Send Digital Envelope to CA
- Assuming the reliability of the information and identity of the holder, the certification issuance process proceeds.

- The Holder encrypts their client ID and personal information with the Holder's own public key.
- The Holder encrypts their public key with the Holder's symmetric key.
- The symmetric key is encrypted with the CA's public key. (The Holder's symmetric key is already securely registered in the CA's database, so the CA can sign without decrypting the Holder's encrypted information.)
- After encryption, the Holder converts the encrypted confidential information to integers so that it can be used for homomorphic encryption.
- However, each of these integers is far too large to be represented as a float, so it is split into chunks of at most 15 digits.
- The Holder sends the digital envelope to the CA.

In [4]:
# Because the encrypted byte string converted to an integer is too long, dividing it is necessary.
def split_large_integer(n, chunk_size=15):
    """Split an integer into base-10 chunks of at most ``chunk_size`` digits.

    The chunks are ordered from most to least significant. Every chunk except
    the first represents exactly ``chunk_size`` decimal digits (leading zeros
    inside a chunk are not preserved in the integer value). For a negative
    ``n`` the sign is carried by the most significant chunk.

    The split is done arithmetically with ``divmod`` instead of going through
    ``str(n)``, so integers are not subject to the interpreter's limit on
    int-to-str conversion length.

    Args:
        n: The integer to split.
        chunk_size: Maximum number of decimal digits per chunk; must be >= 1.

    Returns:
        list[int]: The chunks, most significant first. ``0`` yields ``[0]``.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1, or if a non-integer
            ``n`` does not have an integer textual form.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")

    # Non-int inputs are accepted only if their text form is an integer literal,
    # mirroring what the previous string-based implementation accepted.
    if not isinstance(n, int):
        n = int(str(n))

    base = 10 ** chunk_size
    remaining = abs(n)
    chunks = []
    while True:
        # Peel off the least significant `chunk_size` digits each round.
        remaining, low_digits = divmod(remaining, base)
        chunks.append(low_digits)
        if remaining == 0:
            break

    chunks.reverse()  # Most significant chunk first
    if n < 0:
        # The leading chunk is never zero for a non-zero n, so it can hold the sign.
        chunks[0] = -chunks[0]
    return chunks

def serialize_public_key(public_key):
    """Return ``public_key`` as a PEM (SubjectPublicKeyInfo) text string."""
    pem_bytes = public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    # PEM is returned as bytes; hand back text so it can be serialized further.
    return str(pem_bytes, 'utf-8')

def deserialize_public_key(pem_data):
    """Parse a PEM text string into a public key object.

    Literal backslash-n sequences in ``pem_data`` are turned into real
    newlines before parsing. If the PEM cannot be loaded, the error is
    printed and ``None`` is returned.
    """
    pem_bytes = pem_data.replace("\\n", "\n").encode('utf-8')
    try:
        loaded_key = serialization.load_pem_public_key(pem_bytes, backend=default_backend())
    except ValueError as e:
        # Best effort: report the problem and signal failure with None.
        print("Failed to load PEM data:", e)
        return None
    else:
        return loaded_key


In [5]:
holder = Holder("990816-1", "lopahn2@gmail.com", "반영환")
_CA = CA("Government")

# The CA's public key is assumed to be known to the client beforehand.
PUca = _CA.public_key

# RSA-encrypt every item of the holder's information with the holder's own public key.
cipherTextOfHolderInfo = holder.encryptData(holder.holder_info)

# Read each ciphertext as one big-endian integer.
parseInt_CipherTextOfHolderInfo = [
    int.from_bytes(cipherBytes, 'big') for cipherBytes in cipherTextOfHolderInfo
]

# One list of chunks per encrypted field; keys 0..2 start out empty.
chunks = {0: [], 1: [], 2: []}
for i, bigInteger in enumerate(parseInt_CipherTextOfHolderInfo):
    chunks[i] = split_large_integer(bigInteger)

# Slot configuration for the HE messages.
log_slots = 15
num_slots = 2**log_slots

# One HE message per encrypted field.
messages = {fieldIndex: heaan.Message(log_slots) for fieldIndex in range(3)}

# Chunk j of field k goes into slot j of message k (as a real-valued complex number).
for k, v in chunks.items():
    for i, chunkValue in enumerate(v):
        messages[k][i] = complex(chunkValue, 0)

# Holder's public key: PEM text, then wrapped as a JSON string.
PUho = serialize_public_key(holder.public_key)
puhoJson = json.dumps(PUho)

# The JSON text is AES-encrypted under the holder's symmetric key ...
encryptedpuhoJson = holder.encryptData_SymmetricKey(holder.symmetricKey, puhoJson)

# ... and that symmetric key (raw bytes) is RSA-encrypted for the CA.
encryptedSymmetricKey = holder.encryptData_OtherKey(PUca, holder.symmetricKey, True)

# Digital envelope sent to the CA: HE messages, encrypted public key, encrypted symmetric key.
envelop = dict(
    message=messages,
    public_key=encryptedpuhoJson,
    symmetric_key=encryptedSymmetricKey,
)


## Receive Digital Envelope From Holder
- The CA decrypts E(SY_KEY) with the CA's private key.

- The CA decrypts E(Message) with SY_KEY.
- The CA obtains the Holder's public key and the encrypted client ID.
- At this stage, the CA cannot learn the Holder's private information.
- The CA encrypts the Holder's SY_KEY with the CA's public key and parses the result into an integer. (In this step, the CA checks whether the Holder's SY_KEY is registered.)
- The CA shortens this large integer to about 15 digits so that it matches the number of digits of the chunks in the Holder's encrypted message.
- The CA adds the addend derived from the encrypted symmetric key to the ciphertext, then builds the response message for the Holder.

In [6]:
# Extract the plain messages from the envelope
plainMessages = envelop["message"]

# Recover the holder's symmetric key (RSA), then the holder's public-key payload (AES).
symmetricKey = _CA.decryptData(envelop["symmetric_key"])
receivedPublicKey = _CA.decryptData_SymmetricKey(symmetricKey, envelop["public_key"])

# HEAAN keys and helpers of the CA.
# NOTE: `eval` rebinds the built-in name; later cells rely on this binding.
sk, pk, enc, dec, context, eval = _CA.readHeaanKeyPair(_CA.id)

# HE-encrypt each plain message into its own ciphertext.
cipherMessages = {messageIndex: heaan.Ciphertext(context) for messageIndex in range(3)}
for i, plainMessage in plainMessages.items():
    enc.encrypt(plainMessage, pk, cipherMessages[i])

# Message that will hold the addend derived from the symmetric key.
symmetricKeyDrivedAddend = heaan.Message(log_slots)

# The encrypted value is integer-divided by 10**cutoff before being stored.
cutoff = 602
cnt = 0

# Fill every slot: take key byte (cnt mod 32), write it as decimal text, RSA-encrypt it
# with the CA's public key, read the ciphertext as a little-endian integer and cut it down.
while cnt < len(symmetricKeyDrivedAddend):
    keyByteText = str(symmetricKey[cnt % 32]).encode()
    encryptedKeyByte = _CA.encryptData(keyByteText, True)
    symmetricKeyDrivedAddend[cnt] = int.from_bytes(encryptedKeyByte, "little") // 10 ** cutoff
    cnt += 1

# Response = HE ciphertext + addend, one per message.
responseMessage = {messageIndex: heaan.Ciphertext(context) for messageIndex in range(3)}
for i, cipherMessage in cipherMessages.items():
    eval.add(cipherMessage, symmetricKeyDrivedAddend, responseMessage[i])


## Verify User Information
- Holder sends the HE-encrypted message received from the CA and the HE plaintext message they created, along with their plaintext identity information, to the Verifier.

- Verifier requests the CA to subtract the value added at issuance from the signed data to confirm the data's owner is the Holder.
- Verifier, after receiving the HE plaintext result from the CA, compares the result provided by the Holder with the CA's HE operation result that has been reverse-processed (CAResultMessage) to verify the authenticity of the Holder's identity information.
- If they are the same, the identity verification is completed.
- If they are different, the identity verification is false, and access is blocked.


In [7]:
# Initialize a Verifier object with ID "verifier"
verifier = Verifier("verifier")

# Public key of the verifier
PUv = verifier.public_key

# What the holder has in hand after the CA responded
holderSignValues = responseMessage            # values signed by the CA
holderEncryptInfo = messages                  # HE plaintext messages built by the holder
plainSecureInformation = holder.holder_info   # the holder's plain identity information

# Message from the holder to the verifier: data, sign and result
holderSendMessageToVerifier = dict(
    data=plainSecureInformation,
    sign=holderSignValues,
    result=holderEncryptInfo,
)

# Verifier side: unpack the three parts of the received message
targetData, targetSign, targetResult = (
    holderSendMessageToVerifier[part] for part in ("data", "sign", "result")
)

# Show what was received, in the order data, sign, result
for receivedPart in (targetData, targetSign, targetResult):
    print(receivedPart)


['990816-1', 'lopahn2@gmail.com', '반영환']
{0: (level: 9, log(num slots): 15, data: [ (240078264449600.000000+0.000000j), (684047245342855.000000+0.000000j), (931457321178576.000000+0.000000j), (742905522207669.000000+0.000000j), (776013173682995.000000+0.000000j), ..., (126993547194388.000000+0.000000j), (293369734527206.000000+0.000000j), (293148658827050.000000+0.000000j), (113698685007764.000000+0.000000j), (71531240047213.000000+0.000000j) ]), 1: (level: 9, log(num slots): 15, data: [ (240078264449612.000000+0.000000j), (569113796432951.000000+0.000000j), (1066909310274724.000000+0.000000j), (1309689365670826.000000+0.000000j), (247231094567498.000000+0.000000j), ..., (126993547194388.000000+0.000000j), (293369734527206.000000+0.000000j), (293148658827050.000000+0.000000j), (113698685007764.000000+0.000000j), (71531240047213.000000+0.000000j) ]), 2: (level: 9, log(num slots): 15, data: [ (240078264449608.000000+0.000000j), (417596855786105.000000+0.000000j), (1212456472985064.000000

In [8]:
# Initialize a dictionary to store the subtracted results
caSubResult = {messageIndex: heaan.Ciphertext(context) for messageIndex in range(3)}

# Undo the issuance step: response message minus the symmetric-key-derived addend.
for i, signedCipher in responseMessage.items():
    eval.sub(signedCipher, symmetricKeyDrivedAddend, caSubResult[i])

# Containers for the decrypted results
caSubResultMessages = {messageIndex: heaan.Message(log_slots) for messageIndex in range(3)}

# Decrypt each subtracted ciphertext with the secret key.
for i, subtractedCipher in caSubResult.items():
    dec.decrypt(subtractedCipher, sk, caSubResultMessages[i])

# The verifier is handed the CA's decrypted result.
verifierGetResultFromCA = caSubResultMessages

# Function to compare two messages
def compare_messages(msg1, msg2, abs_tol=0.0):
    """Check that two keyed collections of sequences hold the same values.

    Both arguments map keys to indexable, sized sequences. The two mappings
    must have the same keys, each pair of sequences the same length, and every
    pair of elements must match. The first difference found is printed.

    Args:
        msg1: First mapping of key -> sequence.
        msg2: Second mapping of key -> sequence.
        abs_tol: Maximum allowed absolute difference between two elements.
            The default ``0.0`` requires exact equality; a positive value is
            useful for approximate results such as decrypted HE messages.

    Returns:
        bool: True when the messages match, False otherwise.

    Raises:
        ValueError: If ``abs_tol`` is negative.
    """
    if abs_tol < 0:
        raise ValueError("abs_tol must be non-negative")

    for key in msg1:
        if key not in msg2:
            print(f"Key {key} is missing in the second message.")
            return False
        if len(msg1[key]) != len(msg2[key]):
            print(f"Length mismatch for key {key}: {len(msg1[key])} vs {len(msg2[key])}")
            return False
        for i in range(len(msg1[key])):
            left, right = msg1[key][i], msg2[key][i]
            # Equal values always match; the subtraction only happens in tolerance
            # mode, so exact mode keeps working for non-numeric elements.
            if left != right and (abs_tol == 0 or not abs(left - right) <= abs_tol):
                print(f"Mismatch at key {key}, index {i}: {left} vs {right}")
                return False

    # Keys that exist only in the second message are a difference as well.
    for key in msg2:
        if key not in msg1:
            print(f"Key {key} is missing in the first message.")
            return False
    return True

# Compare what the Holder supplied (targetResult) with what the CA returned
# (verifierGetResultFromCA). The previous check compared caSubResultMessages with
# its own alias, which can never fail and therefore certified every user.
def _integer_slots(msgs):
    """Return {key: [nearest integer of each slot's real part]} for a dict of messages."""
    # assumes every slot value exposes `.real` (Python complex/float) — TODO confirm
    return {
        key: [round(msgs[key][slot].real) for slot in range(len(msgs[key]))]
        for key in msgs
    }

# Every chunk written by the Holder is an integer, so both sides are rounded to the
# nearest integer before the exact comparison. This absorbs numerical error below 0.5
# from the homomorphic round trip (presumably small for this preset — TODO confirm).
if compare_messages(_integer_slots(targetResult), _integer_slots(verifierGetResultFromCA)):
    print("Messages are identical. User is certified")
else:
    print("Messages are not identical. User is not certified")


Messages are identical. User is certified
