Merge branch 'master' into mutable-state-size-limit
pdoerner committed May 24, 2023
2 parents 4163ba6 + 68856fe commit e059493
Showing 15 changed files with 224 additions and 19 deletions.
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
@@ -1617,6 +1617,7 @@ var (
ElasticsearchBulkProcessorWaitAddLatency = NewTimerDef("elasticsearch_bulk_processor_wait_add_latency")
ElasticsearchBulkProcessorWaitStartLatency = NewTimerDef("elasticsearch_bulk_processor_wait_start_latency")
ElasticsearchBulkProcessorBulkSize = NewDimensionlessHistogramDef("elasticsearch_bulk_processor_bulk_size")
ElasticsearchBulkProcessorBulkResquestTookLatency = NewTimerDef("elasticsearch_bulk_processor_bulk_request_took_latency")
ElasticsearchDocumentParseFailuresCount = NewCounterDef("elasticsearch_document_parse_failures_counter")
ElasticsearchDocumentGenerateFailuresCount = NewCounterDef("elasticsearch_document_generate_failures_counter")
ElasticsearchCustomOrderByClauseCount = NewCounterDef("elasticsearch_custom_order_by_clause_counter")
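For reference, a minimal sketch of how a timer defined this way is typically emitted, mirroring the bulkAfterAction call site later in this commit; the handler parameter is assumed to be any metrics.Handler implementation (for example the tally-backed handler changed below), and recordBulkTook is an illustrative name, not part of the diff.

package example

import (
	"time"

	"go.temporal.io/server/common/metrics"
)

// recordBulkTook records the Elasticsearch-reported bulk duration (in
// milliseconds) against the new timer definition.
func recordBulkTook(handler metrics.Handler, tookMillis int64) {
	handler.Timer(metrics.ElasticsearchBulkProcessorBulkResquestTookLatency.GetMetricName()).
		Record(time.Duration(tookMillis) * time.Millisecond)
}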
26 changes: 21 additions & 5 deletions common/metrics/tally_metrics_handler.go
@@ -78,28 +78,44 @@ func (tmp *tallyMetricsHandler) WithTags(tags ...Tag) Handler {
// Counter obtains a counter for the given name and MetricOptions.
func (tmp *tallyMetricsHandler) Counter(counter string) CounterIface {
	return CounterFunc(func(i int64, t ...Tag) {
		tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags)).Counter(counter).Inc(i)
		scope := tmp.scope
		if len(t) > 0 {
			scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
		}
		scope.Counter(counter).Inc(i)
	})
}

// Gauge obtains a gauge for the given name and MetricOptions.
func (tmp *tallyMetricsHandler) Gauge(gauge string) GaugeIface {
	return GaugeFunc(func(f float64, t ...Tag) {
		tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags)).Gauge(gauge).Update(f)
		scope := tmp.scope
		if len(t) > 0 {
			scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
		}
		scope.Gauge(gauge).Update(f)
	})
}

// Timer obtains a timer for the given name and MetricOptions.
func (tmp *tallyMetricsHandler) Timer(timer string) TimerIface {
	return TimerFunc(func(d time.Duration, tag ...Tag) {
		tmp.scope.Tagged(tagsToMap(tag, tmp.excludeTags)).Timer(timer).Record(d)
	return TimerFunc(func(d time.Duration, t ...Tag) {
		scope := tmp.scope
		if len(t) > 0 {
			scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
		}
		scope.Timer(timer).Record(d)
	})
}

// Histogram obtains a histogram for the given name and MetricOptions.
func (tmp *tallyMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface {
	return HistogramFunc(func(i int64, t ...Tag) {
		tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags)).Histogram(histogram, tmp.perUnitBuckets[unit]).RecordValue(float64(i))
		scope := tmp.scope
		if len(t) > 0 {
			scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
		}
		scope.Histogram(histogram, tmp.perUnitBuckets[unit]).RecordValue(float64(i))
	})
}

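The guard added in each of the four methods above skips tally's Tagged() call when a metric is recorded without per-call tags, avoiding the tag-map construction and child-scope lookup on the hot path. A standalone sketch of the same pattern, assuming the uber-go/tally v4 import path; the emit helper and its names are illustrative, not part of this commit.

package main

import "github.com/uber-go/tally/v4"

// emit records a counter, only descending into a tagged child scope when
// per-call tags are actually present.
func emit(scope tally.Scope, name string, tags map[string]string, delta int64) {
	if len(tags) > 0 {
		scope = scope.Tagged(tags)
	}
	scope.Counter(name).Inc(delta)
}

func main() {
	emit(tally.NoopScope, "requests", nil, 1)                              // fast path: root scope reused
	emit(tally.NoopScope, "requests", map[string]string{"op": "start"}, 1) // tagged child scope
}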
@@ -232,6 +232,10 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq

// bulkAfterAction is triggered after bulk processor commit
func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	// Record how long Elasticsearch took to process the bulk request.
	p.metricsHandler.Timer(metrics.ElasticsearchBulkProcessorBulkResquestTookLatency.GetMetricName()).
		Record(time.Duration(response.Took) * time.Millisecond)

	if err != nil {
		const logFirstNRequests = 5
		var httpStatus int
@@ -288,6 +288,7 @@ func (s *processorSuite) TestBulkAfterAction_Ack() {
		metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
	).Return(queuedRequestHistogram)
	queuedRequestHistogram.EXPECT().Record(int64(0))
	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorBulkResquestTookLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
	mapVal := newAckFuture()
	s.esProcessor.mapToAckFuture.Put(testKey, mapVal)
@@ -337,6 +338,7 @@ func (s *processorSuite) TestBulkAfterAction_Nack() {
		metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
	).Return(queuedRequestHistogram)
	queuedRequestHistogram.EXPECT().Record(int64(0))
	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorBulkResquestTookLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
	mapVal := newAckFuture()
	s.esProcessor.mapToAckFuture.Put(testKey, mapVal)
@@ -377,6 +379,7 @@ func (s *processorSuite) TestBulkAfterAction_Error() {
		Items: []map[string]*elastic.BulkResponseItem{mFailed},
	}

	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorBulkResquestTookLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
	counterMetric := metrics.NewMockCounterIface(s.controller)
	s.mockMetricHandler.EXPECT().Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Return(counterMetric)
	counterMetric.EXPECT().Record(int64(1), metrics.HttpStatusTag(400))
@@ -595,6 +598,7 @@ func (s *processorSuite) Test_End2End() {
	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitStartLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc).Times(docsCount)
	s.esProcessor.bulkBeforeAction(0, bulkIndexRequests)

	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorBulkResquestTookLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc).Times(docsCount)
	s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorCommitLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc).Times(docsCount)
	s.esProcessor.bulkAfterAction(0, bulkIndexRequests, bulkIndexResponse, nil)
9 changes: 6 additions & 3 deletions common/util.go
@@ -332,9 +332,12 @@ func IsServiceClientTransientError(err error) bool {
		return true
	}

	switch err.(type) {
	case *serviceerror.ResourceExhausted,
		*serviceerrors.ShardOwnershipLost:
	switch err := err.(type) {
	case *serviceerror.ResourceExhausted:
		if err.Cause != enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW {
			return true
		}
	case *serviceerrors.ShardOwnershipLost:
		return true
	}
	return false
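With this change, a ResourceExhausted error is still treated as transient unless its cause is RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW. A minimal sketch of the resulting behavior, assuming the go.temporal.io/api serviceerror and enums packages used above; the specific cause values shown are only illustrative of the two branches.

package main

import (
	"fmt"

	enumspb "go.temporal.io/api/enums/v1"
	"go.temporal.io/api/serviceerror"

	"go.temporal.io/server/common"
)

func main() {
	busy := serviceerror.NewResourceExhausted(
		enumspb.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "workflow is busy")
	rpsLimit := serviceerror.NewResourceExhausted(
		enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "rps limit exceeded")

	fmt.Println(common.IsServiceClientTransientError(busy))     // false: a busy workflow should not be retried
	fmt.Println(common.IsServiceClientTransientError(rpsLimit)) // true: still retried as a transient condition
}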
50 changes: 50 additions & 0 deletions develop/buildkite/docker-compose-os2.yml
@@ -0,0 +1,50 @@
version: "3.5"

services:
cassandra:
image: cassandra:3.11
networks:
services-network:
aliases:
- cassandra

opensearch:
image: opensearchproject/opensearch:2.6.0
networks:
services-network:
aliases:
- opensearch
environment:
- discovery.type=single-node
- plugins.security.disabled=true

integration-test-cassandra:
build:
context: ../..
dockerfile: ./develop/buildkite/Dockerfile
environment:
- "CASSANDRA_SEEDS=cassandra"
- "ES_SEEDS=opensearch"
- "ES_VERSION=v8"
- "PERSISTENCE_TYPE=nosql"
- "PERSISTENCE_DRIVER=cassandra"
- "TEMPORAL_VERSION_CHECK_DISABLED=1"
- BUILDKITE_AGENT_ACCESS_TOKEN
- BUILDKITE_JOB_ID
- BUILDKITE_BUILD_ID
- BUILDKITE_BUILD_NUMBER
depends_on:
- cassandra
- opensearch
volumes:
- ../..:/temporal
- /usr/bin/buildkite-agent:/usr/bin/buildkite-agent
networks:
services-network:
aliases:
- integration-test

networks:
services-network:
name: services-network
driver: bridge
15 changes: 15 additions & 0 deletions develop/buildkite/pipeline.yml
@@ -65,6 +65,21 @@ steps:
          run: integration-test-cassandra
          config: ./develop/buildkite/docker-compose-es8.yml

  - label: ":golang: functional test with cassandra (OpenSearch 2)"
    agents:
      queue: "default"
      docker: "*"
    command: "make functional-test-coverage"
    artifact_paths:
      - ".coverage/*.out"
    retry:
      automatic:
        limit: 1
    plugins:
      - docker-compose#v3.8.0:
          run: integration-test-cassandra
          config: ./develop/buildkite/docker-compose-os2.yml

  - label: ":golang: functional xdc test with cassandra"
    agents:
      queue: "default"
4 changes: 4 additions & 0 deletions develop/buildkite/scripts/coverage-report.sh
@@ -10,6 +10,10 @@ buildkite-agent artifact download ".coverage/integration_coverprofile.out" . --s
buildkite-agent artifact download ".coverage/functional_cassandra_coverprofile.out" . --step ":golang: functional test with cassandra (ES8)" --build "${BUILDKITE_BUILD_ID}"
mv ./.coverage/functional_cassandra_coverprofile.out ./.coverage/functional_cassandra_es8_coverprofile.out

# OpenSearch 2.
buildkite-agent artifact download ".coverage/functional_cassandra_coverprofile.out" . --step ":golang: functional test with cassandra (OpenSearch 2)" --build "${BUILDKITE_BUILD_ID}"
mv ./.coverage/functional_cassandra_coverprofile.out ./.coverage/functional_cassandra_os2_coverprofile.out

# Cassandra.
buildkite-agent artifact download ".coverage/functional_cassandra_coverprofile.out" . --step ":golang: functional test with cassandra" --build "${BUILDKITE_BUILD_ID}"
buildkite-agent artifact download ".coverage/functional_xdc_cassandra_coverprofile.out" . --step ":golang: functional xdc test with cassandra" --build "${BUILDKITE_BUILD_ID}"
2 changes: 1 addition & 1 deletion go.mod
@@ -44,7 +44,7 @@ require (
go.opentelemetry.io/otel/metric v0.36.0
go.opentelemetry.io/otel/sdk v1.13.0
go.opentelemetry.io/otel/sdk/metric v0.36.0
go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577
go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66
go.temporal.io/sdk v1.22.2
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.10.0
4 changes: 2 additions & 2 deletions go.sum
@@ -1123,8 +1123,8 @@ go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw=
go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.temporal.io/api v1.19.1-0.20230322213042-07fb271d475b/go.mod h1:PLQJqp1YZZikmtGm9jIbzWpP3p6zS39WQjhsO/Hiw30=
go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577 h1:BDU+5DlZuQicarZIXLhwXtup1dj8WUk+7XiK6m0brvA=
go.temporal.io/api v1.19.1-0.20230511202036-4dee2bb54577/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4=
go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66 h1:nLBDjkSXTJO/aoptKUSGmhVu78qiNIupn0j0RQGTs5M=
go.temporal.io/api v1.19.1-0.20230515221100-0caa7c878f66/go.mod h1:uITFvsxpTQT/ZRGMHTzmEXhdDkfA9o8Ik4cgw91TlM4=
go.temporal.io/sdk v1.22.2 h1:4bGxYekEN+FHAGXkRAxZcHs9k+fNO3RUmBRf97WH3So=
go.temporal.io/sdk v1.22.2/go.mod h1:LqYtPesETgMHktpH98Vk7WegNcikxErmmuaZPNWEnPw=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
2 changes: 1 addition & 1 deletion schema/postgresql/v12/visibility/schema.sql
@@ -1,4 +1,4 @@
CREATE EXTENSION btree_gin;
CREATE EXTENSION IF NOT EXISTS btree_gin;

-- convert_ts converts a timestamp in RFC3339 to UTC timestamp without time zone.
CREATE FUNCTION convert_ts(s VARCHAR) RETURNS TIMESTAMP AS $$
@@ -1,4 +1,4 @@
CREATE EXTENSION btree_gin;
CREATE EXTENSION IF NOT EXISTS btree_gin;

-- convert_ts converts a timestamp in RFC3339 to UTC timestamp without time zone.
CREATE FUNCTION convert_ts(s VARCHAR) RETURNS TIMESTAMP AS $$
3 changes: 3 additions & 0 deletions service/history/consts/const.go
@@ -27,6 +27,7 @@ package consts
import (
	"errors"

	"go.temporal.io/api/enums/v1"
	enumspb "go.temporal.io/api/enums/v1"
	"go.temporal.io/api/serviceerror"

@@ -96,6 +97,8 @@ var (
	ErrNamespaceHandover = common.ErrNamespaceHandover
	// ErrWorkflowTaskStateInconsistent is error indicating workflow task state is inconsistent, for example there was no workflow task scheduled but buffered events are present.
	ErrWorkflowTaskStateInconsistent = serviceerror.NewUnavailable("Workflow task state is inconsistent.")
	// ErrResourceExhaustedBusyWorkflow is an error indicating that the workflow's resources are exhausted and that the error should not be retried by the service handler or client.
	ErrResourceExhaustedBusyWorkflow = serviceerror.NewResourceExhausted(enums.RESOURCE_EXHAUSTED_CAUSE_BUSY_WORKFLOW, "Workflow is busy.")

	// FailedWorkflowStatuses is a set of failed workflow close states, used for start workflow policy
	// for start workflow execution API
44 changes: 39 additions & 5 deletions service/history/workflow/cache/cache.go
@@ -38,12 +38,14 @@ import (

	"go.temporal.io/server/common/cache"
	"go.temporal.io/server/common/definition"
	"go.temporal.io/server/common/headers"
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/log/tag"
	"go.temporal.io/server/common/metrics"
	"go.temporal.io/server/common/namespace"
	"go.temporal.io/server/common/persistence"
	"go.temporal.io/server/service/history/configs"
	"go.temporal.io/server/service/history/consts"
	"go.temporal.io/server/service/history/shard"
	"go.temporal.io/server/service/history/workflow"
)
@@ -85,8 +87,10 @@ type (
var NoopReleaseFn ReleaseCacheFunc = func(err error) {}

const (
cacheNotReleased int32 = 0
cacheReleased int32 = 1
cacheNotReleased int32 = 0
cacheReleased int32 = 1
workflowLockTimeoutTailTime = 500 * time.Millisecond
nonApiContextLockTimeout = 500 * time.Millisecond
)

func NewCache(shard shard.Context) Cache {
@@ -198,16 +202,46 @@ func (c *CacheImpl) getOrCreateWorkflowExecutionInternal(
	// Consider revisiting this if it causes too much GC activity
	releaseFunc := c.makeReleaseFunc(key, workflowCtx, forceClearContext, lockPriority)

	if err := workflowCtx.Lock(ctx, lockPriority); err != nil {
		// ctx is done before lock can be acquired
		c.Release(key)
	if err := c.lockWorkflowExecution(ctx, workflowCtx, key, lockPriority); err != nil {
		handler.Counter(metrics.CacheFailures.GetMetricName()).Record(1)
		handler.Counter(metrics.AcquireLockFailedCounter.GetMetricName()).Record(1)
		return nil, nil, err
	}

	return workflowCtx, releaseFunc, nil
}

func (c *CacheImpl) lockWorkflowExecution(ctx context.Context,
	workflowCtx workflow.Context,
	key definition.WorkflowKey,
	lockPriority workflow.LockPriority) error {

	// skip if there is no deadline
	if deadline, ok := ctx.Deadline(); ok {
		var cancel context.CancelFunc
		if headers.GetCallerInfo(ctx).CallerType != headers.CallerTypeAPI {
			newDeadline := time.Now().Add(nonApiContextLockTimeout)
			if newDeadline.Before(deadline) {
				ctx, cancel = context.WithDeadline(ctx, newDeadline)
				defer cancel()
			}
		} else {
			newDeadline := deadline.Add(-workflowLockTimeoutTailTime)
			if newDeadline.After(time.Now()) {
				ctx, cancel = context.WithDeadline(ctx, newDeadline)
				defer cancel()
			}
		}
	}

	if err := workflowCtx.Lock(ctx, lockPriority); err != nil {
		// ctx is done before lock can be acquired
		c.Release(key)
		return consts.ErrResourceExhaustedBusyWorkflow
	}
	return nil
}

func (c *CacheImpl) makeReleaseFunc(
	key definition.WorkflowKey,
	context workflow.Context,
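For intuition on the two deadline branches in lockWorkflowExecution above: a non-API (background) caller's lock attempt is capped at roughly 500ms, while an API caller gives up 500ms before its own deadline so there is still time to return ErrResourceExhaustedBusyWorkflow cleanly. A standalone sketch of that pattern under those assumptions; lockWithBudget, lockFn, and isAPICaller are illustrative stand-ins for the cache method, workflowCtx.Lock, and the headers-based caller check, not part of this commit.

package main

import (
	"context"
	"errors"
	"time"
)

const (
	workflowLockTimeoutTailTime = 500 * time.Millisecond
	nonApiContextLockTimeout    = 500 * time.Millisecond
)

var errBusyWorkflow = errors.New("workflow is busy")

func lockWithBudget(ctx context.Context, isAPICaller bool, lockFn func(context.Context) error) error {
	if deadline, ok := ctx.Deadline(); ok {
		var cancel context.CancelFunc
		if !isAPICaller {
			// Background callers wait at most a short, fixed time for the lock.
			if d := time.Now().Add(nonApiContextLockTimeout); d.Before(deadline) {
				ctx, cancel = context.WithDeadline(ctx, d)
				defer cancel()
			}
		} else {
			// API callers stop 500ms early so the busy-workflow error can be
			// returned before the caller's own deadline expires.
			if d := deadline.Add(-workflowLockTimeoutTailTime); d.After(time.Now()) {
				ctx, cancel = context.WithDeadline(ctx, d)
				defer cancel()
			}
		}
	}
	if err := lockFn(ctx); err != nil {
		return errBusyWorkflow
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	// A lock that never becomes available: the call returns errBusyWorkflow
	// after ~500ms instead of burning the caller's full 2s budget.
	_ = lockWithBudget(ctx, false, func(c context.Context) error {
		<-c.Done()
		return c.Err()
	})
}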
