Skip to content

Commit

Permalink
Fix panic when retrying upload-product
Browse files Browse the repository at this point in the history
[#160181845] pivotal-cf/om #240: Intermittent `POST .../api/v0/available_products: EOF` when uploading tile
  • Loading branch information
ljfranklin authored and jtarchie committed Dec 19, 2018
1 parent 35e67c9 commit cce358e
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 15 deletions.
29 changes: 29 additions & 0 deletions network/fakes/progress_bar.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 23 additions & 1 deletion network/progress_client.go
Expand Up @@ -13,6 +13,7 @@ type progressBar interface {
Start()
Finish()
SetTotal64(int64)
Reset()
NewProxyReader(io.Reader) io.ReadCloser
}

Expand Down Expand Up @@ -44,17 +45,31 @@ func (pc ProgressClient) Do(req *http.Request) (*http.Response, error) {
duration = time.Second
}

// reset bar in case request is being retried
pc.progressBar.Reset()

tl := progress.NewTickingLogger(pc.liveWriter, duration)

startedTicker := make(chan bool)

switch req.Method {
case "POST", "PUT":
req.Body = progress.NewReadCloser(req.Body, pc.progressBar, tl.Start)
req.Body = progress.NewReadCloser(req.Body, pc.progressBar, func() {
tl.Start()
close(startedTicker)
})
pc.progressBar.SetTotal64(req.ContentLength)
case "GET":
tl.Start()
close(startedTicker)
}

resp, err := pc.client.Do(req)

// the req.Body is closed asynchronously, but we'll also guard against
// it never getting closed by continuing after X seconds
waitForChanWithTimeout(startedTicker, 2*time.Second)

tl.Stop()
if err != nil {
return nil, err
Expand All @@ -67,3 +82,10 @@ func (pc ProgressClient) Do(req *http.Request) (*http.Response, error) {

return resp, nil
}

func waitForChanWithTimeout(waitChan <-chan bool, timeout time.Duration) {
select {
case <-waitChan:
case <-time.After(timeout):
}
}
30 changes: 18 additions & 12 deletions network/progress_client_test.go
Expand Up @@ -33,10 +33,13 @@ var _ = Describe("ProgressClient", func() {

Describe("Do", func() {
It("makes a request to upload the product to the Ops Manager", func() {
client.DoReturns(&http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader("{}")),
}, nil)
client.DoStub = func(req *http.Request) (*http.Response, error) {
req.Body.Close()
return &http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader("{}")),
}, nil
}

progressBar.NewProxyReaderReturns(ioutil.NopCloser(strings.NewReader("some content")))

Expand All @@ -62,6 +65,8 @@ var _ = Describe("ProgressClient", func() {
Expect(err).NotTo(HaveOccurred())
Expect(string(rawReqBody)).To(Equal("some content"))

Expect(progressBar.ResetCallCount()).To(Equal(1))

Expect(progressBar.SetTotal64CallCount()).To(Equal(1))
Expect(progressBar.SetTotal64ArgsForCall(0)).To(Equal(int64(12)))

Expand Down Expand Up @@ -204,7 +209,10 @@ var _ = Describe("ProgressClient", func() {
Context("when an error occurs", func() {
Context("when the client errors performing the request", func() {
It("returns an error", func() {
client.DoReturns(&http.Response{}, errors.New("some client error"))
client.DoStub = func(req *http.Request) (*http.Response, error) {
req.Body.Close()
return &http.Response{}, errors.New("some client error")
}

req, err := http.NewRequest("POST", "/some/endpoint", strings.NewReader("some content"))
Expect(err).NotTo(HaveOccurred())
Expand All @@ -216,12 +224,10 @@ var _ = Describe("ProgressClient", func() {

Context("when server responds with timeout error before upload has finished", func() {
It("returns an error", func() {
client.DoStub = func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusRequestTimeout,
Body: ioutil.NopCloser(strings.NewReader(`something from nginx probably xml`)),
}, nil
}
client.DoReturns(&http.Response{
StatusCode: http.StatusRequestTimeout,
Body: ioutil.NopCloser(strings.NewReader(`something from nginx probably xml`)),
}, nil)

var req *http.Request
req, err := http.NewRequest("POST", "/some/endpoint", strings.NewReader("some content"))
Expand All @@ -236,7 +242,7 @@ var _ = Describe("ProgressClient", func() {
close(done)
}()

Eventually(done).Should(BeClosed())
Eventually(done, 3).Should(BeClosed())
Expect(resp.StatusCode).To(Equal(http.StatusRequestTimeout))
})
})
Expand Down
8 changes: 6 additions & 2 deletions progress/progress_bar.go
Expand Up @@ -11,12 +11,12 @@ type Bar struct {
bar *pb.ProgressBar
}

func NewBar() Bar {
func NewBar() *Bar {
bar := pb.New(0)
bar.SetUnits(pb.U_BYTES)
bar.Width = 80
bar.Output = os.Stderr
return Bar{bar}
return &Bar{bar}
}

func (b Bar) NewProxyReader(r io.Reader) io.ReadCloser {
Expand All @@ -34,3 +34,7 @@ func (b Bar) Finish() {
func (b Bar) SetTotal64(size int64) {
b.bar.Total = size
}

func (b *Bar) Reset() {
b.bar = NewBar().bar
}

0 comments on commit cce358e

Please sign in to comment.