Skip to content

Commit

Permalink
Workflow implementation (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed May 27, 2022
1 parent 23aa7ae commit f1aa1c9
Show file tree
Hide file tree
Showing 30 changed files with 8,095 additions and 368 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ __pycache__
/dist
/docs/_autosummary
/docs/_build
/docs/_build_pydoctor
temporalio/api/*
!temporalio/api/__init__.py
temporalio/bridge/proto/*
Expand Down
367 changes: 323 additions & 44 deletions README.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Modules

temporalio.client
temporalio.worker
temporalio.workflow
temporalio.activity
temporalio.converter
temporalio.exceptions
Expand Down
427 changes: 400 additions & 27 deletions poetry.lock

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ black = "^21.12b0"
furo = "^2022.3.4"
grpcio-tools = "^1.43.0"
isort = "^5.10.1"
mypy = "^0.931"
mypy = "^0.950"
mypy-protobuf = "^3.2.0"
pydocstyle = "^6.1.1"
pydoctor = "^22.5.1"
pytest = "^7.1.1"
pytest-asyncio = "^0.18.0"
pytest-timeout = "^2.1.0"
Expand All @@ -56,14 +57,17 @@ build-bridge-develop = "python scripts/setup_bridge.py develop"
fix-wheel = "python scripts/fix_wheel.py"
format = [{cmd = "black ."}, {cmd = "isort ."}]
gen-docs = "sphinx-build docs docs/_build"
gen-docs-pydoctor = "pydoctor"
gen-protos = "python scripts/gen_protos.py"
lint = [
{cmd = "black --check ."},
{cmd = "isort --check-only ."},
{ref = "lint-types"},
{ref = "lint-docs"},
]
lint-docs = "pydocstyle"
# TODO(cretz): Why does pydocstyle complain about @overload missing docs after
# https://github.com/PyCQA/pydocstyle/pull/511?
lint-docs = "pydocstyle --ignore-decorators=overload"
lint-types = "mypy ."
test = "pytest"

Expand All @@ -86,7 +90,7 @@ asyncio_mode = "auto"
log_cli = true
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
timeout = 60
timeout = 600
timeout_func_only = true

[tool.isort]
Expand All @@ -111,6 +115,15 @@ add_ignore = [
"D205", "D415"
]

[tool.pydoctor]
add-package = ["temporalio"]
docformat = "google"
html-output = "docs/_build_pydoctor"
intersphinx = ["https://docs.python.org/3/objects.inv", "https://googleapis.dev/python/protobuf/latest/objects.inv"]
privacy = ["PRIVATE:temporalio.bridge", "HIDDEN:**.*_pb2*"]
project-name = "Temporal"
sidebar-expand-depth = 40

[build-system]
build-backend = "poetry.core.masonry.api"
requires = ["poetry-core>=1.0.0", "setuptools", "wheel", "setuptools-rust"]
8 changes: 8 additions & 0 deletions scripts/gen_protos.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,17 @@ def fix_generated_output(base_path: Path):
f.write(content)
# Write init
with (base_path / "__init__.py").open("w") as f:
# Imports
message_names = []
for stem, messages in imports.items():
for message in messages:
f.write(f"from .{stem} import {message}\n")
message_names.append(message)
# __all__
if message_names:
f.write(
f'\n__all__ = [\n "' + '",\n "'.join(message_names) + '",\n]\n'
)


if __name__ == "__main__":
Expand Down
56 changes: 46 additions & 10 deletions temporalio/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import asyncio
import contextvars
import inspect
import logging
import threading
from dataclasses import dataclass
Expand Down Expand Up @@ -57,15 +58,8 @@ def defn(fn: Optional[ActivityFunc] = None, *, name: Optional[str] = None):
"""

def with_name(name: str, fn: ActivityFunc) -> ActivityFunc:
# Validate the activity
if not callable(fn):
raise TypeError("Activity is not callable")
elif not fn.__code__:
raise TypeError("Activity callable missing __code__")
elif fn.__code__.co_kwonlyargcount:
raise TypeError("Activity cannot have keyword-only arguments")
# Set the name
setattr(fn, "__temporal_activity_name", name)
# This performs validation
_Definition._apply_to_callable(fn, name)
return fn

# If name option is available, return decorator function
Expand Down Expand Up @@ -320,7 +314,7 @@ def process(
) -> Tuple[Any, MutableMapping[str, Any]]:
"""Override to add activity details."""
msg, kwargs = super().process(msg, kwargs)
if self.activity_info_on_extra or self.activity_info_on_extra:
if self.activity_info_on_message or self.activity_info_on_extra:
context = _current_context.get(None)
if context:
if self.activity_info_on_message:
Expand All @@ -342,3 +336,45 @@ def base_logger(self) -> logging.Logger:

#: Logger that will have contextual activity details embedded.
logger = LoggerAdapter(logging.getLogger(__name__), None)


@dataclass
class _Definition:
name: str
fn: Callable
is_async: bool

@staticmethod
def from_callable(fn: Callable) -> Optional[_Definition]:
return getattr(fn, "__temporal_activity_definition", None)

@staticmethod
def must_from_callable(fn: Callable) -> _Definition:
ret = _Definition.from_callable(fn)
if ret:
return ret
fn_name = getattr(fn, "__name__", "<unknown>")
raise TypeError(
f"Activity {fn_name} missing attributes, was it decorated with @activity.defn?"
)

@staticmethod
def _apply_to_callable(fn: Callable, activity_name: str) -> None:
# Validate the activity
if hasattr(fn, "__temporal_activity_definition"):
raise ValueError("Function already contains activity definition")
elif not callable(fn):
raise TypeError("Activity is not callable")
elif not fn.__code__:
raise TypeError("Activity callable missing __code__")
elif fn.__code__.co_kwonlyargcount:
raise TypeError("Activity cannot have keyword-only arguments")
setattr(
fn,
"__temporal_activity_definition",
_Definition(
name=activity_name,
fn=fn,
is_async=inspect.iscoroutinefunction(fn),
),
)
38 changes: 19 additions & 19 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions temporalio/bridge/proto/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
from .core_interface_pb2 import ActivityHeartbeat, ActivityTaskCompletion

__all__ = [
"ActivityHeartbeat",
"ActivityTaskCompletion",
]
2 changes: 1 addition & 1 deletion temporalio/bridge/sdk-core
Submodule sdk-core updated 69 files
+5 −4 .buildkite/docker/docker-compose.yaml
+1 −0 .gitignore
+1 −1 bridge-ffi/include/sdk-core-bridge.h
+9 −9 bridge-ffi/src/lib.rs
+14 −14 bridge-ffi/src/wrappers.rs
+4 −6 client/Cargo.toml
+129 −156 client/src/lib.rs
+66 −1 client/src/metrics.rs
+0 −171 client/src/mocks.rs
+6 −6 client/src/raw.rs
+51 −51 client/src/retry.rs
+1 −1 core-api/Cargo.toml
+1 −8 core-api/src/lib.rs
+9 −0 core-api/src/worker.rs
+2 −3 core/Cargo.toml
+2 −2 core/benches/workflow_replay.rs
+10 −3 core/src/abstractions.rs
+43 −50 core/src/core_tests/activity_tasks.rs
+10 −11 core/src/core_tests/child_workflows.rs
+12 −22 core/src/core_tests/determinism.rs
+29 −38 core/src/core_tests/local_activities.rs
+9 −9 core/src/core_tests/mod.rs
+293 −24 core/src/core_tests/queries.rs
+10 −40 core/src/core_tests/workers.rs
+262 −59 core/src/core_tests/workflow_tasks.rs
+36 −58 core/src/lib.rs
+11 −0 core/src/pending_activations.rs
+4 −6 core/src/pollers/mod.rs
+19 −14 core/src/pollers/poll_buffer.rs
+6 −20 core/src/replay/mod.rs
+1 −1 core/src/telemetry/mod.rs
+91 −45 core/src/test_help/mod.rs
+11 −8 core/src/worker/activities.rs
+28 −25 core/src/worker/activities/activity_heartbeat_manager.rs
+58 −6 core/src/worker/activities/local_activities.rs
+209 −0 core/src/worker/client.rs
+86 −0 core/src/worker/client/mocks.rs
+111 −51 core/src/worker/mod.rs
+14 −14 core/src/workflow/history_update.rs
+1 −1 core/src/workflow/machines/mod.rs
+14 −3 core/src/workflow/machines/workflow_machines.rs
+5 −2 core/src/workflow/mod.rs
+41 −2 core/src/workflow/workflow_tasks/cache_manager.rs
+86 −13 core/src/workflow/workflow_tasks/concurrency_manager.rs
+266 −127 core/src/workflow/workflow_tasks/mod.rs
+1 −1 protos/local/temporal/sdk/core/bridge/bridge.proto
+3 −7 sdk-core-protos/src/history_info.rs
+1 −0 sdk/Cargo.toml
+223 −0 sdk/src/activity_context.rs
+8 −2 sdk/src/interceptors.rs
+176 −134 sdk/src/lib.rs
+1 −1 test-utils/Cargo.toml
+3 −3 test-utils/src/histfetch.rs
+64 −57 test-utils/src/lib.rs
+13 −6 tests/integ_tests/client_tests.rs
+7 −4 tests/integ_tests/heartbeat_tests.rs
+4 −2 tests/integ_tests/polling_tests.rs
+23 −14 tests/integ_tests/queries_tests.rs
+98 −19 tests/integ_tests/workflow_tests.rs
+57 −34 tests/integ_tests/workflow_tests/activities.rs
+4 −5 tests/integ_tests/workflow_tests/cancel_wf.rs
+25 −8 tests/integ_tests/workflow_tests/continue_as_new.rs
+33 −17 tests/integ_tests/workflow_tests/local_activities.rs
+2 −28 tests/integ_tests/workflow_tests/replay.rs
+1 −1 tests/integ_tests/workflow_tests/resets.rs
+6 −3 tests/integ_tests/workflow_tests/timers.rs
+4 −4 tests/integ_tests/workflow_tests/upsert_search_attrs.rs
+78 −6 tests/load_tests.rs
+5 −5 tests/main.rs
12 changes: 6 additions & 6 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use pyo3_asyncio::tokio::future_into_py;
use std::collections::HashMap;
use std::time::Duration;
use temporal_client::{
ConfiguredClient, RetryConfig, RetryGateway, ServerGatewayOptions, ServerGatewayOptionsBuilder,
TlsConfig, WorkflowService, WorkflowServiceClientWithMetrics,
ClientOptions, ClientOptionsBuilder, ConfiguredClient, RetryClient, RetryConfig, TlsConfig,
WorkflowService, WorkflowServiceClientWithMetrics,
};
use tonic;
use url::Url;

pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException);

type Client = RetryGateway<ConfiguredClient<WorkflowServiceClientWithMetrics>>;
type Client = RetryClient<ConfiguredClient<WorkflowServiceClientWithMetrics>>;

#[pyclass]
pub struct ClientRef {
Expand Down Expand Up @@ -51,7 +51,7 @@ struct ClientRetryConfig {

pub fn connect_client(py: Python, config: ClientConfig) -> PyResult<&PyAny> {
// TODO(cretz): Add metrics_meter?
let opts: ServerGatewayOptions = config.try_into()?;
let opts: ClientOptions = config.try_into()?;
future_into_py(py, async move {
Ok(ClientRef {
retry_client: opts.connect_no_namespace(None).await.map_err(|err| {
Expand Down Expand Up @@ -226,11 +226,11 @@ where
}
}

impl TryFrom<ClientConfig> for temporal_client::ServerGatewayOptions {
impl TryFrom<ClientConfig> for ClientOptions {
type Error = PyErr;

fn try_from(opts: ClientConfig) -> PyResult<Self> {
let mut gateway_opts = ServerGatewayOptionsBuilder::default();
let mut gateway_opts = ClientOptionsBuilder::default();
gateway_opts
.target_url(
Url::parse(&opts.target_url)
Expand Down
Loading

0 comments on commit f1aa1c9

Please sign in to comment.