Skip to content

Commit

Permalink
Initial distributor implementation.
Browse files Browse the repository at this point in the history
This adds the following:
  * A new 'distributor' subpackage which defines the internal interface between DM and all distributor implementations (swarming, jobsim, etc.)
  * Scheduling logic in mutate; executions now get scheduled with distributors, distributors can ping back about the status of executions.
  * DM supports quest-controllable DM-side timeouts for interactions with the distributor.
  * The pRPC interface has been reduced to 4 rpcs for full functionality. Of those, ActivateExecution and FinishAttempt are very simple, and EnsureGraphData and WalkGraph have standardized on GraphData as their output format.
  * The persistent-state API has been implemented as part of the distributor interface.
  * The state machine flow has been simplified (elimination of 'adding deps' state, attempts now are in the executing state for the full duration of an outstanding execution, all failure states between attempts and executions are unified and separate from a successful finished state).

Other misc changes:
  * "as_account" is part of the quest description
  * quests can be generated from templates as part of EnsureQuestData
  * a fake distributor implementation for testing
  * tests have been refactored to take advantage of fake distributor; the deps tests were getting really annoying to update and keep correct.

BUG=550684

Review-Url: https://codereview.chromium.org/1537883002
  • Loading branch information
riannucci authored and Commit bot committed Jun 18, 2016
1 parent 10fab38 commit 00e9027
Show file tree
Hide file tree
Showing 97 changed files with 6,099 additions and 3,313 deletions.
15 changes: 8 additions & 7 deletions appengine/cmd/dm/deps/activate_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
package deps

import (
"github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/appengine/cmd/dm/model"
"github.com/luci/luci-go/common/api/dm/service/v1"
"github.com/luci/luci-go/appengine/cmd/dm/mutate"
dm "github.com/luci/luci-go/common/api/dm/service/v1"
"github.com/luci/luci-go/common/logging"
google_pb "github.com/luci/luci-go/common/proto/google"
"golang.org/x/net/context"
)

func (d *deps) ActivateExecution(c context.Context, req *dm.ActivateExecutionReq) (*google_pb.Empty, error) {
err := datastore.Get(c).RunInTransaction(func(c context.Context) error {
_, _, err := model.ActivateExecution(c, req.Auth, req.ExecutionToken)
return err
}, nil)
logging.Fields{"execution": req.Auth.Id}.Infof(c, "activating")
err := tumbleNow(c, &mutate.ActivateExecution{
Auth: req.Auth,
NewTok: req.ExecutionToken,
})
return &google_pb.Empty{}, err
}
100 changes: 48 additions & 52 deletions appengine/cmd/dm/deps/activate_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ package deps
import (
"testing"

"github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/appengine/tumble"
"github.com/luci/luci-go/appengine/cmd/dm/distributor/fake"
dm "github.com/luci/luci-go/common/api/dm/service/v1"
. "github.com/luci/luci-go/common/testing/assertions"
. "github.com/smartystreets/goconvey/convey"
Expand All @@ -18,65 +17,62 @@ func TestActivateExecution(t *testing.T) {
t.Parallel()

Convey("Test ActivateExecution", t, func() {
ttest := &tumble.Testing{}
c := ttest.Context()
ds := datastore.Get(c)
_ = ds
s := newDecoratedDeps()
ttest, c, dist, s := testSetup()

qid := ensureQuest(c, "foo", 1)
qid := s.ensureQuest(c, "foo", 1)
ttest.Drain(c)
realAuth := execute(c, dm.NewAttemptID(qid, 1))

eid := dm.NewExecutionID(qid, 1, 1)

req := &dm.ActivateExecutionReq{
Auth: &dm.Execution_Auth{Id: eid},
ExecutionToken: []byte("newtok"),
}

Convey("bad", func() {
Convey("wrong token", func() {
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeRPCUnauthenticated, "failed to activate")
})

Convey("wrong token (already activated)", func() {
req.Auth.Token = realAuth.Token
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeNil)

req.Auth.Token = []byte("bad sekret")
req.ExecutionToken = []byte("random other tok")
_, err = s.ActivateExecution(c, req)
So(err, ShouldBeRPCUnauthenticated, "failed to activate")
dist.RunTask(c, eid, func(tsk *fake.Task) error {
req := &dm.ActivateExecutionReq{
Auth: &dm.Execution_Auth{Id: eid},
ExecutionToken: []byte("sufficiently long new 'random' token"),
}

Convey("bad", func() {
Convey("wrong token", func() {
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeRPCPermissionDenied, "failed to activate")
})

Convey("wrong token (already activated)", func() {
req.Auth.Token = tsk.Auth.Token
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeNil)

req.Auth.Token = []byte("bad sekret")
req.ExecutionToken = []byte("random other super duper long token")
_, err = s.ActivateExecution(c, req)
So(err, ShouldBeRPCPermissionDenied, "failed to activate")
})

Convey("concurrent activation", func() {
req.Auth.Token = tsk.Auth.Token
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeNil)

req.ExecutionToken = append(req.ExecutionToken, []byte(" (but incorrect)")...)
_, err = s.ActivateExecution(c, req)
So(err, ShouldBeRPCPermissionDenied, "failed to activate")
})
})

Convey("concurrent activation", func() {
req.Auth.Token = realAuth.Token
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeNil)
Convey("good", func() {
req.Auth.Token = tsk.Auth.Token

req.ExecutionToken = []byte("other newtok")
_, err = s.ActivateExecution(c, req)
So(err, ShouldBeRPCUnauthenticated, "failed to activate")
})
})

Convey("good", func() {
req.Auth.Token = realAuth.Token

Convey("normal activation", func() {
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeNil)
})
Convey("normal activation", func() {
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeNil)
})

Convey("repeated activation", func() {
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeNil)
_, err = s.ActivateExecution(c, req)
So(err, ShouldBeNil)
Convey("repeated activation", func() {
_, err := s.ActivateExecution(c, req)
So(err, ShouldBeNil)
_, err = s.ActivateExecution(c, req)
So(err, ShouldBeNil)
})
})
return nil
})
})
}
25 changes: 12 additions & 13 deletions appengine/cmd/dm/deps/add_deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/luci/gae/service/datastore"
"github.com/luci/luci-go/appengine/cmd/dm/model"
"github.com/luci/luci-go/appengine/tumble"
"github.com/luci/luci-go/common/api/dm/service/v1"
. "github.com/luci/luci-go/common/testing/assertions"
. "github.com/smartystreets/goconvey/convey"
Expand All @@ -20,10 +19,8 @@ func TestAddDeps(t *testing.T) {
t.Parallel()

Convey("EnsureGraphData (Adding deps)", t, func() {
ttest := &tumble.Testing{}
c := ttest.Context()
_, c, _, s := testSetup()
ds := datastore.Get(c)
s := newDecoratedDeps()
zt := time.Time{}

a := &model.Attempt{ID: *dm.NewAttemptID("quest", 1)}
Expand All @@ -36,11 +33,11 @@ func TestAddDeps(t *testing.T) {
State: dm.Execution_RUNNING}

toQuestDesc := &dm.Quest_Desc{
DistributorConfigName: "foof",
DistributorConfigName: "fakeDistributor",
JsonPayload: `{"data":"yes"}`,
}
toQuest, err := model.NewQuest(c, toQuestDesc)
So(err, ShouldBeNil)
So(toQuestDesc.Normalize(), ShouldBeNil)
toQuest := model.NewQuest(c, toQuestDesc)
to := &model.Attempt{ID: *dm.NewAttemptID(toQuest.ID, 1)}
fwd := &model.FwdDep{Depender: ak, Dependee: to.ID}

Expand All @@ -64,7 +61,7 @@ func TestAddDeps(t *testing.T) {
So(ds.Put(a, e), ShouldBeNil)

_, err := s.EnsureGraphData(c, req)
So(err, ShouldBeRPCInvalidArgument, `cannot create attempts for absent quest "Q9SgH-f5kraxP_om80CdR9EmAvgmnUws_s5fvRmZiuc"`)
So(err, ShouldBeRPCInvalidArgument, `cannot create attempts for absent quest "FwcLo7vH7d24_mnsKIyKswk3NSezONOAKuDZwHrgl7M"`)
})
})

Expand All @@ -76,7 +73,7 @@ func TestAddDeps(t *testing.T) {

rsp, err := s.EnsureGraphData(c, req)
So(err, ShouldBeNil)
purgeTimestamps(rsp.Result)
rsp.Result.PurgeTimestamps()
So(rsp, ShouldResemble, &dm.EnsureGraphDataRsp{
Accepted: true,
Result: &dm.GraphData{Quests: map[string]*dm.Quest{
Expand All @@ -85,7 +82,7 @@ func TestAddDeps(t *testing.T) {
Desc: toQuestDesc,
BuiltBy: []*dm.Quest_TemplateSpec{},
},
Attempts: map[uint32]*dm.Attempt{1: dm.NewAttemptNeedsExecution(zt)},
Attempts: map[uint32]*dm.Attempt{1: dm.NewAttemptScheduling()},
},
}},
})
Expand All @@ -97,7 +94,7 @@ func TestAddDeps(t *testing.T) {

rsp, err := s.EnsureGraphData(c, req)
So(err, ShouldBeNil)
purgeTimestamps(rsp.Result)
rsp.Result.PurgeTimestamps()
So(rsp, ShouldResemble, &dm.EnsureGraphDataRsp{
Accepted: true,
Result: &dm.GraphData{Quests: map[string]*dm.Quest{
Expand All @@ -106,7 +103,7 @@ func TestAddDeps(t *testing.T) {
Desc: toQuestDesc,
BuiltBy: []*dm.Quest_TemplateSpec{},
},
Attempts: map[uint32]*dm.Attempt{1: dm.NewAttemptFinished(zt, 0, "")},
Attempts: map[uint32]*dm.Attempt{1: dm.NewAttemptFinished(zt, 0, "", nil)},
},
}},
})
Expand All @@ -123,7 +120,9 @@ func TestAddDeps(t *testing.T) {

So(ds.Get(fwd), ShouldBeNil)
So(ds.Get(a), ShouldBeNil)
So(a.State, ShouldEqual, dm.Attempt_ADDING_DEPS)
So(a.State, ShouldEqual, dm.Attempt_EXECUTING)
So(ds.Get(e), ShouldBeNil)
So(e.State, ShouldEqual, dm.Execution_STOPPING)
})

})
Expand Down
Loading

0 comments on commit 00e9027

Please sign in to comment.