Skip to content

Commit

Permalink
release 3.115.20293
Browse files Browse the repository at this point in the history
  • Loading branch information
klahnakoski committed Oct 19, 2020
2 parents 21cbe81 + b4595e1 commit 8a0b110
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 15 deletions.
16 changes: 9 additions & 7 deletions mo_threads/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import subprocess

from mo_dots import set_default, to_data, Null
from mo_files import File
from mo_future import text
from mo_logs import Log, strings
from mo_logs.exceptions import Except
Expand All @@ -23,7 +24,7 @@
from mo_threads.till import Till
from mo_times import Timer

DEBUG = True
DEBUG = False


class Process(object):
Expand Down Expand Up @@ -207,11 +208,14 @@ def _kill(self):

PROMPT = "READY_FOR_MORE"

if "windows" in platform.system().lower():
# def cmd_escape(v):
# return "".join(WINDOWS_ESCAPE_DCT.get(c, c) for c in v)
cmd_escape = strings.quote

def cmd_escape(value):
if isinstance(value, File):
return value.abspath
return strings.quote(value)


if "windows" in platform.system().lower():
def set_prompt():
return "prompt "+PROMPT+"$g"

Expand All @@ -222,8 +226,6 @@ def to_text(value):
return value.decode("latin1")

else:
cmd_escape = strings.quote

def set_prompt():
return "set prompt="+cmd_escape(PROMPT+">")

Expand Down
3 changes: 2 additions & 1 deletion mo_threads/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,6 @@ def write_profiles(main_thread_profile):

tab = stats2tab(acc)

stats_file = File(FILENAME, suffix=Date.now().format("_%Y%m%d_%H%M%S")).write(tab)
stats_file = File(FILENAME).add_suffix(Date.now().format("_%Y%m%d_%H%M%S"))
stats_file.write(tab)
Log.note("profile written to {{filename}}", filename=stats_file.abspath)
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
classifiers=["Programming Language :: Python :: 2.7","Programming Language :: Python :: 3.7","Development Status :: 4 - Beta","Topic :: Software Development :: Libraries","Topic :: Software Development :: Libraries :: Python Modules","License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)"],
description='More Threads! Simpler and faster threading.',
include_package_data=True,
install_requires=["mo-dots>=3.93.20259","mo-future>=3.89.20246","mo-logs>=3.99.20292","mo-math>=3.91.20246","mo-times>=3.76.20190"],
install_requires=["mo-dots>=3.93.20259","mo-future>=3.89.20246","mo-logs>=3.111.20292","mo-math>=3.91.20246","mo-times>=3.76.20190"],
license='MPL 2.0',
long_description='\n# More Threads!\n\n\n|Branch |Status |\n|------------|---------|\n|master | [![Build Status](https://travis-ci.org/klahnakoski/mo-threads.svg?branch=master)](https://travis-ci.org/klahnakoski/mo-threads) |\n|dev | [![Build Status](https://travis-ci.org/klahnakoski/mo-threads.svg?branch=dev)](https://travis-ci.org/klahnakoski/mo-threads) [![Coverage Status](https://coveralls.io/repos/github/klahnakoski/mo-threads/badge.svg?branch=dev)](https://coveralls.io/github/klahnakoski/mo-threads?branch=dev) |\n\n## Module `threads`\n\nThe main benefits over Python\'s threading library is:\n\n1. **Multi-threaded queues do not use serialization** - Serialization is \ngreat in the general case, where you may also be communicating between \nprocesses, but it is a needless overhead for single-process multi-threading. \nIt is left to the programmer to ensure the messages put on the queue are \nnot changed, which is not ominous demand.\n2. **Shutdown order is deterministic and explicit** - Python\'s threading \nlibrary is missing strict conventions for controlled and orderly shutdown. \nThese conventions eliminate the need for `interrupt()` and `abort()`, both of \nwhich are unstable idioms when using resources. Each thread can shutdown on \nits own terms, but is expected to do so expediently.\n * All threads are required to accept a `please_stop` signal; are \n expected to test it in a timely manner; and expected to exit when signalled.\n * All threads have a parent - The parent is responsible for ensuring their \n children get the `please_stop` signal, and are dead, before stopping \n themselves. This responsibility is baked into the thread spawning process, \n so you need not deal with it unless you want.\n3. Uses [**Signals**](#signal-class) (much like [Events](https://docs.python.org/2/library/threading.html#event-objects)) to simplify logical \ndependencies among multiple threads, events, and timeouts.\n4. **Logging and Profiling is Integrated** - Logging and exception handling \nis seamlessly integrated: This means logs are centrally handled, and thread \nsafe. Parent threads have access to uncaught child thread exceptions, and \nthe cProfiler properly aggregates results from the multiple threads.\n\n\n### What\'s it used for\n\nA good amount of time is spent waiting for underlying C libraries and OS\nservices to respond to network and file access requests. Multiple\nthreads can make your code faster, despite the GIL, when dealing with those\nrequests. For example, by moving logging off the main thread, we can get\nup to 15% increase in overall speed because we no longer have the main thread\nwaiting for disk writes or remote logging posts. Please note, this level of\nspeed improvement can only be realized if there is no serialization happening\nat the multi-threaded queue. \n\n### Do not use Async\n\n[Actors](http://en.wikipedia.org/wiki/Actor_model) are easier to reason about than [async tasks](https://docs.python.org/3/library/asyncio-task.html). Mixing regular methods and co-routines (with their `yield from` pollution) is dangerous because:\n\n1. calling styles between synchronous and asynchronous methods can be easily confused\n2. actors can use blocking methods, async can not\n3. there is no way to manage resource priority with co-routines.\n4. stack traces are lost with co-routines\n5. async scope easily escapes lexical scope, which promotes bugs \n\nPython\'s async efforts are still immature; a re-invention of threading functionality by another name. Expect to experience a decade of problems that are already solved by threading; [here is an example](https://www.python.org/dev/peps/pep-0550/). \n\n**Reading**\n\n* Fibers were an async experiment using a stack, as opposed to the state-machine-based async Python uses now. It does not apply to my argument, but is an interesting read: [[Fibers are] not an appropriate solution for writing scalable concurrent software](http://www.open-std.org/JTC1/SC22/WG21/docs/papers/2018/p1364r0.pdf)\n\n\n## Usage\n\nMost threads will be declared and run in a single line. It is much like Python\'s threading library, except it demands a name for the thread: \n\n thread = Thread.run("name", function, p1, p2, ...)\n \nSometimes you want to separate creation from starting:\n\n thread = Thread("name", function, p1, p2, ...)\n thread.start()\n \n### `join()` vs `release()`\n\nOnce a thread is created, one of two actions can be performed.\n\n* `join()` - Join on `thread` will make the caller thread wait until `thread` has stopped. Then, return the resulting value or to re-raise `thread`\'s exception in the caller.\n\n result = thread.join() # may raise exception\n\n* `release()` - Will ignore any return value, and post any exception to logging. Tracking is still performed; released threads are still properly stopped. You may still `join()` on a released thread, but you risk being too late: The thread will have already completed and logged it\'s failure.\n\n thread.release() # release thread resources asap\n \n### Registering Threads\n\nThreads created without this module can call your code; You want to ensure these "alien" threads have finished their work, released the locks, and exited your code before stopping. If you register alien threads, then `mo-threads` will ensure the alien work is done for a clean stop. \n\n def my_method():\n with RegisterThread():\n t = Thread.current() # we can now use this library on this thread \n print(t.name) # a name is always given to the alien thread \n\n\n### Synchronization Primitives\n\nThere are three major aspects of a synchronization primitive:\n\n* **Resource** - Monitors and locks can only be owned by one thread at a time\n* **Binary** - The primitive has only two states\n* **Irreversible** - The state of the primitive can only be set, or advanced, never reversed\n\nThe last, *irreversibility* is very useful, but ignored in many threading\nlibraries. The irreversibility allows us to model progression; and\nwe can allow threads to poll for progress, or be notified of progress. \n\nThese three aspects can be combined to give us 8 synchronization primitives:\n\n* `- - -` - Semaphore\n* `- B -` - Binary Semaphore\n* `R - -` - Monitor\n* `R B -` - **[Lock](#lock-class)**\n* `- - I` - Iterator/generator\n* `- B I` - **[Signal](#signal-class)** (or Promise)\n* `R - I` - Private Iterator \n* `R B I` - Private Signal (best implemented as `is_done` Boolean flag)\n\n## `Lock` Class\n\nLocks are identical to [threading monitors](https://en.wikipedia.org/wiki/Monitor_(synchronization)), except for two differences: \n\n1. The `wait()` method will **always acquire the lock before returning**. This is an important feature, it ensures every line inside a `with` block has lock acquisition, and is easier to reason about.\n2. Exiting a lock via `__exit__()` will **always** signal a waiting thread to resume. This ensures no signals are missed, and every thread gets an opportunity to react to possible change.\n3. `Lock` is **not reentrant**! This is a feature to ensure locks are not held for long periods of time.\n\n**Example**\n```python\nlock = Lock()\nwhile not please_stop:\n with lock:\n while not todo:\n lock.wait(seconds=1)\n # DO SOME WORK\n```\nIn this example, we look for stuff `todo`, and if there is none, we wait for a second. During that time others can acquire the `lock` and add `todo` items. Upon releasing the the `lock`, our example code will immediately resume to see what\'s available, waiting again if nothing is found.\n\n\n## `Signal` Class\n\n[The `Signal` class](mo_threads/signals.py) is a binary semaphore that can be signalled only once; subsequent signals have no effect. It can be signalled by any thread; any thread can wait on a `Signal`; and once signalled, all waiting threads are unblocked, including all subsequent waiting threads. A Signal\'s current state can be accessed by any thread without blocking. `Signal` is used to model thread-safe state advancement. It initializes to `False`, and when signalled (with `go()`) becomes `True`. It can not be reversed. \n\nSignals are like a Promise, but more explicit \n\n| Signal | Promise |\n|:----------:|:------------------:|\n| s.go() | s.resolve() |\n| s.then(f) | s.then(m) |\n| s.wait() | await s |\n| s & t | Promise.all(s, t) | \n| s | t | Promise.race(s, t) |\n\n```python\nis_done = Signal()\nyield is_done # give signal to another that wants to know when done\n# DO WORK\nis_done.go()\n```\n\nYou can attach methods to a `Signal`, which will be run, just once, upon `go()`. If already signalled, then the method is run immediately.\n\n```python\nis_done = Signal()\nis_done.then(lambda: print("done"))\nreturn is_done\n```\n\nYou may also wait on a `Signal`, which will block the current thread until the `Signal` is a go\n\n```python\nis_done = worker_thread.stopped.wait()\nprint("worker thread is done")\n```\n\n> Waiting on the `stopped` signal is different than `join()`; the latter will return the thread state (or throw an exception)\n\n\n`Signals` are first class, they can be passed around and combined with other Signals. For example, using the `__or__` operator (`|`): `either = lhs | rhs`; `either` will be triggered when `lhs` or `rhs` is triggered.\n\n```python\ndef worker(please_stop):\n while not please_stop:\n #DO WORK \n\nuser_cancel = get_user_cancel_signal()\nworker(user_cancel | Till(seconds=360))\n```\n\n`Signal`s can also be combined using logical and (`&`): `both = lhs & rhs`; `both` is triggered only when both `lhs` and `rhs` are triggered:\n\n```python\n(workerA.stopped & workerB.stopped).wait()\nprint("both threads are done")\n```\n\n## `Till` Class\n\n[The `Till` class](mo-threads/till.py) is a special `Signal` used to represent timeouts. \n\n```python\nTill(seconds=20).wait()\nTill(till=Date("21 Jan 2016").unix).wait()\n```\n\nUse `Till` rather than `sleep()` because you can combine `Till` objects with other `Signals`. \n\n**Beware that all `Till` objects will be triggered before expiry when the main thread is asked to shutdown**\n',
long_description_content_type='text/markdown',
name='mo-threads',
packages=["mo_threads"],
url='https://github.com/klahnakoski/mo-threads',
version='3.99.20292',
version='3.115.20293',
zip_safe=False
)
4 changes: 2 additions & 2 deletions setuptools.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"description": "More Threads! Simpler and faster threading.",
"include_package_data": true,
"install_requires": [
"mo-dots>=3.93.20259", "mo-future>=3.89.20246", "mo-logs>=3.99.20292",
"mo-dots>=3.93.20259", "mo-future>=3.89.20246", "mo-logs>=3.111.20292",
"mo-math>=3.91.20246", "mo-times>=3.76.20190"
],
"license": "MPL 2.0",
Expand Down Expand Up @@ -236,6 +236,6 @@
"name": "mo-threads",
"packages": ["mo_threads"],
"url": "https://github.com/klahnakoski/mo-threads",
"version": "3.99.20292",
"version": "3.115.20293",
"zip_safe": false
}
7 changes: 4 additions & 3 deletions tests/test_processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from __future__ import unicode_literals

import os
from unittest import skipIf
from unittest import skipIf, skip

from mo_logs import Log
from mo_testing.fuzzytestcase import FuzzyTestCase
Expand Down Expand Up @@ -60,7 +60,7 @@ def test_sigint_no_exit(self):
p.join()
self.assertTrue(any("EXIT DETECTED" in line for line in p.stdout.pop_all()))

@skipIf(IS_TRAVIS, "travis can not kill")
@skip("travis can not kill, problem on windows")
def test_sigint(self):
"""
CAN WE CATCH A SIGINT?
Expand All @@ -69,10 +69,11 @@ def test_sigint(self):
"waiting", ["python", "-u", "tests/programs/sigint_test.py"], debug=True
)
p.stdout.pop() # WAIT FOR PROCESS TO START
Till(seconds=2).wait()
if IS_WINDOWS:
# Process("killer", ["TASKKILL", "/F", "/PID", p.pid], shell=True)
import signal
os.kill(p.pid, signal.CTRL_C_EVENT)
# Log.note("sent ctrl-c to {{pid}}", pid=p.pid)
else:
Process("killer", ["kill", "-SIGINT", p.pid]).join()
p.join()
Expand Down

0 comments on commit 8a0b110

Please sign in to comment.