Skip to content

Commit

Permalink
Merge pull request #132 from vmihailenco/fix/time-ext
Browse files Browse the repository at this point in the history
implement time extension
  • Loading branch information
vmihailenco committed May 11, 2017
2 parents 352569a + 6abac76 commit 6f3ee3b
Show file tree
Hide file tree
Showing 13 changed files with 163 additions and 110 deletions.
9 changes: 2 additions & 7 deletions appengine.go
Expand Up @@ -8,14 +8,9 @@ import (
ds "google.golang.org/appengine/datastore"
)

var (
keyPtrType = reflect.TypeOf((*ds.Key)(nil))
cursorType = reflect.TypeOf((*ds.Cursor)(nil)).Elem()
)

func init() {
Register(keyPtrType, encodeDatastoreKeyValue, decodeDatastoreKeyValue)
Register(cursorType, encodeDatastoreCursorValue, decodeDatastoreCursorValue)
Register((*ds.Key)(nil), encodeDatastoreKeyValue, decodeDatastoreKeyValue)
Register((*ds.Cursor)(nil), encodeDatastoreCursorValue, decodeDatastoreCursorValue)
}

func EncodeDatastoreKey(e *Encoder, key *ds.Key) error {
Expand Down
4 changes: 2 additions & 2 deletions decode.go
Expand Up @@ -195,7 +195,7 @@ func (d *Decoder) DecodeNil() error {
return err
}
if c != codes.Nil {
return fmt.Errorf("msgpack: invalid code %x decoding nil", c)
return fmt.Errorf("msgpack: invalid code=%x decoding nil", c)
}
return nil
}
Expand All @@ -215,7 +215,7 @@ func (d *Decoder) bool(c byte) (bool, error) {
if c == codes.True {
return true, nil
}
return false, fmt.Errorf("msgpack: invalid code %x decoding bool", c)
return false, fmt.Errorf("msgpack: invalid code=%x decoding bool", c)
}

func (d *Decoder) interfaceValue(v reflect.Value) error {
Expand Down
2 changes: 1 addition & 1 deletion decode_map.go
Expand Up @@ -109,7 +109,7 @@ func (d *Decoder) mapLen(c byte) (int, error) {
n, err := d.uint32()
return int(n), err
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding map length", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding map length", c)
}

func decodeMapStringStringValue(d *Decoder, v reflect.Value) error {
Expand Down
8 changes: 4 additions & 4 deletions decode_number.go
Expand Up @@ -94,7 +94,7 @@ func (d *Decoder) uint(c byte) (uint64, error) {
case codes.Uint64, codes.Int64:
return d.uint64()
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding uint64", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding uint64", c)
}

func (d *Decoder) DecodeInt64() (int64, error) {
Expand Down Expand Up @@ -135,7 +135,7 @@ func (d *Decoder) int(c byte) (int64, error) {
n, err := d.uint64()
return int64(n), err
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding int64", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding int64", c)
}

func (d *Decoder) DecodeFloat32() (float32, error) {
Expand All @@ -157,7 +157,7 @@ func (d *Decoder) float32(c byte) (float32, error) {

n, err := d.int(c)
if err != nil {
return 0, fmt.Errorf("msgpack: invalid code %x decoding float32", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding float32", c)
}
return float32(n), nil
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (d *Decoder) float64(c byte) (float64, error) {

n, err := d.int(c)
if err != nil {
return 0, fmt.Errorf("msgpack: invalid code %x decoding float32", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding float32", c)
}
return float64(n), nil
}
Expand Down
2 changes: 1 addition & 1 deletion decode_slice.go
Expand Up @@ -33,7 +33,7 @@ func (d *Decoder) arrayLen(c byte) (int, error) {
n, err := d.uint32()
return int(n), err
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding array length", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding array length", c)
}

func decodeStringSliceValue(d *Decoder, v reflect.Value) error {
Expand Down
2 changes: 1 addition & 1 deletion decode_string.go
Expand Up @@ -24,7 +24,7 @@ func (d *Decoder) bytesLen(c byte) (int, error) {
n, err := d.uint32()
return int(n), err
}
return 0, fmt.Errorf("msgpack: invalid code %x decoding bytes length", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding bytes length", c)
}

func (d *Decoder) DecodeString() (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion example_registerExt_test.go
Expand Up @@ -9,7 +9,7 @@ import (
)

func init() {
msgpack.RegisterExt(0, &EventTime{})
msgpack.RegisterExt(0, (*EventTime)(nil))
}

// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format
Expand Down
42 changes: 21 additions & 21 deletions ext.go
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/vmihailenco/msgpack/codes"
)

var extTypes []reflect.Type
var extTypes = make(map[int8]reflect.Type)

var bufferPool = &sync.Pool{
New: func() interface{} {
Expand All @@ -24,27 +24,31 @@ var bufferPool = &sync.Pool{
// Expecting to be used only during initialization, it panics if the mapping
// between types and ids is not a bijection.
func RegisterExt(id int8, value interface{}) {
if diff := int(id) - len(extTypes) + 1; diff > 0 {
extTypes = append(extTypes, make([]reflect.Type, diff)...)
}

if extTypes[id] != nil {
panic(fmt.Errorf("msgpack: ext with id=%d is already registered", id))
}

typ := reflect.TypeOf(value)
if typ.Kind() == reflect.Ptr {
typ = typ.Elem()
}
ptr := reflect.PtrTo(typ)

if _, ok := extTypes[id]; ok {
panic(fmt.Errorf("msgpack: ext with id=%d is already registered", id))
}
extTypes[id] = typ
decoder := getDecoder(typ)
Register(ptr, makeExtEncoder(id, getEncoder(ptr)), decoder)
Register(typ, makeExtEncoder(id, getEncoder(typ)), decoder)

registerExt(id, ptr, getEncoder(ptr), nil)
registerExt(id, typ, getEncoder(typ), getDecoder(typ))
}

func makeExtEncoder(id int8, enc encoderFunc) encoderFunc {
func registerExt(id int8, typ reflect.Type, enc encoderFunc, dec decoderFunc) {
if enc != nil {
typEncMap[typ] = makeExtEncoder(id, enc)
}
if dec != nil {
typDecMap[typ] = dec
}
}

func makeExtEncoder(typeId int8, enc encoderFunc) encoderFunc {
return func(e *Encoder, v reflect.Value) error {
buf := bufferPool.Get().(*bytes.Buffer)
defer bufferPool.Put(buf)
Expand All @@ -62,7 +66,7 @@ func makeExtEncoder(id int8, enc encoderFunc) encoderFunc {
if err := e.encodeExtLen(buf.Len()); err != nil {
return err
}
if err := e.w.WriteByte(byte(id)); err != nil {
if err := e.w.WriteByte(byte(typeId)); err != nil {
return err
}
return e.write(buf.Bytes())
Expand Down Expand Up @@ -121,7 +125,7 @@ func (d *Decoder) parseExtLen(c byte) (int, error) {
n, err := d.uint32()
return int(n), err
default:
return 0, fmt.Errorf("msgpack: invalid code %x decoding ext length", c)
return 0, fmt.Errorf("msgpack: invalid code=%x decoding ext length", c)
}
}

Expand All @@ -146,12 +150,8 @@ func (d *Decoder) ext(c byte) (interface{}, error) {
return nil, err
}

if int(extId) >= len(extTypes) {
return nil, fmt.Errorf("msgpack: unregistered ext id=%d", extId)
}

typ := extTypes[extId]
if typ == nil {
typ, ok := extTypes[int8(extId)]
if !ok {
return nil, fmt.Errorf("msgpack: unregistered ext id=%d", extId)
}

Expand Down
6 changes: 3 additions & 3 deletions ext_test.go
Expand Up @@ -9,7 +9,7 @@ import (
)

func init() {
msgpack.RegisterExt(9, ExtTest{})
msgpack.RegisterExt(9, (*ExtTest)(nil))
}

func TestRegisterExtPanic(t *testing.T) {
Expand All @@ -24,7 +24,7 @@ func TestRegisterExtPanic(t *testing.T) {
t.Fatalf("got %q, wanted %q", got, wanted)
}
}()
msgpack.RegisterExt(9, ExtTest{})
msgpack.RegisterExt(9, (*ExtTest)(nil))
}

type ExtTest struct {
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestExt(t *testing.T) {

v, ok := dst.(ExtTest)
if !ok {
t.Fatalf("got %#v, wanted extTest", dst)
t.Fatalf("got %#v, wanted ExtTest", dst)
}

wanted := "hello world"
Expand Down
2 changes: 1 addition & 1 deletion msgpack_test.go
Expand Up @@ -19,7 +19,7 @@ type nameStruct struct {
Name string
}

func Test(t *testing.T) { TestingT(t) }
func TestGocheck(t *testing.T) { TestingT(t) }

type MsgpackTest struct {
buf *bytes.Buffer
Expand Down
81 changes: 69 additions & 12 deletions time.go
@@ -1,52 +1,109 @@
package msgpack

import (
"encoding/binary"
"fmt"
"reflect"
"time"

"github.com/vmihailenco/msgpack/codes"
)

var timeType = reflect.TypeOf((*time.Time)(nil)).Elem()
var timeExtId int8 = -1

func init() {
Register(timeType, encodeTimeValue, decodeTimeValue)
timeType := reflect.TypeOf((*time.Time)(nil)).Elem()
registerExt(timeExtId, timeType, encodeTimeValue, decodeTimeValue)
}

func (e *Encoder) EncodeTime(tm time.Time) error {
if err := e.w.WriteByte(codes.FixedArrayLow | 2); err != nil {
b := e.encodeTime(tm)
if err := e.encodeExtLen(len(b)); err != nil {
return err
}
if err := e.EncodeInt64(tm.Unix()); err != nil {
if err := e.w.WriteByte(byte(timeExtId)); err != nil {
return err
}
return e.EncodeInt(tm.Nanosecond())
return e.write(b)
}

func (e *Encoder) encodeTime(tm time.Time) []byte {
secs := uint64(tm.Unix())
if secs>>34 == 0 {
data := uint64(tm.Nanosecond())<<34 | secs
if data&0xffffffff00000000 == 0 {
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, uint32(data))
return b
} else {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, data)
return b
}
}

b := make([]byte, 12)
binary.BigEndian.PutUint32(b, uint32(tm.Nanosecond()))
binary.BigEndian.PutUint64(b[4:], uint64(secs))
return b
}

func (d *Decoder) DecodeTime() (time.Time, error) {
b, err := d.readByte()
c, err := d.readByte()
if err != nil {
return time.Time{}, err
}
if b != 0x92 {
return time.Time{}, fmt.Errorf("msgpack: invalid code %x decoding time", b)

// Legacy format.
if c == codes.FixedArrayLow|2 {
sec, err := d.DecodeInt64()
if err != nil {
return time.Time{}, err
}
nsec, err := d.DecodeInt64()
if err != nil {
return time.Time{}, err
}
return time.Unix(sec, nsec), nil
}

sec, err := d.DecodeInt64()
extLen, err := d.parseExtLen(c)
if err != nil {
return time.Time{}, err
}
nsec, err := d.DecodeInt64()

_, err = d.r.ReadByte()
if err != nil {
return time.Time{}, nil
}

b, err := d.readN(extLen)
if err != nil {
return time.Time{}, err
}
return time.Unix(sec, nsec), nil

switch len(b) {
case 4:
sec := binary.BigEndian.Uint32(b)
return time.Unix(int64(sec), 0), nil
case 8:
sec := binary.BigEndian.Uint64(b)
nsec := int64(sec >> 34)
sec &= 0x00000003ffffffff
return time.Unix(int64(sec), nsec), nil
case 12:
nsec := binary.BigEndian.Uint32(b)
sec := binary.BigEndian.Uint64(b[4:])
return time.Unix(int64(sec), int64(nsec)), nil
default:
return time.Time{}, fmt.Errorf("msgpack: invalid ext len=%d decoding time", extLen)
}
}

func encodeTimeValue(e *Encoder, v reflect.Value) error {
tm := v.Interface().(time.Time)
return e.EncodeTime(tm)
b := e.encodeTime(tm)
return e.write(b)
}

func decodeTimeValue(d *Decoder, v reflect.Value) error {
Expand Down

0 comments on commit 6f3ee3b

Please sign in to comment.