Skip to content

Commit

Permalink
Implement Scrobble buffering/retrying
Browse files Browse the repository at this point in the history
  • Loading branch information
deluan committed Jul 2, 2021
1 parent fb183e5 commit 289da56
Show file tree
Hide file tree
Showing 17 changed files with 514 additions and 97 deletions.
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func runNavidrome() {
func startServer() (func() error, func(err error)) {
return func() error {
a := CreateServer(conf.Server.MusicFolder)
a.MountRouter("Subsonic API", consts.URLPathSubsonicAPI, CreateSubsonicAPIRouter())
a.MountRouter("Native API", consts.URLPathNativeAPI, CreateNativeAPIRouter())
a.MountRouter("Subsonic API", consts.URLPathSubsonicAPI, CreateSubsonicAPIRouter())
if conf.Server.DevEnableScrobble {
a.MountRouter("LastFM Auth", consts.URLPathNativeAPI+"/lastfm", CreateLastFMRouter())
}
Expand Down
61 changes: 34 additions & 27 deletions core/agents/lastfm/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ func (l *lastfmAgent) callArtistGetTopTracks(ctx context.Context, artistName, mb

func (l *lastfmAgent) NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error {
sk, err := l.sessionKeys.get(ctx, userId)
if err != nil {
return err
if err != nil || sk == "" {
return scrobbler.ErrNotAuthorized
}

err = l.client.UpdateNowPlaying(ctx, sk, ScrobbleInfo{
artist: track.Artist,
track: track.Title,
Expand All @@ -173,38 +174,44 @@ func (l *lastfmAgent) NowPlaying(ctx context.Context, userId string, track *mode
albumArtist: track.AlbumArtist,
})
if err != nil {
return err
log.Warn(ctx, "Last.fm client.updateNowPlaying returned error", "track", track.Title, err)
return scrobbler.ErrUnrecoverable
}
return nil
}

func (l *lastfmAgent) Scrobble(ctx context.Context, userId string, scrobbles []scrobbler.Scrobble) error {
func (l *lastfmAgent) Scrobble(ctx context.Context, userId string, s scrobbler.Scrobble) error {
sk, err := l.sessionKeys.get(ctx, userId)
if err != nil {
return err
if err != nil || sk == "" {
return scrobbler.ErrNotAuthorized
}

if s.Duration <= 30 {
log.Debug(ctx, "Skipping Last.fm scrobble for short song", "track", s.Title, "duration", s.Duration)
return nil
}
err = l.client.Scrobble(ctx, sk, ScrobbleInfo{
artist: s.Artist,
track: s.Title,
album: s.Album,
trackNumber: s.TrackNumber,
mbid: s.MbzTrackID,
duration: int(s.Duration),
albumArtist: s.AlbumArtist,
timestamp: s.TimeStamp,
})
if err == nil {
return nil
}

// TODO Implement batch scrobbling
for _, s := range scrobbles {
if s.Duration <= 30 {
log.Debug(ctx, "Skipping Last.fm scrobble for short song", "track", s.Title, "duration", s.Duration)
continue
}
err = l.client.Scrobble(ctx, sk, ScrobbleInfo{
artist: s.Artist,
track: s.Title,
album: s.Album,
trackNumber: s.TrackNumber,
mbid: s.MbzTrackID,
duration: int(s.Duration),
albumArtist: s.AlbumArtist,
timestamp: s.TimeStamp,
})
if err != nil {
return err
}
lfErr, isLastFMError := err.(*lastFMError)
if !isLastFMError {
log.Warn(ctx, "Last.fm client.scrobble returned error", "track", s.Title, err)
return scrobbler.ErrRetryLater
}
return nil
if lfErr.Code == 11 || lfErr.Code == 16 {
return scrobbler.ErrRetryLater
}
return scrobbler.ErrUnrecoverable
}

func (l *lastfmAgent) IsAuthorized(ctx context.Context, userId string) bool {
Expand Down
56 changes: 52 additions & 4 deletions core/agents/lastfm/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,19 @@ var _ = Describe("lastfmAgent", func() {
Expect(sentParams.Get("duration")).To(Equal(strconv.FormatFloat(float64(track.Duration), 'G', -1, 32)))
Expect(sentParams.Get("mbid")).To(Equal(track.MbzTrackID))
})

It("returns ErrNotAuthorized if user is not linked", func() {
err := agent.NowPlaying(ctx, "user-2", track)
Expect(err).To(MatchError(scrobbler.ErrNotAuthorized))
})
})

Describe("Scrobble", func() {
It("calls Last.fm with correct params", func() {
ts := time.Now()
scrobbles := []scrobbler.Scrobble{{MediaFile: *track, TimeStamp: ts}}
httpClient.Res = http.Response{Body: ioutil.NopCloser(bytes.NewBufferString("{}")), StatusCode: 200}

err := agent.Scrobble(ctx, "user-1", scrobbles)
err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: ts})

Expect(err).ToNot(HaveOccurred())
Expect(httpClient.SavedRequest.Method).To(Equal(http.MethodPost))
Expand All @@ -291,14 +295,58 @@ var _ = Describe("lastfmAgent", func() {

It("skips songs with less than 31 seconds", func() {
track.Duration = 29
scrobbles := []scrobbler.Scrobble{{MediaFile: *track, TimeStamp: time.Now()}}
httpClient.Res = http.Response{Body: ioutil.NopCloser(bytes.NewBufferString("{}")), StatusCode: 200}

err := agent.Scrobble(ctx, "user-1", scrobbles)
err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})

Expect(err).ToNot(HaveOccurred())
Expect(httpClient.SavedRequest).To(BeNil())
})

It("returns ErrNotAuthorized if user is not linked", func() {
err := agent.Scrobble(ctx, "user-2", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrNotAuthorized))
})

It("returns ErrRetryLater on error 11", func() {
httpClient.Res = http.Response{
Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":11,"message":"Service Offline - This service is temporarily offline. Try again later."}`)),
StatusCode: 400,
}

err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrRetryLater))
})

It("returns ErrRetryLater on error 16", func() {
httpClient.Res = http.Response{
Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":16,"message":"There was a temporary error processing your request. Please try again"}`)),
StatusCode: 400,
}

err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrRetryLater))
})

It("returns ErrRetryLater on http errors", func() {
httpClient.Res = http.Response{
Body: ioutil.NopCloser(bytes.NewBufferString(`internal server error`)),
StatusCode: 500,
}

err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrRetryLater))
})

It("returns ErrUnrecoverable on other errors", func() {
httpClient.Res = http.Response{
Body: ioutil.NopCloser(bytes.NewBufferString(`{"error":8,"message":"Operation failed - Something else went wrong"}`)),
StatusCode: 400,
}

err := agent.Scrobble(ctx, "user-1", scrobbler.Scrobble{MediaFile: *track, TimeStamp: time.Now()})
Expect(err).To(MatchError(scrobbler.ErrUnrecoverable))
})
})
})

Expand Down
115 changes: 115 additions & 0 deletions core/scrobbler/buffered_scrobbler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package scrobbler

import (
"context"
"time"

"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
)

func NewBufferedScrobbler(ds model.DataStore, s Scrobbler, service string) *bufferedScrobbler {
b := &bufferedScrobbler{ds: ds, wrapped: s, service: service}
b.wakeSignal = make(chan struct{}, 1)
go b.run()
return b
}

type bufferedScrobbler struct {
ds model.DataStore
wrapped Scrobbler
service string
wakeSignal chan struct{}
}

func (b *bufferedScrobbler) IsAuthorized(ctx context.Context, userId string) bool {
return b.wrapped.IsAuthorized(ctx, userId)
}

func (b *bufferedScrobbler) NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error {
return b.wrapped.NowPlaying(ctx, userId, track)
}

func (b *bufferedScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error {
err := b.ds.ScrobbleBuffer(ctx).Enqueue(b.service, userId, s.ID, s.TimeStamp)
if err != nil {
return err
}

b.sendWakeSignal()
return nil
}

func (b *bufferedScrobbler) sendWakeSignal() {
// Don't block if the previous signal was not read yet
select {
case b.wakeSignal <- struct{}{}:
default:
}
}

func (b *bufferedScrobbler) run() {
ctx := context.Background()
for {
if !b.processQueue(ctx) {
time.AfterFunc(5*time.Second, func() {
b.sendWakeSignal()
})
}
<-b.wakeSignal
}
}

func (b *bufferedScrobbler) processQueue(ctx context.Context) bool {
buffer := b.ds.ScrobbleBuffer(ctx)
userIds, err := buffer.UserIDs(b.service)
if err != nil {
log.Error(ctx, "Error retrieving userIds from scrobble buffer", "scrobbler", b.service, err)
return false
}
result := true
for _, userId := range userIds {
if !b.processUserQueue(ctx, userId) {
result = false
}
}
return result
}

func (b *bufferedScrobbler) processUserQueue(ctx context.Context, userId string) bool {
buffer := b.ds.ScrobbleBuffer(ctx)
for {
entry, err := buffer.Next(b.service, userId)
if err != nil {
log.Error(ctx, "Error reading from scrobble buffer", "scrobbler", b.service, err)
return false
}
if entry == nil {
return true
}
log.Debug(ctx, "Sending scrobble", "scrobbler", b.service, "track", entry.Title, "artist", entry.Artist)
err = b.wrapped.Scrobble(ctx, entry.UserID, Scrobble{
MediaFile: entry.MediaFile,
TimeStamp: entry.PlayTime,
})
if err != nil {
switch err {
case ErrRetryLater:
log.Warn(ctx, "Could not send scrobble. Will be retried", "userId", entry.UserID,
"track", entry.Title, "artist", entry.Artist, "scrobbler", b.service, err)
return false
default:
log.Error(ctx, "Error sending scrobble to service. Discarding", "scrobbler", b.service,
"userId", entry.UserID, "artist", entry.Artist, "track", entry.Title, err)
}
}
err = buffer.Dequeue(entry)
if err != nil {
log.Error(ctx, "Error removing entry from scrobble buffer", "userId", entry.UserID,
"track", entry.Title, "artist", entry.Artist, "scrobbler", b.service, err)
return false
}
}
}

var _ Scrobbler = (*bufferedScrobbler)(nil)
9 changes: 8 additions & 1 deletion core/scrobbler/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scrobbler

import (
"context"
"errors"
"time"

"github.com/navidrome/navidrome/model"
Expand All @@ -12,10 +13,16 @@ type Scrobble struct {
TimeStamp time.Time
}

var (
ErrNotAuthorized = errors.New("not authorized")
ErrRetryLater = errors.New("retry later")
ErrUnrecoverable = errors.New("unrecoverable")
)

type Scrobbler interface {
IsAuthorized(ctx context.Context, userId string) bool
NowPlaying(ctx context.Context, userId string, track *model.MediaFile) error
Scrobble(ctx context.Context, userId string, scrobbles []Scrobble) error
Scrobble(ctx context.Context, userId string, s Scrobble) error
}

type Constructor func(ds model.DataStore) Scrobbler

0 comments on commit 289da56

Please sign in to comment.