Skip to content

Commit

Permalink
Merge pull request #29 from xorphitus/binary-protocol
Browse files Browse the repository at this point in the history
implement binary protocol (minimum)
  • Loading branch information
fujiwara committed Jan 11, 2018
2 parents 60ec3f4 + 23a83d4 commit 0485f9d
Show file tree
Hide file tree
Showing 5 changed files with 885 additions and 3 deletions.
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,18 @@ $ cd $GOPATH/github.com/kayac/go-katsubushi/cmd/katsubushi

## Protocol

katsubushi use protocol compatible with memcached (text only, not binary).
katsubushi use protocol compatible with memcached.

Some commands are available with text and binary protocol.

But the others are available only with text protocol.

### API

#### GET, GETS

Binary protocol is also available only for single key GET.

```
GET id1 id2
VALUE id1 0 18
Expand Down Expand Up @@ -87,6 +93,8 @@ STAT get_misses 0

Returns a version of katsubushi.

Binary protocol is available, too.

```
VERSION 1.1.2
```
Expand Down
14 changes: 13 additions & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,19 @@ func (app *App) handleConn(ctx context.Context, conn net.Conn) {

app.extendDeadline(conn)

scanner := bufio.NewScanner(conn)
bufReader := bufio.NewReader(conn)
isBin, err := app.IsBinaryProtocol(bufReader)
if err != nil {
log.Errorf("error on read first byte to decide binary protocol or not: %s", err)
return
}
if isBin {
log.Debug("binary protocol")
app.RespondToBinary(bufReader, conn)
return
}

scanner := bufio.NewScanner(bufReader)
w := bufio.NewWriter(conn)
for scanner.Scan() {
app.extendDeadline(conn)
Expand Down
324 changes: 323 additions & 1 deletion app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"testing"
"time"

"encoding/hex"
"github.com/bmizerany/mc"
"github.com/bradfitz/gomemcache/memcache"
)

Expand Down Expand Up @@ -439,7 +441,7 @@ func TestAppEmptyCommand(t *testing.T) {
}
}

func TestAppStatsReaceCondition(t *testing.T) {
func TestAppStatsRaceCondition(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
app := newTestAppAndListenTCP(ctx, t)
Expand Down Expand Up @@ -512,3 +514,323 @@ func TestAppCancel(t *testing.T) {
}
}
}

type testClientBinary struct {
conn net.Conn
}

func (c *testClientBinary) Command(cmd []byte) ([]byte, error) {
resp := make([]byte, 1024)
_, err := c.conn.Write(cmd)
if err != nil {
return nil, err
}
n, err := c.conn.Read(resp)
if err != nil {
return nil, err
}
return resp[0:n], nil
}

func newTestClientBinary(addr string) (*testClientBinary, error) {
conn, err := net.DialTimeout("tcp", addr, 1*time.Second)
if err != nil {
return nil, err
}
return &testClientBinary{conn}, nil
}

func newTestClientBinarySock(path string) (*testClientBinary, error) {
conn, err := net.DialTimeout("unix", path, 1*time.Second)
if err != nil {
return nil, err
}
return &testClientBinary{conn}, nil
}

func TestAppBinary(t *testing.T) {
ctx := context.Background()
app := newTestAppAndListenTCP(ctx, t)
cn, err := mc.Dial("tcp", app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}

val, cas, flags, err := cn.Get("hoge")
if err != nil {
t.Fatal(err)
}

t.Logf("cas = %d", cas)
t.Logf("flags = %d", flags)
t.Logf("id = %s", val)

if cas != 0 {
t.Errorf("Unexpected cas: %d", cas)
}

if flags != 0 {
t.Errorf("Unexpected flags: %d", flags)
}

if _, err := strconv.ParseInt(string(val), 10, 64); err != nil {
t.Errorf("Invalid id: %s", err)
}
}

func TestAppBinarySock(t *testing.T) {
ctx := context.Background()
app, tmpDir := newTestAppAndListenSock(ctx, t)
cn, err := mc.Dial("unix", app.Listener.Addr().String())
defer os.RemoveAll(tmpDir)
if err != nil {
t.Fatal(err)
}

value, cas, flags, err := cn.Get("hoge")
if err != nil {
t.Fatal(err)
}

t.Logf("cas = %d", cas)
t.Logf("flags = %d", flags)
t.Logf("id = %s", value)

if cas != 0 {
t.Errorf("Unexpected cas: %d", cas)
}

if flags != 0 {
t.Errorf("Unexpected flags: %d", flags)
}

if _, err := strconv.ParseInt(string(value), 10, 64); err != nil {
t.Errorf("Invalid id: %s", err)
}
}

func TestAppBinaryError(t *testing.T) {
ctx := context.Background()
app := newTestAppAndListenTCP(ctx, t)
client, err := newTestClientBinary(app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}

// add-command
// key: "Hello"
// value: "World"
// flags: 0xdeadbeef
// expiry: in two hours
cmd := []byte{
0x80, 0x02, 0x00, 0x05,
0x08, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x12,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0xde, 0xad, 0xbe, 0xef,
0x00, 0x00, 0x1c, 0x20,
0x48, 0x65, 0x6c, 0x6c,
0x6f, 0x57, 0x6f, 0x72,
0x6c, 0x64,
}

expected := []byte{
0x81, 0x00, 0x00, 0x00,
// status: Internal Error
0x00, 0x00, 0x00, 0x84,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
}

resp, err := client.Command(cmd)
if bytes.Compare(resp, expected) != 0 {
t.Errorf("invalid error response: %s", hex.Dump(resp))
}
}

func TestAppBinaryIdleTimeout(t *testing.T) {
ctx := context.Background()
app := newTestAppAndListenTCP(ctx, t)
app.SetIdleTimeout(1)

cn, err := mc.Dial("tcp", app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}

t.Log("Before timeout")
{
val, _, _, err := cn.Get("hoge")
if err != nil {
t.Fatal(err)
}

if _, err := strconv.ParseInt(string(val), 10, 64); err != nil {
t.Errorf("Invalid id: %s", err)
}
}

time.Sleep(2 * time.Second)

t.Log("After timeout")
{
_, _, _, err := cn.Get("hoge")
if err == nil {
t.Fatal("Connection must be disconnected")
}
}
}

func BenchmarkAppBinary(b *testing.B) {
app, _ := NewApp(getNextWorkerID())
go app.ListenTCP(context.Background(), ":0")
<-app.Ready()

// GET Hello
cmd := []byte{
0x80, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x48, 0x65, 0x6c, 0x6c,
0x6f,
}

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
client, err := newTestClientBinary(app.Listener.Addr().String())
if err != nil {
b.Fatalf("Failed to connect to app: %s", err)
}
for pb.Next() {
resp, err := client.Command(cmd)
if err != nil {
b.Fatalf("Error on write: %s", err)
}
if resp[0] != 0x81 || resp[1] != 0x00 {
b.Fatalf("Got ERROR")
}
}
})
}

func BenchmarkAppBinarySock(b *testing.B) {
app, _ := NewApp(getNextWorkerID())
tmpDir, _ := ioutil.TempDir("", "go-katsubushi-")
defer os.RemoveAll(tmpDir)

go app.ListenSock(
context.Background(),
filepath.Join(tmpDir, "katsubushi.sock"),
)
<-app.Ready()

// GET Hello
cmd := []byte{
0x80, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x48, 0x65, 0x6c, 0x6c,
0x6f,
}

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
client, err := newTestClientBinarySock(filepath.Join(tmpDir, "katsubushi.sock"))
if err != nil {
b.Fatalf("Failed to connect to app: %s", err)
}
for pb.Next() {
resp, err := client.Command(cmd)
if err != nil {
b.Fatalf("Error on write: %s", err)
}
if resp[0] != 0x81 || resp[1] != 0x00 {
b.Fatalf("Got ERROR")
}
}
})
}

func TestAppBinaryVersion(t *testing.T) {
ctx := context.Background()
app := newTestAppAndListenTCP(ctx, t)
client, err := newTestClientBinary(app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}
cmd := []byte{
0x80, 0x0b, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
}
expected := []byte{
0x81, 0x0b, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// length of body
0x00, 0x00, 0x00, 0x0b,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// value: "development"
0x64, 0x65, 0x76, 0x65,
0x6c, 0x6f, 0x70, 0x6d,
0x65, 0x6e, 0x74,
}
resp, err := client.Command(cmd)
if bytes.Compare(resp, expected) != 0 {
t.Errorf("invalid version response: %s", hex.Dump(resp))
}
}

func TestAppBinaryCancel(t *testing.T) {
versionCmd := []byte{
0x80, 0x0b, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
}

ctx, cancel := context.WithCancel(context.Background())
app := newTestAppAndListenTCP(ctx, t)
{
client, err := newTestClientBinary(app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}
_, err = client.Command(versionCmd)
if err != nil {
t.Fatal(err)
}
cancelAndWait(cancel)
// disconnect by peer after canceled
res, err := client.Command(versionCmd)
if err == nil && len(res) > 24 && res[0] == 0x81 { // response returned
t.Fatal(err, res)
}
t.Log(res, err)
}
{
// failed to conenct after canceled
_, err := newTestClientBinary(app.Listener.Addr().String())
if err == nil {
t.Fatal(err)
}
}
}
Loading

0 comments on commit 0485f9d

Please sign in to comment.