Skip to content

Commit

Permalink
Merge pull request #45 from jim-minter/cleanup_goroutine
Browse files Browse the repository at this point in the history
cleanup goroutine properly in imageprogress.NewPullWriter
  • Loading branch information
smarterclayton committed Jun 2, 2017
2 parents 4ac37bb + a873739 commit c3e2e96
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 81 deletions.
7 changes: 5 additions & 2 deletions dockerclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
Expand All @@ -22,7 +23,6 @@ import (

"github.com/openshift/imagebuilder"
"github.com/openshift/imagebuilder/imageprogress"
"io/ioutil"
)

// NewClientFromEnv is exposed to simplify getting a client when vendoring this library.
Expand Down Expand Up @@ -486,10 +486,13 @@ func (e *ClientExecutor) LoadImage(from string) (*docker.Image, error) {
}
for _, config := range auth {
// TODO: handle IDs?
pullWriter := imageprogress.NewPullWriter(outputProgress)
defer pullWriter.Close()

pullImageOptions := docker.PullImageOptions{
Repository: repository,
Tag: tag,
OutputStream: imageprogress.NewPullWriter(outputProgress),
OutputStream: pullWriter,
RawJSONStream: true,
}
if glog.V(5) {
Expand Down
57 changes: 23 additions & 34 deletions imageprogress/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package imageprogress
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"regexp"
Expand Down Expand Up @@ -126,10 +125,10 @@ func (r report) String() string {
// on pull/push progress of a Docker image. It only reports when the state of the
// different layers has changed and uses time thresholds to limit the
// rate of the reports.
func newWriter(reportFn func(report), layersChangedFn func(report, report) bool) io.Writer {
func newWriter(reportFn func(report), layersChangedFn func(report, report) bool) io.WriteCloser {
writer := &imageProgressWriter{
mutex: &sync.Mutex{},
layerStatus: map[string]progressLine{},
layerStatus: map[string]*progressLine{},
reportFn: reportFn,
layersChangedFn: layersChangedFn,
progressTimeThreshhold: defaultProgressTimeThreshhold,
Expand All @@ -140,8 +139,8 @@ func newWriter(reportFn func(report), layersChangedFn func(report, report) bool)

type imageProgressWriter struct {
mutex *sync.Mutex
internalWriter io.Writer
layerStatus map[string]progressLine
internalWriter io.WriteCloser
layerStatus map[string]*progressLine
lastLayerCount int
stableLines int
stableThreshhold int
Expand All @@ -152,11 +151,6 @@ type imageProgressWriter struct {
layersChangedFn func(report, report) bool
}

func (w *imageProgressWriter) ReadFrom(reader io.Reader) (int64, error) {
decoder := json.NewDecoder(reader)
return 0, w.readProgress(decoder)
}

func (w *imageProgressWriter) Write(data []byte) (int, error) {
w.mutex.Lock()
defer w.mutex.Unlock()
Expand All @@ -174,6 +168,17 @@ func (w *imageProgressWriter) Write(data []byte) (int, error) {
return w.internalWriter.Write(data)
}

func (w *imageProgressWriter) Close() error {
w.mutex.Lock()
defer w.mutex.Unlock()

if w.internalWriter == nil {
return nil
}

return w.internalWriter.Close()
}

func (w *imageProgressWriter) readProgress(decoder *json.Decoder) error {
for {
line := &progressLine{}
Expand All @@ -184,31 +189,23 @@ func (w *imageProgressWriter) readProgress(decoder *json.Decoder) error {
if err != nil {
return err
}
err = w.processLine(line)
if err != nil {
return err
}
w.processLine(line)
}
return nil
}

func (w *imageProgressWriter) processLine(line *progressLine) error {

if err := getError(line); err != nil {
return err
}

func (w *imageProgressWriter) processLine(line *progressLine) {
// determine if it's a line we want to process
if !islayerStatus(line) {
return nil
return
}

w.layerStatus[line.ID] = *line
w.layerStatus[line.ID] = line

// if the number of layers has not stabilized yet, return and wait for more
// progress
if !w.isStableLayerCount() {
return nil
return
}

r := createReport(w.layerStatus)
Expand All @@ -218,7 +215,7 @@ func (w *imageProgressWriter) processLine(line *progressLine) error {
w.lastReport = r
w.lastReportTime = time.Now()
w.reportFn(r)
return nil
return
}
// If layer counts haven't changed, but enough time has passed (30 sec by default),
// at least report on download/push progress
Expand All @@ -227,7 +224,6 @@ func (w *imageProgressWriter) processLine(line *progressLine) error {
w.lastReportTime = time.Now()
w.reportFn(r)
}
return nil
}

func (w *imageProgressWriter) isStableLayerCount() bool {
Expand All @@ -248,7 +244,7 @@ func (w *imageProgressWriter) isStableLayerCount() bool {
return true
}

var layerIDRegexp = regexp.MustCompile("^[a-f,0-9]*$")
var layerIDRegexp = regexp.MustCompile("^[a-f0-9]*$")

func islayerStatus(line *progressLine) bool {
// ignore status lines with no layer id
Expand All @@ -266,14 +262,7 @@ func islayerStatus(line *progressLine) bool {
return true
}

func getError(line *progressLine) error {
if len(line.Error) > 0 {
return errors.New(line.Error)
}
return nil
}

func createReport(dockerProgress map[string]progressLine) report {
func createReport(dockerProgress map[string]*progressLine) report {
r := report{}
for _, line := range dockerProgress {
layerStatus := layerStatusFromDockerString(line.Status)
Expand Down
56 changes: 13 additions & 43 deletions imageprogress/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (

func TestReports(t *testing.T) {
tests := []struct {
name string
gen func(*progressGenerator)
errExpected bool
expected report
name string
gen func(*progressGenerator)
expected report
}{
{
name: "simple report",
Expand Down Expand Up @@ -53,12 +52,17 @@ func TestReports(t *testing.T) {
},
},
{
name: "detect error",
name: "ignore error",
gen: func(p *progressGenerator) {
p.status("1", "Downloading")
p.status("2", "Pull complete")
p.status("1", "Downloading")
p.err("an error")
},
errExpected: true,
expected: report{
statusDownloading: &layerDetail{Count: 1},
statusComplete: &layerDetail{Count: 1},
},
},
}

Expand All @@ -81,13 +85,7 @@ func TestReports(t *testing.T) {
w.(*imageProgressWriter).stableThreshhold = 0
_, err := io.Copy(w, pipeIn)
if err != nil {
if !test.errExpected {
t.Errorf("%s: unexpected: %v", test.name, err)
}
continue
}
if test.errExpected {
t.Errorf("%s: did not get expected error", test.name)
t.Errorf("%s: unexpected: %v", test.name, err)
continue
}
if !compareReport(lastReport, test.expected) {
Expand All @@ -96,34 +94,6 @@ func TestReports(t *testing.T) {
}
}

func TestErrorOnCopy(t *testing.T) {
// Producer pipe
genIn, genOut := io.Pipe()
p := newProgressGenerator(genOut)

// generate some data
go func() {
for i := 0; i < 100; i++ {
p.status("1", "Downloading")
p.status("2", "Downloading")
p.status("3", "Downloading")
}
p.err("data error")
genOut.Close()
}()

w := newWriter(func(r report) {}, func(a, b report) bool { return true })

// Ensure that the error is propagated to the copy
_, err := io.Copy(w, genIn)
if err == nil {
t.Errorf("Did not get an error when copying to writer")
}
if err.Error() != "data error" {
t.Errorf("Did not get expected error: %v", err)
}
}

func TestStableLayerCount(t *testing.T) {

tests := []struct {
Expand Down Expand Up @@ -161,10 +131,10 @@ func TestStableLayerCount(t *testing.T) {
for _, test := range tests {
w := newWriter(func(report) {}, func(a, b report) bool { return true }).(*imageProgressWriter)
w.lastLayerCount = test.lastLayerCount
w.layerStatus = map[string]progressLine{}
w.layerStatus = map[string]*progressLine{}
w.stableThreshhold = test.stableThreshhold
for i := 0; i < test.layerStatusCount; i++ {
w.layerStatus[strconv.Itoa(i)] = progressLine{}
w.layerStatus[strconv.Itoa(i)] = &progressLine{}
}
var result bool
for i := 0; i < test.callCount; i++ {
Expand Down
2 changes: 1 addition & 1 deletion imageprogress/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// on pull progress of a Docker image. It only reports when the state of the
// different layers has changed and uses time thresholds to limit the
// rate of the reports.
func NewPullWriter(printFn func(string)) io.Writer {
func NewPullWriter(printFn func(string)) io.WriteCloser {
return newWriter(pullReporter(printFn), pullLayersChanged)
}

Expand Down
2 changes: 1 addition & 1 deletion imageprogress/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
// on push progress of a Docker image. It only reports when the state of the
// different layers has changed and uses time thresholds to limit the
// rate of the reports.
func NewPushWriter(printFn func(string)) io.Writer {
func NewPushWriter(printFn func(string)) io.WriteCloser {
return newWriter(pushReporter(printFn), pushLayersChanged)
}

Expand Down

0 comments on commit c3e2e96

Please sign in to comment.