A very convenient tcp framework in golang.
Supporting protocols
- UDP
- TCP
- KCP(tcpx@v3.0.0 --)
Since some dependencies, (github.com/xtaci/kcp-go
--> github.com/klauspost/reedsolomon
, github.com/xtaci/kcp-go
--> github.com/templexxx/*
), ,stop supporting old go versions, which conflicts to my opinion, tcpx decides stop supporting kcp.And tcp.v3.0.0 is the last version to use it running kcp server.
Table of Contents generated with DocToc
- Why designing tcp framwork rather than the official?
- 1. Start
- 2. Example
- 3. Ussages
- 4. Frequently used methods
- 5. Cross-language gateway
Golang has greate support of tcp protocol in official libraries, but users still need to consider details, most profiling way will make project heavier and heavier.Tpcx aims to use tcp in a most friendly way.Most ussage paterns are like github.com/gin-gonic/gin
.Users don't consider details. All they are advised touching is a context, most apis in gin
are also accessable in tcpx.
go get github.com/fwhezfwhez/tcpx
if you want to run program in this repo,you should prepare protoc,proto-gen-go environment.
It's good to compile yourself from these repos,but there is already release versions referring to their doc.
Make sure run protoc --version
available.
protoc: https://github.com/golang/protobuf
proto-gen-go:https://github.com/golang/protobuf/tree/master/protoc-gen-go
https://github.com/fwhezfwhez/tcpx/blob/master/benchmark_test.go
cases | exec times | cost time per loop | cost mem per loop | cost object num per loop | url |
---|---|---|---|---|---|
OnMessage | 2000000 | 643 ns/op | 1368 B/op | 5 allocs/op | click to location |
Mux without middleware | 2000000 | 761 ns/op | 1368 B/op | 5 allocs/op | click to location |
Mux with middleware | 2000000 | 768 ns/op | 1368 B/op | 5 allocs/op | click to location |
https://github.com/fwhezfwhez/tcpx/tree/master/examples/sayHello
server:
package main
import (
"fmt"
"github.com/fwhezfwhez/tcpx"
)
func main() {
srv := tcpx.NewTcpX(nil)
srv.OnMessage = func(c *tcpx.Context) {
var message []byte
c.Bind(&message)
fmt.Println(string(message))
}
srv.ListenAndServe("tcp", "localhost:8080")
}
client:
package main
import (
"fmt"
"net"
"github.com/fwhezfwhez/tcpx"
//"tcpx"
)
func main() {
conn, e := net.Dial("tcp", "localhost:8080")
if e != nil {
panic(e)
}
var message = []byte("hello world")
buf, e := tcpx.PackWithMarshaller(tcpx.Message{
MessageID: 1,
Header: nil,
Body: message,
}, nil)
if e != nil {
fmt.Println(e.Error())
return
}
_, e = conn.Write(buf)
if e != nil {
fmt.Println(e.Error())
return
}
}
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/heartbeat
tcpx has built-in heartbeat handler. Default heartbeat messageID is 1392.It means client should send heartbeat pack in specific interval.When fail received more than 3 times, connection will break by server.
srv side
srv := tcpx.NewTcpX(nil)
srv.HeartBeatModeDetail(true, 10 * time.Second, false, tcpx.DEFAULT_HEARTBEAT_MESSAGEID)
client side
var heartBeat []byte
heartBeat, e = tcpx.PackWithMarshaller(tcpx.Message{
MessageID: tcpx.DEFAULT_HEARTBEAT_MESSAGEID,
Header: nil,
Body: nil,
}, nil)
for {
conn.Write(heartBeat)
time.Sleep(10 * time.Second)
}
rewrite heartbeat handler
srv.RewriteHeartBeatHandler(1300, func(c *tcpx.Context) {
fmt.Println("rewrite heartbeat handler")
c.RecvHeartBeat()
})
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/online-offline
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/graceful
- Graceful exit
Catch signal and do jobs arranged
- Graceful stop
two strategies:
-
closeAllConnection = false
:Stop listen on, but no effect to existed connection -
closeAllConnection = true
:Stop listen on, stops all connection including connected clients.
- Graceful restart:
Contains graceful stop
and graceful start
. Between them, you can add jobs you want.
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/middleware
It tells usages of using middleware.
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/pack-detail
Provides tcpx pack detail.
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/chat
It examples a chat using tcpx.
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/raw
It examples how to send stream without rule, nothing to do with messageID system
. You can send all stream you want. Global middleware and anchor middleware are still working as the example said.
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/online-offline
Example shares with 2.2.
Tcpx has its built-in pool to help manage online and offline users. Note that :
- To use built-in pool, you need to run
srv.WithBuiltInPool(true)
. - To online/offline a user, you can do it like
ctx.Offline()
,ctx.Online(username string)
.
Official built-in pool will not extend much. If it doesn't fit your requirement, you should design your own pool.
https://github.com/fwhezfwhez/tcpx/tree/master/examples/modules/auth
Auth makes different sense comparing with middleware. A middleware can easily stop a invalid request after a connection has been established, but It can't avoid a client keep sending heartbeat but do nothing.It still occupy a connection resource.
Auth will start a goroutine once a connection is on. In a specific interval not receiving signal, connection will be forcely dropped by server side.
Now tcpx advises two modes handling stream, using OnMessage requires user handling stream by himself
Using OnMessage
func main(){
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
srv.OnClose = OnClose
srv.OnConnect = OnConnect
srv.OnMessage = OnMessage
go func(){
fmt.Println("tcp srv listen on 7171")
if e := srv.ListenAndServe("tcp", ":7171"); e != nil {
panic(e)
}
}()
// udp
go func(){
fmt.Println("udp srv listen on 7172")
if e := srv.ListenAndServe("udp", ":7172"); e != nil {
panic(e)
}
}()
// kcp
go func(){
fmt.Println("kcp srv listen on 7173")
if e := srv.ListenAndServe("kcp", ":7173"); e != nil {
panic(e)
}
}()
select {}
}
func OnConnect(c *tcpx.Context) {
fmt.Println(fmt.Sprintf("connecting from remote host %s network %s", c.ClientIP(), c.Network()))
}
func OnClose(c *tcpx.Context) {
fmt.Println(fmt.Sprintf("connecting from remote host %s network %s has stoped", c.ClientIP(), c.Network())
}
var packx = tcpx.NewPackx(tcpx.JsonMarshaller{})
func OnMessage(c *tcpx.Context) {
// handle c.Stream
type ServiceA struct{
Username string `json:"username"`
}
type ServiceB struct{
ServiceName string `json:"service_name"`
}
messageID, e :=packx.MessageIDOf(c.Stream)
if e!=nil {
fmt.Println(errorx.Wrap(e).Error())
return
}
switch messageID {
case 7:
var serviceA ServiceA
// block, e := packx.Unpack(c.Stream, &serviceA)
block, e :=c.Bind(&serviceA)
fmt.Println(block, e)
c.Reply(8, "success")
case 9:
var serviceB ServiceB
//block, e := packx.Unpack(c.Stream, &serviceB)
block, e :=c.Bind(&serviceB)
fmt.Println(block, e)
c.JSON(10, "success")
}
}
Using routine mux
func main(){
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
srv.OnClose = OnClose
srv.OnConnect = OnConnect
// srv.OnMessage = OnMessage
srv.UseGlobal(MiddlewareGlobal)
srv.Use("middleware1", Middleware1, "middleware2", Middleware2)
srv.AddHandler(1, SayHello)
srv.UnUse("middleware2")
srv.AddHandler(3, SayGoodBye)
if e := srv.ListenAndServe("tcp", ":7171"); e != nil {
panic(e)
}
}
func OnConnect(c *tcpx.Context) {
fmt.Println(fmt.Sprintf("connecting from remote host %s network %s", c.ClientIP(), c.Network()))
}
func OnClose(c *tcpx.Context) {
fmt.Println(fmt.Sprintf("connecting from remote host %s network %s has stoped", c.ClientIP(), c.Network())
}
// func OnMessage(c *tcpx.Context) {
// handle c.Stream
// }
func SayHello(c *tcpx.Context) {
var messageFromClient string
var messageInfo tcpx.Message
messageInfo, e := c.Bind(&messageFromClient)
if e != nil {
panic(e)
}
fmt.Println("receive messageID:", messageInfo.MessageID)
fmt.Println("receive header:", messageInfo.Header)
fmt.Println("receive body:", messageInfo.Body)
var responseMessageID int32 = 2
e = c.Reply(responseMessageID, "hello")
fmt.Println("reply:", "hello")
if e != nil {
fmt.Println(e.Error())
}
}
func SayGoodBye(c *tcpx.Context) {
var messageFromClient string
var messageInfo tcpx.Message
messageInfo, e := c.Bind(&messageFromClient)
if e != nil {
panic(e)
}
fmt.Println("receive messageID:", messageInfo.MessageID)
fmt.Println("receive header:", messageInfo.Header)
fmt.Println("receive body:", messageInfo.Body)
var responseMessageID int32 = 4
e = c.Reply(responseMessageID, "bye")
fmt.Println("reply:", "bye")
if e != nil {
fmt.Println(e.Error())
}
}
func Middleware1(c *tcpx.Context) {
fmt.Println("I am middleware 1 exampled by 'srv.Use(\"middleware1\", Middleware1)'")
}
func Middleware2(c *tcpx.Context) {
fmt.Println("I am middleware 2 exampled by 'srv.Use(\"middleware2\", Middleware2),srv.UnUse(\"middleware2\")'")
}
func Middleware3(c *tcpx.Context) {
fmt.Println("I am middleware 3 exampled by 'srv.AddHandler(5, Middleware3, SayName)'")
}
func MiddlewareGlobal(c *tcpx.Context) {
fmt.Println("I am global middleware exampled by 'srv.UseGlobal(MiddlewareGlobal)'")
}
Middlewares in tcpx has three types: GlobalTypeMiddleware
, MessageIDSelfRelatedTypeMiddleware
,AnchorTypeMiddleware
.
GlobalTypeMiddleware
:
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
srv.UseGlobal(MiddlewareGlobal)
MessageIDSelfRelatedTypeMiddleware
:
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
srv.AddHandler(5, Middleware3, SayName)
AnchorTypeMiddleware
:
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
srv.Use("middleware1", Middleware1, "middleware2", Middleware2)
srv.AddHandler(5, SayName)
middleware example
:
func Middleware1(c *tcpx.Context) {
fmt.Println("I am middleware 1 exampled by 'srv.Use(\"middleware1\", Middleware1)'")
// c.Next()
// c.Abort()
}
middleware order
:
GlobalTypeMiddleware
-> AnchorTypeMiddleware
-> MessageIDSelfRelatedTypeMiddleware
.
if one of middleware has called c.Abort()
, middleware chain stops.
ATTENTION: If srv.OnMessage
is not nil, only GlobalTypeMiddleware
and AnchorTypeMiddleware
will make sense regardless of AnchorTypeMiddleware
being UnUsed or not.
OnMessage
's minimum unit block is each message, whenOnMessage
is not nil, mux
will lose its effects.
srv.OnMessage = OnMessage
srv.AddHandler(1, SayName) // no use, because OnMessage is not nil, user should handle c.Stream by himself
In the mean time, global middlewares and anchor middlewares will all make sense regardless of anchor middlewares being unUsed or not. Here is part of source code:
go func(ctx *Context, tcpx *TcpX) {
if tcpx.OnMessage != nil {
...
} else {
messageID, e := tcpx.Packx.MessageIDOf(ctx.Stream)
if e != nil {
Logger.Println(errorx.Wrap(e).Error())
return
}
handler, ok := tcpx.Mux.Handlers[messageID]
if !ok {
Logger.Println(fmt.Sprintf("messageID %d handler not found", messageID))
return
}
...
}
}(ctx, tcpx)
As you can see,it's ok if you do it like:
func main(){
...
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
srv.OnMessage = onMessage
...
}
func onMessage(c *tcpx.Context){
func(stream []byte){
// handle raw stream
}(c.Stream)
}
Attention: Stream has been packed per request, no pack stuck probelm.
You don't need to design message block yourself.Instead do it like: client
func main(){
var packx = tcpx.NewPackx(tcpx.JsonMarshaller{})
buf1, e := packx.Pack(5, "hello,I am client xiao ming")
buf2, e := packx.Pack(7, struct{
Username string
Age int
}{"xiaoming", 5})
...
}
If you're not golang client, see 3.5 How client (not only golang) builds expected stream?
Now, tcpx supports json,xml,protobuf,toml,yaml like:
client
var packx = tcpx.NewPackx(tcpx.JsonMarshaller{})
// var packx = tcpx.NewPackx(tcpx.XmlMarshaller{})
// var packx = tcpx.NewPackx(tcpx.ProtobufMarshaller{})
// var packx = tcpx.NewPackx(tcpx.TomlMarshaller{})
// var packx = tcpx.NewPackx(tcpx.YamlMarshaller{})
server
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
// srv := tcpx.NewTcpX(tcpx.XmlMarshaller{})
// srv := tcpx.NewTcpX(tcpx.ProtobufMarshaller{})
// srv := tcpx.NewTcpX(tcpx.TomlMarshaller{})
// srv := tcpx.NewTcpX(tcpx.YamlMarshaller{})
if you want any marshal way else, design it like:
type OtherMarshaller struct{}
func (om OtherMarshaller) Marshal(v interface{}) ([]byte, error) {
return []byte(""), nil
}
func (om OtherMarshaller) Unmarshal(data []byte, dest interface{}) error {
return nil
}
func (om OtherMarshaller) MarshalName() string{
return "other_marshaller"
}
client
var packx = tcpx.NewPackx(OtherMarshaller{})
server
srv := tcpx.NewTcpX(tcpx.OtherMarshaller{})
Tcpx now only provide packx
realized in golang to build a client sender.If you wants to send message from other language client, you'll have two ways:
- Be aware of messageID block system and build expected stream in specific language.Refer -> 2.5 pack-detail.
- Using http gateway,refers to 5. cross-language gateway
messageID block system
:
[4]byte -- length fixed_size,binary big endian encode
[4]byte -- messageID fixed_size,binary big endian encode
[4]byte -- headerLength fixed_size,binary big endian encode
[4]byte -- bodyLength fixed_size,binary big endian encode
[]byte -- header marshal by json
[]byte -- body marshal by marshaller
Since not all marshal ways support marshal map, header are fixedly using json. Here are some language building stream: java:
//
js:
//
ruby:
//
Welcome to provides all language pack example via pull request, you can valid you result stream refers to unpack http gateway 5. cross-language gateway,
Yes! But you can't share the advantages of messageID usage.
way 1: Refer to 2.7 Raw.In this case, you must start another port and use srv.HandleRaw
.
If you have your own format stream style, which different from messageID system, you can do it like:
way 2: developing……
tcpx's official advised routing way is separating handlers by messageID, like
func main(){
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
// request messageID 1
// response messageID 2
srv.AddHandler(1, SayHello)
if e := srv.ListenAndServe("tcp", ":7171"); e != nil {
panic(e)
}
}
func SayHello(c *tcpx.Context) {
var messageFromClient string
var messageInfo tcpx.Message
messageInfo, e := c.Bind(&messageFromClient)
if e != nil {
panic(e)
}
fmt.Println("receive messageID:", messageInfo.MessageID)
fmt.Println("receive header:", messageInfo.Header)
fmt.Println("receive body:", messageInfo.Body)
var responseMessageID int32 = 2
e = c.Reply(responseMessageID, "hello")
fmt.Println("reply:", "hello")
if e != nil {
fmt.Println(e.Error())
}
}
All methods can be refered in https://godoc.org/github.com/fwhezfwhez/tcpx Here are those frequently used methods apart by their receiver type. args omit
srv := tcpx.NewTcpX(tcpx.JsonMarshaller{})
methods | desc |
---|---|
srv.GlobalUse() | use global middleware |
srv.Use() | use a middleware |
srv.UnUse() | unUse a middleware, handlers added before this still work on unUsed middleware, handlers after don't |
srv.AddHandler() | add routed handler by messageID(int32) |
srv.ListenAndServe() | start listen on |
var c *tcpx.Context
methods | desc |
---|---|
c.Bind() | bind data of stream into official message type |
c.Reply() | reply to client via c.Conn, marshalled by c.Packx.Marshaller |
c.Next() | middleware goes to next |
c.Abort() | middleware chain stops |
c.JSON() | reply to client via c.Conn, marshalled by tcpx.JsonMarshaller |
c.XML() | reply to client via c.Conn, marshalled by tcpx.XmlMarshaller |
c.YAML() | reply to client via c.Conn, marshalled by tcpx.YamlMarshaller |
c.Protobuf() | reply to client via c.Conn, marshalled by tcpx.ProtobufMarshaller |
c.TOML() | reply to client via c.Conn, marshalled by tcpx.TomlMarshaller |
var packx *tcpx.Packx
methods | desc |
---|---|
packx.Pack() | pack data into expected stream |
packx.UnPack() | reverse above returns official message type |
packx. MessageIDOf() | get messageID of a stream block |
packx.LengthOf() | length of stream except total length, total length +4 or len(c.Stream) |
var message tcpx.Message
methods | desc |
---|---|
message.Get() | get header value by key |
message.Set() | set header value |
gateway repo: https://github.com/fwhezfwhez/tcpx/tree/master/gateway/pack-transfer
example: https://github.com/fwhezfwhez/tcpx/tree/master/examples/use-gateway
go run main.go -port 7000
run the gateway locally in port 7000 or else.
note: Each message should call once
POST http://localhost:7000/gateway/pack/transfer/
application/json
body:
{
"marshal_name":<marshal_name>,
"stream": <stream>,
"message_id": <message_id>,
"header": <header>
}
field | type | desc | example | nessessary |
---|---|---|---|---|
marshal_name | string | ranges in "json","xml", "toml", "yaml", "protobuf" |
"json" | yes |
stream | []byte | stream should be well marshalled by one of marshal_name | yes | |
message_id | int32 | int32 type messageID | 1 | yes |
header | map/object | key-value pairs | {"k1":"v1"} | no |
returns:
{
"message":<message>,
"stream":<stream>
}
field | type | desc | example | nessessary |
---|---|---|---|---|
message | string | "success" when status 200, "success", "error message" when 400/500 | "success" | yes |
stream | []byte | packed stream,when error or status not 200, no stream field | no |
example:
payload:
{"username": "hello, tcpx"} ---json--> "eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0="
request:
{
"marshal_name": "json",
"stream": "eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0=",
"message_id": 1,
"header": {
"api": "/pack/"
}
}
example response:
{
"stream": "AAAANgAAAAEAAAAQAAAAGnsiYXBpIjoiL3BhY2svIn17InVzZXJuYW1lIjoiaGVsbG8sIHRjcHgifQ=="
}
note: able to unpack many messages once.
POST http://localhost:7000/gateway/unpack/transfer/
application/json
body:
{
"marshal_name": <marshal_name>,
"stream": <stream>
}
field | type | desc | example | nessessary |
---|---|---|---|---|
marshal_name | string | ranges in "json","xml", "toml", "yaml", "protobuf" |
"json" | yes |
stream | []byte | packed stream | no |
returns:
{
"message": <message>,
"blocks" <blocks>
}
field | type | desc | example | nessessary |
---|---|---|---|---|
message | string | "success" when status 200, "success", "error message" when 400/500 | "success" | yes |
blocks | []block | unpacked blocks, when status not 200, no this field | no | |
block | obj | each message block information, when status not 200,no this field | ++ look below++ | no |
block example:
{
"message_id": 1,
"header": {"k1":"v1"},
"marshal_name": "json",
"stream": "eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0="
}
example request:
{
"marshal_name": "json",
"stream": "AAAANgAAAAEAAAAQAAAAGnsiYXBpIjoiL3BhY2svIn17InVzZXJuYW1lIjoiaGVsbG8sIHRjcHgifQ=="
}
example response:
{
"message": "success",
"blocks": [
{
"message_id": 1,
"header": {
"k1": "v1"
},
"marshal_name": "json",
"stream": "eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0="
}
]
}
to payload:
"eyJ1c2VybmFtZSI6ImhlbGxvLCB0Y3B4In0=" ---json--> {"username": "hello, tcpx"}