# AES
# Шифрование/дешифрование символов, текстовых файлов, изображений
### Примечание: HLS IP-блока поддерживает входной буфер размером до 345600 байт/символов

## Импорт библиотек

In [1]:
from pynq import Overlay
import time
# import pynq.lib.dma
Max_buffer_size = 345600

## Добавление битстрима

In [2]:
AES_En_De_overlay = Overlay("./bitstream/AES_En_De.bit")

In [3]:
AES_En_De = AES_En_De_overlay.AES_En_De_0
dma_ip = AES_En_De_overlay.axi_dma_0

In [4]:
sendstatus = dma_ip.sendchannel.running
recvstatus = dma_ip.recvchannel.running
print("sendstatus", sendstatus,"recvstatus",recvstatus)

sendstatus True recvstatus True


In [5]:
# AES_En_De_overlay.ip_dict

## Создание объекта MMIO для доступа к функцииAES_En_De

In [6]:
from pynq import MMIO
AES_En_D_address = AES_En_De_overlay.ip_dict['AES_En_De_0']['phys_addr']
addr_range = 0x40  # 64
mmio = MMIO(AES_En_D_address, addr_range)

## Вызов функций HLS

In [7]:
from pynq import Xlnk
import numpy as np
xlnk = Xlnk()
"""
def call_AES_En_De(input_bytes, num_of_input_bytes, key_bytes_object, en_or_decryption):
Вызов HLS функции
    input:
        input_bytes:   массив байт размера 16
        num_of_input_bytes:  количество байт
        key_bytes_object: 16-байтовый ключ
        en_or_decryption: 0 - шифрование, 1 - расшифрование         
    
    return:
        bytearray: массив байт размера 16
"""
def call_AES_En_De(input_bytes, num_of_input_bytes, key_bytes_object, en_or_decryption):

    ## Создание объекта Xlnk для доступа к DMA   
    in_buffer = xlnk.cma_array(shape=(num_of_input_bytes,),dtype = np.uint8)  #uint8
    out_buffer = xlnk.cma_array(shape=(num_of_input_bytes,),dtype = np.uint8)

    input_bytes = np.array( bytearray(input_bytes))
    np.copyto(in_buffer,input_bytes)

    ## Запись необходимых данных
    mmio.write(32, key_bytes_object ) 
    mmio.write(16, num_of_input_bytes ) 
    mmio.write(20, en_or_decryption )

    while( not( mmio.read(0x00) & 0x4) ): # Ожидание AP_READY
        pass                             
    
    ## Начало DMA передачи
    dma_ip.sendchannel.transfer(in_buffer)
    dma_ip.recvchannel.transfer(out_buffer)
    mmio.write(0x00, 0x01) # Запись AP_START для начала отправки

    ## Ожидание завершения передачи
    dma_ip.sendchannel.wait()
    dma_ip.recvchannel.wait()    

    bytes_arrray = bytearray(out_buffer)
#     print( "hex：", bytes_arrray.hex() ) 
    
    return bytes_arrray

## Функция расширения нулями входного текста до значения, кратного 128 бит
## Ключ должен быть ровно 128 бит

In [8]:
"""
def expand_to_128b(plain_text, encoding):
    input:
        plain_text:  входная строка
        encoding:   кодировка: 'ascii', 'unicode', 'utf-8'
    
    return:
        plain_text:   строка, расширенная до 16 байт
"""
def expand_to_128b(plain_text, encoding= "utf-8" ):
   
    if ( isinstance(plain_text, str) ):
        plain_text = bytearray(plain_text.encode(encoding))
        
    elif ( isinstance(plain_text, bytes) ):        
        plain_text = bytearray(plain_text)
    
    len_to_add = 16 - len(plain_text)%16   
    
    while(len_to_add > 0 and len_to_add < 16 ):
        plain_text.append(0)
        len_to_add = len_to_add - 1    
    
#     print("Длина строки после расширения：",len(plain_text))
#     print("Новая строка: ",plain_text.decode('utf-8'))   
    
    return plain_text

# Функция шифрования сообщения
## Шифрование/дешифрование сообщения любой длины
### В случае не кратности 16 байтам, добавляются нули

In [9]:
"""
def string_AES_En_De(plain_text, key_text, en_or_decryption, encoding):
    input:
        plain_text: строка любой длины
        key_text:   ключ любой длины
        en_or_decryption: 0 - шифрование, 1 - дешифрование   
        encoding:   кодировка
    
    return:
        bytearray: bytearray после шифрования, str после дешифрования    
        
"""
def string_AES_En_De(plain_text, key_text, en_or_decryption, encoding= "utf-8" ):

    if( len(key_text.encode( encoding ))<=16 ):      #  'utf-8'
        key_text = expand_to_128b(key_text, encoding)
    else:
        print("key length is too long, "+encoding+" after endoding：",len(key_text.encode( encoding ))) #  'utf-8'
        return b"this is a wrong return~~no!!!"

    result_text = bytearray()
    
    
    if ( en_or_decryption == 0):   
        plain_text = expand_to_128b(plain_text, encoding)       
        
        len_of_plain_text = len(plain_text)
        
        if( len_of_plain_text < Max_buffer_size):
#             pass
            result_text = call_AES_En_De(plain_text, len_of_plain_text, bytes(key_text), en_or_decryption)
        else:
            i = 0        
            while(i < len_of_plain_text):
                rest_of_bytes = len_of_plain_text - i
                if(rest_of_bytes > Max_buffer_size ):
                    result_text.extend( call_AES_En_De((plain_text[i : i + Max_buffer_size]), Max_buffer_size, bytes(key_text),en_or_decryption) )
                else:
                    result_text.extend( call_AES_En_De((plain_text[i : i + rest_of_bytes]), rest_of_bytes, bytes(key_text),en_or_decryption) )
                i = i + Max_buffer_size
        
        #print("***Шифрование текста в "+encoding+"：",result_text)




    elif ( en_or_decryption == 1):
        if ( isinstance(plain_text, str) ):
            plain_text = bytes.fromhex(plain_text)
        len_of_plain_text = len(plain_text)
        
        if( len_of_plain_text < Max_buffer_size):
#             pass
            result_text = call_AES_En_De(plain_text, len_of_plain_text, bytes(key_text), en_or_decryption)        
        else:
            i = 0        
            while(i < len_of_plain_text):
                rest_of_bytes = len_of_plain_text - i
                if(rest_of_bytes > Max_buffer_size ):
                    result_text.extend( call_AES_En_De((plain_text[i : i + Max_buffer_size]), Max_buffer_size, bytes(key_text),en_or_decryption) )
                else:
                    result_text.extend( call_AES_En_De((plain_text[i : i + rest_of_bytes]), rest_of_bytes, bytes(key_text),en_or_decryption) )
                i = i + Max_buffer_size
        
        result_text = result_text.decode( encoding )
        #print("***Дешифрование текста в "+encoding+"：",result_text)

    return result_text

# Тесты 

注意：Примечание: HLS IP-блока поддерживает входной буфер размером до 345600 байт/символов 

## Шифрование входной строки с кодировкой utf-8
Проверку можно провести здесь: https://the-x.cn/cryptography/Aes.aspx

In [10]:
plain_text = "this is a test string---2020年新工科联盟-Xilinx 暑期学校（Summer School）项目---  2020年8月1日02:02:15"
key_text = "Xilinx暑校"
encoding = "utf-8"

cipher  = string_AES_En_De(plain_text,key_text,0, encoding)
print(len(cipher))
print("Зашифрованный текст в utf-8：",cipher.hex())

plain  = string_AES_En_De(cipher,key_text,1, encoding)
print(len(plain))
print(type(plain))
print(plain)

## тест шифрования/дешифрования одной и той же строки
# cipher  = string_AES_En_De(plain, key_text,0, encoding)
# print(len(cipher))
# print("密文 hex utf-8：",cipher.hex())

# plain  = string_AES_En_De(cipher,key_text,1, encoding)
# print(len(plain))
# print(plain)

# import base64
# base64.b64encode(cipher)

# help(str.encode)

128
Зашифрованный текст в utf-8： b68703682d02f035c670a1110c82bb0482e1001d0ea54d53fb71192227f448ba10f359a9b1ba0cb22e2216950fabb745d60e6b67c61b93460a0ebe76b460b45b8e6d916c8f0dbbe26e622986b09b8dc704f2d7d3f964d123f1f68f2744d92ecad8ce5d59f89bccb25df4182d81fab3e86baa4ec7df4f4db66c599c27aff409c8
94
<class 'str'>
this is a test string---2020年新工科联盟-Xilinx 暑期学校（Summer School）项目---  2020年8月1日02:02:15         


Перекрестная проверка
1. Результат шифрования совпадает с результатом шифрования онлайн-инструмента шифрования AES  
2. Зашифрованный результат онлайн-инструмента шифрования AES правильно дешифровывается 

In [11]:
cipher_hex = "b68703682d02f035c670a1110c82bb0482e1001d0ea54d53fb71192227f448ba10f359a9b1ba0cb22e2216950fabb745d60e6b67c61b93460a0ebe76b460b45b8e6d916c8f0dbbe26e622986b09b8dc704f2d7d3f964d123f1f68f2744d92ecad8ce5d59f89bccb25df4182d81fab3e86baa4ec7df4f4db66c599c27aff409c8"

plain  = string_AES_En_De((cipher_hex),key_text,1, encoding)

# cipher_objects = bytes.fromhex(cipher_hex)
# # hex = hex_str.encode('utf-8')

# # cipher_objects = cipher_hex.encode(encoding)
# print(cipher == cipher_objects)
# print(cipher)
# print(len(cipher))

# print(cipher_objects)
# print(bytearray(cipher_objects))

# print(len(cipher_objects))

# plain  = string_AES_En_De((cipher_objects),key_text,1, encoding)

# Шифрование/дешифрование файлов
## Текстовые файлы

In [14]:
"""
def file_AES_En_De(file_name, key_text, en_or_decryption, encoding= "utf-8" ):
    input:
        file_name: имя файла, помещённого в ./files/ 
        key_text:   ключ любой длины
        en_or_decryption: 0 - шифрование, 1 - дешифрование   
        encoding:   кодировка
    
    return:
        str  : строка с логами о записанном файле
        
"""
floder_path = "./files/"
def file_AES_En_De(file_name, key_text, en_or_decryption, encoding= "utf-8" ): 
    
    result_text = bytearray()    
    
    if ( en_or_decryption == 0):
        
        f = open(floder_path + file_name ,'r',encoding = encoding)        
        plain_text= f.read()
        f.close()        
       
        result_text = string_AES_En_De( plain_text , key_text, en_or_decryption, encoding )       
    
        #write_out_file_path = floder_path + "encrypted_" + file_name
        #f = open( write_out_file_path ,'wb')
        #f.write(result_text)
        #f.close()
        return "Запись в файл после шифрования："#+ write_out_file_path

    elif ( en_or_decryption == 1):
        
        f = open(floder_path + file_name,'rb')
        plain_text= f.read()
        f.close()
        
        result_text = string_AES_En_De(plain_text, key_text, en_or_decryption, encoding) # расшифрованый вывод - строка
        
        write_out_file_path = floder_path + "decrypted_" + file_name
        f = open( write_out_file_path ,'w')
        f.write(result_text)
        f.close()
        return "Запись в файл после дешифрования："+ write_out_file_path
#     return result_text
    

## Тест шифрования текстовых файлов

In [15]:
## Файл сохранён в utf-8
# folder_path = "./files/"

file_name_200kb = "test_200kb.txt" 
file_name_1mb = "test_1mb.txt" 
file_name_10mb = "test_10mb.txt" 
file_name_100mb = "test_100mb.txt" 


start = time.time()
file_AES_En_De(file_name_200kb, key_text, 0, encoding= "utf-8" )
end = time.time()

print('200kb file time elapsed: ', (end-start))

start = time.time()
file_AES_En_De(file_name_1mb, key_text, 0, encoding= "utf-8" )
end = time.time()

print('1mb file time elapsed: ', (end-start))

start = time.time()
file_AES_En_De(file_name_10mb, key_text, 0, encoding= "utf-8" )
end = time.time()

print('10mb file time elapsed: ', (end-start))

start = time.time()
file_AES_En_De(file_name_100mb, key_text, 0, encoding= "utf-8" )
end = time.time()

print('100mb file time elapsed: ', (end-start))
    

200kb file time elapsed:  0.03777456283569336
1mb file time elapsed:  0.25138163566589355
10mb file time elapsed:  3.778394937515259
100mb file time elapsed:  21.70111656188965


## Описание используемых алгоритмов

Для проведения замеров был выбран алгоритмы AES и ГОСТ 28147-89, написанные на C++. Так же алгоритм AES был написан для ПЛИС с использованием методов синтеза выского уровня для перевода алгоритма с языка С++ на язык описания аппаратуры VHDL.

## Результат теста времени:

## ПК
### ГОСТ 28147-89

|  Size | Time, ms |    Speed   |
|:-----:|:--------:|------------|
| 200kb |    18    | 88.8 Mbps  |
| 1mb   |    94    | 87.14 Mbps |
| 10mb  |    936   | 87.52 Mbps |
| 100mb |   9252   | 88.53 Mbps |

### AES

|  Size | Time, ms |    Speed   |
|:-----:|:--------:|------------|
| 200kb |    67    | 23.8 Mbps  |
| 1mb   |    340   | 24.08 Mbps |
| 10mb  |   3356   | 26.8 Mbps  |
| 100mb |   33498  | 24.4 Mbps  |

Как видно из результатов, ГОСТ 28147-89 "Магма" в среднем в три раза быстрее алгоритма AES

## Pynq Z2
### AES

|  Size | Time, ms |    Speed   |
|:-----:|:--------:|:----------:|
| 200kb |    39    | 41.02 Mbps |
| 1mb   |    209   | 40.00 Mbps |
| 10mb  |   3738   | 21.62 Mbps |
| 100mb |   21701  | 32.38 Mbps |

Было произведено несколько замеров, результаты которых различались на допустимую погрешность. Интересно, что скорость работы алгоритма при шифровании 100мб на Pynq Z2 стабильно заметно меньше скорости шифрования других файлов. Вероятно, это связано с алгоритмом доступа к памяти в Zynq 7020

Таким образом, ожидаемая скорость работы алгоритма ГОСТ 28147-89 на Pynq Z2 находится из соотношения:

$$ gost_{pynq} = \frac{gost_{pc} * aes_{pynq}}{aes_{pc}} $$

что соответственно:

|  Size |    Speed    |
|:-----:|:-----------:|
| 200kb | 153.04 Mbps |
| 1mb   | 144.75 Mbps |
| 10mb  | 70.60 Mbps  |
| 100mb | 117.47 Mbps |

что в среднем **121.46 Mbps**

## Тест времени на Zynq 7020

Результат теста времени выполнения алгоритма AES без использования ПЛИС:

|  Size | Time, ms |   Speed   |
|:-----:|----------|:---------:|
| 200kb | 1059     | 1.50 Mbps |
| 1mb   | 5475     | 1.46 Mbps |
| 10mb  | 98240    | 0.81 Mbps |
| 100mb | 545992   | 1.46 Mbps |

Видно, что ПЛИС даёт прирост в скорости примерно в двадцать пять раз, превосходя скорость выполнения даже на ПК (Intel Core i5)

## Методы улучшения текущей реализации

Очевидно, что данная реализация AES может быть улучшена, что в теории может увеличить скорость в несколько раз. Некоторые проблемы и их методы улучшения:
1. В силу того, что HLS IP-блока имеет пропускную способность в 345600 символов, в функции string_AES_En_De приходится делить входящий буфер на части, что сильно влияет на скорость выполнения. Теоретически, этот фрагмент можно заменить, используя инструменты пакета Numpy;
2. Высокий прирост производительности должен дать переход от языка Python к компилируемым языкам, например С; 
3. Максимальный прирост должен дать отказ от использования микроконтроллера для управления IP-блоком и переход к ПЛИС

In [None]:
file_AES_En_De("encrypted_test.md", key_text, 1, encoding= "utf-8" )

# Шифрование и дешифрование изображений 

## Создание объекта Image с помощью PI
Загрузка изображения с SD карты

In [None]:
import time
from PIL import Image
from IPython.display import display

In [None]:
"""
def image_AES_En_De(original_image, key_text, en_or_decryption, encoding = 'utf-8'):
    input:
        original_image: объект PIL Image 
        key_text:   ключ любой длины
        en_or_decryption: 0 - шифрование, 1 - дешифрование   
        encoding:   кодировка
    
    return:
        result_image: результат в виде PIL Image
        result_image_bytes_arrray: результат в виде bytearray     
        
"""
def image_AES_En_De(original_image, key_text, en_or_decryption, encoding = 'utf-8'):
    
    if( len(key_text.encode( encoding )) <= 16 ):      #  'utf-8'
        key_text = expand_to_128b(key_text, encoding)
        key_text = bytes(key_text)
    else:
        print("key length is too long, "+encoding+" after endoding：",len(key_text.encode( encoding ))) #  'utf-8'
        return b"this is a wrong return~~no!!!"
    
    image_bytes_object = original_image.tobytes()  
    pic_width, pic_height = original_image.size
    
       
    image_bytes_object = expand_to_128b( image_bytes_object )
    bytes_of_image     = len(image_bytes_object)  
    
    print("bytes_of_image after expand_to_128b: ",bytes_of_image)
    
    i = 0;
    result_image_bytes_arrray = bytearray()
    
    if ( en_or_decryption == 0 ):
        start = time.time()        
            
        if( bytes_of_image < Max_buffer_size):
#             pass # 直接完成
            result_image_bytes_arrray = call_AES_En_De(image_bytes_object, bytes_of_image, bytes(key_text), en_or_decryption)        
        else:
            i = 0        
            while(i < bytes_of_image):
                rest_of_bytes = bytes_of_image - i
                if( rest_of_bytes > Max_buffer_size ):
                    result_image_bytes_arrray.extend( call_AES_En_De((image_bytes_object[i : i + Max_buffer_size]), Max_buffer_size, bytes(key_text),en_or_decryption) )
                else:
                    result_image_bytes_arrray.extend( call_AES_En_De((image_bytes_object[i : i + rest_of_bytes]), rest_of_bytes, bytes(key_text),en_or_decryption) )
                i = i + Max_buffer_size       
            
        print(i)    
        end = time.time()
        print("Время затрачено: " + str (end - start) )

    elif( en_or_decryption == 1 ):
        
        start = time.time()
        
        if( bytes_of_image < Max_buffer_size):
#             pass #
            result_image_bytes_arrray = call_AES_En_De(image_bytes_object, bytes_of_image, bytes(key_text), en_or_decryption)        
        else:
            i = 0        
            while(i < bytes_of_image):
                rest_of_bytes = bytes_of_image - i
                if( rest_of_bytes > Max_buffer_size ):
                    result_image_bytes_arrray.extend( call_AES_En_De((image_bytes_object[i : i + Max_buffer_size]), Max_buffer_size, bytes(key_text),en_or_decryption) )
                else:
                    result_image_bytes_arrray.extend( call_AES_En_De((image_bytes_object[i : i + rest_of_bytes]), rest_of_bytes, bytes(key_text),en_or_decryption) )
                i = i + Max_buffer_size  
        
        print(i)
        end = time.time()
        print("Время расшифровки: " + str (end - start) )
        
    result_image = Image.frombytes('RGB',(pic_width, pic_height ),bytes(result_image_bytes_arrray))
    return result_image, result_image_bytes_arrray

## Тест шифрования и дешифрования изображений  

In [None]:
key_text = "世界你好啊"

##   /paris/paris
##  /lena/lena
##  /toys/toys
floder_path = "./images/paris/"
image_path = floder_path + "paris.jpg"

original_image = Image.open(image_path)
pic_width, pic_height = original_image.size

display(original_image)
print("Image size: {}x{} pixels.".format(pic_width, pic_height))

## Шифрование изображений 

In [None]:
encryption_image, encryption_image_bytes_arrray = image_AES_En_De(original_image, key_text, 0 )

encryption_image_bytes_arrray = bytes( encryption_image_bytes_arrray )
# encryption_image = Image.frombytes('L',(pic_width, pic_height ),encryption_image_bytes_arrray)
encryption_image2 = Image.frombytes('RGB',(pic_width, pic_height ),encryption_image_bytes_arrray)

display(encryption_image)
display(encryption_image2)

## Дешифрование изображений

In [None]:
decryption_image, decryption_image_bytes_arrray = image_AES_En_De(encryption_image, key_text, 1 )
decryption_image_bytes_arrray = bytes( decryption_image_bytes_arrray )

# encryption_image = Image.frombytes('L',(pic_width, pic_height ),decryption_image_bytes_arrray)
decryption_image2 = Image.frombytes('RGB',(pic_width, pic_height ),decryption_image_bytes_arrray)

display(decryption_image)
display(decryption_image2)

## Сохранение изображений

In [None]:
original_image.save( floder_path + "original_image.jpg")
encryption_image.save( floder_path + "encryption_image.jpg")
decryption_image.save( floder_path + "decryption_image.jpg")