# Задача 1
Нужно написать две программы: 

1. Первая генерирует бинарный файл (min 2Гб), состоящий из случайных 32-разрядных беззнаковых целых чисел (big endian). 


2. Вторая считает сумму этих чисел (с применением длинной арифметики), находит минимальное и максимальное число.

Реализуйте две версии: 
* Простое последовательное чтение 
* Многопоточная + memory-mapped files. 

Сравните время работы.

In [1]:
import numpy as np
import time
import threading
import mmap
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures

## Генерация бинарного файла 

In [2]:
def get_numbers():
    highest_uint32 = 2 ** 32 
    amount_of_4B_in_1GB = (2 ** 10) * (2 ** 10) * (2 ** 8)
    return np.random.randint(0, high = highest_uint32, size = amount_of_4B_in_1GB, dtype = np.dtype('uint32')) 

In [3]:
def generate_file(file_name):
    arr = get_numbers()
    with open(file_name, 'wb') as f:
        for number in arr:
            f.write(number.tobytes())
        f.close()

In [2]:
f_name = "uint32_numbers.txt"

In [5]:
generate_file(f_name)

## Последовательные вычисления

In [3]:
def print_results(title, time_sum_min_max):
    print(title)
    print("Время работы (c): ", time_sum_min_max[0])
    print("Сумма чисел: ", time_sum_min_max[1])
    print("Минимальное число: ", time_sum_min_max[2])
    print("Максимальное число: ", time_sum_min_max[3])

In [4]:
def sequential_calculations(file_name):
    start_time = time.time()

    with open(file_name, 'rb') as f:
        buf = f.read()
        arr = np.frombuffer(buf, dtype = np.dtype('uint32').newbyteorder('B'))
        
        num_sum = 0
        min_num = 2 ** 32 
        max_num = 0
        
        for num in arr:
            num_sum += num
            if (num < min_num):
                min_num = num
            if(num > max_num):
                max_num = num
        
        f.close()
    
    result_time = time.time() - start_time
    return [result_time, num_sum, min_num, max_num]

In [5]:
sequential_results = sequential_calculations(f_name)

In [6]:
print_results("Последовательные вычисления", sequential_results)

Последовательные вычисления
Время работы (c):  256.2826988697052
Сумма чисел:  576451367301247579
Минимальное число:  3
Максимальное число:  4294967244


## Последовательные вычисления с помощью встроенных функций

In [10]:
def built_in_func_calculations(file_name):
    start_time = time.time()

    with open(file_name, 'rb') as f:
        buf = f.read()
        arr = np.frombuffer(buf, dtype = np.dtype('uint32').newbyteorder('B'))
        
        num_sum = sum(arr)
        min_num = min(arr)
        max_num = max(arr)
        
        f.close()
    
    result_time = time.time() - start_time
    return [result_time, num_sum, min_num, max_num]

In [11]:
built_in_func_results = built_in_func_calculations(f_name)

In [12]:
print_results("Последовательные вычисления с помощью функций numpy", built_in_func_results)

Последовательные вычисления с помощью функций numpy
Время работы (c):  313.1986496448517
Сумма чисел:  576451367301247579
Минимальное число:  3
Максимальное число:  4294967244


## Многопоточность + memory-mapped files

In [13]:
def thread_calc(file_name, results, thr_index, chunk_start, chunk_end):
    with open(file_name, 'rb') as f:
        buf = mmap.mmap(f.fileno(), length = chunk_end - chunk_start, offset = chunk_start, access = mmap.ACCESS_READ)        
        arr = np.frombuffer(buf, dtype = np.dtype('uint32').newbyteorder('B'))
        
        for num in arr:
            results[0][thr_index] += num
            if (num < results[1][thr_index]):
                results[1][thr_index] = num
            if (num > results[2][thr_index]):
                results[2][thr_index] = num 
                
        f.close()

In [23]:
def multithreaded_calculations_with_mmap(file_name, threads_num):
    start_time = time.time()
    numbers_amount = (2 ** 10) * (2 ** 10) * (2 ** 8)
    
    threads_sum = [0] * threads_num
    threads_min = [2 ** 32] * threads_num
    threads_max = [0] * threads_num
    
    thr_results = [threads_sum, threads_min, threads_max]
    
    threads = []
    chunk_size = int(numbers_amount / threads_num)
    chunk_start = 0
    
    for thr_index in range(threads_num):
        chunk_end = chunk_start + chunk_size
        cur_thread = threading.Thread(target = thread_calc, args = (file_name, thr_results, thr_index, chunk_start, chunk_end))
        threads.append(cur_thread)
        cur_thread.start()
        chunk_start = chunk_end
    
    for thr in threads:
        thr.join()
    
#     num_sum = sum(thr_results[0])
#     min_num = min(thr_results[1])
#     max_num = max(thr_results[2])

    total_num_sum = 0
    total_min_num = 2 ** 32 
    total_max_num = 0
    
    for num_sum in thr_results[0]:
        total_num_sum += num_sum
    
    for min_num in thr_results[1]:
        if (min_num < total_min_num):
            total_min_num = min_num
            
    for max_num in thr_results[2]:
        if (max_num > total_max_num):
            total_max_num = max_num
    
    result_time = time.time() - start_time
    return [result_time, total_num_sum, total_min_num, total_max_num]

In [4]:
threads_number = 2 ** 8

In [25]:
multithreaded_results = multithreaded_calculations_with_mmap(f_name, threads_number)

In [26]:
print_results("Многопоточные вычисления и memory-mapped file", multithreaded_results)

Многопоточные вычисления и memory-mapped file
Время работы (c):  108.36901473999023
Сумма чисел:  144118369208273030
Минимальное число:  27
Максимальное число:  4294967244


## Многопоточность (futures) + memory-mapped files

In [5]:
def futures_calc(chunk):
    return [sum(int(x) for x in chunk), np.min(chunk), np.max(chunk)]

In [6]:
def multithreaded_calculations_with_mmap_and_futures(file_name, threads_num):
    start_time = time.time()  
    results = [0, 2 ** 32, 0]
    
    with open(file_name, 'rb') as f:
        buf = mmap.mmap(f.fileno(), length = 0, offset = 0, access = mmap.ACCESS_READ)
        arr = np.frombuffer(buf, dtype = np.dtype('uint32').newbyteorder('B'))
                
        chunks_num = len(arr) // threads_num
        chunks = [arr[i:i+chunks_num] for i in range(0, len(arr), chunks_num)] 
                
        executor = ThreadPoolExecutor(max_workers = threads_num)        
        futures = {executor.submit(futures_calc, chunk): chunk for chunk in chunks}
                
        for future in concurrent.futures.as_completed(futures):
            results[0] += future.result()[0]
            if (future.result()[1] < results[1]):
                results[1] = future.result()[1]
            if (future.result()[2] > results[2]):
                results[2] = future.result()[2] 

    result_time = time.time() - start_time
    return [result_time, results[0], results[1], results[2]]

In [7]:
futures_results = multithreaded_calculations_with_mmap_and_futures(f_name, threads_number)

In [8]:
print_results("Многопоточные вычисления (futures) и memory-mapped file", futures_results)

Многопоточные вычисления (futures) и memory-mapped file
Время работы (c):  185.91981887817383
Сумма чисел:  576451367301247579
Минимальное число:  3
Максимальное число:  4294967244
