In [1]:
def set_parameters(kyberX):
    if kyberX == "kyber512":
        n = 256
        k = 2
        q = 3329
        eta1 = 3
        eta2 = 2
        du = 10
        dv = 4
        print("Done set parameters kyber512")
    elif kyberX == "kyber768":
        n = 256
        k = 3
        q = 3329
        eta1 = 2
        eta2 = 2
        du = 10
        dv = 4
        print("Done set parameters kyber768")
    elif kyberX == "kyber1024": 
        n = 256
        k = 4
        q = 3329
        eta1 = 2
        eta2 = 2
        du = 11
        dv = 5
        print("Done set parameters kyber1024")
    return n, k, q, eta1, eta2, du, dv

In [2]:
from sage.all import parent, ZZ, vector, PolynomialRing, GF
from sage.all import log, ceil, randint, set_random_seed, random_vector, matrix, floor
import numpy as np
import random

# Kyber parameters 
# default is Kyber768
n = 256
k = 3
q = 3329
eta1 = 2
eta2 = 2
du = 10
dv = 4
Fq = GF(q)
Rq = PolynomialRing(Fq, "x")
R, x = PolynomialRing(ZZ, "x").objgen()
fx = R([1] + [0] * (n - 1) + [1])


def B(eta):
    # Kết quả là một số nguyên ngẫu nhiên phạm vi từ [-2*eta;2*eta]      
    return sum(np.random.randint(0, 2, eta) - np.random.randint(0, 2, eta))


def string_to_bits(s):
    bits_message = []
    for letter in s:
        for i in range(6, -1, -1):
            bits_message.append(ord(letter) >> i & 1)
    return bits_message


def bits_to_string(b):
    s = []
    if len(b) == 0:
        return ""
    sub = b[:7]
    i = 0
    while not (sub == [0] * 7) and i < len(b) // 7:
        s.append(chr(int(''.join(str(item) for item in sub), 2)))
        i += 1
        sub = b[7 * i:7 * i + 7]
    return "".join(s)


def map_q_to_01(polynomial):
    new_coeffs = [(coeff + q // 4) % q for coeff in polynomial]
    return [int(new_coeff) * (2 / q) - 0.5 for new_coeff in new_coeffs]


def add_noise_v(v, P):
    noisy_v = []
    for vv in v:
        # Tạo một vector nhiễu mới
        # Mỗi phần tử noise được tạo bằng cách:
        # Nếu random.uniform(0, 1) < P (tức xác suất ngẫu nhiên < P), thì lấy 2 ** (int(random.uniform(0, 3))), 
        # tức là một số nguyên mũ của 2 trong khoảng [1, 8).
        # Nếu không, thì lấy 0 (không thêm nhiễu).
        noise = R([2 ** (int(random.uniform(0, 3))) if random.uniform(0, 1) < P else 0 for _ in range(0, n)])
        noisy_v.append(vv + noise)
    return noisy_v


def add_noise_u(u, P):
    noisy_u = []
    for uu in u:
        # Nếu random.uniform(0, 1) < P (xác suất ngẫu nhiên < P), thì lấy 2 ** (int(random.uniform(0, 9))), 
        # tức là một số nguyên mũ của 2 trong khoảng [1, 512).
        # Nếu không, thì lấy 0 (không thêm nhiễu).
        noise = vector(R, k,
                       [R([2 ** (int(random.uniform(0, 9))) if random.uniform(0, 1) < P else 0 for _ in range(0, n)])
                        for _ in range(k)])
        noisy_u.append(uu + noise)
    return noisy_u


def compress(poly, d):
    return R([round(int(coeff) * (2 ** d) / (q - 1)) for coeff in poly])


def compress_u(polys, d):
    for i in range(k):
        polys[i] = compress(polys[i], d)
    return polys


def decompress(poly_list, d):
    return [R([round(int(coeff) * (q - 1) / (2 ** d)) for coeff in poly]) for poly in poly_list]


def decompress_u(poly_list, d):
    for i in range(len(poly_list)):
        for j in range(k):
            poly_list[i][j] = decompress([poly_list[i][j]], d)[0]
    return poly_list


def decompress_list(lst, d):
    return [round(int(coeff) * (q - 1) / (2 ** d)) for coeff in lst]


def generate_keys():
    A = matrix(Rq, k, k, [Rq.random_element(degree=n - 1) for _ in range(k * k)])
#     print("A", type(A))
    e = vector(R, k, [R([(B(eta1)) for _ in range(n)]) for _ in range(k)])
#     print("e", type(e))
    s = vector(R, k, [R([(B(eta1)) for _ in range(n)]) for _ in range(k)])
#     print("s", type(s))
    t = (A * s + e) % fx
#     print("t", type(t))

    return (A, t), s  # (pk), sk


def encrypt(message, pk):
    A, t = pk
    if type(message) == str:
        message = string_to_bits(message)

    message = list(message)
    # chia message thành các đoạn nhỏ, mỗi đoạn dài 256 ký tự
    message = [message[i:i + 256] for i in range(0, len(message), 256)]
    u, v = [], []
    r = vector(R, k, [R([(B(eta1)) for _ in range(n)]) for _ in range(k)])
    e_1 = vector(R, k, [R([(B(eta2)) for _ in range(n)]) for _ in range(k)])
    e_2 = R([(B(eta2)) for _ in range(n)])
    for submessage in message:
        v.append(compress((r * t + e_2 + R(decompress_list(submessage, 1))) % fx, dv))
        u.append(compress_u((r * A + e_1) % fx, du))
#     print("v =", v)
    return u, v


def decrypt(u, v, sk, for_pc=False):
    message = []

    v = decompress(v, dv)
    u = decompress_u(u, du)
    for i in range(0, len(v)):
        decrypted_message = (v[i] - sk * u[i]) % fx
        if for_pc:
            message.append(map_q_to_01(decrypted_message))
        else:
            message.append([int(coef > q // 4 and coef < 3 * q // 4) for coef in decrypted_message])
    if for_pc:
        return np.array([bit for submessage in message for bit in submessage])
    else:
        all_messages = [bit for submessage in message for bit in submessage]
        return "".join(
            [bits_to_string(bits) for bits in [all_messages[i:i + 7] for i in range(0, len(all_messages), 7)]])


In [3]:
# Test inference time
from Repetition_code import *
# from Kyber import *
import time
import string
import random


# Set Kyber parameters 
n, k, q, eta1, eta2, du, dv = set_parameters("kyber768")
probability_of_bit_flip_in_channel = 10**-9 # typical

message = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(100))
print("Org text:", (message))
print("Length org text:", len(message))
message = string_to_bits(message)
start_time = time.time()

pk,sk = generate_keys()
key_time = time.time() - start_time

## encode message
encoded_small = repetition_encode(message)
closest_power_of_2 = 256
while len(encoded_small) > closest_power_of_2:
    closest_power_of_2 *= 2 
encoded = zero_padding(encoded_small, closest_power_of_2)
padding_time = time.time() - start_time - key_time
## encrypt message 
u, v = encrypt(encoded, pk)
encrypt_time = time.time() - start_time - key_time - padding_time

noisy_v = add_noise_v(v, probability_of_bit_flip_in_channel)
noisy_u = add_noise_u(u, probability_of_bit_flip_in_channel)
noise_time = time.time() - start_time  - encrypt_time - key_time - padding_time

decrypted = decrypt(noisy_u, noisy_v, sk)
decrypt_time = time.time() - start_time - noise_time - encrypt_time - key_time - padding_time
total_time = time.time() - start_time
print(f"key {key_time}, padding {padding_time}, encrypt {encrypt_time}, noise {noise_time}, decrypt {decrypt_time}, total {total_time}", "seconds")
    

Done set parameters kyber768
Org text: CF8WLER7S47AP595EPP4GICEZ4VFFOV2B2M41S4TSROXLCX350E4MP4ARDCSYQM8RAA6VYVOF8ESQRPM20IJ6NCSISFP689M0B0K
Length org text: 100
key 0.11332511901855469, padding 0.02064824104309082, encrypt 0.20037102699279785, noise 0.029662132263183594, decrypt 0.0524594783782959, total 0.4165048599243164 seconds


In [4]:
# Test inference time
from Repetition_code import *
# from Kyber import *
import time
import string
import random


# Set Kyber parameters 
n, k, q, eta1, eta2, du, dv = set_parameters("kyber1024")
probability_of_bit_flip_in_channel = 10**-9 # typical
print("2^i key_time encrypt_time noise_time decrypt_time total_time")

for i in range(3, 23):
    org_message = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(2 << i))
#     print("Org text: 2 ^", i, "=", len(org_message))
    message = string_to_bits(org_message)
    start_time = time.time()

    pk,sk = generate_keys()
    key_time = time.time() - start_time

    ## encrypt message 
    u, v = encrypt(message, pk)
    encrypt_time = time.time() - start_time - key_time

    noisy_v = add_noise_v(v, probability_of_bit_flip_in_channel)
    noisy_u = add_noise_u(u, probability_of_bit_flip_in_channel)
    noise_time = time.time() - start_time  - encrypt_time - key_time

    decrypted = decrypt(noisy_u, noisy_v, sk)
    decrypt_time = time.time() - start_time - noise_time - encrypt_time - key_time
    total_time = time.time() - start_time
#     print("Result:", decrypted == org_message)
#     print(f"genkey {key_time}, encrypt {encrypt_time}, noise {noise_time}, decrypt {decrypt_time}, total {total_time}", "seconds")
    print(i+1, key_time, encrypt_time, noise_time, decrypt_time, total_time)
    

Done set parameters kyber1024
2^i key_time encrypt_time noise_time decrypt_time total_time
4 0.11443924903869629 0.09798336029052734 0.001968860626220703 0.0033893585205078125 0.21778273582458496
5 0.07487034797668457 0.08313322067260742 0.0018656253814697266 0.003313302993774414 0.16318368911743164
6 0.14989376068115234 0.09835410118103027 0.0037894248962402344 0.006937265396118164 0.2589759826660156
7 0.07636260986328125 0.10026812553405762 0.00704646110534668 0.013152599334716797 0.19683241844177246
8 0.07596445083618164 0.11516880989074707 0.013096094131469727 0.023572206497192383 0.22780466079711914
9 0.07509422302246094 0.14769983291625977 0.024969816207885742 0.045935869216918945 0.2937028408050537
10 0.07283902168273926 0.2126765251159668 0.050826311111450195 0.09194087982177734 0.42828917503356934
11 0.07277536392211914 0.35161685943603516 0.09923315048217773 0.18808555603027344 0.7117137908935547
12 0.07642722129821777 0.6263351440429688 0.19872522354125977 0.3862209320068359

KeyboardInterrupt: 

In [None]:
print("Done!")