Skip to content

Commit

Permalink
add: support pipeline commands
Browse files Browse the repository at this point in the history
  • Loading branch information
xgzlucario committed Jun 11, 2024
1 parent a178360 commit 8293c8d
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 101 deletions.
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ pprof:
go tool pprof -http=:18081 "http://192.168.1.6:6060/debug/pprof/profile?seconds=30"

heap:
go tool pprof http://localhost:6060/debug/pprof/heap
go tool pprof http://192.168.1.6:6060/debug/pprof/heap

build-docker:
docker build -t rotom .

clean:
rm -f *.aof
rm -f coverage.html

# rsync -av --exclude='.git' rotom/ 2:~/xgz/rotom
8 changes: 4 additions & 4 deletions ae.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
type FeType int

const (
AE_READABLE FeType = 1
AE_WRITABLE FeType = 2
AE_READABLE FeType = iota + 1
AE_WRITABLE
)

type TeType int

const (
AE_NORMAL TeType = 1
AE_ONCE TeType = 2
AE_NORMAL TeType = iota + 1
AE_ONCE
)

type FileProc func(loop *AeLoop, fd int, extra interface{})
Expand Down
41 changes: 41 additions & 0 deletions aof_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package main

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestAof(t *testing.T) {
assert := assert.New(t)
setCommand := []byte("*3\r\n$3\r\nset\r\n$3\r\nfoo\r\n$3\r\nbar\r\n")

t.Run("write", func(t *testing.T) {
aof, err := NewAof("test.aof")
assert.Nil(err)
defer aof.Close()

aof.Flush()
aof.Write(setCommand)
aof.Flush()
})

t.Run("read", func(t *testing.T) {
aof, err := NewAof("test.aof")
assert.Nil(err)
defer aof.Close()

aof.Read(func(value Value) {
assert.Equal(value.Append(nil), setCommand)
})
})

t.Run("read-error", func(t *testing.T) {
aof, _ := NewAof("not-exist.aof")
defer aof.Close()

aof.Read(func(value Value) {
panic("should not call")
})
})
}
2 changes: 1 addition & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func setCommand(args []Value) Value {
}

func getCommand(args []Value) Value {
key := args[0].ToString()
key := args[0].ToStringUnsafe()

value, _, ok := db.strs.Get(key)
if ok {
Expand Down
4 changes: 4 additions & 0 deletions command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,8 @@ func TestCommand(t *testing.T) {
val, _ = rdb.RPop(ctx, "list").Result()
assert.Equal(val, "f")
})

t.Run("client-closed", func(t *testing.T) {
rdb.Close()
})
}
3 changes: 0 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ func LoadConfig(path string) (config *Config, err error) {
if err != nil {
return
}

logger.Debug().Msgf("read config file: %s", jsonStr)

config = &Config{}
if err = json.Unmarshal(jsonStr, config); err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/deckarep/golang-set/v2 v2.6.0
github.com/redis/go-redis/v9 v9.5.2
github.com/rs/zerolog v1.33.0
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980
github.com/sakeven/RbTree v1.1.1
github.com/stretchr/testify v1.9.0
github.com/tidwall/mmap v0.3.0
github.com/xgzlucario/GigaCache v0.0.0-20240605031700-e88a04a9dd84
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/f
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980 h1:t5uAkycj8WepkboiZvJzHB+FvkNj+P6Z2dEN4pFajU4=
github.com/sakeven/RbTree v0.0.0-20240321014605-9899538dc980/go.mod h1:zwEumjdcK6Q/ky/gFPqMviw1p7ZUb+B3pU4ybgOHvns=
github.com/sakeven/RbTree v1.1.1 h1:uraKm67BJYqWDOWAYVOhPsRaMSJZqFF1+bQgaeCR3so=
github.com/sakeven/RbTree v1.1.1/go.mod h1:zwEumjdcK6Q/ky/gFPqMviw1p7ZUb+B3pU4ybgOHvns=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func main() {
if server.config.AppendOnly {
server.aeLoop.AddTimeEvent(AE_NORMAL, 1000, ServerCronFlush, nil)
}
logger.Debug().Int("port", config.Port).Msg("running on")
logger.Debug().Msg("rotom server is ready to accept.")
server.aeLoop.AeMain()
}
10 changes: 5 additions & 5 deletions net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,29 @@ func TestServer(t *testing.T) {
assert := assert.New(t)
testCount := 100

t.Run("serve", func(t *testing.T) {
t.Run("echo-server", func(t *testing.T) {
fd, err := TcpServer(20083)
assert.Nil(err)

var wg sync.WaitGroup

// start listener
go func() {
for {
cfd, err := Accept(fd)
assert.Nil(err)
wg.Add(1)

// read
var buf [32]byte
n, _ := Read(cfd, buf[:])

// write
Write(cfd, buf[:n])
wg.Done()
}
}()

var wg sync.WaitGroup

// start clients
go func() {
for i := 0; i < testCount; i++ {
Expand All @@ -58,7 +60,5 @@ func TestServer(t *testing.T) {
}()

wg.Wait()

time.Sleep(time.Second)
})
}
103 changes: 52 additions & 51 deletions resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"strconv"
"unsafe"
)

const (
Expand Down Expand Up @@ -165,6 +166,10 @@ func (v Value) ToString() string {
return string(v.raw)
}

func (v Value) ToStringUnsafe() string {
return *(*string)(unsafe.Pointer(&v.raw))
}

func (v Value) ToInt() (int, error) {
return strconv.Atoi(string(v.raw))
}
Expand All @@ -173,77 +178,73 @@ func (v Value) ToBytes() []byte {
return v.raw
}

// Marshal converts a Value object into its corresponding RESP bytes.
func (v Value) Marshal() []byte {
// Append converts a Value object into its corresponding RESP bytes.
func (v Value) Append(b []byte) []byte {
switch v.typ {
case ARRAY:
return v.marshalArray()
return v.appendArray(b)
case BULK:
return v.marshalBulk()
return v.appendBulk(b)
case STRING:
return v.marshalString()
return v.appendString(b)
case INTEGER:
return v.marshalInteger()
return v.appendInteger(b)
case NULL:
return v.marshallNull()
return v.appendNull(b)
case ERROR:
return v.marshallError()
return v.appendError(b)
default:
return []byte(ErrUnknownType.Error())
return append(b, ErrUnknownType.Error()...)
}
}

func (v Value) marshalInteger() []byte {
buf := make([]byte, 0, 1+len(v.raw)+2)
buf = append(buf, INTEGER)
buf = append(buf, v.raw...)
buf = append(buf, CRLF...)
return buf
// appendInteger appends a integer value into RESP format.
func (v Value) appendInteger(b []byte) []byte {
b = append(b, INTEGER)
b = append(b, v.raw...)
b = append(b, CRLF...)
return b
}

// marshalString marshals a string value into RESP format.
func (v Value) marshalString() []byte {
buf := make([]byte, 0, 1+len(v.raw)+2)
buf = append(buf, STRING)
buf = append(buf, v.raw...)
buf = append(buf, CRLF...)
return buf
// appendString appends a string value into RESP format.
func (v Value) appendString(b []byte) []byte {
b = append(b, STRING)
b = append(b, v.raw...)
b = append(b, CRLF...)
return b
}

// marshalBulk marshals a bulk string into RESP format.
func (v Value) marshalBulk() []byte {
// appendBulk appends a bulk string into RESP format.
func (v Value) appendBulk(b []byte) []byte {
format := strconv.Itoa(len(v.raw))
buf := make([]byte, 0, 1+len(format)+2+len(v.raw)+2)
buf = append(buf, BULK)
buf = append(buf, format...)
buf = append(buf, CRLF...)
buf = append(buf, v.raw...)
buf = append(buf, CRLF...)
return buf
}

// marshalArray marshals an array of values into RESP format.
func (v Value) marshalArray() []byte {
buf := make([]byte, 0, 16)
buf = append(buf, ARRAY)
buf = append(buf, strconv.Itoa(len(v.array))...)
buf = append(buf, CRLF...)
b = append(b, BULK)
b = append(b, format...)
b = append(b, CRLF...)
b = append(b, v.raw...)
b = append(b, CRLF...)
return b
}

// appendArray appends an array of values into RESP format.
func (v Value) appendArray(b []byte) []byte {
b = append(b, ARRAY)
b = append(b, strconv.Itoa(len(v.array))...)
b = append(b, CRLF...)
for _, val := range v.array {
buf = append(buf, val.Marshal()...)
b = val.Append(b)
}
return buf
return b
}

// marshallError marshals an error message into RESP format.
func (v Value) marshallError() []byte {
buf := make([]byte, 0, 1+len(v.raw)+2)
buf = append(buf, ERROR)
buf = append(buf, v.raw...)
buf = append(buf, CRLF...)
return buf
// appendError appends an error message into RESP format.
func (v Value) appendError(b []byte) []byte {
b = append(b, ERROR)
b = append(b, v.raw...)
b = append(b, CRLF...)
return b
}

// marshallNull marshals a null value into RESP bulk string format.
func (v Value) marshallNull() []byte {
return []byte("$-1\r\n")
// appendNull appends a null value into RESP bulk string format.
func (v Value) appendNull(b []byte) []byte {
return append(b, "$-1\r\n"...)
}
16 changes: 8 additions & 8 deletions resp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestValue(t *testing.T) {

t.Run("str-value", func(t *testing.T) {
value := ValueOK
data := value.Marshal()
data := value.Append(nil)
assert.Equal(string(data), "+OK\r\n")

_, err := NewResp(data).Read()
Expand All @@ -21,7 +21,7 @@ func TestValue(t *testing.T) {

t.Run("err-value", func(t *testing.T) {
value := newErrValue(errors.New("err message"))
data := value.Marshal()
data := value.Append(nil)
assert.Equal(string(data), "-err message\r\n")

_, err := NewResp(data).Read()
Expand All @@ -30,7 +30,7 @@ func TestValue(t *testing.T) {

t.Run("bulk-value", func(t *testing.T) {
value := newBulkValue([]byte("hello"))
data := value.Marshal()
data := value.Append(nil)
assert.Equal(string(data), "$5\r\nhello\r\n")
{
value2, err := NewResp(data).Read()
Expand All @@ -40,7 +40,7 @@ func TestValue(t *testing.T) {

// empty bulk string
value = newBulkValue([]byte(""))
data = value.Marshal()
data = value.Append(nil)
assert.Equal(string(data), "$0\r\n\r\n")
{
value2, err := NewResp(data).Read()
Expand All @@ -50,7 +50,7 @@ func TestValue(t *testing.T) {

// nil bulk string
value = newBulkValue(nil)
data = value.Marshal()
data = value.Append(nil)
assert.Equal(string(data), "$-1\r\n")
{
value2, err := NewResp(data).Read()
Expand All @@ -61,7 +61,7 @@ func TestValue(t *testing.T) {

t.Run("integer-value", func(t *testing.T) {
value := newIntegerValue(1)
data := value.Marshal()
data := value.Append(nil)
assert.Equal(string(data), ":1\r\n")

value2, err := NewResp(data).Read()
Expand All @@ -77,7 +77,7 @@ func TestValue(t *testing.T) {
newBulkValue([]byte("hello")),
newBulkValue([]byte("world")),
})
data := value.Marshal()
data := value.Append(nil)
assert.Equal(string(data), "*5\r\n:1\r\n:2\r\n:3\r\n$5\r\nhello\r\n$5\r\nworld\r\n")

value2, err := NewResp(data).Read()
Expand All @@ -98,7 +98,7 @@ func TestValue(t *testing.T) {

// marshal error type
value := Value{typ: 76}
data := value.Marshal()
data := value.Append(nil)
assert.Equal(string(data), ErrUnknownType.Error())
})
}
Loading

0 comments on commit 8293c8d

Please sign in to comment.