Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Commit

Permalink
End operations async
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaavi committed Oct 19, 2021
1 parent ea4d708 commit 5353afb
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 10 deletions.
31 changes: 21 additions & 10 deletions terminal/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,20 @@ func (t *TerminalBase) OpEnd(op Operation, err *Error) {
log.Warningf("spn/terminal: operation %s %s failed: %s", op.Type(), fmtOperationID(t.parentID, t.id, op.ID()), err)
}

// Call operation end function for proper shutdown cleaning up.
op.End(err)
module.StartWorker("end operation", func(_ context.Context) error {
// Call operation end function for proper shutdown cleaning up.
op.End(err)

// Send error to the connected Operation, if the error is internal.
if !err.IsExternal() {
t.addToOpMsgSendBuffer(op.ID(), MsgTypeStop, container.New(err.Pack()), 0)
}
// Send error to the connected Operation, if the error is internal.
if !err.IsExternal() {
t.addToOpMsgSendBuffer(op.ID(), MsgTypeStop, container.New(err.Pack()), 0)
}

// Remove operation from terminal.
t.DeleteActiveOp(op.ID())

// Remove operation from terminal.
t.DeleteActiveOp(op.ID())
return nil
})

return
}
Expand All @@ -212,15 +216,22 @@ func (t *TerminalBase) SetActiveOp(opID uint32, op Operation) {
t.operations[opID] = op
}

// DeleteActiveOp deletes an active operation from the Terminal state and
// returns it.
// DeleteActiveOp deletes an active operation from the Terminal state.
func (t *TerminalBase) DeleteActiveOp(opID uint32) {
t.lock.Lock()
defer t.lock.Unlock()

delete(t.operations, opID)
}

// GetActiveOpCount returns the amount of active operations.
func (t *TerminalBase) GetActiveOpCount() int {
t.lock.RLock()
defer t.lock.RUnlock()

return len(t.operations)
}

func newUnknownOp(id uint32, opType string) *unknownOp {
return &unknownOp{
id: id,
Expand Down
13 changes: 13 additions & 0 deletions terminal/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,19 @@ func (t *TerminalBase) Shutdown(err *Error, sendError bool) {
t.OpEnd(op, nil)
}

// Wait 20s for all operations to end.
// TODO: Use a signal for this instead of polling.
for i := 1; i <= 1000 && t.GetActiveOpCount() > 0; i++ {
time.Sleep(20 * time.Millisecond)
if i == 1000 {
log.Warningf(
"spn/terminal: terminal %s is continuing shutdown with %d active operations",
t.ext.FmtID(),
t.GetActiveOpCount(),
)
}
}

if sendError {
stopMsg := container.New(err.Pack())
MakeMsg(stopMsg, t.id, MsgTypeStop)
Expand Down

0 comments on commit 5353afb

Please sign in to comment.