-
Notifications
You must be signed in to change notification settings - Fork 109
/
ffmpeg.go
175 lines (157 loc) · 5.63 KB
/
ffmpeg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Package ffmpeg provides an implementation for an ffmpeg-based camera.
package ffmpeg
import (
"context"
"errors"
"image"
"image/jpeg"
"io"
"os/exec"
"sync"
"sync/atomic"
"github.com/edaniels/golog"
ffmpeg "github.com/u2takey/ffmpeg-go"
"github.com/viamrobotics/gostream"
"go.uber.org/zap"
"go.uber.org/zap/zapio"
viamutils "go.viam.com/utils"
"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/rimage/transform"
)
// Config holds the attributes used to configure an ffmpeg camera.
type Config struct {
	resource.TriviallyValidateConfig

	// CameraParameters is used as the pinhole intrinsics of the camera model
	// built by NewFFMPEGCamera.
	CameraParameters *transform.PinholeCameraIntrinsics `json:"intrinsic_parameters,omitempty"`

	// DistortionParameters is parsed from "distortion_parameters".
	// NOTE(review): nothing in this file reads it - confirm whether it is consumed elsewhere.
	DistortionParameters *transform.BrownConrady `json:"distortion_parameters,omitempty"`

	// Debug is parsed from "debug".
	// NOTE(review): nothing in this file reads it - confirm whether it is consumed elsewhere.
	Debug bool `json:"debug,omitempty"`

	// VideoPath is handed to ffmpeg as the input to read from.
	VideoPath string `json:"video_path"`

	// InputKWArgs are keyword arguments passed along with VideoPath when the
	// ffmpeg input is created.
	InputKWArgs map[string]any `json:"input_kw_args,omitempty"`

	// Filters are applied to the input stream in the order listed.
	Filters []FilterConfig `json:"filters,omitempty"`

	// OutputKWArgs are keyword arguments for the ffmpeg output. NewFFMPEGCamera
	// copies them and always overrides the "update" and "format" keys.
	OutputKWArgs map[string]any `json:"output_kw_args,omitempty"`
}
// FilterConfig describes a single ffmpeg filter to apply to the input stream.
type FilterConfig struct {
	// Name is the filter name handed to ffmpeg.
	Name string `json:"name"`

	// Args are the positional arguments of the filter.
	Args []string `json:"args"`

	// KWArgs are the keyword arguments of the filter.
	KWArgs map[string]any `json:"kw_args"`
}
// model is the "ffmpeg" model within resource.DefaultModelFamily; init
// registers the camera component under it.
var model = resource.DefaultModelFamily.WithModel("ffmpeg")
// init registers the ffmpeg camera under model for the camera API so that it
// can be constructed from a resource configuration.
func init() {
	// newCamera converts the generic resource config into a *Config, builds the
	// ffmpeg video source from it and wraps that source as a named camera.
	newCamera := func(
		ctx context.Context,
		_ resource.Dependencies,
		conf resource.Config,
		logger golog.Logger,
	) (camera.Camera, error) {
		nativeConf, err := resource.NativeConfig[*Config](conf)
		if err != nil {
			return nil, err
		}
		videoSrc, err := NewFFMPEGCamera(ctx, nativeConf, logger)
		if err != nil {
			return nil, err
		}
		return camera.FromVideoSource(conf.ResourceName(), videoSrc), nil
	}

	resource.RegisterComponent(
		camera.API,
		model,
		resource.Registration[camera.Camera, *Config]{Constructor: newCamera},
	)
}
// ffmpegCamera is the video reader created by NewFFMPEGCamera together with
// the handles needed to shut its background work down again.
type ffmpegCamera struct {
	// VideoReader serves the frames; NewFFMPEGCamera assigns it.
	gostream.VideoReader

	// cancelFunc cancels the context shared by the background workers and the
	// ffmpeg command.
	cancelFunc context.CancelFunc

	// activeBackgroundWorkers counts the running background workers so that
	// Close can wait for them to finish.
	activeBackgroundWorkers sync.WaitGroup

	// inClose and outClose close the read and write ends of the pipe that
	// carries ffmpeg's output to the frame decoder.
	inClose, outClose func() error
}
// NewFFMPEGCamera instantiates a new camera which leverages ffmpeg to handle a variety of potential video types.
//
// Two background workers are started: one runs ffmpeg on conf.VideoPath (with the
// configured input arguments, filters and output arguments) and writes its output
// into an in-memory pipe; the other decodes JPEG images from that pipe and keeps
// only the most recent one. The returned video source serves that latest frame.
//
// ctx is only used for constructing the returned video source; the background
// work runs under its own context, which is cancelled by Close.
//
// An error is returned when ffmpeg cannot be found in the PATH or when the video
// source cannot be constructed. In the latter case the background workers and the
// ffmpeg process are shut down before returning, so nothing is leaked.
func NewFFMPEGCamera(ctx context.Context, conf *Config, logger golog.Logger) (camera.VideoSource, error) {
	// make sure ffmpeg is in the path before doing anything else
	if _, err := exec.LookPath("ffmpeg"); err != nil {
		return nil, err
	}

	// parse attributes into ffmpeg keyword maps; work on a copy so that the
	// forced keys below do not modify the caller's configuration
	outArgs := make(map[string]interface{}, len(conf.OutputKWArgs))
	for key, value := range conf.OutputKWArgs {
		outArgs[key] = value
	}
	outArgs["update"] = 1        // always interpret the filename as just a filename, not a pattern
	outArgs["format"] = "image2" // select image file muxer, used to write video frames to image files

	// instantiate camera with cancellable context that will be applied to all spawned processes
	cancelableCtx, cancel := context.WithCancel(context.Background())
	ffCam := &ffmpegCamera{cancelFunc: cancel}

	// launch thread to run ffmpeg and pull images from the url and put them into the pipe
	in, out := io.Pipe()

	// Note(erd): For some reason, when running with the race detector, we need to close the pipe
	// even if we kill the process in order for the underlying command Wait to complete.
	ffCam.inClose = in.Close
	ffCam.outClose = out.Close

	// ffmpeg's error output is forwarded to the logger at debug level
	writer := &zapio.Writer{Log: logger.Desugar(), Level: zap.DebugLevel}
	ffCam.activeBackgroundWorkers.Add(1)
	viamutils.ManagedGo(func() {
		stream := ffmpeg.Input(conf.VideoPath, conf.InputKWArgs)
		for _, filter := range conf.Filters {
			stream = stream.Filter(filter.Name, filter.Args, filter.KWArgs)
		}
		stream = stream.Output("pipe:", outArgs)
		stream.Context = cancelableCtx
		cmd := stream.WithOutput(out).WithErrorOutput(writer).Compile()
		if err := cmd.Run(); err != nil {
			// a cancelled or expired context means we are shutting down; end quietly
			if viamutils.FilterOutError(err, context.Canceled) == nil ||
				viamutils.FilterOutError(err, context.DeadlineExceeded) == nil {
				return
			}
			// NOTE(review): presumably ManagedGo reacts to this panic by running the
			// function again, i.e. ffmpeg is relaunched - confirm against go.viam.com/utils.
			if cmd.ProcessState.ExitCode() != 0 {
				panic(err)
			}
		}
	}, func() {
		viamutils.UncheckedErrorFunc(writer.Close)
		cancel()
		ffCam.activeBackgroundWorkers.Done()
	})

	// launch thread to consume images from the pipe and store the latest in shared memory
	gotFirstFrame := make(chan struct{})
	var latestFrame atomic.Pointer[image.Image]
	var gotFirstFrameOnce bool // only touched by the decoder worker below
	ffCam.activeBackgroundWorkers.Add(1)
	viamutils.ManagedGo(func() {
		for {
			if cancelableCtx.Err() != nil {
				return
			}
			img, err := jpeg.Decode(in)
			if err != nil {
				// skip data that does not decode; the loop ends once the context is cancelled
				continue
			}
			latestFrame.Store(&img)
			if !gotFirstFrameOnce {
				// unblock readers waiting for the very first frame
				close(gotFirstFrame)
				gotFirstFrameOnce = true
			}
		}
	}, ffCam.activeBackgroundWorkers.Done)

	// when next image is requested simply load the image from where it is stored in shared memory
	reader := gostream.VideoReaderFunc(func(ctx context.Context) (image.Image, func(), error) {
		// block until a first frame exists, the caller gives up, or the camera is shut down
		select {
		case <-cancelableCtx.Done():
			return nil, nil, cancelableCtx.Err()
		case <-ctx.Done():
			return nil, nil, ctx.Err()
		case <-gotFirstFrame:
		}
		latest := latestFrame.Load()
		if latest == nil {
			return nil, func() {}, errors.New("no frame yet")
		}
		return *latest, func() {}, nil
	})
	ffCam.VideoReader = reader

	// NOTE(review): conf.DistortionParameters is not applied to the camera model
	// here - confirm whether that is intended.
	src, err := camera.NewVideoSourceFromReader(
		ctx,
		ffCam,
		&transform.PinholeCameraModel{PinholeCameraIntrinsics: conf.CameraParameters},
		camera.ColorStream)
	if err != nil {
		// the workers and ffmpeg are already running at this point; stop them so a
		// failed construction does not leak goroutines or a child process
		viamutils.UncheckedError(ffCam.Close(ctx))
		return nil, err
	}
	return src, nil
}
// Close cancels the context shared by the background workers, closes both
// ends of the pipe and then blocks until every background worker has
// finished. ctx is not used and the returned error is always nil; errors
// from closing the pipe are passed to viamutils.UncheckedError.
func (fc *ffmpegCamera) Close(ctx context.Context) error {
	fc.cancelFunc()

	// read end first, then write end
	for _, closePipeEnd := range []func() error{fc.inClose, fc.outClose} {
		viamutils.UncheckedError(closePipeEnd())
	}

	fc.activeBackgroundWorkers.Wait()
	return nil
}