Skip to content

Commit

Permalink
package yarpc: Update documentation
Browse files Browse the repository at this point in the history
I gave all the docs in the top-level yarpc package a look and made
updates/additions where necessary.

I added a minimal package-level doc which can be updated once the README is
ready.
  • Loading branch information
abhinav committed Dec 28, 2016
1 parent 1f896c4 commit b77b590
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 60 deletions.
35 changes: 26 additions & 9 deletions call.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,14 @@ func ResponseHeaders(h *map[string]string) CallOption {
return CallOption{func(o *OutboundCall) { o.responseHeaders = h }}
}

// TODO(abg): Example tests to document the different options

// WithHeader adds a new header to the request.
// WithHeader adds a new header to the request. Header keys are case
// insensitive.
//
// _, err := client.GetValue(ctx, reqBody, yarpc.WithHeader("Token", "10"))
// // ==> {"token": "10"}
//
// resBody, err := client.GetValue(ctx, reqBody, yarpc.WithHeader("Token", "10"))
// If multiple entries have the same normalized header name, newer entries
// override older ones.
func WithHeader(k, v string) CallOption {
return CallOption{func(o *OutboundCall) {
o.headers = append(o.headers, keyValuePair{k: k, v: v})
Expand All @@ -155,18 +158,21 @@ func WithRoutingDelegate(rd string) CallOption {
// InboundCall holds information about the inbound call and its response.
//
// Encoding authors may use InboundCall to provide information about the
// incoming request on the Context and send response headers through
// WriteResponseHeader.
// incoming request on the Context. Response headers written to the context
// using Call.WriteResponseHeader will be recorded and may be written to a
// ResponseWriter using WriteToResponse.
type InboundCall struct {
resHeaders []keyValuePair
req *transport.Request
}

type inboundCallKey struct{} // context key for *InboundCall

// NewInboundCall builds a new InboundCall with the given context.
// NewInboundCall builds a new InboundCall with the given context. A request
// context is returned and must be used in place of the original.
//
// A request context is returned and must be used in place of the original.
// This may be used by encoding authors to provide information about the
// incoming request on the Context.
func NewInboundCall(ctx context.Context) (context.Context, *InboundCall) {
call := &InboundCall{}
return context.WithValue(ctx, inboundCallKey{}, call), call
Expand Down Expand Up @@ -207,7 +213,18 @@ func (ic *InboundCall) WriteToResponse(resw transport.ResponseWriter) error {
return nil
}

// Call provides information about the current request inside handlers.
// Call provides information about the current request inside handlers. An
// instance of Call for the current request can be obtained by calling
// CallFromContext on the request context.
//
// func Get(ctx context.Context, req *GetRequest) (*GetResponse, error) {
// call := yarpc.CallFromContext(ctx)
// fmt.Println("Received request from", call.Caller())
// if err := call.WriteResponseHeader("hello", "world"); err != nil {
// return nil, err
// }
// return response, nil
// }
type Call struct{ ic *InboundCall }

// CallFromContext retrieves information about the current incoming request
Expand Down
111 changes: 75 additions & 36 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,41 +35,60 @@ import (
"github.com/opentracing/opentracing-go"
)

// Config specifies the parameters of a new RPC constructed via New.
// Config specifies the parameters of a new Dispatcher constructed via
// NewDispatcher.
type Config struct {
// Name of the service. This is the name used by other services when
// making requests to this service.
Name string

Inbounds Inbounds
// Inbounds define how this service receives incoming requests from other
// services.
//
// This may be nil if this service does not receive any requests.
Inbounds Inbounds

// Outbounds defines how this service makes requests to other services.
//
// This may be nil if this service does not send any requests.
Outbounds Outbounds

// Inbound and Outbound Middleware that will be applied to all incoming and
// outgoing requests respectively.
// Inbound and Outbound Middleware that will be applied to all incoming
// and outgoing requests respectively.
//
// These may be nil if there is no middleware to apply.
InboundMiddleware InboundMiddleware
OutboundMiddleware OutboundMiddleware

// Tracer is deprecated. The dispatcher does nothing with this propery.
Tracer opentracing.Tracer
}

// Inbounds contains a list of inbound transports
// Inbounds contains a list of inbound transports. Each inbound transport
// specifies a source through which incoming requests are received.
type Inbounds []transport.Inbound

// Outbounds encapsulates a service and its outbounds
// Outbounds provides access to outbounds for a remote service. Outbounds
// define how requests are sent from this service to the remote service.
type Outbounds map[string]transport.Outbounds

// OutboundMiddleware contains the different type of outbound middleware
// OutboundMiddleware contains the different types of outbound middlewares.
type OutboundMiddleware struct {
Unary middleware.UnaryOutbound
Oneway middleware.OnewayOutbound
}

// InboundMiddleware contains the different type of inbound middleware
// InboundMiddleware contains the different types of inbound middlewares.
type InboundMiddleware struct {
Unary middleware.UnaryInbound
Oneway middleware.OnewayInbound
}

// NewDispatcher builds a new Dispatcher using the specified Config.
// NewDispatcher builds a new Dispatcher using the specified Config. At
// minimum, a service name must be specified.
//
// Invalid configurations or errors in constructing the Dispatcher will cause
// panics.
func NewDispatcher(cfg Config) *Dispatcher {
if cfg.Name == "" {
panic("yarpc.NewDispatcher expects a service name")
Expand Down Expand Up @@ -153,9 +172,8 @@ func collectTransports(inbounds Inbounds, outbounds Outbounds) []transport.Trans
return keys
}

// Dispatcher object is used to configure a YARPC application; it is used by
// Clients to send RPCs, and by Procedures to recieve them. This object is what
// enables an application to be transport-agnostic.
// Dispatcher encapsulates a YARPC application. It acts as the entry point to
// send and receive YARPC requests in a transport and encoding agnostic way.
type Dispatcher struct {
table transport.RouteTable
name string
Expand All @@ -176,20 +194,23 @@ func (d *Dispatcher) Inbounds() Inbounds {
return inbounds
}

// ClientConfig produces a configuration object for an encoding-specific
// outbound RPC client.
// ClientConfig provides the configuration needed to talk to the given
// service. This configuration may be directly passed into encoding-specific
// RPC clients.
//
// For example, pass the returned configuration object to client.New() for any
// generated Thrift client.
// keyvalueClient := json.New(dispatcher.ClientConfig("keyvalue"))
//
// This function panics if the service name is not known.
func (d *Dispatcher) ClientConfig(service string) transport.ClientConfig {
if rs, ok := d.outbounds[service]; ok {
return clientconfig.MultiOutbound(d.name, service, rs)
}
panic(noOutboundForService{Service: service})
}

// Register configures the dispatcher's router to route inbound requests to a
// collection of procedure handlers.
// Register registers zero or more procedures with this dispatcher. Incoming
// requests to these procedures will be routed to the handlers specified in
// the given Procedures.
func (d *Dispatcher) Register(rs []transport.Procedure) {
procedures := make([]transport.Procedure, 0, len(rs))

Expand All @@ -214,19 +235,30 @@ func (d *Dispatcher) Register(rs []transport.Procedure) {
d.table.Register(procedures)
}

// Start Start the RPC allowing it to accept and processing new incoming
// requests.
// Start starts the Dispatcher, allowing it to accept and processing new
// incoming requests.
//
// This starts all inbounds and outbounds configured on this Dispatcher.
//
// Blocks until the RPC is ready to start accepting new requests.
// This function returns immediately after everything has been started.
// Servers should add a `select {}` to block to process all incoming requests.
//
// Start goes through the Transports, Outbounds and Inbounds and starts them
// *NOTE* there can be problems if we don't start these in a particular order
// The order should be: Transports -> Outbounds -> Inbounds
// If the Outbounds are started before the Transports we might get a network
// request before the Transports are ready.
// If the Inbounds are started before the Outbounds an Inbound request might
// hit an Outbound before that Outbound is ready to take requests
// if err := dispatcher.Start(); err != nil {
// log.Fatal(err)
// }
// defer dispatcher.Stop()
//
// select {}
func (d *Dispatcher) Start() error {
// NOTE: These MUST be started in the order transports, outbounds, and
// then inbounds.
//
// If the outbounds are started before the transports, we might get a
// network request before the transports are ready.
//
// If the inbounds are started before the outbounds, an inbound request
// might result in an outbound call before the outbound is ready.

var (
mu sync.Mutex
allStarted []transport.Lifecycle
Expand Down Expand Up @@ -294,15 +326,22 @@ func (d *Dispatcher) Start() error {
return nil
}

// Stop goes through the Transports, Outbounds and Inbounds and stops them
// *NOTE* there can be problems if we don't stop these in a particular order
// The order should be: Inbounds -> Outbounds -> Transports
// If the Outbounds are stopped before the Inbounds we might get a network
// request to a stopped Outbound from a still-going Inbound.
// If the Transports are stopped before the Outbounds the `peers` contained in
// the Outbound might be `deleted` from the Transports perspective and cause
// issues
// Stop stops the Dispatcher.
//
// This stops all outbounds and inbounds owned by this Dispatcher.
//
// This function returns after everything has been stopped.
func (d *Dispatcher) Stop() error {
// NOTE: These MUST be stopped in the order inbounds, outbounds, and then
// transports.
//
// If the outbounds are stopped before the inbounds, we might receive a
// request which needs to use a stopped outbound from a still-going
// inbound.
//
// If the transports are stopped before the outbounds, the peers contained
// in the outbound might be deleted from the transport's perspective and
// cause issues.
var allErrs []error

// Stop Inbounds
Expand Down
42 changes: 42 additions & 0 deletions dispatcher_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package yarpc_test

import (
"context"
"fmt"
"log"

"go.uber.org/yarpc"
"go.uber.org/yarpc/encoding/json"
"go.uber.org/yarpc/encoding/raw"
)

func ExampleDispatcher_minimal() {
dispatcher := yarpc.NewDispatcher(yarpc.Config{Name: "myFancyService"})
if err := dispatcher.Start(); err != nil {
log.Fatal(err)
}
defer dispatcher.Stop()
}

// global dispatcher used in the registration examples
var dispatcher = yarpc.NewDispatcher(yarpc.Config{Name: "service"})

func ExampleDispatcher_Register_raw() {
handler := func(ctx context.Context, data []byte) ([]byte, error) {
return data, nil
}

dispatcher.Register(raw.Procedure("echo", handler))
}

// Excuse the weird naming of this function. This lets is show as "JSON"
// rather than "Json"

func ExampleDispatcher_Register_jSON() {
handler := func(ctx context.Context, key string) (string, error) {
fmt.Println("key", key)
return "value", nil
}

dispatcher.Register(json.Procedure("get", handler))
}
24 changes: 24 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package yarpc provides the YARPC service framework.
package yarpc

// TODO: Add more package-level docs
26 changes: 21 additions & 5 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,35 @@ func (e noOutboundForService) Error() string {
return fmt.Sprintf("no configured outbound transport for service %q", e.Service)
}

// IsBadRequestError returns true if the request could not be processed
// because it was invalid.
// IsBadRequestError returns true on an error returned by RPC clients if the
// request was rejected by YARPC because it was invalid.
//
// res, err := client.Call(...)
// if yarpc.IsBadRequestError(err) {
// fmt.Println("invalid request:", err)
// }
func IsBadRequestError(err error) bool {
return transport.IsBadRequestError(err)
}

// IsUnexpectedError returns true if the server panicked or failed to process
// the request with an unhandled error.
// IsUnexpectedError returns true on an error returned by RPC clients if the
// server panicked or failed with an unhandled error.
//
// res, err := client.Call(...)
// if yarpc.IsUnexpectedError(err) {
// fmt.Println("internal server error:", err)
// }
func IsUnexpectedError(err error) bool {
return transport.IsUnexpectedError(err)
}

// IsTimeoutError return true if the given error is a TimeoutError.
// IsTimeoutError return true on an error returned by RPC clients if the given
// error is a TimeoutError.
//
// res, err := client.Call(...)
// if yarpc.IsTimeoutError(err) {
// fmt.Println("request timed out:", err)
// }
func IsTimeoutError(err error) bool {
return transport.IsTimeoutError(err)
}
8 changes: 6 additions & 2 deletions header.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ package yarpc

import "go.uber.org/yarpc/api/transport"

// CanonicalizeHeaderKey canonicalizes the given header key for storage into
// the Headers map.
// CanonicalizeHeaderKey canonicalizes the given header key to the same form
// used by the headers map returned by ResponseHeaders.
//
// var headers map[string]string
// res, err := client.Call(ctx, "hello", requestBody, ResponseHeaders(&headers))
// email, ok := headers[CanonicalizeHeaderKey("User-Email-Address")]
func CanonicalizeHeaderKey(k string) string {
return transport.CanonicalizeHeaderKey(k)
}

0 comments on commit b77b590

Please sign in to comment.