Skip to content

Commit

Permalink
fix(worker): check to send all logs before terminate plugin (close #3448
Browse files Browse the repository at this point in the history
) (#3453)

Signed-off-by: Benjamin Coenen <benjamin.coenen@corp.ovh.com>
  • Loading branch information
bnjjj committed Oct 12, 2018
1 parent 2a90423 commit 6beb13a
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
6 changes: 4 additions & 2 deletions engine/worker/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI
}()

chanRes := make(chan sdk.Result, 1)
done := make(chan struct{})
sdk.GoRoutine(ctx, "runGRPCPlugin", func(ctx context.Context) {
params := *params
//For the moment we consider that plugin name = action name
Expand Down Expand Up @@ -131,7 +132,7 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI
}

logCtx, stopLogs := context.WithCancel(ctx)
go enablePluginLogger(logCtx, sendLog, pluginSocket)
go enablePluginLogger(logCtx, done, sendLog, pluginSocket)
defer stopLogs()

manifest, err := actionPluginClient.Manifest(ctx, &empty.Empty{})
Expand Down Expand Up @@ -165,12 +166,13 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI
select {
case <-ctx.Done():
log.Error("CDS Worker execution cancelled: %v", ctx.Err())
_ = w.sendLog(buildID, "CDS Worker execution cancelled\n", stepOrder, false)
return sdk.Result{
Status: sdk.StatusFail.String(),
Reason: "CDS Worker execution cancelled",
}
case res := <-chanRes:
// Useful to wait all logs are send before sending final status and log
<-done
return res
}
}
Expand Down
12 changes: 10 additions & 2 deletions engine/worker/builtin_deploy_application.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ func runDeployApplication(w *currentWorker) BuiltInAction {
}

logCtx, stopLogs := context.WithCancel(ctx)
go enablePluginLogger(logCtx, sendLog, pluginSocket)
defer stopLogs()
done := make(chan struct{})
go enablePluginLogger(logCtx, done, sendLog, pluginSocket)

manifest, err := platformPluginClient.Manifest(ctx, &empty.Empty{})
if err != nil {
res := sdk.Result{
Reason: "Unable to retrieve plugin manifest... Aborting",
Status: sdk.StatusFail.String(),
}
stopLogs()
<-done
sendLog(err.Error())
return res
}
Expand All @@ -82,18 +84,24 @@ func runDeployApplication(w *currentWorker) BuiltInAction {
Status: sdk.StatusFail.String(),
}
sendLog(res.Reason)
stopLogs()
<-done
return res
}

sendLog(fmt.Sprintf("# Details: %s", res.Details))
sendLog(fmt.Sprintf("# Status: %s", res.Status))

if strings.ToUpper(res.Status) == strings.ToUpper(sdk.StatusSuccess.String()) {
stopLogs()
<-done
return sdk.Result{
Status: sdk.StatusSuccess.String(),
}
}

stopLogs()
<-done
return sdk.Result{
Status: sdk.StatusFail.String(),
Reason: res.Details,
Expand Down
3 changes: 2 additions & 1 deletion engine/worker/run_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ type pluginClientSocket struct {
Client interface{}
}

func enablePluginLogger(ctx context.Context, sendLog LoggerFunc, c *pluginClientSocket) {
func enablePluginLogger(ctx context.Context, done chan struct{}, sendLog LoggerFunc, c *pluginClientSocket) {
var accumulator string
var shouldExit bool
defer func() {
if accumulator != "" {
sendLog(accumulator)
}
close(done)
}()

for {
Expand Down

0 comments on commit 6beb13a

Please sign in to comment.