Skip to content

Commit

Permalink
Add schedule-related search attributes (#2953)
Browse files Browse the repository at this point in the history
  • Loading branch information
dnr committed Jun 6, 2022
1 parent 96a0bcd commit bef2790
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 11 deletions.
20 changes: 16 additions & 4 deletions common/searchattribute/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ const (
BatcherNamespace = "BatcherNamespace"
BatcherUser = "BatcherUser"

// added to workflows started by a schedule
TemporalScheduledStartTime = "TemporalScheduledStartTime"
TemporalScheduledById = "TemporalScheduledById"

// used by scheduler workflow
TemporalSchedulePaused = "TemporalSchedulePaused"
TemporalScheduleInfoJSON = "TemporalScheduleInfoJSON"

MemoEncoding = "MemoEncoding"
Memo = "Memo"
VisibilityTaskKey = "VisibilityTaskKey"
Expand All @@ -74,10 +82,14 @@ var (

// predefined are internal search attributes which are passed and stored in SearchAttributes object together with custom search attributes.
predefined = map[string]enumspb.IndexedValueType{
TemporalChangeVersion: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
BinaryChecksums: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
BatcherNamespace: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
BatcherUser: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
TemporalChangeVersion: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
BinaryChecksums: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
BatcherNamespace: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
BatcherUser: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
TemporalScheduledStartTime: enumspb.INDEXED_VALUE_TYPE_DATETIME,
TemporalScheduledById: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
TemporalSchedulePaused: enumspb.INDEXED_VALUE_TYPE_BOOL,
TemporalScheduleInfoJSON: enumspb.INDEXED_VALUE_TYPE_KEYWORD,
}

// reserved are internal field names that can't be used as search attribute names.
Expand Down
2 changes: 1 addition & 1 deletion schema/elasticsearch/visibility/index_template_v6.json
2 changes: 1 addition & 1 deletion schema/elasticsearch/visibility/index_template_v7.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
{
"order": 0,
"index_patterns": [
"temporal_visibility_v1*"
],
"settings": {
"index": {
"number_of_shards": "1",
"number_of_replicas": "0",
"auto_expand_replicas": "0-2",
"sort.field": [ "CloseTime", "StartTime", "RunId" ],
"sort.order": [ "desc", "desc", "desc" ],
"sort.missing": [ "_first", "_first", "_first" ]
}
},
"mappings": {
"_doc": {
"dynamic": "false",
"properties": {
"NamespaceId": {
"type": "keyword"
},
"WorkflowId": {
"type": "keyword"
},
"RunId": {
"type": "keyword"
},
"WorkflowType": {
"type": "keyword"
},
"StartTime": {
"type": "date"
},
"ExecutionTime": {
"type": "date"
},
"CloseTime": {
"type": "date"
},
"ExecutionDuration": {
"type": "long"
},
"ExecutionStatus": {
"type": "keyword"
},
"TaskQueue": {
"type": "keyword"
},
"TemporalChangeVersion": {
"type": "keyword"
},
"BatcherNamespace": {
"type": "keyword"
},
"BatcherUser": {
"type": "keyword"
},
"BinaryChecksums": {
"type": "keyword"
},
"HistoryLength": {
"type": "long"
},
"StateTransitionCount": {
"type": "long"
},
"TemporalScheduledStartTime": {
"type": "date"
},
"TemporalScheduledById": {
"type": "keyword"
},
"TemporalSchedulePaused": {
"type": "boolean"
},
"TemporalScheduleInfoJSON": {
"type": "keyword"
}
}
}
},
"aliases": {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
{
"order": 0,
"index_patterns": [
"temporal_visibility_v1*"
],
"settings": {
"index": {
"number_of_shards": "1",
"number_of_replicas": "0",
"auto_expand_replicas": "0-2",
"search.idle.after": "365d",
"sort.field": [ "CloseTime", "StartTime", "RunId" ],
"sort.order": [ "desc", "desc", "desc" ],
"sort.missing": [ "_first", "_first", "_first" ]
}
},
"mappings": {
"dynamic": "false",
"properties": {
"NamespaceId": {
"type": "keyword"
},
"WorkflowId": {
"type": "keyword"
},
"RunId": {
"type": "keyword"
},
"WorkflowType": {
"type": "keyword"
},
"StartTime": {
"type": "date_nanos"
},
"ExecutionTime": {
"type": "date_nanos"
},
"CloseTime": {
"type": "date_nanos"
},
"ExecutionDuration": {
"type": "long"
},
"ExecutionStatus": {
"type": "keyword"
},
"TaskQueue": {
"type": "keyword"
},
"TemporalChangeVersion": {
"type": "keyword"
},
"BatcherNamespace": {
"type": "keyword"
},
"BatcherUser": {
"type": "keyword"
},
"BinaryChecksums": {
"type": "keyword"
},
"HistoryLength": {
"type": "long"
},
"StateTransitionCount": {
"type": "long"
},
"TemporalScheduledStartTime": {
"type": "date_nanos"
},
"TemporalScheduledById": {
"type": "keyword"
},
"TemporalSchedulePaused": {
"type": "boolean"
},
"TemporalScheduleInfoJSON": {
"type": "keyword"
}
}
},
"aliases": {}
}
68 changes: 68 additions & 0 deletions schema/elasticsearch/visibility/versioned/v2/upgrade.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env bash

set -eu -o pipefail

# Prerequisites:
# - jq
# - curl

# Input parameters.
: "${ES_SCHEME:=http}"
: "${ES_SERVER:=127.0.0.1}"
: "${ES_PORT:=9200}"
: "${ES_USER:=}"
: "${ES_PWD:=}"
: "${ES_VERSION:=v7}"
: "${ES_VIS_INDEX_V1:=temporal_visibility_v1_dev}"
: "${AUTO_CONFIRM:=}"
: "${SLICES_COUNT:=auto}"

es_endpoint="${ES_SCHEME}://${ES_SERVER}:${ES_PORT}"

echo "=== Step 0. Sanity check if Elasticsearch index is accessible ==="

if ! curl --silent --fail --user "${ES_USER}":"${ES_PWD}" "${es_endpoint}/${ES_VIS_INDEX_V1}/_stats/docs" --write-out "\n"; then
echo "Elasticsearch index ${ES_VIS_INDEX_V1} is not accessible at ${es_endpoint}."
exit 1
fi

echo "=== Step 1. Add new builtin search attributes ==="

case $ES_VERSION in
v6) date_type='date' ; doc_type='/_doc' ;;
*) date_type='date_nanos' ; doc_type='' ;;
esac

new_mapping='
{
"properties": {
"TemporalScheduledStartTime": {
"type": "'$date_type'"
},
"TemporalScheduledById": {
"type": "keyword"
},
"TemporalSchedulePaused": {
"type": "boolean"
},
"TemporalScheduleInfoJSON": {
"type": "keyword"
}
}
}
'

if [ -z "${AUTO_CONFIRM}" ]; then
read -p "Add new builtin search attributes to the index ${ES_VIS_INDEX_V1}? (N/y)" -n 1 -r
echo
else
REPLY="y"
fi
if [ "${REPLY}" = "y" ]; then
curl --silent --user "${ES_USER}":"${ES_PWD}" -X PUT "${es_endpoint}/${ES_VIS_INDEX_V1}${doc_type}/_mapping" -H "Content-Type: application/json" --data-binary "$new_mapping" | jq
# Wait for mapping changes to go through.
until curl --silent --user "${ES_USER}":"${ES_PWD}" "${es_endpoint}/_cluster/health/${ES_VIS_INDEX_V1}" | jq --exit-status '.status=="green" | .'; do
echo "Waiting for Elasticsearch index ${ES_VIS_INDEX_V1} become green."
sleep 1
done
fi
8 changes: 3 additions & 5 deletions service/worker/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
schedspb "go.temporal.io/server/api/schedule/v1"
"go.temporal.io/server/common/payload"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/common/searchattribute"
)

const (
Expand All @@ -64,9 +65,6 @@ const (
// Maximum number of times to list per ListMatchingTimes query. (This is used only in a
// query so it can be changed without breaking history.)
maxListMatchingTimesCount = 1000

searchAttrStartTime = "TemporalScheduledStartTime"
searchAttrScheduleById = "TemporalScheduledById"
)

type (
Expand Down Expand Up @@ -744,10 +742,10 @@ func (s *scheduler) addSearchAttrs(
) *commonpb.SearchAttributes {
fields := maps.Clone(attrs.GetIndexedFields())
if p, err := payload.Encode(nominal); err == nil {
fields[searchAttrStartTime] = p
fields[searchattribute.TemporalScheduledStartTime] = p
}
if p, err := payload.Encode(s.State.ScheduleId); err == nil {
fields[searchAttrScheduleById] = p
fields[searchattribute.TemporalScheduledById] = p
}
return &commonpb.SearchAttributes{
IndexedFields: fields,
Expand Down

0 comments on commit bef2790

Please sign in to comment.