Skip to content

Commit

Permalink
Merge branch 'master' into heartbeat_marker_api
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed Jun 3, 2020
2 parents d1c3508 + eca98ba commit e3691ce
Show file tree
Hide file tree
Showing 24 changed files with 583 additions and 133 deletions.
2 changes: 1 addition & 1 deletion .gen/go/checksum/checksum.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/health/health.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/health/metaclient/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/health/metafx/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/health/metafx/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/health/metafx/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/health/metaserver/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/health/metatest/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .gen/go/indexer/indexer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

150 changes: 134 additions & 16 deletions .gen/go/matching/matching.go

Large diffs are not rendered by default.

138 changes: 136 additions & 2 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions common/metrics/defs.go
Expand Up @@ -1541,6 +1541,7 @@ const (
CadenceErrNonDeterministicCounter
CadenceErrUnauthorizedCounter
CadenceErrAuthorizeFailedCounter
CadenceErrRemoteSyncMatchFailedCounter
PersistenceRequests
PersistenceFailures
PersistenceLatency
Expand Down Expand Up @@ -1645,6 +1646,7 @@ const (
CadenceErrNonDeterministicPerTaskListCounter
CadenceErrUnauthorizedPerTaskListCounter
CadenceErrAuthorizeFailedPerTaskListCounter
CadenceErrRemoteSyncMatchFailedPerTaskListCounter

NumCommonMetrics // Needs to be last on this list for iota numbering
)
Expand Down Expand Up @@ -1937,6 +1939,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
CadenceErrNonDeterministicCounter: {metricName: "cadence_errors_nondeterministic", metricType: Counter},
CadenceErrUnauthorizedCounter: {metricName: "cadence_errors_unauthorized", metricType: Counter},
CadenceErrAuthorizeFailedCounter: {metricName: "cadence_errors_authorize_failed", metricType: Counter},
CadenceErrRemoteSyncMatchFailedCounter: {metricName: "cadence_errors_remote_syncmatch_failed", metricType: Counter},
PersistenceRequests: {metricName: "persistence_requests", metricType: Counter},
PersistenceFailures: {metricName: "persistence_errors", metricType: Counter},
PersistenceLatency: {metricName: "persistence_latency", metricType: Timer},
Expand Down Expand Up @@ -2067,6 +2070,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
CadenceErrAuthorizeFailedPerTaskListCounter: {
metricName: "cadence_errors_authorize_failed_per_tl", metricRollupName: "cadence_errors_authorize_failed", metricType: Counter,
},
CadenceErrRemoteSyncMatchFailedPerTaskListCounter: {
metricName: "cadence_errors_remote_syncmatch_failed_per_tl", metricRollupName: "cadence_errors_remote_syncmatch_failed", metricType: Counter,
},
},
History: {
TaskRequests: {metricName: "task_requests", metricType: Counter},
Expand Down
2 changes: 1 addition & 1 deletion idls
4 changes: 2 additions & 2 deletions service/history/queue/interface.go
Expand Up @@ -55,7 +55,7 @@ type (
State() ProcessingQueueState
Split(ProcessingQueueSplitPolicy) []ProcessingQueue
Merge(ProcessingQueue) []ProcessingQueue
AddTasks(map[task.Key]task.Task, bool)
AddTasks(map[task.Key]task.Task, task.Key)
UpdateAckLevel()
// TODO: add Offload() method
}
Expand All @@ -72,7 +72,7 @@ type (
Level() int
Queues() []ProcessingQueue
ActiveQueue() ProcessingQueue
AddTasks(map[task.Key]task.Task, bool)
AddTasks(map[task.Key]task.Task, task.Key)
UpdateAckLevels()
Split(ProcessingQueueSplitPolicy) []ProcessingQueue
Merge([]ProcessingQueue)
Expand Down
4 changes: 2 additions & 2 deletions service/history/queue/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 7 additions & 9 deletions service/history/queue/processing_queue.go
Expand Up @@ -229,7 +229,7 @@ func (q *processingQueueImpl) Merge(

func (q *processingQueueImpl) AddTasks(
tasks map[task.Key]task.Task,
more bool,
newReadLevel task.Key,
) {
for key, task := range tasks {
if _, loaded := q.outstandingTasks[key]; loaded {
Expand All @@ -247,14 +247,9 @@ func (q *processingQueueImpl) AddTasks(
}

q.outstandingTasks[key] = task
if q.state.readLevel.Less(key) {
q.state.readLevel = key
}
}

if !more {
q.state.readLevel = q.state.maxLevel
}
q.state.readLevel = newReadLevel
}

func (q *processingQueueImpl) UpdateAckLevel() {
Expand All @@ -276,9 +271,12 @@ func (q *processingQueueImpl) UpdateAckLevel() {
delete(q.outstandingTasks, key)
}

if len(q.outstandingTasks) == 0 && q.state.readLevel == q.state.maxLevel {
q.state.ackLevel = q.state.maxLevel
if len(q.outstandingTasks) == 0 {
q.state.ackLevel = q.state.readLevel
}

// TODO: add a check for specifically for timer task key
// and override the taskID field for timer task key to 0.
}

func splitProcessingQueue(
Expand Down
7 changes: 4 additions & 3 deletions service/history/queue/processing_queue_collection.go
Expand Up @@ -63,11 +63,12 @@ func (c *processingQueueCollection) ActiveQueue() ProcessingQueue {

func (c *processingQueueCollection) AddTasks(
tasks map[task.Key]task.Task,
more bool,
newReadLevel task.Key,
) {
c.ActiveQueue().AddTasks(tasks, more)
activeQueue := c.ActiveQueue()
activeQueue.AddTasks(tasks, newReadLevel)

if !more {
if taskKeyEquals(activeQueue.State().ReadLevel(), activeQueue.State().MaxLevel()) {
c.resetActiveQueue()
}
}
Expand Down
20 changes: 12 additions & 8 deletions service/history/queue/processing_queue_collection_test.go
Expand Up @@ -106,42 +106,46 @@ func (s *processingQueueCollectionSuite) TestNewCollection_OutOfOrderQueues() {
s.True(s.isQueuesSorted(queueCollection.queues))
}

func (s *processingQueueCollectionSuite) TestAddTasks_WithMoreTasks() {
func (s *processingQueueCollectionSuite) TestAddTasks_ReadNotFinished() {
totalQueues := 4
currentActiveIdx := 1
newReadLevel := &testKey{ID: 9}

mockQueues := []*MockProcessingQueue{}
for i := 0; i != totalQueues; i++ {
mockQueues = append(mockQueues, NewMockProcessingQueue(s.controller))
}
mockQueues[currentActiveIdx].EXPECT().AddTasks(gomock.Any(), true).Times(1)
mockQueues[currentActiveIdx].EXPECT().AddTasks(gomock.Any(), newReadLevel).Times(1)
mockQueues[currentActiveIdx].EXPECT().State().Return(newProcessingQueueState(
s.level,
&testKey{ID: 3},
&testKey{ID: 10},
newReadLevel,
&testKey{ID: 10},
DomainFilter{},
)).AnyTimes()

queueCollection := s.newTestProcessingQueueCollection(s.level, mockQueues)
queueCollection.activeQueue = mockQueues[currentActiveIdx]

queueCollection.AddTasks(map[task.Key]task.Task{}, true)
queueCollection.AddTasks(map[task.Key]task.Task{}, newReadLevel)
s.Equal(mockQueues[currentActiveIdx].State(), queueCollection.ActiveQueue().State())
}

func (s *processingQueueCollectionSuite) TestAddTask_NoMoreTasks() {
func (s *processingQueueCollectionSuite) TestAddTask_ReadFinished() {
totalQueues := 4
currentActiveIdx := 1
newReadLevel := &testKey{ID: 10}

mockQueues := []*MockProcessingQueue{}
for i := 0; i != totalQueues; i++ {
mockQueues = append(mockQueues, NewMockProcessingQueue(s.controller))
}
mockQueues[currentActiveIdx].EXPECT().AddTasks(gomock.Any(), false).Times(1)
mockQueues[currentActiveIdx].EXPECT().AddTasks(gomock.Any(), newReadLevel).Times(1)
for i := 0; i != totalQueues; i++ {
mockQueues[i].EXPECT().State().Return(newProcessingQueueState(
s.level,
&testKey{ID: 3},
&testKey{ID: 10},
newReadLevel,
&testKey{ID: 10},
DomainFilter{},
)).AnyTimes()
Expand All @@ -150,7 +154,7 @@ func (s *processingQueueCollectionSuite) TestAddTask_NoMoreTasks() {
queueCollection := s.newTestProcessingQueueCollection(s.level, mockQueues)
queueCollection.activeQueue = mockQueues[currentActiveIdx]

queueCollection.AddTasks(map[task.Key]task.Task{}, false)
queueCollection.AddTasks(map[task.Key]task.Task{}, newReadLevel)
s.Nil(queueCollection.ActiveQueue())
}

Expand Down
54 changes: 11 additions & 43 deletions service/history/queue/processing_queue_test.go
Expand Up @@ -70,7 +70,7 @@ func (s *processingQueueSuite) TearDownTest() {
s.controller.Finish()
}

func (s *processingQueueSuite) TestAddTasks_WithMoreTasks() {
func (s *processingQueueSuite) TestAddTasks() {
ackLevel := &testKey{ID: 1}
maxLevel := &testKey{ID: 10}

Expand Down Expand Up @@ -99,51 +99,18 @@ func (s *processingQueueSuite) TestAddTasks_WithMoreTasks() {
make(map[task.Key]task.Task),
)

queue.AddTasks(tasks, true)
newReadLevel := &testKey{ID: 10}
queue.AddTasks(tasks, newReadLevel)
s.Len(queue.outstandingTasks, len(taskKeys))
s.Equal(taskKeys[len(taskKeys)-1], queue.state.readLevel)
s.Equal(newReadLevel, queue.state.readLevel)

// add the same set of tasks again, should have no effect
queue.AddTasks(tasks, true)
queue.AddTasks(tasks, newReadLevel)
s.Len(queue.outstandingTasks, len(taskKeys))
s.Equal(taskKeys[len(taskKeys)-1], queue.state.readLevel)
s.Equal(newReadLevel, queue.state.readLevel)
}

func (s *processingQueueSuite) TestAddTasks_NoMoreTasks() {
ackLevel := &testKey{ID: 1}
maxLevel := &testKey{ID: 10}

taskKeys := []task.Key{
&testKey{ID: 2},
&testKey{ID: 3},
&testKey{ID: 5},
&testKey{ID: 9},
}
tasks := make(map[task.Key]task.Task)
for _, key := range taskKeys {
mockTask := task.NewMockTask(s.controller)
mockTask.EXPECT().GetDomainID().Return("some random domainID").AnyTimes()
mockTask.EXPECT().GetWorkflowID().Return("some random workflowID").AnyTimes()
mockTask.EXPECT().GetRunID().Return("some random runID").AnyTimes()
mockTask.EXPECT().GetTaskType().Return(0).AnyTimes()
tasks[key] = mockTask
}

queue := s.newTestProcessingQueue(
0,
ackLevel,
ackLevel,
maxLevel,
NewDomainFilter(nil, true),
make(map[task.Key]task.Task),
)

queue.AddTasks(tasks, false)
s.Len(queue.outstandingTasks, len(taskKeys))
s.Equal(maxLevel, queue.state.readLevel)
}

func (s *processingQueueSuite) TestUpdateAckLevel_WithMoreTasks() {
func (s *processingQueueSuite) TestUpdateAckLevel_WithPendingTasks() {
ackLevel := &testKey{ID: 1}
maxLevel := &testKey{ID: 10}

Expand Down Expand Up @@ -181,8 +148,9 @@ func (s *processingQueueSuite) TestUpdateAckLevel_WithMoreTasks() {
s.Equal(&testKey{ID: 3}, queue.state.ackLevel)
}

func (s *processingQueueSuite) TestUpdateAckLevel_NoMoreTasks() {
func (s *processingQueueSuite) TestUpdateAckLevel_NoPendingTasks() {
ackLevel := &testKey{ID: 1}
readLevel := &testKey{ID: 9}
maxLevel := &testKey{ID: 10}

taskKeys := []task.Key{
Expand All @@ -207,14 +175,14 @@ func (s *processingQueueSuite) TestUpdateAckLevel_NoMoreTasks() {
queue := s.newTestProcessingQueue(
0,
ackLevel,
maxLevel,
readLevel,
maxLevel,
NewDomainFilter(nil, true),
tasks,
)

queue.UpdateAckLevel()
s.Equal(maxLevel, queue.state.ackLevel)
s.Equal(readLevel, queue.state.ackLevel)
}

func (s *processingQueueSuite) TestSplit() {
Expand Down
2 changes: 1 addition & 1 deletion service/history/queue/split_policy_test.go
Expand Up @@ -24,7 +24,7 @@ import (
"sort"
"testing"

gomock "github.com/golang/mock/gomock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

Expand Down

0 comments on commit e3691ce

Please sign in to comment.