From c9002a7e4759a87a5bab2f002ff2b450157ddcfb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Fri, 29 Jul 2022 23:46:37 +0200 Subject: [PATCH] Add playback redirection to the closest Mist node (#61) * Add playback redirection to the closest Mist node * Update for `http-addr` flag and Mist Load Balancer 'video+' prefix * Add E2E test for stream redirection * Remove unused params in E2E Test * Update cmd/catalyst-node/catalyst-node.go Co-authored-by: Eli Mallon * Update cmd/catalyst-node/catalyst-node.go Co-authored-by: Eli Mallon * Update cmd/catalyst-node/catalyst-node_test.go Co-authored-by: Eli Mallon * Update cmd/catalyst-node/catalyst-node.go Co-authored-by: Eli Mallon * Minor fixes after code review * Handle Mist returning "FULL" from LoadBalancer API * go fmt * Fix Revive warnings * Fix CI builds Co-authored-by: Eli Mallon --- cmd/catalyst-node/catalyst-node.go | 77 ++++++++++++++++++++++++ cmd/catalyst-node/catalyst-node_test.go | 80 +++++++++++++++++++++++++ test/e2e/e2e_test.go | 79 ++++++++++++++++++++---- 3 files changed, 224 insertions(+), 12 deletions(-) create mode 100644 cmd/catalyst-node/catalyst-node_test.go diff --git a/cmd/catalyst-node/catalyst-node.go b/cmd/catalyst-node/catalyst-node.go index 336247de2..00ef86cdf 100644 --- a/cmd/catalyst-node/catalyst-node.go +++ b/cmd/catalyst-node/catalyst-node.go @@ -10,6 +10,7 @@ import ( "net/url" "os" "os/exec" + "regexp" "runtime" "strings" "time" @@ -23,6 +24,11 @@ import ( "github.com/peterbourgon/ff/v3" ) +const ( + httpPort = "8090" + mistUtilLoadPort = "8042" +) + var Version = "unknown" type catalystConfig struct { @@ -245,6 +251,9 @@ func main() { runBalancer := fs.Bool("run-balancer", true, "run MistUtilLoad") balancerArgs := fs.String("balancer-args", "", "arguments passed to MistUtilLoad") + // Catalyst web server + httpAddr := fs.String("http-addr", fmt.Sprintf("127.0.0.1:%s", httpPort), "Address to bind for Catalyst HTTP commands") + // Serf commands passed straight through to the agent serfConfig := agent.Config{} fs.StringVar(&serfConfig.BindAddr, "bind", "0.0.0.0:9935", "Address to bind network listeners to. To use an IPv6 address, specify [::1] or [::1]:7946.") @@ -283,6 +292,8 @@ func main() { return } + go startCatalystWebServer(*httpAddr) + if *retryJoin != "" { serfConfig.RetryJoin = strings.Split(*retryJoin, ",") } @@ -382,3 +393,69 @@ func writeSerfConfig(config *agent.Config) (string, error) { return tmpFile.Name(), err } + +func startCatalystWebServer(httpAddr string) { + http.Handle("/hls/", redirectHlsHandler()) + glog.Infof("HTTP server listening on %s", httpAddr) + glog.Fatal(http.ListenAndServe(httpAddr, nil)) +} + +var getClosestNode = queryMistForClosestNode + +func redirectHlsHandler() http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + playbackID, isValid := parsePlaybackID(r.URL.Path) + if !isValid { + w.WriteHeader(http.StatusNotFound) + return + } + + nodeAddr, err := getClosestNode(playbackID) + if err != nil { + glog.Errorf("error finding origin server playbackID=%s error=%s", playbackID, err) + w.WriteHeader(http.StatusNotFound) + return + } + + rURL := fmt.Sprintf("%s://%s/hls/%s/index.m3u8", protocol(r), nodeAddr, playbackID) + http.Redirect(w, r, rURL, http.StatusFound) + }) +} + +func parsePlaybackID(path string) (string, bool) { + r := regexp.MustCompile("^/hls/([a-zA-Z0-9_\\-+]+)/index.m3u8$") + m := r.FindStringSubmatch(path) + if len(m) < 2 { + return "", false + } + return m[1], true +} + +func protocol(r *http.Request) string { + if r.Header.Get("X-Forwarded-Proto") == "https" { + return "https" + } + return "http" +} + +func queryMistForClosestNode(playbackID string) (string, error) { + url := fmt.Sprintf("http://localhost:%s/%s", mistUtilLoadPort, playbackID) + resp, err := http.Get(url) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("GET request '%s' failed with http status code %d", url, resp.StatusCode) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("GET request '%s' failed while reading response body", url) + } + if string(body) == "FULL" { + return "", fmt.Errorf("GET request '%s' returned 'FULL'", url) + } + + return string(body), nil +} diff --git a/cmd/catalyst-node/catalyst-node_test.go b/cmd/catalyst-node/catalyst-node_test.go new file mode 100644 index 000000000..9bf74e30a --- /dev/null +++ b/cmd/catalyst-node/catalyst-node_test.go @@ -0,0 +1,80 @@ +package main + +import ( + "fmt" + "github.com/stretchr/testify/require" + "net/http" + "net/http/httptest" + "testing" +) + +const ( + closestNodeAddr = "someurl.com" + playbackID = "abc_XYZ-123" +) + +func TestRedirectHandler_Correct(t *testing.T) { + defaultFunc := getClosestNode + getClosestNode = func(string) (string, error) { return closestNodeAddr, nil } + defer func() { getClosestNode = defaultFunc }() + + requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", playbackID)). + result(). + hasStatus(http.StatusFound). + hasHeader("Location", fmt.Sprintf("http://%s/hls/%s/index.m3u8", closestNodeAddr, playbackID)) + + requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", playbackID)). + withHeader("X-Forwarded-Proto", "https"). + result(). + hasStatus(http.StatusFound). + hasHeader("Location", fmt.Sprintf("https://%s/hls/%s/index.m3u8", closestNodeAddr, playbackID)) +} + +func TestRedirectHandler_InvalidPath(t *testing.T) { + requireReq(t, "/hls").result().hasStatus(http.StatusNotFound) + requireReq(t, "/hls").result().hasStatus(http.StatusNotFound) + requireReq(t, "/hls/").result().hasStatus(http.StatusNotFound) + requireReq(t, "/hls/12345").result().hasStatus(http.StatusNotFound) + requireReq(t, "/hls/12345/somepath").result().hasStatus(http.StatusNotFound) + requireReq(t, "/hls/12345/somepath/index.m3u8").result().hasStatus(http.StatusNotFound) +} + +type httpReq struct { + *testing.T + *http.Request +} + +type httpCheck struct { + *testing.T + *httptest.ResponseRecorder +} + +func requireReq(t *testing.T, path string) httpReq { + req, err := http.NewRequest("GET", path, nil) + if err != nil { + t.Fatal(err) + } + + return httpReq{t, req} +} + +func (hr httpReq) withHeader(key, value string) httpReq { + hr.Header.Set(key, value) + return hr +} + +func (hr httpReq) result() httpCheck { + rr := httptest.NewRecorder() + redirectHlsHandler().ServeHTTP(rr, hr.Request) + return httpCheck{hr.T, rr} +} + +func (hc httpCheck) hasStatus(code int) httpCheck { + require.Equal(hc, code, hc.Code) + return hc +} + +func (hc httpCheck) hasHeader(key, value string) httpCheck { + require.Equal(hc, value, hc.Header().Get(key)) + return hc +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index c62c1932b..f528c5a0e 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -21,10 +21,11 @@ import ( ) const ( - webConsolePort = "4242" - serfPort = "7373" - httpPort = "8080" - rtmpPort = "1935" + webConsolePort = "4242" + serfPort = "7373" + httpPort = "8080" + httpCatalystPort = "8090" + rtmpPort = "1935" ) type cliParams struct { @@ -62,10 +63,12 @@ type network struct { type catalystContainer struct { testcontainers.Container - webConsole string - serf string - http string - rtmp string + webConsole string + serf string + http string + httpCatalyst string + rtmp string + ip string } func (c *catalystContainer) Terminate(ctx context.Context) { @@ -96,7 +99,12 @@ func TestMultiNodeCatalyst(t *testing.T) { // then requireTwoMembers(t, c1, c2) - requireReplicatedStream(t, c1, c2) + + p := startStream(t, c1) + defer p.Kill() + + requireReplicatedStream(t, c2) + requireStreamRedirection(t, c1, c2) } func createNetwork(ctx context.Context, t *testing.T) *network { @@ -135,9 +143,12 @@ func startCatalyst(ctx context.Context, t *testing.T, hostname, network string, req := testcontainers.ContainerRequest{ Image: params.ImageName, - ExposedPorts: []string{tcp(webConsolePort), tcp(serfPort), tcp(httpPort), tcp(rtmpPort)}, + ExposedPorts: []string{tcp(webConsolePort), tcp(serfPort), tcp(httpPort), tcp(httpCatalystPort), tcp(rtmpPort)}, Hostname: hostname, Networks: []string{network}, + Env: map[string]string{ + "CATALYST_NODE_HTTP_ADDR": "0.0.0.0:8090", + }, Mounts: []testcontainers.ContainerMount{{ Source: testcontainers.GenericBindMountSource{ HostPath: configAbsPath, @@ -174,10 +185,22 @@ func startCatalyst(ctx context.Context, t *testing.T, hostname, network string, require.NoError(t, err) catalyst.http = mappedPort.Port() + mappedPort, err = container.MappedPort(ctx, httpCatalystPort) + require.NoError(t, err) + catalyst.httpCatalyst = mappedPort.Port() + mappedPort, err = container.MappedPort(ctx, rtmpPort) require.NoError(t, err) catalyst.rtmp = mappedPort.Port() + // container IP + cid := container.GetContainerID() + dockerClient, _, _, err := testcontainers.NewDockerClient() + require.NoError(t, err) + inspect, err := dockerClient.ContainerInspect(ctx, cid) + require.NoError(t, err) + catalyst.ip = inspect.NetworkSettings.Networks[network].IPAddress + return catalyst } @@ -201,15 +224,17 @@ func requireTwoMembers(t *testing.T, containers ...*catalystContainer) { require.Eventually(t, numberOfMembersIsTwo, 5*time.Minute, time.Second) } -func requireReplicatedStream(t *testing.T, c1 *catalystContainer, c2 *catalystContainer) { +func startStream(t *testing.T, c1 *catalystContainer) *os.Process { // Send a stream to the node catalyst-one ffmpegParams := []string{"-re", "-f", "lavfi", "-i", "testsrc=size=1920x1080:rate=30,format=yuv420p", "-f", "lavfi", "-i", "sine", "-c:v", "libx264", "-b:v", "1000k", "-x264-params", "keyint=60", "-c:a", "aac", "-f", "flv"} ffmpegParams = append(ffmpegParams, fmt.Sprintf("rtmp://localhost:%s/live/stream+foo", c1.rtmp)) cmd := exec.Command("ffmpeg", ffmpegParams...) err := cmd.Start() require.NoError(t, err) - defer cmd.Process.Kill() + return cmd.Process +} +func requireReplicatedStream(t *testing.T, c2 *catalystContainer) { // Read stream from the node catalyst-two correctStream := func() bool { resp, err := http.Get(fmt.Sprintf("http://localhost:%s/hls/stream+foo/index.m3u8", c2.http)) @@ -231,3 +256,33 @@ func requireReplicatedStream(t *testing.T, c1 *catalystContainer, c2 *catalystCo } require.Eventually(t, correctStream, 5*time.Minute, time.Second) } + +func requireStreamRedirection(t *testing.T, c1 *catalystContainer, c2 *catalystContainer) { + require := require.New(t) + redirect := func() bool { + client := &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + resp, err := client.Get(fmt.Sprintf("http://localhost:%s/hls/stream+foo/index.m3u8", c1.httpCatalyst)) + if err != nil { + return false + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusFound { + return false + } + + c1URL := fmt.Sprintf("http://%s/hls/stream+foo/index.m3u8", c1.ip) + c2URL := fmt.Sprintf("http://%s/hls/stream+foo/index.m3u8", c2.ip) + rURL := resp.Header.Get("Location") + if rURL == c1URL || rURL == c2URL { + return true + } + + return false + } + require.Eventually(redirect, 5*time.Minute, time.Second) +}