diff --git a/apis/mist/mist.go b/apis/mist/mist.go index b7bca3f38..654810309 100644 --- a/apis/mist/mist.go +++ b/apis/mist/mist.go @@ -21,9 +21,6 @@ import ( const httpTimeout = 2 * time.Second var httpClient = &http.Client{ - // Transport: &http2.Transport{TLSClientConfig: tlsConfig}, - // Transport: &http2.Transport{AllowHTTP: true}, - // Transport: &http2.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, AllowHTTP: true}, Timeout: httpTimeout, } @@ -317,7 +314,6 @@ func (mapi *API) CreateStream(name string, presets []string, profiles []Profile, Name: name, Source: source, Segmentsize: segmentSize, - // Processes: []*Process{{Process: "Livepeer", AccessToken: mapi.livepeerToken, TargetProfiles: targetProfiles, Leastlive: "1", CustomURL: customURL}}, } if !skipTranscoding { audioSelect := "" diff --git a/cmd/api-transcoder/api-transcoder.go b/cmd/api-transcoder/api-transcoder.go index 867516d88..728650778 100644 --- a/cmd/api-transcoder/api-transcoder.go +++ b/cmd/api-transcoder/api-transcoder.go @@ -223,17 +223,7 @@ func transcode(apiKey, apiHost, src, dst string, presets []string, lprofile *liv func main() { flag.Set("logtostderr", "true") - // flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) - // flag.Parse() flag.CommandLine.Parse(nil) - /* - vFlag := flag.Lookup("v") - verbosity := "6" - - flag.CommandLine.Parse(nil) - vFlag.Value.Set(verbosity) - */ - // var echoTimes int var apiKey, apiHost string var presets string var resolution, frameRate, profile string @@ -246,7 +236,6 @@ func main() { Long: `Transcodes video file using Livepeer API.`, Args: cobra.MinimumNArgs(2), Run: func(cmd *cobra.Command, args []string) { - // fmt.Println("transcode: " + strings.Join(args, " ")) var err error inp := args[0] inpExt := filepath.Ext(inp) @@ -275,8 +264,6 @@ func main() { os.Exit(1) } fmt.Printf("api key %q transcode from %q to %q\n", apiKey, inp, output) - // presets := []string{"P144p30fps16x9", "P240p30fps4x3"} - // ffmpeg.P720p30fps16x9 var presetsAr []string if len(presets) > 0 { presetsAr = strings.Split(presets, ",") diff --git a/cmd/eapi/eapi.go b/cmd/eapi/eapi.go index 1f94ea3ed..a0da6e1f0 100644 --- a/cmd/eapi/eapi.go +++ b/cmd/eapi/eapi.go @@ -5,34 +5,18 @@ import ( "flag" "fmt" "log" - "net/http" "os" "runtime" "strings" "time" "github.com/livepeer/stream-tester/apis/etcd" - "github.com/livepeer/stream-tester/apis/livepeer" "github.com/livepeer/stream-tester/internal/utils" "github.com/livepeer/stream-tester/model" "github.com/peterbourgon/ff/v2/ffcli" clientv3 "go.etcd.io/etcd/client/v3" - "golang.org/x/net/http2" ) -const httpTimeout = 16 * time.Second - -var httpClient = &http.Client{ - Timeout: httpTimeout, -} - -var http2Client = &http.Client{ - Transport: &http2.Transport{}, - Timeout: httpTimeout, -} - -var server = livepeer.ACServer - func main() { flag.Set("logtostderr", "true") vFlag := flag.Lookup("v") @@ -45,11 +29,6 @@ func main() { etcdKey := rootFlagSet.String("key", "", "ETCD client certificate key file name") endpoints := strings.Split(*endpointsF, ",") - // token := rootFlagSet.String("token", "", "Livepeer API's access token") - // presets := rootFlagSet.String("presets", "P240p30fps16x9", "Transcoding profiles") - // fServer := rootFlagSet.String("server", livepeer.ACServer, "API server to use") - // streamID := rootFlagSet.String("stream-id", "", "ID of existing stream to use for transcoding") - put := &ffcli.Command{ Name: "put", ShortUsage: "lapi put key_name key_value", @@ -73,10 +52,8 @@ func main() { } else { ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) txn := eapi.Client.Txn(ctx) - // fmt.Printf("txt %+v", txn) cmp := clientv3.Compare(clientv3.CreateRevision(args[0]), "=", 0) cmp = clientv3.Compare(clientv3.CreateRevision(args[0]), ">", -1) - // put := clientv3.OpPut(args[0], args[1], clientv3.WithLease(s.Lease())) put := clientv3.OpPut(args[0], args[1]) // reuse key in case this session already holds the lock get := clientv3.OpGet(args[0]) @@ -136,7 +113,6 @@ func main() { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) var opts []clientv3.OpOption - // x := clientv3.WithRev(presp.Header.Revision) if len(args) > 1 { opts = append(opts, clientv3.WithRange(args[1])) } else { @@ -151,8 +127,6 @@ func main() { fmt.Printf("No keys found for %s\n", key) } for _, ev := range resp.Kvs { - // fmt.Printf("%s : %s CreateRevision: %d ModRevision %d Version %d\n", ev.Key, ev.Value, ev.CreateRevision, - // ev.ModRevision, ev.Version) fmt.Printf("'%s' CreateRevision: %d ModRevision %d Version %d\n", ev.Key, ev.CreateRevision, ev.ModRevision, ev.Version) } @@ -166,9 +140,6 @@ func main() { Subcommands: []*ffcli.Command{put, ls, del}, } - // if err := root.ParseAndRun(context.Background(), os.Args[1:]); err != nil { - // log.Fatal(err) - // } if err := root.Parse(os.Args[1:]); err != nil { log.Fatal(err) } @@ -178,7 +149,6 @@ func main() { fmt.Println("eapi version: " + model.Version) fmt.Printf("Compiler version: %s %s\n", runtime.Compiler, runtime.Version()) fmt.Printf("Hostname %s OS %s IPs %v\n", hostName, runtime.GOOS, utils.GetIPs()) - // flag.Parse() if err := root.Run(context.Background()); err != nil { log.Fatal(err) diff --git a/cmd/mapi/mapi.go b/cmd/mapi/mapi.go index 61b7da503..4733f9dea 100644 --- a/cmd/mapi/mapi.go +++ b/cmd/mapi/mapi.go @@ -20,7 +20,6 @@ import ( "github.com/livepeer/stream-tester/internal/utils/uhttp" "github.com/livepeer/stream-tester/model" "github.com/peterbourgon/ff/v2/ffcli" - "golang.org/x/net/http2" ) const httpTimeout = 16 * time.Second @@ -30,11 +29,6 @@ var httpClient = &http.Client{ Timeout: httpTimeout, } -var http2Client = &http.Client{ - Transport: &http2.Transport{}, - Timeout: httpTimeout, -} - func main() { flag.Set("logtostderr", "true") vFlag := flag.Lookup("v") diff --git a/cmd/orch-tester/broadcaster_metrics.go b/cmd/orch-tester/broadcaster_metrics.go index 620d940ba..673f8ab9c 100644 --- a/cmd/orch-tester/broadcaster_metrics.go +++ b/cmd/orch-tester/broadcaster_metrics.go @@ -127,9 +127,6 @@ func (bm *broadcasterMetrics) avg(m string) float64 { } func (bm *broadcasterMetrics) errorCount() map[string]int { - bm.mu.Lock() - bm.mu.Unlock() - res := map[string]int{} for key, value := range bm.incErrs { res[key] = value diff --git a/cmd/streamtester/streamtester.go b/cmd/streamtester/streamtester.go index 011bd8f18..96c6f1ba1 100644 --- a/cmd/streamtester/streamtester.go +++ b/cmd/streamtester/streamtester.go @@ -77,7 +77,6 @@ func main() { apiToken := flag.String("api-token", "", "Token of the Livepeer API to be used by the Mist server") apiServer := flag.String("api-server", "livepeer.com", "Server of the Livepeer API to be used") lapiFlag := flag.Bool("lapi", false, "Use Livepeer API to create streams. api-token should be specified") - presets := flag.String("presets", "", "Comma separate list of transcoding profiels to use along with Livepeer API") skipTime := flag.Duration("skip-time", 0, "Skips first x(s|m)") picartoFlag := flag.Bool("picarto", false, "Do Picarto-pull testing") adult := flag.Bool("adult", false, "Adult Picarto") @@ -110,16 +109,12 @@ func main() { fmt.Printf("Production: %v\n", model.Production) if *version { - // fmt.Println("Stream tester version: " + model.Version) - // fmt.Printf("Compiler version: %s %s\n", runtime.Compiler, runtime.Version()) return } if *latencyThreshold > 0 { *latency = true } metrics.InitCensus(hostName, model.Version, "streamtester") - // codec.MTest() - // return gctx, gcancel := context.WithCancel(context.Background()) // to be used as global parent context, in the future messenger.Init(gctx, *discordURL, *discordUserName, *discordUsersToNotify, *botToken, *channelID, *apiToken) defer time.Sleep(2 * time.Second) @@ -153,12 +148,7 @@ func main() { testers.IgnoreNoCodecError = true testers.IgnoreTimeDrift = true } - // puller := testers.NewInfinitePuller(gctx, *infinitePull, *save, *wowza, *mist) - // puller.Start() glog.Infof(`Starting infinite pull from %s`, *infinitePull) - // messenger.SendMessage(msg) - // sr2 := testers.NewStreamer2(gctx, *wowza, *mist) - // sr2.StartPulling(*mediaURL) started := time.Now() var wg sync.WaitGroup for i := 0; i < int(*sim); i++ { @@ -225,8 +215,6 @@ func main() { mc := testers.NewMistController(*bhost, int(*picartoStreams), *profiles, *adult, *gaming, *save, mapi, *picartoBlackList, *picartoExternalHost, *picartoStatsInterval, *picartoSDCutOff, *picartoCountry) - // emsg := fmt.Sprintf("Starting **%d** Picarto streams (ver %s)", *picartoStreams, model.Version) - // messenger.SendMessage(emsg) go startWebServer() // needed for /metrics endpoint err = mc.Start() @@ -242,26 +230,9 @@ func main() { time.Sleep(time.Second) return } - // var streamDuration time.Duration - // if *stime != "" { - // if streamDuration, err = server.ParseStreamDurationArgument(*stime); err != nil { - // panic(err) - // } - // if *repeat > 1 { - // // glog.Fatal("Can't set both -time and -repeat.") - // } - // } if *mediaURL != "" && *rtmpURL == "" { glog.Fatal("Should also specifiy -rtmp-url") - // glog.Infof(`Starting infinite pull from %s`, *mediaURL) - // messenger.SendMessage(msg) - // sr2 := testers.NewStreamer2(gctx, *wowza, *mist) - // sr2.StartPulling(*mediaURL) - // started := time.Now() - // downloader := testers.NewM3utester2(gctx, *mediaURL, *wowza, *mist, *save, 15*time.Second, nil) // starts to download at creation - // <-downloader.Done() - // glog.Infof(`Streaming stopped after %s`, time.Since(started)) return } if *rtmpURL != "" { @@ -285,9 +256,6 @@ func main() { stats, _ := sr2.Stats() fmt.Printf("Stats: %+v\n", stats) os.Exit(model.ExitCode) - // to not exit - // s := server.NewStreamerServer(*wowza) - // s.StartWebServer("localhost:7933") return } @@ -311,25 +279,13 @@ func main() { mapi = mistapi.NewMist(*bhost, mcreds[0], mcreds[1], *apiToken, *mistPort) mapi.Login() - // mapi.CreateStream("dark1", "P720p30fps16x9") - // mapi.DeleteStreams("dark1") } if *lapiFlag { if *apiToken == "" { glog.Fatalf("-api-token should be specified") } - if !*httpIngest { - // glog.Fatal("Using Livepeer API currently only implemented for HTTP ingest") - // glog.Fatal("Using Livepeer API currently only implemented for RTMP ingest") - // API webhook doesn't authenicate RTMP streams - } - if *presets == "" { - // glog.Fatal("Presets should be specified") - } go startWebServer() // needed for /metrics endpoint hostName, _ := os.Hostname() - // presetsParts := strings.Split(*presets, ",") - // model.ProfilesNum = len(presetsParts) lapi = livepeer.NewLivepeer(*apiToken, *apiServer, nil) lapi.Init() glog.Infof("Choosen server: %s", lapi.GetServer()) @@ -386,7 +342,6 @@ func main() { os.Exit(model.ExitCode) return } - // fmt.Printf("Args: %+v\n", flag.Args()) glog.Infof("Starting stream tester %s, file %s number of streams is %d, repeat %d times no bar %v", model.Version, fn, *sim, *repeat, *noBar) defer glog.Infof("Exiting") @@ -406,14 +361,9 @@ func main() { time.Sleep(25 * time.Second) stats, _ := sr.Stats("") fmt.Println(stats.FormatForConsole()) - // fmt.Println(sr.DownStatsFormatted()) } }() } - // go func() { - // time.Sleep(10 * time.Second) - // sr.Cancel() - // }() // Catch interrupt signal to shut down transcoder glog.Infof("Waiting for test to complete") <-sr.Done() @@ -438,7 +388,6 @@ func main() { } } - // fmt.Println(sr.AnalyzeFormatted(false)) if *latencyThreshold > 0 && stats.TranscodedLatencies.P95 > 0 { // check latencies, report failure or success var msg string @@ -456,5 +405,4 @@ func main() { s := server.NewStreamerServer(*wowza, "", "", *mistPort) s.StartWebServer(gctx, *serverAddr) } - // messenger.SendMessage(sr.AnalyzeFormatted(true)) } diff --git a/internal/app/mistapiconnector/mistapiconnector_app.go b/internal/app/mistapiconnector/mistapiconnector_app.go index 99e92835a..a6c2d1e65 100644 --- a/internal/app/mistapiconnector/mistapiconnector_app.go +++ b/internal/app/mistapiconnector/mistapiconnector_app.go @@ -38,7 +38,6 @@ const traefikKeyPathRouters = `traefik/http/routers/` const traefikKeyPathServices = `traefik/http/services/` const traefikKeyPathMiddlewares = `traefik/http/middlewares/` const audioAlways = "always" -const audioNever = "never" const audioRecord = "record" const audioEnabledStreamSuffix = "rec" const etcdDialTimeout = 5 * time.Second diff --git a/internal/app/vodtester/continuous_vod_tester.go b/internal/app/vodtester/continuous_vod_tester.go index 2c54c6ab3..b849542ec 100644 --- a/internal/app/vodtester/continuous_vod_tester.go +++ b/internal/app/vodtester/continuous_vod_tester.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net/url" - "strconv" "time" "github.com/PagerDuty/go-pagerduty" @@ -144,12 +143,3 @@ func (cvt *continuousVodTester) Cancel() { func (cvt *continuousVodTester) Done() <-chan struct{} { return cvt.ctx.Done() } - -var nth = []string{"0", "first", "second", "third", "forth", "fifth"} - -func getNth(i int) string { - if i > 0 && i < len(nth) { - return nth[i] - } - return strconv.Itoa(i) -} diff --git a/internal/metrics/census.go b/internal/metrics/census.go index fee19f1bd..d277e330a 100644 --- a/internal/metrics/census.go +++ b/internal/metrics/census.go @@ -16,11 +16,6 @@ import ( "go.opencensus.io/tag" ) -const ( - logLevel = 6 // TODO move log levels definitions to separate package - // importing `common` package here introduces import cycles -) - type ( censusMetricsCounter struct { nodeID string diff --git a/internal/testers/http_streamer.go b/internal/testers/http_streamer.go index 8af35c0e8..4b9e67110 100644 --- a/internal/testers/http_streamer.go +++ b/internal/testers/http_streamer.go @@ -49,9 +49,7 @@ type httpStreamer struct { type httpStats struct { triedToSend int - sent int failedToSend int - downloaded int downloadFailures int transcodeFailures int success int diff --git a/internal/testers/infinite_puller.go b/internal/testers/infinite_puller.go deleted file mode 100644 index dd18b09f9..000000000 --- a/internal/testers/infinite_puller.go +++ /dev/null @@ -1,50 +0,0 @@ -package testers - -/* - -import ( - "context" - "fmt" - "time" - - "github.com/golang/glog" - "github.com/livepeer/stream-tester/model" -) - -type infinitePuller struct { - url string - save bool - wowza bool - mist bool - ctx context.Context -} - -// NewInfinitePuller ... -func NewInfinitePuller(ctx context.Context, url string, save, wowza, mist bool) model.InfinitePuller { - return &infinitePuller{ - ctx: ctx, - url: url, - save: save, - wowza: wowza, - mist: mist, - } -} - -func (ip *infinitePuller) Start() { - msg := fmt.Sprintf("Starting to pull infinite stream from %s", ip.url) - glog.Info(msg) - down := newM3utester2(ip.ctx, ip.url, ip.wowza, ip.mist, time.Minute, nil) // starts to download at creation - started := time.Now() - <-down.Done() - msg = fmt.Sprintf("Done pulling from %s after %s", ip.url, time.Since(started)) - glog.Info(msg) - - // down := newM3UTester(ip.ctx, nil, ip.wowza, false, false, true, ip.save, nil, nil, "") - // messenger.SendMessage(msg) - // down.Start(ip.url) - // <-down.ctx.Done() - // msg = fmt.Sprintf("Done pulling from %s", ip.url) - // messenger.SendMessage(msg) -} - -*/ diff --git a/internal/testers/m3utester.go b/internal/testers/m3utester.go index a407a6f44..fd25e3625 100644 --- a/internal/testers/m3utester.go +++ b/internal/testers/m3utester.go @@ -32,22 +32,15 @@ const stopAfterMediaStreamsDrop = false const HTTPTimeout = 16 * time.Second var httpClient = &http.Client{ - // Transport: &http2.Transport{TLSClientConfig: tlsConfig}, - // Transport: &http2.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, - // Transport: &http2.Transport{AllowHTTP: true}, Timeout: HTTPTimeout, } var http2Client = &http.Client{ - // Transport: &http2.Transport{TLSClientConfig: tlsConfig}, - // Transport: &http2.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, - // Transport: &http2.Transport{AllowHTTP: true}, Transport: &http2.Transport{}, Timeout: HTTPTimeout, } var wowzaSessionRE *regexp.Regexp = regexp.MustCompile(`_(w\d+)_`) -var wowzaBandwidthRE *regexp.Regexp = regexp.MustCompile(`_b(\d+)\.`) var mistSessionRE *regexp.Regexp = regexp.MustCompile(`(\?sessId=\d+)`) type downStats2 struct { @@ -94,22 +87,8 @@ type m3utester struct { saveDirName string cancel context.CancelFunc shouldSkip [][]string - // downloadResults fullDownloadResultsMap - // dm sync.Mutex - // gaps int } -/* -type fullDownloadResultsMap map[string]*fullDownloadResults - -type fullDownloadResults struct { - results []downloadResult - mediaPlaylistName string - resolution string -} - -*/ - type fullDownloadResult struct { downloadResult mediaPlaylistName string @@ -138,17 +117,11 @@ type downloadResult struct { } func (r *downloadResult) String() string { - // return fmt.Sprintf("%10s %14s seq %3d: mySeq %3d time %7s duration %7s size %7d bytes appearance time %s (%d)", - // r.resolution, r.name, r.seqNo, r.mySeqNo, r.startTime, r.duration, r.bytes, r.appTime, r.appTime.UnixNano()) return fmt.Sprintf("%10s %14s seq %3d: time %7s duration %7s size %7d bytes appearance time %s (%d)", r.resolution, r.name, r.seqNo, r.startTime, r.duration, r.bytes, r.appTime, r.appTime.UnixNano()) } func (r *downloadResult) String2() string { - // return fmt.Sprintf("%10s %14s seq %3d: mySeq %3d time %7s duration %7s size %7d bytes appearance at first place time %s (%d)", - // r.resolution, r.name, r.seqNo, r.mySeqNo, r.startTime, r.duration, r.bytes, r.timeAtFirstPlace, r.timeAtFirstPlace.UnixNano()) - // return fmt.Sprintf("%10s %20s seq %3d: time %7s duration %7s size %7d bytes at first %s (%d)", - // r.resolution, r.name, r.seqNo, r.startTime, r.duration, r.bytes, r.timeAtFirstPlace, r.timeAtFirstPlace.UnixNano()) return fmt.Sprintf("%10s %20s seq %3d: time %7s duration %7s first %s app %s", r.resolution, r.name, r.seqNo, r.startTime, r.duration, r.timeAtFirstPlace.Format(printTimeFormat), r.appTime.Format(printTimeFormat)) } @@ -156,33 +129,6 @@ func (r *downloadResult) String2() string { // const printTimeFormat = "2006-01-02T15:04:05" const printTimeFormat = "2006-01-02T15:04:05.999999999" -type downloadResultsBySeq []*downloadResult - -func (p downloadResultsBySeq) Len() int { return len(p) } -func (p downloadResultsBySeq) Less(i, j int) bool { - // return p[i].seqNo < p[j].seqNo - // return p[i].mySeqNo < p[j].mySeqNo - return p[i].appTime.Before(p[j].appTime) -} -func (p downloadResultsBySeq) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func (p downloadResultsBySeq) findBySeqNo(seqNo uint64) *downloadResult { - for _, seg := range p { - if seg.seqNo == seqNo { - return seg - } - } - return nil -} -func (p downloadResultsBySeq) findByMySeqNo(seqNo uint64) *downloadResult { - for _, seg := range p { - if seg.mySeqNo == seqNo { - return seg - } - } - return nil -} - -// newM3UTester ... func newM3UTester(ctx context.Context, sentTimesMap *utils.SyncedTimesMap, wowzaMode, mistMode, picartoMode, infiniteMode, save bool, sm *segmentsMatcher, shouldSkip [][]string, name string) *m3utester { @@ -200,7 +146,6 @@ func newM3UTester(ctx context.Context, sentTimesMap *utils.SyncedTimesMap, wowza name: name, fullResultsCh: make(chan *fullDownloadResult, 32), downSegs: make(map[string]map[string]*fullDownloadResult), - // downloadResults: make(map[string]*fullDownloadResults), } ct, cancel := context.WithCancel(ctx) t.ctx = ct @@ -251,72 +196,6 @@ func (mt *m3utester) Start(u string) { mt.downStats2.url = u go mt.manifestDownloadLoop() go mt.workerLoop() - // if mt.infiniteMode { - // go mt.anaylysePrintingLoop() - // } -} - -/* -func (mt *m3utester) anaylysePrintingLoop() string { - for { - time.Sleep(30 * time.Second) - if !mt.startTime.IsZero() { - mt.dm.Lock() - a, _ := analyzeDownloads(mt.downloadResults, false, false) - mt.dm.Unlock() - glog.Infof("Analysis from start %s:\n%s", time.Since(mt.startTime), a) - } - } -} -*/ - -/* -func sortByResolution(results map[string]*fullDownloadResults) []string { - r := make([]string, 0, len(results)) - return r -} - -func containsString(ss []string, stf string) bool { - for _, s := range ss { - if s == stf { - return true - } - } - return false -} - -func (fdr fullDownloadResultsMap) getResolutions() []string { - res := make([]string, 0) - for _, dr := range fdr { - if !containsString(res, dr.resolution) { - res = append(res, dr.resolution) - } - } - return res -} - -func (fdr fullDownloadResultsMap) byResolution(resolution string) []*fullDownloadResults { - res := make([]*fullDownloadResults, 0) - for _, dr := range fdr { - if dr.resolution == resolution { - res = append(res, dr) - } - } - return res -} -*/ - -/* - Should consider 10.867s and 10.866s to be equal -*/ -func isTimeEqual(t1, t2 time.Duration) bool { - diff := t1 - t2 - if diff < 0 { - diff *= -1 - } - // 1000000 - // return diff <= time.Millisecond - return diff <= 1*time.Second } func isTimeEqualM(t1, t2 time.Duration) bool { @@ -328,15 +207,6 @@ func isTimeEqualM(t1, t2 time.Duration) bool { return diff <= 100*time.Millisecond } -func isTimeEqualT(t1, t2 time.Time) bool { - diff := t1.Sub(t2) - if diff < 0 { - diff *= -1 - } - // 1000000 - return diff <= 10*time.Millisecond -} - func isTimeEqualTD(t1, t2 time.Time, d time.Duration) bool { diff := t1.Sub(t2) if diff < 0 { @@ -353,215 +223,6 @@ func absTimeTiff(t1, t2 time.Duration) time.Duration { return diff } -func absTimeTiffT(t1, t2 time.Time) time.Duration { - diff := t1.Sub(t2) - if diff < 0 { - diff *= -1 - } - return diff -} - -/* -func analyzeDownloads(downloadResults fullDownloadResultsMap, short, streamEnded bool) (string, int) { - res := "" - resolutions := downloadResults.getResolutions() - byRes := make(map[string]downloadResultsBySeq) - short = false - var gaps int - for _, resolution := range resolutions { - results := make(downloadResultsBySeq, 0) - fresults := downloadResults.byResolution(resolution) - for _, rs := range fresults { - for _, r := range rs.results { - rl := r - results = append(results, &rl) - } - } - sort.Sort(results) - byRes[resolution] = results - res += fmt.Sprintf("=== For resolution %s:\n", resolution) - allGood := "=== All is good! ===\n" - if len(results) == 0 { - res += "No segments!!!!\n" - continue - } - if !short { - res += fmt.Sprintf("==== Results sorted:\n") - var tillNext time.Duration - var problem string - for i, r := range results { - problem = "" - tillNext = 0 - if i < len(results)-1 { - ns := results[i+1] - tillNext = ns.startTime - r.startTime - if tillNext > 0 && !isTimeEqualM(r.duration, tillNext) { - problem = fmt.Sprintf(" ===> possible gap - to big time difference %s", r.duration-tillNext) - } - } - res += fmt.Sprintf("%10s %14s seq %3d: mySeq %3d time %s duration %s till next %s appearance time %s %s\n", - resolution, r.name, r.seqNo, r.mySeqNo, r.startTime, r.duration, tillNext, r.appTime, problem) - } - } - if results[0].seqNo > 1 { - res += fmt.Sprintf("Segments start from %d\n", results[0].seqNo) - } - var lastSeq, lastRSeq uint64 - var lastStartTime time.Duration - var lastFileName string - for _, seg := range results { - if seg.mySeqNo != lastSeq+1 { - if seg.mySeqNo > lastSeq { - res += fmt.Sprintf("Gap in sequence - file %s with seqNo %d mySeq %d (start time %s), previous seqNo is %d mySeq %d (start time %s)\n", - seg.name, seg.seqNo, seg.mySeqNo, seg.startTime, lastSeq, lastRSeq, lastStartTime) - allGood = "" - gaps++ - } else if seg.mySeqNo == lastSeq { - if seg.startTime != lastStartTime { - res += fmt.Sprintf("Media stream switched, but corresponding segments have different time stamp: file %s with seqNo %d (start time %s), previous file %s seqNo is %d (start time %s)\n", - seg.name, seg.seqNo, seg.startTime, lastFileName, lastSeq, lastStartTime) - allGood = "" - } - } else if seg.mySeqNo < lastSeq { - res += fmt.Sprintf("Very strange problem - seq is less than previous: file %s with seqNo %d (start time %s), previous seqNo is %d (start time %s)\n", seg.name, seg.seqNo, seg.startTime, lastSeq, lastStartTime) - allGood = "" - } - } - lastSeq = seg.mySeqNo - lastRSeq = seg.seqNo - lastStartTime = seg.startTime - lastFileName = seg.name - } - res += allGood - } - // now check timestamps alignments in different renditions - lastTimeDiffs := make(map[string]map[string]time.Duration) - oneStep := make(map[string]map[string]bool) - for resolution, resRes := range byRes { - for i, seg := range resRes { - for sresolution, resRes2 := range byRes { - if sresolution == resolution { - continue - } - if _, has := oneStep[resolution]; !has { - oneStep[resolution] = make(map[string]bool) - } - if _, has := lastTimeDiffs[resolution]; !has { - lastTimeDiffs[resolution] = make(map[string]time.Duration) - } - // altSeg := resRes2.findBySeqNo(seg.seqNo) - altSeg := resRes2.findByMySeqNo(seg.mySeqNo) - if altSeg == nil { - if streamEnded || i < len(resRes)-4 { - res += fmt.Sprintf("Segment %10s seqNo %3d mySeq %3d doesn't have corresponding segment in %10s\n", resolution, seg.seqNo, seg.mySeqNo, sresolution) - } - } else { - if !isTimeEqual(seg.startTime, altSeg.startTime) { - altSegM := resRes2.findBySeqNo(seg.seqNo - 1) - altSegP := resRes2.findBySeqNo(seg.seqNo + 1) - altSegM2 := resRes2.findBySeqNo(seg.seqNo - 2) - altSegP2 := resRes2.findBySeqNo(seg.seqNo + 2) - diff := seg.startTime - altSeg.startTime - lastDiff := lastTimeDiffs[resolution][sresolution] - if diff != lastDiff { - // if !oneStep[resolution][sresolution] { - res += fmt.Sprintf("Segment %10s seqNo %5d mySeq %3d has time %s but segment %10s seqNo %5d mySeq %3d has time %s diff %s\n", - resolution, seg.seqNo, seg.mySeqNo, seg.startTime, sresolution, altSeg.seqNo, altSeg.mySeqNo, altSeg.startTime, seg.startTime-altSeg.startTime) - // } - if altSegM != nil && isTimeEqual(seg.startTime, altSegM.startTime) { - if !oneStep[resolution][sresolution] { - res += fmt.Sprintf("Stream %10s is one step behind stream %10s\n", resolution, sresolution) - oneStep[resolution][sresolution] = true - // break - } - } - if altSegP != nil && isTimeEqual(seg.startTime, altSegP.startTime) { - if !oneStep[resolution][sresolution] { - res += fmt.Sprintf("Stream %10s is one step ahead stream %10s\n", resolution, sresolution) - oneStep[resolution][sresolution] = true - // break - } - } - if altSegM2 != nil && isTimeEqual(seg.startTime, altSegM2.startTime) { - if !oneStep[resolution][sresolution] { - res += fmt.Sprintf("Stream %10s is two steps behind stream %10s\n", resolution, sresolution) - oneStep[resolution][sresolution] = true - // break - } - } - if altSegP2 != nil && isTimeEqual(seg.startTime, altSegP2.startTime) { - if !oneStep[resolution][sresolution] { - res += fmt.Sprintf("Stream %10s is two steps ahead stream %10s\n", resolution, sresolution) - oneStep[resolution][sresolution] = true - // break - } - } - // res += fmt.Sprintf("%d - %d\n", seg.startTime, altSeg.startTime) - lastTimeDiffs[resolution][sresolution] = diff - } - } - } - } - } - } - return res, gaps -} - -type fullDownloadResultsArray []*fullDownloadResults - -func (p fullDownloadResultsArray) Len() int { return len(p) } -func (p fullDownloadResultsArray) Less(i, j int) bool { - ms1 := wowzaBandwidthRE.FindStringSubmatch(p[i].mediaPlaylistName) - ms2 := wowzaBandwidthRE.FindStringSubmatch(p[j].mediaPlaylistName) - // glog.Infof("name1 %s name2 %d res %+v res2 %+v", p[i].mediaPlaylistName, p[j].mediaPlaylistName, ms1, ms2) - b1, _ := strconv.Atoi(ms1[1]) - b2, _ := strconv.Atoi(ms2[1]) - return b1 < b2 -} -func (p fullDownloadResultsArray) Swap(i, j int) { p[i], p[j] = p[j], p[i] } - -func sortByBandwidth(results map[string]*fullDownloadResults) fullDownloadResultsArray { - res := make(fullDownloadResultsArray, 0, len(results)) - for _, r := range results { - res = append(res, r) - } - sort.Sort(res) - return res -} - -func (mt *m3utester) DownloadStatsFormatted() string { - mt.dm.Lock() - defer mt.dm.Unlock() - res := fmt.Sprintf("Has %d media playlists:\n", len(mt.downloadResults)) - sortedResults := sortByBandwidth(mt.downloadResults) - for _, cdr := range sortedResults { - res += fmt.Sprintf("Media playlist %25s resolution %10s segments %4d\n", cdr.mediaPlaylistName, cdr.resolution, len(cdr.results)) - } - for _, cdr := range sortedResults { - res += fmt.Sprintf("Media playlist %s:\n", cdr.mediaPlaylistName) - for _, dr := range cdr.results { - res += fmt.Sprintf("%s %s seqNo=%3d start time %s duration %s appearance time %s\n", cdr.resolution, dr.name, dr.seqNo, dr.startTime, dr.duration, dr.appTime) - // startTime time.Duration - // duration time.Duration - // name string - // seqNo uint64 - } - } - // res += "\nAnalysis:\n" - // res += analyzeDownloads(mt.downloadResults) - return res -} - -func (mt *m3utester) AnalyzeFormatted(short bool) string { - res := "\nAnalysis:\n" - mt.dm.Lock() - res1, _ := analyzeDownloads(mt.downloadResults, short, false) - res += res1 - mt.dm.Unlock() - return res -} -*/ - // GetFIrstSegmentTime return timestamp of first frame of first segment. // Second returned value is true if already found. func (mt *m3utester) GetFIrstSegmentTime() (time.Duration, bool) { @@ -608,28 +269,9 @@ func (mt *m3utester) stats() downloadStats { d.mu.Unlock() } mt.mu.RUnlock() - // mt.dm.Lock() - // _, gaps := analyzeDownloads(mt.downloadResults, true, false) - // stats.gaps = gaps - // mt.dm.Unlock() return stats } -/* -func (mt *m3utester) StatsFormatted() string { - mt.mu.RLock() - keys := getSortedKeys(mt.downloads) - r := "" - for _, u := range keys { - d := mt.downloads[u] - r += fmt.Sprintf("Stats for %s\n", u) - r += d.statsFormatted() - } - mt.mu.RUnlock() - return r -} -*/ - func (mt *m3utester) getDownStats2() *downStats2 { mt.succ2mu.Lock() dr := mt.downStats2.clone() @@ -656,7 +298,6 @@ func (mt *m3utester) workerLoop() { now := time.Now() glog.V(model.INSANE).Infof("=====>>>>>>>>>>>>>>>>>>>>>>>>>") glog.V(model.VVERBOSE).Infof("source key = %s, down stats2 %+v", sourceKey, mt.downStats2) - // glog.Infof("%+v", mt.downSegs) for dk, dr := range mt.downSegs[sourceKey] { if now.Sub(dr.downloadCompetedAt) > 40*time.Second { mt.downStats2.downSource++ @@ -725,17 +366,6 @@ func (mt *m3utester) workerLoop() { } } - // downSegs map[string]map[string]*downloadResult - /* - mt.dm.Lock() - if _, has := mt.downloadResults[fr.mediaPlaylistName]; !has { - mt.downloadResults[fr.mediaPlaylistName] = &fullDownloadResults{resolution: fr.resolution, mediaPlaylistName: fr.mediaPlaylistName} - } - // turn off for now - // r := mt.downloadResults[fr.mediaPlaylistName] - // r.results = append(r.results, fr.downloadResult) - mt.dm.Unlock() - */ if mt.save { err := ioutil.WriteFile(mt.savePlayListName, mt.savePlayList.Encode().Bytes(), 0644) if err != nil { @@ -781,7 +411,6 @@ func (mt *m3utester) waitForVODdownloads() { glog.V(model.SHORT).Infof("=====>>>>>>>>>>>>>>>>>>>>>>>>>") sourceKey := mt.downloadsKeys[0] glog.V(model.SHORT).Infof("source key = %s, down stats2 %+v", sourceKey, mt.downStats2) - // glog.Infof("%+v", mt.downSegs) for dk, dr := range mt.downSegs[sourceKey] { mt.downStats2.downSource++ mt.downStats2.sourceBytes += int64(dr.bytes) @@ -851,18 +480,6 @@ func (mt *m3utester) manifestDownloadLoop() { } mt.mu.Unlock() } - /* - if gotPlaylistWaitingForEnd { - time.Sleep(2 * time.Second) - loops++ - if loops%2 == 0 { - if glog.V(model.DEBUG) { - fmt.Println(mt.StatsFormatted()) - } - } - continue - } - */ resp, err := httpClient.Do(uhttp.GetRequest(surl)) if err != nil { glog.Infof("===== get error getting master playlist %s: %v", surl, err) @@ -983,23 +600,8 @@ func (mt *m3utester) manifestDownloadLoop() { } } mt.mu.Unlock() - // glog.Infof("Processed playlist with %d variant, not checking anymore", len(mpl.Variants)) - // return } - if mt.picartoMode && !stopAfterMediaStreamsDrop { - // return - } - // } - // glog.Info(string(b)) time.Sleep(period) - /* - loops++ - if loops%2 == 0 { - if glog.V(model.DEBUG) { - fmt.Println(mt.StatsFormatted()) - } - } - */ } } diff --git a/internal/testers/m3utester2.go b/internal/testers/m3utester2.go index 60ffa52cd..39537d3e3 100644 --- a/internal/testers/m3utester2.go +++ b/internal/testers/m3utester2.go @@ -173,9 +173,6 @@ func (mut *m3utester2) VODStats() model.VODStats { } // glog.Infof("==> all results: %+v", mut.allResults) for resolution, drs := range mut.allResults { - if mut.sourceRes == resolution { - } else { - } for _, seg := range drs { vs.SegmentsDur[resolution] += seg.duration vs.SegmentsNum[resolution]++ @@ -403,40 +400,17 @@ func (mut *m3utester2) workerLoop() { } } } - // debug print - /* - if len(correspondingSegments) > 1 { - glog.Infof("=======>>> Corresponding segments for %s (%d segs)", dres.String(), len(correspondingSegments)) - for _, res := range seenResolutions { - if seg, has := correspondingSegments[res]; has { - // glog.Infof("=====> %s", seg.String2()) - fmt.Printf("=====> %s\n", seg.String2()) - } - } - } - glog.Infof("=======>>> segments by time at first ") - for _, res := range seenResolutions { - for _, seg := range results[res] { - // glog.Infof("=====> %s", seg.String2()) - fmt.Printf("=====> %s\n", seg.String2()) - } - } - */ // check for drift - // glog.Infof("=====> seen resolutions: %+v", seenResolutions) for i := 0; i < len(seenResolutions)-1; i++ { for j := i + 1; j < len(seenResolutions); j++ { res1 := seenResolutions[i] res2 := seenResolutions[j] - // glog.Infof("===> i %d j %d res1 %s res2 %s", i, j, res1, res2) seg1, has1 := correspondingSegments[res1] seg2, has2 := correspondingSegments[res2] if has1 && has2 { if diff := absTimeTiff(seg1.startTime, seg2.startTime); diff > 4*time.Second { msg := fmt.Sprintf("Too big (%s) time difference between %s stream (time %s seqNo %d) and %s stream (time %s seqNo %d)", diff, res1, seg1.startTime, seg1.seqNo, res2, seg2.startTime, seg2.seqNo) - // glog.Info("================#########>>>>>>") - // glog.Info(msg) if !IgnoreTimeDrift { mut.fatalEnd(errors.New(msg)) return @@ -448,36 +422,11 @@ func (mut *m3utester2) workerLoop() { } lastTimeDriftReportTime = time.Now() } - // if problems > 100 { - // panic(msg) - // } problems++ } } } } - /* - for ri := 0; ri <= len(seenResolutions); ri++ { - // if resolution == dres.resolution { - // continue - // } - - resolution1 := seenResolutions[i] - // find corresponding segment - for i := len(ress) - 1; i >= 0; i-- { - cseg := ress[i] - if isTimeEqualT(dres.appTime, cseg.appTime) { - glog.Infof("=======>>> Corresponding segments:\n%s\n%s\n", dres.String(), cseg.String()) - diff := absTimeTiff(dres.startTime, cseg.startTime) - if diff > 2*time.Second { - // msg := fmt.Sprintf("\nTime drift detected: %s : %s\ndiff: %d", dres.String(), cseg.String(), diff) - // mut.fatalEnd(msg) - } - } - break - } - } - */ } } } @@ -485,10 +434,8 @@ func (mut *m3utester2) workerLoop() { func (f *finite) fatalEnd(err error) { f.globalError = err model.ExitCode = 127 - // glog.Error(msg) messenger.SendFatalMessage(err.Error()) f.cancel() - // panic(msg) } func (f *finite) Done() <-chan struct{} { @@ -612,10 +559,6 @@ func (mut *m3utester2) manifestPullerLoop(waitForTarget time.Duration) { continue } gpl, plt, err := m3u8.Decode(*bytes.NewBuffer(b), true) - // glog.Infof("playlist type is %d", plt) - // err = mpl.DecodeFrom(resp.Body, true) - // mpl := m3u8.NewMasterPlaylist() - // err = mpl.Decode(*bytes.NewBuffer(b), true) if err != nil { glog.Error("===== error parsing master playlist: ", err) // glog.Error(err) @@ -828,9 +771,6 @@ func (ms *m3uMediaStream) workerLoop(masterDR chan *downloadResult, latencyResul rdur, downloadedSegmentsTotalDuration, successRate, successRateRounded, firstSegmentPTS, rlt) successRate = successRateRounded } - // dres.data = nil - // glog.Infof("downloaded: %+v", *dres) - // glog.Infof("downloaded task: %+v", *dres.task) if ms.save { _, segFileName := filepath.Split(dres.name) fullSegFileName := filepath.Join(ms.saveDirName, segFileName) @@ -1025,14 +965,6 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) { return } pl := gpl.(*m3u8.MediaPlaylist) - // pl, err := m3u8.NewMediaPlaylist(100, 100) - // if err != nil { - // glog.Fatal(err) - // } - // err = pl.Decode(*bytes.NewBuffer(b), true) - // if err != nil { - // glog.Fatal(err) - // } if !gotManifest && ms.save { ms.savePlayList.TargetDuration = pl.TargetDuration ms.savePlayList.SeqNo = pl.SeqNo @@ -1040,9 +972,6 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) { } glog.V(model.VVERBOSE).Infof("Got media playlist %s with %d (really %d (%d)) segments of url %s:", ms.resolution, len(pl.Segments), countSegments(pl), pl.Len(), surl) glog.V(model.INSANE2).Info(string(b)) - // glog.V(model.VVERBOSE).Info(pl) - // glog.Infof("Got media playlist %s with %d (really %d) segments of url %s:", ms.resolution, len(pl.Segments), countSegments(pl), surl) - // glog.Info(pl) now := time.Now() var lastTimeDownloadStarted time.Time for i, segment := range pl.Segments { @@ -1083,10 +1012,8 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) { } lastTimeDownloadStarted = time.Now() now = now.Add(time.Millisecond) - // glog.V(model.VERBOSE).Infof("segment %s is of length %f seqId=%d", segment.URI, segment.Duration, segment.SeqId) } } - // pl.Live = false if pl.Len() > 0 && (!pl.Live || pl.MediaType == m3u8.EVENT) { // VoD and Event's should show the entire playlist glog.Infof("Playlist %s is VOD, so stopping manifest puller loop", surl) diff --git a/internal/testers/mediadownloader.go b/internal/testers/mediadownloader.go index 7090a35c7..d4b6d6980 100644 --- a/internal/testers/mediadownloader.go +++ b/internal/testers/mediadownloader.go @@ -492,20 +492,11 @@ func (md *mediaDownloader) manifestDownloadLoop() { time.Sleep(time.Second) continue } - // if !md.source { - // fmt.Println("-----################") - // fmt.Println(string(b)) - // } - // glog.Infoln("-----################") - // glog.Infoln(string(b)) - // err = mpl.DecodeFrom(resp.Body, true) pl, err := m3u8.NewMediaPlaylist(100, 100) if err != nil { glog.Fatal(err) } err = pl.Decode(*bytes.NewBuffer(b), true) - // err = pl.DecodeFrom(resp.Body, true) - // resp.Body.Close() if err != nil { glog.Fatal(err) } @@ -523,12 +514,9 @@ func (md *mediaDownloader) manifestDownloadLoop() { md.savePlayList.SeqNo = pl.SeqNo gotManifest = true } - // for i := len(pl.Segments) - 1; i >= 0; i-- now := time.Now() for i, segment := range pl.Segments { - // segment := pl.Segments[i] if segment != nil { - // glog.Infof("Segment: %+v", *segment) if md.wowzaMode { // remove Wowza's session id from URL segment.URI = wowzaSessionRE.ReplaceAllString(segment.URI, "_") diff --git a/internal/testers/mistcontroller.go b/internal/testers/mistcontroller.go index a71882b77..d8a7d550b 100644 --- a/internal/testers/mistcontroller.go +++ b/internal/testers/mistcontroller.go @@ -24,14 +24,12 @@ import ( "github.com/livepeer/stream-tester/messenger" "github.com/livepeer/stream-tester/model" "github.com/patrickmn/go-cache" - "golang.org/x/text/message" ) const ( // picartoCountry = "us-east1" hlsURLTemplate = "http://%s:8080/hls/golive+%s/index.m3u8" baseStreamName = "golive" - streamsStartStep = 5 mainLoopStepDuration = 32 * time.Second ) @@ -73,7 +71,6 @@ var ( // ErrTooBigDurationsDeviation ... ErrTooBigDurationsDeviation = errors.New("Too big deviation of segment's durations") - mp = message.NewPrinter(message.MatchLanguage("en")) mhttpClient = &http.Client{ // Transport: &http2.Transport{TLSClientConfig: tlsConfig}, // Transport: &http2.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: false}}, diff --git a/internal/testers/rtmp_streamer.go b/internal/testers/rtmp_streamer.go index 317785e14..a3d0329fe 100644 --- a/internal/testers/rtmp_streamer.go +++ b/internal/testers/rtmp_streamer.go @@ -192,11 +192,6 @@ func (rs *rtmpStreamer) StartUpload(fn, rtmpURL string, streamDuration, waitForT rs.cancel() }() - // pio.RecommendBufioSize = 1024 * 8 - // rtmp.Debug = true - // rtmp.Debug2 = true - // conn, err := rtmp.Dial("rtmp://localhost:1935/" + manifestID) - // conn, err := rtmp.Dial(rtmpURL) started := time.Now() rs.started = started for { diff --git a/internal/testers/saver.go b/internal/testers/saver.go index 1dc0b9f36..686b201e3 100644 --- a/internal/testers/saver.go +++ b/internal/testers/saver.go @@ -12,7 +12,6 @@ import ( // SaveNewStreams monitors /status endpoint of provided broadcasters, and then 1 new stream // arrives - pulls it and save to files func SaveNewStreams(ctx context.Context, broadcasters ...string) { - if len(broadcasters) == 0 { panic("no broadcasters provided") } diff --git a/internal/testers/segmenter.go b/internal/testers/segmenter.go index ff89f377a..494296dee 100644 --- a/internal/testers/segmenter.go +++ b/internal/testers/segmenter.go @@ -17,15 +17,6 @@ import ( ) // segmenter take video file and cuts it into .ts segments -// type segmenter struct { -// stopAtFileEnd bool -// } - -// func newSegmenter(stopAtFileEnd bool) *segmenter { -// return &segmenter{ -// stopAtFileEnd: stopAtFileEnd, -// } -// } func StartSegmentingR(ctx context.Context, reader io.ReadSeekCloser, stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration, useWallTime bool, out chan<- *model.HlsSegment) error { inFile, err := avutil.OpenRC(reader) diff --git a/messenger/dbot.go b/messenger/dbot.go index 651684638..4509975e9 100644 --- a/messenger/dbot.go +++ b/messenger/dbot.go @@ -119,40 +119,6 @@ func (bot *discordBot) start() { glog.Infof("Bot stopped") } -// This function will be called (due to AddHandler above) every time a new -// message is created on any channel that the autenticated bot has access to. -func messageCreate(s *discordgo.Session, m *discordgo.MessageCreate) { - - // selfMessage := m.Author.ID == s.State.User.ID - // glog.Infof("got self=%v message %+m", selfMessage, m) - mg := "" - if m.Member != nil { - mg = m.Member.GuildID - } - glog.Infof("Channl id %s eq user id %v (my user id %s) mess guild %s member guild %s", m.ChannelID, m.ChannelID == s.State.User.ID, s.State.User.ID, - m.GuildID, mg) - - // Ignore all messages created by the bot itself - // This isn't required in this specific example but it's a good practice. - if m.Author.ID == s.State.User.ID { - return - } - isDirectMessage := m.GuildID == "" - if !isDirectMessage { - return - } - - // If the message is "ping" reply with "Pong!" - if m.Content == "ping" { - s.ChannelMessageSend(m.ChannelID, "Pong!") - } - - // If the message is "pong" reply with "Ping!" - if m.Content == "pong" { - s.ChannelMessageSend(m.ChannelID, "Ping!") - } -} - // This function will be called (due to AddHandler above) when the bot receives // the "ready" event from Discord. func ready(s *discordgo.Session, event *discordgo.Ready) { @@ -164,18 +130,7 @@ func ready(s *discordgo.Session, event *discordgo.Ready) { func (bot *discordBot) setupRouter() *exrouter.Route { router := exrouter.New() - /* - router.On("sub", nil). - On("sub2", func(ctx *exrouter.Context) { - ctx.Reply("sub2 called with arguments:\n", strings.Join(ctx.Args, ";")) - }). - On("sub3", func(ctx *exrouter.Context) { - ctx.Reply("sub3 called with arguments:\n", strings.Join(ctx.Args, ";")) - }) - */ router.On("bstreams", bot.bstreams).Desc("Show active streams across all broadcasters available through API in AC") - // router.On("stats", bot.stats) - // router.On("img", bot.imgTest) router.Default = router.On("help", func(ctx *exrouter.Context) { var f func(depth int, r *exrouter.Route) string @@ -192,26 +147,6 @@ func (bot *discordBot) setupRouter() *exrouter.Route { return router } -func (bot *discordBot) imgTest(ctx *exrouter.Context) { - ctx.Reply("Hello") - // ctx.Ses.ChannelMessageSendComplex(ctx.Msg.ChannelID, &discordgo.MessageSend{discordgo.File: &discordgo.File{Name: name, Reader: r}}) - name := "name01.jpg" - contType := "image/jpeg" - img := codec.TSFirstImage(nil) - if img == nil { - return - } - jpg := utils.Img2Jpeg(img) - r := bytes.NewReader(jpg) - ctx.Ses.ChannelMessageSendComplex(ctx.Msg.ChannelID, &discordgo.MessageSend{ - File: &discordgo.File{ - Name: name, - ContentType: contType, - Reader: r, - }, - }) -} - type streamDesc struct { broadcaster string mid string diff --git a/messenger/messenger.go b/messenger/messenger.go index 7ad6bb3c0..14e9ed4c5 100644 --- a/messenger/messenger.go +++ b/messenger/messenger.go @@ -253,12 +253,7 @@ func sendLoop() { continue } } - // if !timer.Stop() { - // <-timer.C - // } - // timer.Reset(2 * time.Second) timer = time.NewTimer(2 * time.Second) - // glog.Infof("Reset for 2s done") continue } } @@ -271,32 +266,13 @@ func sendLoop() { frlreset, err := strconv.ParseFloat(rlreset, 64) if err != nil { panic(err) - continue } wait := time.Duration(frlreset*1000.0+100.0) * time.Millisecond glog.V(model.VVERBOSE).Infof("Need wait %s", wait) goodAfter = time.Now().Add(wait) - // timer.Reset(wait) timer = time.NewTimer(wait) - // glog.Infof("Reset for %s done", wait) - /* - rlreset := headers.Get("X-Ratelimit-Reset") - if rlreset == "" { - continue - } - frlreset, err := strconv.ParseFloat(rlreset, 64) - if err != nil { - continue - } - nextTime := time.Unix(0, order.Created*int64(time.Millisecond)) - */ } else if len(msgQueue) > 0 { - // glog.Infof("Queue not empty") - // if !timer.Stop() { - // <-timer.C - // } - // timer.Reset(50 * time.Millisecond) timer = time.NewTimer(50 * time.Millisecond) } } @@ -353,17 +329,8 @@ func postMessage(msg []byte) (int, http.Header) { if webhookURL == "" { return 0, nil } - /* - dm := &discordMessage{ - Content: msg, - UserName: userName, - } - data, _ := json.Marshal(dm) - var body io.Reader - */ body := bytes.NewReader(msg) - // resp, err := http.Post(webhookURL, "application/json", body) - req, _ := http.NewRequest("POST", webhookURL, body) + req, _ := http.NewRequest(http.MethodPost, webhookURL, body) req.Header.Add("User-Agent", "stream-tester/"+model.Version) req.Header.Add("Content-Type", "application/json") req.Header.Add("X-RateLimit-Precision", "millisecond") diff --git a/model/models.go b/model/models.go index da8c5a500..75f98b644 100644 --- a/model/models.go +++ b/model/models.go @@ -53,14 +53,6 @@ var FailHardOnBadSegments bool // ExitCode exit code var ExitCode int -// InfinitePuller interface -/* -type InfinitePuller interface { - // Start blocks - Start() -} -*/ - // IFinite public interface type IFinite interface { Done() <-chan struct{} @@ -105,17 +97,9 @@ type StreamStarter func(ctx context.Context, sourceFileName string, waitForTarge // Streamer interface (deprecated) type Streamer interface { IFinite - // StartStreams2(sourceFileName, traceID string, streamDuration time.Duration) error - // StartStreams old one, shouldn't be used StartStreams(sourceFileName, bhost, rtmpPort, mhost, mediaPort string, simStreams, repeat uint, streamDuration time.Duration, notFinal, measureLatency, noBar bool, groupStartBy int, startDelayBetweenGroups, waitForTarget time.Duration) (string, error) Stats(basedManifestID string) (*Stats, error) - // StatsFormatted() string - // DownStatsFormatted() string - // AnalyzeFormatted(short bool) string - // Done() <-chan struct{} - // // Stop() // Stop active streams - // Cancel() } // Latencies contains latencies @@ -310,16 +294,7 @@ Bytes dowloaded: %12d`, st.RTMPstreams, st.MediaStreams, st.SentKeyFrames, st.DownloadedKeyFrames, st.DownloadedSourceSegments, st.DownloadedTranscodedSegments, st.SuccessRate2, st.BytesDownloaded) if len(st.Errors) > 0 { r += "\n" - // r = fmt.Sprintf("%s\nErrors: %+v\n", r, st.Errors) } - /* disable temporarily - if len(st.RawTranscodeLatenciesPerStream) > 0 { - r += "\nTranscode latencies per stream:\n" - for _, rtl := range st.RawTranscodeLatenciesPerStream { - r += formatLatenciesSlice(rtl) + "\n" - } - } - */ return r } @@ -344,18 +319,3 @@ func (ls *Latencies) String() string { r := fmt.Sprintf(`{Average: %s, P50: %s, P95: %s, P99: %s}`, ls.Avg, ls.P50, ls.P95, ls.P99) return r } - -func formatLatenciesSlice(lat []time.Duration) string { - var buf []string - if len(lat) < 17 { - return fmt.Sprintf("%+v", lat) - } - for i := 0; i < 8; i++ { - buf = append(buf, fmt.Sprintf("%s", lat[i])) - } - buf = append(buf, "...") - for i := len(lat) - 9; i < len(lat); i++ { - buf = append(buf, fmt.Sprintf("%s", lat[i])) - } - return "[" + strings.Join(buf, " ") + "]" -}