Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize writes of JSON playlist #1932

Merged
merged 3 commits into from
Jul 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
- \#1888 Should not save (when recording) segments with zero video frames (@darkdarkdragon)
- \#1908 Prevent Broadcaster from sending low face value PM tickets (@kyriediculous)
- \#1934 http push: return 422 for non-retryable errors (@darkdarkdragon)
- \#1933 server: Return 0 video frame segments unchanged
- \#1943 log maximum transcoding price when monitoring is enabled (@kyriediculous)
- \#1950 Fix extremely long delay before uploaded segment gets transcoded (@darkdarkdragon)
- \#1933 server: Return 0 video frame segments unchanged (@darkdarkdragon)
- \#1932 Serialize writes of JSON playlist (@darkdarkdragon)

#### Orchestrator

Expand Down
11 changes: 7 additions & 4 deletions core/capabilities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sort"
"testing"
"time"

"github.com/livepeer/go-livepeer/drivers"
"github.com/livepeer/go-livepeer/net"
Expand Down Expand Up @@ -270,10 +271,12 @@ func (os *stubOS) GetInfo() *net.OSInfo {
}
return &net.OSInfo{StorageType: net.OSInfo_StorageType(os.storageType)}
}
func (os *stubOS) EndSession() {}
func (os *stubOS) SaveData(string, []byte, map[string]string) (string, error) { return "", nil }
func (os *stubOS) IsExternal() bool { return false }
func (os *stubOS) IsOwn(url string) bool { return true }
func (os *stubOS) EndSession() {}
func (os *stubOS) SaveData(string, []byte, map[string]string, time.Duration) (string, error) {
return "", nil
}
func (os *stubOS) IsExternal() bool { return false }
func (os *stubOS) IsOwn(url string) bool { return true }
func (os *stubOS) ListFiles(ctx context.Context, prefix, delim string) (drivers.PageInfo, error) {
return nil, nil
}
Expand Down
6 changes: 3 additions & 3 deletions core/livepeernode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,23 +130,23 @@ func TestServiceURIChange(t *testing.T) {

drivers.NodeStorage = drivers.NewMemoryDriver(n.GetServiceURI())
sesh := drivers.NodeStorage.NewSession("testpath")
savedUrl, err := sesh.SaveData("testdata1", []byte{0, 0, 0}, nil)
savedUrl, err := sesh.SaveData("testdata1", []byte{0, 0, 0}, nil, 0)
require.Nil(err)
assert.Equal("test://testurl.com/stream/testpath/testdata1", savedUrl)

glog.Infof("Setting service URL to newurl")
newUrl, err := url.Parse("test://newurl.com")
n.SetServiceURI(newUrl)
require.Nil(err)
furl, err := sesh.SaveData("testdata2", []byte{0, 0, 0}, nil)
furl, err := sesh.SaveData("testdata2", []byte{0, 0, 0}, nil, 0)
require.Nil(err)
assert.Equal("test://newurl.com/stream/testpath/testdata2", furl)

glog.Infof("Setting service URL to secondurl")
secondUrl, err := url.Parse("test://secondurl.com")
n.SetServiceURI(secondUrl)
require.Nil(err)
surl, err := sesh.SaveData("testdata3", []byte{0, 0, 0}, nil)
surl, err := sesh.SaveData("testdata3", []byte{0, 0, 0}, nil, 0)
require.Nil(err)
assert.Equal("test://secondurl.com/stream/testpath/testdata3", surl)
}
Expand Down
2 changes: 1 addition & 1 deletion core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func (n *LivepeerNode) transcodeSeg(config transcodeConfig, seg *stream.HLSSegme
// Need to store segment in our local OS
var err error
name := fmt.Sprintf("%d.tempfile", seg.SeqNo)
url, err = config.LocalOS.SaveData(name, seg.Data, nil)
url, err = config.LocalOS.SaveData(name, seg.Data, nil, 0)
if err != nil {
return terr(err)
}
Expand Down
55 changes: 32 additions & 23 deletions core/playlistmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@ import (
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/drivers"
"github.com/livepeer/go-livepeer/monitor"
ffmpeg "github.com/livepeer/lpms/ffmpeg"
"github.com/livepeer/m3u8"
)

const LIVE_LIST_LENGTH uint = 6

const jsonPlaylistRotationInterval = 60 * 60 * 1000 // 1 hour (in ms)
const (
jsonPlaylistRotationInterval = 60 * 60 * 1000 // 1 hour (in ms)
jsonPlaylistMaxRetries = 30
JsonPlaylistInitialTimeout = 5 * time.Second
JsonPlaylistMaxTimeout = 120 * time.Second
)

var JsonPlaylistQuitTimeout = 60 * time.Second

// PlaylistManager manages playlists and data for one video stream, backed by one object storage.
type PlaylistManager interface {
Expand Down Expand Up @@ -47,11 +53,12 @@ type BasicPlaylistManager struct {
recordSession drivers.OSSession
manifestID ManifestID
// Live playlist used for broadcasting
masterPList *m3u8.MasterPlaylist
mediaLists map[string]*m3u8.MediaPlaylist
mapSync *sync.RWMutex
jsonList *JsonPlaylist
jsonListSync *sync.Mutex
masterPList *m3u8.MasterPlaylist
mediaLists map[string]*m3u8.MediaPlaylist
mapSync *sync.RWMutex
jsonList *JsonPlaylist
jsonListWriteQueue *drivers.OverwriteQueue
jsonListSync *sync.Mutex
}

type jsonSeg struct {
Expand Down Expand Up @@ -226,16 +233,31 @@ func NewBasicPlaylistManager(manifestID ManifestID,
if recordSession != nil {
bplm.jsonList = NewJSONPlaylist()
bplm.jsonListSync = &sync.Mutex{}
bplm.makeNewOverwriteQueue()
}
return bplm
}

func (mgr *BasicPlaylistManager) makeNewOverwriteQueue() {
if mgr.jsonListWriteQueue != nil {
mgr.jsonListWriteQueue.StopAfter(JsonPlaylistQuitTimeout)
}
mgr.jsonListWriteQueue = drivers.NewOverwriteQueue(mgr.recordSession, mgr.jsonList.name,
fmt.Sprintf("json playlist for manifestId=%s", mgr.manifestID),
jsonPlaylistMaxRetries, JsonPlaylistInitialTimeout, JsonPlaylistMaxTimeout)
}

func (mgr *BasicPlaylistManager) ManifestID() ManifestID {
return mgr.manifestID
}

func (mgr *BasicPlaylistManager) Cleanup() {
mgr.storageSession.EndSession()
if mgr.storageSession != nil {
mgr.storageSession.EndSession()
}
if mgr.jsonListWriteQueue != nil {
mgr.jsonListWriteQueue.StopAfter(JsonPlaylistQuitTimeout)
}
}

func (mgr *BasicPlaylistManager) GetOSSession() drivers.OSSession {
Expand All @@ -255,23 +277,10 @@ func (mgr *BasicPlaylistManager) FlushRecord() {
glog.Error("Error encoding playlist: ", err)
return
}
go func(name string, data []byte) {
now := time.Now()
_, err := mgr.recordSession.SaveData(name, b, nil)
took := time.Since(now)
if err != nil {
glog.Errorf("Error saving json playlist name=%s bytes=%d took=%s err=%v", name,
len(b), took, err)
} else {
glog.V(common.VERBOSE).Infof("Saving json playlist name=%s bytes=%d took=%s err=%v", name,
len(b), took, err)
}
if monitor.Enabled {
monitor.RecordingPlaylistSaved(took, err)
}
}(mgr.jsonList.name, b)
go mgr.jsonListWriteQueue.Save(b)
if mgr.jsonList.DurationMs > jsonPlaylistRotationInterval {
mgr.jsonList = NewJSONPlaylist()
mgr.makeNewOverwriteQueue()
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion core/playlistmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/stretchr/testify/assert"
)

func init() {
JsonPlaylistQuitTimeout = 0 * time.Second
}

func TestJSONList1(t *testing.T) {
assert := assert.New(t)
jspl1 := NewJSONPlaylist()
Expand Down Expand Up @@ -128,6 +132,7 @@ func TestJsonFlush(t *testing.T) {
data, err := ioutil.ReadAll(fir.Body)
assert.Nil(err)
assert.Equal(`{"duration_ms":43200000,"tracks":[{"name":"source","bandwidth":400000,"resolution":"256x144"}],"segments":{"source":[{"seq_no":1,"uri":"test_seg/1.ts","duration_ms":43200000}]}}`, string(data))
c.Cleanup()
}

func TestGetMasterPlaylist(t *testing.T) {
Expand All @@ -151,6 +156,7 @@ func TestGetMasterPlaylist(t *testing.T) {
assert.NotNil(mpl)
s := mpl.Segments[0]
assert.Equal(segName, s.URI)
c.Cleanup()
}

func TestGetOrCreatePL(t *testing.T) {
Expand Down Expand Up @@ -198,6 +204,7 @@ func TestGetOrCreatePL(t *testing.T) {
if len(masterPL.Variants) != 2 || masterPL.Variants[1].Resolution != vProfile.Resolution {
t.Error("Master PL had some unexpected variants or properties")
}
c.Cleanup()
}

func TestPlaylists(t *testing.T) {
Expand Down Expand Up @@ -267,6 +274,7 @@ func TestPlaylists(t *testing.T) {
if !compareSeg(seg1, newPL.Segments[0]) || !compareSeg(pl.Segments[1], newPL.Segments[0]) {
t.Error("Unexpected seg properties in new playlist")
}
c.Cleanup()

}

Expand All @@ -281,7 +289,7 @@ func TestCleanup(t *testing.T) {
testData := []byte{1, 2, 3, 4}

c := NewBasicPlaylistManager(mid, osSession, nil)
uri, err := c.GetOSSession().SaveData("testName", testData, nil)
uri, err := c.GetOSSession().SaveData("testName", testData, nil, 0)
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type PageInfo interface {
type OSSession interface {
OS() OSDriver

SaveData(name string, data []byte, meta map[string]string) (string, error)
SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error)
EndSession()

// Info in order to have this session used via RPC
Expand Down Expand Up @@ -187,12 +187,12 @@ func ParseOSURL(input string, useFullAPI bool) (OSDriver, error) {
// SaveRetried tries to SaveData specified number of times
func SaveRetried(sess OSSession, name string, data []byte, meta map[string]string, retryCount int) (string, error) {
if retryCount < 1 {
return "", fmt.Errorf("Invalid retry count %d", retryCount)
return "", fmt.Errorf("invalid retry count %d", retryCount)
}
var uri string
var err error
for i := 0; i < retryCount; i++ {
uri, err = sess.SaveData(name, data, meta)
uri, err = sess.SaveData(name, data, meta, 0)
if err == nil {
return uri, err
}
Expand Down
9 changes: 6 additions & 3 deletions drivers/gs.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (os *gsSession) createClient() error {
return nil
}

func (os *gsSession) SaveData(name string, data []byte, meta map[string]string) (string, error) {
func (os *gsSession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) {
if os.useFullAPI {
if os.client == nil {
if err := os.createClient(); err != nil {
Expand All @@ -163,7 +163,10 @@ func (os *gsSession) SaveData(name string, data []byte, meta map[string]string)
keyname := os.key + "/" + name
objh := os.client.Bucket(os.bucket).Object(keyname)
glog.V(common.VERBOSE).Infof("Saving to GS %s/%s", os.bucket, keyname)
ctx, cancel := context.WithTimeout(context.Background(), saveTimeout)
if timeout == 0 {
timeout = saveTimeout
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
wr := objh.NewWriter(ctx)
if len(meta) > 0 && wr.Metadata == nil {
Expand All @@ -185,7 +188,7 @@ func (os *gsSession) SaveData(name string, data []byte, meta map[string]string)
glog.V(common.VERBOSE).Infof("Saved to GS %s", uri)
return uri, err
}
return os.s3Session.SaveData(name, data, meta)
return os.s3Session.SaveData(name, data, meta, timeout)
}

type gsPageInfo struct {
Expand Down
3 changes: 2 additions & 1 deletion drivers/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"strings"
"sync"
"time"

"github.com/livepeer/go-livepeer/net"
)
Expand Down Expand Up @@ -194,7 +195,7 @@ func (ostore *MemorySession) GetInfo() *net.OSInfo {
return nil
}

func (ostore *MemorySession) SaveData(name string, data []byte, meta map[string]string) (string, error) {
func (ostore *MemorySession) SaveData(name string, data []byte, meta map[string]string, timeout time.Duration) (string, error) {
path, file := path.Split(ostore.getAbsolutePath(name))

ostore.dLock.Lock()
Expand Down
8 changes: 4 additions & 4 deletions drivers/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ func TestLocalOS(t *testing.T) {
assert.NoError((err))
os := NewMemoryDriver(u)
sess := os.NewSession(("sesspath")).(*MemorySession)
path, err := sess.SaveData("name1/1.ts", copyBytes(tempData1), nil)
path, err := sess.SaveData("name1/1.ts", copyBytes(tempData1), nil, 0)
glog.Info(path)
fmt.Println(path)
assert.Equal("fake.com/url/stream/sesspath/name1/1.ts", path)
data := sess.GetData("sesspath/name1/1.ts")
fmt.Printf("got Data: '%s'\n", data)
assert.Equal(tempData1, string(data))
path, err = sess.SaveData("name1/1.ts", copyBytes(tempData2), nil)
path, err = sess.SaveData("name1/1.ts", copyBytes(tempData2), nil, 0)
data = sess.GetData("sesspath/name1/1.ts")
assert.Equal(tempData2, string(data))
path, err = sess.SaveData("name1/2.ts", copyBytes(tempData3), nil)
path, err = sess.SaveData("name1/2.ts", copyBytes(tempData3), nil, 0)
data = sess.GetData("sesspath/name1/2.ts")
assert.Equal(tempData3, string(data))
// Test trim prefix when baseURI != nil
Expand All @@ -55,7 +55,7 @@ func TestLocalOS(t *testing.T) {
// Test trim prefix when baseURI = nil
os = NewMemoryDriver(nil)
sess = os.NewSession("sesspath").(*MemorySession)
path, err = sess.SaveData("name1/1.ts", copyBytes(tempData1), nil)
path, err = sess.SaveData("name1/1.ts", copyBytes(tempData1), nil, 0)
assert.Nil(err)
assert.Equal("/stream/sesspath/name1/1.ts", path)

Expand Down