In [1]:
import os, math, sys
from time import perf_counter
from typing import NamedTuple, TypeAlias
from multiprocessing import Process, SimpleQueue, queues  # <1>
from multiprocessing import set_start_method


from primes import least_prime_factor, make_sample

from table import Table

# Magnitude of primes that take a few seconds to check
#
# machine  magnitude
# RPI4     2 ** 49
# X250     2 ** 53
# YOGA9    2 ** 57
# M2MAX    2 ** 57
# VIVO     2 ** 63

MAGNITUDE = 2**57
sample = sorted(make_sample(MAGNITUDE))

set_start_method('fork')  # make MacOS happy

sample

[18014398509481951,
 18014398509481984,
 18014399314786597,
 27021597764222939,
 27021597764222976,
 27021598744655129,
 36028797018963913,
 36028797018963968,
 36028803757875637,
 54043195528445869,
 54043195528445952,
 54043196378148947,
 72057594037927931,
 72057594037927936,
 72057596722278677,
 108086391056891903,
 108086391056891904,
 108086392348502419,
 144115188075855859,
 144115188075855872,
 144115189976253901]

In [2]:
class Experiment(NamedTuple):  # <3>
    n: int
    lpf: int
    elapsed: float

    @property
    def prime(self):
        return self.n == self.lpf

def check(n: int) -> Experiment:  # <6>
    t0 = perf_counter()
    res = least_prime_factor(n)
    return Experiment(n, res, perf_counter() - t0)

check(7)

Experiment(n=7, lpf=7, elapsed=8.594997780164704e-06)

In [3]:
check(9)

Experiment(n=9, lpf=3, elapsed=1.5760015230625868e-06)

In [4]:
check(72057596722278677)

Experiment(n=72057596722278677, lpf=268435399, elapsed=7.265580000999762)

In [5]:
JobQueue = queues.SimpleQueue[int]  # <4>
ResultQueue = queues.SimpleQueue[Experiment]  # <5>

def worker(jobs: JobQueue, results: ResultQueue) -> None:  # <7>
    while n := jobs.get():  # <8>
        results.put(check(n))  # <9>
    results.put(Experiment(0, False, 0.0))  # <10>

In [6]:
def xstart_jobs(qtd_procs: int, results: ResultQueue) -> None:
    jobs: JobQueue = SimpleQueue()  # <2>
    for n in make_sample(MAGNITUDE):
        jobs.put(n)  # <12>
    for _ in range(qtd_procs):
        proc = Process(target=worker, args=(jobs, results))  # <13>
        proc.start()  # <14>
        jobs.put(0)  # <15> "poison pill"

In [7]:
def queue_jobs() -> JobQueue:
    jobs: JobQueue = SimpleQueue()  # <2>
    for n in make_sample(MAGNITUDE):
        jobs.put(n)  # <12>
    return jobs
    
def start_jobs(qtd_procs: int, jobs: JobQueue, results: ResultQueue):    
    for _ in range(qtd_procs):
        proc = Process(target=worker, args=(jobs, results))  # <13>
        proc.start()  # <14>
        jobs.put(0)  # <15> "poison pill"

In [8]:
def report(qtd_procs: int, results: ResultQueue) -> int:  # <6>
    checked = 0
    procs_done = 0
    while procs_done < qtd_procs:  # <7>
        exp = results.get()  # <8>
        if exp.n == 0:  # <9>
            procs_done += 1
        else:
            checked += 1  # <10>
            label = '=' if exp.prime else ' '
            #print(f'{exp.n:26_d}  {label}  {exp.lpf:26_d}  {exp.elapsed:9.6f}s')
            table.update(exp.n, exp.lpf, exp.elapsed)
            
    return checked

In [9]:
table = Table(sample)
table.display()

def supervisor(qtd_procs: int) -> None:
    print(f'Using {qtd_procs} worker processes.')
    t0 = perf_counter()
    jobs = queue_jobs()
    results: ResultQueue = SimpleQueue()
    start_jobs(qtd_procs, jobs, results)  # <3>
    checked = report(qtd_procs, results)  # <4>
    elapsed = perf_counter() - t0
    print(f'{checked} checks in {elapsed:.1f}s')

supervisor(os.cpu_count())

VBox(children=(Valid(value=False, description='18_014_398_509_481_951', layout=Layout(width='90%'), readout='⏳…

Using 12 worker processes.
21 checks in 20.7s
