Skip to content

Commit

Permalink
basic query function
Browse files Browse the repository at this point in the history
  • Loading branch information
mikejs committed Nov 16, 2009
1 parent 2e41c8e commit c89ce29
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 51 deletions.
158 changes: 108 additions & 50 deletions mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@ package mongo

import (
"os";
"io";
"net";
"fmt";
"rand";
"bytes";
"encoding/binary";
"container/vector";
)

var last_req int32

const (
OP_REPLY = 1;
OP_MSG = 1000;
Expand All @@ -21,6 +25,12 @@ const (
OP_KILL_CURSORS = 2007;
)

type message interface {
Bytes() []byte;
RequestID() int32;
OpCode() int32;
}

type Connection struct {
host string;
port int;
Expand Down Expand Up @@ -48,6 +58,75 @@ func header(length, reqID, respTo, opCode int32) []byte {
return b;
}

func (c *Connection) writeMessage(m message) os.Error {
body := m.Bytes();
hb := header(int32(len(body)+16), m.RequestID(), 0, m.OpCode());
msg := bytes.Add(hb, body);

_, err := c.conn.Write(msg);

last_req = m.RequestID();
return err;
}

type Database struct {
conn *Connection;
name string;
}

func (c *Connection) GetDB(name string) *Database {
return &Database{c, name}
}

type Collection struct {
db *Database;
name string;
}

func (db *Database) GetCollection(name string) *Collection {
return &Collection{db, name}
}

func (c *Collection) fullName() string { return c.db.name + "." + c.name }

func (c *Collection) Insert(doc BSON) {
im := &insertMsg{c.fullName(), doc, rand.Int31()};

err := c.db.conn.writeMessage(im);

if err != nil {
fmt.Printf("Error inserting: %v\n", err);
os.Exit(1);
}
}

func (coll *Collection) Query(query BSON) (*vector.Vector, os.Error) {
req_id := rand.Int31();
qm := &queryMsg{0, coll.fullName(), 0, 0, query, req_id};

conn := coll.db.conn;
err := conn.writeMessage(qm);
if err != nil {
return nil, err
}

size_bits, _ := io.ReadAll(io.LimitReader(conn.conn, 4));
size := binary.LittleEndian.Uint32(size_bits);

rest, _ := io.ReadAll(io.LimitReader(conn.conn, int64(size)-4));
resp_id := int32(binary.LittleEndian.Uint32(rest[4:8]));
if resp_id != req_id {
return nil, os.NewError("fail")
}

reply := ParseReply(rest[12:len(rest)]);
if reply.numberReturned == 0 {
return nil, os.NewError("0 returned")
}

return reply.docs, nil;
}

type queryMsg struct {
opts int32;
fullCollectionName string;
Expand All @@ -74,16 +153,7 @@ func (q *queryMsg) Bytes() []byte {
buf.Write(b);

buf.Write(q.query.Bytes());

hb := header(int32(buf.Len()+16), q.requestID, 0, OP_QUERY);

return bytes.Add(hb, buf.Bytes());
}

type message interface {
Bytes() []byte;
RequestID() int32;
OpCode() int32;
return buf.Bytes();
}

type insertMsg struct {
Expand All @@ -99,51 +169,39 @@ func (i *insertMsg) Bytes() []byte {
buf.WriteString(i.fullCollectionName);
buf.WriteByte(0);
buf.Write(i.doc.Bytes());

hb := header(int32(buf.Len()+16), i.requestID, 0, OP_INSERT);

return bytes.Add(hb, buf.Bytes());
return buf.Bytes();
}

func Query(collection string, query BSON) BSON {
laddr, _ := net.ResolveTCPAddr("localhost");
addr, _ := net.ResolveTCPAddr("localhost:27017");
conn, err := net.DialTCP("tcp", laddr, addr);

if err != nil {
fmt.Printf("Failed connecting to database: %v\n", err);
os.Exit(1);
}

qm := queryMsg{0, "test.coll", 0, 0, query, 1};
b := qm.Bytes();

n, err := conn.Write(b);

if err != nil || n != len(b) {
fmt.Printf("Error writing query: %v\n", err);
os.Exit(1);
}

return query;
func (db *Database) GetCollectionNames() *vector.StringVector {
return vector.NewStringVector(0)
}

func (c *Connection) Insert(collection string, doc BSON) {
im := insertMsg{collection, doc, 529};

_, err := c.conn.Write(im.Bytes());

if err != nil {
fmt.Printf("Error inserting: %v\n", err);
os.Exit(1);
}
type replyMsg struct {
responseFlag int32;
cursorID int64;
startingFrom int32;
numberReturned int32;
docs *vector.Vector;
}

type Database struct {
conn *Connection;
name string;
}
func ParseReply(b []byte) *replyMsg {
r := new(replyMsg);
r.responseFlag = int32(binary.LittleEndian.Uint32(b[0:4]));
r.cursorID = int64(binary.LittleEndian.Uint64(b[4:12]));
r.startingFrom = int32(binary.LittleEndian.Uint32(b[12:16]));
r.numberReturned = int32(binary.LittleEndian.Uint32(b[16:20]));
r.docs = vector.New(0);

buf := bytes.NewBuffer(b[24:len(b)]);
for i := 0; int32(i) < r.numberReturned; i++ {
var bson BSON;
bb := new(_BSONBuilder);
bb.ptr = &bson;
bb.Object();
Parse(buf, bb);
r.docs.Push(bson);
io.ReadAll(io.LimitReader(buf, 4));
}

func (db *Database) GetCollectionNames() *vector.StringVector {
return vector.NewStringVector(0)
return r;
}
12 changes: 11 additions & 1 deletion mongo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,21 @@ package mongo_test
import (
"mongo";
"testing";
"fmt";
)

func TestInsert(t *testing.T) {
obj, _ := mongo.BytesToBSON([]byte{92, 0, 0, 0, 1, 115, 101, 99, 111, 110, 100, 0, 0, 0, 0, 0, 0, 0, 0, 64, 3, 102, 105, 102, 116, 104, 0, 23, 0, 0, 0, 2, 118, 0, 2, 0, 0, 0, 101, 0, 2, 102, 0, 2, 0, 0, 0, 105, 0, 0, 3, 102, 111, 117, 114, 116, 104, 0, 5, 0, 0, 0, 0, 2, 116, 104, 105, 114, 100, 0, 6, 0, 0, 0, 116, 104, 114, 101, 101, 0, 16, 102, 105, 114, 115, 116, 0, 1, 0, 0, 0, 0});

conn, _ := mongo.Connect("localhost", 27017);
conn.Insert("test.coll", obj);
coll := conn.GetDB("test").GetCollection("coll");
coll.Insert(obj);

q, _ := mongo.BytesToBSON([]byte{5, 0, 0, 0, 0});
ret, err := coll.Query(q);
if err != nil || ret == nil {
fmt.Printf("Failed: %v\n", err)
}

fmt.Printf("Got %v docs.\n", ret.Len());
}

0 comments on commit c89ce29

Please sign in to comment.