Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Message and Codec fixes #9

Closed
wants to merge 23 commits into from

3 participants

@justinfx

Extracted from larger pull req
The changes to the codec_*test.go files was because it passed the gotest under the release. Not sure if you were testing under a certain weekly build or not.

justinfx and others added some commits
@justinfx justinfx connection.go: seperating general messages from heartbeats into diffe…
…rent channels.

heartbeat is delayed if the last message received is within the timeframe of the HeartbeatInterval.
16ef3a4
@justinfx justinfx Removed debug logging line from sio/keepalive() 0290f45
@justinfx justinfx added SetAuthorization/IsAuthorized to SocketIO, to allow user to spe…
…cify a custom

authorization hook using the original http request.
538c292
@justinfx justinfx fixed a bug in the new delayed heartbeats code where poor timing woul…
…d cause a

heartbeat to be skipped occasionally. new method uses a DelayTimer that resets
every time a message is received from the client.
5e3aa66
@justinfx justinfx gofix for release r59 63b6803
@justinfx justinfx connection.go: was using a singleshot timer in keepalive(). needed to…
… reset it after each trigger.
c8af75e
@justinfx justinfx connection.go: using inconsistent heartbeat intervals apparently brea…
…ks socket.io 0.6 client, which also has its own hardcoded timer and expects them to keep coming consistently.
e28da1e
@edsrzf edsrzf Don't convert between string and []byte so often c34f9a2
@edsrzf edsrzf Reset decode buffer after decode 41ce614
@justinfx justinfx Merge pull request #1 from edsrzf/memory-fixes
Memory fixes
244c869
@justinfx justinfx Merge remote branch 'upstream/master' ce0f06b
@justinfx justinfx testing with reader() buffering a89e04c
@justinfx justinfx Merge branch 'master', remote branch 'origin' 7d334bb
@justinfx justinfx message.go: added a Bytes() method
codec_sio.go: defined Bytes() method; fixed typo in case expression
7c333f5
@justinfx justinfx Merge branch 'master' into dev 9c5bdc0
@justinfx justinfx adding lock to client.Send() f3fcd6e
@justinfx justinfx connections.go: working on reader() to fix 4096 buffer limitations 2b86f84
@justinfx justinfx reverting master to pre memory-fixes state 8376f60
@justinfx justinfx merged memory-fixes; fixed codec_* tests 7738730
@justinfx justinfx Merge branch 'master' of github.com:justinfx/go-socket.io into dev 25691d8
@justinfx justinfx gofix for weekly.2011-08-17 9454 ffb7e7d
@madari
Owner

This is wrong. The idea is that we incrementally give more data to the decoder until it can decode a message.
A message can span over multiple Read():s (see reader() and ReadBufferSize). The decoder is responsible
for eating the buffer and connection.go is responsible for giving it data to crunch.

For example, using a 16-byte read buffer and sending the alphabets results in the following:

2011/08/26 15:38:50 sio/conn: read nr=16 err=
2011/08/26 15:38:50 sio/conn: read data=~m~52~m~ABCDEFGH # this is appended to the decoder buffer and a Decode() is tried
2011/08/26 15:38:50 sio/conn: read nr=16 err=
2011/08/26 15:38:50 sio/conn: read data=IJKLMNOPQRSTUVWX # this is appended to the decoder buffer and a Decode() is tried
2011/08/26 15:38:50 sio/conn: read nr=16 err=
2011/08/26 15:38:50 sio/conn: read data=YZABCDEFGHIJKLMN # this is appended to the decoder buffer and a Decode() is tried
2011/08/26 15:38:50 sio/conn: read nr=12 err=
2011/08/26 15:38:50 sio/conn: read data=OPQRSTUVWXYZ # this is appended to the decoder buffer and a Decode() is tried
2011/08/26 15:38:50 Server received ABCDEFGHIJKLMNOPQRSTUVWXYZABCDEFGHIJKLMNOPQRSTUVWXYZ # Finally the decoder returns a decoded message

Is this only for the codec_streaming?
It seems to me that ReadBufferSize has nothing to do with incremental reading. It just sets a size for socket.Read() and can cause a buffer full if its smaller than the message being sent. I must be missing something, but it looks like reader() will do a socket.Read(buf), and if it didnt get an error and did read any bytes it will just call c.receive(), which then puts that data into the decBuf and called Decode() right away. At that point its either looking to get back messages or an error. If its only a partial single message, I dont understand how that goes back through the logic of building up more data. receive() looks to be wanting a parsed message or error.

  • reader() *
    calls socket.Read(buf)

    if problem: die
    else if good data -> c.receive() (blocking)
    reader() is blocked

  • receive() *
    put data in dec buffer
    call dec.Decode() (blocking call)
    receive() is blocked

  • Decode() *
    Goes into a loop reading the buffer, but would never receive any new data from reader() because its blocking right now
    Reads everything it has builds and returns the message or an error

  • receive() *
    return from Decode()
    got a message? pass it to the handler
    error? return

  • reader() *
    return from receive()
    loop again and try to Read again.

Is what I am missing, that Decode() will return an error if it only got part of a message, to allow reader() to send more data through the process until it gets the full message, and THEN returns a msg and error nil?

Owner

ReadBufferSize determines how many bytes to read at most during a single read from a socket. Smaller buffer => more reads.
The size of the buffer does not affect in any other way. It does not e.g. limit the maximum size of messages.

Your analysis was almost right.
If Decode was called with incomplete data (meaning that no error was detected, but the data was
just not fully in the buffer yet) it will return 0 messages and no errors. It just means "nothing to see here". The Decoder keeps its own internal
state and knows where to continue when it's called the next time.

The idea is that this works:

// initialization
buf := new(bytes.Buffer)
dec := codec.NewDecoder(buf)

// write the first part of the message: the header (simulating partial read 1/2)
buf.WriteString("~m~3~m~")
messages, err := dec.Decode() // messages = [], err = nil

// write the rest of the message: the payload (simulating partial read 2/2)
buf.WriteString("123")
messages, err := dec.Decode() // messages = ["123"], err = nil

Additionally you can check out TestDecodeStreaming in codec_sio_test.go.
I hope this helps.

I admit, I really don't have much of an idea how this code works or how it's supposed to work.

As far as I could tell this buffer just kept being written to without ever being reset, but apparently that's not the case?

@madari

if !sio.isAuthorized(req) {

@justinfx

Will update that.

@madari
Owner

Cherry-picked only the relevant commit from this pull request.
Landed in commit 9d0a678.
Thanks again!

@madari madari closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 29, 2011
  1. @justinfx

    connection.go: seperating general messages from heartbeats into diffe…

    justinfx authored
    …rent channels.
    
    heartbeat is delayed if the last message received is within the timeframe of the HeartbeatInterval.
Commits on Jul 30, 2011
  1. @justinfx
Commits on Aug 2, 2011
  1. @justinfx

    added SetAuthorization/IsAuthorized to SocketIO, to allow user to spe…

    justinfx authored
    …cify a custom
    
    authorization hook using the original http request.
Commits on Aug 3, 2011
  1. @justinfx

    fixed a bug in the new delayed heartbeats code where poor timing woul…

    justinfx authored
    …d cause a
    
    heartbeat to be skipped occasionally. new method uses a DelayTimer that resets
    every time a message is received from the client.
Commits on Aug 4, 2011
  1. @justinfx

    gofix for release r59

    justinfx authored
  2. @justinfx

    connection.go: was using a singleshot timer in keepalive(). needed to…

    justinfx authored
    … reset it after each trigger.
Commits on Aug 10, 2011
  1. @justinfx

    connection.go: using inconsistent heartbeat intervals apparently brea…

    justinfx authored
    …ks socket.io 0.6 client, which also has its own hardcoded timer and expects them to keep coming consistently.
  2. @edsrzf
Commits on Aug 11, 2011
  1. @edsrzf
  2. @justinfx

    Merge pull request #1 from edsrzf/memory-fixes

    justinfx authored
    Memory fixes
  3. @justinfx
  4. @justinfx
  5. @justinfx
  6. @justinfx

    message.go: added a Bytes() method

    justinfx authored
    codec_sio.go: defined Bytes() method; fixed typo in case expression
  7. @justinfx
Commits on Aug 12, 2011
  1. @justinfx

    adding lock to client.Send()

    justinfx authored
  2. @justinfx
  3. @justinfx
Commits on Aug 18, 2011
  1. @justinfx
Commits on Aug 22, 2011
  1. @justinfx
Commits on Aug 25, 2011
  1. @justinfx
Commits on Aug 26, 2011
  1. @justinfx

    mirroring madari master

    justinfx authored
  2. @justinfx
This page is out of date. Refresh to see the latest.
View
43 codec_sio.go
@@ -7,7 +7,6 @@ import (
"json"
"os"
"strconv"
- "strings"
"utf8"
)
@@ -16,21 +15,23 @@ const (
SIOAnnotationRealm = "r"
SIOAnnotationJSON = "j"
- sioFrameDelim = "~m~"
- sioFrameDelimJSON = "~j~"
- sioFrameDelimHeartbeat = "~h~"
-
sioMessageTypeDisconnect = 0
sioMessageTypeMessage = 1
sioMessageTypeHeartbeat = 2
sioMessageTypeHandshake = 3
)
+var (
+ sioFrameDelim = []byte("~m~")
+ sioFrameDelimJSON = []byte("~j~")
+ sioFrameDelimHeartbeat = []byte("~h~")
+)
+
// SioMessage fulfills the message interface.
type sioMessage struct {
annotations map[string]string
typ uint8
- data string
+ data []byte
}
// MessageType checks if the message starts with sioFrameDelimJSON or
@@ -73,7 +74,7 @@ func (sm *sioMessage) Annotation(key string) (value string, ok bool) {
// false will be returned.
func (sm *sioMessage) heartbeat() (heartbeat, bool) {
if sm.typ == sioMessageTypeHeartbeat {
- if n, err := strconv.Atoi(sm.data); err == nil {
+ if n, err := strconv.Atoi(string(sm.data)); err == nil {
return heartbeat(n), true
}
}
@@ -81,18 +82,23 @@ func (sm *sioMessage) heartbeat() (heartbeat, bool) {
return -1, false
}
-// Data returns the raw message.
+// Data returns the raw message as a string.
func (sm *sioMessage) Data() string {
return string(sm.data)
}
+// Bytes returns the raw message.
+func (sm *sioMessage) Bytes() []byte {
+ return sm.data
+}
+
// JSON returns the JSON embedded in the message, if available.
-func (sm *sioMessage) JSON() (string, bool) {
+func (sm *sioMessage) JSON() ([]byte, bool) {
if sm.Type() == MessageJSON {
return sm.data, true
}
- return "", false
+ return nil, false
}
// SIOCodec is the codec used by the official Socket.IO client by LearnBoost.
@@ -172,7 +178,6 @@ const (
sioDecodeStateEnd
)
-
type sioDecoder struct {
src *bytes.Buffer
buf bytes.Buffer
@@ -219,7 +224,7 @@ L:
case sioDecodeStateHeaderBegin:
dec.buf.WriteRune(c)
if dec.buf.Len() == len(sioFrameDelim) {
- if dec.buf.String() != sioFrameDelim {
+ if !bytes.Equal(dec.buf.Bytes(), sioFrameDelim) {
dec.Reset()
return nil, os.NewError("Malformed header")
}
@@ -250,7 +255,7 @@ L:
continue
}
- if dec.buf.String() != sioFrameDelim {
+ if !bytes.Equal(dec.buf.Bytes(), sioFrameDelim) {
dec.Reset()
return nil, os.NewError("Malformed header")
}
@@ -279,17 +284,19 @@ L:
}
}
- dec.msg.data = dec.buf.String()
+ data := dec.buf.Bytes()
dec.msg.typ = sioMessageTypeMessage
- if strings.HasPrefix(dec.msg.data, sioFrameDelimJSON) {
+ if bytes.HasPrefix(data, sioFrameDelimJSON) {
dec.msg.annotations = make(map[string]string)
dec.msg.annotations[SIOAnnotationJSON] = ""
- dec.msg.data = dec.msg.data[len(sioFrameDelimJSON):]
- } else if strings.HasPrefix(dec.msg.data, sioFrameDelimHeartbeat) {
+ data = data[len(sioFrameDelimJSON):]
+ } else if bytes.HasPrefix(data, sioFrameDelimHeartbeat) {
dec.msg.typ = sioMessageTypeHeartbeat
- dec.msg.data = dec.msg.data[len(sioFrameDelimHeartbeat):]
+ data = data[len(sioFrameDelimHeartbeat):]
}
+ dec.msg.data = make([]byte, len(data))
+ copy(dec.msg.data, data)
messages = append(messages, dec.msg)
View
8 codec_sio_test.go
@@ -52,15 +52,15 @@ var encodeTests = []encodeTest{
},
{
struct {
- Boolean bool "bOoLeAn"
- Str string "sTr"
- Array []int "A"
+ Boolean bool "Boolean"
+ Str string "Str"
+ Array []int "Array"
}{
false,
"string♥",
[]int{1, 2, 3, 4},
},
- frame(`{"bOoLeAn":false,"sTr":"string♥","A":[1,2,3,4]}`, true),
+ frame(`{"Boolean":false,"Str":"string♥","Array":[1,2,3,4]}`, true),
},
{
[]byte("hello, world"),
View
5 codec_siostreaming.go
@@ -236,7 +236,10 @@ L:
}
}
- dec.msg.data = dec.buf.String()
+ data := dec.buf.Bytes()
+ dec.msg.data = make([]byte, len(data))
+ copy(dec.msg.data, data)
+
dec.buf.Reset()
dec.state = sioStreamingDecodeStateTrailer
fallthrough
View
8 codec_siostreaming_test.go
@@ -61,15 +61,15 @@ var streamingEncodeTests = []streamingEncodeTest{
},
{
struct {
- Boolean bool "bOoLeAn"
- Str string "sTr"
- Array []int "A"
+ Boolean bool "Boolean"
+ Str string "Str"
+ Array []int "Array"
}{
false,
"string♥",
[]int{1, 2, 3, 4},
},
- streamingFrame(`{"bOoLeAn":false,"sTr":"string♥","A":[1,2,3,4]}`, 1, true),
+ streamingFrame(`{"Boolean":false,"Str":"string♥","Array":[1,2,3,4]}`, 1, true),
},
{
[]byte("hello, world"),
View
3  message.go
@@ -43,6 +43,7 @@ type Message interface {
Annotations() map[string]string
Annotation(string) (string, bool)
Data() string
+ Bytes() []byte
Type() uint8
- JSON() (string, bool)
+ JSON() ([]byte, bool)
}
Something went wrong with that request. Please try again.