Skip to content

Commit

Permalink
Explicity write http headers on streaming endpoints
Browse files Browse the repository at this point in the history
This works around issues with the otel http handler wrapper causing
multiple calls to `WriteHeader` when a `Flush` is called before `Write`.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
(cherry picked from commit 707ab48)
Signed-off-by: Paweł Gronowski <pawel.gronowski@docker.com>
  • Loading branch information
cpuguy83 authored and vvoland committed May 17, 2024
1 parent 8e96db1 commit 06e19ec
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 18 deletions.
7 changes: 6 additions & 1 deletion api/server/httputils/write_log_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"net/http"
"net/url"
"sort"

Expand All @@ -16,7 +17,11 @@ import (

// WriteLogStream writes an encoded byte stream of log messages from the
// messages channel, multiplexing them with a stdcopy.Writer if mux is true
func WriteLogStream(_ context.Context, w io.Writer, msgs <-chan *backend.LogMessage, config *container.LogsOptions, mux bool) {
func WriteLogStream(_ context.Context, w http.ResponseWriter, msgs <-chan *backend.LogMessage, config *container.LogsOptions, mux bool) {
// See https://github.com/moby/moby/issues/47448
// Trigger headers to be written immediately.
w.WriteHeader(http.StatusOK)

wf := ioutils.NewWriteFlusher(w)
defer wf.Close()

Expand Down
15 changes: 12 additions & 3 deletions api/server/router/container/container_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,18 @@ func (s *containerRouter) getContainersStats(ctx context.Context, w http.Respons
}

return s.backend.ContainerStats(ctx, vars["name"], &backend.ContainerStatsConfig{
Stream: stream,
OneShot: oneShot,
OutStream: w,
Stream: stream,
OneShot: oneShot,
OutStream: func() io.Writer {
// Assume that when this is called the request is OK.
w.WriteHeader(http.StatusOK)
if !stream {
return w
}
wf := ioutils.NewWriteFlusher(w)
wf.Flush()
return wf
},
})
}

Expand Down
1 change: 1 addition & 0 deletions api/server/router/system/system_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
output := ioutils.NewWriteFlusher(w)
defer output.Close()
output.Flush()
Expand Down
2 changes: 1 addition & 1 deletion api/types/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type LogSelector struct {
type ContainerStatsConfig struct {
Stream bool
OneShot bool
OutStream io.Writer
OutStream func() io.Writer
}

// ExecInspect holds information about a running process started
Expand Down
18 changes: 5 additions & 13 deletions daemon/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/docker/docker/api/types/backend"
"github.com/docker/docker/container"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/ioutils"
)

// ContainerStats writes information about the container to the stream
Expand All @@ -27,9 +26,11 @@ func (daemon *Daemon) ContainerStats(ctx context.Context, prefixOrName string, c
return errdefs.InvalidParameter(errors.New("cannot have stream=true and one-shot=true"))
}

enc := json.NewEncoder(config.OutStream())

// If the container is either not running or restarting and requires no stream, return an empty stats.
if (!ctr.IsRunning() || ctr.IsRestarting()) && !config.Stream {
return json.NewEncoder(config.OutStream).Encode(&types.StatsJSON{
return enc.Encode(&types.StatsJSON{
Name: ctr.Name,
ID: ctr.ID,
})
Expand All @@ -41,15 +42,7 @@ func (daemon *Daemon) ContainerStats(ctx context.Context, prefixOrName string, c
if err != nil {
return err
}
return json.NewEncoder(config.OutStream).Encode(stats)
}

outStream := config.OutStream
if config.Stream {
wf := ioutils.NewWriteFlusher(outStream)
defer wf.Close()
wf.Flush()
outStream = wf
return enc.Encode(stats)
}

var preCPUStats types.CPUStats
Expand All @@ -65,12 +58,11 @@ func (daemon *Daemon) ContainerStats(ctx context.Context, prefixOrName string, c
return &ss
}

enc := json.NewEncoder(outStream)

updates := daemon.subscribeToContainerStats(ctr)
defer daemon.unsubscribeToContainerStats(ctr, updates)

noStreamFirstFrame := !config.OneShot

for {
select {
case v, ok := <-updates:
Expand Down

0 comments on commit 06e19ec

Please sign in to comment.