Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catalyst-node: crash on serf failures #262

Merged
merged 3 commits into from
Nov 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 43 additions & 24 deletions cmd/catalyst-node/catalyst-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func runClient(config catalystConfig) error {
event := <-inbox
glog.V(5).Infof("got event: %v", event)

members, err := client.MembersFiltered(mediaFilter, ".*", ".*")
members, err := membersFiltered(mediaFilter, ".*", ".*")

if err != nil {
glog.Errorf("Error getting serf, will retry: %v\n", err)
continue
glog.Errorf("Error getting serf, crashing: %v\n", err)
break
}

balancedServers, err := getMistLoadBalancerServers(config.mistLoadBalancerEndpoint)
Expand Down Expand Up @@ -164,6 +164,8 @@ func runClient(config catalystConfig) error {
}
}
}

return nil
}

func connectSerfAgent(serfRPCAddress, serfRPCAuthKey string) (*serfclient.RPCClient, error) {
Expand Down Expand Up @@ -240,35 +242,33 @@ func getMistLoadBalancerServers(endpoint string) (map[string]interface{}, error)
return mistResponse, nil
}

func execBalancer(balancerArgs []string) error {
func execBalancer(balancerArgs []string) (chan any, error) {
args := append(balancerArgs, "-p", fmt.Sprintf("%d", mistUtilLoadPort))
glog.Infof("Running MistUtilLoad with %v", args)
cmd := exec.Command("MistUtilLoad", args...)

cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

killchan := make(chan any, 1)

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
s := <-c
glog.Errorf("caught signal=%v killing MistUtilLoad", s)
cmd.Process.Kill()
}
<-killchan
glog.Infof("killing MistUtilLoad")
cmd.Process.Kill()
}()

err := cmd.Start()
if err != nil {
return err
return nil, err
}

err = cmd.Wait()
if err != nil {
return err
}
go func() {
err = cmd.Wait()
panic(fmt.Sprintf("MistUtilLoad exited err=%s", err))
}()

return fmt.Errorf("MistUtilLoad exited cleanly")
return killchan, nil
}

func main() {
Expand Down Expand Up @@ -349,11 +349,12 @@ func main() {
go startInternalWebServer(cliFlags.HTTPInternalAddress, cliFlags.NodeLatitude, cliFlags.NodeLongitude)

config.serfTags = serfConfig.Tags
var killchan chan any

if cliFlags.RunBalancer {
go func() {

err := execBalancer(strings.Split(cliFlags.BalancerArgs, " "))
var err error
killchan, err = execBalancer(strings.Split(cliFlags.BalancerArgs, " "))
if err != nil {
glog.Fatal(err)
}
Expand All @@ -367,8 +368,22 @@ func main() {
err := runClient(*config)
if err != nil {
glog.Errorf("Error starting client: %v", err)
time.Sleep(1 * time.Second)
continue
}
time.Sleep(1 * time.Second)
// nil error means we're shutting down
glog.Infof("Shutting down on Serf client failure")
killchan <- true
}
}()

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
s := <-c
glog.Errorf("caught signal=%v killing MistUtilLoad", s)
killchan <- true
}
}()

Expand Down Expand Up @@ -574,7 +589,7 @@ func resolveNodeURL(streamURL string) (string, error) {
nodeName := u.Host
protocol := u.Scheme

member, err := getSerfMember(nodeName)
member, err := getSerfMember(map[string]string{}, "alive", nodeName)
if err != nil {
return "", err
}
Expand All @@ -594,8 +609,12 @@ func resolveNodeURL(streamURL string) (string, error) {
return u2.String(), nil
}

func querySerfForMember(name string) (*serfclient.Member, error) {
members, err := serfClient.MembersFiltered(map[string]string{}, "alive", name)
// membersFiltered forwards filter, status and name to MembersFiltered on the
// package-level serfClient and hands back that call's members and error
// unchanged.
func membersFiltered(filter map[string]string, status, name string) ([]serfclient.Member, error) {
	found, err := serfClient.MembersFiltered(filter, status, name)
	return found, err
}

func member(filter map[string]string, status, name string) (*serfclient.Member, error) {
members, err := membersFiltered(filter, status, name)
if err != nil {
return nil, err
}
Expand All @@ -608,7 +627,7 @@ func querySerfForMember(name string) (*serfclient.Member, error) {
return &members[0], nil
}

var getSerfMember = querySerfForMember
var getSerfMember = member

// return the best node available for a given stream. will return any node if nobody has the stream.
func getBestNode(redirectPrefixes []string, playbackID, lat, lon, fallbackPrefix string) (string, string, error) {
Expand Down
10 changes: 5 additions & 5 deletions cmd/catalyst-node/catalyst-node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestRedirectHandler404(t *testing.T) {
defer func() { getClosestNode = defaultFunc }()

defaultSerf := getSerfMember
getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil }
getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil }
defer func() { getSerfMember = defaultSerf }()

path := fmt.Sprintf("/hls/%s/index.m3u8", playbackID)
Expand All @@ -130,7 +130,7 @@ func TestRedirectHandlerHLS_Correct(t *testing.T) {
}
defer func() { getClosestNode = defaultFunc }()
defaultSerf := getSerfMember
getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil }
getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil }
defer func() { getSerfMember = defaultSerf }()

path := fmt.Sprintf("/hls/%s/index.m3u8", playbackID)
Expand All @@ -154,7 +154,7 @@ func TestRedirectHandlerHLSVOD_Correct(t *testing.T) {
}
defer func() { getClosestNode = defaultFunc }()
defaultSerf := getSerfMember
getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil }
getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil }
defer func() { getSerfMember = defaultSerf }()

pathHLS := fmt.Sprintf("/hls/vod+%s/index.m3u8", playbackID)
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestRedirectHandlerHLS_SegmentInPath(t *testing.T) {
getClosestNode = func(string, string, string, string) (string, error) { return closestNodeAddr, nil }
defer func() { getClosestNode = defaultFunc }()
defaultSerf := getSerfMember
getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil }
getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil }
defer func() { getSerfMember = defaultSerf }()

seg := "4_1"
Expand All @@ -215,7 +215,7 @@ func TestRedirectHandlerJS_Correct(t *testing.T) {
getClosestNode = func(string, string, string, string) (string, error) { return closestNodeAddr, nil }
defer func() { getClosestNode = defaultFunc }()
defaultSerf := getSerfMember
getSerfMember = func(string) (*serfclient.Member, error) { return fakeSerfMember, nil }
getSerfMember = func(map[string]string, string, string) (*serfclient.Member, error) { return fakeSerfMember, nil }
defer func() { getSerfMember = defaultSerf }()

path := fmt.Sprintf("/json_%s.js", playbackID)
Expand Down