Skip to content

Commit

Permalink
Terminate all MPV instances when stopping Navidrome (#3008)
Browse files Browse the repository at this point in the history
* Terminate all mpv instances when stopping Navidrome

* Exit trackSwitcher goroutine when terminating

* Remove potential race condition when starting the Playback device

* Fix lint error

* Removed unused and unneeded vars/functions

* Use device short name in log

* Small refactor

* Small nitpick

* Make start functions more uniform
  • Loading branch information
deluan committed May 9, 2024
1 parent 677d994 commit 6408dda
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 79 deletions.
19 changes: 8 additions & 11 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,9 @@ func runNavidrome() {
g.Go(startServer(ctx))
g.Go(startSignaller(ctx))
g.Go(startScheduler(ctx))
g.Go(startPlaybackServer(ctx))
g.Go(schedulePeriodicScan(ctx))

if conf.Server.Jukebox.Enabled {
g.Go(startPlaybackServer(ctx))
}

if err := g.Wait(); err != nil && !errors.Is(err, interrupted) {
log.Error("Fatal error in Navidrome. Aborting", err)
}
Expand Down Expand Up @@ -151,21 +148,21 @@ func schedulePeriodicScan(ctx context.Context) func() error {
}

func startScheduler(ctx context.Context) func() error {
log.Info(ctx, "Starting scheduler")
schedulerInstance := scheduler.GetInstance()

return func() error {
log.Info(ctx, "Starting scheduler")
schedulerInstance := scheduler.GetInstance()
schedulerInstance.Run(ctx)
return nil
}
}

func startPlaybackServer(ctx context.Context) func() error {
log.Info(ctx, "Starting playback server")

playbackInstance := GetPlaybackServer()

return func() error {
if !conf.Server.Jukebox.Enabled {
return nil
}
log.Info(ctx, "Starting playback server")
playbackInstance := GetPlaybackServer()
return playbackInstance.Run(ctx)
}
}
Expand Down
56 changes: 30 additions & 26 deletions core/playback/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/navidrome/navidrome/core/playback/mpv"
"github.com/navidrome/navidrome/log"
Expand All @@ -22,6 +23,7 @@ type Track interface {
}

type playbackDevice struct {
serviceCtx context.Context
ParentPlaybackServer PlaybackServer
Default bool
User string
Expand All @@ -31,7 +33,7 @@ type playbackDevice struct {
Gain float32
PlaybackDone chan bool
ActiveTrack Track
TrackSwitcherStarted bool
startTrackSwitcher sync.Once
}

type DeviceStatus struct {
Expand All @@ -43,8 +45,6 @@ type DeviceStatus struct {

const DefaultGain float32 = 1.0

var EmptyStatus = DeviceStatus{CurrentIndex: -1, Playing: false, Gain: DefaultGain, Position: 0}

func (pd *playbackDevice) getStatus() DeviceStatus {
pos := 0
if pd.ActiveTrack != nil {
Expand All @@ -61,16 +61,16 @@ func (pd *playbackDevice) getStatus() DeviceStatus {
// NewPlaybackDevice creates a new playback device which implements all the basic Jukebox mode commands defined here:
// http://www.subsonic.org/pages/api.jsp#jukeboxControl
// Starts the trackSwitcher goroutine for the device.
func NewPlaybackDevice(playbackServer PlaybackServer, name string, deviceName string) *playbackDevice {
func NewPlaybackDevice(ctx context.Context, playbackServer PlaybackServer, name string, deviceName string) *playbackDevice {
return &playbackDevice{
serviceCtx: ctx,
ParentPlaybackServer: playbackServer,
User: "",
Name: name,
DeviceName: deviceName,
Gain: DefaultGain,
PlaybackQueue: NewQueue(),
PlaybackDone: make(chan bool),
TrackSwitcherStarted: false,
}
}

Expand Down Expand Up @@ -103,14 +103,13 @@ func (pd *playbackDevice) Set(ctx context.Context, ids []string) (DeviceStatus,
func (pd *playbackDevice) Start(ctx context.Context) (DeviceStatus, error) {
log.Debug(ctx, "Processing Start action", "device", pd)

if !pd.TrackSwitcherStarted {
pd.startTrackSwitcher.Do(func() {
log.Info(ctx, "Starting trackSwitcher goroutine")
// Start one trackSwitcher goroutine with each device
go func() {
pd.trackSwitcherGoroutine()
}()
pd.TrackSwitcherStarted = true
}
})

if pd.ActiveTrack != nil {
if pd.isPlaying() {
Expand Down Expand Up @@ -255,25 +254,30 @@ func (pd *playbackDevice) isPlaying() bool {
func (pd *playbackDevice) trackSwitcherGoroutine() {
log.Debug("Started trackSwitcher goroutine", "device", pd)
for {
<-pd.PlaybackDone
log.Debug("Track switching detected")
if pd.ActiveTrack != nil {
pd.ActiveTrack.Close()
pd.ActiveTrack = nil
}

if !pd.PlaybackQueue.IsAtLastElement() {
pd.PlaybackQueue.IncreaseIndex()
log.Debug("Switching to next song", "queue", pd.PlaybackQueue.String())
err := pd.switchActiveTrackByIndex(pd.PlaybackQueue.Index)
if err != nil {
log.Error("Error switching track", err)
}
select {
case <-pd.PlaybackDone:
log.Debug("Track switching detected")
if pd.ActiveTrack != nil {
pd.ActiveTrack.Unpause()
pd.ActiveTrack.Close()
pd.ActiveTrack = nil
}
} else {
log.Debug("There is no song left in the playlist. Finish.")

if !pd.PlaybackQueue.IsAtLastElement() {
pd.PlaybackQueue.IncreaseIndex()
log.Debug("Switching to next song", "queue", pd.PlaybackQueue.String())
err := pd.switchActiveTrackByIndex(pd.PlaybackQueue.Index)
if err != nil {
log.Error("Error switching track", err)
}
if pd.ActiveTrack != nil {
pd.ActiveTrack.Unpause()
}
} else {
log.Debug("There is no song left in the playlist. Finish.")
}
case <-pd.serviceCtx.Done():
log.Debug("Stopping trackSwitcher goroutine", "device", pd.Name)
return
}
}
}
Expand All @@ -285,7 +289,7 @@ func (pd *playbackDevice) switchActiveTrackByIndex(index int) error {
return errors.New("could not get current track")
}

track, err := mpv.NewTrack(pd.PlaybackDone, pd.DeviceName, *currentTrack)
track, err := mpv.NewTrack(pd.serviceCtx, pd.PlaybackDone, pd.DeviceName, *currentTrack)
if err != nil {
return err
}
Expand Down
9 changes: 3 additions & 6 deletions core/playback/mpv/mpv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"github.com/navidrome/navidrome/log"
)

func start(args []string) (Executor, error) {
func start(ctx context.Context, args []string) (Executor, error) {
log.Debug("Executing mpv command", "cmd", args)
j := Executor{args: args}
j.PipeReader, j.out = io.Pipe()
err := j.start()
err := j.start(ctx)
if err != nil {
return Executor{}, err
}
Expand All @@ -38,12 +38,9 @@ type Executor struct {
out *io.PipeWriter
args []string
cmd *exec.Cmd
ctx context.Context
}

func (j *Executor) start() error {
ctx := context.Background()
j.ctx = ctx
func (j *Executor) start(ctx context.Context) error {
cmd := exec.CommandContext(ctx, j.args[0], j.args[1:]...) // #nosec
cmd.Stdout = j.out
if log.IsGreaterOrEqualTo(log.LevelTrace) {
Expand Down
9 changes: 5 additions & 4 deletions core/playback/mpv/track.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package mpv
// https://mpv.io/manual/master/#properties

import (
"context"
"fmt"
"os"
"time"
Expand All @@ -24,7 +25,7 @@ type MpvTrack struct {
CloseCalled bool
}

func NewTrack(playbackDoneChannel chan bool, deviceName string, mf model.MediaFile) (*MpvTrack, error) {
func NewTrack(ctx context.Context, playbackDoneChannel chan bool, deviceName string, mf model.MediaFile) (*MpvTrack, error) {
log.Debug("Loading track", "trackPath", mf.Path, "mediaType", mf.ContentType())

if _, err := mpvCommand(); err != nil {
Expand All @@ -34,7 +35,7 @@ func NewTrack(playbackDoneChannel chan bool, deviceName string, mf model.MediaFi
tmpSocketName := socketName("mpv-ctrl-", ".socket")

args := createMPVCommand(deviceName, mf.Path, tmpSocketName)
exe, err := start(args)
exe, err := start(ctx, args)
if err != nil {
log.Error("Error starting mpv process", err)
return nil, err
Expand Down Expand Up @@ -110,13 +111,13 @@ func (t *MpvTrack) Close() {
log.Debug("sending shutdown command")
_, err := t.Conn.Call("quit")
if err != nil {
log.Error("Error sending quit command to mpv-ipc socket", err)
log.Warn("Error sending quit command to mpv-ipc socket", err)

if t.Exe != nil {
log.Debug("cancelling executor")
err = t.Exe.Cancel()
if err != nil {
log.Error("Error canceling executor", err)
log.Warn("Error canceling executor", err)
}
}
}
Expand Down
31 changes: 13 additions & 18 deletions core/playback/playbackserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type PlaybackServer interface {
Run(ctx context.Context) error
GetDeviceForUser(user string) (*playbackDevice, error)
GetMediaFile(id string) (*model.MediaFile, error)
GetCtx() *context.Context
}

type playbackServer struct {
Expand All @@ -37,45 +36,41 @@ func GetInstance(ds model.DataStore) PlaybackServer {

// Run starts the playback server which serves request until canceled using the given context
func (ps *playbackServer) Run(ctx context.Context) error {
devices, err := ps.initDeviceStatus(conf.Server.Jukebox.Devices, conf.Server.Jukebox.Default)
ps.playbackDevices = devices
ps.ctx = &ctx

devices, err := ps.initDeviceStatus(ctx, conf.Server.Jukebox.Devices, conf.Server.Jukebox.Default)
if err != nil {
return err
}
ps.playbackDevices = devices
log.Info(ctx, fmt.Sprintf("%d audio devices found", len(devices)))

defaultDevice, _ := ps.getDefaultDevice()

log.Info(ctx, "Using audio device: "+defaultDevice.DeviceName)

ps.ctx = &ctx

<-ctx.Done()
return nil
}

// GetCtx produces the context this server was started with. Used for data-retrieval and cancellation
func (ps *playbackServer) GetCtx() *context.Context {
return ps.ctx
// Should confirm all subprocess are terminated before returning
return nil
}

func (ps *playbackServer) initDeviceStatus(devices []conf.AudioDeviceDefinition, defaultDevice string) ([]playbackDevice, error) {
func (ps *playbackServer) initDeviceStatus(ctx context.Context, devices []conf.AudioDeviceDefinition, defaultDevice string) ([]playbackDevice, error) {
pbDevices := make([]playbackDevice, max(1, len(devices)))
defaultDeviceFound := false

if defaultDevice == "" {
// if there are no devices given and no default device, we create a synthetic device named "auto"
if len(devices) == 0 {
pbDevices[0] = *NewPlaybackDevice(ps, "auto", "auto")
pbDevices[0] = *NewPlaybackDevice(ctx, ps, "auto", "auto")
}

// if there is but only one entry and no default given, just use that.
if len(devices) == 1 {
if len(devices[0]) != 2 {
return []playbackDevice{}, fmt.Errorf("audio device definition ought to contain 2 fields, found: %d ", len(devices[0]))
}
pbDevices[0] = *NewPlaybackDevice(ps, devices[0][0], devices[0][1])
pbDevices[0] = *NewPlaybackDevice(ctx, ps, devices[0][0], devices[0][1])
}

if len(devices) > 1 {
Expand All @@ -91,7 +86,7 @@ func (ps *playbackServer) initDeviceStatus(devices []conf.AudioDeviceDefinition,
return []playbackDevice{}, fmt.Errorf("audio device definition ought to contain 2 fields, found: %d ", len(audioDevice))
}

pbDevices[idx] = *NewPlaybackDevice(ps, audioDevice[0], audioDevice[1])
pbDevices[idx] = *NewPlaybackDevice(ctx, ps, audioDevice[0], audioDevice[1])

if audioDevice[0] == defaultDevice {
pbDevices[idx].Default = true
Expand All @@ -106,12 +101,12 @@ func (ps *playbackServer) initDeviceStatus(devices []conf.AudioDeviceDefinition,
}

func (ps *playbackServer) getDefaultDevice() (*playbackDevice, error) {
for idx, audioDevice := range ps.playbackDevices {
if audioDevice.Default {
for idx := range ps.playbackDevices {
if ps.playbackDevices[idx].Default {
return &ps.playbackDevices[idx], nil
}
}
return &playbackDevice{}, fmt.Errorf("no default device found")
return nil, fmt.Errorf("no default device found")
}

// GetMediaFile retrieves the MediaFile given by the id parameter
Expand All @@ -125,7 +120,7 @@ func (ps *playbackServer) GetDeviceForUser(user string) (*playbackDevice, error)
// README: here we might plug-in the user-device mapping one fine day
device, err := ps.getDefaultDevice()
if err != nil {
return &playbackDevice{}, err
return nil, err
}
device.User = user
return device, nil
Expand Down
14 changes: 0 additions & 14 deletions core/playback/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,3 @@ func (pd *Queue) IncreaseIndex() {
pd.SetIndex(pd.Index + 1)
}
}

func max(x, y int) int {
if x < y {
return y
}
return x
}

func min(x, y int) int {
if x > y {
return y
}
return x
}

0 comments on commit 6408dda

Please sign in to comment.