Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

sorting functions related to messages. Using constants to get flags

  • Loading branch information...
commit 277e82a42583122391673581997c591da94cbe86 1 parent 7cb27ce
John Mac authored committed
View
145 mongo/collection.go
@@ -10,7 +10,24 @@ import (
)
-var fUpsert, fUpdateAll, fUpsertAll int32 // OP_UPDATE flags
+type indexDesc struct {
+ Name string
+ Ns string
+ Key map[string]int
+}
+
+type Collection struct {
+ db *Database
+ name string
+}
+
+
+// *** Client Request Messages
+// ***
+
+// *** OP_UPDATE
+
+var fUpsert, fUpdateAll, fUpsertAll int32 // flags
// Calculates values of flags
func init() {
@@ -20,19 +37,79 @@ func init() {
setBit32(&fUpsertAll, f_UPSERT, f_MULTI_UPDATE)
}
+func (self *Collection) Update(selector, document BSON) os.Error {
+ return self.update(&opUpdate{self.fullName(), _ZERO, selector, document})
+}
-type indexDesc struct {
- Name string
- Ns string
- Key map[string]int
+func (self *Collection) Upsert(selector, document BSON) os.Error {
+ return self.update(&opUpdate{self.fullName(), fUpsert, selector, document})
}
+func (self *Collection) UpdateAll(selector, document BSON) os.Error {
+ return self.update(&opUpdate{self.fullName(), fUpdateAll, selector, document})
+}
-type Collection struct {
- db *Database
- name string
+func (self *Collection) UpsertAll(selector, document BSON) os.Error {
+ return self.update(&opUpdate{self.fullName(), fUpsertAll, selector, document})
+}
+
+func (self *Collection) update(msg *opUpdate) os.Error {
+ return self.db.Conn.writeOp(msg)
+}
+
+// *** OP_INSERT
+
+func (self *Collection) Insert(doc BSON) os.Error {
+ msg := &opInsert{self.fullName(), doc}
+ return self.db.Conn.writeOp(msg)
+}
+
+// *** OP_QUERY
+
+func (self *Collection) Query(query BSON, skip, limit int32) (*Cursor, os.Error) {
+ conn := self.db.Conn
+ reqID := getRequestID()
+ msg := &opQuery{o_NONE, self.fullName(), skip, limit, query}
+
+ if err := conn.writeOpQuery(msg, reqID); err != nil {
+ return nil, err
+ }
+
+ reply, err := conn.readReply()
+ if err != nil {
+ return nil, err
+ }
+ if reply.responseTo != reqID {
+ return nil, os.NewError("wrong responseTo code")
+ }
+
+ return &Cursor{self, reply.cursorID, 0, reply.documents}, nil
+}
+
+// *** OP_DELETE
+
+var fSingleRemove int32 // flags
+
+// Calculates values of flags
+func init() {
+ setBit32(&fSingleRemove, f_SINGLE_REMOVE)
+}
+
+func (self *Collection) Remove(selector BSON) os.Error {
+ return self.remove(&opDelete{self.fullName(), _ZERO, selector})
}
+func (self *Collection) RemoveFirst(selector BSON) os.Error {
+ return self.remove(&opDelete{self.fullName(), fSingleRemove, selector})
+}
+
+func (self *Collection) remove(msg *opDelete) os.Error {
+ return self.db.Conn.writeOp(msg)
+}
+
+
+//**********
+
func (self *Collection) fullName() string { return self.db.name + "." + self.name }
func (self *Collection) EnsureIndex(name string, index map[string]int) os.Error {
@@ -69,36 +146,6 @@ func (self *Collection) Drop() os.Error {
return err
}
-func (self *Collection) Insert(doc BSON) os.Error {
- im := &opInsert{self.fullName(), doc}
- return self.db.Conn.writeOp(im)
-}
-
-func (self *Collection) Remove(selector BSON) os.Error {
- dm := &opDelete{self.fullName(), selector}
- return self.db.Conn.writeOp(dm)
-}
-
-func (self *Collection) Query(query BSON, skip, limit int) (*Cursor, os.Error) {
- reqID := getRequestID()
- conn := self.db.Conn
- qm := &opQuery{0, self.fullName(), int32(skip), int32(limit), query}
-
- err := conn.writeOpQuery(qm, reqID)
- if err != nil {
- return nil, err
- }
-
- reply, err := conn.readReply()
- if err != nil {
- return nil, err
- }
- if reply.responseTo != reqID {
- return nil, os.NewError("wrong responseTo code")
- }
-
- return &Cursor{self, reply.cursorID, 0, reply.documents}, nil
-}
func (self *Collection) FindAll(query BSON) (*Cursor, os.Error) {
return self.Query(query, 0, 0)
@@ -129,28 +176,6 @@ func (self *Collection) Count(query BSON) (int64, os.Error) {
return int64(reply.Get("n").Number()), nil
}
-func (self *Collection) update(msg *opUpdate) os.Error {
- conn := self.db.Conn
-
- return conn.writeOp(msg)
-}
-
-func (self *Collection) Update(selector, document BSON) os.Error {
- return self.update(&opUpdate{self.fullName(), _ZERO, selector, document})
-}
-
-func (self *Collection) Upsert(selector, document BSON) os.Error {
- return self.update(&opUpdate{self.fullName(), fUpsert, selector, document})
-}
-
-func (self *Collection) UpdateAll(selector, document BSON) os.Error {
- return self.update(&opUpdate{self.fullName(), fUpdateAll, selector, document})
-}
-
-func (self *Collection) UpsertAll(selector, document BSON) os.Error {
- return self.update(&opUpdate{self.fullName(), fUpsertAll, selector, document})
-}
-
// *** Utility
// ***
View
43 mongo/connection.go
@@ -5,7 +5,11 @@
package mongo
import (
+ "bytes"
+ "container/vector"
"fmt"
+ "io"
+ "io/ioutil"
"net"
"os"
)
@@ -57,3 +61,42 @@ func (self *Connection) GetDB(name string) *Database {
return &Database{self, name}
}
+
+// *** Database Response Message
+// ***
+
+// *** OP_REPLY
+
+func (self *Connection) readReply() (*opReply, os.Error) {
+ size_bits, _ := ioutil.ReadAll(io.LimitReader(self.conn, 4))
+ size := pack.Uint32(size_bits)
+ rest, _ := ioutil.ReadAll(io.LimitReader(self.conn, int64(size)-4))
+ reply := parseReply(rest)
+ return reply, nil
+}
+
+func parseReply(b []byte) *opReply {
+ r := new(opReply)
+ r.responseTo = int32(pack.Uint32(b[4:8]))
+ r.responseFlag = int32(pack.Uint32(b[12:16]))
+ r.cursorID = int64(pack.Uint64(b[16:24]))
+ r.startingFrom = int32(pack.Uint32(b[24:28]))
+ r.numberReturned = int32(pack.Uint32(b[28:32]))
+ r.documents = new(vector.Vector)
+
+ if r.numberReturned > 0 {
+ buf := bytes.NewBuffer(b[36:len(b)])
+ for i := 0; int32(i) < r.numberReturned; i++ {
+ var bson BSON
+ bb := new(_BSONBuilder)
+ bb.ptr = &bson
+ bb.Object()
+ Parse(buf, bb)
+ r.documents.Push(bson)
+ ioutil.ReadAll(io.LimitReader(buf, 4))
+ }
+ }
+
+ return r
+}
+
View
61 mongo/cursor.go
@@ -17,37 +17,21 @@ type Cursor struct {
docs *vector.Vector
}
-func (self *Cursor) HasMore() bool {
- if self.pos < self.docs.Len() {
- return true
- }
-
- err := self.GetMore()
- if err != nil {
- return false
- }
- return self.pos < self.docs.Len()
-}
+// *** Client Request Messages
+// ***
-func (self *Cursor) GetNext() (BSON, os.Error) {
- if self.HasMore() {
- doc := self.docs.At(self.pos).(BSON)
- self.pos = self.pos + 1
- return doc, nil
- }
- return nil, os.NewError("cursor failure")
-}
+// *** OP_GET_MORE
func (self *Cursor) GetMore() os.Error {
if self.id == 0 {
return os.NewError("no cursorID")
}
- gm := &opGetMore{self.collection.fullName(), 0, self.id}
conn := self.collection.db.Conn
- err := conn.writeOp(gm)
- if err != nil {
+ msg := &opGetMore{self.collection.fullName(), 0, self.id}
+
+ if err := conn.writeOp(msg); err != nil {
return err
}
@@ -62,14 +46,41 @@ func (self *Cursor) GetMore() os.Error {
return nil
}
+// *** OP_KILL_CURSORS
+
func (self *Cursor) Close() os.Error {
if self.id == 0 {
// not open on server
return nil
}
- km := &opKillCursors{1, []int64{self.id}}
- conn := self.collection.db.Conn
- return conn.writeOp(km)
+ msg := &opKillCursors{1, []int64{self.id}}
+ return self.collection.db.Conn.writeOp(msg)
+}
+
+
+// **************
+
+
+func (self *Cursor) HasMore() bool {
+ if self.pos < self.docs.Len() {
+ return true
+ }
+
+ err := self.GetMore()
+ if err != nil {
+ return false
+ }
+
+ return self.pos < self.docs.Len()
+}
+
+func (self *Cursor) GetNext() (BSON, os.Error) {
+ if self.HasMore() {
+ doc := self.docs.At(self.pos).(BSON)
+ self.pos = self.pos + 1
+ return doc, nil
+ }
+ return nil, os.NewError("cursor failure")
}
View
57 mongo/message.go
@@ -12,9 +12,6 @@ package mongo
import (
"bytes"
"container/vector"
- "io"
- "io/ioutil"
- "os"
)
@@ -141,11 +138,11 @@ func (self *opInsert) Bytes() []byte {
// opts
const (
- _QUERY_None = 0
- _QUERY_TailableCursor = 2
- _QUERY_SlaveOK = 4
- //_QUERY_OplogReplay = 8 // drivers should not implement
- _QUERY_NoCursorTimeout = 16
+ o_NONE = 0
+ o_TAILABLE_CURSOR = 2
+ o_SLAVE_OK = 4
+ o_NO_CURSOR_TIMEOUT = 16
+ //o_LOG_REPLAY = 8 // drivers should not implement
)
// query
@@ -153,11 +150,11 @@ const (
type opQuery struct {
//header msgHeader // standard message header
- opts int32 // query options. See above for details.
+ opts int32 // query options. See above
fullCollectionName string // "dbname.collectionname"
numberToSkip int32 // number of documents to skip
numberToReturn int32 // number of documents to return in the first OP_REPLY batch
- query BSON // query object. See above for details.
+ query BSON // query object. See above
//returnFieldSelector BSON // Optional. Selector indicating the fields to return.
}
@@ -219,7 +216,7 @@ func (self *opGetMore) Bytes() []byte {
const (
// If set, the database will remove only the first matching document in the
// collection. Otherwise all matching documents will be removed.
- _DELETE_SingleRemove = 1
+ f_SINGLE_REMOVE = 0
// 1-31 - Reserved - Must be set to 0.
)
@@ -228,8 +225,8 @@ type opDelete struct {
//header msgHeader // standard message header
//ZERO int32 // 0 - reserved for future use
fullCollectionName string // "dbname.collectionname"
- //flags int32 // bit vector - see above for details.
- selector BSON // query object. See below for details.
+ flags int32 // bit vector - see above
+ selector BSON // query object. See above
}
func (self *opDelete) OpCode() int32 { return _OP_DELETE }
@@ -241,6 +238,7 @@ func (self *opDelete) Bytes() []byte {
buf.WriteString(self.fullCollectionName)
buf.WriteByte(0)
+ pack.PutUint32(w32, uint32(self.flags))
buf.Write(w32)
buf.Write(self.selector.Bytes())
@@ -291,36 +289,3 @@ type opReply struct {
documents *vector.Vector // documents
}
-func (self *Connection) readReply() (*opReply, os.Error) {
- size_bits, _ := ioutil.ReadAll(io.LimitReader(self.conn, 4))
- size := pack.Uint32(size_bits)
- rest, _ := ioutil.ReadAll(io.LimitReader(self.conn, int64(size)-4))
- reply := parseReply(rest)
- return reply, nil
-}
-
-func parseReply(b []byte) *opReply {
- r := new(opReply)
- r.responseTo = int32(pack.Uint32(b[4:8]))
- r.responseFlag = int32(pack.Uint32(b[12:16]))
- r.cursorID = int64(pack.Uint64(b[16:24]))
- r.startingFrom = int32(pack.Uint32(b[24:28]))
- r.numberReturned = int32(pack.Uint32(b[28:32]))
- r.documents = new(vector.Vector)
-
- if r.numberReturned > 0 {
- buf := bytes.NewBuffer(b[36:len(b)])
- for i := 0; int32(i) < r.numberReturned; i++ {
- var bson BSON
- bb := new(_BSONBuilder)
- bb.ptr = &bson
- bb.Object()
- Parse(buf, bb)
- r.documents.Push(bson)
- ioutil.ReadAll(io.LimitReader(buf, 4))
- }
- }
-
- return r
-}
-
Please sign in to comment.
Something went wrong with that request. Please try again.