/
stream.go
148 lines (122 loc) · 3.22 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package streamsuite
import (
"fmt"
"io"
"net"
"net/http"
"os"
"syscall"
"time"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/tellytv/telly/internal/metrics"
"github.com/tellytv/telly/internal/models"
)
var (
log = &logrus.Logger{
Out: os.Stderr,
Formatter: &logrus.TextFormatter{
FullTimestamp: true,
},
Hooks: make(logrus.LevelHooks),
Level: logrus.DebugLevel,
}
)
const (
// BufferSize is the size of the content buffer we will use.
BufferSize = 1024 * 8
)
// Stream describes a single active video stream in telly.
type Stream struct {
UUID string
Channel *models.LineupChannel
StreamURL string
Transport StreamTransport
StartTime *time.Time
PromLabels []string
StopNow chan bool `json:"-"`
LastWroteAt *time.Time
streamData io.ReadCloser
}
// Start will mark the stream as playing and begin playback.
func (s *Stream) Start(c *gin.Context) {
ctx := c.Request.Context()
now := time.Now()
s.StartTime = &now
metrics.ActiveStreams.WithLabelValues(s.PromLabels...).Inc()
log.Infoln("Transcoding stream via", s.Transport.Type())
sd, streamErr := s.Transport.Start(ctx, s.StreamURL)
if streamErr != nil {
if httpErr, ok := streamErr.(httpError); ok {
c.AbortWithError(httpErr.StatusCode, httpErr)
return
}
c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("error when starting streaming via %s: %s", s.Transport.Type(), streamErr))
return
}
s.streamData = sd
clientGone := c.Writer.CloseNotify()
for key, value := range s.Transport.Headers() {
c.Writer.Header()[key] = value
}
buffer := make([]byte, BufferSize)
writer := wrappedWriter{c.Writer}
forLoop:
for {
select {
case <-s.StopNow:
break forLoop
case <-clientGone:
case <-ctx.Done():
log.Debugln("Stream client is disconnected, returning!")
break forLoop
default:
n, err := s.streamData.Read(buffer)
if n == 0 {
log.Debugln("Read 0 bytes from stream source, returning")
break forLoop
}
if err != nil {
log.WithError(err).Errorln("Received error while reading from stream source")
break forLoop
}
data := buffer[:n]
if _, respWriteErr := writer.Write(data); respWriteErr != nil {
if respWriteErr == io.EOF || respWriteErr == io.ErrUnexpectedEOF || respWriteErr == io.ErrClosedPipe {
log.Debugln("CAUGHT IO ERR")
}
log.WithError(respWriteErr).Errorln("Error while writing to connected stream client")
break forLoop
}
c.Writer.Flush()
}
}
s.Stop()
}
// Stop will tear down the stream.
func (s *Stream) Stop() {
metrics.ActiveStreams.WithLabelValues(s.PromLabels...).Dec()
if closeErr := s.streamData.Close(); closeErr != nil {
log.WithError(closeErr).Errorf("error when closing stream via %s", s.Transport.Type())
return
}
if stopErr := s.Transport.Stop(); stopErr != nil {
log.WithError(stopErr).Errorf("error when cleaning up stream via %s", s.Transport.Type())
return
}
}
type wrappedWriter struct {
writer io.Writer
}
func (w wrappedWriter) Write(p []byte) (int, error) {
n, err := w.writer.Write(p)
if err != nil {
// Filter out broken pipe (user pressed "stop") errors
if nErr, ok := err.(*net.OpError); ok {
if nErr.Err == syscall.EPIPE {
return n, nil
}
}
}
return n, err
}