Skip to content

Commit

Permalink
Merge pull request #8 from sanggi-wjg/feature/refactoring
Browse files Browse the repository at this point in the history
♻️ 소스 정리 및 리팩토링
  • Loading branch information
sanggi-wjg committed Apr 16, 2023
2 parents a8e3e66 + 574f4fe commit c9b9318
Show file tree
Hide file tree
Showing 14 changed files with 340 additions and 226 deletions.
2 changes: 2 additions & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
### What's changed
-
4 changes: 3 additions & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: ✅Test
run: |
python -m unittest tests.test_package
python -m unittest
report-codecov:
runs-on: ubuntu-latest
Expand All @@ -43,6 +43,8 @@ jobs:

- name: ✅Generate Report
run: |
python -m pip install --upgrade pip
if [ -f requirements-test.txt ]; then pip install -r requirements-test.txt; fi
pip install coverage
coverage run -m unittest
Expand Down
8 changes: 0 additions & 8 deletions .idea/.gitignore

This file was deleted.

25 changes: 18 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,40 @@ pip install thread-manager-py


## Pool Manager Usage
#### You can find examples in `tests/test_package.py`.
#### You can find examples in `test/test_pool_manager.py`.


### Simple Usage
```python
import os
from pool_manager import PoolManager

def _calculate(x):
def calculate(x):
print(f"[{os.getpid()}] func: {x}\t\t", r := x ** 5 ** 2, flush=True)
return r

manager = PoolManager()
manager.add_task(_calculate, [i for i in range(2, 22)])
manager.add_task(calculate, [i for i in range(2, 22)])
manager.run_map()
manager.add_task(_calculate, [i for i in range(2, 22)])
manager.add_task(_calculate, [i for i in range(2, 22)])

manager.add_task(calculate, [i for i in range(2, 22)])
manager.add_task(calculate, [i for i in range(2, 22)])
manager.run_map()

task_result = manager.get_task_result()
```

```python
with PoolManager() as manager:
manager.add_task(calculate, [i for i in range(2, 22)])
manager.add_task(calculate, [i for i in range(2, 22)])
manager.add_task(calculate, [i for i in range(2, 22)])
manager.run_map()
```


## Thread Manager Usage
#### You can find examples in `tests/test_package.py`.
#### You can find examples in `test/test_thread_manager.py`.


### Simple Usage
Expand All @@ -52,8 +61,10 @@ def print_something(name: str, number: int):
print(name, number)
time.sleep(1)


thread_manager = ThreadManager(print_something, [
ThreadArgument(thread_name=f"Thread:{x}", args=(x, x), kwargs={}, ) for x in range(1, 23)
ThreadArgument(thread_name=f"Thread:{x}", args=(x, x) )
for x in range(1, 23)
])
thread_manager.run()
```
Expand Down
2 changes: 1 addition & 1 deletion pool_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

from pool_manager.manager import PoolManager

logger = logging.getLogger("pool.manager.py")
logger = logging.getLogger("pool.manager")
logger.addHandler(logging.NullHandler())
logger.propagate = False
57 changes: 41 additions & 16 deletions pool_manager/manager.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
import collections
import logging
import multiprocessing
from multiprocessing import Pool
from multiprocessing.pool import MapResult, IMapIterator
from queue import PriorityQueue
from typing import Callable, Any

log = logging.getLogger("pool.manager")


class PoolManager:
_instance = None
default_timeout = 30

def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'instance'):
cls._instance = super(PoolManager, cls).__new__(cls)
return cls._instance
cls.instance = super(PoolManager, cls).__new__(cls)
return cls.instance

def __init__(self, process_count: int = multiprocessing.cpu_count()):
"""
Expand All @@ -23,6 +26,13 @@ def __init__(self, process_count: int = multiprocessing.cpu_count()):
self.task_queue: PriorityQueue = PriorityQueue()
self.task_result_queue: collections.deque = collections.deque()
self.pool: Pool = Pool(min(process_count, multiprocessing.cpu_count()))
log.debug(f"pool manager inited, {min(process_count, multiprocessing.cpu_count())}")

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def is_empty_task(self) -> bool:
return self.task_queue.empty()
Expand All @@ -33,8 +43,13 @@ def has_task(self) -> bool:
def clear_task(self) -> bool:
while self.has_task():
self.task_queue.get(False)
log.debug("task_queue cleared")
return True

def close(self):
self.clear_task()
self.pool.close()

def add_task(self, callable_func: Callable, *arguments: list, priority: int = 100):
"""
Expand All @@ -46,39 +61,49 @@ def add_task(self, callable_func: Callable, *arguments: list, priority: int = 10
:type priority: int
"""
self.task_queue.put((priority, callable_func, *arguments), timeout=self.default_timeout)
log.debug(f"task_queue put, {priority}, {callable_func}. {arguments}")

def _get_task(self) -> Callable:
return self.task_queue.get()

def add_task_result(self, task_result: Any):
self.task_result_queue.append(task_result)
if isinstance(task_result, list):
self.task_result_queue.append(task_result)

elif isinstance(task_result, MapResult):
self.task_result_queue.append(task_result.get())

elif isinstance(task_result, IMapIterator):
self.task_result_queue.append([r for r in task_result])

else:
log.warning("task_results not saved.")
# assert False, "task_results not saved."

def get_task_result(self) -> collections.deque:
return self.task_result_queue.copy()

def _run(self, pool_func: Callable):
while not self.is_empty_task():
_, func, arguments = self._get_task()
result = pool_func(func, arguments)
log.debug(f"task_queue run, {func}")
self.add_task_result(result)

def run_map(self):
"""
https://stackoverflow.com/questions/26520781/multiprocessing-pool-whats-the-difference-between-map-async-and-imap
"""
while not self.is_empty_task():
_, func, arguments = self._get_task()
result = self.pool.map(func, arguments)
self.add_task_result(result)
self._run(self.pool.map)

def run_map_async(self):
"""
"""
while not self.is_empty_task():
_, func, arguments = self._get_task()
result = self.pool.map_async(func, arguments)
self.add_task_result(result.get())
self._run(self.pool.map_async)

def run_imap(self):
"""
"""
while not self.is_empty_task():
_, func, arguments = self._get_task()
result = self.pool.imap(func, arguments)
self.add_task_result([r for r in result])
self._run(self.pool.imap)
Empty file added test/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions test/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import logging
import os
import unittest


class TestBase(unittest.TestCase):
    """Shared base class for the package's test cases.

    ``setUpClass`` builds one verbose ``StreamHandler`` per test class and
    ``get_test_logger`` attaches it to one of the allowed loggers so that
    records emitted during a test are printed.
    """

    # Logger names that get_test_logger() accepts.
    allowed_logger_names = ["thread.manager", "pool.manager"]
    # StreamHandler created by setUpClass(); stays None until it has run.
    logger_handler = None

    @classmethod
    def _calculate(cls, x):
        """Print the current pid with ``x`` and return ``x ** 5 ** 2``.

        ``**`` is right-associative, so the result is ``x ** 25``.

        :param x: integer base
        :return: ``x`` raised to the 25th power
        """
        assert isinstance(x, int)

        print(f"[{os.getpid()}] func: {x}\t\t", r := x ** 5 ** 2, flush=True)
        return r

    @classmethod
    def setUpClass(cls):
        """Create the DEBUG-level stream handler shared by the class's tests."""
        # Keep the unittest class-level setup chain intact for subclasses/mixins.
        super().setUpClass()

        # https://docs.python.org/ko/3/library/logging.html#logrecord-attributes
        handler = logging.StreamHandler()
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s (%(name)s) (%(levelname)s) [pid:%(process)d] [thread:%(thread)d:%(threadName)s] (%(filename)s L%(lineno)d) %(message)s")
        handler.setFormatter(formatter)
        cls.logger_handler = handler

    def get_test_logger(self, logger_name: str = "thread.manager"):
        """Return the named logger with DEBUG level and the test handler attached.

        The handler is detached and the previous level restored automatically
        when the test finishes, even if the test fails, so logging state does
        not leak into later tests.

        :param logger_name: one of ``allowed_logger_names``
        :return: the configured ``logging.Logger``
        """
        assert logger_name in self.allowed_logger_names

        log = logging.getLogger(logger_name)
        previous_level = log.level
        log.setLevel(logging.DEBUG)
        log.addHandler(self.logger_handler)

        # Cleanups run after the test body regardless of its outcome.
        # Removing a handler that was already removed manually is a no-op.
        self.addCleanup(log.setLevel, previous_level)
        self.addCleanup(log.removeHandler, self.logger_handler)
        return log
88 changes: 88 additions & 0 deletions test/test_pool_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import logging

from pool_manager import PoolManager
from test.test_base import TestBase


class TestPollManager(TestBase):
    """Tests for ``PoolManager``: task queue, run modes, context manager, logging.

    Every manager created through ``_new_manager`` is closed by a registered
    cleanup, so the worker pool is released even when an assertion fails.
    """

    # Arguments mapped over ``_calculate`` in every test.
    default_range = list(range(2, 32))

    def _new_manager(self, *args):
        """Create a ``PoolManager`` and make sure it is closed after the test."""
        manager = PoolManager(*args)
        self.addCleanup(manager.close)
        return manager

    def test_task_queue(self):
        # given
        manager = self._new_manager(2)
        # when
        manager.add_task(self._calculate, self.default_range)
        # then
        self.assertFalse(manager.is_empty_task())
        self.assertTrue(manager.has_task())
        # clear_task() has a side effect, so it must not live inside a bare
        # ``assert`` (stripped under ``python -O``).
        self.assertTrue(manager.clear_task())
        manager.run_map()

    def test_context_manager(self):
        with PoolManager() as manager:
            manager.add_task(self._calculate, self.default_range)
            manager.add_task(self._calculate, self.default_range)
            manager.add_task(self._calculate, self.default_range)
            manager.run_map()
            self.assertTrue(manager.get_task_result())
        self.assertFalse(manager.has_task())

    def test_run(self):
        # given
        manager = self._new_manager()
        # when
        manager.add_task(self._calculate, self.default_range)
        manager.run_map()
        manager.add_task(self._calculate, self.default_range)
        manager.add_task(self._calculate, self.default_range)
        manager.run_map()
        # then
        task_result = manager.get_task_result()
        self.assertTrue(task_result)

    def test_run_async(self):
        # given
        manager = self._new_manager()
        # when
        manager.add_task(self._calculate, self.default_range)
        manager.run_map_async()
        manager.add_task(self._calculate, self.default_range)
        manager.add_task(self._calculate, self.default_range)
        manager.run_map_async()
        # then
        task_result = manager.get_task_result()
        self.assertTrue(task_result)

    def test_run_imap(self):
        # given
        manager = self._new_manager()
        # when
        manager.add_task(self._calculate, self.default_range)
        manager.run_imap()
        manager.add_task(self._calculate, self.default_range)
        manager.add_task(self._calculate, self.default_range)
        manager.run_imap()
        # then
        task_result = manager.get_task_result()
        self.assertTrue(task_result)

    def test_logger(self):
        log = self.get_test_logger("pool.manager")
        # revert: registered up front so the logger is restored even if an
        # assertion below fails. Cleanups run in reverse order, so the test
        # handler is removed before the NullHandler is put back.
        self.addCleanup(log.addHandler, logging.NullHandler())
        self.addCleanup(log.removeHandler, self.logger_handler)

        # given
        manager = self._new_manager()
        # when
        manager.add_task(self._calculate, self.default_range)
        manager.run_map()
        # then
        task_result = manager.get_task_result()
        self.assertTrue(task_result)
        self.assertFalse(manager.has_task())
Loading

0 comments on commit c9b9318

Please sign in to comment.