Skip to content

Commit

Permalink
Merge pull request #8 from temporalio/python_worker
Browse files Browse the repository at this point in the history
Add python worker
  • Loading branch information
Quinn-With-Two-Ns committed Feb 23, 2023
2 parents 654a3ab + 9a4caba commit 547f54d
Show file tree
Hide file tree
Showing 10 changed files with 805 additions and 9 deletions.
42 changes: 41 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on: # rebuild any PRs and main branch changes
- main

jobs:
build-lint-test:
build-lint-test-go:
runs-on: ubuntu-latest
steps:
- name: Print build information
Expand Down Expand Up @@ -40,3 +40,43 @@ jobs:
env:
TEMPORAL_CLOUD_ADDRESS: sdk-ci.a2dd6.tmprl.cloud:7233
TEMPORAL_CLOUD_NAMESPACE: sdk-ci.a2dd6

build-lint-test-python:
runs-on: ubuntu-latest
steps:
- name: Print build information
run: "echo head_ref: ${{ github.head_ref }}, ref: ${{ github.ref }}, os: ${{ matrix.os }}"
- uses: actions/checkout@v2
- uses: actions/setup-python@v1
with:
python-version: '3.10'
- run: python -m pip install --upgrade wheel poetry poethepoet
- run: poetry install --no-root
- run: poe lint

- uses: actions/setup-go@v2
with:
go-version: "^1.19"
- run: go build ./cmd/omes

- name: Python all-in-one
run: ./omes all-in-one --scenario workflow_with_single_noop_activity --log-level debug --language py --start-local-server
- name: Download Cloud certs to temporary directory
run: |
mkdir /tmp/temporal-certs &&
echo "$TEMPORAL_CLIENT_CERT" > /tmp/temporal-certs/client.pem &&
echo "$TEMPORAL_CLIENT_KEY" > /tmp/temporal-certs/client.key &&
wc /tmp/temporal-certs/client.pem /tmp/temporal-certs/client.key
env:
TEMPORAL_CLIENT_CERT: ${{ secrets.TEMPORAL_CLIENT_CERT }}
TEMPORAL_CLIENT_KEY: ${{ secrets.TEMPORAL_CLIENT_KEY }}
- name: Python on Cloud namespace
run: |
./omes all-in-one --scenario workflow_with_single_noop_activity --log-level debug --language py \
--server-address $TEMPORAL_CLOUD_ADDRESS \
--namespace $TEMPORAL_CLOUD_NAMESPACE \
--tls-cert-path /tmp/temporal-certs/client.pem \
--tls-key-path /tmp/temporal-certs/client.key
env:
TEMPORAL_CLOUD_ADDRESS: sdk-ci.a2dd6.tmprl.cloud:7233
TEMPORAL_CLOUD_NAMESPACE: sdk-ci.a2dd6
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
omes
.vscode
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ Omes (pronounced oh-mess) is the Hebrew word for "load" (עומס).

## Prerequisites

- Go 1.19

- [Go](https://golang.org/) 1.19+
- [Node](https://nodejs.org) 16+
- [Python](https://www.python.org/) 3.10+
- [Poetry](https://python-poetry.org/): `poetry install`
(More TBD when we support workers in other languages)

## Installation
Expand Down
2 changes: 1 addition & 1 deletion components/logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ func MustSetup(options *Options) *zap.SugaredLogger {

// AddCLIFlags adds the relevant flags to populate the options struct.
func AddCLIFlags(fs *pflag.FlagSet, options *Options, prefix string) {
fs.StringVar(&options.LogLevel, fmt.Sprintf("%s%s", prefix, components.OptionToFlagName(options, "LogLevel")), "info", "(debug info warn error dpanic panic fatal)")
fs.StringVar(&options.LogLevel, fmt.Sprintf("%s%s", prefix, components.OptionToFlagName(options, "LogLevel")), "info", "(debug info warn error panic fatal)")
fs.StringVar(&options.LogEncoding, fmt.Sprintf("%s%s", prefix, components.OptionToFlagName(options, "LogEncoding")), "console", "(console json)")
}
308 changes: 308 additions & 0 deletions poetry.lock

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
[tool.poetry]
name = "omes"
version = "0.1.0"
description = "Temporal load generator"
license = "MIT"
authors = ["Temporal Technologies Inc <sdk@temporal.io>"]
packages = [
{ include = "workers/python/**/*.py" },
]

[tool.poetry.dependencies]
python = "^3.10"
temporalio = "^1.0.0"
prometheus-client = "^0.16.0"
python-json-logger = "^2.0.7"

[tool.poetry.dev-dependencies]
mypy = "^0.961"
black = "^22.3.0"
isort = "^5.10.1"

[tool.poe.tasks]
format = [{cmd = "black ."}, {cmd = "isort ."}]
lint = [
{cmd = "black --check ."},
{cmd = "isort --check-only ."},
{ref = "lint-types"},
]
lint-types = "mypy --explicit-package-bases --namespace-packages ."

[tool.isort]
profile = "black"
skip_gitignore = true

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
78 changes: 73 additions & 5 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ func getIdentity() string {
return fmt.Sprintf("%s@%s", username, hostname)
}

func normalizeLangName(lang string) (string, error) {
switch lang {
case "go", "java", "ts", "py":
// Allow the full typescript or python word, but we need to match the file
// extension for the rest of run
case "typescript":
lang = "ts"
case "python":
lang = "py"
default:
return "", fmt.Errorf("invalid language %q, must be one of: go or java or ts or py", lang)
}
return lang, nil
}

// Cleanup cleans up all workflows associated with the task.
// Requires ElasticSearch.
// TODO(bergundy): This fails on Cloud, not sure why.
Expand Down Expand Up @@ -146,6 +161,38 @@ func (r *Runner) prepareWorker(ctx context.Context, options PrepareWorkerOptions
r.logger.Infof("Building go worker with %v", args)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
return cmd.Run()
case "py":
os.Mkdir(options.Output, 0755)
pwd, _ := os.Getwd()
pyProjectTOML := fmt.Sprintf(`
[tool.poetry]
name = "omes-python-load-test-worker"
version = "0.1.0"
description = "Temporal Omes load testing framework worker"
authors = ["Temporal Technologies Inc <sdk@temporal.io>"]
[tool.poetry.dependencies]
python = "^3.10"
omes = { path = %q }
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"`, pwd)
if err := os.WriteFile(filepath.Join(options.Output, "pyproject.toml"), []byte(pyProjectTOML), 0644); err != nil {
return fmt.Errorf("failed writing pyproject.toml: %w", err)
}
args := []string{
"poetry",
"install",
"--no-root",
"--no-dev",
"-v",
}
r.logger.Infof("Building python worker with %v %v", args, options.Output)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
cmd.Dir = options.Output
cmd.Stdin, cmd.Stdout, cmd.Stderr = os.Stdin, os.Stdout, os.Stderr
return cmd.Run()
default:
return fmt.Errorf("language not supported: '%s'", options.Language)
}
Expand Down Expand Up @@ -181,27 +228,48 @@ func (r *Runner) RunWorker(ctx context.Context, options WorkerOptions) error {
if gracefulShutdownDuration == 0 {
gracefulShutdownDuration = 30 * time.Second
}
language, err := normalizeLangName(options.Language)
if err != nil {
return fmt.Errorf("could not parse this language: %w", err)
}

switch options.Language {
workingDir := tmpDir
switch language {
case "go":
outputPath := filepath.Join(tmpDir, "worker")
if err := r.prepareWorker(ctx, PrepareWorkerOptions{Language: options.Language, Output: outputPath}); err != nil {
if err := r.prepareWorker(ctx, PrepareWorkerOptions{Language: language, Output: outputPath}); err != nil {
return err
}
args = []string{
outputPath,
"--task-queue", scenario.TaskQueueForRun(r.options.ScenarioName, r.options.RunID),
}
args = append(args, components.OptionsToFlags(&options.ClientOptions)...)
args = append(args, components.OptionsToFlags(&options.MetricsOptions)...)
args = append(args, components.OptionsToFlags(&options.LoggingOptions)...)
case "py":
outputPath := filepath.Join(tmpDir, "worker")
if err := r.prepareWorker(ctx, PrepareWorkerOptions{Language: language, Output: outputPath}); err != nil {
return err
}
workingDir = filepath.Join(tmpDir, "worker")
args = []string{
"poetry",
"run",
"python",
"-m",
"workers.python.main",
"--task-queue", scenario.TaskQueueForRun(r.options.ScenarioName, r.options.RunID),
}
default:
return fmt.Errorf("language not supported: '%s'", options.Language)
}
// Add common args
args = append(args, components.OptionsToFlags(&options.ClientOptions)...)
args = append(args, components.OptionsToFlags(&options.MetricsOptions)...)
args = append(args, components.OptionsToFlags(&options.LoggingOptions)...)

runErrorChan := make(chan error, 1)
// Inentionally not using CommandContext since we want to kill the worker gracefully (using SIGTERM).
cmd := exec.Command(args[0], args[1:]...)
cmd.Dir = workingDir
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
r.logger.Infof("Starting worker with args: %v", args)
Expand Down
6 changes: 6 additions & 0 deletions workers/python/activities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from temporalio import activity


@activity.defn(name="noop")
async def noop_activity():
return
Loading

0 comments on commit 547f54d

Please sign in to comment.