Ensure logTailer returns records in time order #6612

Merged
merged 6 commits into from Nov 25, 2016
View
@@ -59,11 +59,20 @@ type ModelSessioner interface {
ModelUUID() string
}
+// logIndexes defines the indexes we need on the log collection.
+var logIndexes = [][]string{
+ // This index needs to include _id because
+ // logTailer.processCollection uses _id to ensure log records with
+ // the same time have a consistent ordering.
+ {"e", "t", "_id"},
+ {"e", "n"},
+}
+
// InitDbLogs sets up the indexes for the logs collection. It should
// be called as state is opened. It is idempotent.
func InitDbLogs(session *mgo.Session) error {
logsColl := session.DB(logsDB).C(logsC)
- for _, key := range [][]string{{"e", "t"}, {"e", "n"}} {
+ for _, key := range logIndexes {
err := logsColl.EnsureIndex(mgo.Index{Key: key})
if err != nil {
return errors.Annotate(err, "cannot create index for logs collection")
@@ -457,14 +466,12 @@ func (t *logTailer) processCollection() error {
// and the tests only run one mongod process, including _id
// guarantees getting log messages in a predictable order.
//
- // Important: it is critical that the sort on _id is done
- // separately from the sort on {model, time}. Combining the sort
- // fields means that MongoDB won't use the indexes that are in
- // place, which risks hitting MongoDB's 32MB sort limit. See
- // https://pad.lv/1590605.
+ // Important: it is critical that the sort index includes _id,
+ // otherwise MongoDB won't use the index, which risks hitting
+ // MongoDB's 32MB sort limit. See https://pad.lv/1590605.
//
// TODO(ericsnow) Sort only by _id once it is a sequential int.
- iter := query.Sort("e", "t").Sort("_id").Iter()
+ iter := query.Sort("e", "t", "_id").Iter()
doc := new(logDoc)
for iter.Next(doc) {
rec, err := logDocToRecord(doc)
View
@@ -158,9 +158,9 @@ func (s *LogsSuite) TestIndexesCreated(c *gc.C) {
keys = append(keys, strings.Join(index.Key, "-"))
}
c.Assert(keys, jc.SameContents, []string{
- "_id", // default index
- "e-t", // model-uuid and timestamp
- "e-n", // model-uuid and entity
+ "_id", // default index
+ "e-t-_id", // model-uuid and timestamp
+ "e-n", // model-uuid and entity
})
}
@@ -550,6 +550,26 @@ func (s *LogTailerSuite) TestInitialLines(c *gc.C) {
s.assertTailer(c, tailer, 5, expected)
}
+func (s *LogTailerSuite) TestRecordsAddedOutOfTimeOrder(c *gc.C) {
+ format := "2006-01-02 03:04"
+ t1, err := time.Parse(format, "2016-11-25 09:10")
+ c.Assert(err, jc.ErrorIsNil)
+ t2, err := time.Parse(format, "2016-11-25 09:20")
+ c.Assert(err, jc.ErrorIsNil)
+ here := logTemplate{Message: "logged here"}
+ s.writeLogsT(c, t2, t2, 1, here)
+ migrated := logTemplate{Message: "transferred by migration"}
+ s.writeLogsT(c, t1, t1, 1, migrated)
+
+ tailer, err := state.NewLogTailer(s.otherState, &state.LogTailerParams{})
+ c.Assert(err, jc.ErrorIsNil)
+ defer tailer.Stop()
+
+ // They still come back in the right time order.
+ s.assertTailer(c, tailer, 1, migrated)
+ s.assertTailer(c, tailer, 1, here)
+}
+
func (s *LogTailerSuite) TestInitialLinesWithNotEnoughLines(c *gc.C) {
expected := logTemplate{Message: "want"}
s.writeLogs(c, 2, expected)
View
@@ -9,6 +9,7 @@ import (
"github.com/juju/errors"
"github.com/juju/loggo"
"gopkg.in/juju/names.v2"
+ "gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/txn"
)
@@ -220,3 +221,20 @@ func stripLocalFromFields(st *State, collName string, fields ...string) ([]txn.O
}
return ops, nil
}
+
+func DropOldLogIndex(st *State) error {
+ // If the log collection still has the old e,t index, remove it.
+ key := []string{"e", "t"}
+ db := st.MongoSession().DB(logsDB)
+ collection := db.C(logsC)
+ err := collection.DropIndex(key...)
+ if err == nil {
+ return nil
+ }
+ if queryErr, ok := err.(*mgo.QueryError); ok {
@mjs

mjs Nov 25, 2016

Contributor

I was thinking you could check the output of Indexes() and then decide whether to drop, but this is fine too.

@babbageclunk

babbageclunk Nov 25, 2016

Member

I had that at first, but this seemed less racy. Although it does depend on looking at the error message.

+ if strings.HasPrefix(queryErr.Message, "index not found") {
+ return nil
+ }
+ }
+ return errors.Trace(err)
+}
View
@@ -4,6 +4,7 @@
package state
import (
+ "reflect"
"time"
jc "github.com/juju/testing/checkers"
@@ -361,3 +362,42 @@ func (s *upgradesSuite) TestRenameAddModelPermission(c *gc.C) {
}}
s.assertUpgradedData(c, RenameAddModelPermission, coll, expected)
}
+
+func (s *upgradesSuite) TestDropOldLogIndex(c *gc.C) {
+ coll := s.state.MongoSession().DB(logsDB).C(logsC)
+ err := coll.EnsureIndexKey("e", "t")
+ c.Assert(err, jc.ErrorIsNil)
+ err = DropOldLogIndex(s.state)
+ c.Assert(err, jc.ErrorIsNil)
+
+ exists, err := hasIndex(coll, []string{"e", "t"})
+ c.Assert(err, jc.ErrorIsNil)
+ c.Assert(exists, jc.IsFalse)
+
+ // Sanity check for idempotency.
+ err = DropOldLogIndex(s.state)
+ c.Assert(err, jc.ErrorIsNil)
+}
+
+func (s *upgradesSuite) TestDropOldIndexWhenNoIndex(c *gc.C) {
+ coll := s.state.MongoSession().DB(logsDB).C(logsC)
+ exists, err := hasIndex(coll, []string{"e", "t"})
+ c.Assert(err, jc.ErrorIsNil)
+ c.Assert(exists, jc.IsFalse)
+
+ err = DropOldLogIndex(s.state)
+ c.Assert(err, jc.ErrorIsNil)
+}
+
+func hasIndex(coll *mgo.Collection, key []string) (bool, error) {
+ indexes, err := coll.Indexes()
+ if err != nil {
+ return false, err
+ }
+ for _, index := range indexes {
+ if reflect.DeepEqual(index.Key, key) {
+ return true, nil
+ }
+ }
+ return false, nil
+}
View
@@ -19,6 +19,7 @@ import (
var stateUpgradeOperations = func() []Operation {
steps := []Operation{
upgradeToVersion{version.MustParse("2.0.0"), stateStepsFor20()},
+ upgradeToVersion{version.MustParse("2.1.0"), stateStepsFor21()},
}
return steps
}
View
@@ -0,0 +1,21 @@
+// Copyright 2016 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package upgrades
+
+import (
+ "github.com/juju/juju/state"
+)
+
+// stateStepsFor21 returns upgrade steps for Juju 2.1 that manipulate state directly.
+func stateStepsFor21() []Step {
+ return []Step{
+ &upgradeStep{
+ description: "drop old log index",
+ targets: []Target{DatabaseMaster},
+ run: func(context Context) error {
+ return state.DropOldLogIndex(context.State())
+ },
+ },
+ }
+}
@@ -659,6 +659,7 @@ func (s *upgradeSuite) TestStateUpgradeOperationsVersions(c *gc.C) {
versions := extractUpgradeVersions(c, (*upgrades.StateUpgradeOperations)())
c.Assert(versions, gc.DeepEquals, []string{
"2.0.0",
+ "2.1.0",
})
}