Skip to content

Commit

Permalink
Handle hub disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
jderusse committed Jan 27, 2020
1 parent 822ea5c commit ed684a8
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 70 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ go 1.13

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/donovanhide/eventsource v0.0.0-20171031113327-3ed64d21fb0b
github.com/gorilla/handlers v1.4.2
github.com/joho/godotenv v1.3.0
github.com/joonix/log v0.0.0-20190524090622-13fe31bbdd7a
github.com/pkg/errors v0.9.1
github.com/r3labs/sse v0.0.0-20200123123541-10c56e11168e
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.4.0
github.com/unrolled/secure v1.0.7
Expand Down
17 changes: 9 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/codegangsta/negroni v1.0.0 h1:+aYywywx4bnKXWvoWtRfJ91vC59NbEhEY03sZjQhbVY=
github.com/codegangsta/negroni v1.0.0/go.mod h1:v0y3T5G7Y1UlFfyxFn/QLRU4a2EuNau2iZY63YTKWo0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -20,26 +21,21 @@ github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/joonix/log v0.0.0-20190524090622-13fe31bbdd7a h1:LL1gwNo4Z1LG68SaaNb8bxB+YnMSilYzytRfkF3AigE=
github.com/joonix/log v0.0.0-20190524090622-13fe31bbdd7a/go.mod h1:fS54ONkjDV71zS9CDx3V9K21gJg7byKSvI4ajuWFNJw=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.0 h1:J8lpUdobwIeCI7OiSxHqEwJUKvJwicL5+3v1oe2Yb4k=
github.com/pkg/errors v0.9.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/r3labs/sse v0.0.0-20200123123541-10c56e11168e h1:7RbVXrrMYdlz8VXrKbz6WtFwE+aLxRVjpp2mEkOZhYs=
github.com/r3labs/sse v0.0.0-20200123123541-10c56e11168e/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/unrolled/secure v1.0.5 h1:KRGJ8DQC3jKpERjBKF3H6b3HcAsM/SRTVwfNJnWs25E=
github.com/unrolled/secure v1.0.5/go.mod h1:R6rugAuzh4TQpbFAq69oqZggyBQxFRFQIewtz5z7Jsc=
github.com/unrolled/secure v1.0.7 h1:BcQHp3iKZyZCKj5gRqwQG+5urnGBF00wGgoPPwtheVQ=
github.com/unrolled/secure v1.0.7/go.mod h1:uGc1OcRF8gCVBA+ANksKmvM85Hka6SZtQIbrKc3sHS4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand All @@ -53,6 +49,8 @@ golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 h1:0GoQqolDA55aaLxZyTzK/Y2ePZzZTUrRacwib7cNsYQ=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -71,6 +69,9 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190522204451-c2c4e71fbf69 h1:4rNOqY4ULrKzS6twXa619uQgI7h9PaVd4ZhjFQ7C5zs=
google.golang.org/genproto v0.0.0-20190522204451-c2c4e71fbf69/go.mod h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
58 changes: 19 additions & 39 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package agent
import (
"context"
"fmt"
"net/http"
"net/url"
"sync"

"github.com/cenkalti/backoff"
"github.com/donovanhide/eventsource"
"github.com/pkg/errors"
"github.com/r3labs/sse"
log "github.com/sirupsen/logrus"

"github.com/jderusse/http-broadcast/pkg/config"
Expand All @@ -18,7 +17,8 @@ import (

// Agent listen for request and dispatch it to a target
type Agent struct {
stream *eventsource.Stream
events chan *sse.Event

options *config.Options

inShutdown atomic.Bool
Expand Down Expand Up @@ -62,37 +62,25 @@ func (a *Agent) shuttingDown() bool {
func (a *Agent) listen() error {
log.Debug("agent: starting")

hubRequest, err := http.NewRequest("GET", a.hubURL(), nil)

if err != nil {
return err
}

client := sse.NewClient(a.hubURL())
if a.options.Hub.SubscribeToken != "" {
hubRequest.Header.Set("Authorization", fmt.Sprintf("Bearer %s", a.options.Hub.SubscribeToken))
client.Headers["Authorization"] = fmt.Sprintf("Bearer %s", a.options.Hub.SubscribeToken)
}

retry := backoff.NewExponentialBackOff()
retry.MaxInterval = defaultMaxInterval
retry.MaxElapsedTime = 0
client.OnDisconnect(func(c *sse.Client) {
log.WithFields(log.Fields{"hub": a.hubURL()}).Warn("agent: disconnected")
})

ctx, cancel := context.WithCancel(context.Background())

a.RegisterOnShutdown(func() { cancel() })

err = backoff.Retry(func() error {
stream, err := eventsource.SubscribeWith("", http.DefaultClient, hubRequest)
if err == nil {
a.mu.Lock()
a.stream = stream
a.mu.Unlock()

return nil
}

return err
}, backoff.WithContext(retry, ctx))
retry := backoff.NewExponentialBackOff()
retry.MaxInterval = defaultMaxInterval
retry.MaxElapsedTime = 0
client.ReconnectStrategy = backoff.WithContext(retry, ctx)

if err != nil {
if err := client.SubscribeChanWithContext(ctx, "", a.events); err != nil {
return errors.Wrap(err, "subscribe to stream")
}

Expand All @@ -106,9 +94,9 @@ func (a *Agent) serve() error {
select {
case <-a.getDoneChan():
return ErrServerClosed
case event := <-a.stream.Events:
if event != nil {
go a.replay(event.Id(), []byte(event.Data()))
case event := <-a.events:
if event != nil && len(event.Data) > 0 {
go a.replay(string(event.ID), event.Data)
}
}
}
Expand Down Expand Up @@ -144,7 +132,6 @@ func (a *Agent) Shutdown() error {

a.inShutdown.Store(true)

lnerr := a.closeStreamLocked()
a.closeDoneChanLocked()

for _, f := range a.onShutdown {
Expand All @@ -153,7 +140,7 @@ func (a *Agent) Shutdown() error {

log.Debug("agent: stopped")

return lnerr
return nil
}

func (a *Agent) closeDoneChanLocked() {
Expand Down Expand Up @@ -183,17 +170,10 @@ func (a *Agent) getDoneChanLocked() chan struct{} {
return a.doneChan
}

func (a *Agent) closeStreamLocked() error {
if a.stream != nil {
a.stream.Close()
}

return nil
}

// NewAgent allocates and returns a new Agent.
func NewAgent(options *config.Options) *Agent {
return &Agent{
events: make(chan *sse.Event),
options: options,
}
}
53 changes: 31 additions & 22 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,59 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/donovanhide/eventsource"
"github.com/r3labs/sse"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/jderusse/http-broadcast/pkg/config"
)

var u string
var srv *sse.Server
var server *httptest.Server

func newServer() *sse.Server {
srv = sse.New()

mux := http.NewServeMux()
mux.HandleFunc("/events", srv.HTTPHandler)
server = httptest.NewServer(mux)

srv.CreateStream("foo")

return srv
}

func cleanup() {
server.CloseClientConnections()
server.Close()
srv.Close()
}

func TestNewAgent(t *testing.T) {
NewAgent(&config.Options{})
}

func TestServe(t *testing.T) {
server := eventsource.NewServer()
hubServer := httptest.NewServer(server.Handler("foo"))
newServer()
defer cleanup()

var targetRequest atomic.Value
targetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
targetRequest.Store(r)
}))
// The server has to be closed before the hubServer is closed.
// Otherwise the hubServer has still an open connection and it can not close.
defer hubServer.Close()
defer server.Close()
defer targetServer.Close()

s := NewAgent(&config.Options{
Agent: config.AgentOptions{
Endpoint: parseSafeURL(targetServer.URL),
},
Hub: config.HubOptions{
Endpoint: parseSafeURL(hubServer.URL),
Endpoint: parseSafeURL(server.URL + "/events?stream=foo"),
},
})

Expand All @@ -49,35 +66,29 @@ func TestServe(t *testing.T) {
go s.serve()
defer s.Shutdown()

event, err := eventsource.NewDecoder(strings.NewReader("id: 123\ndata: {}\n\n")).Decode()
require.Nil(t, err)
server.Publish([]string{"foo"}, event)
srv.Publish("foo", &sse.Event{ID: []byte("123"), Data: []byte("{}")})
time.Sleep(100 * time.Millisecond)

v, _ := targetRequest.Load().(*http.Request)
assert.NotNil(t, v)
}

func TestShutdown(t *testing.T) {
server := eventsource.NewServer()
hubServer := httptest.NewServer(server.Handler("foo"))
newServer()
defer cleanup()

var targetRequest atomic.Value
targetServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
targetRequest.Store(r)
}))
// The server has to be closed before the hubServer is closed.
// Otherwise the hubServer has still an open connection and it can not close.
defer hubServer.Close()
defer server.Close()
defer targetServer.Close()

s := NewAgent(&config.Options{
Agent: config.AgentOptions{
Endpoint: parseSafeURL(targetServer.URL),
},
Hub: config.HubOptions{
Endpoint: parseSafeURL(hubServer.URL),
Endpoint: parseSafeURL(server.URL + "/events?stream=foo"),
},
})

Expand All @@ -87,9 +98,7 @@ func TestShutdown(t *testing.T) {
go s.serve()
s.Shutdown()

event, err := eventsource.NewDecoder(strings.NewReader("id: 123\ndata: {}\n\n")).Decode()
require.Nil(t, err)
server.Publish([]string{"foo"}, event)
srv.Publish("foo", &sse.Event{ID: []byte("123"), Data: []byte("{}")})
time.Sleep(100 * time.Millisecond)

v, _ := targetRequest.Load().(*http.Request)
Expand Down

0 comments on commit ed684a8

Please sign in to comment.