Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement time extension #132

Merged
merged 2 commits into from May 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 2 additions & 7 deletions appengine.go
Expand Up @@ -8,14 +8,9 @@ import (
ds "google.golang.org/appengine/datastore"
)

var (
keyPtrType = reflect.TypeOf((*ds.Key)(nil))
cursorType = reflect.TypeOf((*ds.Cursor)(nil)).Elem()
)

func init() {
Register(keyPtrType, encodeDatastoreKeyValue, decodeDatastoreKeyValue)
Register(cursorType, encodeDatastoreCursorValue, decodeDatastoreCursorValue)
Register((*ds.Key)(nil), encodeDatastoreKeyValue, decodeDatastoreKeyValue)
Register((*ds.Cursor)(nil), encodeDatastoreCursorValue, decodeDatastoreCursorValue)
}

func EncodeDatastoreKey(e *Encoder, key *ds.Key) error {
Expand Down
4 changes: 2 additions & 2 deletions decode.go
Expand Up @@ -195,7 +195,7 @@ func (d *Decoder) DecodeNil() error {
return err
}
if c != codes.Nil {
return fmt.Errorf("msgpack: invalid code %x decoding nil", c)
return fmt.Errorf("msgpack: invalid code=%x decoding nil", c)
}
return nil
}
Expand All @@ -215,7 +215,7 @@ func (d *Decoder) bool(c byte) (bool, error) {
if c == codes.True {
return true, nil
}
return false, fmt.Errorf("msgpack: invalid code %x decoding bool", c)
return false, fmt.Errorf("msgpack: invalid code=%x decoding bool", c)
}

func (d *Decoder) interfaceValue(v reflect.Value) error {
Expand Down
2 changes: 1 addition & 1 deletion decode_map.go
Expand Up @@ -109,7 +109,7 @@ func (d *Decoder) mapLen(c byte) (int, error) {
n, err := d.uint32()
return int(n), err
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding map length", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding map length", c)
}

func decodeMapStringStringValue(d *Decoder, v reflect.Value) error {
Expand Down
8 changes: 4 additions & 4 deletions decode_number.go
Expand Up @@ -94,7 +94,7 @@ func (d *Decoder) uint(c byte) (uint64, error) {
case codes.Uint64, codes.Int64:
return d.uint64()
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding uint64", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding uint64", c)
}

func (d *Decoder) DecodeInt64() (int64, error) {
Expand Down Expand Up @@ -135,7 +135,7 @@ func (d *Decoder) int(c byte) (int64, error) {
n, err := d.uint64()
return int64(n), err
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding int64", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding int64", c)
}

func (d *Decoder) DecodeFloat32() (float32, error) {
Expand All @@ -157,7 +157,7 @@ func (d *Decoder) float32(c byte) (float32, error) {

n, err := d.int(c)
if err != nil {
return 0, fmt.Errorf("msgpack: invalid code %x decoding float32", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding float32", c)
}
return float32(n), nil
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (d *Decoder) float64(c byte) (float64, error) {

n, err := d.int(c)
if err != nil {
return 0, fmt.Errorf("msgpack: invalid code %x decoding float32", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding float32", c)
}
return float64(n), nil
}
Expand Down
2 changes: 1 addition & 1 deletion decode_slice.go
Expand Up @@ -33,7 +33,7 @@ func (d *Decoder) arrayLen(c byte) (int, error) {
n, err := d.uint32()
return int(n), err
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding array length", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding array length", c)
}

func decodeStringSliceValue(d *Decoder, v reflect.Value) error {
Expand Down
2 changes: 1 addition & 1 deletion decode_string.go
Expand Up @@ -24,7 +24,7 @@ func (d *Decoder) bytesLen(c byte) (int, error) {
n, err := d.uint32()
return int(n), err
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding bytes length", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding bytes length", c)
}

func (d *Decoder) DecodeString() (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion example_registerExt_test.go
Expand Up @@ -9,7 +9,7 @@ import (
)

func init() {
msgpack.RegisterExt(0, &EventTime{})
msgpack.RegisterExt(0, (*EventTime)(nil))
}

// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format
Expand Down
42 changes: 21 additions & 21 deletions ext.go
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/vmihailenco/msgpack/codes"
)

var extTypes []reflect.Type
var extTypes = make(map[int8]reflect.Type)

var bufferPool = &sync.Pool{
New: func() interface{} {
Expand All @@ -24,27 +24,31 @@ var bufferPool = &sync.Pool{
// Expecting to be used only during initialization, it panics if the mapping
// between types and ids is not a bijection.
func RegisterExt(id int8, value interface{}) {
if diff := int(id) - len(extTypes) + 1; diff > 0 {
extTypes = append(extTypes, make([]reflect.Type, diff)...)
}

if extTypes[id] != nil {
panic(fmt.Errorf("msgpack: ext with id=%d is already registered", id))
}

typ := reflect.TypeOf(value)
if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}
ptr := reflect.PtrTo(typ)

if _, ok := extTypes[id]; ok {
panic(fmt.Errorf("msgpack: ext with id=%d is already registered", id))
}
extTypes[id] = typ
decoder := getDecoder(typ)
Register(ptr, makeExtEncoder(id, getEncoder(ptr)), decoder)
Register(typ, makeExtEncoder(id, getEncoder(typ)), decoder)

registerExt(id, ptr, getEncoder(ptr), nil)
registerExt(id, typ, getEncoder(typ), getDecoder(typ))
}

func makeExtEncoder(id int8, enc encoderFunc) encoderFunc {
func registerExt(id int8, typ reflect.Type, enc encoderFunc, dec decoderFunc) {
if enc != nil {
typEncMap[typ] = makeExtEncoder(id, enc)
}
if dec != nil {
typDecMap[typ] = dec
}
}

func makeExtEncoder(typeId int8, enc encoderFunc) encoderFunc {
return func(e *Encoder, v reflect.Value) error {
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
Expand All @@ -62,7 +66,7 @@ func makeExtEncoder(id int8, enc encoderFunc) encoderFunc {
if err := e.encodeExtLen(buf.Len()); err != nil {
return err
}
if err := e.w.WriteByte(byte(id)); err != nil {
if err := e.w.WriteByte(byte(typeId)); err != nil {
return err
}
return e.write(buf.Bytes())
Expand Down Expand Up @@ -121,7 +125,7 @@ func (d *Decoder) parseExtLen(c byte) (int, error) {
n, err := d.uint32()
return int(n), err
default:
return 0, fmt.Errorf("msgpack: invalid code %x decoding ext length", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding ext length", c)
}
}

Expand All @@ -146,12 +150,8 @@ func (d *Decoder) ext(c byte) (interface{}, error) {
return nil, err
}

if int(extId) >= len(extTypes) {
return nil, fmt.Errorf("msgpack: unregistered ext id=%d", extId)
}

typ := extTypes[extId]
if typ == nil {
typ, ok := extTypes[int8(extId)]
if !ok {
return nil, fmt.Errorf("msgpack: unregistered ext id=%d", extId)
}

Expand Down
6 changes: 3 additions & 3 deletions ext_test.go
Expand Up @@ -9,7 +9,7 @@ import (
)

func init() {
msgpack.RegisterExt(9, ExtTest{})
msgpack.RegisterExt(9, (*ExtTest)(nil))
}

func TestRegisterExtPanic(t *testing.T) {
Expand All @@ -24,7 +24,7 @@ func TestRegisterExtPanic(t *testing.T) {
t.Fatalf("got %q, wanted %q", got, wanted)
}
}()
msgpack.RegisterExt(9, ExtTest{})
msgpack.RegisterExt(9, (*ExtTest)(nil))
}

type ExtTest struct {
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestExt(t *testing.T) {

v, ok := dst.(ExtTest)
if !ok {
t.Fatalf("got %#v, wanted extTest", dst)
t.Fatalf("got %#v, wanted ExtTest", dst)
}

wanted := "hello world"
Expand Down
2 changes: 1 addition & 1 deletion msgpack_test.go
Expand Up @@ -19,7 +19,7 @@ type nameStruct struct {
Name string
}

func Test(t *testing.T) { TestingT(t) }
func TestGocheck(t *testing.T) { TestingT(t) }

type MsgpackTest struct {
buf *bytes.Buffer
Expand Down
81 changes: 69 additions & 12 deletions time.go
@@ -1,52 +1,109 @@
package msgpack

import (
"encoding/binary"
"fmt"
"reflect"
"time"

"github.com/vmihailenco/msgpack/codes"
)

var timeType = reflect.TypeOf((*time.Time)(nil)).Elem()
var timeExtId int8 = -1

func init() {
Register(timeType, encodeTimeValue, decodeTimeValue)
timeType := reflect.TypeOf((*time.Time)(nil)).Elem()
registerExt(timeExtId, timeType, encodeTimeValue, decodeTimeValue)
}

func (e *Encoder) EncodeTime(tm time.Time) error {
if err := e.w.WriteByte(codes.FixedArrayLow | 2); err != nil {
b := e.encodeTime(tm)
if err := e.encodeExtLen(len(b)); err != nil {
return err
}
if err := e.EncodeInt64(tm.Unix()); err != nil {
if err := e.w.WriteByte(byte(timeExtId)); err != nil {
return err
}
return e.EncodeInt(tm.Nanosecond())
return e.write(b)
}

func (e *Encoder) encodeTime(tm time.Time) []byte {
secs := uint64(tm.Unix())
if secs>>34 == 0 {
data := uint64(tm.Nanosecond())<<34 | secs
if data&0xffffffff00000000 == 0 {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(data))
return b
} else {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, data)
return b
}
}

b := make([]byte, 12)
binary.BigEndian.PutUint32(b, uint32(tm.Nanosecond()))
binary.BigEndian.PutUint64(b[4:], uint64(secs))
return b
}

func (d *Decoder) DecodeTime() (time.Time, error) {
b, err := d.readByte()
c, err := d.readByte()
if err != nil {
return time.Time{}, err
}
if b != 0x92 {
return time.Time{}, fmt.Errorf("msgpack: invalid code %x decoding time", b)

// Legacy format.
if c == codes.FixedArrayLow|2 {
sec, err := d.DecodeInt64()
if err != nil {
return time.Time{}, err
}
nsec, err := d.DecodeInt64()
if err != nil {
return time.Time{}, err
}
return time.Unix(sec, nsec), nil
}

sec, err := d.DecodeInt64()
extLen, err := d.parseExtLen(c)
if err != nil {
return time.Time{}, err
}
nsec, err := d.DecodeInt64()

_, err = d.r.ReadByte()
if err != nil {
return time.Time{}, nil
}

b, err := d.readN(extLen)
if err != nil {
return time.Time{}, err
}
return time.Unix(sec, nsec), nil

switch len(b) {
case 4:
sec := binary.BigEndian.Uint32(b)
return time.Unix(int64(sec), 0), nil
case 8:
sec := binary.BigEndian.Uint64(b)
nsec := int64(sec >> 34)
sec &= 0x00000003ffffffff
return time.Unix(int64(sec), nsec), nil
case 12:
nsec := binary.BigEndian.Uint32(b)
sec := binary.BigEndian.Uint64(b[4:])
return time.Unix(int64(sec), int64(nsec)), nil
default:
return time.Time{}, fmt.Errorf("msgpack: invalid ext len=%d decoding time", extLen)
}
}

func encodeTimeValue(e *Encoder, v reflect.Value) error {
tm := v.Interface().(time.Time)
return e.EncodeTime(tm)
b := e.encodeTime(tm)
return e.write(b)
}

func decodeTimeValue(d *Decoder, v reflect.Value) error {
Expand Down