diff --git a/cmd/loadtester/loadtester.go b/cmd/loadtester/loadtester.go index e29600c3a..36f380b68 100644 --- a/cmd/loadtester/loadtester.go +++ b/cmd/loadtester/loadtester.go @@ -202,7 +202,7 @@ func main() { } glog.V(model.SHORT).Infof("RTMP: %s", rtmpURL) glog.V(model.SHORT).Infof("MEDIA: %s", mediaURL) - sr2 := testers.NewStreamer2(ctx, false, *mist, false, false, false) + sr2 := testers.NewStreamer2(ctx, testers.Streamer2Options{MistMode: *mist}) go sr2.StartStreaming(sourceFileName, rtmpURL, mediaURL, waitForTarget, timeToStream) go func() { <-sr2.Done() @@ -225,7 +225,7 @@ func main() { // glog.Error(err) return nil, err } - sr2 := testers.NewStreamer2(ctx, false, false, false, false, false) + sr2 := testers.NewStreamer2(ctx, testers.Streamer2Options{}) go sr2.StartStreaming(sourceFileName, rtmpURL, mediaURL, waitForTarget, timeToStream) return sr2, nil } diff --git a/cmd/recordtester/recordtester.go b/cmd/recordtester/recordtester.go index 982612848..ba6343291 100644 --- a/cmd/recordtester/recordtester.go +++ b/cmd/recordtester/recordtester.go @@ -16,6 +16,7 @@ import ( "github.com/golang/glog" "github.com/livepeer/joy4/format" + "github.com/livepeer/livepeer-data/pkg/client" "github.com/livepeer/stream-tester/apis/livepeer" "github.com/livepeer/stream-tester/internal/app/recordtester" "github.com/livepeer/stream-tester/internal/metrics" @@ -48,15 +49,18 @@ func main() { pauseDuration := fs.Duration("pause-dur", 0, "How long to wait between two consecutive RTMP streams that will comprise one user session") apiToken := fs.String("api-token", "", "Token of the Livepeer API to be used") apiServer := fs.String("api-server", "livepeer.com", "Server of the Livepeer API to be used") + analyzerServers := fs.String("analyzer-servers", "", "Comma-separated list of base URLs to connect for the Stream Health Analyzer API (defaults to --api-server)") fileArg := fs.String("file", "bbb_sunflower_1080p_30fps_normal_t02.mp4", "File to stream") continuousTest := fs.Duration("continuous-test", 0, "Do continuous testing") useHttp := fs.Bool("http", false, "Do HTTP tests instead of RTMP") testMP4 := fs.Bool("mp4", false, "Download MP4 of recording") + testStreamHealth := fs.Bool("stream-health", false, "Check stream health during test") discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel") discordUserName := fs.String("discord-user-name", "", "User name to use when sending messages to Discord") discordUsersToNotify := fs.String("discord-users", "", "Id's of users to notify in case of failure") pagerDutyIntegrationKey := fs.String("pagerduty-integration-key", "", "PagerDuty integration key") pagerDutyComponent := fs.String("pagerduty-component", "", "PagerDuty component") + pagerDutyLowUrgency := fs.Bool("pagerduty-low-urgency", false, "Whether to send only low-urgency PagerDuty alerts") bind := fs.String("bind", "0.0.0.0:9090", "Address to bind metric server to") _ = fs.String("config", "", "config file (optional)") @@ -65,6 +69,7 @@ func main() { ff.WithConfigFileFlag("config"), ff.WithConfigFileParser(ff.PlainParser), ff.WithEnvVarPrefix("RT"), + ff.WithEnvVarIgnoreCommas(true), ) flag.CommandLine.Parse(nil) vFlag.Value.Set(*verbosity) @@ -78,7 +83,7 @@ func main() { if *version { return } - metrics.InitCensus(hostName, model.Version, "loadtester") + metrics.InitCensus(hostName, model.Version, "recordtester") testers.IgnoreNoCodecError = true testers.IgnoreGaps = true testers.IgnoreTimeDrift = true @@ -93,8 +98,11 @@ func main() { fmt.Println("Pause should be less than 5 min") os.Exit(1) } - var err error + if *analyzerServers == "" { + *analyzerServers = *apiServer + } var fileName string + var err error gctx, gcancel := context.WithCancel(context.Background()) // to be used as global parent context, in the future defer gcancel() @@ -155,6 +163,12 @@ func main() { lapi.Init() glog.Infof("Choosen server: %s", lapi.GetServer()) + userAgent := model.AppName + "/" + model.Version + lanalyzers := testers.AnalyzerByRegion{} + for _, url := range strings.Split(*analyzerServers, ",") { + lanalyzers[url] = client.NewAnalyzer(url, *apiToken, userAgent, 0) + } + /* sessionsx, err := lapi.GetSessions("1f770f0a-9177-49bd-a848-023abee7c09b") if err != nil { @@ -185,7 +199,7 @@ func main() { start := time.Now() for i := 0; i < *sim; i++ { - rt := recordtester.NewRecordTester(gctx, lapi, useForceURL, *useHttp, *testMP4) + rt := recordtester.NewRecordTester(gctx, lapi, lanalyzers, useForceURL, *useHttp, *testMP4, *testStreamHealth) eses = append(eses, 0) testers = append(testers, rt) wg.Add(1) @@ -196,7 +210,7 @@ func main() { if les != 0 { es = les } - if err != nil { + if lerr != nil { err = lerr } wg.Done() @@ -219,7 +233,7 @@ func main() { } else if *continuousTest > 0 { metricServer := server.NewMetricsServer() go metricServer.Start(gctx, *bind) - crt := recordtester.NewContinuousRecordTester(gctx, lapi, *pagerDutyIntegrationKey, *pagerDutyComponent, *useHttp, *testMP4) + crt := recordtester.NewContinuousRecordTester(gctx, lapi, lanalyzers, *pagerDutyIntegrationKey, *pagerDutyComponent, *pagerDutyLowUrgency, *useHttp, *testMP4, *testStreamHealth) err := crt.Start(fileName, *testDuration, *pauseDuration, *continuousTest) if err != nil { glog.Warningf("Continuous test ended with err=%v", err) @@ -228,7 +242,7 @@ func main() { return } // just one stream - rt := recordtester.NewRecordTester(gctx, lapi, useForceURL, *useHttp, *testMP4) + rt := recordtester.NewRecordTester(gctx, lapi, lanalyzers, useForceURL, *useHttp, *testMP4, *testStreamHealth) es, err := rt.Start(fileName, *testDuration, *pauseDuration) exit(es, fileName, *fileArg, err) } diff --git a/cmd/streamtester/streamtester.go b/cmd/streamtester/streamtester.go index c7fda3afe..74dac0d0a 100644 --- a/cmd/streamtester/streamtester.go +++ b/cmd/streamtester/streamtester.go @@ -265,7 +265,7 @@ func main() { } msg := fmt.Sprintf(`Starting %s stream to %s, pulling from %s`, durs, *rtmpURL, *mediaURL) messenger.SendMessage(msg) - sr2 := testers.NewStreamer2(gctx, *wowza, *mist, *save, true, true) + sr2 := testers.NewStreamer2(gctx, testers.Streamer2Options{*wowza, *mist, *save, true, true}) sr2.StartStreaming(fn, *rtmpURL, *mediaURL, *waitForTarget, *streamDuration) if *wowza { // let Wowza remove session @@ -365,7 +365,7 @@ func main() { msg := fmt.Sprintf(`Starting %s stream to %s`, dur, *rtmpURL) glog.Info(msg) - sr2 := testers.NewStreamer2(gctx, *wowza, *mist, *save, true, true) + sr2 := testers.NewStreamer2(gctx, testers.Streamer2Options{*wowza, *mist, *save, true, true}) sr2.StartStreaming(fn, *rtmpURL, *mediaURL, *waitForTarget, *streamDuration) } err = lapi.DeleteStream(stream.ID) diff --git a/go.mod b/go.mod index 4c726bf4a..1b68a0052 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/gosuri/uiprogress v0.0.1 github.com/livepeer/joy4 v0.1.2-0.20210601043311-c1b885884cc7 github.com/livepeer/leaderboard-serverless v1.0.0 - github.com/livepeer/livepeer-data v0.4.0 + github.com/livepeer/livepeer-data v0.4.5-beta.3 github.com/livepeer/m3u8 v0.11.1 github.com/mattn/go-isatty v0.0.8 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible @@ -25,6 +25,7 @@ require ( github.com/prometheus/client_golang v1.7.1 github.com/prometheus/common v0.15.0 github.com/rabbitmq/amqp091-go v1.1.0 + github.com/rabbitmq/rabbitmq-stream-go-client v0.1.0-beta.0.20211027081212-fd5e6d497413 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0-rc.0 go.opencensus.io v0.23.0 diff --git a/go.sum b/go.sum index 54bd9090e..adfd6a10a 100644 --- a/go.sum +++ b/go.sum @@ -117,6 +117,7 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -143,6 +144,7 @@ github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTg github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= +github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -227,6 +229,7 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -239,8 +242,9 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -338,16 +342,19 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= @@ -355,8 +362,12 @@ github.com/livepeer/joy4 v0.1.2-0.20210601043311-c1b885884cc7 h1:gGU8CE90+YL2I+i github.com/livepeer/joy4 v0.1.2-0.20210601043311-c1b885884cc7/go.mod h1:RDTLvmm/NJWjzuUpEDyIWmLTqSfpZEcnPnacG8sfh34= github.com/livepeer/leaderboard-serverless v1.0.0 h1:QqUxAEorjtfFxQO7tqTbxOLK+LFTJU27hvXf3/8YRZw= github.com/livepeer/leaderboard-serverless v1.0.0/go.mod h1:0H4gFyyTO3lXtqfRIwyRaWPuW0pBMXkNUDtIjHHniW0= -github.com/livepeer/livepeer-data v0.4.0 h1:17rELy6TFoY+gjFgQtKbVNkn1jF7+QFD7aotMVlhnfI= -github.com/livepeer/livepeer-data v0.4.0/go.mod h1:XRMZAh6E48fZS9M6i5JHZMfEUXuwaC3Zig5OSogROOM= +github.com/livepeer/livepeer-data v0.4.5-beta h1:iyXhhpybqq6dceM0ldvFxHKivM3Rgrl3WktRuzSQlO4= +github.com/livepeer/livepeer-data v0.4.5-beta/go.mod h1:KMmKC+o3b3C/n9NS5Zvp5SumxnzSW55pj45lJDb4b50= +github.com/livepeer/livepeer-data v0.4.5-beta.2 h1:WN2CULcaixYSK2/S8wNJBFez3D1qpD3B6eR5ntZ5RIM= +github.com/livepeer/livepeer-data v0.4.5-beta.2/go.mod h1:KMmKC+o3b3C/n9NS5Zvp5SumxnzSW55pj45lJDb4b50= +github.com/livepeer/livepeer-data v0.4.5-beta.3 h1:ms9wKvze5zmFKRKevI1I+q3nCeV6kRFr6WKz9UgXA4Y= +github.com/livepeer/livepeer-data v0.4.5-beta.3/go.mod h1:KMmKC+o3b3C/n9NS5Zvp5SumxnzSW55pj45lJDb4b50= github.com/livepeer/m3u8 v0.11.1 h1:VkUJzfNTyjy9mqsgp5JPvouwna8wGZMvd/gAfT5FinU= github.com/livepeer/m3u8 v0.11.1/go.mod h1:IUqAtwWPAG2CblfQa4SVzTQoDcEMPyfNOaBSxqHMS04= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= @@ -400,6 +411,7 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/nbio/hitch v0.0.0-20200727225606-6dd6dd293f2b/go.mod h1:pCpSEQ+v0QJ68bwwrFH5BRKkPQyZUlKcxcYhIAlq48U= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -441,6 +453,7 @@ github.com/peterbourgon/ff/v2 v2.0.0 h1:lx0oYI5qr/FU1xnpNhQ+EZM04gKgn46jyYvGEEqB github.com/peterbourgon/ff/v2 v2.0.0/go.mod h1:xjwr+t+SjWm4L46fcj/D+Ap+6ME7+HqFzaP22pP5Ggk= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -482,6 +495,7 @@ github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFB github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/rabbitmq/amqp091-go v1.1.0 h1:qx8cGMJha71/5t31Z+LdPLdPrkj/BvD38cqC3Bi1pNI= github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= +github.com/rabbitmq/rabbitmq-stream-go-client v0.1.0-RC1/go.mod h1:31GXVWngt/bGfAPj/dQaVadf6YBmLjNhdoLHWoJEvh0= github.com/rabbitmq/rabbitmq-stream-go-client v0.1.0-beta.0.20211027081212-fd5e6d497413 h1:dHcuFg2/Ull4o4j2DZeu2cRFrlMEt+wERsXyajGG3vE= github.com/rabbitmq/rabbitmq-stream-go-client v0.1.0-beta.0.20211027081212-fd5e6d497413/go.mod h1:salB4WXuCH7GAMrciUT77dqEfkMvKFCpjqmx+vbonYk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -490,6 +504,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= diff --git a/internal/app/recordtester/continuous_record_tester.go b/internal/app/recordtester/continuous_record_tester.go index 228b8b24b..da9af057d 100644 --- a/internal/app/recordtester/continuous_record_tester.go +++ b/internal/app/recordtester/continuous_record_tester.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/url" + "os" "strconv" "time" @@ -27,13 +28,16 @@ type ( continuousRecordTester struct { lapi *livepeer.API + lanalyzers testers.AnalyzerByRegion ctx context.Context cancel context.CancelFunc host string // API host being tested pagerDutyIntegrationKey string pagerDutyComponent string + pagerDutyLowUrgency bool useHTTP bool mp4 bool + streamHealth bool } pagerDutyLink struct { @@ -43,44 +47,55 @@ type ( ) // NewContinuousRecordTester returns new object -func NewContinuousRecordTester(gctx context.Context, lapi *livepeer.API, pagerDutyIntegrationKey, pagerDutyComponent string, useHTTP, mp4 bool) IContinuousRecordTester { +func NewContinuousRecordTester(gctx context.Context, lapi *livepeer.API, lanalyzers testers.AnalyzerByRegion, pagerDutyIntegrationKey, pagerDutyComponent string, pagerDutyLowUrgency bool, useHTTP, mp4, streamHealth bool) IContinuousRecordTester { ctx, cancel := context.WithCancel(gctx) server := lapi.GetServer() u, _ := url.Parse(server) crt := &continuousRecordTester{ lapi: lapi, + lanalyzers: lanalyzers, ctx: ctx, cancel: cancel, host: u.Host, pagerDutyIntegrationKey: pagerDutyIntegrationKey, pagerDutyComponent: pagerDutyComponent, + pagerDutyLowUrgency: pagerDutyLowUrgency, useHTTP: useHTTP, mp4: mp4, + streamHealth: streamHealth, } return crt } func (crt *continuousRecordTester) Start(fileName string, testDuration, pauseDuration, pauseBetweenTests time.Duration) error { - glog.Infof("Starting continuous test of %s", crt.host) + hostname, _ := os.Hostname() + messenger.SendMessage(fmt.Sprintf("Starting continuous test of %s on hostname=%s", crt.host, hostname)) try := 0 notRtmpTry := 0 + maxTestDuration := 2*testDuration + pauseDuration + 15*time.Minute for { msg := fmt.Sprintf(":arrow_right: Starting %s recordings test stream to %s", 2*testDuration, crt.host) - glog.Info(msg) messenger.SendMessage(msg) - rt := NewRecordTester(crt.ctx, crt.lapi, true, crt.useHTTP, crt.mp4) + + ctx, cancel := context.WithTimeout(crt.ctx, maxTestDuration) + rt := NewRecordTester(ctx, crt.lapi, crt.lanalyzers, true, crt.useHTTP, crt.mp4, crt.streamHealth) es, err := rt.Start(fileName, testDuration, pauseDuration) - if err == context.Canceled { - msg := fmt.Sprintf("Test of %s cancelled", crt.host) - messenger.SendMessage(msg) - return err + rt.Clean() + ctxErr := ctx.Err() + cancel() + + if crt.ctx.Err() != nil { + messenger.SendMessage(fmt.Sprintf("Continuous record test of %s cancelled on hostname=%s", crt.host, hostname)) + return crt.ctx.Err() + } else if ctxErr != nil { + msg := fmt.Sprintf("Record test of %s timed out, potential deadlock! ctxErr=%q err=%q", crt.host, ctxErr, err) + messenger.SendFatalMessage(msg) } else if err != nil || es != 0 { var re *testers.RTMPError if errors.As(err, &re) && try < 4 { msg := fmt.Sprintf(":rotating_light: Test of %s ended with RTMP err=%v errCode=%v try=%d, trying %s time", crt.host, err, es, try, getNth(try+2)) messenger.SendMessage(msg) - rt.Clean() try++ time.Sleep(10 * time.Second) continue @@ -89,7 +104,6 @@ func (crt *continuousRecordTester) Start(fileName string, testDuration, pauseDur msg := fmt.Sprintf(":rotating_light: Test of %s ended with some err=%v errCode=%v try=%d, trying %s time", crt.host, err, es, notRtmpTry, getNth(notRtmpTry+2)) messenger.SendMessage(msg) - rt.Clean() notRtmpTry++ time.Sleep(5 * time.Second) continue @@ -97,59 +111,79 @@ func (crt *continuousRecordTester) Start(fileName string, testDuration, pauseDur msg := fmt.Sprintf(":rotating_light: Test of %s ended with err=%v errCode=%v", crt.host, err, es) messenger.SendFatalMessage(msg) glog.Warning(msg) - if crt.pagerDutyIntegrationKey != "" { - event := pagerduty.V2Event{ - RoutingKey: crt.pagerDutyIntegrationKey, - Action: "trigger", - Payload: &pagerduty.V2Payload{ - Source: crt.host, - Component: crt.pagerDutyComponent, - Severity: "error", - Summary: crt.host + ": " + err.Error(), - }, - } - sid := rt.StreamID() - if sid != "" { - link := pagerDutyLink{ - Href: "https://livepeer.com/app/stream/" + sid, - Text: "Stream", - } - event.Links = append(event.Links, link) - stream := rt.Stream() - if stream != nil { - plink := pagerDutyLink{ - Href: "https://my.papertrailapp.com/events?q=" + stream.ID + "+OR+" + stream.StreamKey + "+OR+" + stream.PlaybackID, - Text: "Papertrail", - } - event.Links = append(event.Links, plink) - } - } - resp, err := pagerduty.ManageEvent(event) - if err != nil { - glog.Error(fmt.Errorf("PAGERDUTY Error: %w", err)) - messenger.SendFatalMessage(fmt.Sprintf("Error creating PagerDuty event: %v", err)) - } else { - glog.Infof("Incident status: %s message: %s", resp.Status, resp.Message) - } - } + crt.sendPagerdutyEvent(rt, err) } else { - msg := fmt.Sprintf(":white_check_mark: Test of %s succeed", crt.host) + msg := fmt.Sprintf(":white_check_mark: Test of %s succeeded", crt.host) messenger.SendMessage(msg) - glog.Warning(msg) + glog.Info(msg) + crt.sendPagerdutyEvent(rt, nil) } try = 0 notRtmpTry = 0 - rt.Clean() glog.Infof("Waiting %s before next test", pauseBetweenTests) - time.Sleep(pauseBetweenTests) select { case <-crt.ctx.Done(): - return context.Canceled - default: + messenger.SendMessage(fmt.Sprintf("Continuous record test of %s cancelled on hostname=%s", crt.host, hostname)) + return err + case <-time.After(pauseBetweenTests): } } } +func (crt *continuousRecordTester) sendPagerdutyEvent(rt IRecordTester, err error) { + if crt.pagerDutyIntegrationKey == "" { + return + } + severity, lopriPrefix, dedupKey := "error", "", fmt.Sprintf("cont-record-tester:%s", crt.host) + if crt.pagerDutyLowUrgency { + severity, lopriPrefix = "warning", "[LOPRI] " + dedupKey = "lopri-" + dedupKey + } + event := pagerduty.V2Event{ + RoutingKey: crt.pagerDutyIntegrationKey, + Action: "trigger", + DedupKey: dedupKey, + } + if err == nil { + event.Action = "resolve" + _, err := pagerduty.ManageEvent(event) + if err != nil { + messenger.SendMessage(fmt.Sprintf("Error resolving PagerDuty event: %v", err)) + } + return + } + event.Payload = &pagerduty.V2Payload{ + Source: crt.host, + Component: crt.pagerDutyComponent, + Severity: severity, + Summary: fmt.Sprintf("%s:movie_camera: %s for %s error: %v", lopriPrefix, crt.pagerDutyComponent, crt.host, err), + Timestamp: time.Now().UTC().Format(time.RFC3339), + } + sid := rt.StreamID() + if sid != "" { + link := pagerDutyLink{ + Href: "https://livepeer.com/dashboard/streams/" + sid, + Text: "Stream", + } + event.Links = append(event.Links, link) + stream := rt.Stream() + if stream != nil { + link = pagerDutyLink{ + Href: "https://my.papertrailapp.com/events?q=" + stream.ID + "+OR+" + stream.StreamKey + "+OR+" + stream.PlaybackID, + Text: "Papertrail", + } + event.Links = append(event.Links, link) + } + } + resp, err := pagerduty.ManageEvent(event) + if err != nil { + glog.Error(fmt.Errorf("PAGERDUTY Error: %w", err)) + messenger.SendFatalMessage(fmt.Sprintf("Error creating PagerDuty event: %v", err)) + } else { + glog.Infof("Incident status: %s message: %s", resp.Status, resp.Message) + } +} + func (crt *continuousRecordTester) Cancel() { crt.cancel() } diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index 3659042cd..b4df60012 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -33,15 +33,17 @@ type ( } recordTester struct { - lapi *livepeer.API - useForceURL bool - ctx context.Context - cancel context.CancelFunc - vodeStats model.VODStats - streamID string - stream *livepeer.CreateStreamResp - useHTTP bool - mp4 bool + lapi *livepeer.API + lanalyzers testers.AnalyzerByRegion + useForceURL bool + ctx context.Context + cancel context.CancelFunc + vodeStats model.VODStats + streamID string + stream *livepeer.CreateStreamResp + useHTTP bool + mp4 bool + streamHealth bool } ) @@ -81,15 +83,17 @@ var standardProfiles = []livepeer.Profile{ } // NewRecordTester ... -func NewRecordTester(gctx context.Context, lapi *livepeer.API, useForceURL, useHTTP, mp4 bool) IRecordTester { +func NewRecordTester(gctx context.Context, lapi *livepeer.API, lanalyzers testers.AnalyzerByRegion, useForceURL, useHTTP, mp4, streamHealth bool) IRecordTester { ctx, cancel := context.WithCancel(gctx) rt := &recordTester{ - lapi: lapi, - useForceURL: useForceURL, - ctx: ctx, - cancel: cancel, - useHTTP: useHTTP, - mp4: mp4, + lapi: lapi, + lanalyzers: lanalyzers, + useForceURL: useForceURL, + ctx: ctx, + cancel: cancel, + useHTTP: useHTTP, + mp4: mp4, + streamHealth: streamHealth, } return rt } @@ -170,13 +174,20 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. rtmpURL := fmt.Sprintf("%s/%s", ingests[0].Ingest, stream.StreamKey) // rtmpURL = fmt.Sprintf("%s/%s", ingests[0].Ingest, stream.ID) + testerFuncs := []testers.StartTestFunc{} + if rt.streamHealth { + testerFuncs = append(testerFuncs, func(ctx context.Context, mediaURL string, waitForTarget time.Duration, opts testers.Streamer2Options) testers.Finite { + return testers.NewStreamHealth(ctx, stream.ID, rt.lanalyzers, waitForTarget) + }) + } + mediaURL := fmt.Sprintf("%s/%s/index.m3u8", ingests[0].Playback, stream.PlaybackID) glog.V(model.SHORT).Infof("RTMP: %s", rtmpURL) glog.V(model.SHORT).Infof("MEDIA: %s", mediaURL) if rt.useHTTP { sterr := rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream) if sterr != nil { - glog.Warning("Streaming returned error err=%v", sterr) + glog.Warningf("Streaming returned error err=%v", sterr) return 3, err } if pauseDuration > 0 { @@ -184,14 +195,14 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. time.Sleep(pauseDuration) sterr = rt.doOneHTTPStream(fileName, streamName, broadcasters[0], testDuration, stream) if sterr != nil { - glog.Warning("Second time streaming returned error err=%v", sterr) + glog.Warningf("Second time streaming returned error err=%v", sterr) return 3, err } testDuration *= 2 } } else { - sr2 := testers.NewStreamer2(rt.ctx, false, false, false, false, false) + sr2 := testers.NewStreamer2(rt.ctx, testers.Streamer2Options{}, testerFuncs...) sr2.StartStreaming(fileName, rtmpURL, mediaURL, 30*time.Second, testDuration) // <-sr2.Done() srerr := sr2.Err() @@ -201,12 +212,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. return 2, re } if srerr != nil { - glog.Warning("Streaming returned error err=%v", srerr) + glog.Warningf("Streaming returned error err=%v", srerr) return 3, err } stats, err := sr2.Stats() if err != nil { - glog.Warning("Stats returned error err=%v", err) + glog.Warningf("Stats returned error err=%v", err) return 21, err } glog.Infof("Streaming success rate=%v", stats.SuccessRate) @@ -216,7 +227,7 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. if pauseDuration > 0 { glog.Infof("Pause specified, waiting %s before streaming second time", pauseDuration) time.Sleep(pauseDuration) - sr2 := testers.NewStreamer2(rt.ctx, false, false, false, false, false) + sr2 := testers.NewStreamer2(rt.ctx, testers.Streamer2Options{}, testerFuncs...) go sr2.StartStreaming(fileName, rtmpURL, mediaURL, 30*time.Second, testDuration) <-sr2.Done() srerr := sr2.Err() @@ -226,12 +237,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. return 2, re } if srerr != nil { - glog.Warning("Streaming second returned error err=%v", srerr) + glog.Warningf("Streaming second returned error err=%v", srerr) return 3, err } stats, err := sr2.Stats() if err != nil { - glog.Warning("Stats returned error err=%v", err) + glog.Warningf("Stats returned error err=%v", err) return 21, err } glog.Infof("Streaming second time success rate=%v", stats.SuccessRate) diff --git a/internal/testers/m3utester2.go b/internal/testers/m3utester2.go index 52ed5499c..97729a750 100644 --- a/internal/testers/m3utester2.go +++ b/internal/testers/m3utester2.go @@ -50,6 +50,13 @@ const ( type ( resolution string + Finite interface { + Done() <-chan struct{} + Cancel() + Finished() bool + GlobalErr() error + } + finite struct { ctx context.Context cancel context.CancelFunc @@ -118,7 +125,7 @@ type ( } ) -// NewM3utester2 pubic method +// NewM3utester2 public method func NewM3utester2(pctx context.Context, u string, wowzaMode, mistMode, failIfTranscodingStops, save bool, waitForTarget time.Duration, sm *segmentsMatcher, statsOnly bool) model.IVODTester { @@ -183,13 +190,11 @@ func (mut *m3utester2) VODStats() model.VODStats { return vs } -func (mut *m3utester2) doSavePlaylist() { +func (mut *m3utester2) doSavePlaylist() error { if mut.savePlayList != nil { - err := ioutil.WriteFile(mut.savePlayListName, mut.savePlayList.Encode().Bytes(), 0644) - if err != nil { - glog.Fatal(err) - } + return ioutil.WriteFile(mut.savePlayListName, mut.savePlayList.Encode().Bytes(), 0644) } + return nil } func (mut *m3utester2) initSave(streamURL, mediaURL string) { @@ -214,7 +219,7 @@ func (mut *m3utester2) initSave(streamURL, mediaURL string) { } func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, resolution string, u *url.URL, wowzaMode bool, masterDR chan *downloadResult, - sm *segmentsMatcher, latencyResults chan *latencyResult, save, failIfTranscodingStops, statsOnly bool) *m3uMediaStream { + sm *segmentsMatcher, latencyResults chan *latencyResult, save, failIfTranscodingStops, statsOnly bool) (*m3uMediaStream, error) { ms := &m3uMediaStream{ finite: finite{ @@ -240,7 +245,7 @@ func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, res if ms.save { streamName, mediaStreamName, err := parseMediaURL(u.String()) if err != nil { - glog.Fatal(err) + return nil, err } ms.saveDirName = filepath.Join(streamName, mediaStreamName) ms.savePlayListName = filepath.Join(ms.saveDirName, mediaStreamName+".m3u8") @@ -249,14 +254,14 @@ func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, res } glog.Infof("Save dir name: '%s', main playlist save name %s", ms.saveDirName, ms.savePlayListName) mpl, err := m3u8.NewMediaPlaylist(0, 1024) - mpl.MediaType = m3u8.VOD - mpl.Live = false if err != nil { - panic(err) + return nil, err } + mpl.MediaType = m3u8.VOD + mpl.Live = false ms.savePlayList = mpl } - return ms + return ms, nil } func (mut *m3utester2) Stats() model.Stats1 { @@ -490,6 +495,10 @@ func (f *finite) Done() <-chan struct{} { return f.ctx.Done() } +func (f *finite) GlobalErr() error { + return f.globalError +} + func (f *finite) Cancel() { f.cancel() } @@ -625,10 +634,17 @@ func (mut *m3utester2) manifestPullerLoop(waitForTarget time.Duration) { if mut.save { mut.initSave("", surl) mut.savePlayList.Append(mediaName+"/"+mediaName+".m3u8", nil, m3u8.VariantParams{Name: mediaName}) - mut.doSavePlaylist() + if err := mut.doSavePlaylist(); err != nil { + mut.fatalEnd(err) + return + } } - stream := newM3uMediaStream(mut.ctx, mut.cancel, mediaName, mres, mut.initialURL, mut.wowzaMode, mut.driftCheckResults, mut.segmentsMatcher, mut.latencyResults, + stream, err := newM3uMediaStream(mut.ctx, mut.cancel, mediaName, mres, mut.initialURL, mut.wowzaMode, mut.driftCheckResults, mut.segmentsMatcher, mut.latencyResults, mut.save, mut.failIfTranscodingStops, mut.statsOnly) + if err != nil { + mut.fatalEnd(err) + return + } mut.streams[resolution(mres)] = stream return } @@ -706,20 +722,28 @@ func (mut *m3utester2) manifestPullerLoop(waitForTarget time.Duration) { if mut.sourceRes == "" { mut.sourceRes = ress } - stream := newM3uMediaStream(mut.ctx, mut.cancel, variant.URI, ress, pvrui, mut.wowzaMode, mut.driftCheckResults, + stream, err := newM3uMediaStream(mut.ctx, mut.cancel, variant.URI, ress, pvrui, mut.wowzaMode, mut.driftCheckResults, mut.segmentsMatcher, mut.latencyResults, mut.save, mut.failIfTranscodingStops, mut.statsOnly) + if err != nil { + mut.fatalEnd(err) + return + } mut.streams[res] = stream if mut.save { needSavePlaylist = true _, mediaName, err := parseMediaURL(pvrui.String()) if err != nil { - glog.Fatal(err) + mut.fatalEnd(err) + return } mut.savePlayList.Append(mediaName+"/"+mediaName+".m3u8", nil, variant.VariantParams) } } if needSavePlaylist { - mut.doSavePlaylist() + if err := mut.doSavePlaylist(); err != nil { + mut.fatalEnd(err) + return + } } time.Sleep(2 * time.Second) } @@ -816,10 +840,12 @@ func (ms *m3uMediaStream) workerLoop(masterDR chan *downloadResult, latencyResul seg.Duration = dres.task.duration seg.Title = dres.task.title if err := ms.insertSegmentToSavePlaylist(dres.task.seqNo, seg); err != nil { - glog.Fatal(err) + ms.fatalEnd(err) + return } if err := ioutil.WriteFile(ms.savePlayListName, ms.savePlayList.Encode().Bytes(), 0644); err != nil { - glog.Fatal(err) + ms.fatalEnd(err) + return } go func(segFileName, fullpath string, b []byte) { if err := ioutil.WriteFile(fullpath, b, 0644); err != nil { @@ -991,10 +1017,12 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) { gpl, plt, err := m3u8.Decode(*bytes.NewBuffer(b), true) if err != nil { - glog.Fatal(err) + ms.fatalEnd(err) + return } if plt != m3u8.MEDIA { - glog.Fatalf("Expecting media playlist, got %d (url=%s)", plt, surl) + ms.fatalEnd(fmt.Errorf("Expecting media playlist, got %d (url=%s)", plt, surl)) + return } pl := gpl.(*m3u8.MediaPlaylist) // pl, err := m3u8.NewMediaPlaylist(100, 100) @@ -1044,7 +1072,12 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) { ms.downloadResults <- &downloadResult{name: segment.URI, seqNo: segSeqNo, status: "200 OK", duration: time.Duration(segment.Duration * float64(time.Second))} } else { - ms.downTasks <- downloadTask{baseURL: ms.u, url: segment.URI, seqNo: segSeqNo, title: segment.Title, duration: segment.Duration, appTime: now} + segUrl, err := url.Parse(segment.URI) + if err != nil { + ms.fatalEnd(err) + return + } + ms.downTasks <- downloadTask{baseURL: ms.u, url: segUrl, seqNo: segSeqNo, title: segment.Title, duration: segment.Duration, appTime: now} ms.segmentsToDownload++ metrics.Census.IncSegmentsToDownload() } @@ -1074,37 +1107,32 @@ func (ms *m3uMediaStream) isFiniteDownloadsFinished() bool { } func (ms *m3uMediaStream) insertSegmentToSavePlaylist(seqNo uint64, seg *m3u8.MediaSegment) error { - var err error - err = ms.savePlayList.InsertSegment(seqNo, seg) - if err == m3u8.ErrPlaylistFull { - mpl, err := m3u8.NewMediaPlaylist(0, uint(len(ms.savePlayList.Segments)*2)) - if err != nil { - glog.Fatal(err) - } - mpl.TargetDuration = ms.savePlayList.TargetDuration - mpl.SeqNo = ms.savePlayList.SeqNo - mpl.MediaType = m3u8.VOD - mpl.Live = false - for _, oseg := range ms.savePlayList.Segments { - if oseg != nil { - if err = mpl.InsertSegment(oseg.SeqId, oseg); err != nil { - glog.Fatal(err) - } + err := ms.savePlayList.InsertSegment(seqNo, seg) + if err != m3u8.ErrPlaylistFull { + return err + } + mpl, err := m3u8.NewMediaPlaylist(0, uint(len(ms.savePlayList.Segments)*2)) + if err != nil { + return err + } + mpl.TargetDuration = ms.savePlayList.TargetDuration + mpl.SeqNo = ms.savePlayList.SeqNo + mpl.MediaType = m3u8.VOD + mpl.Live = false + for _, oseg := range ms.savePlayList.Segments { + if oseg != nil { + if err = mpl.InsertSegment(oseg.SeqId, oseg); err != nil { + return err } } - err = ms.savePlayList.InsertSegment(seqNo, seg) } - return err + return ms.savePlayList.InsertSegment(seqNo, seg) } func downloadSegment(task *downloadTask, res chan *downloadResult) { - purl, err := url.Parse(task.url) - if err != nil { - glog.Fatal(err) - } - fsurl := task.url - if !purl.IsAbs() { - fsurl = task.baseURL.ResolveReference(purl).String() + fsurl := task.url.String() + if !task.url.IsAbs() { + fsurl = task.baseURL.ResolveReference(task.url).String() } try := 0 for { @@ -1155,7 +1183,7 @@ func downloadSegment(task *downloadTask, res chan *downloadResult) { glog.V(model.DEBUG).Infof("==============>>>>>>>>>>>>> Saving segment %s", sn) ioutil.WriteFile(sn, b, 0644) sid := strconv.FormatInt(time.Now().Unix(), 10) - if savedName, service, serr := SaveToExternalStorage(sid+"_"+task.url, b); serr != nil { + if savedName, service, serr := SaveToExternalStorage(sid+"_"+task.url.String(), b); serr != nil { messenger.SendFatalMessage(fmt.Sprintf("Failure to save segment to %s %v", service, serr)) } else { messenger.SendMessage(fmt.Sprintf("Segment %s (which can't be parsed) saved to %s %s", task.url, service, savedName)) @@ -1167,7 +1195,7 @@ func downloadSegment(task *downloadTask, res chan *downloadResult) { // ioutil.WriteFile(sn, b, 0644) // glog.V(model.DEBUG).Infof("Download %s result: %s len %d timeStart %s segment duration %s", fsurl, resp.Status, len(b), fsttim, dur) glog.V(model.DEBUG).Infof("Download %s result: %s len %d timeStart %s segment duration %s took=%s", fsurl, resp.Status, len(b), fsttim, dur, time.Since(start)) - res <- &downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url, seqNo: task.seqNo, + res <- &downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url.String(), seqNo: task.seqNo, videoParseError: verr, startTime: fsttim, duration: dur, mySeqNo: task.mySeqNo, appTime: task.appTime, downloadCompetedAt: completedAt, downloadStartedAt: start, data: b, task: task, } diff --git a/internal/testers/mediadownloader.go b/internal/testers/mediadownloader.go index 7eaac4e74..7090a35c7 100644 --- a/internal/testers/mediadownloader.go +++ b/internal/testers/mediadownloader.go @@ -41,7 +41,7 @@ type downloadStats struct { type downloadTask struct { baseURL *url.URL - url string + url *url.URL seqNo uint64 title string duration float64 @@ -186,13 +186,9 @@ func (md *mediaDownloader) statsFormatted() string { } func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan downloadResult) { - purl, err := url.Parse(task.url) - if err != nil { - glog.Fatal(err) - } - fsurl := task.url - if !purl.IsAbs() { - fsurl = md.u.ResolveReference(purl).String() + fsurl := task.url.String() + if !task.url.IsAbs() { + fsurl = md.u.ResolveReference(task.url).String() } try := 0 for { @@ -250,7 +246,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download } } if md.picartoMode { - fsttim = time.Duration(mistGetTimeFromSegURI(task.url)) * time.Millisecond + fsttim = time.Duration(mistGetTimeFromSegURI(task.url.String())) * time.Millisecond } } else { // add keys @@ -319,7 +315,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download if md.saveSegmentsToDisk { seg := new(m3u8.MediaSegment) - seg.URI = task.url + seg.URI = task.url.String() seg.SeqId = task.seqNo seg.Duration = task.duration seg.Title = task.title @@ -331,7 +327,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download upts := strings.Split(fsurl, "/") // fn := upts[len(upts)-2] + "-" + path.Base(task.url) ind := len(upts) - 2 - fn := path.Base(task.url) + fn := path.Base(task.url.String()) if !md.livepeerNameSchema { // ind = 0 // fn = upts[0] @@ -378,7 +374,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download glog.V(model.DEBUG).Infof("Segment %s saved to %s", seg.URI, filepath.Join(md.saveDir, fn)) }(fn, fullpath, b) } - res <- downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url, seqNo: task.seqNo, downloadCompetedAt: now, + res <- downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url.String(), seqNo: task.seqNo, downloadCompetedAt: now, videoParseError: verr, startTime: fsttim, duration: dur, mySeqNo: task.mySeqNo, appTime: task.appTime, keyFrames: keyFrames} return } @@ -560,7 +556,11 @@ func (md *mediaDownloader) manifestDownloadLoop() { if err == nil { seqNo = parsedSeq } - md.downTasks <- downloadTask{url: segment.URI, seqNo: seqNo, title: segment.Title, duration: segment.Duration, mySeqNo: mySeqNo, appTime: now} + segUrl, err := url.Parse(segment.URI) + if err != nil { + glog.Fatal(err) + } + md.downTasks <- downloadTask{url: segUrl, seqNo: seqNo, title: segment.Title, duration: segment.Duration, mySeqNo: mySeqNo, appTime: now} md.segmentsToDownload++ now = now.Add(time.Millisecond) // glog.V(model.VERBOSE).Infof("segment %s is of length %f seqId=%d", segment.URI, segment.Duration, segment.SeqId) diff --git a/internal/testers/stream_health.go b/internal/testers/stream_health.go new file mode 100644 index 000000000..4e66095f2 --- /dev/null +++ b/internal/testers/stream_health.go @@ -0,0 +1,134 @@ +package testers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/golang/glog" + "github.com/livepeer/livepeer-data/pkg/client" + "github.com/livepeer/stream-tester/model" +) + +type ( + // AnalyzerByRegion is a map of regions (generally just the base URL) to an + // analyzer client configured to connect there. + AnalyzerByRegion map[string]client.Analyzer + + streamHealth struct { + finite + streamID string + clients AnalyzerByRegion + } +) + +func NewStreamHealth(parent context.Context, streamID string, clients AnalyzerByRegion, waitForTarget time.Duration) Finite { + ctx, cancel := context.WithCancel(parent) + sh := &streamHealth{ + finite: finite{ + ctx: ctx, + cancel: cancel, + }, + streamID: streamID, + clients: clients, + } + go sh.workerLoop(waitForTarget) + return sh +} + +func (h *streamHealth) workerLoop(waitForTarget time.Duration) { + defer h.cancel() + unhealthyTimeout := time.After(waitForTarget) + checkTicker := time.NewTicker(5 * time.Second) + logErrs := false + time.AfterFunc(waitForTarget/2, func() { logErrs = true }) + + defer checkTicker.Stop() + var unhealthyRegions []checkResult + for { + select { + case <-h.ctx.Done(): + return + case <-checkTicker.C: + results := h.checkAllRegions(logErrs) + unhealthyRegions = unhealthyRegions[:0] + for res := range results { + if res.err != nil { + unhealthyRegions = append(unhealthyRegions, res) + } + } + if len(unhealthyRegions) == 0 { + unhealthyTimeout = nil + } else if unhealthyTimeout == nil { + unhealthyTimeout = time.After(waitForTarget) + } + case <-unhealthyTimeout: + errsRegions := map[string][]string{} + for _, check := range unhealthyRegions { + err := check.err.Error() + errsRegions[err] = append(errsRegions[err], fmt.Sprintf("`%s`", check.region)) + } + aggErrs := make([]string, 0, len(errsRegions)) + for err, regions := range errsRegions { + regionsStr := "`all` regions" + if len(regions) < len(h.clients) { + regionsStr = "[" + strings.Join(regions, ", ") + "]" + } + sort.Slice(regions, func(i, j int) bool { return regions[i] < regions[j] }) + aggErrs = append(aggErrs, fmt.Sprintf("%s in %s", err, regionsStr)) + } + sort.Slice(aggErrs, func(i, j int) bool { return aggErrs[i] < aggErrs[j] }) + + msg := fmt.Sprintf("Global Stream Health API: stream did not become healthy on global analyzers after `%s`: %s", + waitForTarget, strings.Join(aggErrs, "; ")) + h.fatalEnd(errors.New(msg)) + return + } + } +} + +type checkResult struct { + region string + err error +} + +func (h *streamHealth) checkAllRegions(logErrs bool) <-chan checkResult { + results := make(chan checkResult, 2) + wg := sync.WaitGroup{} + startTime := time.Now() + for region := range h.clients { + wg.Add(1) + go func(region string) { + defer wg.Done() + glog.V(model.INSANE).Infof("Checking stream health for region=%s", region) + health, err := h.clients[region].GetStreamHealth(h.ctx, h.streamID) + if err != nil { + // do nothing, we'll log the error below if asked for. + } else if healthy := health.Healthy.Status; healthy == nil { + err = fmt.Errorf("`healthy` condition unavailable") + } else if !*healthy { + err = fmt.Errorf("`healthy` condition is `false`") + } else if age := startTime.Sub(health.Healthy.LastProbeTime.Time); age > time.Minute { + err = fmt.Errorf("stream health is outdated (`%s`)", age) + } + if err != nil && (logErrs || bool(glog.V(model.VVERBOSE))) { + rawHealth, jsonErr := json.Marshal(health) + if jsonErr != nil { + rawHealth = []byte(fmt.Sprintf("%+v", health)) + } + glog.Warningf("Stream not healthy on region=%q, err=%q, health=%s", region, err, rawHealth) + } + results <- checkResult{region, err} + }(region) + } + go func() { + wg.Wait() + close(results) + }() + return results +} diff --git a/internal/testers/streamer2.go b/internal/testers/streamer2.go index c26443876..f58a1d40f 100644 --- a/internal/testers/streamer2.go +++ b/internal/testers/streamer2.go @@ -2,7 +2,9 @@ package testers import ( "context" + "errors" "fmt" + "strings" "time" "github.com/golang/glog" @@ -18,33 +20,37 @@ var IgnoreNoCodecError bool var StartDelayBetweenGroups = 2 * time.Second type ( + Streamer2Options struct { + WowzaMode bool + MistMode bool + Save bool + FailIfTranscodingStops bool + PrintStats bool + } + + StartTestFunc func(ctx context.Context, mediaURL string, waitForTarget time.Duration, opts Streamer2Options) Finite + // streamer2 is used for running continious tests against Wowza servers streamer2 struct { finite - uploader *rtmpStreamer - downloader *m3utester2 - wowzaMode bool - mistMode bool - save bool - failIfTranscodingStops bool - printStats bool - err error + Streamer2Options + uploader *rtmpStreamer + downloader *m3utester2 + additionalTests []StartTestFunc + err error } ) // NewStreamer2 returns new streamer2 -func NewStreamer2(pctx context.Context, wowzaMode, mistMode, save, failIfTranscodingStops, printStats bool) model.Streamer2 { +func NewStreamer2(pctx context.Context, opts Streamer2Options, additionalTests ...StartTestFunc) model.Streamer2 { ctx, cancel := context.WithCancel(pctx) return &streamer2{ finite: finite{ ctx: ctx, cancel: cancel, }, - wowzaMode: wowzaMode, - mistMode: mistMode, - save: save, - failIfTranscodingStops: failIfTranscodingStops, - printStats: printStats, + Streamer2Options: opts, + additionalTests: additionalTests, } } @@ -70,9 +76,7 @@ func (sr *streamer2) StartStreaming(sourceFileName string, rtmpIngestURL, mediaU } // check if we can make TCP connection to RTMP target if err := utils.WaitForTCP(waitForTarget, rtmpIngestURL); err != nil { - glog.Info(err) - sr.globalError = err - messenger.SendFatalMessage(err.Error()) + sr.fatalEnd(err) return } @@ -85,15 +89,39 @@ func (sr *streamer2) StartStreaming(sourceFileName string, rtmpIngestURL, mediaU go func() { sr.uploader.StartUpload(sourceFileName, rtmpIngestURL, timeToStream, waitForTarget) }() - sr.downloader = newM3utester2(sr.ctx, mediaURL, sr.wowzaMode, sr.mistMode, - sr.failIfTranscodingStops, sr.save, sr.printStats, waitForTarget, sm, false) // starts to download at creation + sr.downloader = newM3utester2(sr.ctx, mediaURL, sr.WowzaMode, sr.MistMode, + sr.FailIfTranscodingStops, sr.Save, sr.PrintStats, waitForTarget, sm, false) // starts to download at creation + tests := []Finite{sr.downloader} + for _, startFunc := range sr.additionalTests { + tests = append(tests, startFunc(sr.ctx, mediaURL, waitForTarget, sr.Streamer2Options)) + } go func() { - select { - case <-sr.ctx.Done(): - case <-sr.downloader.Done(): - sr.globalError = sr.downloader.globalError - // media downloader exited (probably with error), stop streaming - sr.cancel() + var ( + ctx, cancel = context.WithCancel(sr.ctx) + testsDone = onAnyDone(ctx, tests) + errs = []string{} + ) + defer cancel() + for { + select { + case test := <-testsDone: + if err := test.GlobalErr(); err != nil { + if sr.globalError == nil { + sr.globalError = err + time.AfterFunc(10*time.Second, cancel) + } + errs = append(errs, err.Error()) + } + case <-ctx.Done(): + if len(errs) > 0 { + msg := errs[0] + if len(errs) > 1 { + msg = "Multiple errors: " + strings.Join(errs, "; ") + } + sr.fatalEnd(errors.New(msg)) + } + return + } } }() started := time.Now() @@ -108,6 +136,20 @@ func (sr *streamer2) StartStreaming(sourceFileName string, rtmpIngestURL, mediaU sr.cancel() } +func onAnyDone(ctx context.Context, finites []Finite) <-chan Finite { + finished := make(chan Finite, len(finites)) + for _, f := range finites { + go func(f Finite) { + select { + case <-f.Done(): + finished <- f + case <-ctx.Done(): + } + }(f) + } + return finished +} + // StartPulling pull arbitrary HLS stream and report found errors /* func (sr *streamer2) StartPulling(mediaURL string) {