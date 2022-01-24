temporalio / sdk-python Public

![Temporal Python SDK](https://assets.temporal.io/w/py-banner.svg)

[![Python 3.8+](https://img.shields.io/pypi/pyversions/temporalio.svg?style=for-the-badge)](https://pypi.org/project/temporalio)
[![PyPI](https://img.shields.io/pypi/v/temporalio.svg?style=for-the-badge)](https://pypi.org/project/temporalio)
[![MIT](https://img.shields.io/pypi/l/temporalio.svg?style=for-the-badge)](LICENSE)

[Temporal](https://temporal.io/) is a distributed, scalable, durable, and highly available orchestration engine used to
execute asynchronous, long-running business logic in a scalable and resilient way.

"Temporal Python SDK" is the framework for authoring workflows and activities using the Python programming language.

Also see:

* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our
  [Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including
  information around Temporal core concepts.
* [Python Code Samples](https://github.com/temporalio/samples-python)
* [API Documentation](https://python.temporal.io) - Complete Temporal Python SDK Package reference.

In addition to features common across all Temporal SDKs, the Python SDK also has the following interesting features:

**Type Safe**

This library uses the latest typing and MyPy support with generics to ensure all calls can be typed. For example,
starting a workflow with an `int` parameter when it accepts a `str` parameter would cause MyPy to fail.

**Different Activity Types**

The activity worker has been developed to work with `async def`, threaded, and multiprocess activities. While
`async def` activities are the easiest and recommended, care has been taken to make heartbeating and cancellation also
work across threads/processes.

**Custom `asyncio` Event Loop**

The workflow implementation basically turns `async def` functions into workflows backed by a distributed, fault-tolerant
event loop. This means task management, sleep, cancellation, etc. have all been developed to seamlessly integrate with
`asyncio` concepts.

See the [blog post](https://temporal.io/blog/durable-distributed-asyncio-event-loop) introducing the Python SDK for an
informal introduction to the features and their implementation.

---

<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Contents**

- [Quick Start](#quick-start)
  - [Installation](#installation)
  - [Implementing a Workflow](#implementing-a-workflow)
  - [Running a Workflow](#running-a-workflow)
  - [Next Steps](#next-steps)
- [Usage](#usage)
  - [Client](#client)
    - [Data Conversion](#data-conversion)
      - [Custom Type Data Conversion](#custom-type-data-conversion)
  - [Workers](#workers)
  - [Workflows](#workflows)
    - [Definition](#definition)
    - [Running](#running)
    - [Invoking Activities](#invoking-activities)
    - [Invoking Child Workflows](#invoking-child-workflows)
    - [Timers](#timers)
    - [Conditions](#conditions)
    - [Asyncio and Determinism](#asyncio-and-determinism)
    - [Asyncio Cancellation](#asyncio-cancellation)
    - [Workflow Utilities](#workflow-utilities)
    - [Exceptions](#exceptions)
    - [External Workflows](#external-workflows)
    - [Testing](#testing)
      - [Automatic Time Skipping](#automatic-time-skipping)
      - [Manual Time Skipping](#manual-time-skipping)
      - [Mocking Activities](#mocking-activities)
    - [Workflow Sandbox](#workflow-sandbox)
      - [How the Sandbox Works](#how-the-sandbox-works)
      - [Avoiding the Sandbox](#avoiding-the-sandbox)
      - [Customizing the Sandbox](#customizing-the-sandbox)
        - [Passthrough Modules](#passthrough-modules)
        - [Invalid Module Members](#invalid-module-members)
      - [Known Sandbox Issues](#known-sandbox-issues)
        - [Global Import/Builtins](#global-importbuiltins)
        - [Sandbox is not Secure](#sandbox-is-not-secure)
        - [Sandbox Performance](#sandbox-performance)
        - [Extending Restricted Classes](#extending-restricted-classes)
        - [Certain Standard Library Calls on Restricted Objects](#certain-standard-library-calls-on-restricted-objects)
        - [is_subclass of ABC-based Restricted Classes](#is_subclass-of-abc-based-restricted-classes)
        - [Compiled Pydantic Sometimes Using Wrong Types](#compiled-pydantic-sometimes-using-wrong-types)
  - [Activities](#activities)
    - [Definition](#definition-1)
    - [Types of Activities](#types-of-activities)
      - [Synchronous Activities](#synchronous-activities)
        - [Synchronous Multithreaded Activities](#synchronous-multithreaded-activities)
        - [Synchronous Multiprocess/Other Activities](#synchronous-multiprocessother-activities)
      - [Asynchronous Activities](#asynchronous-activities)
    - [Activity Context](#activity-context)
      - [Heartbeating and Cancellation](#heartbeating-and-cancellation)
      - [Worker Shutdown](#worker-shutdown)
    - [Testing](#testing-1)
  - [Workflow Replay](#workflow-replay)
  - [OpenTelemetry Support](#opentelemetry-support)
  - [Protobuf 3.x vs 4.x](#protobuf-3x-vs-4x)
  - [Known Compatibility Issues](#known-compatibility-issues)
    - [gevent Patching](#gevent-patching)
- [Development](#development)
  - [Building](#building)
    - [Prepare](#prepare)
    - [Build](#build)
    - [Use](#use)
  - [Local SDK development environment](#local-sdk-development-environment)
  - [Testing](#testing-2)
  - [Proto Generation and Testing](#proto-generation-and-testing)
  - [Style](#style)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

# Quick Start

We will guide you through the Temporal basics to create a "hello, world!" script on your machine. It is intended as one
of many ways to use Temporal; in reality it is very simplified and decidedly not "the only way" to use Temporal.

For more information, check out the docs references in "Next Steps" below the quick start.

## Installation

Install the `temporalio` package from [PyPI](https://pypi.org/project/temporalio).

These steps can be followed to use with a virtual environment and `pip`:

* [Create a virtual environment](https://packaging.python.org/en/latest/tutorials/installing-packages/#creating-virtual-environments)
* Update `pip` - `python -m pip install -U pip`
  * Needed because older versions of `pip` may not pick the right wheel
* Install Temporal SDK - `python -m pip install temporalio`

The SDK is now ready for use. To build from source, see "Building" near the end of this documentation.

**NOTE: This README is for the current branch and not necessarily what's released on `PyPI`.**

## Implementing a Workflow

Create the following in `activities.py`:

```python
from temporalio import activity

@activity.defn
def say_hello(name: str) -> str:
    return f"Hello, {name}!"
```

Create the following in `workflows.py`:

```python
from datetime import timedelta
from temporalio import workflow

# Import our activity, passing it through the sandbox
with workflow.unsafe.imports_passed_through():
    from activities import say_hello

@workflow.defn
class SayHello:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
        )
```

Create the following in `run_worker.py`:

```python
import asyncio
import concurrent.futures
from temporalio.client import Client
from temporalio.worker import Worker

# Import the activity and workflow from our other files. These are absolute
# imports because this file is run directly as a script.
from activities import say_hello
from workflows import SayHello

async def main():
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    # Run the worker. say_hello is a synchronous (non-async) activity, so the
    # worker is given a thread pool executor to run it on.
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[SayHello],
            activities=[say_hello],
            activity_executor=activity_executor,
        )
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```

Assuming you have a [Temporal server running on localhost](https://docs.temporal.io/docs/server/quick-install/), this
will run the worker:

    python run_worker.py

## Running a Workflow

Create the following script at `run_workflow.py`:

```python
import asyncio
from temporalio.client import Client

# Import the workflow from the previous code (absolute import, since this file
# is run directly as a script)
from workflows import SayHello

async def main():
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    # Execute a workflow
    result = await client.execute_workflow(SayHello.run, "my name", id="my-workflow-id", task_queue="my-task-queue")

    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```

Assuming you have `run_worker.py` running from before, this will run the workflow:

    python run_workflow.py

The output will be:

    Result: Hello, my name!

## Next Steps

Temporal can be implemented in your code in many different ways, to suit your application's needs. The links below will
give you much more information about how Temporal works with Python:

* [Code Samples](https://github.com/temporalio/samples-python) - If you want to start with some code, we have provided
  some pre-built samples.
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Our Python-specific
  Developer's Guide will give you much more information on how to build with Temporal in your Python applications than
  our SDK README ever could (or should).
* [API Documentation](https://python.temporal.io) - Full Temporal Python SDK package documentation.

---

# Usage

From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around
Temporal concepts. *This section is not intended as a how-to guide.* For more how-to oriented information, check out
the links in the [Next Steps](#next-steps) section above.

### Client

A client can be created and used to start a workflow like so:

```python
from temporalio.client import Client

async def main():
    # Create client connected to server at the given address and namespace
    client = await Client.connect("localhost:7233", namespace="my-namespace")

    # Start a workflow
    handle = await client.start_workflow(MyWorkflow.run, "some arg", id="my-workflow-id", task_queue="my-task-queue")

    # Wait for result
    result = await handle.result()
    print(f"Result: {result}")
```

Some things to note about the above code:

* A `Client` does not have an explicit "close"
* To enable TLS, the `tls` argument to `connect` can be set to `True` or a `TLSConfig` object
* A single positional argument can be passed to `start_workflow`. If there are multiple arguments, only the
  non-type-safe form of `start_workflow` can be used (i.e. the one accepting a string workflow name) and it must be in
  the `args` keyword argument.
* The `handle` represents the workflow that was started and can be used for more than just getting the result
* Since we are just getting the handle and waiting on the result, we could have called `client.execute_workflow` which
  does the same thing
* Clients can have many more options not shown here (e.g. data converters and interceptors)
* A string can be used instead of the method reference to call a workflow by name (e.g. if defined in another language)
* Clients do not work across forks

Clients also provide a shallow copy of their config for use in making slightly different clients backed by the same
connection. For instance, given the `client` above, this is how to have a client in another namespace:

```python
config = client.config()
config["namespace"] = "my-other-namespace"
other_ns_client = Client(**config)
```

#### Data Conversion

Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
`temporalio.converter.DataConverter` can be set via the `data_converter` client parameter. Data converters are a
combination of payload converters, payload codecs, and failure converters. Payload converters convert Python values
to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption). Failure converters
convert exceptions to/from serialized failures.

The default data converter supports converting multiple types including:

* `None`
* `bytes`
* `google.protobuf.message.Message` - As JSON when encoding, but has ability to decode binary proto from other languages
* Anything that can be converted to JSON including:
  * Anything that [`json.dump`](https://docs.python.org/3/library/json.html#json.dump) supports natively
  * [dataclasses](https://docs.python.org/3/library/dataclasses.html)
  * Iterables including ones JSON dump may not support by default, e.g. `set`
  * Any class with a `dict()` method and a static `parse_obj()` method, e.g.
    [Pydantic models](https://pydantic-docs.helpmanual.io/usage/models)
    * The default data converter is deprecated for Pydantic models and will warn if used since not all fields work.
      See [this sample](https://github.com/temporalio/samples-python/tree/main/pydantic_converter) for the recommended
      approach.
  * [IntEnum, StrEnum](https://docs.python.org/3/library/enum.html) based enumerations
  * [UUID](https://docs.python.org/3/library/uuid.html)

This notably doesn't include any `date`, `time`, or `datetime` objects as they may not work across SDKs.

Users are strongly encouraged to use a single `dataclass` for parameter and return types so fields with defaults can be
easily added without breaking compatibility.

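
The plain-Python sketch below shows why a defaulted dataclass field keeps old payloads compatible. It assumes the fields are JSON-encoded roughly the way the default converter does it. The real SDK decoding logic is more involved, and `OrderInputV1`, `OrderInput`, `encode`, and `decode` are hypothetical names used only for illustration.

```python
import dataclasses
import json
from dataclasses import dataclass


# Version 1 of a hypothetical workflow input, as an older caller might send it
@dataclass
class OrderInputV1:
    order_id: str


# Version 2 adds a field *with a default*, so V1 payloads still decode
@dataclass
class OrderInput:
    order_id: str
    priority: int = 0


def encode(value) -> bytes:
    # Roughly what a JSON payload converter does with a dataclass
    return json.dumps(dataclasses.asdict(value)).encode()


def decode(data: bytes) -> OrderInput:
    # Keys missing from older payloads fall back to the dataclass defaults
    return OrderInput(**json.loads(data))


old_payload = encode(OrderInputV1(order_id="abc"))
print(decode(old_payload))  # OrderInput(order_id='abc', priority=0)
```

Adding a field *without* a default would make `decode` raise `TypeError` for old payloads. Keeping new fields defaulted avoids that.
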
Classes with generics may not have the generics properly resolved. The current implementation does not have generic
type resolution. Users should use concrete types.

##### Custom Type Data Conversion

For converting from JSON, the workflow/activity type hint is taken into account to convert to the proper type. Care has
been taken to support all common typings including `Optional`, `Union`, all forms of iterables and mappings, `NewType`,
etc. in addition to the regular JSON values mentioned before.

Data converters contain a reference to a payload converter class that is used to convert to/from payloads/values. This
is a class and not an instance because it is instantiated on every workflow run inside the sandbox. The payload
converter is usually a `CompositePayloadConverter` which contains multiple `EncodingPayloadConverter`s it uses to try
to serialize/deserialize payloads. Upon serialization, each `EncodingPayloadConverter` is tried until one succeeds. The
`EncodingPayloadConverter` provides an "encoding" string serialized onto the payload so that, upon deserialization, the
specific `EncodingPayloadConverter` for the given "encoding" is used.

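
The two lookup rules above (try converters in order on serialization, dispatch on the "encoding" metadata on deserialization) can be modeled in plain Python. This is a toy model, not the SDK's classes. `ToyPayload`, `ToyNullConverter`, `ToyJSONConverter`, and `ToyCompositeConverter` are hypothetical stand-ins. The `"binary/null"` and `"json/plain"` encoding strings were chosen to resemble Temporal's.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ToyPayload:
    # Stand-in for a Temporal payload: metadata plus raw bytes
    metadata: Dict[str, bytes] = field(default_factory=dict)
    data: bytes = b""


class ToyNullConverter:
    encoding = "binary/null"

    def to_payload(self, value: Any) -> Optional[ToyPayload]:
        if value is None:
            return ToyPayload({"encoding": self.encoding.encode()})
        return None  # Not ours; let the next converter try

    def from_payload(self, payload: ToyPayload) -> Any:
        return None


class ToyJSONConverter:
    encoding = "json/plain"

    def to_payload(self, value: Any) -> Optional[ToyPayload]:
        try:
            data = json.dumps(value).encode()
        except TypeError:
            return None  # Not JSON-serializable; let the next converter try
        return ToyPayload({"encoding": self.encoding.encode()}, data)

    def from_payload(self, payload: ToyPayload) -> Any:
        return json.loads(payload.data)


class ToyCompositeConverter:
    def __init__(self, *converters: Any) -> None:
        self._ordered = list(converters)
        self._by_encoding = {c.encoding: c for c in converters}

    def to_payload(self, value: Any) -> ToyPayload:
        # Serialization: try each converter in order until one succeeds
        for converter in self._ordered:
            payload = converter.to_payload(value)
            if payload is not None:
                return payload
        raise TypeError(f"No converter for {type(value).__name__}")

    def from_payload(self, payload: ToyPayload) -> Any:
        # Deserialization: use the converter named by the "encoding" metadata
        encoding = payload.metadata["encoding"].decode()
        return self._by_encoding[encoding].from_payload(payload)


converter = ToyCompositeConverter(ToyNullConverter(), ToyJSONConverter())
payload = converter.to_payload({"a": 1})
print(payload.metadata["encoding"])  # b'json/plain'
print(converter.from_payload(payload))  # {'a': 1}
```

Order matters in this model. `json.dumps(None)` would also succeed, so the null converter must come first to claim `None`. A JSON fall-through is naturally placed last.
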
The default data converter uses the `DefaultPayloadConverter` which is simply a `CompositePayloadConverter` with a known
set of default `EncodingPayloadConverter`s. To implement a custom encoding for a custom type, a new
`EncodingPayloadConverter` can be created for the new type. For example, to support `IPv4Address` types:

```python
class IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter):
    @property
    def encoding(self) -> str:
        return "text/ipv4-address"

    def to_payload(self, value: Any) -> Optional[Payload]:
        if isinstance(value, ipaddress.IPv4Address):
            return Payload(
                metadata={"encoding": self.encoding.encode()},
                data=str(value).encode(),
            )
        else:
            return None

    def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
        assert not type_hint or type_hint is ipaddress.IPv4Address
        return ipaddress.IPv4Address(payload.data.decode())

class IPv4AddressPayloadConverter(CompositePayloadConverter):
    def __init__(self) -> None:
        # Just add ours as first before the defaults
        super().__init__(
            IPv4AddressEncodingPayloadConverter(),
            *DefaultPayloadConverter.default_encoding_payload_converters,
        )

my_data_converter = dataclasses.replace(
    DataConverter.default,
    payload_converter_class=IPv4AddressPayloadConverter,
)
```

Imports are left off for brevity.

This is good for many custom types. However, sometimes you want to override the behavior of just the existing JSON
encoding payload converter to support a new type. It is already the last encoding payload converter in the list, so it's
the fall-through behavior for any otherwise unknown type. Customizing the existing JSON converter has the benefit of
making the type work in lists, unions, etc.

The `JSONPlainPayloadConverter` uses the Python [json](https://docs.python.org/3/library/json.html) library with an
advanced JSON encoder by default and a custom value conversion method to turn `json.load`ed values to their type hints.
The conversion can be customized for serialization with a custom `json.JSONEncoder` and deserialization with a custom
`JSONTypeConverter`. For example, to support `IPv4Address` types in existing JSON conversion:

```python
class IPv4AddressJSONEncoder(AdvancedJSONEncoder):
    def default(self, o: Any) -> Any:
        if isinstance(o, ipaddress.IPv4Address):
            return str(o)
        return super().default(o)

class IPv4AddressJSONTypeConverter(JSONTypeConverter):
    def to_typed_value(
        self, hint: Type, value: Any
    ) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
        # Hints such as List[...] or Optional[...] are not classes, so check
        # before calling issubclass (which raises TypeError on non-classes)
        if isinstance(hint, type) and issubclass(hint, ipaddress.IPv4Address):
            return ipaddress.IPv4Address(value)
        return JSONTypeConverter.Unhandled

class IPv4AddressPayloadConverter(CompositePayloadConverter):
    def __init__(self) -> None:
        # Replace default JSON plain with our own that has our encoder and type
        # converter
        json_converter = JSONPlainPayloadConverter(
            encoder=IPv4AddressJSONEncoder,
            custom_type_converters=[IPv4AddressJSONTypeConverter()],
        )
        super().__init__(
            *[
                c if not isinstance(c, JSONPlainPayloadConverter) else json_converter
                for c in DefaultPayloadConverter.default_encoding_payload_converters
            ]
        )

my_data_converter = dataclasses.replace(
    DataConverter.default,
    payload_converter_class=IPv4AddressPayloadConverter,
)
```

Now `IPv4Address` can be used in type hints including collections, optionals, etc.

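
Here is a stdlib-only sketch of the two hooks in isolation. It is not the SDK's implementation. `IPv4JSONEncoder` and `to_typed` are hypothetical names, and `to_typed` only handles `List[...]`, `Optional[...]`, and `IPv4Address`. It shows how a custom `json.JSONEncoder.default` covers serialization, and how a type hint can drive conversion of `json.load`ed values back into rich types.

```python
import ipaddress
import json
import typing
from typing import Any, List, Optional


class IPv4JSONEncoder(json.JSONEncoder):
    # json calls default() only for objects it cannot serialize natively
    def default(self, o: Any) -> Any:
        if isinstance(o, ipaddress.IPv4Address):
            return str(o)
        return super().default(o)


def to_typed(hint: Any, value: Any) -> Any:
    # Toy hint-driven decoding: recurse into List[...] and Optional[...]
    origin = typing.get_origin(hint)
    if origin is list:
        (item_hint,) = typing.get_args(hint)
        return [to_typed(item_hint, v) for v in value]
    if origin is typing.Union:
        if value is None:
            return None
        non_none = [a for a in typing.get_args(hint) if a is not type(None)]
        return to_typed(non_none[0], value)
    # Guard issubclass: it raises TypeError for non-class hints
    if isinstance(hint, type) and issubclass(hint, ipaddress.IPv4Address):
        return ipaddress.IPv4Address(value)
    return value


addrs = [ipaddress.IPv4Address("10.0.0.1"), ipaddress.IPv4Address("192.168.1.2")]
encoded = json.dumps(addrs, cls=IPv4JSONEncoder)
print(encoded)  # ["10.0.0.1", "192.168.1.2"]
decoded = to_typed(List[ipaddress.IPv4Address], json.loads(encoded))
print(decoded == addrs)  # True
maybe_addr = to_typed(Optional[ipaddress.IPv4Address], None)
print(maybe_addr)  # None
```
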
### Workers

Workers host workflows and/or activities. Here's how to run a worker:

```python
import asyncio
import logging
from temporalio.client import Client
from temporalio.worker import Worker
# Import your own workflows and activities
from my_workflow_package import MyWorkflow, my_activity

async def run_worker(stop_event: asyncio.Event):
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233", namespace="my-namespace")

    # Run the worker until the event is set
    worker = Worker(client, task_queue="my-task-queue", workflows=[MyWorkflow], activities=[my_activity])
    async with worker:
        await stop_event.wait()
```

Some things to note about the above code:

* This creates/uses the same client that is used for starting workflows
* While this example accepts a stop event and uses `async with`, `run()` and `shutdown()` may be used instead
* Workers can have many more options not shown here (e.g. data converters and interceptors)

### Workflows

#### Definition

Workflows are defined as classes decorated with `@workflow.defn`. The method invoked for the workflow is decorated with
`@workflow.run`. Methods for signals, queries, and updates are decorated with `@workflow.signal`, `@workflow.query`
and `@workflow.update` respectively. Here's an example of a workflow:

```python
import asyncio
from datetime import timedelta
from temporalio import workflow

# Pass the activities through the sandbox
with workflow.unsafe.imports_passed_through():
    from .my_activities import GreetingInfo, create_greeting_activity

@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self._current_greeting = "<unset>"
        self._greeting_info = GreetingInfo()
        self._greeting_info_update = asyncio.Event()
        self._complete = asyncio.Event()

    @workflow.run
    async def run(self, name: str) -> str:
        self._greeting_info.name = name
        while True:
            # Store greeting
            self._current_greeting = await workflow.execute_activity(
                create_greeting_activity,
                self._greeting_info,
                start_to_close_timeout=timedelta(seconds=5),
            )
            workflow.logger.debug("Greeting set to %s", self._current_greeting)

            # Wait for salutation update or complete signal (this can be
            # cancelled)
            await asyncio.wait(
                [
                    asyncio.create_task(self._greeting_info_update.wait()),
                    asyncio.create_task(self._complete.wait()),
                ],
                return_when=asyncio.FIRST_COMPLETED,
            )
            if self._complete.is_set():
                return self._current_greeting
            self._greeting_info_update.clear()

    @workflow.signal
    async def update_salutation(self, salutation: str) -> None:
        self._greeting_info.salutation = salutation
        self._greeting_info_update.set()

    @workflow.signal
    async def complete_with_greeting(self) -> None:
        self._complete.set()

    @workflow.query
    def current_greeting(self) -> str:
        return self._current_greeting

    @workflow.update
    def set_and_get_greeting(self, greeting: str) -> str:
        old = self._current_greeting
        self._current_greeting = greeting
        return old
```

This assumes there's an activity in `my_activities.py` like:

```python
from dataclasses import dataclass
from temporalio import activity

@dataclass
class GreetingInfo:
    salutation: str = "Hello"
    name: str = "<unknown>"

@activity.defn
def create_greeting_activity(info: GreetingInfo) -> str:
    return f"{info.salutation}, {info.name}!"
```

Some things to note about the above workflow code:

* Workflows run in a sandbox by default.
  * Users are encouraged to define workflows in files with no side effects or other complicated code or unnecessary
    imports to other third party libraries.
  * Non-standard-library, non-`temporalio` imports should usually be "passed through" the sandbox. See the
    [Workflow Sandbox](#workflow-sandbox) section for more details.
* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
  a different signal
* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
* Workflow code must be deterministic. This means no `set` iteration, no threading, no randomness, no external calls to
  processes, no network IO, and no global state mutation. All code must run in the implicit `asyncio` event loop and be
  deterministic. Also see the [Asyncio and Determinism](#asyncio-and-determinism) section later.
* `@activity.defn` is explained in a later section. For normal simple string concatenation, this would just be done in
  the workflow. The activity is for demonstration purposes only.
* `workflow.execute_activity(create_greeting_activity, ...` is actually a typed signature, and MyPy will fail if the
  `self._greeting_info` parameter is not a `GreetingInfo`

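
This example illustrates the `set` rule above. Python salts string hashes per interpreter process (see `PYTHONHASHSEED`). As a result, iterating a set of strings can produce a different order when a workflow is replayed on another worker. The sketch below shows one common workaround, imposing an explicit order before iterating. `notification_order` is a hypothetical helper, not an SDK API.

```python
from typing import List, Set


def notification_order(pending: Set[str]) -> List[str]:
    # Iterating `pending` directly follows hash order, which for strings is
    # salted per process and can differ on replay. Sorting yields the same
    # order in every worker process.
    return sorted(pending)


print(notification_order({"carol", "alice", "bob"}))  # ['alice', 'bob', 'carol']
```
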
Here are the decorators that can be applied:

* `@workflow.defn` - Defines a workflow class
  * Must be defined on the class given to the worker (ignored if present on a base class)
  * Can have a `name` param to customize the workflow name, otherwise it defaults to the unqualified class name
  * Can have `dynamic=True` which means all otherwise unhandled workflows fall through to this. If present, cannot have
    `name` argument, and run method must accept a single parameter of `Sequence[temporalio.common.RawValue]` type. The
    payload of the raw value can be converted via `workflow.payload_converter().from_payload`.
* `@workflow.run` - Defines the primary workflow run method
  * Must be defined on the same class as `@workflow.defn`, not a base class (but can _also_ be defined on the same
    method of a base class)
  * Exactly one method name must have this decorator, no more or less
  * Must be defined on an `async def` method
  * The method's arguments are the workflow's arguments
  * The first parameter must be `self`, followed by positional arguments. Best practice is to only take a single
    argument that is an object/dataclass of fields that can be added to as needed.
* `@workflow.signal` - Defines a method as a signal
  * Can be defined on an `async` or non-`async` function at any hierarchy depth, but if a decorated method is
    overridden, the override must also be decorated
  * The method's arguments are the signal's arguments
  * Can have a `name` param to customize the signal name, otherwise it defaults to the unqualified method name
  * Can have `dynamic=True` which means all otherwise unhandled signals fall through to this. If present, cannot have
    `name` argument, and method parameters must be `self`, a string signal name, and a
    `Sequence[temporalio.common.RawValue]`.
  * Non-dynamic methods can only have positional arguments. Best practice is to only take a single argument that is an
    object/dataclass of fields that can be added to as needed.
  * Return value is ignored
* `@workflow.query` - Defines a method as a query
  * All the same constraints as `@workflow.signal` but should return a value
  * Should not be `async`
  * Temporal queries should never mutate anything in the workflow or make any calls that would mutate the workflow
* `@workflow.update` - Defines a method as an update
  * May both accept as input and return a value
  * May be `async` or non-`async`
  * May mutate workflow state, and make calls to other workflow APIs like starting activities, etc.
  * Also accepts the `name` and `dynamic` parameters like signals and queries, with the same semantics.
  * Update handlers may optionally define a validator method by decorating it with `@update_handler_method.validator`.
    To reject an update before any events are written to history, raise an exception in a validator. Validators cannot
    be `async`, cannot mutate workflow state, and return nothing.

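
The validate-then-handle ordering for updates can be pictured with a toy model in plain Python. This is not SDK code. `ToyUpdateDispatcher` and its `history` list are hypothetical, and the list only stands in for workflow history. In a real workflow, the validator would instead be attached with `@set_greeting.validator`, following the `@update_handler_method.validator` rule above, and the SDK would run it.

```python
from typing import List


class ToyUpdateDispatcher:
    """Hypothetical model of validate-then-handle update semantics."""

    def __init__(self) -> None:
        self.history: List[str] = []  # stands in for workflow history events
        self.greeting = "Hello"

    def validate_set_greeting(self, greeting: str) -> None:
        # Validators only inspect the input; raising rejects the update
        if not greeting:
            raise ValueError("greeting must be non-empty")

    def set_greeting(self, greeting: str) -> str:
        # The handler itself may mutate state and return a value
        old, self.greeting = self.greeting, greeting
        return old

    def handle_update(self, greeting: str) -> str:
        # 1. Validate first: on failure nothing is recorded or mutated
        self.validate_set_greeting(greeting)
        # 2. Only accepted updates are recorded and reach the handler
        self.history.append(f"update accepted: {greeting!r}")
        return self.set_greeting(greeting)


d = ToyUpdateDispatcher()
print(d.handle_update("Aloha"))  # Hello
try:
    d.handle_update("")
except ValueError as err:
    print("rejected:", err)
print(d.history)  # ["update accepted: 'Aloha'"]
```
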
#### Running

To start a locally-defined workflow from a client, you can simply reference its method like so:

```python
from temporalio.client import Client
from my_workflow_package import GreetingWorkflow

async def create_greeting(client: Client) -> str:
    # Start the workflow
    handle = await client.start_workflow(GreetingWorkflow.run, "my name", id="my-workflow-id", task_queue="my-task-queue")
    # Change the salutation
    await handle.signal(GreetingWorkflow.update_salutation, "Aloha")
    # Tell it to complete
    await handle.signal(GreetingWorkflow.complete_with_greeting)
    # Wait and return result
    return await handle.result()
```

Some things to note about the above code:

* This uses the `GreetingWorkflow` from the previous section
* The result of calling this function is `"Aloha, my name!"`
* `id` and `task_queue` are required for running a workflow
* `client.start_workflow` is typed, so MyPy would fail if `"my name"` were something besides a string
* `handle.signal` is typed, so MyPy would fail if `"Aloha"` were something besides a string or if we provided a
  parameter to the parameterless `complete_with_greeting`
* `handle.result` is typed to the workflow itself, so MyPy would fail if we said this `create_greeting` returned
  something besides a string

#### Invoking Activities

* Activities are started with non-async `workflow.start_activity()` which accepts either an activity function reference
  or a string name.
* A single argument to the activity is positional. Multiple arguments are not supported in the type-safe form of
  start/execute activity and must be supplied via the `args` keyword argument.
* Activity options are set as keyword arguments after the activity arguments. At least one of `start_to_close_timeout`
  or `schedule_to_close_timeout` must be provided.
* The result is an activity handle which is an `asyncio.Task` and supports basic task features
* An async `workflow.execute_activity()` helper is provided which takes the same arguments as
  `workflow.start_activity()` and `await`s on the result. This should be used in most cases unless advanced task
  capabilities are needed.
* Local activities work very similarly except the functions are `workflow.start_local_activity()` and
  `workflow.execute_local_activity()`
  * ⚠️Local activities are currently experimental
* Activities can be methods of a class. Invokers should use `workflow.start_activity_method()`,
  `workflow.execute_activity_method()`, `workflow.start_local_activity_method()`, and
  `workflow.execute_local_activity_method()` instead.
* Activities can be callable classes (i.e. that define `__call__`). Invokers should use
  `workflow.start_activity_class()`, `workflow.execute_activity_class()`, `workflow.start_local_activity_class()`, and
  `workflow.execute_local_activity_class()` instead.

#### Invoking Child Workflows
* Child workflows are started with async `workflow.start_child_workflow()` which accepts either a workflow run method
reference or a string name.
* A single argument to the child workflow is positional. Multiple arguments are not supported in the type-safe form of
start/execute child workflow and must be supplied via the `args` keyword argument.
* Child workflow options are set as keyword arguments after the arguments. At least `id` must be provided.
* The `await` of the start does not complete until the start has been accepted by the server
* The result is a child workflow handle which is an `asyncio.Task` and supports basic task features. The handle also has
some child info and supports signalling the child workflow
* An async `workflow.execute_child_workflow()` helper is provided which takes the same arguments as
`workflow.start_child_workflow()` and `await`s on the result. This should be used in most cases unless advanced task
capabilities are needed.
#### Timers
* A timer is created with a normal `asyncio.sleep()` call
* Timers are also implicitly started on any `asyncio` calls with timeouts (e.g. `asyncio.wait_for`)
* Timers are Temporal server timers, not local ones, so sub-second resolution rarely has value
* Calls that use a specific point in time, e.g. `call_at` or `timeout_at`, should be based on the current loop time
(i.e. `workflow.time()`) and not an actual point in time. This is because fixed times are translated to relative ones
by subtracting the current loop time which may not be the actual current time.
#### Conditions
* `workflow.wait_condition` is an async function that doesn't return until a provided callback returns true
* A `timeout` can optionally be provided which will raise an `asyncio.TimeoutError` if reached (internally backed by
`asyncio.wait_for` which uses a timer)
#### Asyncio and Determinism
Workflows must be deterministic. Workflows are backed by a custom
[asyncio](https://docs.python.org/3/library/asyncio.html) event loop. This means many of the common `asyncio` calls work
as normal. Some asyncio features are disabled such as:
* Thread-related calls such as `to_thread()`, `run_coroutine_threadsafe()`, `loop.run_in_executor()`, etc.
* Calls that alter the event loop such as `loop.close()`, `loop.stop()`, `loop.run_forever()`,
`loop.set_task_factory()`, etc.
* Calls that use anything external such as networking, subprocesses, disk IO, etc.
Also, there are some `asyncio` utilities that internally use `set()` which can make them non-deterministic from one
worker to the next. Therefore the following `asyncio` functions have `workflow`-module alternatives that are
deterministic:
* `asyncio.as_completed()` - use `workflow.as_completed()`
* `asyncio.wait()` - use `workflow.wait()`
#### Asyncio Cancellation
Cancellation is done the same way as `asyncio`. Specifically, a task can be requested to be cancelled but does not
necessarily have to respect that cancellation immediately. This also means that `asyncio.shield()` can be used to
protect against cancellation. The following tasks, when cancelled, perform a Temporal cancellation:
* Activities - when the task executing an activity is cancelled, a cancellation request is sent to the activity
* Child workflows - when the task starting or executing a child workflow is cancelled, a cancellation request is sent to
cancel the child workflow
* Timers - when the task executing a timer is cancelled (whether started via sleep or timeout), the timer is cancelled
When the workflow itself is requested to cancel, `Task.cancel` is called on the main workflow task. Therefore,
`asyncio.CancelledError` can be caught in order to handle the cancel gracefully.
Workflows follow `asyncio` cancellation rules exactly, which can confuse Python developers. Cancelling a
task doesn't always cancel the thing it created. For example, given
`task = asyncio.create_task(workflow.start_child_workflow(...))`, calling `task.cancel` does not cancel the child
workflow; it only cancels the starting of it, which has no effect if it has already started. However, cancelling the
result of `handle = await workflow.start_child_workflow(...)` or
`task = asyncio.create_task(workflow.execute_child_workflow(...))` _does_ cancel the child workflow.
Also, due to Temporal rules, a cancellation request is a state, not an event. Therefore, repeated cancellation requests
are not delivered, only the first. If the workflow chooses to swallow a cancellation, it cannot be requested again.
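
Because workflows follow plain `asyncio` cancellation rules, the two behaviors described above (catching `CancelledError` to clean up, and using `asyncio.shield()` to protect work) can be illustrated outside of any workflow. This is plain `asyncio` only: it uses no Temporal APIs, its sleeps are local timers, and `long_work`, `protected_work`, and `wait_shielded` are hypothetical names.
```python
import asyncio
from typing import List


async def long_work(log: List[str]) -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        # Graceful handling: record cleanup, then let the cancellation propagate
        log.append("cleanup ran")
        raise


async def protected_work(log: List[str]) -> str:
    await asyncio.sleep(0.05)
    log.append("shielded work finished")
    return "shielded result"


async def wait_shielded(inner: "asyncio.Task[str]") -> str:
    # Cancelling the task running this coroutine does not cancel `inner`
    return await asyncio.shield(inner)


async def main() -> List[str]:
    log: List[str] = []

    # 1. Cancellation is a request, delivered as CancelledError at the next await
    task = asyncio.create_task(long_work(log))
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("task cancelled")

    # 2. asyncio.shield() keeps the inner work running despite outer cancellation
    inner = asyncio.create_task(protected_work(log))
    outer = asyncio.create_task(wait_shielded(inner))
    await asyncio.sleep(0)
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        log.append("outer cancelled")
    await inner  # the shielded work still runs to completion
    return log


print(asyncio.run(main()))
# prints ['cleanup ran', 'task cancelled', 'outer cancelled', 'shielded work finished']
```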
#### Workflow Utilities
While running in a workflow, in addition to features documented elsewhere, the following items are available from the
`temporalio.workflow` module:
* `continue_as_new()` - Async function to stop the workflow immediately and continue as new
* `info()` - Returns information about the current workflow
* `logger` - A logger for use in a workflow (properly skips logging on replay)
* `now()` - Returns the "current time" from the workflow's perspective
#### Exceptions
* Workflows/updates can raise exceptions to fail the workflow or the "workflow task" (i.e. suspend the workflow
in a retrying state).
* Exceptions that are instances of `temporalio.exceptions.FailureError` will fail the workflow with that exception
* For failing the workflow explicitly with a user exception, use `temporalio.exceptions.ApplicationError`. This can
be marked non-retryable or include details as needed.
* Other exceptions that come from activity execution, child execution, cancellation, etc. are already instances of
`FailureError` and will fail the workflow when uncaught.
* All other exceptions fail the "workflow task" which means the workflow will continually retry until the workflow is
fixed. This is helpful for bad code or other non-predictable exceptions. To actually fail the workflow, use an
`ApplicationError` as mentioned above.
This default can be changed by providing a list of exception types to `workflow_failure_exception_types` when creating a
`Worker` or `failure_exception_types` on the `@workflow.defn` decorator. If a workflow-thrown exception is an instance
of any type in either list, it will fail the workflow instead of the task. This means a value of `[Exception]` will
cause every exception to fail the workflow instead of the task. Also, as a special case, if
`temporalio.workflow.NondeterminismError` (or any superclass of it) is set, non-determinism errors will fail the
workflow. WARNING: These settings are experimental.
#### External Workflows
* `workflow.get_external_workflow_handle()` inside a workflow returns a handle to interact with another workflow
* `workflow.get_external_workflow_handle_for()` can be used instead for a type safe handle
* `await handle.signal()` can be called on the handle to signal the external workflow
* `await handle.cancel()` can be called on the handle to send a cancel to the external workflow
#### Testing
Workflow testing can be done in an integration-test fashion against a real server; however, it is hard to simulate
timeouts and other long, time-based code. The time-skipping workflow test environment can help there.
The time-skipping `temporalio.testing.WorkflowEnvironment` can be created via the static async `start_time_skipping()`.
This internally downloads the Temporal time-skipping test server to a temporary directory if it doesn't already exist,
then starts the test server which has special APIs for skipping time.
**NOTE:** The time-skipping test environment does not work on ARM. The SDK will try to download the x64 binary on macOS
for use with the Intel emulator, but for Linux or Windows ARM there is no proper time-skipping test server at this time.
##### Automatic Time Skipping
Anytime a workflow result is waited on, the time-skipping server automatically advances to the next event it can. To
manually advance time before waiting on the result of a workflow, the `WorkflowEnvironment.sleep` method can be used.
Here's a simple example of a workflow that sleeps for 24 hours:
```python
import asyncio

from temporalio import workflow

@workflow.defn
class WaitADayWorkflow:
    @workflow.run
    async def run(self) -> str:
        await asyncio.sleep(24 * 60 * 60)
        return "all done"
```
An integration test of this workflow would be way too slow. However, the time-skipping server automatically skips to the
next event when we wait on the result. Here's a test for that workflow:
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_wait_a_day_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[WaitADayWorkflow]):
            assert "all done" == await env.client.execute_workflow(WaitADayWorkflow.run, id="wf1", task_queue="tq1")
```
That test will run almost instantly. This is because by calling `execute_workflow` on our client, we have asked the
environment to automatically skip time as much as it can (basically until the end of the workflow or until an activity
is run).
To disable automatic time-skipping while waiting for a workflow result, run code inside a
`with env.auto_time_skipping_disabled():` block.
##### Manual Time Skipping
Until a workflow is waited on, all time skipping in the time-skipping environment is done manually via
`WorkflowEnvironment.sleep`.
Here's a workflow that waits for a signal or times out:
```python
import asyncio

from temporalio import workflow

@workflow.defn
class SignalWorkflow:
    def __init__(self) -> None:
        self.signal_received = False

    @workflow.run
    async def run(self) -> str:
        # Wait for signal or timeout in 45 seconds
        try:
            await workflow.wait_condition(lambda: self.signal_received, timeout=45)
            return "got signal"
        except asyncio.TimeoutError:
            return "got timeout"

    @workflow.signal
    def some_signal(self) -> None:
        self.signal_received = True
```
To test a normal signal, you might:
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_signal_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
            # Start workflow, send signal, check result
            handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
            await handle.signal(SignalWorkflow.some_signal)
            assert "got signal" == await handle.result()
```
But how would you test the timeout part? Like so:
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_signal_workflow_timeout():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
            # Start workflow, advance time past timeout, check result
            handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
            await env.sleep(50)
            assert "got timeout" == await handle.result()
```
Also, the current time of the workflow environment can be obtained via the async `WorkflowEnvironment.get_current_time`
method.
##### Mocking Activities
Activities are just functions decorated with `@activity.defn`. Simply write different ones and pass those to the worker
to have different activities called during the test.
#### Workflow Sandbox
By default workflows are run in a sandbox to help avoid non-deterministic code. If a call that is known to be
non-deterministic is performed, an exception will be thrown in the workflow which will "fail the task" which means the
workflow will not progress until fixed.
The sandbox is not foolproof and non-determinism can still occur. It is simply a best-effort way to catch bad code
early. Users are encouraged to define their workflows in files with no other side effects.
The sandbox offers a mechanism to pass through modules from outside the sandbox. By default this already includes all
standard library modules and Temporal modules. **For performance and behavior reasons, users are encouraged to pass
through all third party modules whose calls will be deterministic.** This includes modules containing the activities to
be referenced in workflows. See "Passthrough Modules" below on how to do this.
If you are getting an error like:
> temporalio.worker.workflow_sandbox._restrictions.RestrictedWorkflowAccessError: Cannot access
> http.client.IncompleteRead.\_\_mro_entries\_\_ from inside a workflow. If this is code from a module not used in a
> workflow or known to only be used deterministically from a workflow, mark the import as pass through.
Then either you are using an invalid construct in the workflow, you have hit a known limitation of the sandbox, or (most
commonly) the access comes from a module that is safe to pass through (see "Passthrough Modules" section below).
##### How the Sandbox Works
The sandbox is made up of two components that work closely together:
* Global state isolation
* Restrictions preventing known non-deterministic library calls
Global state isolation is performed by using `exec`. Upon workflow start, the file that the workflow is defined in is
imported into a new sandbox created for that workflow run. In order to keep the sandbox performant, a known set of
"passthrough modules" are passed through from outside of the sandbox when they are imported. These are expected to be
side-effect free on import and have their non-deterministic aspects restricted. By default the entire Python standard
library, `temporalio`, and a couple of other modules are passed through from outside of the sandbox. To update this
list, see "Customizing the Sandbox".
Restrictions preventing known non-deterministic library calls are achieved using proxy objects on modules wrapped around
the custom importer set in the sandbox. Many restrictions apply at workflow import time and workflow run time, while
some restrictions only apply at workflow run time. A default set of restrictions is included that prevents most
dangerous standard library calls. However, it is known in Python that some otherwise-non-deterministic invocations, like
reading a file from disk via `open` or using `os.environ`, are done as part of importing modules. To customize what is
and isn't restricted, see "Customizing the Sandbox".
##### Avoiding the Sandbox
There are three increasingly-scoped ways to avoid the sandbox. Users are discouraged from avoiding the sandbox if
possible.
To remove restrictions around a particular block of code, use `with temporalio.workflow.unsafe.sandbox_unrestricted():`.
The workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied.
To run an entire workflow outside of a sandbox, set `sandboxed=False` on the `@workflow.defn` decorator when defining
it. This will run the entire workflow outside of the sandbox, which means it can share global state and do other unsafe
things.
To disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to
`temporalio.worker.UnsandboxedWorkflowRunner()`. This value is defaulted to
`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()` so by changing it to the unsandboxed runner, the sandbox
will not be used at all.
##### Customizing the Sandbox
⚠️ WARNING: APIs in the `temporalio.worker.workflow_sandbox` module are not yet considered stable and may change in
future releases.
When creating the `Worker`, the `workflow_runner` is defaulted to
`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()`. The `SandboxedWorkflowRunner`'s init accepts a
`restrictions` keyword argument that is defaulted to `SandboxRestrictions.default`. The `SandboxRestrictions` dataclass
is immutable and contains three fields that can be customized, but only two have notable value. See below.
###### Passthrough Modules
By default the sandbox completely reloads non-standard-library and non-Temporal modules for every workflow run. To make
the sandbox quicker and use less memory when importing known-side-effect-free third party modules, they can be marked
as passthrough modules.
**For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be
deterministic.**
One way to pass through a module is at import time in the workflow file using the `imports_passed_through` context
manager like so:
```python
# my_workflow_file.py
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    import pydantic

@workflow.defn
class MyWorkflow:
    ...
```
Alternatively, this can be done at worker creation time by customizing the runner's restrictions. For example:
```python
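from temporalio.worker import Worker
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner, SandboxRestrictions

my_worker = Worker(
    ...,
    workflow_runner=SandboxedWorkflowRunner(
        restrictions=SandboxRestrictions.default.with_passthrough_modules("pydantic")
    )
)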
```
In both of these cases, now the `pydantic` module will be passed through from outside of the sandbox instead of
being reloaded for every workflow run.
###### Invalid Module Members
`SandboxRestrictions.invalid_module_members` contains a root matcher that applies to all module members. This already
has a default set which includes things like `datetime.date.today()` which should never be called from a workflow. To
remove this restriction:
```python
my_restrictions = dataclasses.replace(
SandboxRestrictions.default,
invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(