In [1]:
import numpy as np

# Path of the text file that holds the generated numbers, one per line.
FILE = './numbers.txt'
# How many random numbers generate_file() writes by default.
NUM_COUNT = 2000
# Number of chunks / worker processes used by the parallel variants below.
THREAD_COUNT = 4

In [2]:
def generate_file(num_count = NUM_COUNT, filename = FILE):
    """Write `num_count` random unsigned 32-bit integers to `filename`.

    The values are drawn from [0, 2**32) with NumPy's global (unseeded)
    generator, so every call produces different content. Numbers are written
    in decimal, one per line, separated by "\\n" with no trailing newline.
    Any existing file at `filename` is overwritten.

    Args:
        num_count: how many numbers to generate.
        filename: path of the text file to (re)create.
    """
    numbers = np.random.randint(0, 2**32, num_count, dtype=np.uint32)
    with open(filename, 'w') as f:
        # A single write() of the joined text; writelines() on a str would
        # iterate it character by character. The generator expression also
        # avoids depending on the builtin `map` name.
        f.write("\n".join(str(number) for number in numbers))

# (Re)create the input file with the default size and path; the contents are random on every run.
generate_file()

In [3]:
def factorize(num):
    """Return the prime factors of `num` by trial division.

    Factors are listed in non-decreasing order and repeated according to
    their multiplicity, e.g. 12 -> [2, 2, 3]. Any input below 2 yields an
    empty list.
    """
    factors = []
    candidate = 2
    # Candidates above sqrt(num) cannot divide what is left of num.
    while candidate * candidate <= num:
        quotient, remainder = divmod(num, candidate)
        if remainder:
            candidate += 1
            continue
        # Keep the same candidate: it may divide the quotient again.
        factors.append(candidate)
        num = quotient
    # Whatever remains above 1 is itself a prime factor.
    if num > 1:
        factors.append(num)
    return factors

In [4]:
# Composite input: repeated prime factors are listed with multiplicity.
factorize(12)

[2, 2, 3]

In [5]:
factorize(1) # 1 is not counted as a factor, so the result is empty

[]

In [6]:
# Prime input: the number itself is the only factor.
factorize(11)

[11]

Обычный подсчет

In [7]:
%%time
# Sequential baseline: read every number from FILE, then count all prime
# factors in this single process.
with open(FILE, 'r') as file:
    # Iterating the file yields one line at a time; blank lines are skipped so
    # a stray empty line cannot make int() raise.
    numbers = [int(line) for line in file if line.strip()]

# Last expression of the cell: the total number of prime factors.
sum(len(factorize(x)) for x in numbers)

CPU times: user 3.99 s, sys: 7.65 ms, total: 4 s
Wall time: 4 s


8307

Многопоточно (в Python — через multiprocessing), с использованием примитивов синхронизации

In [8]:
from multiprocessing import Pool
import mmap


In [9]:
%%time
def factorize_numbers(numbers):
    """Return the total count of prime factors over all values in `numbers`.

    Each value is converted with int() before it is passed to factorize().
    An empty iterable gives 0.
    """
    return sum(len(factorize(int(value))) for value in numbers)

def process_wrapper(res):
    """Append the factorization of every remaining line of `map_file` to `res`.

    Reads from the module-level `map_file` object, starting at its current
    position, until readline() returns an empty value.

    Args:
        res: list-like object with append(); it is mutated in place.

    Returns:
        The same `res` object.

    NOTE(review): nothing in the visible notebook calls this helper, and
    `map_file` is closed once the cell's `with` block ends - confirm it is
    still needed.
    """
    line = map_file.readline()
    # mmap.readline() returns bytes, so end-of-data is b'' and never ''.
    # Testing truthiness terminates correctly for both bytes and str sources.
    while line:
        res.append(factorize(int(line)))
        line = map_file.readline()
    return res


# Load every number through a read-only memory map of FILE.
# access=mmap.ACCESS_READ is the portable spelling (prot= exists only on Unix)
# and lets the file be opened read-only instead of "r+b".
with open(FILE, "rb") as f:
    with mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ) as map_file:
        # readline() returns b'' at the end of the map, which stops iter().
        num_list = [int(line) for line in iter(map_file.readline, b'')]

# One roughly equal slice of the numbers per worker process.
split_array = np.array_split(np.array(num_list), THREAD_COUNT)

with Pool(THREAD_COUNT) as pool:
    # Each worker returns the factor count of its slice.
    results = pool.map(factorize_numbers, split_array)
print(np.sum(results))

8307
CPU times: user 10.5 ms, sys: 24.1 ms, total: 34.5 ms
Wall time: 1.58 s


In [10]:
from multiprocessing import Process, Value, Lock, Semaphore
import os

In [11]:
def split_params(file, max_threads):
    """Split an open binary file into `max_threads` newline-aligned byte ranges.

    The file is measured in units of mmap.ALLOCATIONGRANULARITY; those units
    are distributed as evenly as possible between the chunks, and every chunk
    except the last is then extended forward until it ends right before a
    b'\\n' (or at end of file). The separating newline belongs to no chunk.
    The last chunk always takes everything that is left, including the final
    byte of the file. When the file is too small for all chunks, the trailing
    chunks have length 0.

    Args:
        file: file object opened in binary mode; it must support fileno(),
            seek() and read(). Its position is reset to 0 before each yield.
        max_threads: number of chunks to produce.

    Yields:
        (i, params) where `i` is the chunk index and `params` is a dict with
        keys 'fileno', 'length', 'offset' and 'access'.

    NOTE(review): 'offset' is generally not a multiple of
    mmap.ALLOCATIONGRANULARITY, so the dict is presumably not usable as
    mmap.mmap(**params) for non-zero offsets; the visible caller only uses
    'offset' and 'length' with seek()/read().
    """
    # Measure the file that was actually passed in, not a global path.
    file_size = os.fstat(file.fileno()).st_size
    batch_size = mmap.ALLOCATIONGRANULARITY
    batchs_count = -(-file_size // batch_size)  # ceiling division
    # Number of granularity-sized batches assigned to each chunk.
    batches_per_chunk = [
        len(part) for part in np.array_split(np.arange(batchs_count), max_threads)
    ]

    offset = 0
    for i in range(max_threads):
        remaining = file_size - offset
        if i == max_threads - 1:
            # The last chunk takes the whole remainder, final byte included.
            length = remaining
        else:
            length = min(batches_per_chunk[i] * batch_size, remaining)
            # Grow the chunk until the byte right after it is a newline;
            # stop at end of file instead of spinning on read() == b''.
            file.seek(offset + length)
            while offset + length < file_size and file.read(1) != b'\n':
                length += 1
        file.seek(0)
        yield i, {'fileno': file.fileno(),
                  'length': length,
                  'offset': offset,
                  'access': mmap.ACCESS_READ}
        # Skip the separating newline, but never step past the end of the file.
        offset = min(offset + length + 1, file_size)

In [12]:
# Materialize every byte range produced by split_params() as its own file
# (./file0.txt, ./file1.txt, ...), one per worker process.
# FILE is only read here, so it is opened read-only.
with open(FILE, "rb") as f:
    for i, params in split_params(f, THREAD_COUNT):
        # split_params() rewinds the file before yielding, so seek explicitly.
        f.seek(params['offset'])
        with open(f'./file{i}.txt', "wb") as output:
            output.write(f.read(params['length']))

In [13]:
%%time
# Guards the read-modify-write of the shared counter inside process().
lock = Lock()
# Unused alternative primitive, kept for reference:
# semaphore = Semaphore(1)

def process(res, i):
    """Count the prime factors of every number in `file{i}.txt` and add the
    total to the shared counter.

    The chunk file is read line by line through a read-only memory map. The
    per-file total is accumulated locally and added to `res.value` once,
    while holding the module-level `lock`.

    Args:
        res: shared object with a numeric `value` attribute (the notebook
            passes a multiprocessing Value).
        i: index of the chunk file to read.

    Returns:
        The same `res` object.
    """
    local_res = 0
    # The chunk is only read, so open it read-only.
    with open(f'file{i}.txt', "rb") as f:
        # mmap cannot map an empty file (it raises ValueError), so an empty
        # chunk simply contributes nothing.
        if os.fstat(f.fileno()).st_size > 0:
            # access=ACCESS_READ is portable; prot= exists only on Unix.
            with mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ) as map_file:
                for line in iter(map_file.readline, b''):
                    local_res += len(factorize(int(line)))
    # A single locked update per process keeps contention minimal.
    with lock:
        res.value += local_res
    return res

# Shared signed-int counter that every worker adds its local total to.
res = Value('i', 0)

# One worker process per chunk file; each worker opens its own file, so no
# handle on FILE is needed here.
threads = [Process(target=process, args=(res, i)) for i in range(THREAD_COUNT)]

for thread in threads:
    thread.start()

# Wait for every worker before reading the counter.
for thread in threads:
    thread.join()

res.value

CPU times: user 1.09 ms, sys: 19.9 ms, total: 21 ms
Wall time: 1.81 s


8307

С помощью Ray

In [14]:
import ray


In [15]:
%%time
# Size the Ray runtime from the same constant as the other variants, and let
# the cell be re-run without raising when Ray is already initialized.
ray.init(num_cpus=THREAD_COUNT, ignore_reinit_error=True)

CPU times: user 53 ms, sys: 43.7 ms, total: 96.8 ms
Wall time: 3.23 s


RayContext(dashboard_url='', python_version='3.9.12', ray_version='1.13.0', ray_commit='e4ce38d001dbbe09cd21c497fedd03d692b2be3e', address_info={'node_ip_address': '192.168.1.110', 'raylet_ip_address': '192.168.1.110', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2022-06-10_11-30-21_267987_12439/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2022-06-10_11-30-21_267987_12439/sockets/raylet', 'webui_url': '', 'session_dir': '/tmp/ray/session_2022-06-10_11-30-21_267987_12439', 'metrics_export_port': 62388, 'gcs_address': '192.168.1.110:47904', 'address': '192.168.1.110:47904', 'node_id': '266fc5391e27ea30daef692d0a73642fbfdece1caa8a07a6847d7e28'})

In [16]:
%%time
# The remote helper is deliberately NOT called `map`: binding that name at
# notebook level replaces the builtin for the whole kernel and breaks every
# earlier cell that uses map() when it is re-run.
@ray.remote
def apply_remote(obj, f):
    """Ray task that applies the callable `f` to `obj` and returns the result."""
    return f(obj)

def map_func(num_list):
    """Return the total number of prime factors over all values in `num_list`."""
    total = 0  # avoids shadowing the builtin sum()
    for num in num_list:
        total += len(factorize(num))
    return total

@ray.remote
def sum_results(*counts):
    """Ray task that adds up the per-chunk factor counts."""
    return np.sum(counts)

# Load every number through a read-only memory map of FILE.
# access=mmap.ACCESS_READ is portable (prot= exists only on Unix) and lets
# the file be opened read-only.
with open(FILE, "rb") as f:
    with mmap.mmap(f.fileno(), length=0, access=mmap.ACCESS_READ) as map_file:
        # readline() returns b'' at the end of the map, which stops iter().
        num_list = [int(line) for line in iter(map_file.readline, b'')]
split_array = np.array_split(np.array(num_list), THREAD_COUNT)

# One task per slice; the object refs are passed straight to the reducer so
# Ray resolves them before sum_results runs.
factors_counts = [apply_remote.remote(subarray, map_func) for subarray in split_array]
final_sum = sum_results.remote(*factors_counts)
res = ray.get(final_sum)
print(res)


8307
CPU times: user 32.6 ms, sys: 4.81 ms, total: 37.4 ms
Wall time: 2.84 s
