Skip to content

Commit

Permalink
cluster,scheduler: enable user define local scheduler to handle message
Browse files Browse the repository at this point in the history
Signed-off-by: Lonng <heng@lonng.org>
  • Loading branch information
lonng committed Jun 30, 2019
1 parent 685004a commit 8d3b45d
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 10 deletions.
2 changes: 1 addition & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func run(addr string, isWs bool, certificate string, key string, opts ...Option)
}

// Use listen address as client address in non-cluster mode
if opt.advertiseAddr == "" && opt.clientAddr == "" {
if !opt.isMaster && opt.advertiseAddr == "" && opt.clientAddr == "" {
log.Println("The current server running in singleton mode")
opt.clientAddr = addr
}
Expand Down
30 changes: 28 additions & 2 deletions cluster/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (h *LocalHandler) localProcess(handler *component.Handler, lastMid uint64,
}

args := []reflect.Value{handler.Receiver, reflect.ValueOf(session), reflect.ValueOf(data)}
scheduler.PushTask(func() {
task := func() {
switch v := session.NetworkEntity().(type) {
case *agent:
v.lastMid = lastMid
Expand All @@ -384,5 +384,31 @@ func (h *LocalHandler) localProcess(handler *component.Handler, lastMid uint64,
log.Println(fmt.Sprintf("Service %s error: %+v", msg.Route, err))
}
}
})
}

index := strings.LastIndex(msg.Route, ".")
if index < 0 {
log.Println(fmt.Sprintf("nano/handler: invalid route %s", msg.Route))
return
}

// A message can be dispatch to global thread or a user customized thread
service := msg.Route[:index]
if s, found := h.localServices[service]; found && s.SchedName != "" {
sched := session.Value(s.SchedName)
if sched == nil {
log.Println(fmt.Sprintf("nanl/handler: cannot found `schedular.LocalScheduler` by %s", s.SchedName))
return
}

local, ok := sched.(scheduler.LocalScheduler)
if !ok {
log.Println(fmt.Sprintf("nanl/handler: Type %T does not implement the `schedular.LocalScheduler` interface",
sched))
return
}
local.Schedule(task)
} else {
scheduler.PushTask(task)
}
}
12 changes: 10 additions & 2 deletions component/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ package component

type (
options struct {
name string // component name
nameFunc func(string) string // rename handler name
name string // component name
nameFunc func(string) string // rename handler name
schedName string // schedName name
}

// Option used to customize handler
Expand All @@ -44,3 +45,10 @@ func WithNameFunc(fn func(string) string) Option {
opt.nameFunc = fn
}
}

// WithSchedulerName set the name of the service scheduler
func WithSchedulerName(name string) Option {
return func(opt *options) {
opt.schedName = name
}
}
12 changes: 7 additions & 5 deletions component/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ type (
// Service implements a specific service, some of it's methods will be
// called when the correspond events is occurred.
Service struct {
Name string // name of service
Type reflect.Type // type of the receiver
Receiver reflect.Value // receiver of methods for the service
Handlers map[string]*Handler // registered methods
Options options // options
Name string // name of service
Type reflect.Type // type of the receiver
Receiver reflect.Value // receiver of methods for the service
Handlers map[string]*Handler // registered methods
SchedName string // name of scheduler variable in session data
Options options // options
}
)

Expand All @@ -62,6 +63,7 @@ func NewService(comp Component, opts []Option) *Service {
} else {
s.Name = reflect.Indirect(s.Receiver).Type().Name()
}
s.SchedName = s.Options.schedName

return s
}
Expand Down
5 changes: 5 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ const (
sessionCloseBacklog = 1 << 8
)

// LocalScheduler schedules task to a customized goroutine
type LocalScheduler interface {
Schedule(Task)
}

type Task func()

type Hook func()
Expand Down

0 comments on commit 8d3b45d

Please sign in to comment.