In [1]:
from pynq import Overlay, MMIO, Xlnk
import numpy as np
import time
import os
from PIL import Image
from IPython.display import display, Audio, clear_output
import ipywidgets as widgets
from ipywidgets import FileUpload, Button, VBox, HBox, Label, Text
import wave
import struct

In [2]:
# Hardware setup cell: loads the AES overlay and creates the module-level
# handles (dma_ip, mmio, xlnk, Max_buffer_size) that later cells rely on.
print("Loading FPGA bitstream...")
# Upper bound, in bytes, on the data handed to the accelerator per call; the
# processing loops in this notebook split their input into chunks of at most
# this size.
Max_buffer_size = 345600
# NOTE(review): absolute, board-specific path - adjust if the bitstream lives elsewhere.
AES_En_De_overlay = Overlay("/home/xilinx/jupyter_notebooks/pynq-z2/AES_En_De.bit")

# Get references to hardware blocks
# NOTE(review): AES_En_De is not referenced again in the visible cells; the
# accelerator registers are accessed through `mmio` below instead.
AES_En_De = AES_En_De_overlay.AES_En_De_0
dma_ip = AES_En_De_overlay.axi_dma_0

# Check DMA status (reads the `running` flag of the send and receive channels)
sendstatus = dma_ip.sendchannel.running
recvstatus = dma_ip.recvchannel.running
print(f"DMA Status - Send: {sendstatus}, Receive: {recvstatus}")

# Setup Memory-Mapped I/O for hardware control
# The base address is looked up from the overlay's IP dictionary rather than hard-coded.
AES_En_D_address = AES_En_De_overlay.ip_dict['AES_En_De_0']['phys_addr']
addr_range = 0x40  # 64 bytes
mmio = MMIO(AES_En_D_address, addr_range)

# Initialize Xlnk for memory allocation
# NOTE(review): Xlnk is presumably the legacy allocator and may be unavailable
# on newer PYNQ images - confirm against the installed PYNQ version.
xlnk = Xlnk()
print("Hardware initialization complete!")

Loading FPGA bitstream...
DMA Status - Send: True, Receive: True
Hardware initialization complete!


In [3]:

def call_AES_En_De(input_bytes, num_of_input_bytes, key_bytes_object, en_or_decryption):
    """
    Low-level function to control the AES hardware accelerator via DMA.

    Two contiguous (CMA) buffers are allocated for every call and are always
    released again before returning, even when the transfer raises, so that
    repeated chunked calls do not exhaust the contiguous-memory pool.

    Args:
        input_bytes: Input data to encrypt/decrypt (bytes-like)
        num_of_input_bytes: Number of bytes to process; must equal len(input_bytes)
        key_bytes_object: 16-byte AES key
        en_or_decryption: 0 for encryption, 1 for decryption

    Returns:
        bytearray: Processed data, num_of_input_bytes long

    Raises:
        ValueError: If num_of_input_bytes does not match the input length
    """
    payload = np.frombuffer(bytearray(input_bytes), dtype=np.uint8)
    if payload.size != num_of_input_bytes:
        # Without this check a 1-byte input would silently be broadcast
        # across the whole DMA buffer by np.copyto.
        raise ValueError(
            f"num_of_input_bytes ({num_of_input_bytes}) does not match "
            f"input length ({payload.size})"
        )
    if num_of_input_bytes == 0:
        # Nothing to transfer; avoid allocating zero-length DMA buffers.
        return bytearray()

    # Allocate DMA buffers; each one is freed in its own finally block so a
    # failure while allocating the second buffer still releases the first.
    in_buffer = xlnk.cma_array(shape=(num_of_input_bytes,), dtype=np.uint8)
    try:
        out_buffer = xlnk.cma_array(shape=(num_of_input_bytes,), dtype=np.uint8)
        try:
            # Copy input data to DMA buffer
            np.copyto(in_buffer, payload)

            # Configure hardware registers
            mmio.write(0x20, key_bytes_object)      # Write AES key
            mmio.write(0x10, num_of_input_bytes)    # Write data length
            mmio.write(0x14, en_or_decryption)      # Set mode (encrypt/decrypt)

            # Start DMA transfers
            dma_ip.sendchannel.transfer(in_buffer)
            dma_ip.recvchannel.transfer(out_buffer)
            mmio.write(0x00, 0x01)  # Start AES processing

            # Wait for completion
            dma_ip.sendchannel.wait()
            dma_ip.recvchannel.wait()

            # Copy out of the DMA buffer *before* it is released below.
            return bytearray(out_buffer)
        finally:
            out_buffer.freebuffer()
    finally:
        in_buffer.freebuffer()

In [4]:
def expand_to_128b(plain_text, encoding="utf-8"):
    """
    Pads data with zero bytes to a multiple of 16 bytes (AES block size).

    The input is never modified: a fresh bytearray is always returned, even
    when the caller passes a bytearray. Any bytes-like object (bytes,
    bytearray, memoryview, ...) is accepted in addition to str.

    Args:
        plain_text: Input data (string or bytes-like)
        encoding: Character encoding used when plain_text is a string

    Returns:
        bytearray: Padded copy of the data; data whose length is already a
        multiple of 16 (including empty data) is returned unpadded

    Raises:
        TypeError: If plain_text is an integer
    """
    if isinstance(plain_text, int):
        # bytearray(n) would silently create n zero bytes, which is never
        # what a caller of a padding helper means.
        raise TypeError("plain_text must be str or bytes-like, not int")

    if isinstance(plain_text, str):
        padded = bytearray(plain_text.encode(encoding))
    else:
        # Copy so the caller's buffer is left untouched.
        padded = bytearray(plain_text)

    # Add zero padding up to the next 16-byte boundary
    remainder = len(padded) % 16
    if remainder:
        padded.extend(bytes(16 - remainder))

    return padded

In [5]:
def text_encryption_demo():
    """
    Demonstrates text encryption and decryption using FPGA acceleration.

    Encrypts a fixed sample string with a fixed key, decrypts the result,
    prints timings, and reports explicitly whether the round trip reproduced
    the original text (both the success and the failure case are printed).
    """
    def process_in_chunks(data, mode):
        """Run data through the accelerator in pieces of at most Max_buffer_size bytes."""
        processed = bytearray()
        for offset in range(0, len(data), Max_buffer_size):
            chunk = data[offset:offset + Max_buffer_size]
            processed.extend(call_AES_En_De(chunk, len(chunk), key, mode))
        return processed

    print("=" * 60)
    print("TEXT ENCRYPTION DEMONSTRATION")
    print("=" * 60)

    # Test data
    plain_text = "Hello World! This is FPGA-accelerated AES encryption."
    key_text = "SecretKey123"

    print(f"Original text: {plain_text}")
    print(f"Key: {key_text}")

    # Prepare key (zero-padded to the AES block size)
    key = bytes(expand_to_128b(key_text))

    # Encrypt
    print("\nEncrypting...")
    plain_bytes = expand_to_128b(plain_text)
    start_time = time.perf_counter()
    encrypted = process_in_chunks(plain_bytes, 0)  # Encrypt mode
    encrypt_time = time.perf_counter() - start_time

    print(f"Encrypted in {encrypt_time:.4f} seconds")
    print(f"Encrypted (hex): {encrypted[:32].hex()}...")

    # Decrypt
    print("\nDecrypting...")
    start_time = time.perf_counter()
    decrypted = process_in_chunks(encrypted, 1)  # Decrypt mode
    decrypt_time = time.perf_counter() - start_time

    # errors='replace' keeps a bad decryption from raising UnicodeDecodeError,
    # so the mismatch is reported by the verification step below instead.
    decrypted_text = decrypted.decode('utf-8', errors='replace').rstrip('\x00')

    print(f"Decrypted in {decrypt_time:.4f} seconds")
    print(f"Decrypted text: {decrypted_text}")

    # Verify
    if plain_text == decrypted_text:
        print("\nSUCCESS: Text correctly encrypted and decrypted!")
    else:
        print("\nFAILURE: Decrypted text does not match the original!")

    # Guard against a zero elapsed time on coarse timers.
    if encrypt_time > 0:
        print(f"\nPerformance: {len(plain_bytes)/(encrypt_time)/1024/1024:.2f} MB/s")
    else:
        print("\nPerformance: n/a (elapsed time below timer resolution)")
    print("=" * 60)

# Run the text round-trip demo right away; it drives the accelerator through
# call_AES_En_De, so the hardware setup cell must have been executed first.
text_encryption_demo()

TEXT ENCRYPTION DEMONSTRATION
Original text: Hello World! This is FPGA-accelerated AES encryption.
Key: SecretKey123

Encrypting...
Encrypted in 0.0189 seconds
Encrypted (hex): d595f15088765e06a5db662223073846ab274dafd935946370fbd15284b0e725...

Decrypting...
Decrypted in 0.0057 seconds
Decrypted text: Hello World! This is FPGA-accelerated AES encryption.

SUCCESS: Text correctly encrypted and decrypted!

Performance: 0.00 MB/s


In [6]:
def image_AES_En_De(original_image, key_text, en_or_decryption):
    """
    Encrypts or decrypts an image using FPGA-accelerated AES.

    The raw pixel bytes are zero-padded to a multiple of 16, processed in
    chunks of at most Max_buffer_size bytes, and re-wrapped as an image with
    the same mode and size as the input.

    NOTE: when the pixel data length is not a multiple of 16, the processed
    bytes are longer than the image can hold, so the returned image only
    carries the leading part. Decrypting such an image may therefore not
    reproduce its final block; use the returned bytes for a lossless round
    trip.

    Args:
        original_image: PIL Image object (any mode; the mode is preserved)
        key_text: Encryption key string
        en_or_decryption: 0 for encryption, 1 for decryption

    Returns:
        tuple: (processed_image, processed_bytes) where processed_bytes is
        the full, padded output as a bytearray
    """
    # Prepare key
    key = bytes(expand_to_128b(key_text))

    # Get image data
    raw_pixels = original_image.tobytes()
    image_bytes = expand_to_128b(raw_pixels)
    bytes_count = len(image_bytes)

    print(f"Processing {bytes_count} bytes...")

    # Process in chunks
    result_bytes = bytearray()
    start_time = time.perf_counter()

    for offset in range(0, bytes_count, Max_buffer_size):
        chunk = image_bytes[offset:offset + Max_buffer_size]
        result_bytes.extend(call_AES_En_De(chunk, len(chunk), key, en_or_decryption))

    process_time = time.perf_counter() - start_time
    print(f"Processed in {process_time:.2f} seconds")

    # Reconstruct image with the input's own mode and size, handing over
    # exactly as many bytes as the image holds (padding tail dropped).
    result_image = Image.frombytes(
        original_image.mode,
        original_image.size,
        bytes(result_bytes[:len(raw_pixels)]),
    )
    return result_image, result_bytes


In [7]:
def create_image_encryption_demo():
    """
    Creates an interactive image encryption/decryption demo.

    Builds an upload widget, a key field and a button. Pressing the button
    saves the uploaded image, converts it to RGB, encrypts it, decrypts the
    encrypted image again and reports whether the round trip was exact.
    """
    print("=" * 60)
    print("IMAGE ENCRYPTION DEMONSTRATION")
    print("=" * 60)

    # Create widgets
    upload_widget = FileUpload(
        accept='image/*',
        multiple=False,
        description='Choose Image'
    )

    key_input = Text(
        value='ImageKey123',
        placeholder='Enter key',
        description='Key:',
        disabled=False
    )

    process_button = Button(
        description='Process Image',
        button_style='success',
        disabled=True
    )

    status_label = Label(value="Upload an image file")
    output = widgets.Output()

    def first_upload(value):
        """Return (filename, content_bytes) of the first uploaded file.

        ipywidgets 7 exposes FileUpload.value as a dict keyed by filename,
        ipywidgets 8 as a tuple of dicts that carry a 'name' entry.
        """
        if isinstance(value, dict):
            name = next(iter(value))
            entry = value[name]
        else:
            entry = value[0]
            name = entry['name']
        return name, bytes(entry['content'])

    def process_image(b):
        with output:
            clear_output()

            # Get uploaded file
            filename, content = first_upload(upload_widget.value)

            print(f"Processing: {filename}")

            # Save and load image
            folder = "/home/xilinx/jupyter_notebooks/pynq-z2/encrypted_images/"
            os.makedirs(folder, exist_ok=True)

            # Strip any directory components so the upload name cannot
            # steer the write outside the target folder.
            safe_name = os.path.basename(filename) or "upload"
            original_path = os.path.join(folder, "original_" + safe_name)
            with open(original_path, 'wb') as f:
                f.write(content)

            original_image = Image.open(original_path)
            if original_image.mode != 'RGB':
                original_image = original_image.convert('RGB')

            print(f"Image size: {original_image.size[0]}x{original_image.size[1]} pixels")

            # Display original
            print("\nOriginal Image:")
            display(original_image)

            # Encrypt
            key_text = key_input.value
            print(f"\nEncrypting with key: {key_text}")
            encrypted_image, _ = image_AES_En_De(original_image, key_text, 0)

            print("\nEncrypted Image:")
            display(encrypted_image)

            # Decrypt
            print("\nDecrypting...")
            decrypted_image, _ = image_AES_En_De(encrypted_image, key_text, 1)

            print("\nDecrypted Image:")
            display(decrypted_image)

            # Verify - report both outcomes so a failed round trip is visible
            if original_image.tobytes() == decrypted_image.tobytes():
                print("\nSUCCESS: Image perfectly encrypted and decrypted!")
            else:
                print("\nFAILURE: Decrypted image differs from the original!")
                if len(original_image.tobytes()) % 16:
                    print("Note: the pixel data is not a multiple of 16 bytes, so the "
                          "final AES block cannot be stored in the encrypted image.")

    def on_upload(change):
        if upload_widget.value:
            process_button.disabled = False
            status_label.value = "Ready to process"

    upload_widget.observe(on_upload, names='value')
    process_button.on_click(process_image)

    # Display UI
    ui = VBox([
        Label(value="Step 1: Upload an image"),
        upload_widget,
        Label(value="Step 2: Enter encryption key"),
        key_input,
        Label(value="Step 3: Process"),
        process_button,
        status_label,
        output
    ])

    display(ui)

# Build and display the image demo UI; processing itself only starts when the
# "Process Image" button is clicked after a file has been uploaded.
create_image_encryption_demo()


IMAGE ENCRYPTION DEMONSTRATION


VBox(children=(Label(value='Step 1: Upload an image'), FileUpload(value={}, accept='image/*', description='Cho…

In [10]:
def audio_file_encrypt_decrypt(file_data, key_text, mode):
    """
    Encrypts or decrypts audio file data using FPGA.

    The data is zero-padded to a multiple of 16 bytes and processed in chunks
    of at most Max_buffer_size bytes. Empty input yields an empty result.

    Args:
        file_data: Raw audio file bytes
        key_text: Encryption key
        mode: 0 for encrypt, 1 for decrypt

    Returns:
        bytearray: Processed audio data (padded length)
    """
    # Prepare data and key
    audio_bytes = expand_to_128b(file_data)
    key = bytes(expand_to_128b(key_text))
    bytes_count = len(audio_bytes)

    print(f"Processing {bytes_count/1024:.1f} KB of audio data...")

    # Process in chunks
    result = bytearray()
    start_time = time.perf_counter()

    for offset in range(0, bytes_count, Max_buffer_size):
        chunk = audio_bytes[offset:offset + Max_buffer_size]
        result.extend(call_AES_En_De(chunk, len(chunk), key, mode))

    process_time = time.perf_counter() - start_time
    # Elapsed time can be 0 for empty input or a coarse timer; report a
    # throughput of 0 instead of raising ZeroDivisionError.
    throughput = bytes_count / process_time / 1024 / 1024 if process_time > 0 else 0.0
    print(f"Completed in {process_time:.2f}s ({throughput:.2f} MB/s)")

    return result

In [11]:
def create_audio_encryption_demo():
    """
    Creates an interactive audio encryption/decryption demo.

    Builds an upload widget, a key field and a button. Pressing the button
    saves the uploaded file, encrypts it, writes the ciphertext plus a short
    playable "noise" rendering of it, decrypts again, and reports whether the
    decrypted bytes equal the upload.
    """
    print("=" * 60)
    print("AUDIO ENCRYPTION DEMONSTRATION")
    print("=" * 60)

    # Create widgets
    audio_upload = FileUpload(
        accept='.mp3,.wav,.m4a',
        multiple=False,
        description='Upload Audio'
    )

    audio_key = Text(
        value='AudioKey123',
        placeholder='Enter key',
        description='Key:',
        disabled=False
    )

    process_btn = Button(
        description='Process Audio',
        button_style='success',
        disabled=True
    )

    status = Label(value="Upload an audio file (MP3, WAV, etc.)")
    output = widgets.Output()

    def first_upload(value):
        """Return (filename, content_bytes) of the first uploaded file.

        ipywidgets 7 exposes FileUpload.value as a dict keyed by filename,
        ipywidgets 8 as a tuple of dicts that carry a 'name' entry.
        """
        if isinstance(value, dict):
            name = next(iter(value))
            entry = value[name]
        else:
            entry = value[0]
            name = entry['name']
        return name, bytes(entry['content'])

    def create_noise_wav(data, filepath):
        """Helper to create playable noise from encrypted data.

        Writes at most 88200 bytes of data as 16-bit mono samples at
        44.1 kHz, i.e. at most two seconds of audio.
        """
        with wave.open(filepath, 'wb') as wav:
            wav.setnchannels(1)
            wav.setsampwidth(2)
            wav.setframerate(44100)
            # Centre the unsigned bytes around zero (removes the DC offset),
            # then scale; the largest magnitude is 128 * 100 = 12800, well
            # inside the int16 range, so nothing clips.
            raw = np.frombuffer(bytes(data[:88200]), dtype=np.uint8)
            samples = (raw.astype(np.int16) - 128) * 100
            wav.writeframes(samples.tobytes())

    def process_audio(b):
        with output:
            clear_output()

            # Get uploaded file
            filename, content = first_upload(audio_upload.value)

            print(f"File: {filename}")
            print(f"Size: {len(content)/1024:.1f} KB")

            # Save files
            folder = "/home/xilinx/jupyter_notebooks/pynq-z2/encrypted_audio/"
            os.makedirs(folder, exist_ok=True)

            # Strip any directory components so the upload name cannot
            # steer the writes outside the target folder.
            safe_name = os.path.basename(filename) or "upload"

            # Save original
            original_path = os.path.join(folder, "original_" + safe_name)
            with open(original_path, 'wb') as f:
                f.write(content)

            print("\nOriginal Audio:")
            display(Audio(original_path, autoplay=False))

            # Encrypt
            key = audio_key.value
            print(f"\nEncrypting with key: {key}")
            encrypted = audio_file_encrypt_decrypt(content, key, 0)

            # Save encrypted
            enc_path = os.path.join(folder, "encrypted_" + safe_name + ".enc")
            with open(enc_path, 'wb') as f:
                f.write(encrypted)
            print(f"Encrypted file saved: {enc_path}")

            # Create playable encrypted audio (will sound like noise)
            print("\nEncrypted Audio (as noise):")
            noise_path = os.path.join(folder, "encrypted_noise.wav")
            # The helper only renders the first 88200 bytes (two seconds).
            create_noise_wav(encrypted[:100000], noise_path)
            display(Audio(noise_path, autoplay=False))

            # Decrypt
            print("\nDecrypting...")
            decrypted = audio_file_encrypt_decrypt(encrypted, key, 1)
            decrypted = decrypted[:len(content)]  # Trim padding

            # Save decrypted
            dec_path = os.path.join(folder, "decrypted_" + safe_name)
            with open(dec_path, 'wb') as f:
                f.write(decrypted)

            print("\nDecrypted Audio:")
            display(Audio(dec_path, autoplay=False))

            # Verify - report both outcomes so a failed round trip is visible
            if content == decrypted:
                print("\nSUCCESS: Audio perfectly encrypted and decrypted!")
            else:
                print("\nFAILURE: Decrypted audio differs from the original!")

    def on_upload(change):
        if audio_upload.value:
            process_btn.disabled = False
            status.value = "Ready to process"

    audio_upload.observe(on_upload, names='value')
    process_btn.on_click(process_audio)

    # Display UI
    ui = VBox([
        Label(value="Step 1: Upload audio file"),
        audio_upload,
        Label(value="Step 2: Set encryption key"),
        audio_key,
        Label(value="Step 3: Process"),
        process_btn,
        status,
        output
    ])

    display(ui)

# Build and display the audio demo UI; processing itself only starts when the
# "Process Audio" button is clicked after a file has been uploaded.
create_audio_encryption_demo()

AUDIO ENCRYPTION DEMONSTRATION


VBox(children=(Label(value='Step 1: Upload audio file'), FileUpload(value={}, accept='.mp3,.wav,.m4a', descrip…