Skip to content

Commit

Permalink
Turns out it isn't safe to use aggregation in 2.4 at all.
Browse files Browse the repository at this point in the history
2.4 fails if the total size of the aggregation is too big,
while in 3.2 it only fails if the size of an individual doc
gets too big.
Running mgopurge against big 2.4 databases was failing
(2.4M txns * 20 bytes per ID is >16MB).
  • Loading branch information
jameinel committed Apr 20, 2017
1 parent 42e03db commit 92f057b
Showing 1 changed file with 13 additions and 14 deletions.
27 changes: 13 additions & 14 deletions oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,13 @@ type DBOracle struct {
// populates the working set by inserting them from the client. This is less
// efficient that a $out in the pipeline, but must be used when Mongo doesn't
// support pipelines.
func (o *DBOracle) prepareWorkingDirectly(pipeline []bson.M) error {
func (o *DBOracle) prepareWorkingDirectly() error {
logger.Debugf("iterating the transactions collection to build the working set: %q", o.working.Name)
// Make sure the working set is clean
o.working.DropCollection()
pipe := o.txns.Pipe(pipeline)
pipe.Batch(maxBatchDocs)
pipe.AllowDiskUse()
iter := pipe.Iter()
query := o.txns.Find(bson.M{"s": bson.M{"$gte": taborted}}).Select(bson.M{"_id": 1})
query.Batch(maxBatchDocs)
iter := query.Iter()
var txnDoc struct {
Id bson.ObjectId `bson:"_id"`
}
Expand Down Expand Up @@ -149,8 +148,13 @@ func (o *DBOracle) prepareWorkingDirectly(pipeline []bson.M) error {

// prepareWorkingWithPipeline adds a $out stage to the pipeline, and has mongo
// populate the working set. This is the preferred method if Mongo supports $out.
func (o *DBOracle) prepareWorkingWithPipeline(pipeline []bson.M) error {
pipeline = append(pipeline, bson.M{"$out": o.working.Name})
func (o *DBOracle) prepareWorkingWithPipeline() error {
pipeline := []bson.M{
// This used to use $in but that's much slower than $gte.
{"$match": bson.M{"s": bson.M{"$gte": taborted}}},
{"$project": bson.M{"_id": 1}},
{"$out": o.working.Name},
}
pipe := o.txns.Pipe(pipeline)
pipe.Batch(maxBatchDocs)
pipe.AllowDiskUse()
Expand All @@ -167,16 +171,11 @@ func (o *DBOracle) prepare() (func(), error) {
// Load the ids of all completed and aborted txns into a separate
// temporary collection.
logger.Debugf("loading all completed transactions")
pipeline := []bson.M{
// This used to use $in but that's much slower than $gte.
{"$match": bson.M{"s": bson.M{"$gte": taborted}}},
{"$project": bson.M{"_id": 1}},
}
var err error
if o.usingMongoOut {
err = o.prepareWorkingWithPipeline(pipeline)
err = o.prepareWorkingWithPipeline()
} else {
err = o.prepareWorkingDirectly(pipeline)
err = o.prepareWorkingDirectly()
}
if err != nil {
o.cleanup()
Expand Down

0 comments on commit 92f057b

Please sign in to comment.