Permalink
Browse files

Pretty good shape for syncasync\!

  • Loading branch information...
1 parent caa613a commit 23241cee3203cf180b48493b4705ad3751c3416b @mrb committed Jan 5, 2013
Showing with 84 additions and 21 deletions.
  1. +44 −15 examples/syncasync/main.go
  2. BIN examples/syncasync/syncasync
  3. +26 −6 node.go
  4. +14 −0 protocol.go
@@ -1,9 +1,11 @@
package main
import (
+ "bytes"
syl "github.com/mrb/sylvester"
"log"
"math/rand"
+ "os"
"time"
)
@@ -13,49 +15,76 @@ func main() {
node := graph.NewNode()
node.NewAsyncEvent(Starter)
- node.NewAsyncEvent(Async)
- node.NewAsyncEvent(Async)
- node.NewAsyncEvent(Async)
- node.NewAsyncEvent(Async)
+ node.NewAsyncEvent(Watcher)
node.NewSyncEvent(Sync)
- node.NewSyncEvent(Sync)
- node.NewSyncEvent(Sync)
- node.NewSyncEvent(Sync)
+ node.NewSyncEvent(Sync2)
+ node.NewSyncEvent(Sync3)
graph.Activate()
- select {
- case <-graph.Control:
- log.Print("Peace")
- }
+ <-graph.Control
}
func Starter(c syl.Channels) {
cd := 0
for {
- <-time.After(time.Duration(rand.Int31n(150)) * time.Millisecond)
c.Data <- []byte{byte(cd)}
cd += 1
+ if cd > 100 {
+ c.Control <- syl.NodeExit()
+ }
}
}
func Async(c syl.Channels) {
for {
select {
case data := <-c.Data:
+ <-time.After(time.Duration(rand.Int31n(100)) * time.Millisecond)
log.Print("a", data)
}
}
}
func Sync(c syl.Channels) {
- log.Print("here sync")
+ select {
+ case data := <-c.Data:
+ <-time.After(time.Duration(rand.Int31n(100)) * time.Millisecond)
+ log.Print(" s", data)
+ c.Control <- syl.NodeNext()
+ }
+}
+func Sync2(c syl.Channels) {
+ select {
+ case data := <-c.Data:
+ <-time.After(time.Duration(rand.Int31n(100)) * time.Millisecond)
+ log.Print(" s2", data)
+ c.Control <- syl.NodeNext()
+ }
+}
+
+func Sync3(c syl.Channels) {
+ select {
+ case data := <-c.Data:
+ <-time.After(time.Duration(rand.Int31n(100)) * time.Millisecond)
+ log.Print(" s3", data)
+ c.Control <- syl.NodeSyncEventRestart()
+ }
+}
+
+func Watcher(c syl.Channels) {
for {
+ <-time.After(10 * time.Millisecond)
select {
- case data := <-c.Data:
- log.Print("s", data)
+ case control := <-c.Control:
+ if bytes.Equal(control, syl.NodeExit()) {
+ log.Print("Received EXIT, exiting")
+ os.Exit(0)
+ } else {
+ c.Control <- control
+ }
}
}
}
Binary file not shown.
View
@@ -1,6 +1,8 @@
package sylvester
-import ()
+import (
+ "bytes"
+)
type Event func(Channels)
@@ -49,14 +51,32 @@ func (n *Node) NewSyncEvent(newEvent Event) error {
func (n *Node) Activate() {
if len(n.asyncEvents) > 0 {
- for _, event := range n.asyncEvents {
- go event(*n.Channels)
- }
+ n.StartAsyncEvents()
}
if len(n.syncEvents) > 0 {
- for _, event := range n.syncEvents {
- go event(*n.Channels)
+ n.StartSyncEvents()
+ }
+}
+
+func (n *Node) StartAsyncEvents() {
+ for _, event := range n.asyncEvents {
+ go event(*n.Channels)
+ }
+}
+
+func (n *Node) StartSyncEvents() {
+ for _, event := range n.syncEvents {
+ go event(*n.Channels)
+ select {
+ case control := <-n.Control:
+ if bytes.Equal(control, NodeNext()) {
+
+ } else if bytes.Equal(control, NodeSyncEventRestart()) {
+ n.StartSyncEvents()
+ } else {
+ n.Control <- control
+ }
}
}
}
View
@@ -6,6 +6,8 @@ var (
"EXIT": {0, 1},
"PING": {0, 2},
"PONG": {0, 3},
+ "NEXT": {0, 4},
+ "SYNCRESTART": {0, 5},
}
)
@@ -26,6 +28,14 @@ func NodePong() []byte {
return Messages["PONG"]
}
+func NodeNext() []byte {
+ return Messages["NEXT"]
+}
+
+func NodeSyncEventRestart() []byte {
+ return Messages["SYNCRESTART"]
+}
+
// Functions for Channels
func (p ControlChan) Start() {
p <- Messages["START"]
@@ -42,3 +52,7 @@ func (p ControlChan) Ping() {
func (p ControlChan) Pong() {
p <- Messages["PONG"]
}
+
+func (p ControlChan) Next() {
+ p <- Messages["NEXT"]
+}

0 comments on commit 23241ce

Please sign in to comment.