Permalink
Browse files

db: added a connection collector

This collector retires unused connections, so they don't feel tired :-)
  • Loading branch information...
1 parent d3c3b8d commit aa68bfbd70be6da0e24db60c667ececd69d8b768 @fsouza fsouza committed Jan 21, 2013
Showing with 61 additions and 0 deletions.
  1. +28 −0 db/storage.go
  2. +33 −0 db/storage_test.go
View
@@ -24,6 +24,8 @@ var (
ticker *time.Ticker // for garbage collection
)
+const period time.Duration = 7 * 24 * time.Hour
+
type session struct {
s *mgo.Session
used time.Time
@@ -130,3 +132,29 @@ func (s *Storage) Users() *mgo.Collection {
func (s *Storage) Teams() *mgo.Collection {
return s.Collection("teams")
}
+
+func init() {
+ ticker = time.NewTicker(72 * time.Hour)
+ go retire(ticker)
+}
+
+// retire retires old connections :-)
+func retire(t *time.Ticker) {
+ for _ = range t.C {
+ now := time.Now()
+ var old []string
+ mut.RLock()
+ for k, v := range conn {
+ if now.Sub(v.used) >= period {
+ old = append(old, k)
+ }
+ }
+ mut.RUnlock()
+ mut.Lock()
+ for _, c := range old {
+ conn[c].s.Close()
+ delete(conn, c)
+ }
+ mut.Unlock()
+ }
+}
View
@@ -9,7 +9,9 @@ import (
"labix.org/v2/mgo"
. "launchpad.net/gocheck"
"reflect"
+ "sync"
"testing"
+ "time"
)
type hasUniqueIndexChecker struct{}
@@ -47,6 +49,10 @@ type S struct{}
var _ = Suite(&S{})
+func (s *S) SetUpSuite(c *C) {
+ ticker.Stop()
+}
+
func (s *S) TearDownSuite(c *C) {
storage, err := Open("127.0.0.1:27017", "tsuru_storage_test")
c.Assert(err, IsNil)
@@ -173,3 +179,30 @@ func (s *S) TestMethodTeamsShouldReturnTeamsCollection(c *C) {
teamsc := storage.Collection("teams")
c.Assert(teams, DeepEquals, teamsc)
}
+
+func (s *S) TestRetire(c *C) {
+ defer func() {
+ if r := recover(); !c.Failed() && r == nil {
+ c.Errorf("Should panic in ping, but did not!")
+ }
+ }()
+ storage, _ := Open("127.0.0.1:27017", "tsuru_storage_test")
+ sess := conn["127.0.0.1:27017"]
+ sess.used = sess.used.Add(-1 * 2 * period)
+ conn["127.0.0.1:27017"] = sess
+ var ticker time.Ticker
+ ch := make(chan time.Time, 1)
+ ticker.C = ch
+ ch <- time.Now()
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ retire(&ticker)
+ wg.Done()
+ }()
+ close(ch)
+ wg.Wait()
+ _, ok := conn["127.0.0.1:27017"]
+ c.Check(ok, Equals, false)
+ storage.session.Ping()
+}

0 comments on commit aa68bfb

Please sign in to comment.