diff --git a/README.md b/README.md
index 05f7dc9..1d7eb48 100644
--- a/README.md
+++ b/README.md
@@ -4,17 +4,40 @@
 ## Introduction
 
-This is rotom, a tiny Redis Server written in Go. In `v2` the project dropped the embedded form of earlier versions and will be maintained as a `net server` going forward, partly to practice Linux network programming, and partly to stay compatible with the many mature redis tools in the community that help with development.
+This is rotom, a tiny Redis Server written in Go. Built on IO multiplexing, it reproduces the core AeLoop event loop mechanism of Redis.
 
-Features:
+rotom is based on the [godis](https://github.com/archeryue/godis) project.
 
-1. A lock-free, epoll-based AeLoop event loop implemented with unix programming ([see godis](https://github.com/archeryue/godis))
+### Features
+
+1. An epoll-based AeLoop event loop implemented with unix network programming
 2. Compatible with the Redis RESP protocol, so you can connect to rotom with any redis client
 3. DB hashmap based on [GigaCache](https://github.com/xgzlucario/GigaCache)
 4. AOF support
 5. Only a subset of commands is supported so far, such as `ping`, `set`, `get`, `hset`, `hget`
 
-The current focus is on framework design and optimization, so more commands will not be supported in the short term.
+### How It Works
+
+**IO multiplexing**
+
+IO multiplexing is a technique for monitoring multiple sockets at the same time: when one or more sockets become ready for reading or writing, the program is notified of the ready events and performs the corresponding reads or writes. Common IO multiplexing mechanisms include select, poll and kqueue; on Linux, epoll is used.
+
+**AeLoop event loop**
+
+AeLoop (Async Event Loop) is the core asynchronous event-driven mechanism of Redis. Its main steps are:
+
+1. File events (FileEvent): read and write events on network sockets, handled with IO multiplexing. Event types are `AE_READABLE` and `AE_WRITABLE`
+2. Time events (TimeEvent): tasks that run delayed or periodically, such as evicting expired keys every `100ms`
+3. When an event is ready, it is handled by the callback function bound to it
+
+rotom reproduces the Redis AeLoop event loop internally. Concretely (see the sketch further below):
+
+1. When a new tcp connection arrives, `AcceptHandler` obtains the fd of the socket, adds it to the event loop and registers an `AE_READABLE` read event
+2. When the read event is ready, `ReadQueryFromClient` reads the data into `queryBuf`
+3. `ProcessQueryBuf` parses the command from `queryBuf` and executes it
+4. The command result is saved and an `AE_WRITABLE` write event is registered for the socket fd
+5. When the write event is ready, `SendReplyToClient` writes all results back to the client; one write event may flush the results of several read events at once
+6. Resources are released, and the loop repeats until the server shuts down
 
 ## Usage
 
@@ -26,13 +49,13 @@
 git clone https://github.com/xgzlucario/rotom
 ```
 
-Make sure your local golang environment is `>= 1.22`, then run `go run .` in the project directory to start the server. It listens on port `6969` by default:
+Make sure your local golang environment is `>= 1.22`, then run `go run .` in the project directory to start the server. It listens on port `6379` by default:
 
 ```
 $ go run .
 2024/06/05 15:26:47 cmd arguments: config=config.json, debug=false
 2024/06/05 15:26:47 read config file: {
-    "port": 6969,
+    "port": 6379,
     "appendonly": false,
     "appendfilename": "appendonly.aof"
 }
@@ -45,16 +68,16 @@ $ go run .
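To make the event-loop steps in the README above concrete, here is a minimal sketch of an epoll read/write cycle in Go, using `golang.org/x/sys/unix` (already a dependency of rotom). It is not rotom's actual `ae.go`: `serve`, `process` and the `pending` map are invented names for illustration, and error handling is kept to a minimum.

```go
package main

import "golang.org/x/sys/unix"

// process stands in for ProcessQueryBuf: parse a RESP query, run the
// command, and return the reply bytes. Hard-coded here for brevity.
func process(query []byte) []byte { return []byte("+OK\r\n") }

func serve(listenFd int) error {
	epfd, err := unix.EpollCreate1(0)
	if err != nil {
		return err
	}
	// Register the listening socket for readable (accept) events.
	ev := unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(listenFd)}
	if err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, listenFd, &ev); err != nil {
		return err
	}

	events := make([]unix.EpollEvent, 128)
	buf := make([]byte, 64*1024)
	pending := map[int][]byte{} // fd -> replies waiting for a write event

	for {
		n, err := unix.EpollWait(epfd, events, 100) // 100ms, leaving room for time events
		if err != nil && err != unix.EINTR {
			return err
		}
		for i := 0; i < n; i++ {
			fd := int(events[i].Fd)
			switch {
			case fd == listenFd:
				// Step 1: a new tcp connection, add its fd with a read event.
				cfd, _, err := unix.Accept(listenFd)
				if err != nil {
					continue
				}
				cev := unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(cfd)}
				unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, cfd, &cev)
			case events[i].Events&unix.EPOLLIN != 0:
				// Steps 2-4: read the query, process it, then ask for a
				// write event so the reply goes out later.
				nr, err := unix.Read(fd, buf)
				if nr == 0 || err != nil {
					unix.EpollCtl(epfd, unix.EPOLL_CTL_DEL, fd, nil)
					unix.Close(fd)
					continue
				}
				pending[fd] = append(pending[fd], process(buf[:nr])...)
				cev := unix.EpollEvent{Events: unix.EPOLLIN | unix.EPOLLOUT, Fd: int32(fd)}
				unix.EpollCtl(epfd, unix.EPOLL_CTL_MOD, fd, &cev)
			case events[i].Events&unix.EPOLLOUT != 0:
				// Step 5: write event ready, flush everything buffered for this fd.
				unix.Write(fd, pending[fd])
				delete(pending, fd)
				cev := unix.EpollEvent{Events: unix.EPOLLIN, Fd: int32(fd)}
				unix.EpollCtl(epfd, unix.EPOLL_CTL_MOD, fd, &cev)
			}
		}
	}
}

func main() {
	// Listen on the default port and hand the socket to the event loop.
	fd, _ := unix.Socket(unix.AF_INET, unix.SOCK_STREAM, 0)
	unix.Bind(fd, &unix.SockaddrInet4{Port: 6379})
	unix.Listen(fd, 128)
	serve(fd)
}
```

The important shape is the same as in rotom: reads and writes never block; they happen only when epoll reports the fd as ready, and replies are buffered until the write event fires.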
``` REPOSITORY TAG IMAGE ID CREATED SIZE -rotom latest 270888260e99 3 minutes ago 21.1MB +rotom latest 270888260e99 3 minutes ago 21.2MB ``` 然后启动容器: ```bash -docker run --rm -p 6969:6969 --name rotom rotom:latest +docker run --rm -p 6379:6379 --name rotom rotom:latest ``` -## 性能测试 +## Benchmark 测试将在同一台机器上运行 rotom,关闭 `appendonly`,并使用官方 `redis-benchmark` 工具测试不同命令的耗时。 @@ -62,79 +85,298 @@ docker run --rm -p 6969:6969 --name rotom rotom:latest goos: linux goarch: amd64 pkg: github.com/xgzlucario/rotom -cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz +cpu: 13th Gen Intel(R) Core(TM) i5-13600KF ``` -SET +![img](bench.png) + +ROTOM SET ```bash -$ redis-benchmark -t set -p 6969 ====== SET ====== - 100000 requests completed in 0.99 seconds + 100000 requests completed in 0.31 seconds 50 parallel clients 3 bytes payload keep alive: 1 multi-thread: no -0.00% <= 0.1 milliseconds -0.02% <= 0.2 milliseconds -96.59% <= 0.3 milliseconds -97.43% <= 0.4 milliseconds -98.82% <= 0.5 milliseconds -99.80% <= 0.6 milliseconds -99.87% <= 0.7 milliseconds -99.95% <= 0.8 milliseconds -99.97% <= 0.9 milliseconds -99.98% <= 1.0 milliseconds -100.00% <= 1.1 milliseconds -101214.58 requests per second +Latency by percentile distribution: +0.000% <= 0.039 milliseconds (cumulative count 1) +50.000% <= 0.087 milliseconds (cumulative count 66525) +75.000% <= 0.095 milliseconds (cumulative count 81798) +87.500% <= 0.103 milliseconds (cumulative count 87619) +93.750% <= 0.119 milliseconds (cumulative count 94105) +96.875% <= 0.151 milliseconds (cumulative count 96975) +98.438% <= 0.215 milliseconds (cumulative count 98491) +99.219% <= 0.295 milliseconds (cumulative count 99241) +99.609% <= 0.407 milliseconds (cumulative count 99625) +99.805% <= 0.599 milliseconds (cumulative count 99827) +99.902% <= 0.791 milliseconds (cumulative count 99906) +99.951% <= 0.935 milliseconds (cumulative count 99962) +99.976% <= 1.439 milliseconds (cumulative count 99976) +99.988% <= 1.455 milliseconds (cumulative count 99994) +99.997% <= 1.463 milliseconds (cumulative count 99999) +99.999% <= 1.471 milliseconds (cumulative count 100000) +100.000% <= 1.471 milliseconds (cumulative count 100000) + +Cumulative distribution of latencies: +87.619% <= 0.103 milliseconds (cumulative count 87619) +98.423% <= 0.207 milliseconds (cumulative count 98423) +99.267% <= 0.303 milliseconds (cumulative count 99267) +99.625% <= 0.407 milliseconds (cumulative count 99625) +99.761% <= 0.503 milliseconds (cumulative count 99761) +99.827% <= 0.607 milliseconds (cumulative count 99827) +99.856% <= 0.703 milliseconds (cumulative count 99856) +99.934% <= 0.807 milliseconds (cumulative count 99934) +99.962% <= 1.007 milliseconds (cumulative count 99962) +100.000% <= 1.503 milliseconds (cumulative count 100000) + +Summary: + throughput summary: 317460.31 requests per second + latency summary (msec): + avg min p50 p95 p99 max + 0.094 0.032 0.087 0.127 0.263 1.471 ``` -GET +ROTOM GET ```bash -$ redis-benchmark -t get -p 6969 ====== GET ====== - 100000 requests completed in 0.99 seconds + 100000 requests completed in 0.33 seconds 50 parallel clients 3 bytes payload keep alive: 1 multi-thread: no -0.00% <= 0.1 milliseconds -0.02% <= 0.2 milliseconds -97.46% <= 0.3 milliseconds -98.53% <= 0.4 milliseconds -99.61% <= 0.5 milliseconds -99.79% <= 0.6 milliseconds -99.88% <= 0.7 milliseconds -99.95% <= 0.9 milliseconds -99.98% <= 1.3 milliseconds -100.00% <= 1.3 milliseconds -101522.84 requests per second +Latency by percentile distribution: +0.000% <= 0.039 milliseconds (cumulative 
count 5) +50.000% <= 0.087 milliseconds (cumulative count 58882) +75.000% <= 0.095 milliseconds (cumulative count 88023) +93.750% <= 0.111 milliseconds (cumulative count 93816) +96.875% <= 0.151 milliseconds (cumulative count 97137) +98.438% <= 0.199 milliseconds (cumulative count 98621) +99.219% <= 0.255 milliseconds (cumulative count 99325) +99.609% <= 0.295 milliseconds (cumulative count 99616) +99.805% <= 0.391 milliseconds (cumulative count 99807) +99.902% <= 0.535 milliseconds (cumulative count 99917) +99.951% <= 0.991 milliseconds (cumulative count 99953) +99.976% <= 1.023 milliseconds (cumulative count 99978) +99.988% <= 1.039 milliseconds (cumulative count 99996) +99.997% <= 1.047 milliseconds (cumulative count 100000) +100.000% <= 1.047 milliseconds (cumulative count 100000) + +Cumulative distribution of latencies: +91.824% <= 0.103 milliseconds (cumulative count 91824) +98.738% <= 0.207 milliseconds (cumulative count 98738) +99.632% <= 0.303 milliseconds (cumulative count 99632) +99.825% <= 0.407 milliseconds (cumulative count 99825) +99.873% <= 0.503 milliseconds (cumulative count 99873) +99.919% <= 0.607 milliseconds (cumulative count 99919) +99.920% <= 0.703 milliseconds (cumulative count 99920) +99.921% <= 0.807 milliseconds (cumulative count 99921) +99.947% <= 0.903 milliseconds (cumulative count 99947) +99.975% <= 1.007 milliseconds (cumulative count 99975) +100.000% <= 1.103 milliseconds (cumulative count 100000) + +Summary: + throughput summary: 303030.28 requests per second + latency summary (msec): + avg min p50 p95 p99 max + 0.090 0.032 0.087 0.119 0.223 1.047 ``` -HSET +ROTOM HSET ```bash -$ redis-benchmark -t hset -p 6969 ====== HSET ====== - 100000 requests completed in 1.00 seconds + 100000 requests completed in 0.30 seconds 50 parallel clients 3 bytes payload keep alive: 1 multi-thread: no -0.00% <= 0.2 milliseconds -97.90% <= 0.3 milliseconds -98.54% <= 0.4 milliseconds -99.20% <= 0.5 milliseconds -99.63% <= 0.6 milliseconds -99.72% <= 0.7 milliseconds -99.88% <= 0.8 milliseconds -99.93% <= 0.9 milliseconds -99.95% <= 1.0 milliseconds -99.97% <= 1.4 milliseconds -100.00% <= 1.4 milliseconds -100300.91 requests per second +Latency by percentile distribution: +0.000% <= 0.031 milliseconds (cumulative count 2) +50.000% <= 0.079 milliseconds (cumulative count 65935) +75.000% <= 0.087 milliseconds (cumulative count 82455) +87.500% <= 0.095 milliseconds (cumulative count 89086) +93.750% <= 0.111 milliseconds (cumulative count 94258) +96.875% <= 0.135 milliseconds (cumulative count 97222) +98.438% <= 0.167 milliseconds (cumulative count 98445) +99.219% <= 0.215 milliseconds (cumulative count 99257) +99.609% <= 0.351 milliseconds (cumulative count 99616) +99.805% <= 0.575 milliseconds (cumulative count 99812) +99.902% <= 0.671 milliseconds (cumulative count 99903) +99.951% <= 0.855 milliseconds (cumulative count 99958) +99.976% <= 0.887 milliseconds (cumulative count 99989) +99.994% <= 0.951 milliseconds (cumulative count 99996) +99.997% <= 0.959 milliseconds (cumulative count 100000) +100.000% <= 0.959 milliseconds (cumulative count 100000) + +Cumulative distribution of latencies: +92.230% <= 0.103 milliseconds (cumulative count 92230) +99.152% <= 0.207 milliseconds (cumulative count 99152) +99.525% <= 0.303 milliseconds (cumulative count 99525) +99.729% <= 0.407 milliseconds (cumulative count 99729) +99.781% <= 0.503 milliseconds (cumulative count 99781) +99.863% <= 0.607 milliseconds (cumulative count 99863) +99.918% <= 0.703 milliseconds (cumulative count 99918) 
+99.990% <= 0.903 milliseconds (cumulative count 99990) +100.000% <= 1.007 milliseconds (cumulative count 100000) + +Summary: + throughput summary: 328947.38 requests per second + latency summary (msec): + avg min p50 p95 p99 max + 0.085 0.024 0.079 0.119 0.199 0.959 +``` + +REDIS SET + +```bash +====== SET ====== + 100000 requests completed in 0.37 seconds + 50 parallel clients + 3 bytes payload + keep alive: 1 + host configuration "save": 3600 1 300 100 60 10000 + host configuration "appendonly": no + multi-thread: no + +Latency by percentile distribution: +0.000% <= 0.031 milliseconds (cumulative count 2) +50.000% <= 0.095 milliseconds (cumulative count 66242) +75.000% <= 0.103 milliseconds (cumulative count 86834) +87.500% <= 0.111 milliseconds (cumulative count 94421) +96.875% <= 0.127 milliseconds (cumulative count 97184) +98.438% <= 0.159 milliseconds (cumulative count 98449) +99.219% <= 0.207 milliseconds (cumulative count 99221) +99.609% <= 0.311 milliseconds (cumulative count 99640) +99.805% <= 0.519 milliseconds (cumulative count 99830) +99.902% <= 0.727 milliseconds (cumulative count 99909) +99.951% <= 0.839 milliseconds (cumulative count 99953) +99.976% <= 0.983 milliseconds (cumulative count 99998) +99.998% <= 0.991 milliseconds (cumulative count 99999) +99.999% <= 0.999 milliseconds (cumulative count 100000) +100.000% <= 0.999 milliseconds (cumulative count 100000) + +Cumulative distribution of latencies: +86.834% <= 0.103 milliseconds (cumulative count 86834) +99.221% <= 0.207 milliseconds (cumulative count 99221) +99.609% <= 0.303 milliseconds (cumulative count 99609) +99.727% <= 0.407 milliseconds (cumulative count 99727) +99.763% <= 0.503 milliseconds (cumulative count 99763) +99.841% <= 0.607 milliseconds (cumulative count 99841) +99.891% <= 0.703 milliseconds (cumulative count 99891) +99.941% <= 0.807 milliseconds (cumulative count 99941) +99.969% <= 0.903 milliseconds (cumulative count 99969) +100.000% <= 1.007 milliseconds (cumulative count 100000) + +Summary: + throughput summary: 273972.59 requests per second + latency summary (msec): + avg min p50 p95 p99 max + 0.096 0.024 0.095 0.119 0.191 0.999 +``` + +REDIS GET + +```bash +====== GET ====== + 100000 requests completed in 0.37 seconds + 50 parallel clients + 3 bytes payload + keep alive: 1 + host configuration "save": 3600 1 300 100 60 10000 + host configuration "appendonly": no + multi-thread: no + +Latency by percentile distribution: +0.000% <= 0.039 milliseconds (cumulative count 7) +50.000% <= 0.095 milliseconds (cumulative count 57828) +75.000% <= 0.103 milliseconds (cumulative count 82527) +87.500% <= 0.111 milliseconds (cumulative count 93180) +93.750% <= 0.119 milliseconds (cumulative count 96200) +96.875% <= 0.127 milliseconds (cumulative count 97042) +98.438% <= 0.167 milliseconds (cumulative count 98596) +99.219% <= 0.199 milliseconds (cumulative count 99315) +99.609% <= 0.231 milliseconds (cumulative count 99673) +99.805% <= 0.295 milliseconds (cumulative count 99808) +99.902% <= 0.471 milliseconds (cumulative count 99903) +99.951% <= 0.527 milliseconds (cumulative count 99963) +99.976% <= 0.567 milliseconds (cumulative count 99977) +99.988% <= 0.703 milliseconds (cumulative count 99991) +99.994% <= 0.711 milliseconds (cumulative count 99996) +99.997% <= 0.719 milliseconds (cumulative count 100000) +100.000% <= 0.719 milliseconds (cumulative count 100000) + +Cumulative distribution of latencies: +82.527% <= 0.103 milliseconds (cumulative count 82527) +99.451% <= 0.207 milliseconds (cumulative count 
99451) +99.825% <= 0.303 milliseconds (cumulative count 99825) +99.890% <= 0.407 milliseconds (cumulative count 99890) +99.937% <= 0.503 milliseconds (cumulative count 99937) +99.980% <= 0.607 milliseconds (cumulative count 99980) +99.991% <= 0.703 milliseconds (cumulative count 99991) +100.000% <= 0.807 milliseconds (cumulative count 100000) + +Summary: + throughput summary: 269541.78 requests per second + latency summary (msec): + avg min p50 p95 p99 max + 0.097 0.032 0.095 0.119 0.191 0.719 +``` + +REDIS HSET + +```bash +====== HSET ====== + 100000 requests completed in 0.36 seconds + 50 parallel clients + 3 bytes payload + keep alive: 1 + host configuration "save": 3600 1 300 100 60 10000 + host configuration "appendonly": no + multi-thread: no + +Latency by percentile distribution: +0.000% <= 0.031 milliseconds (cumulative count 2) +50.000% <= 0.095 milliseconds (cumulative count 68152) +75.000% <= 0.103 milliseconds (cumulative count 87772) +93.750% <= 0.111 milliseconds (cumulative count 94344) +96.875% <= 0.135 milliseconds (cumulative count 97131) +98.438% <= 0.183 milliseconds (cumulative count 98442) +99.219% <= 0.255 milliseconds (cumulative count 99252) +99.609% <= 0.351 milliseconds (cumulative count 99626) +99.805% <= 0.519 milliseconds (cumulative count 99817) +99.902% <= 0.607 milliseconds (cumulative count 99903) +99.951% <= 1.311 milliseconds (cumulative count 99955) +99.976% <= 1.399 milliseconds (cumulative count 99977) +99.988% <= 1.455 milliseconds (cumulative count 99988) +99.994% <= 1.479 milliseconds (cumulative count 99995) +99.997% <= 1.495 milliseconds (cumulative count 99997) +99.998% <= 1.511 milliseconds (cumulative count 99999) +99.999% <= 1.527 milliseconds (cumulative count 100000) +100.000% <= 1.527 milliseconds (cumulative count 100000) + +Cumulative distribution of latencies: +87.772% <= 0.103 milliseconds (cumulative count 87772) +98.811% <= 0.207 milliseconds (cumulative count 98811) +99.485% <= 0.303 milliseconds (cumulative count 99485) +99.681% <= 0.407 milliseconds (cumulative count 99681) +99.770% <= 0.503 milliseconds (cumulative count 99770) +99.903% <= 0.607 milliseconds (cumulative count 99903) +99.904% <= 0.703 milliseconds (cumulative count 99904) +99.931% <= 0.807 milliseconds (cumulative count 99931) +99.950% <= 1.207 milliseconds (cumulative count 99950) +99.951% <= 1.303 milliseconds (cumulative count 99951) +99.979% <= 1.407 milliseconds (cumulative count 99979) +99.997% <= 1.503 milliseconds (cumulative count 99997) +100.000% <= 1.607 milliseconds (cumulative count 100000) + +Summary: + throughput summary: 274725.28 requests per second + latency summary (msec): + avg min p50 p95 p99 max + 0.096 0.024 0.095 0.119 0.223 1.527 ``` \ No newline at end of file diff --git a/ae.go b/ae.go index 1f82077..bcf9897 100644 --- a/ae.go +++ b/ae.go @@ -48,7 +48,7 @@ type AeLoop struct { stop bool } -var fe2ep [3]uint32 = [3]uint32{0, unix.EPOLLIN, unix.EPOLLOUT} +var fe2ep = [3]uint32{0, unix.EPOLLIN, unix.EPOLLOUT} func getFeKey(fd int, mask FeType) int { if mask == AE_READABLE { diff --git a/aof.go b/aof.go index a1ac5a0..3b08fa2 100644 --- a/aof.go +++ b/aof.go @@ -4,6 +4,8 @@ import ( "bytes" "io" "os" + + "github.com/tidwall/mmap" ) const ( @@ -44,8 +46,17 @@ func (aof *Aof) Flush() error { } func (aof *Aof) Read(fn func(value Value)) error { + // Read file data by mmap. 
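The `Read` method being added here replays the append-only file through a read-only memory mapping instead of buffered reads from `aof.file`. Below is a condensed sketch of that replay path; it assumes the `mmap.Open(path, writable)` signature used in this diff and the package's own `NewResp`/`Value` types, `replayAOF` and `apply` are invented names, and the open error is checked before the data is inspected.

```go
package main

import (
	"errors"
	"io"

	"github.com/tidwall/mmap"
)

// replayAOF maps the AOF file into memory and feeds every stored RESP
// record to apply, mirroring what Aof.Read does for InitDB.
func replayAOF(path string, apply func(v Value)) error {
	data, err := mmap.Open(path, false) // read-only mapping
	if err != nil {
		return err
	}
	if len(data) == 0 {
		return nil // empty log, nothing to replay
	}
	reader := NewResp(data) // zero-copy parser over the mapped bytes
	for {
		value, err := reader.Read()
		if errors.Is(err, io.EOF) {
			return nil // reached the end of the log
		}
		if err != nil {
			return err
		}
		apply(value)
	}
}
```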
+ data, err := mmap.Open(aof.filePath, false) + if len(data) == 0 { + return nil + } + if err != nil { + return err + } + // Iterate over the records in the file, applying the function to each. - reader := NewResp(aof.file) + reader := NewResp(data) for { value, err := reader.Read() if err != nil { diff --git a/bench.png b/bench.png new file mode 100644 index 0000000..adccb96 Binary files /dev/null and b/bench.png differ diff --git a/go.mod b/go.mod index c9f048c..898e1cb 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/rs/zerolog v1.33.0 github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980 github.com/stretchr/testify v1.9.0 + github.com/tidwall/mmap v0.3.0 github.com/xgzlucario/GigaCache v0.0.0-20240605031700-e88a04a9dd84 github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b golang.org/x/sys v0.21.0 @@ -19,6 +20,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/edsrzf/mmap-go v1.1.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/go.sum b/go.sum index ab8b1df..c95c6d0 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,8 @@ github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80N github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= +github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -47,12 +49,15 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tidwall/mmap v0.3.0 h1:XXt1YsiXCF5/UAu3pLbu6g7iulJ9jsbs6vt7UpiV0sY= +github.com/tidwall/mmap v0.3.0/go.mod h1:2/dNzF5zA+te/JVHfrqNLcRkb8LjdH3c80vYHFQEZRk= github.com/xgzlucario/GigaCache v0.0.0-20240605031700-e88a04a9dd84 h1:YZQ7pvAASgoW0FsOF4pXkzgdWJQSS7j4JsOaU8Oc724= github.com/xgzlucario/GigaCache v0.0.0-20240605031700-e88a04a9dd84/go.mod h1:sPwGPAuvd9WdiONTmusXGNocqcY5L/J7+st1upAMlX8= github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b h1:C/+nN/kFJ6yrmEhIu+5Ra2jx/W8w+Ayu8pTiZfuU5Xc= github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b/go.mod h1:1ZgyZNk91XIllYdOPpwP+9L2RCw6QGSy6alTYF+Z0iU= golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 h1:LoYXNGAShUG3m/ehNk4iFctuhGX/+R1ZpfJ4/ia80JM= golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/handler.go b/handler.go index bfe9b75..4d34775 100644 --- a/handler.go +++ b/handler.go @@ -7,18 +7,18 @@ import ( ) func pingCommand(_ []Value) Value { - return Value{typ: STRING, str: "PONG"} + return Value{typ: STRING, raw: []byte("PONG")} } func setCommand(args []Value) Value { - key := args[0].bulk - value := args[1].bulk - db.strs.Set(string(key), value) + key := args[0].ToString() + value := args[1].ToBytes() + db.strs.Set(key, value) return ValueOK } func getCommand(args []Value) Value { - key := string(args[0].bulk) + key := args[0].ToString() value, _, ok := db.strs.Get(key) if ok { @@ -33,7 +33,7 @@ func getCommand(args []Value) Value { } func hsetCommand(args []Value) Value { - hash := string(args[0].bulk) + hash := args[0].ToString() args = args[1:] // check arguments number @@ -48,9 +48,9 @@ func hsetCommand(args []Value) Value { var newFields int for i := 0; i < len(args); i += 2 { - key := args[i].bulk - value := args[i+1].bulk - if hmap.Set(string(key), value) { + key := args[i].ToString() + value := args[i+1].ToBytes() + if hmap.Set(key, value) { newFields++ } } @@ -58,8 +58,8 @@ func hsetCommand(args []Value) Value { } func hgetCommand(args []Value) Value { - hash := string(args[0].bulk) - key := string(args[1].bulk) + hash := args[0].ToString() + key := args[1].ToString() hmap, err := fetchMap(hash) if err != nil { @@ -73,7 +73,7 @@ func hgetCommand(args []Value) Value { } func hdelCommand(args []Value) Value { - hash := string(args[0].bulk) + hash := args[0].ToString() keys := args[1:] hmap, err := fetchMap(hash) @@ -82,7 +82,7 @@ func hdelCommand(args []Value) Value { } var success int for _, v := range keys { - if hmap.Remove(string(v.bulk)) { + if hmap.Remove(v.ToString()) { success++ } } @@ -90,7 +90,7 @@ func hdelCommand(args []Value) Value { } func hgetallCommand(args []Value) Value { - hash := string(args[0].bulk) + hash := args[0].ToString() hmap, err := fetchMap(hash) if err != nil { @@ -99,8 +99,8 @@ func hgetallCommand(args []Value) Value { res := make([]Value, 0, 8) hmap.Scan(func(key, value []byte) { - res = append(res, Value{typ: BULK, bulk: key}) - res = append(res, Value{typ: BULK, bulk: value}) + res = append(res, newBulkValue(key)) + res = append(res, newBulkValue(value)) }) return newArrayValue(res) } diff --git a/handler_test.go b/handler_test.go index 3c4a003..ce94bb6 100644 --- a/handler_test.go +++ b/handler_test.go @@ -29,13 +29,13 @@ func TestHandler(t *testing.T) { assert := assert.New(t) go startup() - time.Sleep(time.Second / 3) + time.Sleep(time.Second / 5) // wait for client starup rdb := redis.NewClient(&redis.Options{ Addr: ":20082", }) - time.Sleep(time.Second / 3) + time.Sleep(time.Second / 5) t.Run("ping", func(t *testing.T) { res, _ := rdb.Ping(ctx).Result() diff --git a/resp.go b/resp.go index e9b39f5..d1bcbe3 100644 --- a/resp.go +++ b/resp.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "bytes" "fmt" "io" @@ -20,7 +19,7 @@ const ( var ( CRLF = []byte("\r\n") - ValueOK = Value{typ: STRING, str: "OK"} + ValueOK = Value{typ: STRING, raw: []byte("OK")} ValueNull = Value{typ: NULL} ) @@ -28,35 +27,50 @@ var ( // Value represents the different types of RESP (Redis Serialization Protocol) values. 
type Value struct { typ byte // Type of value ('string', 'error', 'integer', 'bulk', 'array', 'null') - str string // Used for string and error types - num int64 // Used for integer type - bulk []byte // Used for bulk strings + raw []byte // Used for string, error, integer ad bulk strings array []Value // Used for arrays of nested values } +// Resp is a parser for RESP encoded data. +// It is a ZERO-COPY parser. type Resp struct { - reader *bufio.Reader + b []byte } // NewResp creates a new Resp object with a buffered reader. -func NewResp(rd io.Reader) *Resp { - return &Resp{reader: bufio.NewReader(rd)} +// DO NOT EDIT the `input` param because it will be referenced during read. +func NewResp(input []byte) *Resp { + return &Resp{b: input} +} + +func newErrValue(err error) Value { + return Value{typ: ERROR, raw: []byte(err.Error())} +} + +func newBulkValue(bulk []byte) Value { + if bulk == nil { + return Value{typ: NULL} + } + return Value{typ: BULK, raw: bulk} +} + +func newIntegerValue(n int) Value { + format := strconv.Itoa(n) + return Value{typ: INTEGER, raw: []byte(format)} +} + +func newArrayValue(value []Value) Value { + return Value{typ: ARRAY, array: value} } // readLine reads a line ending with CRLF from the reader. -func (r *Resp) readLine() (line []byte, n int, err error) { - for { - b, err := r.reader.ReadByte() - if err != nil { - return nil, 0, err - } - n += 1 - line = append(line, b) - if len(line) >= 2 && line[len(line)-2] == '\r' && line[len(line)-1] == '\n' { - break - } +func (r *Resp) readLine() ([]byte, int, error) { + before, after, found := bytes.Cut(r.b, CRLF) + if found { + r.b = after + return before, len(before) + 2, nil } - return line[:len(line)-2], n, nil // Trim the CRLF at the end + return nil, 0, ErrCRLFNotFound } // readInteger reads an integer value following the ':' prefix. @@ -72,9 +86,18 @@ func (r *Resp) readInteger() (x int, n int, err error) { return int(i64), n, nil } +func (r *Resp) readByte() (byte, error) { + if len(r.b) == 0 { + return 0, io.EOF + } + b := r.b[0] + r.b = r.b[1:] + return b, nil +} + // Read parses the next RESP value from the stream. func (r *Resp) Read() (Value, error) { - _type, err := r.reader.ReadByte() + _type, err := r.readByte() if err != nil { return Value{}, err } @@ -85,81 +108,62 @@ func (r *Resp) Read() (Value, error) { case BULK: return r.readBulk() case INTEGER: - n, _, err := r.readInteger() + len, _, err := r.readInteger() if err != nil { return Value{}, err } else { - return newIntegerValue(n), nil + return newIntegerValue(len), nil } default: - return Value{}, fmt.Errorf("unknown value type %v", _type) + return Value{}, fmt.Errorf("%w: %c", ErrUnknownType, _type) } } // readArray reads an array prefixed with '*' from the stream. func (r *Resp) readArray() (Value, error) { - v := Value{typ: ARRAY} + value := Value{typ: ARRAY} - len, _, err := r.readInteger() + n, _, err := r.readInteger() if err != nil { - return v, err + return Value{}, err } - v.array = make([]Value, len) - for i := 0; i < len; i++ { - val, err := r.Read() + value.array = make([]Value, n) + for i := range value.array { + v, err := r.Read() if err != nil { - return v, err + return Value{}, err } - v.array[i] = val + value.array[i] = v } - return v, nil + return value, nil } // readBulk reads a bulk string prefixed with '$' from the stream. 
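Since the rewritten parser above works directly over the client's query buffer rather than a `bufio.Reader`, a short usage sketch may help. It parses an inline `SET key val` request and builds a reply with the new `Value` constructors; `example` is an invented name, and only functions defined in this file are used.

```go
package main

import "fmt"

func example() error {
	query := []byte("*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\nval\r\n")

	// NewResp keeps a reference to query, and the bulk Values returned by
	// Read point into that same buffer, so it must not be modified while
	// the parsed Values are still in use.
	value, err := NewResp(query).Read()
	if err != nil {
		return err
	}
	fmt.Println(value.array[0].ToString()) // SET
	fmt.Println(value.array[1].ToString()) // key

	// Replies go the other way: build a Value, then Marshal it back into
	// RESP bytes for SendReplyToClient.
	reply := newBulkValue([]byte("val"))
	fmt.Printf("%q\n", reply.Marshal()) // "$3\r\nval\r\n"
	return nil
}
```

The same zero-copy property is what lets `ProcessQueryBuf` hand `client.queryBuf` straight to the parser without an intermediate copy.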
func (r *Resp) readBulk() (Value, error) { - v := Value{typ: BULK} + value := Value{typ: BULK} - len, _, err := r.readInteger() + n, _, err := r.readInteger() if err != nil { - return v, err + return Value{}, err } - if len == -1 { // RESP Bulk strings can be null, indicated by "$-1" - return Value{typ: NULL}, nil + if n == -1 { // RESP Bulk strings can be null, indicated by "$-1" + return Value{typ: NULL}, err } - bulk := make([]byte, len) - _, err = io.ReadFull(r.reader, bulk) // Use ReadFull to ensure we read exactly 'len' bytes - if err != nil { - return v, err - } - v.bulk = bulk + value.raw = r.b[:n] + r.b = r.b[n:] r.readLine() // Read the trailing CRLF - return v, nil -} - -func newErrValue(err error) Value { - return Value{typ: ERROR, str: err.Error()} -} - -func newBulkValue(bulk []byte) Value { - if bulk == nil { - return Value{typ: NULL} - } - return Value{typ: BULK, bulk: bulk} + return value, nil } -func newIntegerValue(n int) Value { - return Value{typ: INTEGER, num: int64(n)} -} +func (v Value) ToString() string { return string(v.raw) } -func newArrayValue(value []Value) Value { - return Value{typ: ARRAY, array: value} -} +func (v Value) ToBytes() []byte { return v.raw } // Marshal converts a Value object into its corresponding RESP bytes. func (v Value) Marshal() []byte { @@ -184,7 +188,7 @@ func (v Value) Marshal() []byte { func (v Value) marshalInteger() []byte { w := bytes.NewBuffer(nil) w.WriteByte(INTEGER) - w.WriteString(strconv.FormatInt(v.num, 10)) + w.Write(v.raw) w.Write(CRLF) return w.Bytes() } @@ -193,7 +197,7 @@ func (v Value) marshalInteger() []byte { func (v Value) marshalString() []byte { w := bytes.NewBuffer(nil) w.WriteByte(STRING) - w.WriteString(v.str) + w.Write(v.raw) w.Write(CRLF) return w.Bytes() } @@ -202,9 +206,9 @@ func (v Value) marshalString() []byte { func (v Value) marshalBulk() []byte { w := bytes.NewBuffer(nil) w.WriteByte(BULK) - w.WriteString(strconv.Itoa(len(v.bulk))) + w.WriteString(strconv.Itoa(len(v.raw))) w.Write(CRLF) - w.Write(v.bulk) + w.Write(v.raw) w.Write(CRLF) return w.Bytes() } @@ -225,7 +229,7 @@ func (v Value) marshalArray() []byte { func (v Value) marshallError() []byte { w := bytes.NewBuffer(nil) w.WriteByte(ERROR) - w.WriteString(v.str) + w.Write(v.raw) w.Write(CRLF) return w.Bytes() } diff --git a/resp_test.go b/resp_test.go index 10b5230..a58e74a 100644 --- a/resp_test.go +++ b/resp_test.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "errors" "testing" @@ -16,7 +15,7 @@ func TestValue(t *testing.T) { data := value.Marshal() assert.Equal(string(data), "+OK\r\n") - _, err := NewResp(bytes.NewReader(data)).Read() + _, err := NewResp(data).Read() assert.NotNil(err) }) @@ -25,7 +24,7 @@ func TestValue(t *testing.T) { data := value.Marshal() assert.Equal(string(data), "-err message\r\n") - _, err := NewResp(bytes.NewReader(data)).Read() + _, err := NewResp(data).Read() assert.NotNil(err) }) @@ -34,7 +33,7 @@ func TestValue(t *testing.T) { data := value.Marshal() assert.Equal(string(data), "$5\r\nhello\r\n") { - value2, err := NewResp(bytes.NewReader(data)).Read() + value2, err := NewResp(data).Read() assert.Nil(err) assert.Equal(value, value2) } @@ -44,7 +43,7 @@ func TestValue(t *testing.T) { data = value.Marshal() assert.Equal(string(data), "$0\r\n\r\n") { - value2, err := NewResp(bytes.NewReader(data)).Read() + value2, err := NewResp(data).Read() assert.Nil(err) assert.Equal(value, value2) } @@ -54,7 +53,7 @@ func TestValue(t *testing.T) { data = value.Marshal() assert.Equal(string(data), "$-1\r\n") { - value2, err := 
NewResp(bytes.NewReader(data)).Read() + value2, err := NewResp(data).Read() assert.Nil(err) assert.Equal(value, value2) } @@ -65,35 +64,35 @@ func TestValue(t *testing.T) { data := value.Marshal() assert.Equal(string(data), ":1\r\n") - value2, err := NewResp(bytes.NewReader(data)).Read() + value2, err := NewResp(data).Read() assert.Nil(err) assert.Equal(value, value2) }) t.Run("array-value", func(t *testing.T) { value := newArrayValue([]Value{ - {typ: INTEGER, num: 1}, - {typ: INTEGER, num: 2}, - {typ: INTEGER, num: 3}, - {typ: BULK, bulk: []byte("hello")}, - {typ: BULK, bulk: []byte("world")}, + newIntegerValue(1), + newIntegerValue(2), + newIntegerValue(3), + newBulkValue([]byte("hello")), + newBulkValue([]byte("world")), }) data := value.Marshal() assert.Equal(string(data), "*5\r\n:1\r\n:2\r\n:3\r\n$5\r\nhello\r\n$5\r\nworld\r\n") - value2, err := NewResp(bytes.NewReader(data)).Read() + value2, err := NewResp(data).Read() assert.Nil(err) assert.Equal(value, value2) }) t.Run("error-value", func(t *testing.T) { // read nil - _, err := NewResp(bytes.NewReader(nil)).Read() + _, err := NewResp(nil).Read() assert.NotNil(err) for _, prefix := range []byte{BULK, INTEGER, ARRAY} { data := append([]byte{prefix}, "an error message"...) - _, err := NewResp(bytes.NewReader(data)).Read() + _, err := NewResp(data).Read() assert.NotNil(err) } diff --git a/rotom.go b/rotom.go index 8f852ff..a742a4e 100644 --- a/rotom.go +++ b/rotom.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "container/list" "fmt" "strings" @@ -10,10 +9,6 @@ import ( "github.com/xgzlucario/rotom/structx" ) -const ( - IO_BUF = 64 * KB -) - type ( Map = *structx.Map Set = *structx.Set @@ -66,7 +61,7 @@ var ( server Server // cmdTable is the list of all available commands. - cmdTable []Command = []Command{ + cmdTable []*Command = []*Command{ {"ping", pingCommand, 0, false}, {"set", setCommand, 2, true}, {"get", getCommand, 1, false}, @@ -81,7 +76,7 @@ func lookupCommand(command string) *Command { cmdStr := strings.ToLower(command) for _, c := range cmdTable { if c.name == cmdStr { - return &c + return c } } return nil @@ -112,7 +107,7 @@ func InitDB(config *Config) (err error) { // Load the initial data into memory by processing each stored command. 
err = db.aof.Read(func(value Value) { - command := string(value.array[0].bulk) + command := value.array[0].ToString() args := value.array[1:] cmd := lookupCommand(command) @@ -150,15 +145,13 @@ func AcceptHandler(loop *AeLoop, fd int, _ interface{}) { func ReadQueryFromClient(loop *AeLoop, fd int, extra interface{}) { client := extra.(*Client) n, err := Read(fd, client.queryBuf[client.queryLen:]) - if err != nil { + if n == 0 || err != nil { logger.Error().Msgf("client %v read err: %v", fd, err) freeClient(client) return } - if n > 0 { - client.queryLen += n - ProcessQueryBuf(client) - } + client.queryLen += n + ProcessQueryBuf(client) } func resetClient(client *Client) { @@ -174,7 +167,7 @@ func freeClient(client *Client) { func ProcessQueryBuf(client *Client) { queryBuf := client.queryBuf[:client.queryLen] - resp := NewResp(bytes.NewReader(queryBuf)) + resp := NewResp(queryBuf) value, err := resp.Read() if err != nil { @@ -182,18 +175,17 @@ func ProcessQueryBuf(client *Client) { return } - command := value.array[0].bulk + command := value.array[0].ToString() args := value.array[1:] var res Value // look up for command - cmd := lookupCommand(string(command)) + cmd := lookupCommand(command) if cmd != nil { res = cmd.processCommand(args) if server.config.AppendOnly && cmd.persist && res.typ != ERROR { db.aof.Write(queryBuf) } - } else { res = newErrValue(fmt.Errorf("invalid command: %s", command)) } @@ -239,7 +231,6 @@ func initServer(config *Config) (err error) { // ServerCronFlush flush aof file for every second. func ServerCronFlush(loop *AeLoop, id int, extra interface{}) { err := db.aof.Flush() - logger.Debug().Msg("flush") if err != nil { logger.Error().Msgf("flush aof buffer error: %v", err) }
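Since rotom speaks RESP, any redis client can exercise the supported command set end to end. Below is a small client-side smoke test using go-redis, which the test suite already depends on; the v9 import path is assumed here, and the address assumes the default `6379` port from `config.json`.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Covers the currently supported commands: ping, set/get, hset/hget.
	if err := rdb.Ping(ctx).Err(); err != nil {
		panic(err)
	}
	if err := rdb.Set(ctx, "greeting", "hello rotom", 0).Err(); err != nil {
		panic(err)
	}
	val, err := rdb.Get(ctx, "greeting").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(val) // hello rotom

	rdb.HSet(ctx, "user:1", "name", "rotom")
	name, _ := rdb.HGet(ctx, "user:1", "name").Result()
	fmt.Println(name) // rotom
}
```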