Skip to content

sanggi-wjg/thread-manager-py

Repository files navigation

thread-manager-py

Python Thread Manager

✅Build And Test ✅ PyPI version PyPI
CodeFactor PyPI

Install

pip install thread-manager-py

Pool Manager Usage

You can find examples in test/test_pool_manager.py.

Simple Usage

import os
from pool_manager import PoolManager

def calculate(x):
    print(f"[{os.getpid()}]  func: {x}\t\t", r := x ** 5 ** 2, flush=True)
    return r

manager = PoolManager()
manager.add_task(calculate, [i for i in range(2, 22)])
manager.run_map()

manager.add_task(calculate, [i for i in range(2, 22)])
manager.add_task(calculate, [i for i in range(2, 22)])
manager.run_map()

task_result = manager.get_task_result()
with PoolManager() as manager:
    manager.add_task(calculate, [i for i in range(2, 22)])
    manager.add_task(calculate, [i for i in range(2, 22)])
    manager.add_task(calculate, [i for i in range(2, 22)])
    manager.run_map()

Thread Manager Usage

You can find examples in test/test_thread_manager.py.

Simple Usage

import time
from thread_manager import ThreadManager, ThreadArgument

def print_something(name: str, number: int):
    print(name, number)
    time.sleep(1)

    
thread_manager = ThreadManager(print_something, [
    ThreadArgument(thread_name=f"Thread:{x}", args=(x, x) )
    for x in range(1, 23)
])
thread_manager.run()

Get Thread Error

errors = thread_manager.get_errors()
has_error = thread_manager.has_error()
error_count = thread_manager.get_error_count()

for e in errors:
    print(e)

Simple Usage with decorator

from thread_manager import using_thread

@using_thread
def print_something(number, **kwargs):
    print(number, kwargs)

for i in range(10):
    print_something(i, name=f"thread-{i}")

Simple Usage with Exception Hook

from thread_manager import ThreadManager, ThreadArgument

errors = []

def func_something(*args):
    raise Exception("test error")

def func_exception_hook(*args):
    errors.append(args)

thread_manager = ThreadManager(func_something, [
    ThreadArgument(thread_name=f"Thread:{x}", args=(x, x), kwargs={}, )
    for x in range(1, 23)
], except_hook=func_exception_hook)
thread_manager.run()