Skip to content

Commit

Permalink
add: add quicklist implement
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Jun 22, 2024
1 parent b6f0c83 commit 6f6bc9f
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 55 deletions.
5 changes: 1 addition & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ test-cover:
go test ./... -race -coverprofile=coverage.txt -covermode=atomic
go tool cover -html=coverage.txt -o coverage.html
rm coverage.txt
rm *.aof

pprof:
go tool pprof -http=:18081 "http://192.168.1.6:6060/debug/pprof/profile?seconds=30"
Expand All @@ -21,8 +22,4 @@ build-docker:
bench:
go test -bench . -benchmem

clean:
rm -f *.aof
rm -f coverage.html

# rsync -av --exclude='.git' rotom/ 2:~/xgz/rotom
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ rotom 基于 [godis](https://github.com/archeryue/godis) 项目

### 实现特性

1. 使用 unix 网络编程实现的基于 epoll 的 AeLoop 事件循环
1. 基于 epoll 网络模型,还原了 Redis 中的 AeLoop 单线程事件循环
2. 兼容 Redis RESP 协议,你可以使用任何 redis 客户端连接 rotom
3. DB hashmap 基于 [GigaCache](https://github.com/xgzlucario/GigaCache)
3. 实现了 dict, quicklist, hash, set, zset 数据结构
4. AOF 支持
5. 目前支持所有 redis-benchmark 命令(17种)
5. 支持 17 种常用命令

### 原理介绍

Expand All @@ -24,10 +24,10 @@ IO多路复用是一种同时监听多个 socket 的技术,当一个或多个

**AeLoop 事件循环**

AeLoop(Async Event Loop) 是 Redis 的核心异步事件驱动机制,主要有以下步骤
AeLoop(Async Event Loop) 是 Redis 的核心异步事件驱动机制,主要有以下部分

1. 文件事件(FileEvent:使用 IO 多路复用处理网络 socket 上的读写事件。事件类型分为 `AE_READABLE``AE_WRIABLE`
2. 时间事件(TimeEvent:处理需要延迟执行或定时执行的任务,如每隔 `100ms` 进行过期淘汰
1. FileEvent:使用 IO 多路复用处理网络 socket 上的读写事件。事件类型分为 `AE_READABLE``AE_WRIABLE`
2. TimeEvent:处理需要延迟执行或定时执行的任务,如每隔 `100ms` 进行过期淘汰
3. 当事件就绪时,通过该事件绑定的回调函数进行处理

在 rotom 内部实现中,还原了 Redis 中的 AeLoop 事件循环机制,具体来说:
Expand Down
32 changes: 18 additions & 14 deletions ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import (
"golang.org/x/sys/unix"
)

type FeType int
type FeType uint32

const (
AE_READABLE FeType = iota + 1
AE_WRITABLE
AE_READABLE FeType = unix.EPOLLIN
AE_WRITABLE FeType = unix.EPOLLOUT
)

type TeType int
Expand Down Expand Up @@ -49,8 +49,6 @@ type AeLoop struct {
stop bool
}

var fe2ep = [3]uint32{0, unix.EPOLLIN, unix.EPOLLOUT}

func getFeKey(fd int, mask FeType) int {
if mask == AE_READABLE {
return fd
Expand All @@ -59,29 +57,32 @@ func getFeKey(fd int, mask FeType) int {
}
}

func (loop *AeLoop) getEpollMask(fd int) (ev uint32) {
func (loop *AeLoop) getEpollMask(fd int) (ev FeType) {
if loop.FileEvents[getFeKey(fd, AE_READABLE)] != nil {
ev |= fe2ep[AE_READABLE]
ev |= AE_READABLE
}
if loop.FileEvents[getFeKey(fd, AE_WRITABLE)] != nil {
ev |= fe2ep[AE_WRITABLE]
ev |= AE_WRITABLE
}
return
}

func (loop *AeLoop) AddFileEvent(fd int, mask FeType, proc FileProc, extra interface{}) {
// epoll ctl
ev := loop.getEpollMask(fd)
if ev&fe2ep[mask] != 0 {
if ev&mask != 0 {
// event is already registered
return
}
op := unix.EPOLL_CTL_ADD
if ev != 0 {
op = unix.EPOLL_CTL_MOD
}
ev |= fe2ep[mask]
err := unix.EpollCtl(loop.fileEventFd, op, fd, &unix.EpollEvent{Fd: int32(fd), Events: ev})
ev |= mask
err := unix.EpollCtl(loop.fileEventFd, op, fd, &unix.EpollEvent{
Fd: int32(fd),
Events: uint32(ev),
})
if err != nil {
log.Error().Msgf("epoll ctl error: %v", err)
return
Expand All @@ -99,11 +100,14 @@ func (loop *AeLoop) RemoveFileEvent(fd int, mask FeType) {
// epoll ctl
op := unix.EPOLL_CTL_DEL
ev := loop.getEpollMask(fd)
ev &= ^fe2ep[mask]
ev &= ^mask
if ev != 0 {
op = unix.EPOLL_CTL_MOD
}
err := unix.EpollCtl(loop.fileEventFd, op, fd, &unix.EpollEvent{Fd: int32(fd), Events: ev})
err := unix.EpollCtl(loop.fileEventFd, op, fd, &unix.EpollEvent{
Fd: int32(fd),
Events: uint32(ev),
})
if err != nil {
if !os.IsNotExist(err) {
log.Error().Msgf("epoll del error: %v", err)
Expand Down Expand Up @@ -237,7 +241,7 @@ func (loop *AeLoop) AeProcess(tes []*AeTimeEvent, fes []*AeFileEvent) {
}

func (loop *AeLoop) AeMain() {
for {
for !loop.stop {
loop.AeProcess(loop.AeWait())
}
}
26 changes: 16 additions & 10 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func setCommand(writer *RESPWriter, args []RESP) {
func msetCommand(writer *RESPWriter, args []RESP) {
// check arguments number
if len(args)%2 == 1 {
writer.WriteError(ErrWrongNumberArgs("hset"))
writer.WriteError(ErrWrongNumberArgs("mset"))
return
}
for i := 0; i < len(args); i += 2 {
Expand All @@ -104,17 +104,21 @@ func msetCommand(writer *RESPWriter, args []RESP) {

func incrCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()

val, _, ok := db.strs.Get(key)
if !ok {
db.strs.Set(key, []byte("1"))
writer.WriteInteger(1)
return
}
num, err := strconv.Atoi(b2s(val))

num, err := RESP(val).ToInt()
if err != nil {
writer.WriteError(ErrParseInteger)
return
}
num++

db.strs.Set(key, []byte(strconv.Itoa(num)))
writer.WriteInteger(num)
}
Expand All @@ -127,12 +131,14 @@ func getCommand(writer *RESPWriter, args []RESP) {
writer.WriteBulk(value)
return
}

// check extra maps
_, ok = db.extras.Get(key)
if ok {
writer.WriteError(ErrWrongType)
} else {
writer.WriteNull()
}
writer.WriteNull()
}

func hsetCommand(writer *RESPWriter, args []RESP) {
Expand Down Expand Up @@ -181,7 +187,7 @@ func hgetCommand(writer *RESPWriter, args []RESP) {
}

func hdelCommand(writer *RESPWriter, args []RESP) {
hash := args[0].ToString()
hash := args[0].ToStringUnsafe()
keys := args[1:]

hmap, err := fetchMap(hash)
Expand All @@ -199,7 +205,7 @@ func hdelCommand(writer *RESPWriter, args []RESP) {
}

func hgetallCommand(writer *RESPWriter, args []RESP) {
hash := args[0].ToString()
hash := args[0].ToStringUnsafe()

hmap, err := fetchMap(hash)
if err != nil {
Expand All @@ -224,7 +230,7 @@ func lpushCommand(writer *RESPWriter, args []RESP) {
}

for _, arg := range args[1:] {
ls.LPush(arg.ToString())
ls.LPush(arg.ToStringUnsafe())
}
writer.WriteInteger(ls.Size())
}
Expand All @@ -239,13 +245,13 @@ func rpushCommand(writer *RESPWriter, args []RESP) {
}

for _, arg := range args[1:] {
ls.RPush(arg.ToString())
ls.RPush(arg.ToStringUnsafe())
}
writer.WriteInteger(ls.Size())
}

func lpopCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()
key := args[0].ToStringUnsafe()

ls, err := fetchList(key)
if err != nil {
Expand All @@ -262,7 +268,7 @@ func lpopCommand(writer *RESPWriter, args []RESP) {
}

func rpopCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()
key := args[0].ToStringUnsafe()

ls, err := fetchList(key)
if err != nil {
Expand All @@ -279,7 +285,7 @@ func rpopCommand(writer *RESPWriter, args []RESP) {
}

func lrangeCommand(writer *RESPWriter, args []RESP) {
key := args[0].ToString()
key := args[0].ToStringUnsafe()
start, err := args[1].ToInt()
if err != nil {
writer.WriteError(err)
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/sakeven/RbTree v1.1.1
github.com/stretchr/testify v1.9.0
github.com/tidwall/mmap v0.3.0
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b
golang.org/x/sys v0.21.0
)

Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/mmap v0.3.0 h1:XXt1YsiXCF5/UAu3pLbu6g7iulJ9jsbs6vt7UpiV0sY=
github.com/tidwall/mmap v0.3.0/go.mod h1:2/dNzF5zA+te/JVHfrqNLcRkb8LjdH3c80vYHFQEZRk=
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b h1:C/+nN/kFJ6yrmEhIu+5Ra2jx/W8w+Ayu8pTiZfuU5Xc=
github.com/xgzlucario/quicklist v0.0.0-20240530174658-6f1a884f579b/go.mod h1:1ZgyZNk91XIllYdOPpwP+9L2RCw6QGSy6alTYF+Z0iU=
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY=
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
File renamed without changes
File renamed without changes
File renamed without changes
4 changes: 2 additions & 2 deletions internal/list/benchmark/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"runtime/debug"
"time"

"github.com/xgzlucario/quicklist"
"github.com/xgzlucario/rotom/internal/list"
)

var previousPause time.Duration
Expand Down Expand Up @@ -47,7 +47,7 @@ func main() {
}()

case "quicklist":
ls := quicklist.New()
ls := list.New()
for i := 0; i < entries; i++ {
ls.RPush(genKey(i))
}
Expand Down
20 changes: 4 additions & 16 deletions internal/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ func newNode() *Node {
return &Node{ListPack: NewListPack()}
}

func (ls *QuickList) lpush(key string) {
// LPush
func (ls *QuickList) LPush(key string) {
if len(ls.head.data)+len(key) >= maxListPackSize {
n := newNode()
n.next = ls.head
Expand All @@ -44,14 +45,8 @@ func (ls *QuickList) lpush(key string) {
ls.head.Insert(0, key)
}

// LPush
func (ls *QuickList) LPush(keys ...string) {
for _, k := range keys {
ls.lpush(k)
}
}

func (ls *QuickList) rpush(key string) {
// RPush
func (ls *QuickList) RPush(key string) {
if len(ls.tail.data)+len(key) >= maxListPackSize {
n := newNode()
ls.tail.next = n
Expand All @@ -61,13 +56,6 @@ func (ls *QuickList) rpush(key string) {
ls.tail.Insert(-1, key)
}

// RPush
func (ls *QuickList) RPush(keys ...string) {
for _, k := range keys {
ls.rpush(k)
}
}

// Index
func (ls *QuickList) Index(i int) (val string, ok bool) {
ls.Range(i, i+1, func(key []byte) bool {
Expand Down

0 comments on commit 6f6bc9f

Please sign in to comment.