Skip to content

Commit

Permalink
Implement poison-pill method to exit infinite loops
Browse files Browse the repository at this point in the history
All cooperating goroutines regularly try to read from the shared "poison"
channel.  If the read succeeds, they exit by calling die(), assuming
somebody else cracked open the poison pill.

When any of these goroutines is done with its job, it signals other
goroutines to exit by calling open_poison() on the shared channel.

This approach takes advantage of the fact that reads from a closed
channel always succeed.

The driving goroutine (Client.Loop() in this case) is called from the
"main" goroutine. Because the whole program exits (as if by os.Exit()) when
the "main" goroutine returns, irrespective of the liveness of other goroutines,
we could not use the same "poison" channel to wait in the driving goroutine.

Instead, we use sync.WaitGroup to wait for spawned goroutines, because
we want the spawned goroutines to clean up and exit cleanly.
  • Loading branch information
gurjeet committed Dec 17, 2016
1 parent 3e895cf commit 731a3ad
Showing 1 changed file with 78 additions and 41 deletions.
119 changes: 78 additions & 41 deletions gotty-client.go
Expand Up @@ -13,7 +13,6 @@ import (
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -79,8 +78,7 @@ type Client struct {
URL string
WriteMutex *sync.Mutex
Output io.Writer
QuitChan chan struct{}
QuitChanClosed int32 // atomic value
poison chan bool
SkipTLSVerify bool
UseProxyFromEnv bool
Connected bool
Expand Down Expand Up @@ -210,12 +208,11 @@ func (c *Client) Close() {
c.Conn.Close()
}

// ExitLoop will kill all goroutine
// ExitLoop will kill all goroutines launched by c.Loop()
// ExitLoop() -> wait Loop() -> Close()
func (c *Client) ExitLoop() {
if atomic.CompareAndSwapInt32(&c.QuitChanClosed, 0, 1) {
close(c.QuitChan)
}
fname := "ExitLoop"
open_poison(fname, c.poison)
}

// Loop will look indefinitely for new messages
Expand All @@ -227,23 +224,21 @@ func (c *Client) Loop() error {
}
}

var wg sync.WaitGroup
done := make(chan bool)
wg := &sync.WaitGroup{}

wg.Add(1)
go c.termsizeLoop(&wg)
go c.termsizeLoop(wg)

wg.Add(1)
go c.readLoop(done, &wg)
go c.readLoop(wg)

wg.Add(1)
go c.writeLoop(done, &wg)
select {
case <-done:
if atomic.CompareAndSwapInt32(&c.QuitChanClosed, 0, 1) {
close(c.QuitChan)
}
case <-c.QuitChan:
}
go c.writeLoop(wg)

/* Wait for all of the above goroutines to finish */
wg.Wait()

logrus.Info("Client.Loop() exiting")
return nil
}

Expand All @@ -255,8 +250,47 @@ type winsize struct {
y uint16
}

func (c *Client) termsizeLoop(wg *sync.WaitGroup) {
// PosionAction reports how a cooperating goroutine ended its loop: either it
// initiated the shutdown itself or it exited because the shutdown had already
// been signalled. (The spelling of the name is kept as-is for compatibility.)
type PosionAction int

const (
	// CommittedSuicide is returned by open_poison: the caller closed the
	// shared poison channel itself. It is also the zero value of PosionAction.
	CommittedSuicide PosionAction = iota
	// Killed is returned by die: the caller exited after finding the poison
	// channel readable.
	Killed
)

// open_poison signals every goroutine watching poison to exit by closing the
// channel, and reports CommittedSuicide for the calling goroutine.
//
// fname is the name of the calling function; it is used only for logging.
//
// close() panics when the channel has already been closed, which can happen
// when several goroutines decide to quit at about the same time. That panic is
// recovered and logged so that it does not bubble up. Note that the recover
// catches any panic raised by close(), and that after a recovered panic the
// function returns the zero PosionAction, which is also CommittedSuicide.
func open_poison(fname string, poison chan bool) PosionAction {
	logrus.Info(fname + " suicide")

	defer func() {
		if r := recover(); r != nil {
			// Format explicitly so the panic value is separated from the
			// message instead of being glued onto its last word.
			logrus.Errorf("Prevented panic() of simultaneous suicides: %v", r)
		}
	}()

	// Signal others to die.
	close(poison)
	return CommittedSuicide
}

// die is called by a goroutine that found the poison channel readable and is
// about to exit. It logs the exit and reports Killed.
//
// fname is the name of the calling function; it is used only for logging.
//
// As a sanity check, die verifies that poison really is closed. The check uses
// a non-blocking, comma-ok receive: a plain receive would block forever on a
// channel that is still open and has no sender, and the received value alone
// cannot distinguish "closed" from "somebody sent false".
func die(fname string, poison chan bool) PosionAction {
	logrus.Info(fname + " died")

	closed := false
	select {
	case _, ok := <-poison:
		// ok is false only when the receive succeeded because of close().
		closed = !ok
	default:
		// Nothing to receive, so the channel is still open.
	}
	if !closed {
		logrus.Error("ERROR: The channel was open when it wasn't supposed to be")
	}

	return Killed
}

func (c *Client) termsizeLoop(wg *sync.WaitGroup) PosionAction {

defer wg.Done()
fname := "termsizeLoop"

ch := make(chan os.Signal, 1)
notifySignalSIGWINCH(ch)
defer resetSignalSIGWINCH()
Expand All @@ -270,8 +304,9 @@ func (c *Client) termsizeLoop(wg *sync.WaitGroup) {
}
}
select {
case <-c.QuitChan:
return
case <-c.poison:
/* Somebody poisoned the well; die */
return die(fname, c.poison)
case <-ch:
}
}
Expand All @@ -281,8 +316,10 @@ type exposeFd interface {
Fd() uintptr
}

func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) {
func (c *Client) writeLoop(wg *sync.WaitGroup) PosionAction {

defer wg.Done()
fname := "writeLoop"

buff := make([]byte, 128)
oldState, err := terminal.MakeRaw(0)
Expand All @@ -294,17 +331,17 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) {
reader := io.Reader(os.Stdin)
for {
select {
case <-c.QuitChan:
return
case <-c.poison:
/* Somebody poisoned the well; die */
return die(fname, c.poison)
default:
}

rdfs.Zero()
rdfs.Set(reader.(exposeFd).Fd())
err := goselect.Select(1, rdfs, nil, nil, 50*time.Millisecond)
if err != nil {
done <- true
return
return open_poison(fname, c.poison)
}
if rdfs.IsSet(reader.(exposeFd).Fd()) {
size, err := reader.Read(buff)
Expand All @@ -318,13 +355,11 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) {
err = c.write(append([]byte("0"), byte(4)))

if err != nil {
done <- true
return
return open_poison(fname, c.poison)
}
continue
} else {
done <- true
return
return open_poison(fname, c.poison)
}
}

Expand All @@ -335,15 +370,16 @@ func (c *Client) writeLoop(done chan bool, wg *sync.WaitGroup) {
data := buff[:size]
err = c.write(append([]byte("0"), data...))
if err != nil {
done <- true
return
return open_poison(fname, c.poison)
}
}
}
}

func (c *Client) readLoop(done chan bool, wg *sync.WaitGroup) {
func (c *Client) readLoop(wg *sync.WaitGroup) PosionAction {

defer wg.Done()
fname := "readLoop"

type MessageNonBlocking struct {
Data []byte
Expand All @@ -358,20 +394,21 @@ func (c *Client) readLoop(done chan bool, wg *sync.WaitGroup) {
}()

select {
case <-c.QuitChan:
return
case <-c.poison:
/* Somebody poisoned the well; die */
return die(fname, c.poison)
case msg := <-msgChan:
if msg.Err != nil {
done <- true

if _, ok := msg.Err.(*websocket.CloseError); !ok {
logrus.Warnf("c.Conn.ReadMessage: %v", msg.Err)
}
return
return open_poison(fname, c.poison)
}
if len(msg.Data) == 0 {
done <- true

logrus.Warnf("An error has occured")
return
return open_poison(fname, c.poison)
}
switch msg.Data[0] {
case '0': // data
Expand Down Expand Up @@ -427,6 +464,6 @@ func NewClient(inputURL string) (*Client, error) {
URL: url,
WriteMutex: &sync.Mutex{},
Output: os.Stdout,
QuitChan: make(chan struct{}),
poison: make(chan bool),
}, nil
}

0 comments on commit 731a3ad

Please sign in to comment.