Skip to content

Commit 724564e

Browse files
authored
fix: worker pull usage of ram (#5097)
1 parent b52a3ae commit 724564e

File tree

15 files changed

+506
-80
lines changed

15 files changed

+506
-80
lines changed

engine/api/router.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ func (r *Router) handle(uri string, scope HandlerScope, handlers ...*service.Han
411411
ctx, err = m(ctx, responseWriter, req, rc)
412412
if err != nil {
413413
observability.Record(r.Background, Errors, 1)
414-
service.WriteError(ctx, w, req, err)
414+
service.WriteError(ctx, responseWriter, req, err)
415415
deferFunc(ctx)
416416
return
417417
}

engine/worker/cmd_cache.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func cmdCachePush() *cobra.Command {
7474
Inside a project, you can create a cache from your worker with a tag (useful for vendors for example)
7575
worker cache push <tagValue> dir/file
7676
77-
You can use you storage integration:
77+
You can use you storage integration:
7878
worker cache push --destination=MyStorageIntegration <tagValue> dir/file
7979
`,
8080
Example: "worker cache push {{.cds.workflow}}-{{.cds.version}} ./pathToUpload",
@@ -111,7 +111,7 @@ func cachePushCmd() func(cmd *cobra.Command, args []string) {
111111
}
112112

113113
c := sdk.Cache{
114-
Tag: args[0],
114+
Tag: base64.RawURLEncoding.EncodeToString([]byte(args[0])),
115115
Files: files,
116116
WorkingDirectory: cwd,
117117
IntegrationName: cmdStorageIntegrationName,
@@ -125,7 +125,7 @@ func cachePushCmd() func(cmd *cobra.Command, args []string) {
125125
fmt.Printf("Worker cache push in progress... (tag: %s)\n", args[0])
126126
req, errRequest := http.NewRequest(
127127
"POST",
128-
fmt.Sprintf("http://127.0.0.1:%d/cache/%s/push", port, base64.RawURLEncoding.EncodeToString([]byte(args[0]))),
128+
fmt.Sprintf("http://127.0.0.1:%d/cache/push", port),
129129
bytes.NewReader(data),
130130
)
131131
if errRequest != nil {
@@ -209,7 +209,10 @@ func cachePullCmd() func(cmd *cobra.Command, args []string) {
209209
fmt.Printf("Worker cache pull in progress... (tag: %s)\n", args[0])
210210
req, errRequest := http.NewRequest(
211211
"GET",
212-
fmt.Sprintf("http://127.0.0.1:%d/cache/%s/pull?path=%s&integration=%s", port, base64.RawURLEncoding.EncodeToString([]byte(args[0])), url.QueryEscape(dir), url.QueryEscape(cmdStorageIntegrationName)),
212+
fmt.Sprintf("http://127.0.0.1:%d/cache/%s/pull?path=%s&integration=%s", port,
213+
base64.RawURLEncoding.EncodeToString([]byte(args[0])),
214+
url.QueryEscape(dir),
215+
url.QueryEscape(cmdStorageIntegrationName)),
213216
nil,
214217
)
215218
if errRequest != nil {

engine/worker/internal/action/builtin_serve_static_files.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package action
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
@@ -79,16 +80,17 @@ func RunServeStaticFiles(ctx context.Context, wk workerruntime.Runtime, a sdk.Ac
7980
}
8081

8182
wk.SendLog(ctx, workerruntime.LevelInfo, "Fetching files in progress...")
82-
file, _, err := sdk.CreateTarFromPaths(aferoFS, path, filesPath, &sdk.TarOptions{TrimDirName: filepath.Dir(path)})
83-
if err != nil {
83+
84+
buf := new(bytes.Buffer)
85+
if err := sdk.CreateTarFromPaths(aferoFS, path, filesPath, buf, &sdk.TarOptions{TrimDirName: filepath.Dir(path)}); err != nil {
8486
return res, fmt.Errorf("cannot tar files: %v", err)
8587
}
8688

8789
integrationName := sdk.DefaultIfEmptyStorage(strings.TrimSpace(sdk.ParameterValue(a.Parameters, "destination")))
8890
projectKey := sdk.ParameterValue(wk.Parameters(), "cds.project")
8991

9092
wk.SendLog(ctx, workerruntime.LevelInfo, fmt.Sprintf(`Upload and serving files in progress... with entrypoint "%s"`, entrypoint.Value))
91-
publicURL, _, _, err := wk.Client().QueueStaticFilesUpload(ctx, projectKey, integrationName, jobID, name.Value, entrypoint.Value, staticKey, file)
93+
publicURL, _, _, err := wk.Client().QueueStaticFilesUpload(ctx, projectKey, integrationName, jobID, name.Value, entrypoint.Value, staticKey, buf)
9294
if err != nil {
9395
return res, fmt.Errorf("Cannot upload static files: %v", err)
9496
}

engine/worker/internal/handler_cache.go

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,21 @@ import (
1515
"github.com/gorilla/mux"
1616
"github.com/spf13/afero"
1717

18+
"github.com/ovh/cds/engine/worker/pkg/workerruntime"
1819
"github.com/ovh/cds/sdk"
1920
"github.com/ovh/cds/sdk/log"
2021
)
2122

2223
func cachePushHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
2324
return func(w http.ResponseWriter, r *http.Request) {
24-
vars := mux.Vars(r)
25-
// Get body
26-
data, errRead := ioutil.ReadAll(r.Body)
27-
if errRead != nil {
28-
errRead = sdk.Error{
29-
Message: "worker cache push > Cannot read body : " + errRead.Error(),
25+
data, err := ioutil.ReadAll(r.Body)
26+
if err != nil {
27+
err = sdk.Error{
28+
Message: "worker cache push > Cannot read body : " + err.Error(),
3029
Status: http.StatusInternalServerError,
3130
}
32-
log.Error(ctx, "%v", errRead)
33-
writeError(w, r, errRead)
31+
log.Error(ctx, "%v", err)
32+
writeError(w, r, err)
3433
return
3534
}
3635

@@ -45,38 +44,77 @@ func cachePushHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
4544
return
4645
}
4746

48-
res, size, errTar := sdk.CreateTarFromPaths(afero.NewOsFs(), c.WorkingDirectory, c.Files, nil)
49-
if errTar != nil {
50-
errTar = sdk.Error{
51-
Message: fmt.Sprintf("worker cache push > Cannot tar (%+v) : %v", c.Files, errTar.Error()),
47+
tmpDirectory, err := workerruntime.TmpDirectory(wk.currentJob.context)
48+
if err != nil {
49+
err = sdk.Error{
50+
Message: "worker cache push > Cannot get tmp directory : " + err.Error(),
51+
Status: http.StatusInternalServerError,
52+
}
53+
log.Error(ctx, "%v", err)
54+
writeError(w, r, err)
55+
return
56+
}
57+
58+
tarF, err := afero.TempFile(wk.BaseDir(), tmpDirectory.Name(), "tar-")
59+
if err != nil {
60+
err = sdk.Error{
61+
Message: "worker cache push > Cannot create tmp tar file : " + err.Error(),
62+
Status: http.StatusInternalServerError,
63+
}
64+
log.Error(ctx, "%v", err)
65+
writeError(w, r, err)
66+
return
67+
}
68+
defer tarF.Close() // nolint
69+
70+
if err := sdk.CreateTarFromPaths(afero.NewOsFs(), c.WorkingDirectory, c.Files, tarF, nil); err != nil {
71+
err = sdk.Error{
72+
Message: fmt.Sprintf("worker cache push > Cannot tar (%+v) : %v", c.Files, err.Error()),
5273
Status: http.StatusBadRequest,
5374
}
54-
log.Error(ctx, "%v", errTar)
55-
writeError(w, r, errTar)
75+
log.Error(ctx, "%v", err)
76+
writeError(w, r, err)
77+
return
78+
}
79+
80+
tarInfo, err := tarF.Stat()
81+
if err != nil {
82+
err = sdk.Error{
83+
Message: "worker cache push > Cannot get tmp tar file info : " + err.Error(),
84+
Status: http.StatusInternalServerError,
85+
}
86+
log.Error(ctx, "%v", err)
87+
writeError(w, r, err)
5688
return
5789
}
90+
5891
params := wk.currentJob.wJob.Parameters
5992
projectKey := sdk.ParameterValue(params, "cds.project")
6093
if projectKey == "" {
61-
errP := sdk.Error{
94+
err := sdk.Error{
6295
Message: "worker cache push > Cannot find project",
6396
Status: http.StatusInternalServerError,
6497
}
65-
log.Error(ctx, "%v", errP)
66-
writeError(w, r, errP)
98+
log.Error(ctx, "%v", err)
99+
writeError(w, r, err)
67100
return
68101
}
69102

70103
var errPush error
71104
for i := 0; i < 10; i++ {
72-
if errPush = wk.client.WorkflowCachePush(projectKey, sdk.DefaultIfEmptyStorage(c.IntegrationName), vars["ref"], res, size); errPush == nil {
73-
return
105+
// Seek to be able to read the content of the file from beginning just after it had been written or in case of retry
106+
if _, err := tarF.Seek(0, 0); err != nil {
107+
errPush = err
108+
} else {
109+
if errPush = wk.client.WorkflowCachePush(projectKey, sdk.DefaultIfEmptyStorage(c.IntegrationName), c.Tag, tarF, int(tarInfo.Size())); errPush == nil {
110+
return
111+
}
74112
}
75113
time.Sleep(3 * time.Second)
76114
log.Error(ctx, "worker cache push > cannot push cache (retry x%d) : %v", i, errPush)
77115
}
78116

79-
err := sdk.Error{
117+
err = sdk.Error{
80118
Message: "worker cache push > Cannot push cache: " + errPush.Error(),
81119
Status: http.StatusInternalServerError,
82120
}
@@ -86,23 +124,25 @@ func cachePushHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
86124
}
87125

88126
func cachePullHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
89-
return func(w http.ResponseWriter, r *http.Request) {
90-
vars := mux.Vars(r)
91-
path := r.FormValue("path")
92-
integrationName := sdk.DefaultIfEmptyStorage(r.FormValue("integration"))
127+
return func(w http.ResponseWriter, req *http.Request) {
128+
vars := mux.Vars(req)
129+
path := req.FormValue("path")
130+
integrationName := sdk.DefaultIfEmptyStorage(req.FormValue("integration"))
93131
params := wk.currentJob.wJob.Parameters
94132
projectKey := sdk.ParameterValue(params, "cds.project")
95-
bts, err := wk.client.WorkflowCachePull(projectKey, integrationName, vars["ref"])
133+
r, err := wk.client.WorkflowCachePull(projectKey, integrationName, vars["ref"])
96134
if err != nil {
97135
err = sdk.Error{
98136
Message: "worker cache pull > Cannot pull cache: " + err.Error(),
99137
Status: http.StatusNotFound,
100138
}
101-
writeError(w, r, err)
139+
writeError(w, req, err)
102140
return
103141
}
104142

105-
tr := tar.NewReader(bts)
143+
log.Debug("cachePullHandler> Start read cache tar")
144+
145+
tr := tar.NewReader(r)
106146
for {
107147
header, errH := tr.Next()
108148
if errH == io.EOF {
@@ -122,6 +162,8 @@ func cachePullHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
122162
continue
123163
}
124164

165+
log.Debug("cachePullHandler> Tar contains file %s", header.Name)
166+
125167
// the target location where the dir/file should be created
126168
target := filepath.Join(path, header.Name)
127169

@@ -163,6 +205,8 @@ func cachePullHandler(ctx context.Context, wk *CurrentWorker) http.HandlerFunc {
163205
}
164206
}
165207

208+
log.Debug("cachePullHandler> Create file at %s", target)
209+
166210
f, err := os.OpenFile(target, os.O_CREATE|os.O_WRONLY, os.FileMode(header.Mode))
167211
if err != nil {
168212
sdkErr := sdk.Error{

0 commit comments

Comments
 (0)