Skip to content

Commit

Permalink
Add playback redirection to the closest Mist node (#61)
Browse files Browse the repository at this point in the history
* Add playback redirection to the closest Mist node

* Update for `http-addr` flag and Mist Load Balancer 'video+' prefix

* Add E2E test for stream redirection

* Remove unused params in E2E Test

* Update cmd/catalyst-node/catalyst-node.go

Co-authored-by: Eli Mallon <eli@livepeer.org>

* Update cmd/catalyst-node/catalyst-node.go

Co-authored-by: Eli Mallon <eli@livepeer.org>

* Update cmd/catalyst-node/catalyst-node_test.go

Co-authored-by: Eli Mallon <eli@livepeer.org>

* Update cmd/catalyst-node/catalyst-node.go

Co-authored-by: Eli Mallon <eli@livepeer.org>

* Minor fixes after code review

* Handle Mist returning "FULL" from LoadBalancer API

* go fmt

* Fix Revive warnings

* Fix CI builds

Co-authored-by: Eli Mallon <eli@livepeer.org>
  • Loading branch information
leszko and iameli committed Jul 29, 2022
1 parent 3f0b406 commit c9002a7
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 12 deletions.
77 changes: 77 additions & 0 deletions cmd/catalyst-node/catalyst-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"os"
"os/exec"
"regexp"
"runtime"
"strings"
"time"
Expand All @@ -23,6 +24,11 @@ import (
"github.com/peterbourgon/ff/v3"
)

const (
httpPort = "8090"
mistUtilLoadPort = "8042"
)

var Version = "unknown"

type catalystConfig struct {
Expand Down Expand Up @@ -245,6 +251,9 @@ func main() {
runBalancer := fs.Bool("run-balancer", true, "run MistUtilLoad")
balancerArgs := fs.String("balancer-args", "", "arguments passed to MistUtilLoad")

// Catalyst web server
httpAddr := fs.String("http-addr", fmt.Sprintf("127.0.0.1:%s", httpPort), "Address to bind for Catalyst HTTP commands")

// Serf commands passed straight through to the agent
serfConfig := agent.Config{}
fs.StringVar(&serfConfig.BindAddr, "bind", "0.0.0.0:9935", "Address to bind network listeners to. To use an IPv6 address, specify [::1] or [::1]:7946.")
Expand Down Expand Up @@ -283,6 +292,8 @@ func main() {
return
}

go startCatalystWebServer(*httpAddr)

if *retryJoin != "" {
serfConfig.RetryJoin = strings.Split(*retryJoin, ",")
}
Expand Down Expand Up @@ -382,3 +393,69 @@ func writeSerfConfig(config *agent.Config) (string, error) {

return tmpFile.Name(), err
}

func startCatalystWebServer(httpAddr string) {
http.Handle("/hls/", redirectHlsHandler())
glog.Infof("HTTP server listening on %s", httpAddr)
glog.Fatal(http.ListenAndServe(httpAddr, nil))
}

var getClosestNode = queryMistForClosestNode

func redirectHlsHandler() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
playbackID, isValid := parsePlaybackID(r.URL.Path)
if !isValid {
w.WriteHeader(http.StatusNotFound)
return
}

nodeAddr, err := getClosestNode(playbackID)
if err != nil {
glog.Errorf("error finding origin server playbackID=%s error=%s", playbackID, err)
w.WriteHeader(http.StatusNotFound)
return
}

rURL := fmt.Sprintf("%s://%s/hls/%s/index.m3u8", protocol(r), nodeAddr, playbackID)
http.Redirect(w, r, rURL, http.StatusFound)
})
}

func parsePlaybackID(path string) (string, bool) {
r := regexp.MustCompile("^/hls/([a-zA-Z0-9_\\-+]+)/index.m3u8$")
m := r.FindStringSubmatch(path)
if len(m) < 2 {
return "", false
}
return m[1], true
}

func protocol(r *http.Request) string {
if r.Header.Get("X-Forwarded-Proto") == "https" {
return "https"
}
return "http"
}

func queryMistForClosestNode(playbackID string) (string, error) {
url := fmt.Sprintf("http://localhost:%s/%s", mistUtilLoadPort, playbackID)
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("GET request '%s' failed with http status code %d", url, resp.StatusCode)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("GET request '%s' failed while reading response body", url)
}
if string(body) == "FULL" {
return "", fmt.Errorf("GET request '%s' returned 'FULL'", url)
}

return string(body), nil
}
80 changes: 80 additions & 0 deletions cmd/catalyst-node/catalyst-node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"fmt"
"github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
"testing"
)

const (
closestNodeAddr = "someurl.com"
playbackID = "abc_XYZ-123"
)

func TestRedirectHandler_Correct(t *testing.T) {
defaultFunc := getClosestNode
getClosestNode = func(string) (string, error) { return closestNodeAddr, nil }
defer func() { getClosestNode = defaultFunc }()

requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", playbackID)).
result().
hasStatus(http.StatusFound).
hasHeader("Location", fmt.Sprintf("http://%s/hls/%s/index.m3u8", closestNodeAddr, playbackID))

requireReq(t, fmt.Sprintf("/hls/%s/index.m3u8", playbackID)).
withHeader("X-Forwarded-Proto", "https").
result().
hasStatus(http.StatusFound).
hasHeader("Location", fmt.Sprintf("https://%s/hls/%s/index.m3u8", closestNodeAddr, playbackID))
}

func TestRedirectHandler_InvalidPath(t *testing.T) {
requireReq(t, "/hls").result().hasStatus(http.StatusNotFound)
requireReq(t, "/hls").result().hasStatus(http.StatusNotFound)
requireReq(t, "/hls/").result().hasStatus(http.StatusNotFound)
requireReq(t, "/hls/12345").result().hasStatus(http.StatusNotFound)
requireReq(t, "/hls/12345/somepath").result().hasStatus(http.StatusNotFound)
requireReq(t, "/hls/12345/somepath/index.m3u8").result().hasStatus(http.StatusNotFound)
}

type httpReq struct {
*testing.T
*http.Request
}

type httpCheck struct {
*testing.T
*httptest.ResponseRecorder
}

func requireReq(t *testing.T, path string) httpReq {
req, err := http.NewRequest("GET", path, nil)
if err != nil {
t.Fatal(err)
}

return httpReq{t, req}
}

func (hr httpReq) withHeader(key, value string) httpReq {
hr.Header.Set(key, value)
return hr
}

func (hr httpReq) result() httpCheck {
rr := httptest.NewRecorder()
redirectHlsHandler().ServeHTTP(rr, hr.Request)
return httpCheck{hr.T, rr}
}

func (hc httpCheck) hasStatus(code int) httpCheck {
require.Equal(hc, code, hc.Code)
return hc
}

func (hc httpCheck) hasHeader(key, value string) httpCheck {
require.Equal(hc, value, hc.Header().Get(key))
return hc
}
79 changes: 67 additions & 12 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
)

const (
webConsolePort = "4242"
serfPort = "7373"
httpPort = "8080"
rtmpPort = "1935"
webConsolePort = "4242"
serfPort = "7373"
httpPort = "8080"
httpCatalystPort = "8090"
rtmpPort = "1935"
)

type cliParams struct {
Expand Down Expand Up @@ -62,10 +63,12 @@ type network struct {

type catalystContainer struct {
testcontainers.Container
webConsole string
serf string
http string
rtmp string
webConsole string
serf string
http string
httpCatalyst string
rtmp string
ip string
}

func (c *catalystContainer) Terminate(ctx context.Context) {
Expand Down Expand Up @@ -96,7 +99,12 @@ func TestMultiNodeCatalyst(t *testing.T) {

// then
requireTwoMembers(t, c1, c2)
requireReplicatedStream(t, c1, c2)

p := startStream(t, c1)
defer p.Kill()

requireReplicatedStream(t, c2)
requireStreamRedirection(t, c1, c2)
}

func createNetwork(ctx context.Context, t *testing.T) *network {
Expand Down Expand Up @@ -135,9 +143,12 @@ func startCatalyst(ctx context.Context, t *testing.T, hostname, network string,

req := testcontainers.ContainerRequest{
Image: params.ImageName,
ExposedPorts: []string{tcp(webConsolePort), tcp(serfPort), tcp(httpPort), tcp(rtmpPort)},
ExposedPorts: []string{tcp(webConsolePort), tcp(serfPort), tcp(httpPort), tcp(httpCatalystPort), tcp(rtmpPort)},
Hostname: hostname,
Networks: []string{network},
Env: map[string]string{
"CATALYST_NODE_HTTP_ADDR": "0.0.0.0:8090",
},
Mounts: []testcontainers.ContainerMount{{
Source: testcontainers.GenericBindMountSource{
HostPath: configAbsPath,
Expand Down Expand Up @@ -174,10 +185,22 @@ func startCatalyst(ctx context.Context, t *testing.T, hostname, network string,
require.NoError(t, err)
catalyst.http = mappedPort.Port()

mappedPort, err = container.MappedPort(ctx, httpCatalystPort)
require.NoError(t, err)
catalyst.httpCatalyst = mappedPort.Port()

mappedPort, err = container.MappedPort(ctx, rtmpPort)
require.NoError(t, err)
catalyst.rtmp = mappedPort.Port()

// container IP
cid := container.GetContainerID()
dockerClient, _, _, err := testcontainers.NewDockerClient()
require.NoError(t, err)
inspect, err := dockerClient.ContainerInspect(ctx, cid)
require.NoError(t, err)
catalyst.ip = inspect.NetworkSettings.Networks[network].IPAddress

return catalyst
}

Expand All @@ -201,15 +224,17 @@ func requireTwoMembers(t *testing.T, containers ...*catalystContainer) {
require.Eventually(t, numberOfMembersIsTwo, 5*time.Minute, time.Second)
}

func requireReplicatedStream(t *testing.T, c1 *catalystContainer, c2 *catalystContainer) {
func startStream(t *testing.T, c1 *catalystContainer) *os.Process {
// Send a stream to the node catalyst-one
ffmpegParams := []string{"-re", "-f", "lavfi", "-i", "testsrc=size=1920x1080:rate=30,format=yuv420p", "-f", "lavfi", "-i", "sine", "-c:v", "libx264", "-b:v", "1000k", "-x264-params", "keyint=60", "-c:a", "aac", "-f", "flv"}
ffmpegParams = append(ffmpegParams, fmt.Sprintf("rtmp://localhost:%s/live/stream+foo", c1.rtmp))
cmd := exec.Command("ffmpeg", ffmpegParams...)
err := cmd.Start()
require.NoError(t, err)
defer cmd.Process.Kill()
return cmd.Process
}

func requireReplicatedStream(t *testing.T, c2 *catalystContainer) {
// Read stream from the node catalyst-two
correctStream := func() bool {
resp, err := http.Get(fmt.Sprintf("http://localhost:%s/hls/stream+foo/index.m3u8", c2.http))
Expand All @@ -231,3 +256,33 @@ func requireReplicatedStream(t *testing.T, c1 *catalystContainer, c2 *catalystCo
}
require.Eventually(t, correctStream, 5*time.Minute, time.Second)
}

func requireStreamRedirection(t *testing.T, c1 *catalystContainer, c2 *catalystContainer) {
require := require.New(t)
redirect := func() bool {
client := &http.Client{
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Get(fmt.Sprintf("http://localhost:%s/hls/stream+foo/index.m3u8", c1.httpCatalyst))
if err != nil {
return false
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusFound {
return false
}

c1URL := fmt.Sprintf("http://%s/hls/stream+foo/index.m3u8", c1.ip)
c2URL := fmt.Sprintf("http://%s/hls/stream+foo/index.m3u8", c2.ip)
rURL := resp.Header.Get("Location")
if rURL == c1URL || rURL == c2URL {
return true
}

return false
}
require.Eventually(redirect, 5*time.Minute, time.Second)
}

0 comments on commit c9002a7

Please sign in to comment.