diff --git a/message.go b/message.go index f333c3c..bfc3132 100644 --- a/message.go +++ b/message.go @@ -223,6 +223,27 @@ func (wm *WireMessage) encodeHeader(dataSize uint16) (*WireMessage, error) { return wm, nil } +func newKeepalive(rxPortalSize int, pool *Pool) (wm *WireMessage, err error) { + wm = &WireMessage{ + Seq: -1, + Mt: KEEPALIVE, + buf: pool.Get(), + } + util.WriteInt32(wm.buf.Data[dataStart:], int32(rxPortalSize)) + return wm.encodeHeader(4) +} + +func (wm *WireMessage) asKeepalive() (rxPortalSize int, err error) { + if wm.messageType() != KEEPALIVE { + return 0, errors.Errorf("unexpected message type [%d], expected KEEPALIVE", wm.messageType()) + } + if wm.buf.Used < dataStart+4 { + return 0, errors.Errorf("short buffer for keepalive decode [%d < %d]", wm.buf.Used, dataStart+4) + } + rxPortalSize = int(util.ReadInt32(wm.buf.Data[dataStart:])) + return rxPortalSize, nil +} + func decodeHeader(buf *Buffer) (*WireMessage, error) { size := util.ReadUint16(buf.Data[5:dataStart]) if uint32(dataStart+size) > buf.Used {