Skip to content

Commit

Permalink
Merge 080e9da into 84b3f3f
Browse files Browse the repository at this point in the history
  • Loading branch information
timgates42 committed Mar 17, 2020
2 parents 84b3f3f + 080e9da commit e75a8d6
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 13 deletions.
23 changes: 10 additions & 13 deletions app/meticulous/_process.py
Expand Up @@ -49,6 +49,7 @@
from meticulous._sources import obtain_sources
from meticulous._storage import get_json_value, prepare, set_json_value
from meticulous._summary import display_and_check_files, display_repo_intro
from meticulous._threadpool import main as threadpool_main
from meticulous._util import get_browser, get_editor
from meticulous._websearch import Suggestion, get_suggestion

Expand Down Expand Up @@ -128,6 +129,7 @@ def manual_menu(target):
try:
lookup = {
"automated process": automated_process,
"automated work queue": automated_work_queue,
"examine a repository": examine_repo_selection,
"manually add a new repository": manually_add_new_repo,
"remove a repository": remove_repo_selection,
Expand All @@ -136,8 +138,6 @@ def manual_menu(target):
"prepare a pr/issue": prepare_a_pr_or_issue,
"show statistics": show_statistics,
}
if not lookup:
lookup["test"] = test
handler = make_choice(lookup)
if handler is None:
print("Goodbye.")
Expand Down Expand Up @@ -681,16 +681,6 @@ def context_to_filename(name):
raise Exception(f"Unable to get filepath for {name}")


def test(target):  # pylint: disable=unused-argument
    """
    Prompt for an organization/repository name and print its canonical form.
    """
    response = get_input("What organization/repository name?")
    if response is not None:
        print(get_true_orgrepo(response))


def automated_process(target): # pylint: disable=unused-argument
"""
Work out the current point in the automated workflow and process the next
Expand All @@ -703,6 +693,13 @@ def automated_process(target): # pylint: disable=unused-argument
my_engine.process([State(target)])


def automated_work_queue(target):  # pylint: disable=unused-argument
    """
    Run the multi task work queue.
    """
    # Handler registry is empty for now; the pool starts with no task factories.
    handlers = {}
    threadpool_main(handlers)


class State: # pylint: disable=too-few-public-methods
"""
Store the workflow state.
Expand Down Expand Up @@ -954,7 +951,7 @@ def get_sorted_words(jsonobj):
obj = Suggestion.load(details["suggestion"])
details["suggestion_obj"] = obj
priority = obj.priority
order.append(((priority, len(details["files"]),), word))
order.append(((priority, len(details["files"])), word))
order.sort(reverse=True)
return [word for _, word in order]

Expand Down
94 changes: 94 additions & 0 deletions app/meticulous/_threadpool.py
@@ -0,0 +1,94 @@
"""
Multithread processing to maximize time value of user input
"""

import concurrent.futures
import logging


class PoolManager:
    """
    Thread pool wrapper that runs JSON-serializable tasks.

    Tasks are submitted as JSON objects and dispatched by name to handler
    factories. Once draining begins, tasks that have not yet started are
    saved instead of executed, so they can be collected via save() and
    re-queued later (e.g. tasks requiring user input).
    """

    def __init__(self, handlers, max_workers):
        """
        :param handlers: mapping of task name -> factory producing a callable
        :param max_workers: number of worker threads in the pool
        """
        self._handlers = handlers
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        )
        # Once set, queued-but-unstarted tasks are diverted into _saved.
        self._draining = False
        # Tasks skipped during drain; list.append is atomic under the GIL,
        # so concurrent worker threads may append without extra locking.
        self._saved = []

    def add(self, taskjson):
        """
        Add a task to the executor.

        :param taskjson: JSON-serializable task description with a "name" key
        :raises RuntimeError: if called after drain() has begun
        """
        if self._draining:
            raise RuntimeError("No new tasks when draining.")
        self._executor.submit(self.run_task, taskjson=taskjson)

    def run_task(self, taskjson):
        """
        Called by a worker thread in the pool to run one task.

        If draining has started, the task is saved instead of run. Any
        error is logged rather than propagated, because exceptions raised
        inside pool threads would otherwise vanish silently.
        """
        try:
            if self._draining:
                self._saved.append(taskjson)
                return
            handler = self.load_handler(taskjson)
            handler()
        except Exception:  # pylint: disable=broad-except
            logging.exception("Unhandled error")

    def load_handler(self, taskjson):
        """
        Look up the factory for the task's name and build its callable.
        """
        factory = self._handlers[taskjson["name"]]
        return factory(taskjson)

    def stop(self):
        """
        Wait for current tasks to complete and shut the executor down.
        """
        self._executor.shutdown()

    def __enter__(self):
        """
        Implement python with interface.
        """
        return self

    def __exit__(self, type, value, traceback):  # pylint: disable=redefined-builtin
        """
        Implement python with interface.
        """
        self.stop()

    def drain(self):
        """
        Signal worker threads to stop executing new tasks.
        """
        self._draining = True

    def save(self):
        """
        Drain, shut down, and return the tasks that never ran.
        """
        self.drain()
        self.stop()
        return self._saved


def get_pool(handlers, max_workers=5):
    """
    Build a PoolManager for the given handler mapping.
    """
    manager = PoolManager(handlers, max_workers)
    return manager


def main(handlers):
    """
    Run a pool manager to completion and return any unexecuted tasks.
    """
    pool = get_pool(handlers)
    try:
        # save() drains and shuts the pool down, returning skipped tasks.
        return pool.save()
    finally:
        # Mirrors the context-manager exit: shutdown is idempotent.
        pool.stop()
64 changes: 64 additions & 0 deletions app/meticulous/_threadpool_test.py
@@ -0,0 +1,64 @@
"""
Test cases to ensure tasks are picked up and executed concurrently whilst
serializing user input
"""

import threading

from meticulous._threadpool import get_pool


def test_add_async():
    """
    Check adding async task correctly runs
    """
    # Setup
    outcome = []

    def append_marker():
        outcome.append(True)
        return outcome

    def load_run(_):
        return append_marker

    pool = get_pool({"run": load_run})
    # Exercise
    pool.add({"name": "run"})
    # Verify
    pool.stop()
    assert outcome[0]  # noqa=S101 # nosec


def test_shutdown():
    """
    Check saving async tasks beyond the number of workers suspends correctly
    """
    # Setup
    cond = threading.Condition()
    running = [0]

    def run():
        # Block the worker until the main thread releases it, so the
        # remaining queued tasks cannot start before drain() is called.
        with cond:
            running[0] += 1
            cond.wait(60)

    def load_run(_):
        return run

    pool = get_pool({"run": load_run}, max_workers=2)
    taskjson = {"name": "run"}
    for _ in range(10):
        pool.add(taskjson)
    # Poll with a short timeout: the workers never notify this condition
    # after incrementing, so a bare cond.wait() here deadlocks whenever a
    # worker increments the counter after this thread has started waiting.
    with cond:
        while running[0] < 2:
            cond.wait(0.1)
    pool.drain()
    # Wake both blocked workers so the pool can shut down.
    with cond:
        cond.notify(2)
    pool.stop()
    # Exercise
    result = pool.save()
    # Verify: 2 tasks ran (and were woken), the other 8 were saved.
    assert result == ([taskjson] * 8)  # noqa=S101 # nosec

0 comments on commit e75a8d6

Please sign in to comment.