Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions pkg/rtmp/relay_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,42 @@ func NewRTMPRelayHandler(rtmpServer *RTMPServer) *RTMPRelayHandler {
}

func (h *RTMPRelayHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var err error
defer func() {
var psrpcErr psrpc.Error

switch {
case errors.As(err, &psrpcErr):
w.WriteHeader(psrpcErr.ToHttp())
case err == nil:
// Nothing, we already responded
default:
w.WriteHeader(http.StatusInternalServerError)
}
}()

resourceId := strings.TrimLeft(r.URL.Path, "/rtmp/")
token := r.URL.Query().Get("token")

log := logger.Logger(logger.GetLogger().WithValues("resourceID", resourceId))
log.Infow("relaying ingress")

pr, pw := io.Pipe()
done := make(chan error)
done := make(chan error, 1)

go func() {
_, err = io.Copy(w, pr)
_, err := io.Copy(w, pr)
done <- err
close(done)
}()

err = h.rtmpServer.AssociateRelay(resourceId, token, pw)
if err != nil {
if err := h.rtmpServer.AssociateRelay(resourceId, token, pw); err != nil {
// Ensure the copy goroutine exits before we respond with an error.
_ = pw.CloseWithError(err)
<-done

var psrpcErr psrpc.Error
status := http.StatusInternalServerError
if errors.As(err, &psrpcErr) {
status = psrpcErr.ToHttp()
}

log.Warnw("failed to associate RTMP relay", err, "status", status)
w.WriteHeader(status)

return
}
defer func() {
pw.Close()
h.rtmpServer.DissociateRelay(resourceId)
}()

err = <-done
if err := <-done; err != nil && !errors.Is(err, io.ErrClosedPipe) {
log.Warnw("relay stream ended with error", err)
}
}
Loading