Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Renamed message names to follow the specification. Added types with d…

…oc. from specification.
  • Loading branch information...
commit 66242ab27bdc68caa360584f727e5899dadfd6e4 1 parent a95d40c
John Mac authored committed
Showing with 179 additions and 84 deletions.
  1. +179 −84 mongo/message.go
View
263 mongo/message.go
@@ -19,6 +19,8 @@ import (
)
+const _ZERO = 0
+
// Request Opcodes
const (
_OP_REPLY = 1 // Reply to a client request. responseTo is set
@@ -57,7 +59,7 @@ func header(h msgHeader) []byte {
}
-// *** Interface
+// *** Messages interface
// ***
type message interface {
@@ -70,155 +72,222 @@ type message interface {
// *** Client Request Messages
// ***
-type deleteMsg struct {
- fullCollectionName string
- selector BSON
- requestID int32
-}
-
-func (self *deleteMsg) OpCode() int32 { return _OP_DELETE }
-func (self *deleteMsg) RequestID() int32 { return self.requestID }
+// *** OP_UPDATE
-func (self *deleteMsg) Bytes() []byte {
- zero := make([]byte, 4)
- buf := bytes.NewBuffer(zero)
- buf.WriteString(self.fullCollectionName)
- buf.WriteByte(0)
- buf.Write(zero)
- buf.Write(self.selector.Bytes())
- return buf.Bytes()
-
-}
+/*const (
+ Upsert
+ MultiUpdate
+)
-// ***
+type opUpdate struct {
+ //header msgHeader // standard message header
+ //_ZERO int32 // 0 - reserved for future use
+ fullCollectionName string // "dbname.collectionname"
+ flags int32 // bit vector. see below
+ selector BSON // the query to select the document
+ update BSON // specification of the update to perform
+}*/
-type getMoreMsg struct {
+type opUpdate struct {
fullCollectionName string
- numberToReturn int32
- cursorID int64
+ flags int32
+ selector, document BSON
requestID int32
}
-func (self *getMoreMsg) OpCode() int32 { return _OP_GET_MORE }
-func (self *getMoreMsg) RequestID() int32 { return self.requestID }
+func (self *opUpdate) OpCode() int32 { return _OP_UPDATE }
+func (self *opUpdate) RequestID() int32 { return self.requestID }
+
+func (self *opUpdate) Bytes() []byte {
+ b := make([]byte, 4)
+ buf := bytes.NewBuffer(b) // _ZERO
-func (self *getMoreMsg) Bytes() []byte {
- buf := bytes.NewBuffer(make([]byte, 4))
buf.WriteString(self.fullCollectionName)
buf.WriteByte(0)
- b := make([]byte, 4)
- binary.LittleEndian.PutUint32(b, uint32(self.numberToReturn))
+ binary.LittleEndian.PutUint32(b, uint32(self.flags))
buf.Write(b)
- b = make([]byte, 8)
- binary.LittleEndian.PutUint64(b, uint64(self.cursorID))
- buf.Write(b)
+ buf.Write(self.selector.Bytes())
+ buf.Write(self.document.Bytes())
return buf.Bytes()
}
-// ***
+// *** OP_INSERT
+/*
+type opInsert struct {
+ //header msgHeader // standard message header
+ //_ZERO int32 // 0 - reserved for future use
+ fullCollectionName string // "dbname.collectionname"
+ documents BSON // one or more documents to insert into the collection
+}*/
-type insertMsg struct {
+type opInsert struct {
fullCollectionName string
doc BSON
requestID int32
}
-func (self *insertMsg) OpCode() int32 { return _OP_INSERT }
-func (self *insertMsg) RequestID() int32 { return self.requestID }
+func (self *opInsert) OpCode() int32 { return _OP_INSERT }
+func (self *opInsert) RequestID() int32 { return self.requestID }
+
+func (self *opInsert) Bytes() []byte {
+ buf := bytes.NewBuffer(make([]byte, 4)) // _ZERO
-func (self *insertMsg) Bytes() []byte {
- buf := bytes.NewBuffer(make([]byte, 4))
buf.WriteString(self.fullCollectionName)
buf.WriteByte(0)
+
buf.Write(self.doc.Bytes())
+
return buf.Bytes()
}
-// ***
+// *** OP_QUERY
-type killMsg struct {
- numberOfCursorIDs int32
- cursorIDs []int64
- requestID int32
-}
+/*type opQuery struct {
+ //header msgHeader // standard message header
+ opts int32 // query options. See below for details.
+ fullCollectionName string // "dbname.collectionname"
+ numberToSkip int32 // number of documents to skip
+ numberToReturn int32 // number of documents to return in the first OP_REPLY batch
+ query BSON // query object. See below for details.
+ returnFieldSelector BSON // Optional. Selector indicating the fields to return. See below for details.
+}*/
-func (self *killMsg) OpCode() int32 { return _OP_KILL_CURSORS }
-func (self *killMsg) RequestID() int32 { return self.requestID }
+type opQuery struct {
+ opts int32
+ fullCollectionName string
+ numberToSkip int32
+ numberToReturn int32
+ query BSON
+ requestID int32
+}
-func (self *killMsg) Bytes() []byte {
- buf := bytes.NewBuffer(make([]byte, 4))
+func (self *opQuery) OpCode() int32 { return _OP_QUERY }
+func (self *opQuery) RequestID() int32 { return self.requestID }
+func (self *opQuery) Bytes() []byte {
+ var buf bytes.Buffer
b := make([]byte, 4)
- binary.LittleEndian.PutUint32(b, uint32(self.numberOfCursorIDs))
+
+ binary.LittleEndian.PutUint32(b, uint32(self.opts))
buf.Write(b)
- b = make([]byte, 8)
- for _, id := range self.cursorIDs {
- binary.LittleEndian.PutUint64(b, uint64(id))
- buf.Write(b)
- }
+ buf.WriteString(self.fullCollectionName)
+ buf.WriteByte(0)
+
+ binary.LittleEndian.PutUint32(b, uint32(self.numberToSkip))
+ buf.Write(b)
+
+ binary.LittleEndian.PutUint32(b, uint32(self.numberToReturn))
+ buf.Write(b)
+
+ buf.Write(self.query.Bytes())
return buf.Bytes()
}
-// ***
-
-type queryMsg struct {
- opts int32
+// *** OP_GET_MORE
+/*
+type opGetMore struct {
+ //header msgHeader // standard message header
+ //_ZERO int32 // 0 - reserved for future use
+ fullCollectionName string // "dbname.collectionname"
+ numberToReturn int32 // number of documents to return
+ cursorID int64 // cursorID from the OP_REPLY
+}*/
+
+type opGetMore struct {
fullCollectionName string
- numberToSkip int32
numberToReturn int32
- query BSON
+ cursorID int64
requestID int32
}
-func (self *queryMsg) OpCode() int32 { return _OP_QUERY }
-func (self *queryMsg) RequestID() int32 { return self.requestID }
+func (self *opGetMore) OpCode() int32 { return _OP_GET_MORE }
+func (self *opGetMore) RequestID() int32 { return self.requestID }
-func (self *queryMsg) Bytes() []byte {
+func (self *opGetMore) Bytes() []byte {
b := make([]byte, 4)
- binary.LittleEndian.PutUint32(b, uint32(self.opts))
+ buf := bytes.NewBuffer(b) // _ZERO
- buf := bytes.NewBuffer(b)
buf.WriteString(self.fullCollectionName)
buf.WriteByte(0)
- binary.LittleEndian.PutUint32(b, uint32(self.numberToSkip))
+ binary.LittleEndian.PutUint32(b, uint32(self.numberToReturn))
buf.Write(b)
- binary.LittleEndian.PutUint32(b, uint32(self.numberToReturn))
+ b = make([]byte, 8)
+ binary.LittleEndian.PutUint64(b, uint64(self.cursorID))
buf.Write(b)
- buf.Write(self.query.Bytes())
return buf.Bytes()
}
-// ***
-
-type updateMsg struct {
+// *** OP_DELETE
+/*
+type opDelete struct {
+ //header msgHeader // standard message header
+ //_ZERO int32 // 0 - reserved for future use
+ fullCollectionName string // "dbname.collectionname"
+ flags int32 // bit vector - see below for details.
+ selector BSON // query object. See below for details.
+}*/
+
+type opDelete struct {
fullCollectionName string
- flags int32
- selector, document BSON
+ selector BSON
requestID int32
}
-func (self *updateMsg) OpCode() int32 { return _OP_UPDATE }
-func (self *updateMsg) RequestID() int32 { return self.requestID }
+func (self *opDelete) OpCode() int32 { return _OP_DELETE }
+func (self *opDelete) RequestID() int32 { return self.requestID }
+
+func (self *opDelete) Bytes() []byte {
+ b := make([]byte, 4)
+ buf := bytes.NewBuffer(b) // _ZERO
-func (self *updateMsg) Bytes() []byte {
- buf := bytes.NewBuffer(make([]byte, 4))
buf.WriteString(self.fullCollectionName)
buf.WriteByte(0)
- b := make([]byte, 4)
- binary.LittleEndian.PutUint32(b, uint32(self.flags))
buf.Write(b)
buf.Write(self.selector.Bytes())
- buf.Write(self.document.Bytes())
+
+ return buf.Bytes()
+}
+
+// *** OP_KILL_CURSORS
+
+/*type opKillCursors struct {
+ //header msgHeader // standard message header
+ //_ZERO int32 // 0 - reserved for future use
+ numberOfCursorIDs int32 // number of cursorIDs in message
+ cursorIDs []int64 // sequence of cursorIDs to close
+}*/
+
+type opKillCursors struct {
+ numberOfCursorIDs int32
+ cursorIDs []int64
+ requestID int32
+}
+
+func (self *opKillCursors) OpCode() int32 { return _OP_KILL_CURSORS }
+func (self *opKillCursors) RequestID() int32 { return self.requestID }
+
+func (self *opKillCursors) Bytes() []byte {
+ b := make([]byte, 4)
+ buf := bytes.NewBuffer(b) // _ZERO
+
+ binary.LittleEndian.PutUint32(b, uint32(self.numberOfCursorIDs))
+ buf.Write(b)
+
+ b = make([]byte, 8)
+ for _, id := range self.cursorIDs {
+ binary.LittleEndian.PutUint64(b, uint64(id))
+ buf.Write(b)
+ }
return buf.Bytes()
}
@@ -227,7 +296,18 @@ func (self *updateMsg) Bytes() []byte {
// *** Database Response Message
// ***
-type replyMsg struct {
+// *** OP_REPLY
+
+/*type opReply struct {
+ //header msgHeader // standard message header
+ responseFlag int32 // normally zero, non-zero on query failure
+ cursorID int64 // cursor id if client needs to do get more's
+ startingFrom int32 // where in the cursor this reply is starting
+ numberReturned int32 // number of documents in the reply
+ documents *vector.Vector // documents
+}*/
+
+type opReply struct {
responseTo int32
responseFlag int32
cursorID int64
@@ -236,7 +316,7 @@ type replyMsg struct {
docs *vector.Vector
}
-func (self *Connection) readReply() (*replyMsg, os.Error) {
+func (self *Connection) readReply() (*opReply, os.Error) {
size_bits, _ := ioutil.ReadAll(io.LimitReader(self.conn, 4))
size := binary.LittleEndian.Uint32(size_bits)
rest, _ := ioutil.ReadAll(io.LimitReader(self.conn, int64(size)-4))
@@ -244,8 +324,8 @@ func (self *Connection) readReply() (*replyMsg, os.Error) {
return reply, nil
}
-func parseReply(b []byte) *replyMsg {
- r := new(replyMsg)
+func parseReply(b []byte) *opReply {
+ r := new(opReply)
r.responseTo = int32(binary.LittleEndian.Uint32(b[4:8]))
r.responseFlag = int32(binary.LittleEndian.Uint32(b[12:16]))
r.cursorID = int64(binary.LittleEndian.Uint64(b[16:24]))
@@ -269,3 +349,18 @@ func parseReply(b []byte) *replyMsg {
return r
}
+
+// *** Utility
+// ***
+
+func (self *Connection) writeMessage(m message) os.Error {
+ body := m.Bytes()
+ h := header(msgHeader{int32(len(body) + 16), m.RequestID(), 0, m.OpCode()})
+
+ msg := bytes.Add(h, body)
+ _, err := self.conn.Write(msg)
+
+ last_req = m.RequestID()
+ return err
+}
+
Please sign in to comment.
Something went wrong with that request. Please try again.