Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
342 lines (253 sloc) 7.92 KB

TRON v0.1

设计目标

  • 会话管理:每个 Client-Server 建立的 TCP 连接可进行读写的管理
  • 超时重连:当 Client 请求超时失败后,尝试指定次数重连

两端通信

在 Golang 中,通过 TCP 进行两端通信是十分简洁的:

server.go

func main() {
	addr, _ := net.ResolveTCPAddr("tcp4", "localhost:8080")
	l, err := net.ListenTCP("tcp4", addr)
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := l.AcceptTCP()
		if err != nil {
			log.Fatal(err)
		}
		go handle(conn)
	}
}

func handle(conn *net.TCPConn) {
	buf := make([]byte, 20)
	n, err := conn.Read(buf)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("client:", string(buf[:n]))
	if _, err = conn.Write([]byte("hi")); err != nil {
		log.Fatal(err)
	}
}

client.go

func main() {
	addr, _ := net.ResolveTCPAddr("tcp4", "localhost:8080")
	conn, err := net.DialTCP("tcp4", nil, addr)
	if err != nil {
		log.Fatal(err)
	}
	if _, err = conn.Write([]byte("packet1")); err != nil {
		log.Fatal(err)
	}
	if _, err = conn.Write([]byte("packet2")); err != nil {
		log.Fatal(err)
	}

	buf := make([]byte, 20)
	n, err := conn.Read(buf)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("server: ", string(buf[:n]))
}

运行:

整个交互过程如下:

  • Server 打开本机的 8080 端口,循环监听等待连接
  • Client 与本机 8080 端口进程建立连接后后直接写入 packet1,packet2, 然后阻塞等待服务端的响应
  • Server 获取到连接后交给 handler 处理,直接写入 hi,断开本次连接

TRON 核心概念

Packet

TCP 层传输的是 []byte 类型的二进制字节流,上边例子两端都是一股脑地读写,如上 Client 端发 2 个包分别写 packet1 和 packet2,Server 端接收时是无感知的,所以接收到的是 packet1packet2

参考计网中 TCP 协议格式,将每个子包长度(偏移量)写入到 header 中,每次按照偏移量读取指定长度数据即可读取一个完整的子包,在 Tron 中也自定义了 TLV,格式如下:

// 序列id + 包类型 + 数据长度 + 原数据 + 包分隔符
// seqid + cate  + dataLen + data  + \r\n

拼包:直接依次写入即可,注意写入二进制数据只面向长度固定的类型,比如 int 要转为 int32 写入

拆包:注意 binary 读取二进制数据时,如果读取 []byte 等长度不定的类型数据时,直接使用 io.ReadFull() 读取指定长度的二进制数据即可,至于读多长,先把 dataLen 读取出来

server.go

func main() {
	addr, _ := net.ResolveTCPAddr("tcp4", "localhost:8080")
	l, err := net.ListenTCP("tcp4", addr)
	if err != nil {
		log.Fatal(err)
	}
	for {
		conn, err := l.AcceptTCP()
		if err != nil {
			log.Fatal(err)
		}
		go handle(conn)
	}
}

func handle(conn *net.TCPConn) {
	for {
		buf := make([]byte, 100)
		n, err := conn.Read(buf)
		packet, err := UnmarshalPacket(buf[:n]) // 可能存在 buf 不够长读出的包不完整的问题
		if err == io.EOF {
			return
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("client: %s\n", packet.Data)
		if _, err = conn.Write([]byte("hi")); err != nil {
			log.Fatal(err)
		}
	}
}

client.go

func main() {
	addr, _ := net.ResolveTCPAddr("tcp4", "localhost:8080")
	conn, err := net.DialTCP("tcp4", nil, addr)
	if err != nil {
		log.Fatal(err)
	}
	pack1 := NewPacket(1, []byte("packet1"))
	pack2 := NewPacket(1, []byte("packet2"))
	if _, err = conn.Write(MarshalPacket(*pack1)); err != nil {
		log.Fatal(err)
	}
	time.Sleep(1 * time.Second)
	if _, err = conn.Write(MarshalPacket(*pack2)); err != nil {
		log.Fatal(err)
	}

	buf := make([]byte, 20)
	n, err := conn.Read(buf)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("server: ", string(buf[:n]))
}

运行:

这里不抠细节,只想说明自定义协议格式 Packet 能让两端约定好读写数据的格式,让数据流读写都可控。

Session

tron 将 Server 接收到的每个连接都视为一个会话方便管理,即将 net.TCPConn 都封装成 Session

type Session struct {
	conn      *net.TCPConn
	cr        *bufio.Reader // 连接缓冲 reader
	cw        *bufio.Writer // 连接缓冲 writer
	ReadCh    chan *Packet  // 读请求的 channel
	WriteCh   chan *Packet  // 写响应的 channel
	living    bool
	idleTimer *time.Timer
	conf      *Config
}

连接两端的连接读写都需要异步处理,于是引入缓冲 channel 和缓冲 reader writer 来可控地进行读写。方法列表:

ReadPacket()  // 持续地从 conn 中读取 Packet 并放入 readCh
WritePacket() // 持续冲 writeCh 中读取 Packet 并写入 conn

ReadPacket 读取到另一端关闭的 io.EOF 错误时,将主动关闭 Session

为了监控连接的空闲时间引入了 idleTimer,时长为 Config 中设置的最大空闲时间,当有数据读入时重置即可。

注明:session 只是对 conn 的封装,Client 和 Server 都可以用它来管理连接。

Server

Server 职责单一,监听 address 地址上的连接,并将连接封装为 session 扔给 worker 调用 handler 处理。

type Server struct {
   address   string
   handler   func(worker *Client, p *Packet)
   conf      *Config
   living    bool
   shutdown  chan struct{}
   keepAlive time.Duration
}

Run 的流程如下:

func (s *Server) run(l *LiveListener) error {
	for s.living {
		conn, err := l.AcceptTCP()
		if err != nil {
			logx.Error(err)
			continue
		}

		// 将连接分发给 server worker 处理
		serverWorker := NewClient(conn, s.conf, s.handler)
		serverWorker.Run()
	}
	return nil
}

注意 shutdown 是长度为 1 的缓冲 channel,相当于互斥锁

// 将服务器的连接关闭,不再接受新连接
func (s *Server) Shutdown() {
	s.living = false
	s.shutdown <- struct{}{} // 立刻停止
	logx.Debug("shutdown...")
}

当 Server 主动关闭时要求立刻停止,需关闭 listener,优雅的关闭方式如下:

func (l *LiveListener) AcceptTCP() (*net.TCPConn, error) {
	for {
		conn, err := l.listener.AcceptTCP()
		select {
		case <-l.closeCh:
			if err := l.listener.Close(); err != nil {
				logx.Error(err)
			}
			return nil, ERR_SERVER_CLOSED
		default:
			// 建立连接前检查下服务器是否已关闭
		}
		if err != nil {
			return nil, err
		}
		conn.SetKeepAlive(true)
		conn.SetKeepAlivePeriod(l.keepAlive)
		return conn, nil
	}
}

Server 在后边的版本会对 handler 进行拆分。

Client

结构如下:

type Client struct {
	conn       *net.TCPConn
	session    *Session
	localAddr  string
	remoteAddr string
	handler    func(cli *Client, p *Packet) // 包处理函数
	conf       *Config
}

每个 Client 都对应着一个 Session 连接,有自己的包处理函数,执行流程如下:

// 从连接中读数据,处理包,写回数据
func (c *Client) Run() {
	// 读写连接
	go c.session.ReadPacket()
	go c.session.WritePacket()

	// 处理接收到的包
	go c.handle()
}

Client 和 Serer 的交互流程如下:

  • Client 连接到 Server(如无重连只会进行一次)
  • Server 将 conn 封装成 Session 启动 Wroker 去交互处理
  • Client 最后与 Worker 进行 Packet 格式数据的交互,二者都有超时重连(回到步骤 1)

超时重连

首先说明,重连机制是二次规避策略,比如设置超时时间为 3s,重试连接的时间序列为:3s, 6s, 12s, 24s…

将 Client 的重连方法以闭包的形式放入 timer.Ticker 的 AfterFunc 进行定时执行,即可实现定时重连。

总结

tron v0.1 只是对 net.TCPConn 进行了简单的封装,加入了重连机制,事件处理机制只有一个 handler,十分简陋。

v0.2 计划:

  • 统一管理 Packet 头部的 seq 序号
  • 加入流量监控
You can’t perform that action at this time.