diff --git a/pkg/rtmp/relay_handler.go b/pkg/rtmp/relay_handler.go index 3628ebac..8635aef3 100644 --- a/pkg/rtmp/relay_handler.go +++ b/pkg/rtmp/relay_handler.go @@ -35,20 +35,6 @@ func NewRTMPRelayHandler(rtmpServer *RTMPServer) *RTMPRelayHandler { } func (h *RTMPRelayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - var err error - defer func() { - var psrpcErr psrpc.Error - - switch { - case errors.As(err, &psrpcErr): - w.WriteHeader(psrpcErr.ToHttp()) - case err == nil: - // Nothing, we already responded - default: - w.WriteHeader(http.StatusInternalServerError) - } - }() - resourceId := strings.TrimLeft(r.URL.Path, "/rtmp/") token := r.URL.Query().Get("token") @@ -56,16 +42,27 @@ func (h *RTMPRelayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Infow("relaying ingress") pr, pw := io.Pipe() - done := make(chan error) + done := make(chan error, 1) go func() { - _, err = io.Copy(w, pr) + _, err := io.Copy(w, pr) done <- err - close(done) }() - err = h.rtmpServer.AssociateRelay(resourceId, token, pw) - if err != nil { + if err := h.rtmpServer.AssociateRelay(resourceId, token, pw); err != nil { + // Ensure the copy goroutine exits before we respond with an error. + _ = pw.CloseWithError(err) + <-done + + var psrpcErr psrpc.Error + status := http.StatusInternalServerError + if errors.As(err, &psrpcErr) { + status = psrpcErr.ToHttp() + } + + log.Warnw("failed to associate RTMP relay", err, "status", status) + w.WriteHeader(status) + return } defer func() { @@ -73,5 +70,7 @@ func (h *RTMPRelayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { h.rtmpServer.DissociateRelay(resourceId) }() - err = <-done + if err := <-done; err != nil && !errors.Is(err, io.ErrClosedPipe) { + log.Warnw("relay stream ended with error", err) + } }