Skip to content

Commit

Permalink
Merge branch 'main' into yj/cache_default_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
yujieli-temporal committed Aug 8, 2023
2 parents 4b1e341 + a3a4ac5 commit eb9f9ee
Show file tree
Hide file tree
Showing 37 changed files with 1,134 additions and 195 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ ALL_SRC := $(shell find . -name "*.go")
ALL_SRC += go.mod
ALL_SCRIPTS := $(shell find . -name "*.sh")

MAIN_BRANCH = master
MERGE_BASE ?= $(shell git merge-base $(MAIN_BRANCH) HEAD)
MODIFIED_FILES := $(shell git diff --name-status $(MERGE_BASE) -- | cut -f2)
MAIN_BRANCH := main

TEST_DIRS := $(sort $(dir $(filter %_test.go,$(ALL_SRC))))
FUNCTIONAL_TEST_ROOT := ./tests
Expand Down Expand Up @@ -234,6 +232,8 @@ copyright:
@printf $(COLOR) "Fix license header..."
@go run ./cmd/tools/copyright/licensegen.go

goimports: MERGE_BASE ?= $(shell git merge-base $(MAIN_BRANCH) HEAD)
goimports: MODIFIED_FILES := $(shell git diff --name-status $(MERGE_BASE) -- | cut -f2)
goimports:
@printf $(COLOR) "Run goimports for modified files..."
@printf "Merge base: $(MERGE_BASE)\n"
Expand Down
20 changes: 8 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,11 @@ Learn more about Temporal at [docs.temporal.io](https://docs.temporal.io).
Execute the following commands to start a pre-built image along with all the dependencies.

```bash
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
docker-compose up
brew install temporal
temporal server start-dev
```

Refer to Temporal [docker-compose](https://github.com/temporalio/docker-compose) repo for more advanced options.

For more details on Docker images refer to [docker-builds](https://github.com/temporalio/docker-builds) repo.
Refer to [Temporal CLI](https://docs.temporal.io/cli/#installation) documentation for more installation options.

### Run the Samples

Expand All @@ -41,21 +38,20 @@ We have a number of [HelloWorld type scenarios](https://github.com/temporalio/sa

### Use CLI

Use [Temporal's command line tool](https://docs.temporal.io/tctl-v1) `tctl` to interact with the local Temporal server.
Use [Temporal CLI](https://docs.temporal.io/cli/) to interact with the running Temporal server.

```bash
alias tctl="docker exec temporal-admin-tools tctl"
tctl namespace list
tctl workflow list
temporal operator namespace list
temporal workflow list
```

### Use Temporal Web UI

Try [Temporal Web UI](https://github.com/temporalio/ui) by opening [http://localhost:8080](http://localhost:8080) for viewing your sample workflows executing on Temporal.
Try [Temporal Web UI](https://docs.temporal.io/web-ui) by opening [http://localhost:8233](http://localhost:8233) for viewing your sample workflows executing on Temporal.

## Repository

This repository contains the source code of the Temporal server. To implement Workflows, Activities and Workers, use [Go SDK](https://github.com/temporalio/sdk-go) or [Java SDK](https://github.com/temporalio/sdk-java).
This repository contains the source code of the Temporal server. To implement Workflows, Activities and Workers, use one of the [supported languages](https://docs.temporal.io/dev-guide/).

## Contributing

Expand Down
70 changes: 63 additions & 7 deletions common/archiver/s3store/visibility_archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"go.temporal.io/api/serviceerror"
workflowpb "go.temporal.io/api/workflow/v1"

"go.temporal.io/server/common/searchattribute"

Expand Down Expand Up @@ -156,6 +157,7 @@ func (v *visibilityArchiver) Archive(
handler.Counter(metrics.VisibilityArchiveSuccessCount.GetMetricName()).Record(1)
return nil
}

func createIndexesToArchive(request *archiverspb.VisibilityRecord) []indexToArchive {
return []indexToArchive{
{primaryIndexKeyWorkflowTypeName, request.WorkflowTypeName, secondaryIndexKeyCloseTimeout, timestamp.TimeValue(request.CloseTime)},
Expand Down Expand Up @@ -209,12 +211,58 @@ func (v *visibilityArchiver) queryAll(
request *archiver.QueryVisibilityRequest,
saTypeMap searchattribute.NameTypeMap,
) (*archiver.QueryVisibilityResponse, error) {
return v.queryPrefix(ctx, uri, &queryVisibilityRequest{
namespaceID: request.NamespaceID,
pageSize: request.PageSize,
nextPageToken: request.NextPageToken,
parsedQuery: &parsedQuery{},
}, saTypeMap, constructVisibilitySearchPrefix(uri.Path(), request.NamespaceID))
// remaining is the number of workflow executions left to return before we reach pageSize.
remaining := request.PageSize
nextPageToken := request.NextPageToken
var executions []*workflowpb.WorkflowExecutionInfo
// We need to loop because the number of workflow executions returned by each call to query may be fewer than
// pageSize. This is because we may have to skip some workflow executions after querying S3 (client-side filtering)
// because there are 2 entries in S3 for each workflow execution indexed by workflowTypeName (one for closeTimeout
// and one for startTimeout), and we only want to return one entry per workflow execution. See
// createIndexesToArchive for a list of all indexes.
for {
searchPrefix := constructVisibilitySearchPrefix(uri.Path(), request.NamespaceID)
// We suffix searchPrefix with workflowTypeName because the data in S3 is duplicated across combinations of 2
// different primary indices (workflowID and workflowTypeName) and 2 different secondary indices (closeTimeout
// and startTimeout). We only want to return one entry per workflow execution, but the full path to the S3 key
// is <primaryIndexKey>/<primaryIndexValue>/<secondaryIndexKey>/<secondaryIndexValue>/<runID>, and we don't have
// the primaryIndexValue when we make the call to query, so we can only specify the primaryIndexKey.
searchPrefix += "/" + primaryIndexKeyWorkflowTypeName
// The pageSize we supply here is actually the maximum number of keys to fetch from S3. For each execution,
// there should be 2 keys in S3 for this prefix, so you might think that we should multiply the pageSize by 2.
// However, if we do that, we may end up returning more than pageSize workflow executions to the end user of
// this API. This is because we aren't guaranteed that both keys for a given workflow execution will be returned
// in the same call. For example, if the user supplies a pageSize of 1, and we specify a maximum number of keys
// of 2 to S3, we may get back entries from S3 for 2 different workflow executions. You might think that we can
// just truncate this result to 1 workflow execution, but then the nextPageToken would be incorrect. So, we may
// need to make multiple calls to S3 to get the correct number of workflow executions, which will probably make
// this API call slower.
res, err := v.queryPrefix(ctx, uri, &queryVisibilityRequest{
namespaceID: request.NamespaceID,
pageSize: remaining,
nextPageToken: nextPageToken,
parsedQuery: &parsedQuery{},
}, saTypeMap, searchPrefix, func(key string) bool {
// We only want to return entries for the closeTimeout secondary index, which will always be of the form:
// .../closeTimeout/<closeTimeout>/<runID>, so we split the key on "/" and check that the third-to-last
// element is "closeTimeout".
elements := strings.Split(key, "/")
return len(elements) >= 3 && elements[len(elements)-3] == secondaryIndexKeyCloseTimeout
})
if err != nil {
return nil, err
}
nextPageToken = res.NextPageToken
executions = append(executions, res.Executions...)
remaining -= len(res.Executions)
if len(nextPageToken) == 0 || remaining <= 0 {
break
}
}
return &archiver.QueryVisibilityResponse{
Executions: executions,
NextPageToken: nextPageToken,
}, nil
}

func (v *visibilityArchiver) query(
Expand Down Expand Up @@ -260,15 +308,19 @@ func (v *visibilityArchiver) query(
)
}

return v.queryPrefix(ctx, URI, request, saTypeMap, prefix)
return v.queryPrefix(ctx, URI, request, saTypeMap, prefix, nil)
}

// queryPrefix returns all workflow executions in the archive that match the given prefix. The keyFilter function is an
// optional filter that can be used to further filter the results. If keyFilter returns false for a given key, that key
// will be skipped, and the object will not be downloaded from S3 or included in the results.
func (v *visibilityArchiver) queryPrefix(
ctx context.Context,
uri archiver.URI,
request *queryVisibilityRequest,
saTypeMap searchattribute.NameTypeMap,
prefix string,
keyFilter func(key string) bool,
) (*archiver.QueryVisibilityResponse, error) {
ctx, cancel := ensureContextTimeout(ctx)
defer cancel()
Expand Down Expand Up @@ -299,6 +351,10 @@ func (v *visibilityArchiver) queryPrefix(
response.NextPageToken = serializeQueryVisibilityToken(*results.NextContinuationToken)
}
for _, item := range results.Contents {
if keyFilter != nil && !keyFilter(*item.Key) {
continue
}

encodedRecord, err := Download(ctx, v.s3cli, uri, *item.Key)
if err != nil {
return nil, serviceerror.NewUnavailable(err.Error())
Expand Down
32 changes: 12 additions & 20 deletions common/archiver/s3store/visibility_archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,42 +396,34 @@ func (s *visibilityArchiverSuite) TestQuery_EmptyQuery_Pagination() {
uri, err := archiver.NewURI(testBucketURI)
s.NoError(err)

response := &archiver.QueryVisibilityResponse{
Executions: nil,
NextPageToken: nil,
}

limit := 10
executions := make(map[string]*workflowpb.WorkflowExecutionInfo, limit)
executions := make(map[string]*workflowpb.WorkflowExecutionInfo, len(s.visibilityRecords))
var nextPageToken []byte

for i := 0; i < limit; i++ {
for {
req := &archiver.QueryVisibilityRequest{
NamespaceID: testNamespaceID,
PageSize: 1,
NextPageToken: response.NextPageToken,
NextPageToken: nextPageToken,
Query: "",
}
response, err = arc.Query(context.Background(), uri, req, searchattribute.TestNameTypeMap)
response, err := arc.Query(context.Background(), uri, req, searchattribute.TestNameTypeMap)
s.NoError(err)
s.NotNil(response)
s.Len(response.Executions, 1)

if response.NextPageToken == nil {
break
}

nextPageToken = response.NextPageToken
for _, execution := range response.Executions {
key := execution.Execution.GetWorkflowId() +
"/" + execution.Execution.GetRunId() +
"/" + execution.CloseTime.String()
if executions[key] != nil {
s.Fail("duplicate key", key)
}
executions[key] = execution
}

if len(executions) > 1 {
return
if len(nextPageToken) == 0 {
break
}
}
s.Fail("there should be at least 2 unique executions across all pages")
s.Len(executions, len(s.visibilityRecords))
}

type precisionTest struct {
Expand Down
2 changes: 2 additions & 0 deletions common/authorization/frontend_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ var readOnlyNamespaceAPI = map[string]struct{}{
"ListScheduleMatchingTimes": {},
"DescribeBatchOperation": {},
"ListBatchOperations": {},
"GetWorkerBuildIdCompatibility": {},
"GetWorkerTaskReachability": {},
}

var readOnlyGlobalAPI = map[string]struct{}{
Expand Down
52 changes: 37 additions & 15 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1564,23 +1564,45 @@ var (
NamespaceReplicationEnqueueDLQCount = NewCounterDef("namespace_replication_dlq_enqueue_requests")
ParentClosePolicyProcessorSuccess = NewCounterDef("parent_close_policy_processor_requests")
ParentClosePolicyProcessorFailures = NewCounterDef("parent_close_policy_processor_errors")
ScheduleMissedCatchupWindow = NewCounterDef("schedule_missed_catchup_window")
ScheduleRateLimited = NewCounterDef("schedule_rate_limited")
ScheduleBufferOverruns = NewCounterDef("schedule_buffer_overruns")
ScheduleActionSuccess = NewCounterDef("schedule_action_success")
ScheduleActionErrors = NewCounterDef("schedule_action_errors")
ScheduleCancelWorkflowErrors = NewCounterDef("schedule_cancel_workflow_errors")
ScheduleTerminateWorkflowErrors = NewCounterDef("schedule_terminate_workflow_errors")
ScheduleMissedCatchupWindow = NewCounterDef(
"schedule_missed_catchup_window",
WithDescription("The number of times a schedule missed an action due to the configured catchup window"),
)
ScheduleRateLimited = NewCounterDef(
"schedule_rate_limited",
WithDescription("The number of times a schedule action was delayed by more than 1s due to rate limiting"),
)
ScheduleBufferOverruns = NewCounterDef(
"schedule_buffer_overruns",
WithDescription("The number of schedule actions that were dropped due to the action buffer being full"),
)
ScheduleActionSuccess = NewCounterDef(
"schedule_action_success",
WithDescription("The number of schedule actions that were successfully taken by a schedule"),
)
ScheduleActionErrors = NewCounterDef(
"schedule_action_errors",
WithDescription("The number of schedule actions that failed to start"),
)
ScheduleCancelWorkflowErrors = NewCounterDef(
"schedule_cancel_workflow_errors",
WithDescription("The number of times a schedule got an error trying to cancel a previous run"),
)
ScheduleTerminateWorkflowErrors = NewCounterDef(
"schedule_terminate_workflow_errors",
WithDescription("The number of times a schedule got an error trying to terminate a previous run"),
)

// Force replication
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")
EncounterNotFoundWorkflowCount = NewCounterDef("encounter_not_found_workflow_count")
GenerateReplicationTasksLatency = NewTimerDef("generate_replication_tasks_latency")
VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success")
VerifyReplicationTaskNotFound = NewCounterDef("verify_replication_task_not_found")
VerifyReplicationTaskFailed = NewCounterDef("verify_replication_task_failed")
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency")
EncounterZombieWorkflowCount = NewCounterDef("encounter_zombie_workflow_count")
EncounterNotFoundWorkflowCount = NewCounterDef("encounter_not_found_workflow_count")
EncounterCloseToRetentionWorkflowCount = NewCounterDef("encounter_close_to_retention_workflow_count")
GenerateReplicationTasksLatency = NewTimerDef("generate_replication_tasks_latency")
VerifyReplicationTaskSuccess = NewCounterDef("verify_replication_task_success")
VerifyReplicationTaskNotFound = NewCounterDef("verify_replication_task_not_found")
VerifyReplicationTaskFailed = NewCounterDef("verify_replication_task_failed")
VerifyReplicationTasksLatency = NewTimerDef("verify_replication_tasks_latency")
VerifyDescribeMutableStateLatency = NewTimerDef("verify_describe_mutable_state_latency")

// Replication
NamespaceReplicationTaskAckLevelGauge = NewGaugeDef("namespace_replication_task_ack_level")
Expand Down
4 changes: 2 additions & 2 deletions develop/docker-compose/docker-compose.cdc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ version: "3.5"

services:
temporal-ui-standby:
image: temporalio/ui:2.11.2
image: temporalio/ui:2.17.1
container_name: temporal-dev-ui-standby
environment:
- TEMPORAL_UI_PORT=8081
temporal-ui-other:
image: temporalio/ui:2.11.2
image: temporalio/ui:2.17.1
container_name: temporal-dev-ui-other
environment:
- TEMPORAL_UI_PORT=8082
2 changes: 1 addition & 1 deletion develop/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ services:
volumes:
- ./grafana/provisioning/:/etc/grafana/provisioning/
temporal-ui:
image: temporalio/ui:2.11.2
image: temporalio/ui:2.17.1
container_name: temporal-dev-ui

networks:
Expand Down
9 changes: 7 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2477,7 +2477,7 @@ func (wh *WorkflowHandler) CountWorkflowExecutions(ctx context.Context, request
}

resp := &workflowservice.CountWorkflowExecutionsResponse{
Count: persistenceResp.Count,
Count: persistenceResp.Count,
Groups: persistenceResp.Groups,
}
return resp, nil
Expand Down Expand Up @@ -4155,7 +4155,12 @@ func (wh *WorkflowHandler) getHistory(
// noop
case *serviceerror.DataLoss:
// log event
wh.logger.Error("encountered data loss event", tag.WorkflowNamespaceID(namespaceID.String()), tag.WorkflowID(execution.GetWorkflowId()), tag.WorkflowRunID(execution.GetRunId()))
wh.logger.Error("encountered data loss event",
tag.WorkflowNamespaceID(namespaceID.String()),
tag.WorkflowID(execution.GetWorkflowId()),
tag.WorkflowRunID(execution.GetRunId()),
tag.Error(err),
)
return nil, nil, err
default:
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion service/history/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var (
// ErrQueryEnteredInvalidState is error indicating query entered invalid state
ErrQueryEnteredInvalidState = serviceerror.NewInvalidArgument("query entered invalid state, this should be impossible")
// ErrConsistentQueryBufferExceeded is the error indicating that too many consistent queries have been buffered; new consistent queries cannot be buffered until the buffered queries finish
ErrConsistentQueryBufferExceeded = serviceerror.NewWorkflowNotReady("consistent query buffer is full, this may be caused by too many queries and workflow not able to process query fast enough")
ErrConsistentQueryBufferExceeded = serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "consistent query buffer is full, this may be caused by too many queries and workflow not able to process query fast enough")
// ErrEmptyHistoryRawEventBatch indicates that a single batch of history raw events is of size 0
ErrEmptyHistoryRawEventBatch = serviceerror.NewInvalidArgument("encountered empty history batch")
// ErrHistorySizeExceedsLimit is error indicating workflow execution has exceeded system defined history size limit
Expand Down
Loading

0 comments on commit eb9f9ee

Please sign in to comment.