Skip to content

Commit

Permalink
TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed Apr 5, 2021
1 parent 72d1490 commit e6dc97d
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 45 deletions.
126 changes: 121 additions & 5 deletions service/history/mutablestate/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
HistoryBuilderStateImmutable = 1
)

// TODO should the reorderFunc functionality be ported?
type (
HistoryBuilderState int

Expand All @@ -55,6 +56,8 @@ type (
DBEventsBatches [][]*historypb.HistoryEvent
// events to be buffer in execution table
DBBufferBatch []*historypb.HistoryEvent
// whether to clear buffer events on DB
DBClearBuffer bool
// accumulated buffered events, equal to all buffer events from execution table
MemBufferBatch []*historypb.HistoryEvent
// schedule to start event ID mapping for flushed buffered event
Expand All @@ -73,6 +76,7 @@ type (

// buffer events in DB
dbBufferBatch []*historypb.HistoryEvent
dbClearBuffer bool

// in mem events
memEventsBatches [][]*historypb.HistoryEvent
Expand Down Expand Up @@ -100,6 +104,7 @@ func NewHistoryBuilder(
nextEventID: nextEventID,

dbBufferBatch: dbBufferBatch,
dbClearBuffer: false,
memEventsBatches: nil,
memLatestBatch: nil,
memBufferBatch: nil,
Expand Down Expand Up @@ -969,12 +974,18 @@ func (b *HistoryBuilder) HasBufferEvents() bool {
return len(b.dbBufferBatch) > 0 || len(b.memBufferBatch) > 0
}

func (b *HistoryBuilder) FlushBufferEvents() {
b.assertMutable()
func (b *HistoryBuilder) BufferEventSize() int {
return len(b.dbBufferBatch) + len(b.memBufferBatch)
}

func (b *HistoryBuilder) FlushBufferToCurrentBatch() {
if len(b.dbBufferBatch) == 0 && len(b.memBufferBatch) == 0 {
return
}

b.assertMutable()

b.dbClearBuffer = b.dbClearBuffer || len(b.dbBufferBatch) > 0
bufferBatch := append(b.dbBufferBatch, b.memBufferBatch...)
b.dbBufferBatch = nil
b.memBufferBatch = nil
Expand All @@ -991,7 +1002,7 @@ func (b *HistoryBuilder) FlushBufferEvents() {
b.memLatestBatch = append(b.memLatestBatch, bufferBatch...)
}

func (b *HistoryBuilder) FlushEventsBatch() {
func (b *HistoryBuilder) FlushAndCreateNewBatch() {
b.assertMutable()
if len(b.memLatestBatch) == 0 {
return
Expand All @@ -1009,11 +1020,12 @@ func (b *HistoryBuilder) Finish(
}()

if flushBufferEvent {
b.FlushBufferEvents()
b.FlushBufferToCurrentBatch()
}
b.FlushEventsBatch()
b.FlushAndCreateNewBatch()

dbEventsBatches := b.memEventsBatches
dbClearBuffer := b.dbClearBuffer
dbBufferBatch := b.memBufferBatch
memBufferBatch := b.dbBufferBatch
memBufferBatch = append(memBufferBatch, dbBufferBatch...)
Expand All @@ -1022,6 +1034,7 @@ func (b *HistoryBuilder) Finish(
b.memEventsBatches = nil
b.memBufferBatch = nil
b.memLatestBatch = nil
b.dbClearBuffer = false
b.dbBufferBatch = nil
b.scheduleIDToStartedID = nil

Expand All @@ -1031,6 +1044,7 @@ func (b *HistoryBuilder) Finish(

return &HistoryMutation{
DBEventsBatches: dbEventsBatches,
DBClearBuffer: dbClearBuffer,
DBBufferBatch: dbBufferBatch,
MemBufferBatch: memBufferBatch,
ScheduleIDToStartID: scheduleIDToStartedID,
Expand Down Expand Up @@ -1203,3 +1217,105 @@ func (b *HistoryBuilder) wireEventIDs(
}
}
}

// TODO remove this function once we keep all info in DB, e.g. activity / timer / child workflow
// to deprecate
// * HasActivityFinishEvent
// * hasActivityFinishEvent
func (b *HistoryBuilder) HasActivityFinishEvent(
scheduleID int64,
) bool {

if hasActivityFinishEvent(scheduleID, b.dbBufferBatch) {
return true
}

if hasActivityFinishEvent(scheduleID, b.memBufferBatch) {
return true
}

if hasActivityFinishEvent(scheduleID, b.memLatestBatch) {
return true
}

for _, batch := range b.memEventsBatches {
if hasActivityFinishEvent(scheduleID, batch) {
return true
}
}

return false
}

func hasActivityFinishEvent(
scheduleID int64,
events []*historypb.HistoryEvent,
) bool {
for _, event := range events {
switch event.GetEventType() {
case enumspb.EVENT_TYPE_ACTIVITY_TASK_COMPLETED:
if event.GetActivityTaskCompletedEventAttributes().GetScheduledEventId() == scheduleID {
return true
}

case enumspb.EVENT_TYPE_ACTIVITY_TASK_FAILED:
if event.GetActivityTaskFailedEventAttributes().GetScheduledEventId() == scheduleID {
return true
}

case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
if event.GetActivityTaskTimedOutEventAttributes().GetScheduledEventId() == scheduleID {
return true
}

case enumspb.EVENT_TYPE_ACTIVITY_TASK_CANCELED:
if event.GetActivityTaskCanceledEventAttributes().GetScheduledEventId() == scheduleID {
return true
}
}
}

return false
}

func (b *HistoryBuilder) HasAndRemoveTimerFireEvent(
timerID string,
) *historypb.HistoryEvent {
var timerFireEvent *historypb.HistoryEvent

b.dbBufferBatch, timerFireEvent = deleteTimerFiredEvent(timerID, b.dbBufferBatch)
if timerFireEvent != nil {
b.dbClearBuffer = true
return timerFireEvent
}

b.memBufferBatch, timerFireEvent = deleteTimerFiredEvent(timerID, b.memBufferBatch)
if timerFireEvent != nil {
b.dbClearBuffer = true
return timerFireEvent
}

return nil
}

func deleteTimerFiredEvent(
timerID string,
events []*historypb.HistoryEvent,
) ([]*historypb.HistoryEvent, *historypb.HistoryEvent) {
// go over all history events. if we find a timer fired event for the given
// timerID, clear it
timerFiredIdx := -1
for idx, event := range events {
if event.GetEventType() == enumspb.EVENT_TYPE_TIMER_FIRED &&
event.GetTimerFiredEventAttributes().GetTimerId() == timerID {
timerFiredIdx = idx
break
}
}
if timerFiredIdx == -1 {
return events, nil
}

timerEvent := events[timerFiredIdx]
return append(events[:timerFiredIdx], events[timerFiredIdx+1:]...), timerEvent
}
88 changes: 48 additions & 40 deletions service/history/mutablestate/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,7 @@ func (s *historyBuilderSuite) testAppendFlushFinishEventWithoutBufferSingleBatch

s.Equal(&HistoryMutation{
DBEventsBatches: [][]*historypb.HistoryEvent{{event1, event2}},
DBClearBuffer: false,
DBBufferBatch: nil,
MemBufferBatch: nil,
ScheduleIDToStartID: make(map[int64]int64),
Expand Down Expand Up @@ -1644,17 +1645,17 @@ func (s *historyBuilderSuite) testAppendFlushFinishEventWithoutBufferMultiBatch(
// 1st batch
s.historyBuilder.appendEvents(event11)
s.historyBuilder.appendEvents(event12)
s.historyBuilder.FlushEventsBatch()
s.historyBuilder.FlushAndCreateNewBatch()

// 2nd batch
s.historyBuilder.appendEvents(event21)
s.historyBuilder.appendEvents(event22)
s.historyBuilder.FlushEventsBatch()
s.historyBuilder.FlushAndCreateNewBatch()

// 3rd batch
s.historyBuilder.appendEvents(event31)
s.historyBuilder.appendEvents(event32)
s.historyBuilder.FlushEventsBatch()
s.historyBuilder.FlushAndCreateNewBatch()

historyMutation, err := s.historyBuilder.Finish(flushBuffer)
s.NoError(err)
Expand All @@ -1666,6 +1667,7 @@ func (s *historyBuilderSuite) testAppendFlushFinishEventWithoutBufferMultiBatch(
{event21, event22},
{event31, event32},
},
DBClearBuffer: false,
DBBufferBatch: nil,
MemBufferBatch: nil,
ScheduleIDToStartID: make(map[int64]int64),
Expand Down Expand Up @@ -1697,6 +1699,7 @@ func (s *historyBuilderSuite) TestAppendFlushFinishEvent_WithBuffer_WithoutDBBuf

s.Equal(&HistoryMutation{
DBEventsBatches: nil,
DBClearBuffer: false,
DBBufferBatch: []*historypb.HistoryEvent{event1, event2},
MemBufferBatch: []*historypb.HistoryEvent{event1, event2},
ScheduleIDToStartID: make(map[int64]int64),
Expand Down Expand Up @@ -1728,6 +1731,7 @@ func (s *historyBuilderSuite) TestAppendFlushFinishEvent_WithBuffer_WithoutDBBuf

s.Equal(&HistoryMutation{
DBEventsBatches: [][]*historypb.HistoryEvent{{event1, event2}},
DBClearBuffer: false,
DBBufferBatch: nil,
MemBufferBatch: nil,
ScheduleIDToStartID: make(map[int64]int64),
Expand Down Expand Up @@ -1757,6 +1761,7 @@ func (s *historyBuilderSuite) TestAppendFlushFinishEvent_WithoutBuffer_WithDBBuf

s.Equal(&HistoryMutation{
DBEventsBatches: nil,
DBClearBuffer: false,
DBBufferBatch: nil,
MemBufferBatch: []*historypb.HistoryEvent{event1, event2},
ScheduleIDToStartID: make(map[int64]int64),
Expand Down Expand Up @@ -1786,6 +1791,7 @@ func (s *historyBuilderSuite) TestAppendFlushFinishEvent_WithoutBuffer_WithDBBuf

s.Equal(&HistoryMutation{
DBEventsBatches: [][]*historypb.HistoryEvent{{event1, event2}},
DBClearBuffer: true,
DBBufferBatch: nil,
MemBufferBatch: nil,
ScheduleIDToStartID: make(map[int64]int64),
Expand Down Expand Up @@ -1822,12 +1828,50 @@ func (s *historyBuilderSuite) TestAppendFlushFinishEvent_WithBuffer_WithDBBuffer

s.Equal(&HistoryMutation{
DBEventsBatches: nil,
DBClearBuffer: false,
DBBufferBatch: []*historypb.HistoryEvent{event1, event2},
MemBufferBatch: []*historypb.HistoryEvent{event0, event1, event2},
ScheduleIDToStartID: make(map[int64]int64),
}, historyMutation)
}

func (s *historyBuilderSuite) TestAppendFlushFinishEvent_WithBuffer_WithDBBuffer_WithFlushBuffer() {
event0 := &historypb.HistoryEvent{
EventType: enumspb.EVENT_TYPE_TIMER_FIRED,
EventId: common.BufferedEventID,
TaskId: common.EmptyEventTaskID,
}
s.historyBuilder.dbBufferBatch = []*historypb.HistoryEvent{event0}
s.historyBuilder.memEventsBatches = nil
s.historyBuilder.memLatestBatch = nil
s.historyBuilder.memBufferBatch = nil

event1 := &historypb.HistoryEvent{
EventType: enumspb.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED,
EventId: common.BufferedEventID,
TaskId: common.EmptyEventTaskID,
}
event2 := &historypb.HistoryEvent{
EventType: enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED,
EventId: common.BufferedEventID,
TaskId: common.EmptyEventTaskID,
}

s.historyBuilder.appendEvents(event1)
s.historyBuilder.appendEvents(event2)
historyMutation, err := s.historyBuilder.Finish(true)
s.NoError(err)
s.assertEventIDTaskID(historyMutation)

s.Equal(&HistoryMutation{
DBEventsBatches: [][]*historypb.HistoryEvent{{event0, event1, event2}},
DBClearBuffer: true,
DBBufferBatch: nil,
MemBufferBatch: nil,
ScheduleIDToStartID: make(map[int64]int64),
}, historyMutation)
}

func (s *historyBuilderSuite) TestWireEventIDs_Activity() {
scheduleEventID := rand.Int63()
startEvent := &historypb.HistoryEvent{
Expand Down Expand Up @@ -1973,7 +2017,7 @@ func (s *historyBuilderSuite) testWireEventIDs(
s.historyBuilder.memEventsBatches = nil
s.historyBuilder.memLatestBatch = nil
s.historyBuilder.memBufferBatch = []*historypb.HistoryEvent{finishEvent}
s.historyBuilder.FlushBufferEvents()
s.historyBuilder.FlushBufferToCurrentBatch()

s.Empty(s.historyBuilder.dbBufferBatch)
s.Empty(s.historyBuilder.memEventsBatches)
Expand Down Expand Up @@ -2007,42 +2051,6 @@ func (s *historyBuilderSuite) testWireEventIDs(
}
}

func (s *historyBuilderSuite) TestAppendFlushFinishEvent_WithBuffer_WithDBBuffer_WithFlushBuffer() {
event0 := &historypb.HistoryEvent{
EventType: enumspb.EVENT_TYPE_TIMER_FIRED,
EventId: common.BufferedEventID,
TaskId: common.EmptyEventTaskID,
}
s.historyBuilder.dbBufferBatch = []*historypb.HistoryEvent{event0}
s.historyBuilder.memEventsBatches = nil
s.historyBuilder.memLatestBatch = nil
s.historyBuilder.memBufferBatch = nil

event1 := &historypb.HistoryEvent{
EventType: enumspb.EVENT_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED,
EventId: common.BufferedEventID,
TaskId: common.EmptyEventTaskID,
}
event2 := &historypb.HistoryEvent{
EventType: enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED,
EventId: common.BufferedEventID,
TaskId: common.EmptyEventTaskID,
}

s.historyBuilder.appendEvents(event1)
s.historyBuilder.appendEvents(event2)
historyMutation, err := s.historyBuilder.Finish(true)
s.NoError(err)
s.assertEventIDTaskID(historyMutation)

s.Equal(&HistoryMutation{
DBEventsBatches: [][]*historypb.HistoryEvent{{event0, event1, event2}},
DBBufferBatch: nil,
MemBufferBatch: nil,
ScheduleIDToStartID: make(map[int64]int64),
}, historyMutation)
}

func (s *historyBuilderSuite) TestHasBufferEvent() {
historyBuilder := NewHistoryBuilder(
s.mockTimeSource,
Expand Down

0 comments on commit e6dc97d

Please sign in to comment.