-
Notifications
You must be signed in to change notification settings - Fork 579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding Context API #395
Adding Context API #395
Changes from 17 commits
1500bec
3bdf1c8
e144871
569878e
55c817a
746e591
b53bdea
76a8c03
3fe5a23
1f8d67f
4bdd27c
b28fb3f
51af769
530c21a
16eb703
ddd81a5
b935bf5
15805a4
68b98e5
3a2b0ef
62c76ec
5dca7cc
7203e72
e43f168
f81a673
ba65845
ed187bf
2b8c69f
1a0217f
212b14b
dbfeb98
2235bc3
5dc9fd5
02c7c4c
ae2cbc4
6f99621
01e0054
ebcf0bd
ddf3c8d
6f9780b
49b6abd
4c5083a
5a8a9fb
d5da10c
7995ac3
dbbdefb
37f1555
81f06a9
f29111a
057c22c
5376e60
e7d0286
0cee462
f8032c1
02b8f71
727ae50
c9c66f3
f25992f
91d2ebc
335678e
619fc1b
1f9bd76
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
opentelemetry.context.base\_context module | ||
========================================== | ||
|
||
.. automodule:: opentelemetry.context.base_context | ||
.. automodule:: opentelemetry.context.context | ||
:members: | ||
:undoc-members: | ||
:show-inheritance: |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,141 +12,121 @@ | |
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
|
||
""" | ||
The OpenTelemetry context module provides abstraction layer on top of | ||
thread-local storage and contextvars. The long term direction is to switch to | ||
contextvars provided by the Python runtime library. | ||
|
||
A global object ``Context`` is provided to access all the context related | ||
functionalities:: | ||
|
||
>>> from opentelemetry.context import Context | ||
>>> Context.foo = 1 | ||
>>> Context.foo = 2 | ||
>>> Context.foo | ||
2 | ||
|
||
When explicit thread is used, a helper function | ||
``Context.with_current_context`` can be used to carry the context across | ||
threads:: | ||
|
||
from threading import Thread | ||
from opentelemetry.context import Context | ||
|
||
def work(name): | ||
print('Entering worker:', Context) | ||
Context.operation_id = name | ||
print('Exiting worker:', Context) | ||
|
||
if __name__ == '__main__': | ||
print('Main thread:', Context) | ||
Context.operation_id = 'main' | ||
|
||
print('Main thread:', Context) | ||
|
||
# by default context is not propagated to worker thread | ||
thread = Thread(target=work, args=('foo',)) | ||
thread.start() | ||
thread.join() | ||
|
||
print('Main thread:', Context) | ||
|
||
# user can propagate context explicitly | ||
thread = Thread( | ||
target=Context.with_current_context(work), | ||
args=('bar',), | ||
) | ||
thread.start() | ||
thread.join() | ||
|
||
print('Main thread:', Context) | ||
|
||
Here goes another example using thread pool:: | ||
|
||
import time | ||
import threading | ||
|
||
from multiprocessing.dummy import Pool as ThreadPool | ||
from opentelemetry.context import Context | ||
|
||
_console_lock = threading.Lock() | ||
|
||
def println(msg): | ||
with _console_lock: | ||
print(msg) | ||
|
||
def work(name): | ||
println('Entering worker[{}]: {}'.format(name, Context)) | ||
Context.operation_id = name | ||
time.sleep(0.01) | ||
println('Exiting worker[{}]: {}'.format(name, Context)) | ||
|
||
if __name__ == "__main__": | ||
println('Main thread: {}'.format(Context)) | ||
Context.operation_id = 'main' | ||
pool = ThreadPool(2) # create a thread pool with 2 threads | ||
pool.map(Context.with_current_context(work), [ | ||
'bear', | ||
'cat', | ||
'dog', | ||
'horse', | ||
'rabbit', | ||
]) | ||
pool.close() | ||
pool.join() | ||
println('Main thread: {}'.format(Context)) | ||
|
||
Here goes a simple demo of how async could work in Python 3.7+:: | ||
|
||
import asyncio | ||
|
||
from opentelemetry.context import Context | ||
|
||
class Span(object): | ||
def __init__(self, name): | ||
self.name = name | ||
self.parent = Context.current_span | ||
|
||
def __repr__(self): | ||
return ('{}(name={}, parent={})' | ||
.format( | ||
type(self).__name__, | ||
self.name, | ||
self.parent, | ||
)) | ||
|
||
async def __aenter__(self): | ||
Context.current_span = self | ||
|
||
async def __aexit__(self, exc_type, exc, tb): | ||
Context.current_span = self.parent | ||
|
||
async def main(): | ||
print(Context) | ||
async with Span('foo'): | ||
print(Context) | ||
await asyncio.sleep(0.1) | ||
async with Span('bar'): | ||
print(Context) | ||
await asyncio.sleep(0.1) | ||
print(Context) | ||
await asyncio.sleep(0.1) | ||
print(Context) | ||
|
||
if __name__ == '__main__': | ||
asyncio.run(main()) | ||
""" | ||
|
||
from .base_context import BaseRuntimeContext | ||
|
||
__all__ = ["Context"] | ||
|
||
try: | ||
from .async_context import AsyncRuntimeContext | ||
|
||
Context = AsyncRuntimeContext() # type: BaseRuntimeContext | ||
except ImportError: | ||
from .thread_local_context import ThreadLocalRuntimeContext | ||
|
||
Context = ThreadLocalRuntimeContext() | ||
import logging | ||
import typing | ||
from os import environ | ||
|
||
from pkg_resources import iter_entry_points | ||
|
||
from opentelemetry.context.context import Context | ||
|
||
logger = logging.getLogger(__name__) | ||
_CONTEXT = None # type: typing.Optional[Context] | ||
|
||
|
||
def _copy_context(context: typing.Optional[Context]) -> Context: | ||
if context is not None: | ||
return context.copy() | ||
return get_current().copy() | ||
|
||
|
||
def create_key(key: str) -> "object": | ||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# FIXME Implement this | ||
raise NotImplementedError | ||
|
||
|
||
def get_value(key: str, context: typing.Optional[Context] = None) -> "object": | ||
""" | ||
To access the local state of an concern, the Context API | ||
c24t marked this conversation as resolved.
Show resolved
Hide resolved
|
||
provides a function which takes a context and a key as input, | ||
and returns a value. | ||
|
||
Args: | ||
key: The key of the value to retrieve. | ||
context: The context from which to retrieve the value, if None, the current context is used. | ||
""" | ||
if context is not None: | ||
return context.get_value(key) | ||
return get_current().get_value(key) | ||
|
||
|
||
def set_value( | ||
mauriciovasquezbernal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
key: str, value: "object", context: typing.Optional[Context] = None | ||
) -> Context: | ||
""" | ||
To record the local state of a cross-cutting concern, the | ||
Context API provides a function which takes a context, a | ||
key, and a value as input, and returns an updated context | ||
which contains the new value. | ||
|
||
Args: | ||
key: The key of the entry to set | ||
value: The value of the entry to set | ||
context: The context to copy, if None, the current context is used | ||
""" | ||
new_context = _copy_context(context) | ||
new_context.set_value(key, value) | ||
return new_context | ||
|
||
|
||
def remove_value( | ||
key: str, context: typing.Optional[Context] = None | ||
) -> Context: | ||
""" | ||
To remove a value, this method returns a new context with the key cleared. | ||
Note that the removed value still remains present in the old context. | ||
|
||
Args: | ||
key: The key of the entry to remove | ||
context: The context to copy, if None, the current context is used | ||
""" | ||
new_context = _copy_context(context) | ||
new_context.remove_value(key) | ||
return new_context | ||
|
||
|
||
def get_current() -> Context: | ||
""" | ||
To access the context associated with program execution, | ||
the Context API provides a function which takes no arguments | ||
and returns a Context. | ||
""" | ||
global _CONTEXT # pylint: disable=global-statement | ||
if _CONTEXT is None: | ||
# FIXME use a better implementation of a configuration manager to avoid having | ||
Oberon00 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# to get configuration values straight from environment variables | ||
_available_contexts = {} # typing.Dict[str, Context] | ||
|
||
for entry_point in iter_entry_points("opentelemetry_context"): | ||
try: | ||
_available_contexts[entry_point.name] = entry_point.load() # type: ignore | ||
except Exception: # pylint: disable=broad-except | ||
logger.error("Could not load entry_point %s", entry_point.name) | ||
|
||
configured_context = environ.get( | ||
"OPENTELEMETRY_CONTEXT", "default_context" | ||
) # type: str | ||
_CONTEXT = _available_contexts[configured_context]() # type: ignore | ||
return _CONTEXT # type: ignore | ||
|
||
|
||
def set_current(context: Context) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is supposed to represent the current context, then we cant just use a global variable. We need a context-local variable (or at least a thread local variable). But this is already what the context inside does, so it seems we have mixed up "current Context" and "configured context manager". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe my interpretation of what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Oberon00 , what do you mean with "configured context manager"? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we have to distinguish four things:
EDIT: EDIT2: As a concrete example, consider setting a value in a context object and then passing it (e.g. via a queue) to another, already running, thread. In that case, the other thread must be able to read that value back. This is not currently the case, as our "Context" is in fact a context manager and will return the return value of EDIT3: I asked the spec folks: open-telemetry/opentelemetry-specification#424 (comment) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Some clarifications:
This is currently represented by the
This is implemented via the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @codeboten @ocelotl: No, ContextVarsContext and ThradLocalContext are not a contexts in the OTEP 66 sense. E.g. ThreadLocalContext is a mapping For example, in the OTEP, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think @Oberon00 has a good point here, and this will not work as expected. Using his context queue example: from queue import Queue
import asyncio
from opentelemetry import context
from opentelemetry.sdk.context.contextvars_context import ContextVarsContext
context.set_current(ContextVarsContext())
q = Queue()
async def main():
q.put(context.get_current().copy())
q.put(context.get_current().copy())
asyncio.gather(work(1), work(2))
async def work(val):
context.set_current(q.get_nowait())
context.get_current().set_value('k', val)
# At this point context.get_current() should be the one we popped from the
# queue, and current_context.get_value('k') should be `val`
await main() This works as expected, but only incidentally. If the call to from queue import Queue
from threading import Event
import asyncio
from opentelemetry import context
from opentelemetry.sdk.context.contextvars_context import ContextVarsContext
context.set_current(ContextVarsContext())
# Helper to print Contexts
def tos(context):
return "<Context at {}: {}>".format(
hex(id(context)),
','.join(["{}={}".format(k, v.get(None))
for k, v in context._contextvars.items()])
)
q = Queue()
run1 = Event()
run2 = Event()
async def main():
q.put(context.get_current().copy())
q.put(context.get_current().copy())
asyncio.gather(a1(), a2())
async def a1():
# (Step 1)
context.set_current(q.get_nowait())
context.get_current().set_value('k', 1)
# Expected context is current, k=1
print("[task 1] {}".format(tos(context.get_current())))
# (GOTO 2)
run2.set(); await asyncio.sleep(0); run1.wait();
# (Step 3)
# Current context has changed AND k=None because the call to `set` k=2
# happened in the other task!
print("[task 1] {}".format(tos(context.get_current())))
async def a2():
run2.wait()
# (Step 2)
context.set_current(q.get_nowait())
context.get_current().set_value('k', 2)
# Expected context is current, k=2
print("[task 2] {}".format(tos(context.get_current())))
# (GOTO 3)
run1.set(); await asyncio.sleep(0);
await main() This prints:
Task 1 gets the context from the (Sorry for the long example here) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ocelotl Yes, sorry I meant There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank for the clarifications. I've updated the I still need to update the PR to not use a global _CONTEXT variable, but I think this change is at least a step in the right direction. |
||
""" | ||
To associate a context with program execution, the Context | ||
API provides a function which takes a Context. | ||
|
||
Args: | ||
context: The context to use as current. | ||
""" | ||
global _CONTEXT # pylint: disable=global-statement | ||
_CONTEXT = context | ||
|
||
|
||
__all__ = [ | ||
"get_value", | ||
"set_value", | ||
"remove_value", | ||
"get_current", | ||
"set_current", | ||
"Context", | ||
] |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer writing these methods in the form
or even
return some_action(context or get_current())
.EDIT:
is (not) None
is fine