Skip to content

Commit

Permalink
GH-75: Added Conn method.
Browse files Browse the repository at this point in the history
  • Loading branch information
jirenius committed May 29, 2020
1 parent 2447095 commit c2dd3dc
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 20 deletions.
13 changes: 7 additions & 6 deletions doc.go
@@ -1,17 +1,18 @@
/*
Package res provides RES service implementations for realtime API's through Resgate:
Package res is used to create REST, real time, and RPC APIs, where all your
reactive web clients are synchronized seamlessly through Resgate:
https://github.com/resgateio/resgate
The implementation provides low level methods to listen to and handle incoming
requests, and to send events.
The implementation provides structs and methods for creating services that
listen to requests and send events over NATS server.
Concurrency
Requests are handled concurrently for multiple resources, but the package
guarantees that only one goroutine is executing handlers for any unique
resource at any one time. This allows handlers to modify models and collections
without additional synchronization such as mutexes.
guarantees that only one goroutine is executing handlers for any unique resource
at any one time. This allows handlers to modify models and collections without
additional synchronization such as mutexes.
Usage
Expand Down
47 changes: 33 additions & 14 deletions service.go
Expand Up @@ -272,6 +272,17 @@ func (s *Service) ProtocolVersion() string {
return protocolVersion
}

// Conn returns the connection instance used by the service.
//
// If the service is not started, nil is returned.
//
// If the service was started using ListenAndServe, the connection will be of
// type *nats.Conn:
// nc := service.Conn().(*nats.Conn)
func (s *Service) Conn() Conn {
return s.nc
}

// infof logs a formatted info entry.
func (s *Service) infof(format string, v ...interface{}) {
if s.logger == nil {
Expand Down Expand Up @@ -507,9 +518,8 @@ func (s *Service) SetOwnedResources(resources, access []string) *Service {
}

// ListenAndServe connects to the NATS server at the url. Once connected, it
// subscribes to incoming requests and serves them on a single goroutine in the
// order they are received. For each request, it calls the appropriate handler,
// or replies with the appropriate error if no handler is available.
// subscribes to incoming requests. For each request, it calls the appropriate
// handler, or replies with the appropriate error if no handler is available.
//
// In case of disconnect, it will try to reconnect until Close is called, or
// until successfully reconnecting, upon which Reset will be called.
Expand Down Expand Up @@ -539,25 +549,34 @@ func (s *Service) ListenAndServe(url string, options ...nats.Option) error {
return err
}

nc.SetReconnectHandler(s.handleReconnect)
nc.SetDisconnectHandler(s.handleDisconnect)
nc.SetClosedHandler(s.handleClosed)

return s.serve(nc)
}

// Serve subscribes to incoming requests on the *Conn nc, serving them on a
// single goroutine in the order they are received. For each request, it calls
// the appropriate handler, or replies with the appropriate error if no handler
// is available.
// Serve starts serving incoming requests received on the connection conn. For
// each request, it calls the appropriate handler, or replies with the
// appropriate error if no handler is available.
//
// If the connection conn is of type *nats.Conn, Service will call
// SetReconnectHandler, SetDisconnectHandler, and SetClosedHandler, replacing
// any existing event handlers.
//
// In case of disconnect, it will try to reconnect until Close is called, or
// until successfully reconnecting, upon which Reset will be called.
//
// Serve returns an error if failes to subscribe. Otherwise, nil is returned
// once the *Conn is closed.
func (s *Service) Serve(nc Conn) error {
// once the connection is closed.
func (s *Service) Serve(conn Conn) error {
if !atomic.CompareAndSwapInt32(&s.state, stateStopped, stateStarting) {
return errNotStopped
}
return s.serve(nc)

if nc, ok := conn.(*nats.Conn); ok {
nc.SetReconnectHandler(s.handleReconnect)
nc.SetDisconnectHandler(s.handleDisconnect)
nc.SetClosedHandler(s.handleClosed)
}

return s.serve(conn)
}

func (s *Service) serve(nc Conn) error {
Expand Down
15 changes: 15 additions & 0 deletions test/00service_test.go
Expand Up @@ -251,3 +251,18 @@ func TestServiceWithGroup_WithMatchingResource_CallsCallback(t *testing.T) {
}
})
}

func TestConn_BeforeServe_ReturnsNil(t *testing.T) {
s := res.NewService("test")
restest.AssertTrue(t, "Conn() returns nil", s.Conn() == nil)
}

func TestConn_AfterServe_ReturnsConn(t *testing.T) {
runTest(t, func(s *res.Service) {
s.Handle("model", res.GetResource(func(r res.GetRequest) { r.NotFound() }))
}, func(s *restest.Session) {
conn, ok := s.Service().Conn().(*restest.MockConn)
restest.AssertTrue(t, "conn is not nil", conn != nil)
restest.AssertTrue(t, "conn is of type *MockConn", ok)
})
}

0 comments on commit c2dd3dc

Please sign in to comment.