-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathcontext.py
85 lines (60 loc) · 2.19 KB
/
context.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from contextvars import ContextVar, copy_context
from typing import Optional
from quixstreams.exceptions import QuixException
from quixstreams.models.messagecontext import MessageContext
# Public names exported by this module. `copy_context` is re-exported as-is
# from the standard-library `contextvars` module.
__all__ = (
"MessageContextNotSetError",
"set_message_context",
"message_context",
"copy_context",
)
# Context variable holding the MessageContext of the message being processed.
# It is created without a default, so `.get()` raises LookupError until a
# value has been stored with `set_message_context()`.
_current_message_context: ContextVar[Optional[MessageContext]] = ContextVar(
"current_message_context"
)
class MessageContextNotSetError(QuixException):
    """Raised by `message_context()` when no message context is currently set."""
def set_message_context(context: Optional[MessageContext]) -> None:
    """
    Set a MessageContext for the current message in the current
    `contextvars.Context`.

    >***NOTE:*** This is for advanced usage only. If you need to change the message key,
    `StreamingDataFrame.to_topic()` has an argument for it.

    Example Snippet:

    ```python
    from quixstreams import Application, set_message_context, message_context

    # Adds a header to the current message context when the value is greater than 1.
    def alter_context(value):
        context = message_context()
        if value > 1:
            context.headers = context.headers + (
                b"cool_new_header",
                str(value).encode(),
            )
        set_message_context(context)

    app = Application()
    sdf = app.dataframe()
    sdf = sdf.update(lambda value: alter_context(value))
    ```

    :param context: instance of `MessageContext`, or `None`; while the stored
        value is `None`, `message_context()` raises `MessageContextNotSetError`
    """
    # The Token returned by `ContextVar.set()` is discarded, so the previous
    # value cannot be restored later through `ContextVar.reset()`.
    _current_message_context.set(context)
def message_context() -> MessageContext:
    """
    Get a MessageContext for the current message, which houses most of the message
    metadata, like:
        - key
        - timestamp
        - partition
        - offset

    Example Snippet:

    ```python
    from quixstreams import Application, message_context

    # Changes the current sdf value based on what the message partition is.

    app = Application()
    sdf = app.dataframe()
    sdf = sdf.apply(lambda value: 1 if message_context().partition == 2 else 0)
    ```

    :return: instance of `MessageContext`
    :raises MessageContextNotSetError: if no context has been set in the current
        `contextvars.Context`, or if it has been set to `None`
    """
    # Passing a default of None folds "never set" and "explicitly set to None"
    # into a single check. It also avoids raising from inside an
    # `except LookupError` handler, which would attach the irrelevant
    # LookupError to the traceback as implicit exception context.
    ctx = _current_message_context.get(None)
    if ctx is None:
        raise MessageContextNotSetError("Message context is not set")
    return ctx