ARPC - More Effective Network Communication

Features

  • Two-Way Calling
  • Two-Way Notify
  • Sync and Async Calling
  • Sync and Async Response
  • Singleflight Call De-duplication
  • Batch Write | Writev | net.Buffers
  • Broadcast
  • Middleware
  • Pub/Sub
  • Opentracing
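
| Pattern | Interactive Directions           | Description              |
|---------|----------------------------------|--------------------------|
| call    | two-way: c -> s, s -> c          | request and response     |
| notify  | two-way: c -> s, s -> c          | request without response |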

Performance

Several third-party benchmarks include arpc. Although those repos publish their own performance reports, I suggest running the code yourself to get real results rather than relying on other people's documentation.

Header Layout

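  • LittleEndian

| bodyLen | reserved | cmd    | flag   | methodLen | sequence | method          | body                    |
|---------|----------|--------|--------|-----------|----------|-----------------|-------------------------|
| 4 bytes | 1 byte   | 1 byte | 1 byte | 1 byte    | 8 bytes  | methodLen bytes | bodyLen-methodLen bytes |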
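
The layout above can be sketched with the standard library alone. This is a minimal illustration, not arpc's own encoder: the `frame` type, the `encode`/`decode` helpers and the `Cmd: 1` value are hypothetical names chosen for the example, and the only facts taken from the table are the field order, the field widths and the little-endian byte order.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// headerLen is the fixed header size implied by the table:
// 4 + 1 + 1 + 1 + 1 + 8 = 16 bytes.
const headerLen = 16

// frame is a hypothetical in-memory view of one message (not arpc's Message type).
type frame struct {
	Cmd    byte
	Flag   byte
	Seq    uint64
	Method string
	Body   []byte
}

// encode writes the header fields in table order, little-endian,
// followed by the method name and then the body.
func encode(f frame) ([]byte, error) {
	if len(f.Method) > 255 {
		return nil, errors.New("method name does not fit in a 1-byte length")
	}
	bodyLen := len(f.Method) + len(f.Body) // bodyLen covers method + body
	buf := make([]byte, headerLen+bodyLen)
	binary.LittleEndian.PutUint32(buf[0:4], uint32(bodyLen))
	buf[4] = 0 // reserved
	buf[5] = f.Cmd
	buf[6] = f.Flag
	buf[7] = byte(len(f.Method))
	binary.LittleEndian.PutUint64(buf[8:16], f.Seq)
	copy(buf[headerLen:], f.Method)
	copy(buf[headerLen+len(f.Method):], f.Body)
	return buf, nil
}

// decode reverses encode and rejects short or inconsistent input.
func decode(buf []byte) (frame, error) {
	if len(buf) < headerLen {
		return frame{}, errors.New("short header")
	}
	bodyLen := int(binary.LittleEndian.Uint32(buf[0:4]))
	methodLen := int(buf[7])
	if methodLen > bodyLen || len(buf) < headerLen+bodyLen {
		return frame{}, errors.New("truncated or inconsistent frame")
	}
	payload := buf[headerLen : headerLen+bodyLen]
	return frame{
		Cmd:    buf[5],
		Flag:   buf[6],
		Seq:    binary.LittleEndian.Uint64(buf[8:16]),
		Method: string(payload[:methodLen]),
		Body:   append([]byte(nil), payload[methodLen:]...),
	}, nil
}

func main() {
	raw, err := encode(frame{Cmd: 1, Seq: 7, Method: "/echo", Body: []byte("hi")})
	if err != nil {
		panic(err)
	}
	f, err := decode(raw)
	fmt.Println(len(raw), f.Method, string(f.Body), f.Seq, err)
}
```

Note that bodyLen counts the method name as well as the payload, which is why the body column is bodyLen-methodLen bytes long.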

Installation

  1. Get and install arpc
$ go get -u github.com/lesismal/arpc
  2. Import in your code:
import "github.com/lesismal/arpc"

Quick Start

package main

import "github.com/lesismal/arpc"

func main() {
	server := arpc.NewServer()

	// register router
	server.Handler.Handle("/echo", func(ctx *arpc.Context) {
		str := ""
		if err := ctx.Bind(&str); err == nil {
			ctx.Write(str)
		}
	})

	server.Run("localhost:8888")
}

package main

import (
	"log"
	"net"
	"time"

	"github.com/lesismal/arpc"
)

func main() {
	client, err := arpc.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	defer client.Stop()

	req := "hello"
	rsp := ""
	err = client.Call("/echo", &req, &rsp, time.Second*5)
	if err != nil {
		log.Fatalf("Call failed: %v", err)
	} else {
		log.Printf("Call Response: \"%v\"", rsp)
	}
}

API Examples

Register Routers

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

// by default, messages are handled one by one in the same conn reader goroutine
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })

// this makes each message get handled in a new goroutine
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)

Register Service Methods

Instead of registering each route by hand, handler.Register(m, svc) reflects over the struct value svc and registers all of its eligible methods at once, each under the "Service.Method" route name m + "." + <MethodName> (or just <MethodName> when m is empty). If no method is registered, Register panics.

There are two method shapes that Register recognizes:

  • The first shape (a typed request/response method): an exported method with the signature func(ctx context.Context, req *Request, rsp *Response) and no return values, where req/rsp are pointers to structs.
  • The second shape (an arpc.HandlerFunc method): an exported method of type func(*arpc.Context).

Register handles three cases, summarized here and shown in the examples below:

| Case | Methods on the struct | Registered handler | Reflection at call time? | Performance |
|------|-----------------------|--------------------|--------------------------|-------------|
| 1. First shape alone | Foo(ctx, req, rsp) only | an auto-generated handler that allocates req/rsp, binds the request, calls Foo, then writes the response | Yes: reflect.New + reflect.Value.Call on every request | Slower |
| 2. Second shape alone | Bar(ctx *arpc.Context) only (any name; it does not have to end with Binding) | the method itself | No: a plain function call | Fast |
| 3. Both, paired | Foo(ctx, req, rsp) + FooBinding(ctx *arpc.Context) | the FooBinding method, registered under the route name Foo | No: a plain function call | Fast |

Note: Register always uses reflection while registering (a one-time cost at startup). The table above is about the call-time / per-request path: only case 1's auto-generated handler keeps using reflection on every request, so prefer case 2 or case 3 on hot paths.

In case 3 the two methods must be paired by name: the second method is named <FirstMethodName>Binding. It is registered under the first method's name, and lets you write the binding/response logic yourself instead of relying on the reflection-based auto-generated handler.

type Echo struct{}

type EchoReq struct{ Message string }
type EchoRsp struct{ Message string }

// Case 1: first shape alone (no "HelloBinding"). Register auto-generates a
// reflection-based handler that binds the request, calls Hello and writes the
// response, registered as "Echo.Hello".
func (e *Echo) Hello(ctx context.Context, req *EchoReq, rsp *EchoRsp) {
	rsp.Message = "hello, " + req.Message
}

// Case 2: second shape alone. A plain arpc.HandlerFunc method, registered as
// "Echo.Ping". The name need not end with "Binding".
func (e *Echo) Ping(ctx *arpc.Context) {
	ctx.Write("pong")
}

// Case 3: paired. Say is the first shape, SayBinding is the second shape named
// after it. SayBinding is registered under the first method's name "Echo.Say",
// and the call path is a plain function call (no reflection).
// arpc.Context implements context.Context, so it can be passed through directly.
func (e *Echo) Say(ctx context.Context, req *EchoReq, rsp *EchoRsp) {
	rsp.Message = req.Message
}
func (e *Echo) SayBinding(ctx *arpc.Context) {
	req := &EchoReq{}
	rsp := &EchoRsp{}
	if err := ctx.Bind(req); err != nil {
		ctx.Error(err)
		return
	}
	e.Say(ctx, req, rsp)
	ctx.Write(rsp)
}

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

// registers "Echo.Hello" (case 1, auto-generated), "Echo.Ping" (case 2) and
// "Echo.Say" (case 3, via SayBinding)
if err := handler.Register("Echo", &Echo{}); err != nil {
	log.Fatal(err)
}
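
To make the three cases concrete, here is a rough sketch of how a registrar could classify methods by reflection. It is an assumption-laden illustration, not arpc's implementation: `Context` is a hypothetical stand-in for arpc.Context so the code builds with the standard library only, and `classify`, `isTyped` and `isHandler` are made-up helper names. Only the shape rules, the "Binding" pairing rule and the m + "." + <MethodName> naming come from the description above.

```go
package main

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"strings"
)

// Context is a hypothetical stand-in for arpc.Context.
type Context struct{}

type EchoReq struct{ Message string }
type EchoRsp struct{ Message string }

type Echo struct{}

func (e *Echo) Hello(ctx context.Context, req *EchoReq, rsp *EchoRsp) {}
func (e *Echo) Ping(ctx *Context)                                     {}
func (e *Echo) Say(ctx context.Context, req *EchoReq, rsp *EchoRsp)   {}
func (e *Echo) SayBinding(ctx *Context)                               {}

var (
	ctxType     = reflect.TypeOf((*context.Context)(nil)).Elem()
	handlerType = reflect.TypeOf((*Context)(nil))
)

func isStructPtr(t reflect.Type) bool {
	return t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct
}

// isTyped matches the first shape. In(0) is the receiver.
func isTyped(t reflect.Type) bool {
	return t.NumIn() == 4 && t.NumOut() == 0 && t.In(1) == ctxType &&
		isStructPtr(t.In(2)) && isStructPtr(t.In(3))
}

// isHandler matches the second shape. In(0) is the receiver.
func isHandler(t reflect.Type) bool {
	return t.NumIn() == 2 && t.NumOut() == 0 && t.In(1) == handlerType
}

// classify maps each route name to the case that would serve it.
func classify(prefix string, svc interface{}) map[string]string {
	t := reflect.TypeOf(svc)
	route := func(name string) string {
		if prefix == "" {
			return name
		}
		return prefix + "." + name
	}
	out := map[string]string{}
	for i := 0; i < t.NumMethod(); i++ {
		m := t.Method(i)
		switch {
		case isTyped(m.Type):
			// Case 1 applies only when no paired <Name>Binding handler exists.
			b, ok := t.MethodByName(m.Name + "Binding")
			if !ok || !isHandler(b.Type) {
				out[route(m.Name)] = "case 1: reflection-based handler"
			}
		case isHandler(m.Type):
			base := strings.TrimSuffix(m.Name, "Binding")
			f, ok := t.MethodByName(base)
			if base != m.Name && ok && isTyped(f.Type) {
				out[route(base)] = "case 3: " + m.Name // paired, uses the first method's name
			} else {
				out[route(m.Name)] = "case 2: " + m.Name
			}
		}
	}
	return out
}

func main() {
	routes := classify("Echo", &Echo{})
	names := make([]string, 0, len(routes))
	for name := range routes {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Println(name, "->", routes[name])
	}
}
```

The sketch only decides which route each method would get; it does not bind requests or write responses.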

Router Middleware

See the router middleware package; it is easy to implement your own middleware.

import "github.com/lesismal/arpcex/extension/middleware/router"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })

Coder Middleware

  • Coder Middleware converts message data to a format of your choice, e.g. encryption/decryption or compression/decompression.
import "github.com/lesismal/arpcex/extension/middleware/coder/gzip"

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.UseCoder(gzip.New())
handler.Handle("/echo", func(ctx *arpc.Context) { ... })

Client Call, CallAsync, Notify

  1. Call (blocking, with timeout/context)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
  2. CallAsync (non-blocking, with callback and timeout/context)
request := &Echo{...}

timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
	response := &Echo{}
	ctx.Bind(response)
	...	
}, timeout)
  3. Notify (same as CallAsync with timeout/context, without callback)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)

Singleflight (Call De-duplication)

Enable singleflight on a router/method to de-duplicate concurrent calls: when several goroutines call the same method with the same key at the same time, only one request is actually sent to the server and all of them share its response. This cuts duplicated round-trips and server load for hot, identical requests. It applies to Call, CallContext/CallWith and CallAsync.

Only methods you register are optimized; every other method behaves exactly as before.

// Configure it on the Handler used by your Client (or use the package-level
// arpc.Singleflight(...) which applies to arpc.DefaultHandler).

// 1) Default key: req.String() when req implements fmt.Stringer,
//    otherwise fmt.Sprintf("%v", req).
handler.Singleflight("/call/echo")

// 2) Or provide a custom func that computes the de-dup key from the req.
handler.Singleflight("/user/get", func(req interface{}) string {
	return req.(*GetUserReq).ID
})

// Create the Client with that Handler, then call as usual; concurrent calls
// sharing the same key are collapsed into a single request.
client, _ := arpc.NewClient(dialer, handler)
err := client.Call("/call/echo", request, response, time.Second*5)

Notes:

  • The leader performs the real round-trip; followers wait for it and each decodes the shared response into their own rsp. The blocking path decodes the payload only once and shares the decoded value, so the shared response should be treated as read-only.
  • Every caller still honors its own timeout/context while waiting.
  • Call and CallContext de-duplicate together; CallAsync de-duplicates within its own kind and each follower's callback is invoked with the shared response (or its own timeout).
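
The leader/follower idea can be sketched independently of arpc. The code below is a simplified illustration under stated assumptions: `group`, `call`, `do`, `followers` and `demo` are hypothetical names, responses are plain strings, and per-caller timeouts are left out. Only `defaultKey` follows a rule stated above (req.String() for a fmt.Stringer, otherwise fmt.Sprintf("%v", req)).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// defaultKey follows the default key rule described above.
func defaultKey(req interface{}) string {
	if s, ok := req.(fmt.Stringer); ok {
		return s.String()
	}
	return fmt.Sprintf("%v", req)
}

// call is one in-flight request shared by a leader and its followers.
type call struct {
	done chan struct{}
	dups int // number of followers waiting on this call
	rsp  string
	err  error
}

// group is a hypothetical de-duplicator keyed by method + request key.
type group struct {
	mu    sync.Mutex
	calls map[string]*call
}

// do runs fn once per key among concurrent callers.
// shared reports whether this caller was a follower.
func (g *group) do(key string, fn func() (string, error)) (rsp string, shared bool, err error) {
	g.mu.Lock()
	if g.calls == nil {
		g.calls = make(map[string]*call)
	}
	if c, ok := g.calls[key]; ok {
		c.dups++
		g.mu.Unlock()
		<-c.done // follower: wait for the leader's result
		return c.rsp, true, c.err
	}
	c := &call{done: make(chan struct{})}
	g.calls[key] = c
	g.mu.Unlock()

	c.rsp, c.err = fn() // leader: the only real round-trip

	g.mu.Lock()
	delete(g.calls, key)
	g.mu.Unlock()
	close(c.done)
	return c.rsp, false, c.err
}

// followers returns how many callers are currently waiting on key.
func (g *group) followers(key string) int {
	g.mu.Lock()
	defer g.mu.Unlock()
	if c, ok := g.calls[key]; ok {
		return c.dups
	}
	return 0
}

// demo starts n concurrent callers and reports how many requests were "sent".
func demo(n int) (int, []string) {
	var g group
	var sent int32
	release := make(chan struct{})
	fetch := func() (string, error) {
		atomic.AddInt32(&sent, 1)
		<-release // hold the leader until every follower has joined
		return "pong", nil
	}
	key := "/call/echo:" + defaultKey("ping")
	results := make([]string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i], _, _ = g.do(key, fetch)
		}(i)
	}
	for g.followers(key) < n-1 {
		time.Sleep(time.Millisecond)
	}
	close(release)
	wg.Wait()
	return int(atomic.LoadInt32(&sent)), results
}

func main() {
	sent, results := demo(4)
	fmt.Println("requests sent:", sent, "responses:", results)
}
```

In this sketch every caller receives the same response value, which is why a shared response should be treated as read-only.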

Server Call, CallAsync, Notify

  1. Get client and keep it in your application
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
	client = ctx.Client
	// release client
	client.OnDisconnected(func(c *arpc.Client){
		client = nil
	})
})

go func() {
	for {
		time.Sleep(time.Second)
		if client != nil {
			client.Call(...)
			client.CallAsync(...)
			client.Notify(...)
		}
	}
}()
  2. Then Call/CallAsync/Notify

Broadcast - Notify

var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})

// i is a caller-supplied sequence number, used only to label the message.
func broadcast(i int) {
	var svr *arpc.Server = ... 
	msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
	mux.RLock()
	for client := range clientMap {
		client.PushMsg(msg, arpc.TimeZero)
	}
	mux.RUnlock()
}
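
The snippet above reads clientMap under a read lock but does not show how the map is kept up to date. One possible arrangement is sketched below with standard-library types only: `peer` is a hypothetical stand-in for *arpc.Client, `push` stands in for PushMsg, and `registry` is a made-up helper. In a real program, add and remove would presumably be called from the connected and disconnected handlers.

```go
package main

import (
	"fmt"
	"sync"
)

// peer is a hypothetical stand-in for *arpc.Client.
type peer struct {
	mu    sync.Mutex
	inbox []string
}

// push stands in for client.PushMsg: it records the message for this peer.
func (p *peer) push(msg string) {
	p.mu.Lock()
	p.inbox = append(p.inbox, msg)
	p.mu.Unlock()
}

// received returns how many messages this peer has been sent.
func (p *peer) received() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.inbox)
}

// registry tracks connected peers so a broadcast can iterate over them safely.
type registry struct {
	mu    sync.RWMutex
	peers map[*peer]struct{}
}

func newRegistry() *registry {
	return &registry{peers: make(map[*peer]struct{})}
}

// add would be called when a peer connects.
func (r *registry) add(p *peer) {
	r.mu.Lock()
	r.peers[p] = struct{}{}
	r.mu.Unlock()
}

// remove would be called when a peer disconnects.
func (r *registry) remove(p *peer) {
	r.mu.Lock()
	delete(r.peers, p)
	r.mu.Unlock()
}

// broadcast pushes msg to every registered peer and returns how many were reached.
// The read lock lets several broadcasts run concurrently while blocking add/remove.
func (r *registry) broadcast(msg string) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	for p := range r.peers {
		p.push(msg)
	}
	return len(r.peers)
}

func main() {
	r := newRegistry()
	a, b := &peer{}, &peer{}
	r.add(a)
	r.add(b)
	r.remove(b)
	fmt.Println(r.broadcast("broadcast msg 0"), a.received(), b.received())
}
```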

Async Response

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

asyncResponse := true // default is true, or set false
handler.Handle("/echo", func(ctx *arpc.Context) {
	req := ...
	err := ctx.Bind(req)
	if err == nil {
		ctx.Write(data)
	}
}, asyncResponse)

Handle New Connection

// package
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
	...
})

Handle Disconnected

// package
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
	...
})

Handle Client's send queue overstock

// package
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
	...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client) {
	...
})

Custom Net Protocol

// server
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)

// client
dialer := func() (net.Conn, error) { 
	return ... 
}
client, err := arpc.NewClient(dialer)

Custom Codec

import "github.com/lesismal/arpc/codec"

// named myCodec so the variable does not shadow the imported codec package
var myCodec codec.Codec = ...

// package
codec.DefaultCodec = myCodec

// server
svr := arpc.NewServer()
svr.Codec = myCodec

// client
client, err := arpc.NewClient(...)
client.Codec = myCodec

Custom Logger

import "github.com/lesismal/arpc/log"

var logger log.Logger = ...
log.SetLogger(logger) // log.DefaultLogger = logger

Custom operations before conn's recv and send

arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error {
	// ...
})

arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error {
	// ...
})

Custom arpc.Client's Reader by wrapping net.Conn

arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader {
	// ...
})

Custom arpc.Client's send queue capacity

arpc.DefaultHandler.SetSendQueueSize(4096)
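
For intuition about what a bounded send queue and an overstock callback involve, here is a minimal standard-library sketch. It is not arpc's implementation: `sendQueue`, `newSendQueue`, `push` and the callback signature are hypothetical, and the choice to reject a message when the queue is full is an assumption made for the example.

```go
package main

import "fmt"

// sendQueue is a hypothetical bounded queue with an overstock callback.
type sendQueue struct {
	ch          chan []byte
	onOverstock func(msg []byte)
}

// newSendQueue creates a queue that holds at most size pending messages.
func newSendQueue(size int, onOverstock func(msg []byte)) *sendQueue {
	return &sendQueue{ch: make(chan []byte, size), onOverstock: onOverstock}
}

// push enqueues msg without blocking. When the queue is full it reports
// the overstock through the callback and returns false.
func (q *sendQueue) push(msg []byte) bool {
	select {
	case q.ch <- msg:
		return true
	default:
		if q.onOverstock != nil {
			q.onOverstock(msg)
		}
		return false
	}
}

// pending returns the number of messages waiting to be written.
func (q *sendQueue) pending() int { return len(q.ch) }

func main() {
	rejected := 0
	q := newSendQueue(2, func(msg []byte) { rejected++ })
	for i := 0; i < 3; i++ {
		q.push([]byte(fmt.Sprintf("msg %d", i)))
	}
	fmt.Println(q.pending(), rejected)
}
```

A larger queue absorbs longer bursts at the cost of memory per connection, so the capacity is a trade-off between the two.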

JS Client

Web Chat Examples

Pub/Sub Examples

  • start a server
package main

import (
	"fmt"
	"time"

	"github.com/lesismal/arpcex/extension/pubsub"
)

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	s := pubsub.NewServer()
	s.Password = password

	// server publish to all clients
	go func() {
		for i := 0; true; i++ {
			time.Sleep(time.Second)
			s.Publish(topicName, fmt.Sprintf("message from server %v", i))
		}
	}()

	s.Run(address)
}
  • start a subscribe client
package main

import (
	"net"
	"time"

	"github.com/lesismal/arpc/log"
	"github.com/lesismal/arpcex/extension/pubsub"
)

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func onTopic(topic *pubsub.Topic) {
	log.Info("[OnTopic] [%v] \"%v\", [%v]",
		topic.Name,
		string(topic.Data),
		time.Unix(topic.Timestamp/1000000000, topic.Timestamp%1000000000).Format("2006-01-02 15:04:05.000"))
}

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	// subscribe topic
	if err := client.Subscribe(topicName, onTopic, time.Second); err != nil {
		panic(err)
	}

	<-make(chan int)
}
  • start a publish client
package main

import (
	"fmt"
	"net"
	"time"

	"github.com/lesismal/arpcex/extension/pubsub"
)

var (
	address = "localhost:8888"

	password = "123qwe"

	topicName = "Broadcast"
)

func main() {
	client, err := pubsub.NewClient(func() (net.Conn, error) {
		return net.DialTimeout("tcp", address, time.Second*3)
	})
	if err != nil {
		panic(err)
	}
	client.Password = password

	// authentication
	err = client.Authenticate()
	if err != nil {
		panic(err)
	}

	for i := 0; true; i++ {
		if i%5 == 0 {
			// publish msg to all clients
			client.Publish(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		} else {
			// publish msg to only one client
			client.PublishToOne(topicName, fmt.Sprintf("message from client %d", i), time.Second)
		}
		time.Sleep(time.Second)
	}
}

More Examples
