Skip to content

Commit

Permalink
opt: optimize client replies list
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Jun 8, 2024
1 parent d51ac14 commit 6f78f00
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 10 deletions.
9 changes: 9 additions & 0 deletions ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,19 @@ func (loop *AeLoop) AeWait() (tes []*AeTimeEvent, fes []*AeFileEvent) {
if timeout <= 0 {
timeout = 10 // at least wait 10ms
}

var events [128]unix.EpollEvent
retry:
n, err := unix.EpollWait(loop.fileEventFd, events[:], int(timeout))
if err != nil {
// interrupted system call
if err == unix.EINTR {
goto retry
}
logger.Error().Msgf("epoll wait error: %v", err)
return
}

// collect file events
for _, ev := range events[:n] {
if ev.Events&unix.EPOLLIN != 0 {
Expand All @@ -198,6 +206,7 @@ func (loop *AeLoop) AeWait() (tes []*AeTimeEvent, fes []*AeFileEvent) {
}
}
}

// collect time events
now := GetMsTime()
p := loop.TimeEvents
Expand Down
3 changes: 3 additions & 0 deletions aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func (aof *Aof) Write(buf []byte) (int, error) {
}

func (aof *Aof) Flush() error {
if aof.buf.Len() == 0 {
return nil
}
aof.buf.WriteTo(aof.file)
return aof.file.Sync()
}
Expand Down
2 changes: 1 addition & 1 deletion net.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TcpServer(port int) (int, error) {
if err != nil {
return -1, err
}
err = unix.SetsockoptInt(s, unix.SOL_SOCKET, unix.SO_REUSEPORT, port)
err = unix.SetsockoptInt(s, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
if err != nil {
return -1, err
}
Expand Down
26 changes: 17 additions & 9 deletions rotom.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package main

import (
"container/list"
"fmt"
"strings"

cache "github.com/xgzlucario/GigaCache"
"github.com/xgzlucario/rotom/structx"
)

const (
DEFAULT_IO_BUF = 16 * KB
MAX_BULK = 4 * KB
)

type (
Map = *structx.Map
Set = *structx.Set
Expand All @@ -27,7 +31,7 @@ type Client struct {
fd int
queryLen int
queryBuf []byte
reply *list.List
reply []Value
}

type Server struct {
Expand Down Expand Up @@ -134,8 +138,8 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) {
// create client
client := &Client{
fd: cfd,
reply: list.New(),
queryBuf: make([]byte, KB),
reply: make([]Value, 0, 8),
queryBuf: make([]byte, DEFAULT_IO_BUF),
}
server.clients[cfd] = client
loop.AddFileEvent(cfd, AE_READABLE, ReadQueryFromClient, client)
Expand All @@ -144,6 +148,10 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) {

func ReadQueryFromClient(loop *AeLoop, fd int, extra interface{}) {
client := extra.(*Client)
// grow query buffer
if len(client.queryBuf)-client.queryLen < MAX_BULK {
client.queryBuf = append(client.queryBuf, make([]byte, MAX_BULK)...)
}
n, err := Read(fd, client.queryBuf[client.queryLen:])
if n == 0 || err != nil {
logger.Error().Msgf("client %v read err: %v", fd, err)
Expand Down Expand Up @@ -190,26 +198,26 @@ func ProcessQueryBuf(client *Client) {
res = newErrValue(fmt.Errorf("invalid command: %s", command))
}

client.reply.PushBack(res)
client.reply = append(client.reply, res)
resetClient(client)

// ADD writable event
// add writable event
server.aeLoop.AddFileEvent(client.fd, AE_WRITABLE, SendReplyToClient, client)
}

func SendReplyToClient(loop *AeLoop, fd int, extra interface{}) {
client := extra.(*Client)

// send all replies back
for client.reply.Len() > 0 {
elem := client.reply.Remove(client.reply.Front())
_, err := Write(fd, elem.(Value).Marshal())
for _, elem := range client.reply {
_, err := Write(fd, elem.Marshal())
if err != nil {
logger.Error().Msgf("send reply err: %v", err)
freeClient(client)
return
}
}
client.reply = client.reply[:0]
loop.RemoveFileEvent(fd, AE_WRITABLE)
}

Expand Down

0 comments on commit 6f78f00

Please sign in to comment.