From 1dfde5f13914052dc61d698bc6d08ada685f602e Mon Sep 17 00:00:00 2001 From: Benjamin Coenen Date: Fri, 12 Oct 2018 10:52:42 +0200 Subject: [PATCH] fix(worker): check to send all logs before terminate plugin (close #3448) Signed-off-by: Benjamin Coenen --- engine/worker/builtin.go | 6 ++++-- engine/worker/builtin_deploy_application.go | 12 ++++++++++-- engine/worker/run_plugin.go | 3 ++- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/engine/worker/builtin.go b/engine/worker/builtin.go index 44e8b3e536..afd9fc4d72 100644 --- a/engine/worker/builtin.go +++ b/engine/worker/builtin.go @@ -74,6 +74,7 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI }() chanRes := make(chan sdk.Result, 1) + done := make(chan struct{}) sdk.GoRoutine(ctx, "runGRPCPlugin", func(ctx context.Context) { params := *params //For the moment we consider that plugin name = action name @@ -131,7 +132,7 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI } logCtx, stopLogs := context.WithCancel(ctx) - go enablePluginLogger(logCtx, sendLog, pluginSocket) + go enablePluginLogger(logCtx, done, sendLog, pluginSocket) defer stopLogs() manifest, err := actionPluginClient.Manifest(ctx, &empty.Empty{}) @@ -165,12 +166,13 @@ func (w *currentWorker) runGRPCPlugin(ctx context.Context, a *sdk.Action, buildI select { case <-ctx.Done(): log.Error("CDS Worker execution cancelled: %v", ctx.Err()) - _ = w.sendLog(buildID, "CDS Worker execution cancelled\n", stepOrder, false) return sdk.Result{ Status: sdk.StatusFail.String(), Reason: "CDS Worker execution cancelled", } case res := <-chanRes: + // Useful to wait all logs are send before sending final status and log + <-done return res } } diff --git a/engine/worker/builtin_deploy_application.go b/engine/worker/builtin_deploy_application.go index c3440b2b7b..50838a2e24 100644 --- a/engine/worker/builtin_deploy_application.go +++ b/engine/worker/builtin_deploy_application.go @@ -56,8 +56,8 @@ func runDeployApplication(w *currentWorker) BuiltInAction { } logCtx, stopLogs := context.WithCancel(ctx) - go enablePluginLogger(logCtx, sendLog, pluginSocket) - defer stopLogs() + done := make(chan struct{}) + go enablePluginLogger(logCtx, done, sendLog, pluginSocket) manifest, err := platformPluginClient.Manifest(ctx, &empty.Empty{}) if err != nil { @@ -65,6 +65,8 @@ func runDeployApplication(w *currentWorker) BuiltInAction { Reason: "Unable to retrieve plugin manifest... Aborting", Status: sdk.StatusFail.String(), } + stopLogs() + <-done sendLog(err.Error()) return res } @@ -82,6 +84,8 @@ func runDeployApplication(w *currentWorker) BuiltInAction { Status: sdk.StatusFail.String(), } sendLog(res.Reason) + stopLogs() + <-done return res } @@ -89,11 +93,15 @@ func runDeployApplication(w *currentWorker) BuiltInAction { sendLog(fmt.Sprintf("# Status: %s", res.Status)) if strings.ToUpper(res.Status) == strings.ToUpper(sdk.StatusSuccess.String()) { + stopLogs() + <-done return sdk.Result{ Status: sdk.StatusSuccess.String(), } } + stopLogs() + <-done return sdk.Result{ Status: sdk.StatusFail.String(), Reason: res.Details, diff --git a/engine/worker/run_plugin.go b/engine/worker/run_plugin.go index 17453f3ad4..5f224edf98 100644 --- a/engine/worker/run_plugin.go +++ b/engine/worker/run_plugin.go @@ -29,13 +29,14 @@ type pluginClientSocket struct { Client interface{} } -func enablePluginLogger(ctx context.Context, sendLog LoggerFunc, c *pluginClientSocket) { +func enablePluginLogger(ctx context.Context, done chan struct{}, sendLog LoggerFunc, c *pluginClientSocket) { var accumulator string var shouldExit bool defer func() { if accumulator != "" { sendLog(accumulator) } + close(done) }() for {