From 3078317dff8bd28ccbdfa37a0e483a5c032807d4 Mon Sep 17 00:00:00 2001 From: Martin Milata Date: Thu, 12 Jan 2017 16:43:36 +0100 Subject: [PATCH] wip --- api/swagger-spec/openshift-openapi-spec.json | 36 ++-- pkg/build/api/types.go | 4 +- pkg/build/api/v1/zz_generated.conversion.go | 20 +-- pkg/build/builder/dockerutil.go | 170 +++++++++---------- pkg/build/builder/dockerutil_test.go | 164 +++++++++++++++--- 5 files changed, 241 insertions(+), 153 deletions(-) diff --git a/api/swagger-spec/openshift-openapi-spec.json b/api/swagger-spec/openshift-openapi-spec.json index 09f498ed8e34..505d69f85cfc 100644 --- a/api/swagger-spec/openshift-openapi-spec.json +++ b/api/swagger-spec/openshift-openapi-spec.json @@ -48201,6 +48201,24 @@ } } }, + "v1.BuildStatusOutput": { + "description": "BuildStatusOutput contains the status of the built image.", + "properties": { + "to": { + "description": "to describes the status of the built image being pushed to a registry.", + "$ref": "#/definitions/v1.BuildStatusOutputTo" + } + } + }, + "v1.BuildStatusOutputTo": { + "description": "BuildStatusOutputTo describes the status of the built image with regards to image registry to which it was supposed to be pushed.", + "properties": { + "imageDigest": { + "description": "imageDigest is the digest of the built Docker image. The digest uniquely identifies the image in the registry to which it was pushed. Please note that this field may not always set even if the push completes successfully.", + "type": "string" + } + } + }, "v1.BuildStrategy": { "description": "BuildStrategy contains the details of how to perform a build.", "required": [ @@ -48229,24 +48247,6 @@ } } }, - "v1.BuildStatusOutput": { - "description": "BuildStatusOutput contains the status of the built image.", - "properties": { - "to": { - "description": "to describes the status of the built image being pushed to a registry.", - "$ref": "#/definitions/v1.BuildStatusOutputTo" - } - } - }, - "v1.BuildStatusOutputTo": { - "description": "BuildStatusOutputTo describes the status of the built image with regards to image registry to which it was supposed to be pushed.", - "properties": { - "imageDigest": { - "description": "imageDigest is the digest of the built Docker image. The digest uniquely identifies the image in the registry to which it was pushed. Please note that this field may not always set even if the push completes successfully.", - "type": "string" - } - } - }, "v1.BuildTriggerCause": { "description": "BuildTriggerCause holds information about a triggered build. It is used for displaying build trigger data for each build and build configuration in oc describe. It is also used to describe which triggers led to the most recent update in the build configuration.", "properties": { diff --git a/pkg/build/api/types.go b/pkg/build/api/types.go index cbfa98ff694f..5f826c184d42 100644 --- a/pkg/build/api/types.go +++ b/pkg/build/api/types.go @@ -341,7 +341,7 @@ const ( // BuildStatusOutput contains the status of the built image. type BuildStatusOutput struct { // To describes the status of the built image being pushed to a registry. - To *BuildStatusOutputTo `json:"to,omitempty" protobuf:"bytes,1,opt,name=to"` + To *BuildStatusOutputTo } // BuildStatusOutputTo describes the status of the built image with regards to @@ -351,7 +351,7 @@ type BuildStatusOutputTo struct { // identifies the image in the registry to which it was pushed. Please note // that this field may not always set even if the push completes // successfully. - ImageDigest string `json:"imageDigest,omitempty" protobuf:"bytes,1,opt,name=imageDigest"` + ImageDigest string } // BuildSource is the input used for the build. diff --git a/pkg/build/api/v1/zz_generated.conversion.go b/pkg/build/api/v1/zz_generated.conversion.go index 4e70f497a9c7..972bd2bbca51 100644 --- a/pkg/build/api/v1/zz_generated.conversion.go +++ b/pkg/build/api/v1/zz_generated.conversion.go @@ -806,15 +806,7 @@ func Convert_api_BuildStatus_To_v1_BuildStatus(in *api.BuildStatus, out *BuildSt } func autoConvert_v1_BuildStatusOutput_To_api_BuildStatusOutput(in *BuildStatusOutput, out *api.BuildStatusOutput, s conversion.Scope) error { - if in.To != nil { - in, out := &in.To, &out.To - *out = new(api.BuildStatusOutputTo) - if err := Convert_v1_BuildStatusOutputTo_To_api_BuildStatusOutputTo(*in, *out, s); err != nil { - return err - } - } else { - out.To = nil - } + out.To = (*api.BuildStatusOutputTo)(unsafe.Pointer(in.To)) return nil } @@ -823,15 +815,7 @@ func Convert_v1_BuildStatusOutput_To_api_BuildStatusOutput(in *BuildStatusOutput } func autoConvert_api_BuildStatusOutput_To_v1_BuildStatusOutput(in *api.BuildStatusOutput, out *BuildStatusOutput, s conversion.Scope) error { - if in.To != nil { - in, out := &in.To, &out.To - *out = new(BuildStatusOutputTo) - if err := Convert_api_BuildStatusOutputTo_To_v1_BuildStatusOutputTo(*in, *out, s); err != nil { - return err - } - } else { - out.To = nil - } + out.To = (*BuildStatusOutputTo)(unsafe.Pointer(in.To)) return nil } diff --git a/pkg/build/builder/dockerutil.go b/pkg/build/builder/dockerutil.go index 55bf7069bc4b..c36585639626 100644 --- a/pkg/build/builder/dockerutil.go +++ b/pkg/build/builder/dockerutil.go @@ -3,11 +3,12 @@ package builder import ( "bytes" "encoding/json" + "errors" "fmt" "io" + "io/ioutil" "os" "strings" - "sync" "time" docker "github.com/fsouza/go-dockerclient" @@ -81,16 +82,14 @@ func pushImage(client DockerClient, name string, authConfig docker.AuthConfigura var progressWriter io.Writer if glog.Is(5) { - simpleWriter := newSimpleWriter(os.Stderr) - defer simpleWriter.Close() - progressWriter = simpleWriter + progressWriter = newSimpleWriter(os.Stderr) } else { logProgress := func(s string) { glog.V(0).Infof("%s", s) } progressWriter = imageprogress.NewPushWriter(logProgress) } - digestWriter := &digestWriter{} + digestWriter := newDigestWriter() opts := docker.PushImageOptions{ Name: repository, @@ -104,7 +103,7 @@ func pushImage(client DockerClient, name string, authConfig docker.AuthConfigura for retries := 0; retries <= DefaultPushRetryCount; retries++ { err = client.PushImage(opts, authConfig) if err == nil { - return digestWriter.Digest(), nil + return digestWriter.Digest, nil } errMsg := fmt.Sprintf("%s", err) @@ -235,35 +234,59 @@ type progressAux struct { Size int64 `json:"Size"` } -// digestWriter consumes stream of json messages from docker client push -// operation and looks for digest of the pushed image. -type digestWriter struct { - bytes.Buffer - digest string +type pushWriterCallback func(progressLine) error + +// pushWriter is an io.Writer which consumes a stream of json messages returned +// by docker client when it pushes image to registry. It calls the provided +// callback for each decoded JSON object. +type pushWriter struct { + buf *bytes.Buffer + callback pushWriterCallback } -// Digest returns the digest of the pushed image. Call this method after all -// jsons messages have been written to the digestWriter. -func (d *digestWriter) Digest() string { - if len(d.digest) > 0 { - return d.digest +func newPushWriter(cb pushWriterCallback) *pushWriter { + return &pushWriter{ + buf: &bytes.Buffer{}, + callback: cb, } +} - decoder := json.NewDecoder(d) - for decoder.More() { +func (t *pushWriter) Write(data []byte) (int, error) { + n, err := t.buf.Write(data) + if err != nil { + return n, err + } + dec := json.NewDecoder(t.buf) + + for { + // save the not yet parsed input so we can restore it in case it + // contains part of valid JSON + savedBuf, err := ioutil.ReadAll(dec.Buffered()) + savedBuf = append(savedBuf, t.buf.Bytes()...) + + // try decoding a value line := &progressLine{} - err := decoder.Decode(line) - if err != nil { - break - } + err = dec.Decode(line) - if len(line.Aux.Digest) > 0 { - d.digest = line.Aux.Digest - break + switch err { + // decoded a value, pass it to callback + case nil: + if callbackErr := t.callback(*line); callbackErr != nil { + return n, callbackErr + } + // no more values + case io.EOF: + return n, nil + // there's no whole JSON but we consumed bytes that might be part of + // one - restore the saved buffer + case io.ErrUnexpectedEOF: + t.buf = bytes.NewBuffer(savedBuf) + return n, nil + // actual error happened + default: + return n, err } } - - return d.digest } // simpleProgressWriter is an io.Writer which consumes a stream of json @@ -271,74 +294,39 @@ func (d *digestWriter) Digest() string { // writes simple human-readable indication of the push progress to the output // io.Writer. The output format mimics what go-dockerclient writes when called // with RawJSONStream=false. -type simpleProgressWriter struct { - mutex *sync.Mutex - internalWriter *io.PipeWriter - output io.Writer -} - -func newSimpleWriter(output io.Writer) *simpleProgressWriter { - writer := &simpleProgressWriter{ - mutex: &sync.Mutex{}, - output: output, - } - return writer -} - -func (w *simpleProgressWriter) Write(data []byte) (int, error) { - w.mutex.Lock() - defer w.mutex.Unlock() - if w.internalWriter == nil { - var pipeIn *io.PipeReader - pipeIn, w.internalWriter = io.Pipe() - decoder := json.NewDecoder(pipeIn) - go func() { - err := w.readProgress(decoder) - if err != nil { - pipeIn.CloseWithError(err) - } - }() - } - return w.internalWriter.Write(data) -} - -func (w *simpleProgressWriter) readProgress(decoder *json.Decoder) error { - for { - line := &progressLine{} - err := decoder.Decode(line) - if err == io.EOF { - break - } - if err != nil { - return err +func newSimpleWriter(output io.Writer) io.Writer { + return newPushWriter(func(line progressLine) error { + if line.Stream != "" { + fmt.Fprint(output, line.Stream) + } else if line.Progress != "" { + fmt.Fprintf(output, "%s %s\r", line.Status, line.Progress) + } else if line.Error != "" { + return errors.New(line.Error) } - err = w.processLine(line) - if err != nil { - return err + if line.Status != "" { + fmt.Fprintln(output, line.Status) } - } - return nil + return nil + }) } -func (w *simpleProgressWriter) processLine(line *progressLine) error { - if line.Stream != "" { - fmt.Fprint(w.output, line.Stream) - } else if line.Progress != "" { - fmt.Fprintf(w.output, "%s %s\r", line.Status, line.Progress) - } else if line.Error != "" { - fmt.Fprintf(w.output, "error: %s\r", line.Error) - } - if line.Status != "" { - fmt.Fprintln(w.output, line.Status) - } - return nil +// digestWriter consumes stream of json messages from docker client push +// operation and looks for digest of the pushed image. +type digestWriter struct { + *pushWriter + Digest string } -func (w *simpleProgressWriter) Close() error { - w.mutex.Lock() - defer w.mutex.Unlock() - if w.internalWriter != nil { - return w.internalWriter.Close() - } - return nil +func newDigestWriter() *digestWriter { + dw := digestWriter{} + dw.pushWriter = newPushWriter(func(line progressLine) error { + if line.Error != "" { + return errors.New(line.Error) + } + if len(dw.Digest) == 0 && len(line.Aux.Digest) > 0 { + dw.Digest = line.Aux.Digest + } + return nil + }) + return &dw } diff --git a/pkg/build/builder/dockerutil_test.go b/pkg/build/builder/dockerutil_test.go index 1a40039aaf8e..8c11df0b857f 100644 --- a/pkg/build/builder/dockerutil_test.go +++ b/pkg/build/builder/dockerutil_test.go @@ -9,6 +9,7 @@ import ( "path/filepath" "reflect" "regexp" + "strings" "testing" "github.com/fsouza/go-dockerclient" @@ -180,10 +181,116 @@ func TestPushImage(t *testing.T) { defer func() { fooBarRunTimes = 0 }() } +func TestPushWriter(t *testing.T) { + tests := []struct { + Writes []string + Expected []progressLine + ExpectErr string + }{ + { + Writes: []string{"", "\n"}, + Expected: []progressLine{}, + }, + { + // The writer doesn't know if another write is coming or not so + // this is not an error. + Writes: []string{"{"}, + Expected: []progressLine{}, + }, + { + Writes: []string{"{}"}, + Expected: []progressLine{{}}, + }, + { + Writes: []string{"{", "}{", "}"}, + Expected: []progressLine{{}, {}}, + }, + { + Writes: []string{" ", "{", " ", "}{}{", "}"}, + Expected: []progressLine{{}, {}, {}}, + }, + { + Writes: []string{"{}\r\n{}\r\n{}\r\n"}, + Expected: []progressLine{{}, {}, {}}, + }, + { + Writes: []string{"{\"progress\": \"1\"}\r\n{\"progress\": \"2\"}\r\n{\"progress\": \"3\"}\r\n"}, + Expected: []progressLine{ + {Progress: "1"}, + {Progress: "2"}, + {Progress: "3"}, + }, + }, + { + Writes: []string{"}"}, + ExpectErr: "invalid character", + }, + { + Writes: []string{`{"error": "happened"}`}, + ExpectErr: "happened", + }, + { + Writes: []string{`{"status": "good!"}{"`, `error": "front fell off"}`}, + ExpectErr: "front fell off", + }, + { + Writes: []string{`{"status": "good!"}{"st`, `atus": `, `"even better"}`}, + Expected: []progressLine{ + {Status: "good!"}, + {Status: "even better"}, + }, + }, + } + +main: + for i, tc := range tests { + decoded := []progressLine{} + w := newPushWriter(func(line progressLine) error { + decoded = append(decoded, line) + if line.Error != "" { + return errors.New(line.Error) + } else { + return nil + } + }) + + for _, part := range tc.Writes { + n, err := w.Write([]byte(part)) + + partLen := len([]byte(part)) + if n != partLen { + t.Errorf("[%d] Wrote %d bytes but Write() returned %d", i, partLen, n) + continue main + } + + if err != nil { + if tc.ExpectErr != "" && !strings.Contains(err.Error(), tc.ExpectErr) { + t.Errorf("[%d] Expected error: %s, got: %s", i, tc.ExpectErr, err) + } + if tc.ExpectErr == "" { + t.Errorf("[%d] Unexpected error: %s", i, err) + } + continue main + } + } + + if tc.ExpectErr != "" { + t.Errorf("[%d] Expected error %q, got none", i, tc.ExpectErr) + continue main + } + + if !reflect.DeepEqual(tc.Expected, decoded) { + t.Errorf("[%d] Expected: %#v\nGot: %#v\n", i, tc.Expected, decoded) + continue main + } + } +} + func TestPushImageDigests(t *testing.T) { tests := []struct { - Filename string - Expected string + Filename string + Expected string + ExpectErr bool }{ { Filename: "docker-push-1.10.txt", @@ -206,16 +313,16 @@ func TestPushImageDigests(t *testing.T) { Expected: "sha256:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", }, { - Filename: "docker-push-malformed1.txt", - Expected: "", + Filename: "docker-push-malformed1.txt", + ExpectErr: true, }, { - Filename: "docker-push-malformed2.txt", - Expected: "", + Filename: "docker-push-malformed2.txt", + ExpectErr: true, }, { - Filename: "docker-push-malformed3.txt", - Expected: "sha256:29f5d56d12684887bdfa50dcd29fc31eea4aaf4ad3bec43daf19026a7ce69912", + Filename: "docker-push-malformed3.txt", + ExpectErr: true, }, { Filename: "empty.txt", @@ -237,7 +344,7 @@ func TestPushImageDigests(t *testing.T) { _, err = io.Copy(opts.OutputStream, fh) if err != nil { - return fmt.Errorf("Failed to process output stream: %s", err) + return fmt.Errorf("[%s] Failed to process output stream for: %s", tc.Filename, err) } return nil @@ -245,14 +352,17 @@ func TestPushImageDigests(t *testing.T) { testAuth := docker.AuthConfiguration{} digest, err := pushImage(fakeDocker, "testimage/"+tc.Filename, testAuth) - if err != nil { + if err != nil && !tc.ExpectErr { t.Errorf("[%s] Unexpected error: %v", tc.Filename, err) - break + continue + } else if err == nil && tc.ExpectErr { + t.Errorf("[%s] Expected error, got success", tc.Filename) + continue } if digest != tc.Expected { t.Errorf("[%s] Digest mismatch: expected %q, got %q", tc.Filename, tc.Expected, digest) - break + continue } } @@ -260,8 +370,9 @@ func TestPushImageDigests(t *testing.T) { func TestSimpleProgress(t *testing.T) { tests := []struct { - Filename string - Expected string + Filename string + Expected string + ExpectErr bool }{ { Filename: "docker-push-1.10.txt", @@ -284,16 +395,19 @@ func TestSimpleProgress(t *testing.T) { Expected: "(?ms)The push.*Preparing.*Pushing.*Pushed.*digest", }, { - Filename: "docker-push-malformed1.txt", - Expected: "(?ms)The push", + Filename: "docker-push-malformed1.txt", + Expected: "(?ms)The push", + ExpectErr: true, }, { - Filename: "docker-push-malformed2.txt", - Expected: "(?ms)The push.*Preparing.*Pushing.*Pushed.*digest", + Filename: "docker-push-malformed2.txt", + Expected: "(?ms)The push.*Preparing.*Pushing.*Pushed.*digest", + ExpectErr: true, }, { - Filename: "docker-push-malformed3.txt", - Expected: "(?ms)The push.*Preparing.*Pushing.*Pushed.*digest", + Filename: "docker-push-malformed3.txt", + Expected: "(?ms)The push.*Preparing.*Pushing.*Pushed.*digest", + ExpectErr: true, }, { Filename: "empty.txt", @@ -305,21 +419,23 @@ func TestSimpleProgress(t *testing.T) { fh, err := os.Open(filepath.Join("testdata", tc.Filename)) if err != nil { t.Errorf("Cannot open %q: %s", tc.Filename, err) - break + continue } output := &bytes.Buffer{} writer := newSimpleWriter(output) _, err = io.Copy(writer, fh) - if err != nil { + if err != nil && !tc.ExpectErr { t.Errorf("Failed to process %q: %s", tc.Filename, err) - break + continue + } else if err == nil && tc.ExpectErr { + t.Errorf("Expected error for %q, got success", tc.Filename) } if outputStr := output.String(); !regexp.MustCompile(tc.Expected).MatchString(outputStr) { t.Errorf("%s: expected %q, got:\n%s\n", tc.Filename, tc.Expected, outputStr) - break + continue } } }