Skip to content

Commit

Permalink
event: add extra targets to events
Browse files Browse the repository at this point in the history
  • Loading branch information
cezarsa committed Dec 18, 2017
1 parent 12cafe2 commit a76d722
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 5 deletions.
2 changes: 2 additions & 0 deletions db/storage.go
Expand Up @@ -208,12 +208,14 @@ func (s *Storage) Events() *storage.Collection {
kindIndex := mgo.Index{Key: []string{"kind.name"}}
startTimeIndex := mgo.Index{Key: []string{"-starttime"}}
uniqueIdIndex := mgo.Index{Key: []string{"uniqueid"}}
runningIndex := mgo.Index{Key: []string{"running"}}
c := s.Collection("events")
c.EnsureIndex(ownerIndex)
c.EnsureIndex(targetIndex)
c.EnsureIndex(kindIndex)
c.EnsureIndex(startTimeIndex)
c.EnsureIndex(uniqueIdIndex)
c.EnsureIndex(runningIndex)
return c
}

Expand Down
64 changes: 59 additions & 5 deletions event/event.go
Expand Up @@ -186,11 +186,12 @@ type eventData struct {
ID eventID `bson:"_id"`
UniqueID bson.ObjectId
StartTime time.Time
EndTime time.Time `bson:",omitempty"`
Target Target `bson:",omitempty"`
StartCustomData bson.Raw `bson:",omitempty"`
EndCustomData bson.Raw `bson:",omitempty"`
OtherCustomData bson.Raw `bson:",omitempty"`
EndTime time.Time `bson:",omitempty"`
Target Target `bson:",omitempty"`
ExtraTargets []ExtraTarget `bson:",omitempty"`
StartCustomData bson.Raw `bson:",omitempty"`
EndCustomData bson.Raw `bson:",omitempty"`
OtherCustomData bson.Raw `bson:",omitempty"`
Kind Kind
Owner Owner
LockUpdateTime time.Time
Expand Down Expand Up @@ -341,8 +342,14 @@ type Event struct {
logWriter io.Writer
}

type ExtraTarget struct {
Target Target
Lock bool
}

type Opts struct {
Target Target
ExtraTargets []ExtraTarget
Kind *permission.PermissionScheme
InternalKind string
Owner auth.Token
Expand Down Expand Up @@ -841,6 +848,7 @@ func newEvt(opts *Opts) (evt *Event, err error) {
evt = &Event{eventData: eventData{
ID: id,
UniqueID: uniqID,
ExtraTargets: opts.ExtraTargets,
Target: opts.Target,
StartTime: now,
Kind: k,
Expand All @@ -857,6 +865,11 @@ func newEvt(opts *Opts) (evt *Event, err error) {
for i := 0; i < maxRetries+1; i++ {
err = coll.Insert(evt.eventData)
if err == nil {
err = checkLocked(evt, opts.DisableLock)
if err != nil {
evt.Abort()
return nil, err
}
err = checkIsBlocked(evt)
if err != nil {
evt.Done(err)
Expand All @@ -883,6 +896,47 @@ func newEvt(opts *Opts) (evt *Event, err error) {
return nil, err
}

func checkLocked(evt *Event, disableLock bool) error {
var targets []Target
if !disableLock {
targets = append(targets, evt.Target)
}
for _, et := range evt.ExtraTargets {
if et.Lock {
targets = append(targets, et.Target)
}
}
if len(targets) == 0 {
return nil
}
var orBlock []bson.M
for _, t := range targets {
tBson, _ := t.GetBSON()
orBlock = append(orBlock, bson.M{"_id": tBson}, bson.M{
"extratargets": bson.M{"$elemMatch": bson.M{"target": tBson, "lock": true}},
})
}
conn, err := db.Conn()
if err != nil {
return err
}
defer conn.Close()
coll := conn.Events()
var existing Event
err = coll.Find(bson.M{
"running": true,
"uniqueid": bson.M{"$ne": evt.UniqueID},
"$or": orBlock,
}).One(&existing.eventData)
if err != nil {
if err == mgo.ErrNotFound {
return nil
}
return err
}
return ErrEventLocked{event: &existing}
}

func (e *Event) RawInsert(start, other, end interface{}) error {
e.ID = eventID{ObjId: e.UniqueID}
var err error
Expand Down
111 changes: 111 additions & 0 deletions event/event_test.go
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"runtime"
"sort"
Expand Down Expand Up @@ -177,6 +178,116 @@ func (s *S) TestNewLocks(c *check.C) {
c.Assert(err, check.ErrorMatches, `event locked: app\(myapp\) running "app.update.env.set" start by user me@me.com at .+`)
}

func (s *S) TestNewExtraTargetLocks(c *check.C) {
tests := []struct {
target1 Target
extras1 []ExtraTarget
disableLock1 bool
target2 Target
extras2 []ExtraTarget
disableLock2 bool
err string
}{
{
target1: Target{Type: "app", Value: "myapp"},
target2: Target{Type: "container", Value: "x"},
extras2: []ExtraTarget{{Target: Target{Type: "app", Value: "myapp"}, Lock: true}},
err: `event locked: app\(myapp\) running "app.update.env.set" start by user me@me.com at .+`,
},
{
target1: Target{Type: "app", Value: "myapp"},
extras1: []ExtraTarget{{Target: Target{Type: "app", Value: "myapp2"}, Lock: true}},
target2: Target{Type: "app", Value: "myapp2"},
err: `event locked: app\(myapp\) running "app.update.env.set" start by user me@me.com at .+`,
},
{
target1: Target{Type: "app", Value: "myapp"},
target2: Target{Type: "container", Value: "x"},
extras2: []ExtraTarget{{Target: Target{Type: "app", Value: "myapp"}, Lock: false}},
err: "",
},
{
target1: Target{Type: "app", Value: "myapp"},
extras1: []ExtraTarget{{Target: Target{Type: "app", Value: "myapp2"}, Lock: false}},
target2: Target{Type: "app", Value: "myapp2"},
err: "",
},
{
target1: Target{Type: "app", Value: "myapp"},
extras1: []ExtraTarget{{Target: Target{Type: "app", Value: "myapp2"}, Lock: true}},
target2: Target{Type: "app", Value: "myapp2"},
disableLock2: true,
err: "",
},
{
target1: Target{Type: "app", Value: "myapp"},
extras1: []ExtraTarget{{Target: Target{Type: "app", Value: "myapp2"}, Lock: true}},
target2: Target{Type: "app", Value: "myapp2"},
disableLock2: true,
err: "",
},
}
for i, tt := range tests {
evt1, err := New(&Opts{
Target: tt.target1,
ExtraTargets: tt.extras1,
Kind: permission.PermAppUpdateEnvSet,
Owner: s.token,
Allowed: Allowed(permission.PermAppReadEvents),
DisableLock: tt.disableLock1,
})
c.Assert(err, check.IsNil)
evt2, err := New(&Opts{
Target: tt.target2,
ExtraTargets: tt.extras2,
Kind: permission.PermAppUpdateEnvUnset,
Owner: s.token,
Allowed: Allowed(permission.PermAppReadEvents),
DisableLock: tt.disableLock2,
})
if tt.err != "" {
c.Assert(err, check.ErrorMatches, tt.err, check.Commentf("failed test case %d - %#v", i, tt))
} else {
c.Assert(err, check.IsNil, check.Commentf("failed test case %d - %#v", i, tt))
}
err = evt1.Done(nil)
c.Assert(err, check.IsNil)
if evt2 != nil {
err = evt2.Done(nil)
c.Assert(err, check.IsNil)
}
}
}

func (s *S) TestNewLockExtraTargetRace(c *check.C) {
originalMaxProcs := runtime.GOMAXPROCS(10)
defer runtime.GOMAXPROCS(originalMaxProcs)
wg := sync.WaitGroup{}
var countOK int32
for i := 0; i < 100; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
_, err := New(&Opts{
Target: Target{Type: "app", Value: fmt.Sprintf("myapp-%d", i)},
ExtraTargets: []ExtraTarget{
{Target: Target{Type: "app", Value: "myapp"}, Lock: true},
},
Kind: permission.PermAppUpdateEnvSet,
Owner: s.token,
Allowed: Allowed(permission.PermAppReadEvents),
})
if _, ok := err.(ErrEventLocked); ok {
return
}
c.Assert(err, check.IsNil)
atomic.AddInt32(&countOK, 1)
}(i)
}
wg.Wait()
c.Assert(countOK <= 1, check.Equals, true)
}

func (s *S) TestNewDoneDisableLock(c *check.C) {
evt, err := New(&Opts{
Target: Target{Type: "app", Value: "myapp"},
Expand Down

0 comments on commit a76d722

Please sign in to comment.