Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement binary protocol (minimum) #29

Merged
merged 9 commits into from
Jan 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,18 @@ $ cd $GOPATH/github.com/kayac/go-katsubushi/cmd/katsubushi

## Protocol

katsubushi use protocol compatible with memcached (text only, not binary).
katsubushi use protocol compatible with memcached.

Some commands are available with text and binary protocol.

But the others are available only with text protocol.

### API

#### GET, GETS

Binary protocol is also available only for single key GET.

```
GET id1 id2
VALUE id1 0 18
Expand Down Expand Up @@ -87,6 +93,8 @@ STAT get_misses 0

Returns a version of katsubushi.

Binary protocol is available, too.

```
VERSION 1.1.2
```
Expand Down
14 changes: 13 additions & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,19 @@ func (app *App) handleConn(ctx context.Context, conn net.Conn) {

app.extendDeadline(conn)

scanner := bufio.NewScanner(conn)
bufReader := bufio.NewReader(conn)
isBin, err := app.IsBinaryProtocol(bufReader)
if err != nil {
log.Errorf("error on read first byte to decide binary protocol or not: %s", err)
return
}
if isBin {
log.Debug("binary protocol")
app.RespondToBinary(bufReader, conn)
return
}

scanner := bufio.NewScanner(bufReader)
w := bufio.NewWriter(conn)
for scanner.Scan() {
app.extendDeadline(conn)
Expand Down
324 changes: 323 additions & 1 deletion app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"testing"
"time"

"encoding/hex"
"github.com/bmizerany/mc"
"github.com/bradfitz/gomemcache/memcache"
)

Expand Down Expand Up @@ -439,7 +441,7 @@ func TestAppEmptyCommand(t *testing.T) {
}
}

func TestAppStatsReaceCondition(t *testing.T) {
func TestAppStatsRaceCondition(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
app := newTestAppAndListenTCP(ctx, t)
Expand Down Expand Up @@ -512,3 +514,323 @@ func TestAppCancel(t *testing.T) {
}
}
}

type testClientBinary struct {
conn net.Conn
}

func (c *testClientBinary) Command(cmd []byte) ([]byte, error) {
resp := make([]byte, 1024)
_, err := c.conn.Write(cmd)
if err != nil {
return nil, err
}
n, err := c.conn.Read(resp)
if err != nil {
return nil, err
}
return resp[0:n], nil
}

func newTestClientBinary(addr string) (*testClientBinary, error) {
conn, err := net.DialTimeout("tcp", addr, 1*time.Second)
if err != nil {
return nil, err
}
return &testClientBinary{conn}, nil
}

func newTestClientBinarySock(path string) (*testClientBinary, error) {
conn, err := net.DialTimeout("unix", path, 1*time.Second)
if err != nil {
return nil, err
}
return &testClientBinary{conn}, nil
}

func TestAppBinary(t *testing.T) {
ctx := context.Background()
app := newTestAppAndListenTCP(ctx, t)
cn, err := mc.Dial("tcp", app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}

val, cas, flags, err := cn.Get("hoge")
if err != nil {
t.Fatal(err)
}

t.Logf("cas = %d", cas)
t.Logf("flags = %d", flags)
t.Logf("id = %s", val)

if cas != 0 {
t.Errorf("Unexpected cas: %d", cas)
}

if flags != 0 {
t.Errorf("Unexpected flags: %d", flags)
}

if _, err := strconv.ParseInt(string(val), 10, 64); err != nil {
t.Errorf("Invalid id: %s", err)
}
}

func TestAppBinarySock(t *testing.T) {
ctx := context.Background()
app, tmpDir := newTestAppAndListenSock(ctx, t)
cn, err := mc.Dial("unix", app.Listener.Addr().String())
defer os.RemoveAll(tmpDir)
if err != nil {
t.Fatal(err)
}

value, cas, flags, err := cn.Get("hoge")
if err != nil {
t.Fatal(err)
}

t.Logf("cas = %d", cas)
t.Logf("flags = %d", flags)
t.Logf("id = %s", value)

if cas != 0 {
t.Errorf("Unexpected cas: %d", cas)
}

if flags != 0 {
t.Errorf("Unexpected flags: %d", flags)
}

if _, err := strconv.ParseInt(string(value), 10, 64); err != nil {
t.Errorf("Invalid id: %s", err)
}
}

func TestAppBinaryError(t *testing.T) {
ctx := context.Background()
app := newTestAppAndListenTCP(ctx, t)
client, err := newTestClientBinary(app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}

// add-command
// key: "Hello"
// value: "World"
// flags: 0xdeadbeef
// expiry: in two hours
cmd := []byte{
0x80, 0x02, 0x00, 0x05,
0x08, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x12,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0xde, 0xad, 0xbe, 0xef,
0x00, 0x00, 0x1c, 0x20,
0x48, 0x65, 0x6c, 0x6c,
0x6f, 0x57, 0x6f, 0x72,
0x6c, 0x64,
}

expected := []byte{
0x81, 0x00, 0x00, 0x00,
// status: Internal Error
0x00, 0x00, 0x00, 0x84,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
}

resp, err := client.Command(cmd)
if bytes.Compare(resp, expected) != 0 {
t.Errorf("invalid error response: %s", hex.Dump(resp))
}
}

func TestAppBinaryIdleTimeout(t *testing.T) {
ctx := context.Background()
app := newTestAppAndListenTCP(ctx, t)
app.SetIdleTimeout(1)

cn, err := mc.Dial("tcp", app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}

t.Log("Before timeout")
{
val, _, _, err := cn.Get("hoge")
if err != nil {
t.Fatal(err)
}

if _, err := strconv.ParseInt(string(val), 10, 64); err != nil {
t.Errorf("Invalid id: %s", err)
}
}

time.Sleep(2 * time.Second)

t.Log("After timeout")
{
_, _, _, err := cn.Get("hoge")
if err == nil {
t.Fatal("Connection must be disconnected")
}
}
}

func BenchmarkAppBinary(b *testing.B) {
app, _ := NewApp(getNextWorkerID())
go app.ListenTCP(context.Background(), ":0")
<-app.Ready()

// GET Hello
cmd := []byte{
0x80, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x48, 0x65, 0x6c, 0x6c,
0x6f,
}

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
client, err := newTestClientBinary(app.Listener.Addr().String())
if err != nil {
b.Fatalf("Failed to connect to app: %s", err)
}
for pb.Next() {
resp, err := client.Command(cmd)
if err != nil {
b.Fatalf("Error on write: %s", err)
}
if resp[0] != 0x81 || resp[1] != 0x00 {
b.Fatalf("Got ERROR")
}
}
})
}

func BenchmarkAppBinarySock(b *testing.B) {
app, _ := NewApp(getNextWorkerID())
tmpDir, _ := ioutil.TempDir("", "go-katsubushi-")
defer os.RemoveAll(tmpDir)

go app.ListenSock(
context.Background(),
filepath.Join(tmpDir, "katsubushi.sock"),
)
<-app.Ready()

// GET Hello
cmd := []byte{
0x80, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x48, 0x65, 0x6c, 0x6c,
0x6f,
}

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
client, err := newTestClientBinarySock(filepath.Join(tmpDir, "katsubushi.sock"))
if err != nil {
b.Fatalf("Failed to connect to app: %s", err)
}
for pb.Next() {
resp, err := client.Command(cmd)
if err != nil {
b.Fatalf("Error on write: %s", err)
}
if resp[0] != 0x81 || resp[1] != 0x00 {
b.Fatalf("Got ERROR")
}
}
})
}

func TestAppBinaryVersion(t *testing.T) {
ctx := context.Background()
app := newTestAppAndListenTCP(ctx, t)
client, err := newTestClientBinary(app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}
cmd := []byte{
0x80, 0x0b, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
}
expected := []byte{
0x81, 0x0b, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// length of body
0x00, 0x00, 0x00, 0x0b,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
// value: "development"
0x64, 0x65, 0x76, 0x65,
0x6c, 0x6f, 0x70, 0x6d,
0x65, 0x6e, 0x74,
}
resp, err := client.Command(cmd)
if bytes.Compare(resp, expected) != 0 {
t.Errorf("invalid version response: %s", hex.Dump(resp))
}
}

func TestAppBinaryCancel(t *testing.T) {
versionCmd := []byte{
0x80, 0x0b, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
}

ctx, cancel := context.WithCancel(context.Background())
app := newTestAppAndListenTCP(ctx, t)
{
client, err := newTestClientBinary(app.Listener.Addr().String())
if err != nil {
t.Fatal(err)
}
_, err = client.Command(versionCmd)
if err != nil {
t.Fatal(err)
}
cancelAndWait(cancel)
// disconnect by peer after canceled
res, err := client.Command(versionCmd)
if err == nil && len(res) > 24 && res[0] == 0x81 { // response returned
t.Fatal(err, res)
}
t.Log(res, err)
}
{
// failed to conenct after canceled
_, err := newTestClientBinary(app.Listener.Addr().String())
if err == nil {
t.Fatal(err)
}
}
}
Loading