# Trabalho Final - Parte 1 - Apresentação

## Cargas de trabalho CPU-bound

- Criptografia
- Multiplicação de Matrizes
- Ordenação (sorting)

# Criptografia

- AES-CTR

In [13]:
!pip install pycryptodome

Collecting pycryptodome
  Downloading pycryptodome-3.23.0-cp37-abi3-win_amd64.whl.metadata (3.5 kB)
Downloading pycryptodome-3.23.0-cp37-abi3-win_amd64.whl (1.8 MB)
   ---------------------------------------- 0.0/1.8 MB ? eta -:--:--
   ---------------------------------------- 1.8/1.8 MB 28.3 MB/s  0:00:00
Installing collected packages: pycryptodome
Successfully installed pycryptodome-3.23.0


In [14]:
# Parallel AES-CTR Benchmark Script (PyCryptodome)

import os
import time
from Crypto.Cipher import AES
from concurrent.futures import ThreadPoolExecutor

# Parameters
KEY = os.urandom(32)   # AES-256 key
NONCE = os.urandom(8)  # 64-bit nonce
THREADS = 8
BLOCK_SIZE = 16  # AES block size in bytes

def encrypt_chunk(chunk, counter_start):
    """Encrypt one chunk with AES-CTR using a thread-specific counter offset."""
    cipher = AES.new(KEY, AES.MODE_CTR, nonce=NONCE, initial_value=counter_start)
    return cipher.encrypt(chunk)

def parallel_encrypt(data, num_threads=THREADS):
    """Encrypt data in parallel with exactly num_threads threads."""
    chunk_size = len(data) // num_threads
    chunks = [data[i*chunk_size:(i+1)*chunk_size] for i in range(num_threads)]
    
    # Assign counters so keystreams don't overlap
    counters = [i * (chunk_size // BLOCK_SIZE) for i in range(num_threads)]
    
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        encrypted_chunks = list(executor.map(encrypt_chunk, chunks, counters))
    
    return b''.join(encrypted_chunks)

def run_benchmark(data_size_bytes):
    """Run encryption benchmark for a given data size (bytes)."""
    print(f"\n=== Benchmark: {data_size_bytes / (1024*1024)} MB ===")
    data = os.urandom(data_size_bytes)  # Allocate test data
    
    start = time.perf_counter()
    ciphertext = parallel_encrypt(data)
    end = time.perf_counter()
    
    elapsed = end - start
    throughput_mb_s = (data_size_bytes / (1024*1024)) / elapsed
    
    print(f"Time: {elapsed:.2f} s")
    print(f"Throughput: {throughput_mb_s:.2f} MB/s using {THREADS} threads")

if __name__ == "__main__":
    sizes = [
        512 * 1024 * 1024,   # 512 MB
        1024 * 1024 * 1024,  # 1 GB
        2 * 1024 * 1024 * 1024  # 2 GB
    ]
    for size in sizes:
        run_benchmark(size)


=== Benchmark: 512.0 MB ===
Time: 2.25 s
Throughput: 227.92 MB/s using 8 threads

=== Benchmark: 1024.0 MB ===
Time: 4.12 s
Throughput: 248.32 MB/s using 8 threads

=== Benchmark: 2048.0 MB ===
Time: 7.63 s
Throughput: 268.58 MB/s using 8 threads


In [20]:
# Correctness Check

from Crypto.Cipher import AES

def decrypt_chunk(chunk, counter_start):
    cipher = AES.new(KEY, AES.MODE_CTR, nonce=NONCE, initial_value=counter_start)
    return cipher.decrypt(chunk)

def parallel_decrypt(ciphertext, num_threads=THREADS):
    chunk_size = len(ciphertext) // num_threads
    chunks = []
    start = 0
    for i in range(num_threads):
        end = start + chunk_size
        if i == num_threads - 1:
            end = len(ciphertext)
        chunks.append(ciphertext[start:end])
        start = end

    blocks_per_chunk = (len(ciphertext) + num_threads - 1) // num_threads // BLOCK_SIZE
    counters = [i * blocks_per_chunk for i in range(num_threads)]

    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        decrypted_chunks = list(executor.map(decrypt_chunk, chunks, counters))

    return b''.join(decrypted_chunks)

data_size_bytes = 512 * 1024 * 1024 # 512 MB
data = os.urandom(data_size_bytes)
ciphertext = parallel_encrypt(data)
recovered = parallel_decrypt(ciphertext)
assert recovered == data, "Decryption failed! Data does not match."
print("Encryption verified successfully!")

Encryption verified successfully!


# Sorting

-> Dask is an open-source Python library for parallel and distributed computing. It is designed to scale existing Python libraries like NumPy, Pandas, and scikit-learn to handle datasets that are larger than memory or to accelerate computations by utilizing multiple cores or machines

## Install Dask

In [1]:
# Install only core parts of dask
!pip install "dask[complete]"



In [2]:
!pip install psutil
!pip install statistics



In [3]:
# Create DataFrame

import pandas as pd
import numpy as np
import dask.dataframe as dd  # for reading large files efficiently

# =========================
# Configurable parameters
N = 1_000_000 # Number of rows
OUTPUT_FILE = "data.parquet"
# =========================

# 1. Create a Pandas DataFrame
df = pd.DataFrame({
    "id": np.arange(N),
    "value": np.random.randint(0, 1_000_000, size=N)
})

print("DataFrame created:")
print(df.head())

# 2. Save to Parquet
df.to_parquet(OUTPUT_FILE, index=False)
print(f"DataFrame saved to '{OUTPUT_FILE}'")

# 3. Read the file back with Dask and print row count
ddf = dd.read_parquet(OUTPUT_FILE)
print(f"Number of rows in the saved file: {len(ddf):,}")


DataFrame created:
   id   value
0   0  612678
1   1  820845
2   2  942427
3   3  762804
4   4  836892
DataFrame saved to 'data.parquet'
Number of rows in the saved file: 1,000,000


In [None]:
# Parallel Sort

import dask.dataframe as dd
from dask.distributed import Client
import time
import statistics

# =========================
# Configurable parameters
NUM_WORKERS = 8        # Number of Dask workers (processes)
NUM_PARTITIONS = 8     # Number of partitions for the Dask DataFrame
PARQUET_FILE = "data.parquet"
SORT_COLUMN = "value"
OUTPUT_FILE = "sorted_data.parquet"
N_ITERATIONS = 10     # Number of sorting attempts
# =========================

# Start Dask client
with Client(n_workers=NUM_WORKERS, threads_per_worker=1) as client:

    # Load Parquet
    ddf = dd.read_parquet(PARQUET_FILE)

    # Repartition to the desired number of partitions
    ddf = ddf.repartition(npartitions=NUM_PARTITIONS)

    # Store times for each iteration
    elapsed_times = []

    for i in range(1, N_ITERATIONS + 1):
        #print(f"\n--- Iteration {i} ---")

        # Make a copy of the Dask DataFrame before sorting
        ddf_copy = ddf.copy()

        # Start timing
        start_time = time.time()

        # Sort the copy
        sorted_ddf = ddf_copy.sort_values(by=SORT_COLUMN)

        # Trigger computation
        result = sorted_ddf.compute()

        # End timing
        end_time = time.time()
        elapsed = end_time - start_time
        elapsed_times.append(elapsed)

        print(f"Iteration {i} completed in {elapsed:.5f} seconds")

    # Compute statistical metrics
    mean_time = statistics.mean(elapsed_times)
    median_time = statistics.median(elapsed_times)
    stdev_time = statistics.stdev(elapsed_times) if len(elapsed_times) > 1 else 0
    min_time = min(elapsed_times)
    max_time = max(elapsed_times)
    variance_time = statistics.variance(elapsed_times) if len(elapsed_times) > 1 else 0

    print("\n=== Statistical Summary ===")
    print(f"Mean time       : {mean_time:.5f} seconds")
    print(f"Median time     : {median_time:.5f} seconds")
    print(f"Standard dev    : {stdev_time:.5f} seconds")
    print(f"Variance        : {variance_time:.5f}")
    print(f"Minimum time    : {min_time:.5f} seconds")
    print(f"Maximum time    : {max_time:.5f} seconds")


# Multiplicação de Matrizes

In [1]:
"""
Dask Matrix Multiplication Script
---------------------------------
- Matrix size is configurable via MATRIX_SIZE variable (default 800)
- Uses Dask arrays for blocked / parallel multiplication
- Creates exactly 8 Dask workers (processes), each using 1 thread
- Each block multiplication uses NumPy / BLAS internally
"""

import os
import numpy as np
import dask.array as da
from dask.distributed import Client
import time

# -----------------------------
# CONFIGURATION
MATRIX_SIZE = 2000       # Size of NxN matrices
NUM_WORKERS = 8          # Number of Dask worker processes
THREADS_PER_WORKER = 1   # Threads per worker (BLAS / NumPy)
BLOCK_MEMORY_GB = 1      # Approx memory per block in GB

# -----------------------------
# Set BLAS threading environment variables (to avoid oversubscription)
os.environ["OMP_NUM_THREADS"] = str(THREADS_PER_WORKER)
os.environ["MKL_NUM_THREADS"] = str(THREADS_PER_WORKER)
os.environ["OPENBLAS_NUM_THREADS"] = str(THREADS_PER_WORKER)

# -----------------------------
# Start Dask client
client = Client(n_workers=NUM_WORKERS, threads_per_worker=THREADS_PER_WORKER)
print("Dask client started with 8 workers")

# -----------------------------
# Compute safe chunk size based on memory per block
chunk_elements = int(np.sqrt(BLOCK_MEMORY_GB * (1024**3) / 8))
chunk_size = min(chunk_elements, MATRIX_SIZE)
print(f"Using chunk size: {chunk_size} x {chunk_size}")

# -----------------------------
# Create Dask arrays (random matrices)
A = da.random.random((MATRIX_SIZE, MATRIX_SIZE), chunks=(chunk_size, chunk_size))
B = da.random.random((MATRIX_SIZE, MATRIX_SIZE), chunks=(chunk_size, chunk_size))

# -----------------------------
# Matrix multiplication
start_time = time.time()
C = A @ B
result = C.compute()  # Trigger parallel execution
end_time = time.time()

print("Matrix multiplication completed")
print(f"Result shape: {result.shape}")
print(f"Elapsed time: {end_time - start_time:.2f} seconds")

# -----------------------------
# Clean up
client.close()


Dask client started with 8 workers
Using chunk size: 800 x 800
Matrix multiplication completed
Result shape: (800, 800)
Elapsed time: 0.31 seconds
