Permalink
Browse files

This is relatively clean, the syncasync example works nicely

  • Loading branch information...
mrb committed Jan 6, 2013
1 parent b03f821 commit a13f1bc2b6410529fc109147fd6a730f885cf002
Showing with 80 additions and 76 deletions.
  1. +28 −26 examples/syncasync/main.go
  2. +1 −1 graph.go
  3. +32 −29 node.go
  4. +5 −10 protocol.go
  5. +14 −10 sylvester.go
View
@@ -15,7 +15,22 @@ func main() {
node := graph.NewNode()
node.NewAsyncEvent(Starter)
- node.NewAsyncEvent(Watcher)
+
+ node.NewAsyncEvent(func(c syl.Channels, g syl.ControlChan) {
+ for {
+ select {
+ case control := <-c.Control:
+ switch {
+ case bytes.Equal(control, syl.NodeExit()):
+ g.Exit()
+ case bytes.Equal(control, syl.NodeNext()):
+ node.NextSyncEvent()
+ default:
+ log.Print("Unhandled control event: ", control)
+ }
+ }
+ }
+ })
node.NewAsyncEvent(ASyncLogger)
node.NewAsyncEvent(ASyncLogger)
@@ -28,16 +43,19 @@ func main() {
graph.Activate()
<-graph.Control
+ log.Print("Received EXIT, exiting in 100ms")
+ <-time.After(100 * time.Millisecond)
+ os.Exit(0)
}
-func Starter(c syl.Channels) {
+func Starter(c syl.Channels, g syl.ControlChan) {
for cd := 0; cd < 100; cd++ {
c.Data <- []byte{byte(cd)}
}
- c.Control <- syl.NodeExit()
+ c.Control.Exit()
}
-func ASyncLogger(c syl.Channels) {
+func ASyncLogger(c syl.Channels, g syl.ControlChan) {
for {
select {
case data := <-c.Data:
@@ -47,45 +65,29 @@ func ASyncLogger(c syl.Channels) {
}
}
-func SyncLogger(c syl.Channels) {
+func SyncLogger(c syl.Channels, g syl.ControlChan) {
select {
case data := <-c.Data:
<-time.After(time.Duration(rand.Int31n(10)) * time.Millisecond)
log.Print(" s", data)
- c.Control <- syl.NodeNext()
+ c.Control.Next()
}
}
-func SyncLogger2(c syl.Channels) {
+func SyncLogger2(c syl.Channels, g syl.ControlChan) {
select {
case data := <-c.Data:
<-time.After(time.Duration(rand.Int31n(10)) * time.Millisecond)
log.Print(" s2", data)
- c.Control <- syl.NodeNext()
+ c.Control.Next()
}
}
-func SyncLogger3(c syl.Channels) {
+func SyncLogger3(c syl.Channels, g syl.ControlChan) {
select {
case data := <-c.Data:
<-time.After(time.Duration(rand.Int31n(10)) * time.Millisecond)
log.Print(" s3", data)
- c.Control <- syl.NodeSyncEventRestart()
- }
-}
-
-func Watcher(c syl.Channels) {
- for {
- select {
- case control := <-c.Control:
- if bytes.Equal(control, syl.NodeExit()) {
- log.Print("Received EXIT, exiting in 100ms")
- <-time.After(500 * time.Millisecond)
- os.Exit(0)
- } else {
- c.Control <- control
- <-time.After(1 * time.Millisecond)
- }
- }
+ c.Control.Next()
}
}
View
@@ -43,7 +43,7 @@ func (g *Graph) NewEdges(anode *Node, bnodes []*Node) *Edge {
}
func (g *Graph) NewNode() *Node {
- node := NewNode()
+ node := NewNode(g)
g.nodes = append(g.nodes, node)
g.nodemap[node.Id()] = node
View
61 node.go
@@ -1,37 +1,40 @@
package sylvester
import (
- "bytes"
+ "sync"
)
-type Event func(Channels)
+var (
+ syncPositionMutex = &sync.Mutex{}
+)
+
+type Event func(Channels, ControlChan)
type Node struct {
- id []byte
- data []byte
- syncEvents []Event
- asyncEvents []Event
+ id []byte
+ data []byte
+ syncEvents []Event
+ syncPosition int
+ asyncEvents []Event
+ graph *Graph
*Channels
}
func (n *Node) Id() *[]byte {
return &n.id
}
-func NewNode() *Node {
+func NewNode(g *Graph) *Node {
nodeId := newID()
return &Node{
- id: nodeId,
- data: nil,
- asyncEvents: nil,
- syncEvents: nil,
- Channels: &Channels{
- Data: make(DataChan, 1),
- Control: make(ControlChan, 1),
- Error: make(ErrorChan, 1),
- NodeId: nodeId,
- },
+ id: nodeId,
+ data: nil,
+ asyncEvents: nil,
+ syncEvents: nil,
+ syncPosition: 0,
+ Channels: NewChannels(),
+ graph: g,
}
}
@@ -61,22 +64,22 @@ func (n *Node) Activate() {
func (n *Node) StartAsyncEvents() {
for _, event := range n.asyncEvents {
- go event(*n.Channels)
+ go event(*n.Channels, n.graph.Control)
}
}
func (n *Node) StartSyncEvents() {
- for _, event := range n.syncEvents {
- go event(*n.Channels)
- select {
- case control := <-n.Control:
- if bytes.Equal(control, NodeNext()) {
+ go n.syncEvents[n.syncPosition](*n.Channels, n.graph.Control)
+}
- } else if bytes.Equal(control, NodeSyncEventRestart()) {
- n.StartSyncEvents()
- } else {
- n.Control <- control
- }
- }
+func (n *Node) NextSyncEvent() {
+ syncPositionMutex.Lock()
+ sp := n.syncPosition
+ if sp == (len(n.syncEvents) - 1) {
+ n.syncPosition = 0
+ } else {
+ n.syncPosition++
}
+ go n.syncEvents[n.syncPosition](*n.Channels, n.graph.Control)
+ syncPositionMutex.Unlock()
}
View
@@ -2,12 +2,11 @@ package sylvester
var (
Messages = map[string][]byte{
- "START": {0, 0},
- "EXIT": {0, 1},
- "PING": {0, 2},
- "PONG": {0, 3},
- "NEXT": {0, 4},
- "SYNCRESTART": {0, 5},
+ "START": {0, 0},
+ "EXIT": {0, 1},
+ "PING": {0, 2},
+ "PONG": {0, 3},
+ "NEXT": {0, 4},
}
)
@@ -32,10 +31,6 @@ func NodeNext() []byte {
return Messages["NEXT"]
}
-func NodeSyncEventRestart() []byte {
- return Messages["SYNCRESTART"]
-}
-
// Functions for Channels
func (p ControlChan) Start() {
p <- Messages["START"]
View
@@ -20,23 +20,27 @@ type Channels struct {
func NewGraph() *Graph {
return &Graph{
- id: newID(),
- nodes: nil,
- edges: nil,
- nodemap: make(map[*[]byte]*Node),
- edgemap: make(map[*[]byte]*Edge),
- Channels: &Channels{
- Data: make(DataChan, 1),
- Control: make(ControlChan, 1),
- Error: make(ErrorChan, 1),
- },
+ id: newID(),
+ nodes: nil,
+ edges: nil,
+ nodemap: make(map[*[]byte]*Node),
+ edgemap: make(map[*[]byte]*Edge),
+ Channels: NewChannels(),
}
}
func newID() []byte {
return []byte(fmt.Sprintf("%d%d", time.Now().UnixNano(), rand.Intn(1000000)))
}
+func NewChannels() *Channels {
+ return &Channels{
+ Data: make(DataChan, 1),
+ Control: make(ControlChan, 1),
+ Error: make(ErrorChan, 1),
+ }
+}
+
func (d DataChan) Word() {
log.Print("hmm")
}

0 comments on commit a13f1bc

Please sign in to comment.