Skip to content

Commit

Permalink
messenger does not abort upon transport errors; scheduler and executo…
Browse files Browse the repository at this point in the history
…r observe network errors as indicators of disconnection
  • Loading branch information
James DeFelice committed Jan 19, 2016
1 parent 4a7554a commit d607d5a
Show file tree
Hide file tree
Showing 16 changed files with 339 additions and 347 deletions.
12 changes: 11 additions & 1 deletion examples/scheduler/main.go
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"strconv"
"strings"
"time"

"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
Expand Down Expand Up @@ -55,6 +56,7 @@ var (
taskCount = flag.String("task-count", "5", "Total task count to run.")
mesosAuthPrincipal = flag.String("mesos_authentication_principal", "", "Mesos authentication principal.")
mesosAuthSecretFile = flag.String("mesos_authentication_secret_file", "", "Mesos authentication secret file.")
slowLaunch = flag.Bool("slow_launch", false, "When true the ResourceOffers func waits for several seconds before attempting to launch tasks; useful for debugging failover")
)

type ExampleScheduler struct {
Expand Down Expand Up @@ -83,14 +85,22 @@ func (sched *ExampleScheduler) Registered(driver sched.SchedulerDriver, framewor

func (sched *ExampleScheduler) Reregistered(driver sched.SchedulerDriver, masterInfo *mesos.MasterInfo) {
log.Infoln("Framework Re-Registered with Master ", masterInfo)
_, err := driver.ReconcileTasks([]*mesos.TaskStatus{})
if err != nil {
log.Errorf("failed to request task reconciliation: %v", err)
}
}

func (sched *ExampleScheduler) Disconnected(sched.SchedulerDriver) {
log.Fatalf("disconnected from master, aborting")
log.Warningf("disconnected from master")
}

func (sched *ExampleScheduler) ResourceOffers(driver sched.SchedulerDriver, offers []*mesos.Offer) {

if *slowLaunch {
time.Sleep(3 * time.Second)
}

if sched.tasksLaunched >= sched.totalTasks {
log.Info("decline all of the offers since all of our tasks are already launched")
ids := make([]*mesos.OfferID, len(offers))
Expand Down
59 changes: 39 additions & 20 deletions executor/executor.go
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/mesos/mesos-go/mesosutil"
"github.com/mesos/mesos-go/mesosutil/process"
"github.com/mesos/mesos-go/messenger"
"github.com/mesos/mesos-go/messenger/sessionid"
"github.com/mesos/mesos-go/upid"
"github.com/pborman/uuid"
"golang.org/x/net/context"
Expand Down Expand Up @@ -116,6 +117,11 @@ func NewMesosExecutorDriver(config DriverConfig) (*MesosExecutorDriver, error) {
return driver, nil
}

// context returns the driver context, expects driver.lock to be locked
func (driver *MesosExecutorDriver) context() context.Context {
return sessionid.NewContext(context.TODO(), driver.connection.String())
}

// init initializes the driver.
func (driver *MesosExecutorDriver) init() error {
log.Infof("Init mesos executor driver\n")
Expand All @@ -127,11 +133,13 @@ func (driver *MesosExecutorDriver) init() error {
return err
}

guard := func(h messenger.MessageHandler) messenger.MessageHandler {
type messageHandler func(context.Context, *upid.UPID, proto.Message)

guard := func(h messageHandler) messenger.MessageHandler {
return messenger.MessageHandler(func(from *upid.UPID, pbMsg proto.Message) {
driver.lock.Lock()
defer driver.lock.Unlock()
h(from, pbMsg)
h(driver.context(), from, pbMsg)
})
}

Expand All @@ -145,6 +153,7 @@ func (driver *MesosExecutorDriver) init() error {
driver.messenger.Install(guard(driver.frameworkMessage), &mesosproto.FrameworkToExecutorMessage{})
driver.messenger.Install(guard(driver.shutdown), &mesosproto.ShutdownExecutorMessage{})
driver.messenger.Install(guard(driver.frameworkError), &mesosproto.FrameworkErrorMessage{})
driver.messenger.Install(guard(driver.networkError), &mesosproto.InternalNetworkError{})
return nil
}

Expand Down Expand Up @@ -214,7 +223,21 @@ func (driver *MesosExecutorDriver) Connected() bool {

// --------------------- Message Handlers --------------------- //

func (driver *MesosExecutorDriver) registered(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) networkError(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
if !driver.connected {
log.V(1).Info("ignoring network error because not connected")
return
}
msg := pbMsg.(*mesosproto.InternalNetworkError)
if msg.GetSession() == driver.connection.String() {
log.Info("slave disconnected")
driver.connected = false
driver.connection = uuid.UUID{}
driver.withExecutor(func(e Executor) { e.Disconnected(driver) })
}
}

func (driver *MesosExecutorDriver) registered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor driver registered")

msg := pbMsg.(*mesosproto.ExecutorRegisteredMessage)
Expand All @@ -235,7 +258,7 @@ func (driver *MesosExecutorDriver) registered(from *upid.UPID, pbMsg proto.Messa
driver.withExecutor(func(e Executor) { e.Registered(driver, executorInfo, frameworkInfo, slaveInfo) })
}

func (driver *MesosExecutorDriver) reregistered(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) reregistered(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor driver reregistered")

msg := pbMsg.(*mesosproto.ExecutorReregisteredMessage)
Expand All @@ -254,11 +277,7 @@ func (driver *MesosExecutorDriver) reregistered(from *upid.UPID, pbMsg proto.Mes
driver.withExecutor(func(e Executor) { e.Reregistered(driver, slaveInfo) })
}

func (driver *MesosExecutorDriver) send(upid *upid.UPID, msg proto.Message) error {
//TODO(jdef) should implement timeout here
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

func (driver *MesosExecutorDriver) send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
c := make(chan error, 1)
go func() { c <- driver.messenger.Send(ctx, upid, msg) }()

Expand All @@ -271,7 +290,7 @@ func (driver *MesosExecutorDriver) send(upid *upid.UPID, msg proto.Message) erro
}
}

func (driver *MesosExecutorDriver) reconnect(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) reconnect(ctx context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor driver reconnect")

msg := pbMsg.(*mesosproto.ReconnectExecutorMessage)
Expand All @@ -298,12 +317,12 @@ func (driver *MesosExecutorDriver) reconnect(from *upid.UPID, pbMsg proto.Messag
message.Tasks = append(message.Tasks, t)
}
// Send the message.
if err := driver.send(driver.slaveUPID, message); err != nil {
if err := driver.send(ctx, driver.slaveUPID, message); err != nil {
log.Errorf("Failed to send %v: %v\n", message, err)
}
}

func (driver *MesosExecutorDriver) runTask(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) runTask(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor driver runTask")

msg := pbMsg.(*mesosproto.RunTaskMessage)
Expand All @@ -323,7 +342,7 @@ func (driver *MesosExecutorDriver) runTask(from *upid.UPID, pbMsg proto.Message)
driver.withExecutor(func(e Executor) { e.LaunchTask(driver, task) })
}

func (driver *MesosExecutorDriver) killTask(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) killTask(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor driver killTask")

msg := pbMsg.(*mesosproto.KillTaskMessage)
Expand All @@ -338,7 +357,7 @@ func (driver *MesosExecutorDriver) killTask(from *upid.UPID, pbMsg proto.Message
driver.withExecutor(func(e Executor) { e.KillTask(driver, taskID) })
}

func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor statusUpdateAcknowledgement")

msg := pbMsg.(*mesosproto.StatusUpdateAcknowledgementMessage)
Expand All @@ -359,7 +378,7 @@ func (driver *MesosExecutorDriver) statusUpdateAcknowledgement(from *upid.UPID,
delete(driver.tasks, taskID.String())
}

func (driver *MesosExecutorDriver) frameworkMessage(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) frameworkMessage(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor driver received frameworkMessage")

msg := pbMsg.(*mesosproto.FrameworkToExecutorMessage)
Expand All @@ -374,7 +393,7 @@ func (driver *MesosExecutorDriver) frameworkMessage(from *upid.UPID, pbMsg proto
driver.withExecutor(func(e Executor) { e.FrameworkMessage(driver, string(data)) })
}

func (driver *MesosExecutorDriver) shutdown(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) shutdown(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor driver received shutdown")

_, ok := pbMsg.(*mesosproto.ShutdownExecutorMessage)
Expand All @@ -394,7 +413,7 @@ func (driver *MesosExecutorDriver) shutdown(from *upid.UPID, pbMsg proto.Message
driver.stop()
}

func (driver *MesosExecutorDriver) frameworkError(from *upid.UPID, pbMsg proto.Message) {
func (driver *MesosExecutorDriver) frameworkError(_ context.Context, from *upid.UPID, pbMsg proto.Message) {
log.Infoln("Executor driver received error")

msg := pbMsg.(*mesosproto.FrameworkErrorMessage)
Expand Down Expand Up @@ -433,7 +452,7 @@ func (driver *MesosExecutorDriver) start() (mesosproto.Status, error) {
ExecutorId: driver.executorID,
}

if err := driver.send(driver.slaveUPID, message); err != nil {
if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorf("Stopping the executor, failed to send %v: %v\n", message, err)
err0 := driver._stop(driver.status)
if err0 != nil {
Expand Down Expand Up @@ -585,7 +604,7 @@ func (driver *MesosExecutorDriver) sendStatusUpdate(taskStatus *mesosproto.TaskS
Pid: proto.String(driver.self.String()),
}
// Send the message.
if err := driver.send(driver.slaveUPID, message); err != nil {
if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorf("Failed to send %v: %v\n", message, err)
return driver.status, err
}
Expand Down Expand Up @@ -632,7 +651,7 @@ func (driver *MesosExecutorDriver) sendFrameworkMessage(data string) (mesosproto
}

// Send the message.
if err := driver.send(driver.slaveUPID, message); err != nil {
if err := driver.send(driver.context(), driver.slaveUPID, message); err != nil {
log.Errorln("Failed to send message %v: %v", message, err)
return driver.status, err
}
Expand Down
33 changes: 2 additions & 31 deletions executor/executor_intgr_test.go
Expand Up @@ -112,6 +112,7 @@ func (i *integrationTestDriver) setConnected(b bool) {
i.lock.Lock()
defer i.lock.Unlock()
i.connected = b
i.connection = uuid.NewUUID()
}

// connectionListener returns a signal chan that closes once driver.connected == true.
Expand Down Expand Up @@ -528,40 +529,10 @@ func TestExecutorDriverShutdownEvent(t *testing.T) {

select {
case <-ch:
case <-time.After(time.Second * 5):
case <-time.After(time.Second * 20):
log.Errorf("Tired of waiting...")
}

<-time.After(time.Second * 1) // wait for shutdown to finish.
assert.Equal(t, mesos.Status_DRIVER_STOPPED, driver.Status())
}

func TestExecutorDriverError(t *testing.T) {
setTestEnv(t)
ch := make(chan bool, 2)
// Mock Slave process to respond to registration event.
server := testutil.NewMockSlaveHttpServer(t, func(rsp http.ResponseWriter, req *http.Request) {
reqPath, err := url.QueryUnescape(req.URL.String())
assert.NoError(t, err)
log.Infoln("RCVD request", reqPath)
rsp.WriteHeader(http.StatusAccepted)
})

exec := newTestExecutor(t)
exec.ch = ch
exec.t = t

driver := newIntegrationTestDriver(t, exec)
server.Close() // will cause error
// Run() cause async message processing to start
// Therefore, error-handling will be done via Executor.Error callaback.
stat, err := driver.Run()
assert.NoError(t, err)
assert.Equal(t, mesos.Status_DRIVER_STOPPED, stat)

select {
case <-ch:
case <-time.After(time.Second * 1):
log.Errorf("Tired of waiting...")
}
}
2 changes: 1 addition & 1 deletion executor/executor_test.go
Expand Up @@ -388,7 +388,7 @@ func TestStatusUpdateAckRace_Issue103(t *testing.T) {
go func() {
driver.lock.Lock()
defer driver.lock.Unlock()
driver.statusUpdateAcknowledgement(nil, msg)
driver.statusUpdateAcknowledgement(driver.context(), nil, msg)
}()

taskStatus := util.NewTaskStatus(
Expand Down
1 change: 1 addition & 0 deletions mesosproto/authentication.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mesosproto/authenticationpb_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions mesosproto/internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions mesosproto/internal.proto
Expand Up @@ -21,3 +21,10 @@ message InternalAuthenticationResult {
// master pid that this result pertains to
required string pid = 3;
}

message InternalNetworkError {
// master pid that this event pertains to
required string pid = 1;
// driver session UUID
optional string session = 2;
}

0 comments on commit d607d5a

Please sign in to comment.