Skip to content

Commit

Permalink
Merge pull request #10 from manyminds/refactor_agent
Browse files Browse the repository at this point in the history
Refactor agent
  • Loading branch information
sharpner committed Feb 26, 2016
2 parents 6215c68 + e3e18bb commit d3b1716
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 66 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ go:

before_install:
- sudo apt-get -qq update
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv EA312927
- echo "deb http://repo.mongodb.org/apt/ubuntu "$(lsb_release -sc)"/mongodb-org/3.2 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.2.list
- sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
- echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list
- sudo apt-get update
- sudo apt-get install -y mongodb-org
- ulimit -a
Expand Down
1 change: 1 addition & 0 deletions redkeepcli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func main() {
log.Fatal(err)
}

log.Println("Agent started.")
agent.Tail(running, *rescan)
running <- false
}
89 changes: 73 additions & 16 deletions tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"log"
"strings"
"time"
Expand All @@ -22,6 +23,51 @@ type TailAgent struct {
startTime time.Time
}

//Query represents a mongodb oplog query
type Query interface {
DB() string
C() string
OP() string
}

type oplogQuery struct {
dataset map[string]interface{}
db, collection string
}

func (o oplogQuery) C() string {
return o.collection
}
func (o oplogQuery) DB() string {
return o.db
}

func (o oplogQuery) OP() string {
if s, ok := o.dataset["op"].(string); ok {
return s
}

return ""
}

//NewOplogQuery generates a new query object from the given dataset
func NewOplogQuery(dataset map[string]interface{}) (Query, error) {
namespace, ok := dataset["ns"].(string)
if namespace == "" || !ok {
return nil, errors.New("namespace not given")
}

p := strings.Index(namespace, ".")
if p == -1 {
return nil, errors.New("Invalid namespace given, must contain dot")
}

triggerDB := namespace[:p]
triggerCollection := namespace[p+1:]

return oplogQuery{dataset: dataset, db: triggerDB, collection: triggerCollection}, nil
}

//mongoTimestamp has the capability to cast to a mongo timestamp
type mongoTimestamp struct {
time.Time
Expand All @@ -37,21 +83,22 @@ func (m mongoTimestamp) MongoTimestamp() bson.MongoTimestamp {
return bson.MongoTimestamp(result)
}

func (t TailAgent) analyzeResult(dataset map[string]interface{}) {
namespace, ok := dataset["ns"].(string)
if namespace == "" || !ok {
func analyzeResult(dataset map[string]interface{}, w []Watch, s *mgo.Session) {
query, err := NewOplogQuery(dataset)
if err != nil {
log.Println(err)
return
}

p := strings.Index(namespace, ".")
if p == -1 {
return
}
session := s.Copy()
defer session.Close()

watches := t.config.Watches
triggerDB := namespace[:p]
triggerCollection := namespace[p+1:]
operationType := dataset["op"]
t := NewChangeTracker(session)
watches := w
triggerDB := query.DB()
triggerCollection := query.C()
operationType := query.OP()
namespace := fmt.Sprintf("%s.%s", triggerDB, triggerCollection)

if command, ok := dataset["o"].(map[string]interface{}); ok {
triggerID, _ := command["_id"].(bson.ObjectId)
Expand All @@ -65,7 +112,7 @@ func (t TailAgent) analyzeResult(dataset map[string]interface{}) {
switch operationType {
case "i":
if w.TargetCollection == namespace {
t.tracker.HandleInsert(w, command, triggerRef)
t.HandleInsert(w, command, triggerRef)
}
case "u":
if w.TargetCollection == namespace {
Expand All @@ -75,18 +122,18 @@ func (t TailAgent) analyzeResult(dataset map[string]interface{}) {
Id: dataset["o2"].(map[string]interface{})["_id"].(bson.ObjectId),
}

t.tracker.HandleInsert(w, command, triggerRef)
t.HandleInsert(w, command, triggerRef)
}

if w.TrackCollection == namespace {
if selector, ok := dataset["o2"].(map[string]interface{}); ok {
t.tracker.HandleUpdate(w, command, selector)
t.HandleUpdate(w, command, selector)
}
}
case "d":
if w.TrackCollection == namespace {
if selector, ok := dataset["o2"].(map[string]interface{}); ok {
t.tracker.HandleRemove(w, command, selector)
t.HandleRemove(w, command, selector)
}
}
case "c":
Expand Down Expand Up @@ -133,6 +180,7 @@ func (t TailAgent) Tail(quit chan bool, forceRescan bool) error {
query := oplogCollection.Find(bson.M{"ts": bson.M{"$gt": startTime.MongoTimestamp()}})
iter := query.LogReplay().Sort("$natural").Tail(requeryDuration)

sessionCopy := session.Copy()
var lastTimestamp bson.MongoTimestamp
for {
select {
Expand All @@ -146,7 +194,15 @@ func (t TailAgent) Tail(quit chan bool, forceRescan bool) error {

for iter.Next(&result) {
lastTimestamp = result["ts"].(bson.MongoTimestamp)
t.analyzeResult(result)

// in order to avoid a race condition, each routine needs
// copies from everything.
copyResult := make(map[string]interface{})
for k, v := range result {
copyResult[k] = v
}

go analyzeResult(copyResult, t.config.Watches[:], sessionCopy)
}

if iter.Err() != nil {
Expand Down Expand Up @@ -178,6 +234,7 @@ func (t *TailAgent) connect() error {
t.session = session
t.tracker = NewChangeTracker(t.session)

log.Println("Connected.")
return nil
}

Expand Down
Loading

0 comments on commit d3b1716

Please sign in to comment.