Skip to content

Commit

Permalink
Worker versioning - add BuildIDs search attribute (#4284)
Browse files Browse the repository at this point in the history
Note: This commit came from a feature branch and is not expected to build.
  • Loading branch information
bergundy authored and dnr committed May 26, 2023
1 parent 6b4e3cc commit 53a4a3a
Show file tree
Hide file tree
Showing 34 changed files with 683 additions and 289 deletions.
160 changes: 80 additions & 80 deletions api/persistence/v1/task_queues.pb.go

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions common/dynamicconfig/constants.go
Expand Up @@ -181,10 +181,10 @@ const (
// queue. Update requests which would cause the versioning data to exceed this number will fail with a
// FailedPrecondition error.
VersionCompatibleSetLimitPerQueue = "limit.versionCompatibleSetLimitPerQueue"
// VersionBuildIDLimitPerQueue is the max number of build IDs allowed to be defined in the versioning data for a
// VersionBuildIdLimitPerQueue is the max number of build IDs allowed to be defined in the versioning data for a
// task queue. Update requests which would cause the versioning data to exceed this number will fail with a
// FailedPrecondition error.
VersionBuildIDLimitPerQueue = "limit.versionBuildIDLimitPerQueue"
VersionBuildIdLimitPerQueue = "limit.versionBuildIdLimitPerQueue"

// keys for frontend

Expand Down Expand Up @@ -637,6 +637,8 @@ const (
DefaultWorkflowRetryPolicy = "history.defaultWorkflowRetryPolicy"
// HistoryMaxAutoResetPoints is the key for max number of auto reset points stored in mutableState
HistoryMaxAutoResetPoints = "history.historyMaxAutoResetPoints"
// HistoryMaxTrackedBuildIds indicates the max number of build IDs to store in the BuildIds search attribute
HistoryMaxTrackedBuildIds = "history.maxTrackedBuildIds"
// EnableParentClosePolicy whether to ParentClosePolicy
EnableParentClosePolicy = "history.enableParentClosePolicy"
// ParentClosePolicyThreshold decides that parent close policy will be processed by sys workers(if enabled) if
Expand Down
2 changes: 2 additions & 0 deletions common/searchattribute/defs.go
Expand Up @@ -46,6 +46,7 @@ const (
StateTransitionCount = "StateTransitionCount"
TemporalChangeVersion = "TemporalChangeVersion"
BinaryChecksums = "BinaryChecksums"
BuildIds = "BuildIds"
BatcherNamespace = "BatcherNamespace"
BatcherUser = "BatcherUser"
HistorySizeBytes = "HistorySizeBytes"
Expand Down Expand Up @@ -88,6 +89,7 @@ var (
predefined = map[string]enumspb.IndexedValueType{
TemporalChangeVersion: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST,
BinaryChecksums: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST,
BuildIds: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST,
BatcherNamespace: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
BatcherUser: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
TemporalScheduledStartTime: enumspb.INDEXED_VALUE_TYPE_DATETIME,
Expand Down
Expand Up @@ -25,8 +25,8 @@ option go_package = "go.temporal.io/server/api/persistence/v1;persistence";

import "temporal/server/api/clock/v1/message.proto";

// BuildID is an identifier with a timestamped status used to identify workers for task queue versioning purposes.
message BuildID {
// BuildId is an identifier with a timestamped status used to identify workers for task queue versioning purposes.
message BuildId {
enum State {
STATE_UNSPECIFIED = 0;
STATE_ACTIVE = 1;
Expand All @@ -49,8 +49,8 @@ message CompatibleVersionSet {
// case a set might end up with more than one ID.
repeated string set_ids = 1;
// All the compatible versions, unordered except for the last element, which is considered the set "default".
repeated BuildID build_ids = 2;
// HLC timestamp representing when the set default was updated. Different from BuildID.state_update_timestamp, which
repeated BuildId build_ids = 2;
// HLC timestamp representing when the set default was updated. Different from BuildId.state_update_timestamp, which
// refers to the build ID status.
// (-- api-linter: core::0142::time-field-type=disabled
// aip.dev/not-precedent: Using HLC instead of wall clock. --)
Expand Down
2 changes: 1 addition & 1 deletion schema/elasticsearch/visibility/index_template_v7.json
@@ -0,0 +1,87 @@
{
"order": 0,
"index_patterns": ["temporal_visibility_v1*"],
"settings": {
"index": {
"number_of_shards": "1",
"number_of_replicas": "0",
"auto_expand_replicas": "0-2",
"search.idle.after": "365d",
"sort.field": ["CloseTime", "StartTime", "RunId"],
"sort.order": ["desc", "desc", "desc"],
"sort.missing": ["_first", "_first", "_first"]
}
},
"mappings": {
"dynamic": "false",
"properties": {
"NamespaceId": {
"type": "keyword"
},
"TemporalNamespaceDivision": {
"type": "keyword"
},
"WorkflowId": {
"type": "keyword"
},
"RunId": {
"type": "keyword"
},
"WorkflowType": {
"type": "keyword"
},
"StartTime": {
"type": "date_nanos"
},
"ExecutionTime": {
"type": "date_nanos"
},
"CloseTime": {
"type": "date_nanos"
},
"ExecutionDuration": {
"type": "long"
},
"ExecutionStatus": {
"type": "keyword"
},
"TaskQueue": {
"type": "keyword"
},
"TemporalChangeVersion": {
"type": "keyword"
},
"BatcherNamespace": {
"type": "keyword"
},
"BatcherUser": {
"type": "keyword"
},
"BinaryChecksums": {
"type": "keyword"
},
"HistoryLength": {
"type": "long"
},
"StateTransitionCount": {
"type": "long"
},
"TemporalScheduledStartTime": {
"type": "date_nanos"
},
"TemporalScheduledById": {
"type": "keyword"
},
"TemporalSchedulePaused": {
"type": "boolean"
},
"HistorySizeBytes": {
"type": "long"
},
"BuildIds": {
"type": "keyword"
}
}
},
"aliases": {}
}
54 changes: 54 additions & 0 deletions schema/elasticsearch/visibility/versioned/v5/upgrade.sh
@@ -0,0 +1,54 @@
#!/usr/bin/env bash

set -eu -o pipefail

# Prerequisites:
# - jq
# - curl

# Input parameters.
: "${ES_SCHEME:=http}"
: "${ES_SERVER:=127.0.0.1}"
: "${ES_PORT:=9200}"
: "${ES_USER:=}"
: "${ES_PWD:=}"
: "${ES_VERSION:=v7}"
: "${ES_VIS_INDEX_V1:=temporal_visibility_v1_dev}"
: "${AUTO_CONFIRM:=}"
: "${SLICES_COUNT:=auto}"

es_endpoint="${ES_SCHEME}://${ES_SERVER}:${ES_PORT}"

echo "=== Step 0. Sanity check if Elasticsearch index is accessible ==="

if ! curl --silent --fail --user "${ES_USER}":"${ES_PWD}" "${es_endpoint}/${ES_VIS_INDEX_V1}/_stats/docs" --write-out "\n"; then
echo "Elasticsearch index ${ES_VIS_INDEX_V1} is not accessible at ${es_endpoint}."
exit 1
fi

echo "=== Step 1. Add new builtin search attributes ==="

new_mapping='
{
"properties": {
"BuildIds": {
"type": "keyword"
}
}
}
'

if [ -z "${AUTO_CONFIRM}" ]; then
read -p "Add new builtin search attributes to the index ${ES_VIS_INDEX_V1}? (N/y)" -n 1 -r
echo
else
REPLY="y"
fi
if [ "${REPLY}" = "y" ]; then
curl --silent --fail --user "${ES_USER}":"${ES_PWD}" -X PUT "${es_endpoint}/${ES_VIS_INDEX_V1}/_mapping" -H "Content-Type: application/json" --data-binary "$new_mapping" | jq
# Wait for mapping changes to go through.
until curl --silent --user "${ES_USER}":"${ES_PWD}" "${es_endpoint}/_cluster/health/${ES_VIS_INDEX_V1}" | jq --exit-status '.status=="green" | .'; do
echo "Waiting for Elasticsearch index ${ES_VIS_INDEX_V1} become green."
sleep 1
done
fi
2 changes: 2 additions & 0 deletions schema/mysql/v8/visibility/schema.sql
Expand Up @@ -37,6 +37,7 @@ CREATE TABLE executions_visibility (
TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>"$.TemporalScheduledById"),
TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS (search_attributes->"$.TemporalSchedulePaused"),
TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>"$.TemporalNamespaceDivision"),
BuildIds JSON GENERATED ALWAYS AS (search_attributes->"$.BuildIds"),

PRIMARY KEY (namespace_id, run_id)
);
Expand All @@ -53,6 +54,7 @@ CREATE INDEX by_task_queue ON executions_visibility (namespace_id, task
-- Indexes for the predefined search attributes
CREATE INDEX by_temporal_change_version ON executions_visibility (namespace_id, (CAST(TemporalChangeVersion AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_binary_checksums ON executions_visibility (namespace_id, (CAST(BinaryChecksums AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_build_ids ON executions_visibility (namespace_id, (CAST(BuildIds AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_batcher_user ON executions_visibility (namespace_id, BatcherUser, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_temporal_scheduled_start_time ON executions_visibility (namespace_id, TemporalScheduledStartTime, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
CREATE INDEX by_temporal_scheduled_by_id ON executions_visibility (namespace_id, TemporalScheduledById, (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
Expand Down
@@ -0,0 +1,2 @@
ALTER TABLE executions_visibility ADD COLUMN BuildIds JSON GENERATED ALWAYS AS (search_attributes->'BuildIds');
CREATE INDEX by_build_ids ON executions_visibility (namespace_id, (CAST(BuildIds AS CHAR(255) ARRAY)), (COALESCE(close_time, CAST('9999-12-31 23:59:59' AS DATETIME))) DESC, start_time DESC, run_id);
5 changes: 3 additions & 2 deletions schema/mysql/v8/visibility/versioned/v1.3/manifest.json
@@ -1,8 +1,9 @@
{
"CurrVersion": "1.3",
"MinCompatibleVersion": "0.1",
"Description": "add history size bytes",
"Description": "add history size bytes and build IDs visibility columns and indices",
"SchemaUpdateCqlFiles": [
"add_history_size_bytes.sql"
"add_history_size_bytes.sql",
"add_build_ids_search_attribute.sql"
]
}
2 changes: 2 additions & 0 deletions schema/postgresql/v12/visibility/schema.sql
Expand Up @@ -36,6 +36,7 @@ CREATE TABLE executions_visibility (
TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>'TemporalScheduledById') STORED,
TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS ((search_attributes->'TemporalSchedulePaused')::boolean) STORED,
TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (search_attributes->>'TemporalNamespaceDivision') STORED,
BuildIds JSONB GENERATED ALWAYS AS (search_attributes->'BuildIds') STORED,

-- Pre-allocated custom search attributes
Bool01 BOOLEAN GENERATED ALWAYS AS ((search_attributes->'Bool01')::boolean) STORED,
Expand Down Expand Up @@ -82,6 +83,7 @@ CREATE INDEX by_task_queue ON executions_visibility (namespace_id, task
-- Indexes for the predefined search attributes
CREATE INDEX by_temporal_change_version ON executions_visibility USING GIN (namespace_id, TemporalChangeVersion jsonb_path_ops);
CREATE INDEX by_binary_checksums ON executions_visibility USING GIN (namespace_id, BinaryChecksums jsonb_path_ops);
CREATE INDEX by_build_ids ON executions_visibility USING GIN (namespace_id, BuildIds jsonb_path_ops);
CREATE INDEX by_batcher_user ON executions_visibility (namespace_id, BatcherUser, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id);
CREATE INDEX by_temporal_scheduled_start_time ON executions_visibility (namespace_id, TemporalScheduledStartTime, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id);
CREATE INDEX by_temporal_scheduled_by_id ON executions_visibility (namespace_id, TemporalScheduledById, (COALESCE(close_time, '9999-12-31 23:59:59')) DESC, start_time DESC, run_id);
Expand Down
@@ -0,0 +1,2 @@
ALTER TABLE executions_visibility ADD COLUMN BuildIds JSONB GENERATED ALWAYS AS (search_attributes->'BuildIds') STORED;
CREATE INDEX by_build_ids ON executions_visibility USING GIN (namespace_id, BuildIds jsonb_path_ops);
5 changes: 3 additions & 2 deletions schema/postgresql/v12/visibility/versioned/v1.3/manifest.json
@@ -1,8 +1,9 @@
{
"CurrVersion": "1.3",
"MinCompatibleVersion": "0.1",
"Description": "add history size bytes",
"Description": "add history size bytes and build IDs visibility columns and indices",
"SchemaUpdateCqlFiles": [
"add_history_size_bytes.sql"
"add_history_size_bytes.sql",
"add_build_ids_search_attribute.sql"
]
}
10 changes: 10 additions & 0 deletions schema/sqlite/v3/visibility/schema.sql
Expand Up @@ -22,6 +22,7 @@ CREATE TABLE executions_visibility (
TemporalScheduledById VARCHAR(255) GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalScheduledById")),
TemporalSchedulePaused BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalSchedulePaused")),
TemporalNamespaceDivision VARCHAR(255) GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.TemporalNamespaceDivision")),
BuildIds TEXT GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.BuildIds")) STORED,

-- Pre-allocated custom search attributes
Bool01 BOOLEAN GENERATED ALWAYS AS (JSON_EXTRACT(search_attributes, "$.Bool01")),
Expand Down Expand Up @@ -114,6 +115,7 @@ CREATE VIRTUAL TABLE executions_visibility_fts_text USING fts5 (
CREATE VIRTUAL TABLE executions_visibility_fts_keyword_list USING fts5 (
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03,
Expand All @@ -140,13 +142,15 @@ BEGIN
rowid,
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03
) VALUES (
NEW.rowid,
NEW.TemporalChangeVersion,
NEW.BinaryChecksums,
NEW.BuildIds,
NEW.KeywordList01,
NEW.KeywordList02,
NEW.KeywordList03
Expand Down Expand Up @@ -175,6 +179,7 @@ BEGIN
rowid,
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03
Expand All @@ -183,6 +188,7 @@ BEGIN
OLD.rowid,
OLD.TemporalChangeVersion,
OLD.BinaryChecksums,
OLD.BuildIds,
OLD.KeywordList01,
OLD.KeywordList02,
OLD.KeywordList03
Expand Down Expand Up @@ -222,6 +228,7 @@ BEGIN
rowid,
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03
Expand All @@ -230,6 +237,7 @@ BEGIN
OLD.rowid,
OLD.TemporalChangeVersion,
OLD.BinaryChecksums,
OLD.BuildIds,
OLD.KeywordList01,
OLD.KeywordList02,
OLD.KeywordList03
Expand All @@ -238,13 +246,15 @@ BEGIN
rowid,
TemporalChangeVersion,
BinaryChecksums,
BuildIds,
KeywordList01,
KeywordList02,
KeywordList03
) VALUES (
NEW.rowid,
NEW.TemporalChangeVersion,
NEW.BinaryChecksums,
NEW.BuildIds,
NEW.KeywordList01,
NEW.KeywordList02,
NEW.KeywordList03
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/errors.go
Expand Up @@ -80,7 +80,8 @@ var (
errCronAndStartDelaySet = serviceerror.NewInvalidArgument("CronSchedule and WorkflowStartDelay may not be used together.")
errInvalidWorkflowStartDelaySeconds = serviceerror.NewInvalidArgument("An invalid WorkflowStartDelaySeconds is set on request.")
errRaceConditionAddingSearchAttributes = serviceerror.NewUnavailable("Generated search attributes mapping unavailble.")
errUseVersioningWithoutBuildID = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.")
errUseVersioningWithoutBuildId = serviceerror.NewInvalidArgument("WorkerVersionStamp must be present if UseVersioning is true.")
errBuildIdTooLong = serviceerror.NewInvalidArgument("Build ID exceeds configured limit.workerBuildIdSize, use a shorter build ID.")

errUpdateMetaNotSet = serviceerror.NewInvalidArgument("Update meta is not set on request.")
errUpdateInputNotSet = serviceerror.NewInvalidArgument("Update input is not set on request.")
Expand Down

0 comments on commit 53a4a3a

Please sign in to comment.