Skip to content

Commit

Permalink
Merge pull request #3 from sebest/mongo_fallback
Browse files Browse the repository at this point in the history
More robust in case of mongodb failover
  • Loading branch information
sebest committed Dec 3, 2015
2 parents 8ef9cd8 + 1e28b22 commit 6ed339e
Show file tree
Hide file tree
Showing 13 changed files with 659 additions and 259 deletions.
17 changes: 12 additions & 5 deletions cmd/hookyd/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package main

import (
"fmt"
"log"
"net/http"
"os"
"time"

"github.com/codegangsta/cli"
"github.com/sebest/hooky/models"
"github.com/sebest/hooky/restapi"
"github.com/sebest/hooky/scheduler"
"github.com/sebest/hooky/store"
Expand Down Expand Up @@ -36,7 +36,7 @@ func main() {
},
cli.StringFlag{
Name: "mongo-uri",
Value: "localhost/hooky",
Value: "mongodb://127.0.0.1/hooky",
Usage: "MongoDB URI to connect to",
EnvVar: "HOOKY_MONGO_URI",
},
Expand Down Expand Up @@ -82,6 +82,13 @@ func main() {
if err != nil {
log.Fatal(err)
}

db := s.DB()
if err := models.NewBase(db).Bootstrap(); err != nil {
log.Fatal(err)
}
db.Session.Close()

sched := scheduler.New(s, c.Int("max-mongo-query"), c.Int("max-http-request"), c.Int("touch-interval"), c.Int("clean-finished-attempts")*3600)
sched.Start()
ra, err := restapi.New(s, c.String("admin-password"), c.String("accesslog-format"))
Expand All @@ -97,11 +104,11 @@ func main() {
}
err = server.ListenAndServe()
if err != nil {
log.Fatal(err)
log.Println(err)
}
fmt.Println("exiting")
log.Println("exiting...")
sched.Stop()
fmt.Println("exited")
log.Println("exited")
}
app.Run(os.Args)
}
42 changes: 30 additions & 12 deletions models/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (b *Base) NewAccount(name *string) (account *Account, err error) {
Key: randKey(32),
}
err = b.db.C("accounts").Insert(account)
_, err = b.ShouldRefreshSession(err)
return
}

Expand All @@ -51,6 +52,7 @@ func (b *Base) UpdateAccount(accountID bson.ObjectId, name *string) (account *Ac
}
account = &Account{}
_, err = b.db.C("accounts").Find(query).Apply(change, account)
_, err = b.ShouldRefreshSession(err)
return
}

Expand All @@ -62,6 +64,7 @@ func (b *Base) GetAccount(accountID bson.ObjectId) (account *Account, err error)
}
account = &Account{}
err = b.db.C("accounts").Find(query).One(account)
_, err = b.ShouldRefreshSession(err)
if err == mgo.ErrNotFound {
err = nil
account = nil
Expand All @@ -76,19 +79,33 @@ func (b *Base) DeleteAccount(account bson.ObjectId) (err error) {
"deleted": true,
},
}
err = b.db.C("accounts").UpdateId(account, update)
if err == nil {
query := bson.M{
"account": account,
}
if _, err = b.db.C("applications").UpdateAll(query, update); err == nil {
if _, err = b.db.C("queues").UpdateAll(query, update); err == nil {
if _, err = b.db.C("tasks").UpdateAll(query, update); err == nil {
_, err = b.db.C("attempts").UpdateAll(query, update)
}
}
}

query := bson.M{
"account": account,
}
_, err = b.db.C("attempts").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}
_, err = b.db.C("queues").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}
_, err = b.db.C("tasks").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}
_, err = b.db.C("applications").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}

err = b.db.C("accounts").UpdateId(account, update)
_, err = b.ShouldRefreshSession(err)
return
}

Expand All @@ -109,6 +126,7 @@ func (b *Base) AuthenticateAccount(account bson.ObjectId, key string) (bool, err
}
n, err := b.db.C("accounts").Find(query).Count()
if err != nil {
_, err = b.ShouldRefreshSession(err)
return false, err
}
return n == 1, nil
Expand Down
87 changes: 55 additions & 32 deletions models/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package models

import (
"errors"
"fmt"

"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
Expand Down Expand Up @@ -38,6 +37,7 @@ func (b *Base) NewApplication(account bson.ObjectId, name string) (application *
Name: name,
}
err = b.db.C("applications").Insert(application)
_, err = b.ShouldRefreshSession(err)
return
}

Expand All @@ -50,6 +50,7 @@ func (b *Base) GetApplication(account bson.ObjectId, name string) (application *
}
application = &Application{}
err = b.db.C("applications").Find(query).One(application)
_, err = b.ShouldRefreshSession(err)
if err == mgo.ErrNotFound {
err = nil
application = nil
Expand All @@ -62,50 +63,72 @@ func (b *Base) DeleteApplication(account bson.ObjectId, name string) (err error)
if name == "default" {
return ErrDeleteDefaultApplication
}
query := bson.M{
"account": account,
"name": name,
}
update := bson.M{
"$set": bson.M{
"deleted": true,
},
}
if _, err = b.db.C("applications").UpdateAll(query, update); err == nil {
query := bson.M{
"account": account,
"application": name,
}
if _, err = b.db.C("queues").UpdateAll(query, update); err == nil {
if _, err = b.db.C("tasks").UpdateAll(query, update); err == nil {
_, err = b.db.C("attempts").UpdateAll(query, update)
}
}

query := bson.M{
"account": account,
"application": name,
}
_, err = b.db.C("attempts").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}
_, err = b.db.C("tasks").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}
_, err = b.db.C("queues").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}

query = bson.M{
"account": account,
"name": name,
}
_, err = b.db.C("applications").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
return
}

// DeleteApplications deletes all Applications owns by an Account.
func (b *Base) DeleteApplications(account bson.ObjectId) (err error) {
query := bson.M{
"account": account,
"name": bson.M{"$ne": "default"},
}
update := bson.M{
"$set": bson.M{
"deleted": true,
},
}
if _, err = b.db.C("applications").UpdateAll(query, update); err == nil {
query = bson.M{
"account": account,
}
if _, err = b.db.C("queues").UpdateAll(query, update); err == nil {
if _, err = b.db.C("tasks").UpdateAll(query, update); err == nil {
_, err = b.db.C("attempts").UpdateAll(query, update)
}
}
query := bson.M{
"account": account,
}
_, err = b.db.C("attempts").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}
_, err = b.db.C("tasks").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}
_, err = b.db.C("queues").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
if err != nil {
return
}
query = bson.M{
"account": account,
"name": bson.M{"$ne": "default"},
}
_, err = b.db.C("applications").UpdateAll(query, update)
_, err = b.ShouldRefreshSession(err)
return
}

Expand All @@ -119,14 +142,14 @@ func (b *Base) GetApplications(account bson.ObjectId, lp ListParams, lr *ListRes
}

// EnsureApplicationIndex creates mongo indexes for Application.
func (b *Base) EnsureApplicationIndex() {
func (b *Base) EnsureApplicationIndex() (err error) {
index := mgo.Index{
Key: []string{"account", "name"},
Unique: true,
Background: false,
Sparse: true,
}
if err := b.db.C("applications").EnsureIndex(index); err != nil {
fmt.Printf("Error creating index on applications: %s\n", err)
}
err = b.db.C("applications").EnsureIndex(index)
_, err = b.ShouldRefreshSession(err)
return
}
Loading

0 comments on commit 6ed339e

Please sign in to comment.