Skip to content

Commit

Permalink
opt: optimize client argsBuf alloc
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Jun 20, 2024
1 parent 37e1a8f commit 686341a
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ test-cover:
rm coverage.txt

pprof:
go tool pprof -http=:18081 "http://172.17.21.2:6060/debug/pprof/profile?seconds=30"
go tool pprof -http=:18081 "http://192.168.1.6:6060/debug/pprof/profile?seconds=30"

heap:
go tool pprof http://192.168.1.6:6060/debug/pprof/heap
Expand Down
2 changes: 1 addition & 1 deletion aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (aof *Aof) Read(fn func(args []RESP)) error {

// Iterate over the records in the file, applying the function to each.
reader := NewReader(data)
argsBuf := make([]RESP, 3)
argsBuf := make([]RESP, 8)
for {
values, err := reader.ReadNextCommand(argsBuf)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@ import (
)

func startup() {
config := &Config{Port: 20082}
config := &Config{
Port: 20082,
}
if err := InitDB(config); err != nil {
log.Panic().Msgf("init db error: %v", err)
}
if err := initServer(config); err != nil {
log.Panic().Msgf("init server error: %v", err)
}
server.aeLoop.AddFileEvent(server.fd, AE_READABLE, AcceptHandler, nil)
server.aeLoop.AddTimeEvent(AE_NORMAL, 100, ServerCronFlush, nil)
server.aeLoop.AddTimeEvent(AE_NORMAL, 100, ServerCronEvict, nil)
server.aeLoop.AeMain()
}

Expand Down
18 changes: 0 additions & 18 deletions dict/bench_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dict

import (
"maps"
"testing"
)

Expand Down Expand Up @@ -104,20 +103,3 @@ func BenchmarkRemove(b *testing.B) {
}
})
}

func BenchmarkMigrate(b *testing.B) {
b.Run("stdmap", func(b *testing.B) {
m := getStdmap(10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
maps.Clone(m)
}
})
b.Run("dict", func(b *testing.B) {
m := getDict(10000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
m.Migrate()
}
})
}
2 changes: 1 addition & 1 deletion dict/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Options struct {
var DefaultOptions = Options{
ShardCount: 1024,
IndexSize: 1024,
BufferSize: 64 * KB,
BufferSize: 32 * KB,
MigrateRatio: 0.4,
}

Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func main() {
if err != nil {
log.Fatal().Msgf("load config error: %v", err)
}

log.Debug().Bool("appendonly", config.AppendOnly).Int("port", config.Port).Msg("read config file")

if err = initServer(config); err != nil {
log.Fatal().Msgf("init server error: %v", err)
}
Expand Down
35 changes: 18 additions & 17 deletions rotom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
)

const (
READER_BUF_SIZE = 16 * KB
WRITER_BUF_SIZE = 4 * KB
READ_BUF_SIZE = 16 * KB
WRITE_BUF_SIZE = 4 * KB
MAX_READER_SIZE = 4 * KB
)

Expand All @@ -28,10 +28,11 @@ type DB struct {
}

type Client struct {
fd int
queryLen int
queryBuf []byte
replyBuf *RESPWriter
fd int
queryLen int
queryBuf []byte
argsBuf []RESP
replyWriter *RESPWriter
}

type Server struct {
Expand Down Expand Up @@ -60,7 +61,7 @@ func InitDB(config *Config) (err error) {
log.Debug().Msg("start loading aof file...")

// Load the initial data into memory by processing each stored command.
emptyWriter := NewWriter(WRITER_BUF_SIZE)
emptyWriter := NewWriter(WRITE_BUF_SIZE)
return db.aof.Read(func(args []RESP) {
command := args[0].ToString()

Expand All @@ -84,9 +85,10 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) {
}
// create client
client := &Client{
fd: cfd,
replyBuf: NewWriter(WRITER_BUF_SIZE),
queryBuf: make([]byte, READER_BUF_SIZE),
fd: cfd,
replyWriter: NewWriter(WRITE_BUF_SIZE),
queryBuf: make([]byte, READ_BUF_SIZE),
argsBuf: make([]RESP, 8),
}
server.clients[cfd] = client
loop.AddFileEvent(cfd, AE_READABLE, ReadQueryFromClient, client)
Expand Down Expand Up @@ -130,11 +132,10 @@ func freeClient(client *Client) {

func ProcessQueryBuf(client *Client) {
queryBuf := client.queryBuf[:client.queryLen]
argsBuf := make([]RESP, 3)

reader := NewReader(queryBuf)
for {
args, err := reader.ReadNextCommand(argsBuf)
args, err := reader.ReadNextCommand(client.argsBuf)
if err != nil {
if err == io.EOF {
break
Expand All @@ -149,13 +150,13 @@ func ProcessQueryBuf(client *Client) {
// lookup for command
cmd := lookupCommand(command)
if cmd != nil {
cmd.processCommand(client.replyBuf, args)
if server.config.AppendOnly && cmd.persist { // TODO: optimize error RESP
cmd.processCommand(client.replyWriter, args)
if server.config.AppendOnly && cmd.persist { // TODO: optimize AOF operation
db.aof.Write(queryBuf)
}
} else {
err := ErrUnknownCommand(command)
client.replyBuf.WriteError(err)
client.replyWriter.WriteError(err)
log.Warn().Msgf("%v", err)
}
}
Expand All @@ -168,7 +169,7 @@ func ProcessQueryBuf(client *Client) {

func SendReplyToClient(loop *AeLoop, fd int, extra interface{}) {
client := extra.(*Client)
sentbuf := client.replyBuf.b.Bytes()
sentbuf := client.replyWriter.b.Bytes()

n, err := Write(fd, sentbuf)
if err != nil {
Expand All @@ -180,7 +181,7 @@ func SendReplyToClient(loop *AeLoop, fd int, extra interface{}) {
log.Error().Msgf("send packet size error: %d %d", n, len(sentbuf))
}

client.replyBuf.Reset()
client.replyWriter.Reset()
loop.RemoveFileEvent(fd, AE_WRITABLE)
}

Expand Down

0 comments on commit 686341a

Please sign in to comment.