Skip to content

Commit

Permalink
msh: support for additional resources, reservation refinement
Browse files Browse the repository at this point in the history
This patch allows `msh` to leverage pre-reserved resources for tasks.
  • Loading branch information
James DeFelice authored and jdef committed Jul 3, 2019
1 parent 55539d1 commit 35f3497
Showing 1 changed file with 141 additions and 39 deletions.
180 changes: 141 additions & 39 deletions api/v1/cmd/msh/app/app.go
Expand Up @@ -25,30 +25,34 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/httpcli/httpagent"
"github.com/mesos/mesos-go/api/v1/lib/httpcli/httpsched"
"github.com/mesos/mesos-go/api/v1/lib/resources"
"github.com/mesos/mesos-go/api/v1/lib/roles"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/calls"
"github.com/mesos/mesos-go/api/v1/lib/scheduler/events"
)

// init assigns "1" to mesos.CapabilityReservationRefinement when this package
// is initialized.
// NOTE(review): presumably this switches the mesos library's resource handling
// into reservation-refinement mode — confirm against the mesos package.
func init() {
mesos.CapabilityReservationRefinement = "1"
}

const (
// RFC3339a is a compact time layout for time.Format: date and time digits
// without separators, joined by "T", followed by a numeric zone offset.
RFC3339a = "20060102T150405Z0700"
)

// Config holds the user-supplied settings from which New builds an App.
type Config struct {
FrameworkName string
TaskName string
MesosMaster string // MesosMaster is formatted as host:port
User string
Role string
CPUs float64
Memory float64
TTY bool
Pod bool
Interactive bool
Command []string // Command must not be empty.
Log func(string, ...interface{})
Silent bool
FrameworkName string
TaskName string
MesosMaster string // MesosMaster is formatted as host:port
User string
Role string
CPUs float64
Memory float64
TTY bool
Pod bool
Interactive bool
Command []string // Command must not be empty.
Log func(string, ...interface{})
Silent bool
AdditionalResources mesos.Resources // AdditionalResources are validated by New and added to the task's CPU and memory resources.
}

func DefaultConfig() Config {
Expand Down Expand Up @@ -101,13 +105,20 @@ func New(c Config) *App {
c.Log = log.Printf
}
}

// resource math doesn't work properly with invalid resources.
// validate user-specified additional resources before we try
// anything fancy.
validateAll(c.AdditionalResources)

app := &App{
Config: c,
wantsExecutorResources: mesos.Resources{
resources.NewCPUs(0.01).Resource,
resources.NewMemory(32).Resource,
resources.NewDisk(5).Resource,
},
wantsExecutorResources: withAllocationRole(c.Role,
mesos.Resources{
resources.NewCPUs(0.01).Resource,
resources.NewMemory(32).Resource,
resources.NewDisk(5).Resource,
}),
agentDirectory: make(map[mesos.AgentID]string),
uponExit: new(cleanups),
fidStore: store.DecorateSingleton(
Expand All @@ -116,10 +127,11 @@ func New(c Config) *App {
c.Log("FrameworkID %q", v)
return nil
})),
wantsResources: mesos.Resources{
resources.NewCPUs(c.CPUs).Resource,
resources.NewMemory(c.Memory).Resource,
},
wantsResources: withAllocationRole(c.Role,
mesos.Resources{
resources.NewCPUs(c.CPUs).Resource,
resources.NewMemory(c.Memory).Resource,
}.Plus(c.AdditionalResources...)),
taskPrototype: mesos.TaskInfo{
Name: c.TaskName,
Command: &mesos.CommandInfo{
Expand All @@ -142,9 +154,36 @@ func New(c Config) *App {
},
}
}
validateAll(app.wantsResources)
validateAll(app.wantsExecutorResources)
app.Log("configured with task resources {%v} and executor resources {%v}", app.wantsResources, app.wantsExecutorResources)
return app
}

// validateAll runs Validate on every resource in r, in order, and panics with
// the first validation error it encounters. An empty or nil r is a no-op.
func validateAll(r mesos.Resources) {
	for idx := range r {
		// Index into the slice so Validate is invoked on the element itself
		// rather than on a copy.
		if invalid := r[idx].Validate(); invalid != nil {
			panic(invalid)
		}
	}
}

// withAllocationRole returns a new Resources slice holding the elements of r,
// each carrying an AllocationInfo whose role is the given role.
//
// Resources already allocated to role are copied as-is. Any other resource is
// deep-cloned first and the clone's AllocationInfo is replaced, so the protos
// referenced by the input slice are never modified.
func withAllocationRole(role string, r mesos.Resources) mesos.Resources {
	out := make(mesos.Resources, len(r))
	for i := range r {
		if r[i].GetAllocationInfo().GetRole() == role {
			out[i] = r[i]
			continue
		}
		// Clone before mutating to avoid touching the caller's resource.
		clone := proto.Clone(&r[i]).(*mesos.Resource)
		clone.AllocationInfo = &mesos.Resource_AllocationInfo{
			Role: proto.String(role),
		}
		out[i] = *clone
	}
	return out
}

func (app *App) Run(ctx context.Context) error {
defer app.uponExit.unwind()

Expand All @@ -155,7 +194,16 @@ func (app *App) Run(ctx context.Context) error {

return controller.Run(
ctx,
&mesos.FrameworkInfo{User: app.User, Name: app.FrameworkName, Role: proto.String(app.Role)},
&mesos.FrameworkInfo{
User: app.User,
Name: app.FrameworkName,
Roles: []string{app.Role},
Capabilities: []mesos.FrameworkInfo_Capability{
{Type: mesos.FrameworkInfo_Capability_MULTI_ROLE},
{Type: mesos.FrameworkInfo_Capability_RESERVATION_REFINEMENT},
{Type: mesos.FrameworkInfo_Capability_REGION_AWARE},
},
},
caller,
controller.WithContextPerSubscription(true),
controller.WithEventHandler(app.buildEventHandler(caller)),
Expand Down Expand Up @@ -258,31 +306,69 @@ func (app *App) resourceOffers(caller calls.Caller) events.HandlerFunc {
return app.wantsResources
}
}()
match = index.Find(offers.ContainsResources(matchResources))
matched mesos.Resources
)
if match != nil {

// NOTE: assumes that each agent will express, at most, one offer per OFFERS event.
app.Log("wants resources {%v}", matchResources)

var matchedOffer mesos.Offer
for _, oo := range off {
// do math here to avoid modifying the original proto
offeredResources := mesos.Resources{}.Plus(oo.Resources...)

checkResources:
if len(matchResources) == 0 {
matchedOffer = oo
break
}

for _, offered := range offeredResources {
for _, wants := range matchResources {
if offered.Contains(wants) {
offeredLessWants := mesos.Resources{offered}.Minus(wants)
matched = append(matched, mesos.Resources{offered}.Minus(offeredLessWants...)...)
matchResources.Subtract1(wants)
offeredResources.Subtract1(matched[len(matched)-1])
goto checkResources
}
// app.Log("{%v} does not contain {%v}", offered, wants)
}
}

// offer didn't have everything we needed, start fresh w/ the next offer
// XXX dedup
app.Log("wanted resources {%v} not found in {%v}", matchResources, mesos.Resources(oo.Resources))
matchResources = func() mesos.Resources {
if app.Pod {
return app.wantsResources.Plus(app.wantsExecutorResources...)
} else {
return app.wantsResources
}
}()
}
if len(matchResources) == 0 {
ts := time.Now().Format(RFC3339a)
task := app.taskPrototype
task.TaskID = mesos.TaskID{Value: ts}
task.AgentID = match.AgentID
task.Resources = resources.Find(
resources.Flatten(app.wantsResources, roles.Role(app.Role).Assign()),
match.Resources...,
)
task.AgentID = matchedOffer.AgentID
task.Resources = matched

app.Log("launching task with resources %v", matched)

if app.Pod {
task.Resources = matched.Minus(app.wantsExecutorResources...)
executor := app.executorPrototype
executor.ExecutorID = mesos.ExecutorID{Value: "msh_" + ts}
executor.Resources = resources.Find(
resources.Flatten(app.wantsExecutorResources, roles.Role(app.Role).Assign()),
match.Resources...,
)
executor.Resources = matched.Minus(task.Resources...)
err = calls.CallNoData(ctx, caller, calls.Accept(
calls.OfferOperations{calls.OpLaunchGroup(executor, task)}.WithOffers(match.ID),
calls.OfferOperations{calls.OpLaunchGroup(executor, task)}.WithOffers(matchedOffer.ID),
))

app.Log("launching executor with resources %v", mesos.Resources(executor.Resources))
} else {
err = calls.CallNoData(ctx, caller, calls.Accept(
calls.OfferOperations{calls.OpLaunch(task)}.WithOffers(match.ID),
calls.OfferOperations{calls.OpLaunch(task)}.WithOffers(matchedOffer.ID),
))
}
if err != nil {
Expand All @@ -293,8 +379,8 @@ func (app *App) resourceOffers(caller calls.Caller) events.HandlerFunc {
} else {
app.Log("rejected insufficient offers")
}
// decline all but the possible match
delete(index, match.GetID())
// decline all but the possible match (if there is no match, everything is declined)
delete(index, matchedOffer.ID)
err = calls.CallNoData(ctx, caller, calls.Decline(index.IDs()...).With(refuseSeconds))
if err != nil {
return
Expand Down Expand Up @@ -322,6 +408,11 @@ func (app *App) statusUpdate(ctx context.Context, e *scheduler.Event) error {
return nil
}
case mesos.TASK_LOST, mesos.TASK_KILLED, mesos.TASK_FAILED, mesos.TASK_ERROR:
// TODO(jdef) investigate:
// TASK_FAILED with reason REASON_IO_SWITCHBOARD_EXITED from source SOURCE_EXECUTOR
// with message 'Command exited with status 0: 'IOSwitchboard' exited with status 1'
// ^^ this happens when I CTRL-D to exit from an interactive shell.

app.Log("Exiting because task " + s.GetTaskID().Value +
" is in an unexpected state " + st.String() +
" with reason " + s.GetReason().String() +
Expand All @@ -339,6 +430,17 @@ type ExitError int

// Error implements the error interface, rendering the value as
// "exit code N" where N is the integer exit code.
func (e ExitError) Error() string {
	code := int(e)
	return fmt.Sprintf("exit code %d", code)
}

// IsErrSuccess reports whether err represents a successful outcome: either a
// nil error or an ExitError whose exit code is zero. Every other error,
// including a non-zero ExitError, yields false.
func IsErrSuccess(err error) bool {
	switch code := err.(type) {
	case nil:
		return true
	case ExitError:
		return code == 0
	default:
		return false
	}
}

func (app *App) tryInteractive(ctx context.Context, agentHost string, cid mesos.ContainerID) (err error) {
// TODO(jdef) only re-attach if we're disconnected (guard against redundant TASK_RUNNING)
ctx, cancel := context.WithCancel(ctx)
Expand Down

0 comments on commit 35f3497

Please sign in to comment.