From 9fb66636826ffe0ef53b96da7f635c8ced9b8835 Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Wed, 9 Nov 2022 22:05:08 -0800 Subject: [PATCH 1/3] catalyst-node: refactor membersFiltered into one code path --- cmd/catalyst-node/catalyst-node.go | 12 ++++++++---- cmd/catalyst-node/catalyst-node_test.go | 10 +++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/cmd/catalyst-node/catalyst-node.go b/cmd/catalyst-node/catalyst-node.go index 9b050b68c..888b878bb 100644 --- a/cmd/catalyst-node/catalyst-node.go +++ b/cmd/catalyst-node/catalyst-node.go @@ -574,7 +574,7 @@ func resolveNodeURL(streamURL string) (string, error) { nodeName := u.Host protocol := u.Scheme - member, err := getSerfMember(nodeName) + member, err := getSerfMember(map[string]string{}, "alive", nodeName) if err != nil { return "", err } @@ -594,8 +594,12 @@ func resolveNodeURL(streamURL string) (string, error) { return u2.String(), nil } -func querySerfForMember(name string) (*serfclient.Member, error) { - members, err := serfClient.MembersFiltered(map[string]string{}, "alive", name) +func membersFiltered(filter map[string]string, status, name string) ([]serfclient.Member, error) { + return serfClient.MembersFiltered(filter, status, name) +} + +func member(filter map[string]string, status, name string) (*serfclient.Member, error) { + members, err := membersFiltered(map[string]string{}, status, name) if err != nil { return nil, err } @@ -608,7 +612,7 @@ func querySerfForMember(name string) (*serfclient.Member, error) { return &members[0], nil } -var getSerfMember = querySerfForMember +var getSerfMember = member // return the best node available for a given stream. will return any node if nobody has the stream. func getBestNode(redirectPrefixes []string, playbackID, lat, lon, fallbackPrefix string) (string, string, error) { diff --git a/cmd/catalyst-node/catalyst-node_test.go b/cmd/catalyst-node/catalyst-node_test.go index ab999e326..2d512cb26 100644 --- a/cmd/catalyst-node/catalyst-node_test.go +++ b/cmd/catalyst-node/catalyst-node_test.go @@ -106,7 +106,7 @@ func TestRedirectHandler404(t *testing.T) { defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() path := fmt.Sprintf("/hls/%s/index.m3u8", playbackID) @@ -130,7 +130,7 @@ func TestRedirectHandlerHLS_Correct(t *testing.T) { } defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() path := fmt.Sprintf("/hls/%s/index.m3u8", playbackID) @@ -154,7 +154,7 @@ func TestRedirectHandlerHLSVOD_Correct(t *testing.T) { } defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() pathHLS := fmt.Sprintf("/hls/vod+%s/index.m3u8", playbackID) @@ -189,7 +189,7 @@ func TestRedirectHandlerHLS_SegmentInPath(t *testing.T) { getClosestNode = func(string, string, string, string) (string, error) { return closestNodeAddr, nil } defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() seg := "4_1" @@ -215,7 +215,7 @@ func TestRedirectHandlerJS_Correct(t *testing.T) { getClosestNode = func(string, string, string, string) (string, error) { return closestNodeAddr, nil } defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() path := fmt.Sprintf("/json_%s.js", playbackID) From 1859029305dbeff1f228bf41f1daef84364826e7 Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Wed, 9 Nov 2022 23:36:39 -0800 Subject: [PATCH 2/3] catalyst-node: crash on failure --- cmd/catalyst-node/catalyst-node.go | 49 ++++++++++++++++-------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/cmd/catalyst-node/catalyst-node.go b/cmd/catalyst-node/catalyst-node.go index 888b878bb..af0e6fc90 100644 --- a/cmd/catalyst-node/catalyst-node.go +++ b/cmd/catalyst-node/catalyst-node.go @@ -12,12 +12,10 @@ import ( "net/url" "os" "os/exec" - "os/signal" "regexp" "runtime" "strings" "sync" - "syscall" "time" serfclient "github.com/hashicorp/serf/client" @@ -109,11 +107,11 @@ func runClient(config catalystConfig) error { event := <-inbox glog.V(5).Infof("got event: %v", event) - members, err := client.MembersFiltered(mediaFilter, ".*", ".*") + members, err := membersFiltered(mediaFilter, ".*", ".*") if err != nil { - glog.Errorf("Error getting serf, will retry: %v\n", err) - continue + glog.Errorf("Error getting serf, crashing: %v\n", err) + break } balancedServers, err := getMistLoadBalancerServers(config.mistLoadBalancerEndpoint) @@ -164,6 +162,8 @@ func runClient(config catalystConfig) error { } } } + + return nil } func connectSerfAgent(serfRPCAddress, serfRPCAuthKey string) (*serfclient.RPCClient, error) { @@ -240,7 +240,7 @@ func getMistLoadBalancerServers(endpoint string) (map[string]interface{}, error) return mistResponse, nil } -func execBalancer(balancerArgs []string) error { +func execBalancer(balancerArgs []string) (chan any, error) { args := append(balancerArgs, "-p", fmt.Sprintf("%d", mistUtilLoadPort)) glog.Infof("Running MistUtilLoad with %v", args) cmd := exec.Command("MistUtilLoad", args...) @@ -248,27 +248,25 @@ func execBalancer(balancerArgs []string) error { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr + killchan := make(chan any, 1) + go func() { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) - for { - s := <-c - glog.Errorf("caught signal=%v killing MistUtilLoad", s) - cmd.Process.Kill() - } + <-killchan + glog.Infof("killing MistUtilLoad") + cmd.Process.Kill() }() err := cmd.Start() if err != nil { - return err + return nil, err } - err = cmd.Wait() - if err != nil { - return err - } + go func() { + err = cmd.Wait() + panic(fmt.Sprintf("MistUtilLoad exited err=%s", err)) + }() - return fmt.Errorf("MistUtilLoad exited cleanly") + return killchan, nil } func main() { @@ -349,11 +347,12 @@ func main() { go startInternalWebServer(cliFlags.HTTPInternalAddress, cliFlags.NodeLatitude, cliFlags.NodeLongitude) config.serfTags = serfConfig.Tags + var killchan chan any if cliFlags.RunBalancer { go func() { - - err := execBalancer(strings.Split(cliFlags.BalancerArgs, " ")) + var err error + killchan, err = execBalancer(strings.Split(cliFlags.BalancerArgs, " ")) if err != nil { glog.Fatal(err) } @@ -367,8 +366,12 @@ func main() { err := runClient(*config) if err != nil { glog.Errorf("Error starting client: %v", err) + time.Sleep(1 * time.Second) + continue } - time.Sleep(1 * time.Second) + // nil error means we're shutting down + glog.Infof("Shutting down on Serf client failure") + killchan <- true } }() @@ -599,7 +602,7 @@ func membersFiltered(filter map[string]string, status, name string) ([]serfclien } func member(filter map[string]string, status, name string) (*serfclient.Member, error) { - members, err := membersFiltered(map[string]string{}, status, name) + members, err := membersFiltered(filter, status, name) if err != nil { return nil, err } From 76cd4a0e86d52e7f6e9352ef3787250b8f307f54 Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Fri, 18 Nov 2022 08:10:13 -0800 Subject: [PATCH 3/3] catalyst-node: re-add MistUtilLoad shutdown --- cmd/catalyst-node/catalyst-node.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/cmd/catalyst-node/catalyst-node.go b/cmd/catalyst-node/catalyst-node.go index af0e6fc90..09b32a3ea 100644 --- a/cmd/catalyst-node/catalyst-node.go +++ b/cmd/catalyst-node/catalyst-node.go @@ -12,10 +12,12 @@ import ( "net/url" "os" "os/exec" + "os/signal" "regexp" "runtime" "strings" "sync" + "syscall" "time" serfclient "github.com/hashicorp/serf/client" @@ -375,6 +377,16 @@ func main() { } }() + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) + for { + s := <-c + glog.Errorf("caught signal=%v killing MistUtilLoad", s) + killchan <- true + } + }() + // Everything past this is booting up Serf tmpFile, err := writeSerfConfig(&serfConfig) if err != nil {