diff --git a/cmd/catalyst-node/catalyst-node.go b/cmd/catalyst-node/catalyst-node.go index 9b050b68c..09b32a3ea 100644 --- a/cmd/catalyst-node/catalyst-node.go +++ b/cmd/catalyst-node/catalyst-node.go @@ -109,11 +109,11 @@ func runClient(config catalystConfig) error { event := <-inbox glog.V(5).Infof("got event: %v", event) - members, err := client.MembersFiltered(mediaFilter, ".*", ".*") + members, err := membersFiltered(mediaFilter, ".*", ".*") if err != nil { - glog.Errorf("Error getting serf, will retry: %v\n", err) - continue + glog.Errorf("Error getting serf, crashing: %v\n", err) + break } balancedServers, err := getMistLoadBalancerServers(config.mistLoadBalancerEndpoint) @@ -164,6 +164,8 @@ func runClient(config catalystConfig) error { } } } + + return nil } func connectSerfAgent(serfRPCAddress, serfRPCAuthKey string) (*serfclient.RPCClient, error) { @@ -240,7 +242,7 @@ func getMistLoadBalancerServers(endpoint string) (map[string]interface{}, error) return mistResponse, nil } -func execBalancer(balancerArgs []string) error { +func execBalancer(balancerArgs []string) (chan any, error) { args := append(balancerArgs, "-p", fmt.Sprintf("%d", mistUtilLoadPort)) glog.Infof("Running MistUtilLoad with %v", args) cmd := exec.Command("MistUtilLoad", args...) @@ -248,27 +250,25 @@ func execBalancer(balancerArgs []string) error { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr + killchan := make(chan any, 1) + go func() { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) - for { - s := <-c - glog.Errorf("caught signal=%v killing MistUtilLoad", s) - cmd.Process.Kill() - } + <-killchan + glog.Infof("killing MistUtilLoad") + cmd.Process.Kill() }() err := cmd.Start() if err != nil { - return err + return nil, err } - err = cmd.Wait() - if err != nil { - return err - } + go func() { + err = cmd.Wait() + panic(fmt.Sprintf("MistUtilLoad exited err=%s", err)) + }() - return fmt.Errorf("MistUtilLoad exited cleanly") + return killchan, nil } func main() { @@ -349,11 +349,12 @@ func main() { go startInternalWebServer(cliFlags.HTTPInternalAddress, cliFlags.NodeLatitude, cliFlags.NodeLongitude) config.serfTags = serfConfig.Tags + var killchan chan any if cliFlags.RunBalancer { go func() { - - err := execBalancer(strings.Split(cliFlags.BalancerArgs, " ")) + var err error + killchan, err = execBalancer(strings.Split(cliFlags.BalancerArgs, " ")) if err != nil { glog.Fatal(err) } @@ -367,8 +368,22 @@ func main() { err := runClient(*config) if err != nil { glog.Errorf("Error starting client: %v", err) + time.Sleep(1 * time.Second) + continue } - time.Sleep(1 * time.Second) + // nil error means we're shutting down + glog.Infof("Shutting down on Serf client failure") + killchan <- true + } + }() + + go func() { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) + for { + s := <-c + glog.Errorf("caught signal=%v killing MistUtilLoad", s) + killchan <- true } }() @@ -574,7 +589,7 @@ func resolveNodeURL(streamURL string) (string, error) { nodeName := u.Host protocol := u.Scheme - member, err := getSerfMember(nodeName) + member, err := getSerfMember(map[string]string{}, "alive", nodeName) if err != nil { return "", err } @@ -594,8 +609,12 @@ func resolveNodeURL(streamURL string) (string, error) { return u2.String(), nil } -func querySerfForMember(name string) (*serfclient.Member, error) { - members, err := serfClient.MembersFiltered(map[string]string{}, "alive", name) +func membersFiltered(filter map[string]string, status, name string) ([]serfclient.Member, error) { + return serfClient.MembersFiltered(filter, status, name) +} + +func member(filter map[string]string, status, name string) (*serfclient.Member, error) { + members, err := membersFiltered(filter, status, name) if err != nil { return nil, err } @@ -608,7 +627,7 @@ func querySerfForMember(name string) (*serfclient.Member, error) { return &members[0], nil } -var getSerfMember = querySerfForMember +var getSerfMember = member // return the best node available for a given stream. will return any node if nobody has the stream. func getBestNode(redirectPrefixes []string, playbackID, lat, lon, fallbackPrefix string) (string, string, error) { diff --git a/cmd/catalyst-node/catalyst-node_test.go b/cmd/catalyst-node/catalyst-node_test.go index ab999e326..2d512cb26 100644 --- a/cmd/catalyst-node/catalyst-node_test.go +++ b/cmd/catalyst-node/catalyst-node_test.go @@ -106,7 +106,7 @@ func TestRedirectHandler404(t *testing.T) { defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() path := fmt.Sprintf("/hls/%s/index.m3u8", playbackID) @@ -130,7 +130,7 @@ func TestRedirectHandlerHLS_Correct(t *testing.T) { } defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() path := fmt.Sprintf("/hls/%s/index.m3u8", playbackID) @@ -154,7 +154,7 @@ func TestRedirectHandlerHLSVOD_Correct(t *testing.T) { } defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() pathHLS := fmt.Sprintf("/hls/vod+%s/index.m3u8", playbackID) @@ -189,7 +189,7 @@ func TestRedirectHandlerHLS_SegmentInPath(t *testing.T) { getClosestNode = func(string, string, string, string) (string, error) { return closestNodeAddr, nil } defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() seg := "4_1" @@ -215,7 +215,7 @@ func TestRedirectHandlerJS_Correct(t *testing.T) { getClosestNode = func(string, string, string, string) (string, error) { return closestNodeAddr, nil } defer func() { getClosestNode = defaultFunc }() defaultSerf := getSerfMember - getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil } + getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil } defer func() { getSerfMember = defaultSerf }() path := fmt.Sprintf("/json_%s.js", playbackID)