Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: rpl/going-nuts
base: eb35c5b61d
...
head fork: rpl/going-nuts
compare: 809e8488f9
  • 3 commits
  • 4 files changed
  • 0 commit comments
  • 1 contributor
View
90 pkg/gen_server/api.go
@@ -0,0 +1,90 @@
+// "Erlang OTP gen_server"-like Concurrency Pattern for Go
+package gen_server
+
+// SetDebug enable/disable debugging messages
+func (self *GenServer) SetDebug(debug bool) {
+ self.debug = debug
+}
+
+// GetStatus returns the current GenServer Status
+func (self *GenServer) GetStatus() int {
+ return self.status
+}
+
+func CreateGenServer(impl IGenServerImpl) *GenServer {
+ gensrv := new(GenServer)
+
+ ch := make(MessageChannel)
+ control_ch := make(controlChannel)
+ gensrv.ch = ch
+ gensrv.control_ch = control_ch
+ gensrv.impl = impl
+
+ return gensrv
+}
+
+func (self *GenServer) Start(init_args Data, opts ...interface{}) {
+ self.status = STARTING;
+ go self.loop()
+
+ timeout := getopt_timeout(opts, 5e9)
+ reply_ch := make(ReplyMessageChannel,1)
+
+ send_and_wait_until(self.control_ch,
+ initControlMessage{Args: init_args, ReplyChannel: reply_ch},
+ func() interface{} { return <- reply_ch },
+ func() interface{} { return ReplyMessage{Ok: false, Error: "GenServer Cast Timeout"} },
+ timeout)
+}
+
+func (self *GenServer) Stop(opts ...interface{}) ReplyMessage {
+ if self.status >= STOPPED {
+ return ReplyMessage{Ok: false, Error: "GenServer Stopped or Crashed"}
+ }
+
+ timeout := getopt_timeout(opts, 5e9)
+ reply_ch := make(ReplyMessageChannel,1)
+
+ return send_and_wait_until(self.control_ch, stopControlMessage{ReplyChannel: reply_ch},
+ func() interface{} { return <- reply_ch },
+ func() interface{} { return ReplyMessage{Ok: false, Error: "GenServer Stop Timeout"} },
+ timeout).(ReplyMessage)
+}
+
+
+func (self *GenServer) Cast(payload Data, opts ...interface{}) {
+ if self.status >= STOPPED {
+ return // Error: "GenServer Stopped or Crashed"
+ }
+
+ timeout := getopt_timeout(opts, 5e9)
+
+ send_and_wait_until(self.ch, CastMessage{Payload: payload},
+ func() interface{} { return ReplyMessage{Ok: true} },
+ func() interface{} { return ReplyMessage{Ok: false, Error: "GenServer Cast Timeout"} },
+ timeout)
+}
+
+func (self *GenServer) Call(payload Data, opts ...interface{}) ReplyMessage {
+ if self.status >= STOPPED {
+ return ReplyMessage{Ok: false, Error: "GenServer Stopped or Crashed"}
+ }
+
+ timeout := getopt_timeout(opts, 5e9)
+ reply_ch := make(ReplyMessageChannel,1)
+
+ return send_and_wait_until(self.ch,
+ CallMessage{Payload: payload, replyChannel: reply_ch},
+ func() interface{} { return <- reply_ch },
+ func() interface{} { return ReplyMessage{Ok: false, Error: "GenServer Stop Timeout"} },
+ timeout).(ReplyMessage)
+}
+
+func (self *CallMessage) Reply(ok bool, result Data) {
+ select {
+ case self.replyChannel <- ReplyMessage{Ok: ok, Result: result}:
+ default:
+ // CLIENT GOROUTINE CRASHED?
+ }
+}
+
View
12 pkg/gen_server/types.go → pkg/gen_server/gen_server.go
@@ -2,8 +2,10 @@ package gen_server
// Message represents the opaque type of gen_server exchanged messages
type Message interface {}
-// MessageContent represents the opaque type of gen_server exchanged data
+// Data represents the opaque type of gen_server exchanged data
type Data interface {}
+// State represents the opaque type of gen_server implementation state
+type State interface {}
// CastMessage is a asynchronous message (it doesn't need a reply)
type CastMessage struct {
@@ -57,7 +59,7 @@ type GenServer struct {
ch MessageChannel
control_ch controlChannel
impl IGenServerImpl
- state Data
+ state State
status int
debug bool
}
@@ -65,10 +67,10 @@ type GenServer struct {
// IGenServerImpl is the interface that GenServer implementations have to implements:
type IGenServerImpl interface {
// Init initialize the server returning a state, optionally based on Init arguments
- Init(args Data) (bool, Data)
+ Init(args Data) (bool, State)
// HandleCast handle cast messages
- HandleCast(msg *CastMessage)
+ HandleCast(msg *CastMessage, state State)
// HandleCall handle call messages
- HandleCall(msg *CallMessage)
+ HandleCall(msg *CallMessage, state State)
}
View
6 pkg/gen_server/gen_server_test.go
@@ -30,11 +30,11 @@ func CreateTestServer() *TestServer {
}
func (self *TestServer) Start() bool {
- self.GenServer.Start()
+ self.GenServer.Start(nil)
return true
}
-func (self *TestServer) Init(args Data) (bool, Data) {
+func (self *TestServer) Init(args Data) (bool, State) {
return true, nil
}
@@ -46,7 +46,7 @@ type CastCrashTestMessage struct {
msg string
}
-func (self *TestServer) HandleCast(cast *CastMessage) {
+func (self *TestServer) HandleCast(cast *CastMessage, state State) {
self.log("HANDLE CAST ", cast)
self.last_cast_received = cast
switch payload := cast.Payload.(type) {
View
215 pkg/gen_server/methods.go → pkg/gen_server/internals.go
@@ -1,4 +1,3 @@
-// "Erlang OTP gen_server"-like Concurrency Pattern for Go
package gen_server
import (
@@ -8,31 +7,24 @@ import (
"fmt"
)
-// loop implements the internal GenServer goroutine loop
-func (self *GenServer) loop() {
- defer self.recover()
+type callback_fn func() interface {}
- for ; ; {
- self.status = READY
- select {
- case msg := <- self.ch :
- self.status = BUSY
- switch cmsg := msg.(type) {
- case CallMessage:
- self.handle_call(&cmsg)
- case CastMessage:
- self.handle_cast(&cmsg)
- }
- case cmd := <- self.control_ch :
- self.status = BUSY
- switch ccmd := cmd.(type) {
- case initControlMessage:
- self.handle_init(&ccmd)
- case stopControlMessage:
- self.handle_stop(&ccmd)
- }
- }
+func send_and_wait_until(where chan interface {}, what interface {},
+ sent_cb callback_fn, timeout_cb callback_fn, timeout int64) (Data) {
+
+ timeout_ch := make(chan bool,1)
+ go func() {
+ time.Sleep(timeout)
+ timeout_ch <- true
+ }()
+ select {
+ case where <- what:
+ return sent_cb()
+ case <- timeout_ch:
+ return timeout_cb()
}
+
+ return ReplyMessage{Ok: false, Error: "GenServer Error Unknown"}
}
func (self *GenServer) recover() {
@@ -57,78 +49,6 @@ func (self *GenServer) log(log_msg string, log_data interface {}) {
}
}
-// SetDebug enable/disable debugging messages
-func (self *GenServer) SetDebug(debug bool) {
- self.debug = debug
-}
-
-// handle_init will be called to handle incoming InitControlMessages
-func (self *GenServer) handle_init(cmd *initControlMessage) {
- self.log("RECEIVED INIT ",cmd)
- self.status = READY
- cmd.ReplyChannel <- ReplyMessage{Ok: true}
-}
-
-// handle_stop will be called to handle incoming StopControlMessages
-func (self *GenServer) handle_stop(cmd *stopControlMessage) {
- defer func() {
- // TODO: error handling
- self.status = STOPPED
- cmd.ReplyChannel <- ReplyMessage{Ok: true}
- }()
- self.log("RECEIVED STOP ",cmd)
- runtime.Goexit()
-}
-
-// handle_cast will be called to handle incoming CastMessages
-func (self *GenServer) handle_cast(cast *CastMessage) {
- self.log("RECEIVED CAST ",cast)
- self.impl.HandleCast(cast)
-}
-
-// handle_call will be called to handle incoming CallMessages
-func (self *GenServer) handle_call(call *CallMessage) {
- self.log("RECEIVED CALL ",call)
- self.impl.HandleCall(call)
-}
-
-// GetStatus returns the current GenServer Status
-func (self *GenServer) GetStatus() int {
- return self.status
-}
-
-func CreateGenServer(impl IGenServerImpl) *GenServer {
- gensrv := new(GenServer)
-
- ch := make(MessageChannel)
- control_ch := make(controlChannel)
- gensrv.ch = ch
- gensrv.control_ch = control_ch
- gensrv.impl = impl
-
- return gensrv
-}
-
-type callback_fn func() interface {}
-
-func send_and_wait_until(where chan interface {}, what interface {},
- sent_cb callback_fn, timeout_cb callback_fn, timeout int64) (Data) {
-
- timeout_ch := make(chan bool,1)
- go func() {
- time.Sleep(timeout)
- timeout_ch <- true
- }()
- select {
- case where <- what:
- return sent_cb()
- case <- timeout_ch:
- return timeout_cb()
- }
-
- return ReplyMessage{Ok: false, Error: "GenServer Error Unknown"}
-}
-
func getopt_timeout(opts []interface{}, default_value int64) int64 {
timeout := default_value
if len(opts)>0 {
@@ -147,67 +67,66 @@ func getopt_timeout(opts []interface{}, default_value int64) int64 {
return timeout
}
-func (self *GenServer) Start(opts ...interface{}) {
- self.status = STARTING;
- go self.loop()
-
- timeout := getopt_timeout(opts, 5e9)
- reply_ch := make(ReplyMessageChannel,1)
-
- send_and_wait_until(self.control_ch, initControlMessage{ReplyChannel: reply_ch},
- func() interface{} { return <- reply_ch },
- func() interface{} { return ReplyMessage{Ok: false, Error: "GenServer Cast Timeout"} },
- timeout)
-}
+// loop implements the internal GenServer goroutine loop
+func (self *GenServer) loop() {
+ defer self.recover()
-func (self *GenServer) Stop(opts ...interface{}) ReplyMessage {
- if self.status >= STOPPED {
- return ReplyMessage{Ok: false, Error: "GenServer Stopped or Crashed"}
+ for ; ; {
+ self.status = READY
+ select {
+ case msg := <- self.ch :
+ self.status = BUSY
+ switch cmsg := msg.(type) {
+ case CallMessage:
+ self.handle_call(&cmsg)
+ case CastMessage:
+ self.handle_cast(&cmsg)
+ }
+ case cmd := <- self.control_ch :
+ self.status = BUSY
+ switch ccmd := cmd.(type) {
+ case initControlMessage:
+ self.handle_init(&ccmd)
+ case stopControlMessage:
+ self.handle_stop(&ccmd)
+ }
+ }
}
-
- timeout := getopt_timeout(opts, 5e9)
- reply_ch := make(ReplyMessageChannel,1)
-
- return send_and_wait_until(self.control_ch, stopControlMessage{ReplyChannel: reply_ch},
- func() interface{} { return <- reply_ch },
- func() interface{} { return ReplyMessage{Ok: false, Error: "GenServer Stop Timeout"} },
- timeout).(ReplyMessage)
}
-
-func (self *GenServer) Cast(payload Data, opts ...interface{}) {
- if self.status >= STOPPED {
- return // Error: "GenServer Stopped or Crashed"
+// handle_init will be called to handle incoming InitControlMessages
+func (self *GenServer) handle_init(cmd *initControlMessage) {
+ self.log("RECEIVED INIT ",cmd)
+ ok, state := self.impl.Init(cmd.Args)
+ if ok == true {
+ self.state = state
+ self.status = READY
+ cmd.ReplyChannel <- ReplyMessage{Ok: true}
+ } else {
+ cmd.ReplyChannel <- ReplyMessage{Ok: false, Error: "GenServer Init failed"}
}
-
- timeout := getopt_timeout(opts, 5e9)
-
- send_and_wait_until(self.ch, CastMessage{Payload: payload},
- func() interface{} { return ReplyMessage{Ok: true} },
- func() interface{} { return ReplyMessage{Ok: false, Error: "GenServer Cast Timeout"} },
- timeout)
}
-func (self *GenServer) Call(payload Data, opts ...interface{}) ReplyMessage {
- if self.status >= STOPPED {
- return ReplyMessage{Ok: false, Error: "GenServer Stopped or Crashed"}
- }
-
- timeout := getopt_timeout(opts, 5e9)
- reply_ch := make(ReplyMessageChannel,1)
+// handle_stop will be called to handle incoming StopControlMessages
+func (self *GenServer) handle_stop(cmd *stopControlMessage) {
+ defer func() {
+ // TODO: error handling
+ self.status = STOPPED
+ cmd.ReplyChannel <- ReplyMessage{Ok: true}
+ }()
+ self.log("RECEIVED STOP ",cmd)
+ runtime.Goexit()
+}
- return send_and_wait_until(self.ch,
- CallMessage{Payload: payload, replyChannel: reply_ch},
- func() interface{} { return <- reply_ch },
- func() interface{} { return ReplyMessage{Ok: false, Error: "GenServer Stop Timeout"} },
- timeout).(ReplyMessage)
+// handle_cast will be called to handle incoming CastMessages
+func (self *GenServer) handle_cast(cast *CastMessage) {
+ self.log("RECEIVED CAST ",cast)
+ self.impl.HandleCast(cast,self.state)
}
-func (self *CallMessage) Reply(ok bool, result Data) {
- select {
- case self.replyChannel <- ReplyMessage{Ok: ok, Result: result}:
- default:
- // CLIENT GOROUTINE CRASHED?
- }
+// handle_call will be called to handle incoming CallMessages
+func (self *GenServer) handle_call(call *CallMessage) {
+ self.log("RECEIVED CALL ",call)
+ self.impl.HandleCall(call,self.state)
}

No commit comments for this range

Something went wrong with that request. Please try again.