Permalink
Browse files

Sync and Async Events

It compiles, but the examples are weird. This is, of course, the
most naive possible implementation of this concept.
  • Loading branch information...
1 parent 1d20f18 commit 30242622fe4f18a6c092bb028c415741f449b83a @mrb committed Jan 3, 2013
Showing with 78 additions and 44 deletions.
  1. +2 −0 RAMBLINGS.md
  2. +7 −1 edge.go
  3. +38 −26 examples/tcpudp/main.go
  4. +1 −1 graph.go
  5. +26 −12 node.go
  6. +4 −4 protocol.go
View
@@ -1,3 +1,5 @@
+// This is preserved for history, it's some of my first notes on the project.
+
```go
/*
A framework for creating networking applications that can be expressed
View
@@ -1,6 +1,8 @@
package sylvester
-import ()
+import (
+ "log"
+)
type Edge struct {
id []byte
@@ -29,6 +31,10 @@ func NewEdges(anode *Node, bnodes []*Node) *Edge {
}
func (e *Edge) Activate(c Channels) {
+ for _, bnode := range e.bnodes {
+ log.Printf("Connecting: %s - %s", e.anode.Id(), bnode.Id())
+ }
+
go func() {
for {
select {
@@ -5,19 +5,22 @@ import (
conn "github.com/mrb/sylvester/connections"
"log"
"os"
+ "reflect"
)
func main() {
graph := syl.NewGraph()
input := graph.NewNode()
- input.NewEvent(UDPbyteReader)
+ input.NewAsyncEvent(Retryer)
+ input.NewAsyncEvent(UDPbyteReader)
output := graph.NewNode()
- output.NewEvent(TCPbyteWriter)
+ output.NewAsyncEvent(Retryer)
+ output.NewAsyncEvent(TCPbyteWriter)
errorHandler := graph.NewNode()
- errorHandler.NewEvent(ErrorHandler)
+ errorHandler.NewAsyncEvent(ErrorHandler)
graph.NewEdge(input, output)
graph.NewEdge(input, errorHandler)
@@ -34,7 +37,30 @@ func main() {
}
}
-func Restarter(c syl.Channels) {
+func Retryer(c syl.Channels) {
+ for {
+ select {
+ case sig := <-c.Control:
+ log.Print("!", sig)
+ }
+ }
+}
+
+func TCPbyteWriter(c syl.Channels) {
+ udp, err := conn.UdpConnect("localhost:2323")
+ if err != nil {
+ er := syl.NewEventError(c.NodeId, err)
+ log.Print(reflect.TypeOf(er))
+ c.Error <- er
+ }
+
+ for {
+ select {
+ case data := <-c.Data:
+ log.Printf("Writing %d bytes to UDP", len(data))
+ udp.Write(data)
+ }
+ }
}
func UDPbyteReader(c syl.Channels) {
@@ -59,35 +85,21 @@ func UDPbyteReader(c syl.Channels) {
}
}
-func TCPbyteWriter(c syl.Channels) {
- udp, err := conn.UdpConnect("localhost:2323")
- if err != nil {
- c.Error <- syl.NewEventError(c.NodeId, err)
- }
-
- for {
- select {
- case data := <-c.Data:
- log.Printf("Writing %d bytes to UDP", len(data))
- udp.Write(data)
- }
- }
-}
-
func ErrorHandler(c syl.Channels) {
e := 0
r := 4
for {
select {
case err := <-c.Error:
- log.Print(err)
- if err == conn.ErrTCPConnection {
- e += 1
- if e > r {
- os.Exit(3)
- }
- c.Control <- []byte{0, 0}
+ log.Print("?", err.Error())
+ //if err.Error() == conn.ErrTCPConnection {
+ log.Print(e, r)
+ e += 1
+ if e > r {
+ os.Exit(3)
}
+ c.Control <- []byte{0, 0}
+ //}
}
}
}
View
@@ -16,7 +16,7 @@ func (g *Graph) Id() *[]byte {
func (g *Graph) Activate() {
for _, node := range g.nodemap {
- go node.Activate(*g.Channels)
+ go node.Activate()
}
for _, edge := range g.edges {
go edge.Activate(*g.Channels)
View
@@ -5,9 +5,10 @@ import ()
type Event func(Channels)
type Node struct {
- id []byte
- data []byte
- events []Event
+ id []byte
+ data []byte
+ syncEvents []Event
+ asyncEvents []Event
*Channels
}
@@ -19,9 +20,10 @@ func NewNode() *Node {
nodeId := newID()
return &Node{
- id: nodeId,
- data: nil,
- events: nil,
+ id: nodeId,
+ data: nil,
+ asyncEvents: nil,
+ syncEvents: nil,
Channels: &Channels{
Data: make(DataChan, 1),
Control: make(ControlChan, 1),
@@ -35,14 +37,26 @@ func (n *Node) DataChan() DataChan {
return n.Data
}
-func (n *Node) NewEvent(newEvent Event) error {
- n.events = append(n.events, newEvent)
+func (n *Node) NewAsyncEvent(newEvent Event) error {
+ n.asyncEvents = append(n.asyncEvents, newEvent)
return nil
}
-func (n *Node) Activate(c Channels) {
- // Currently only handles one Event.
- if len(n.events) > 0 {
- go n.events[0](*n.Channels)
+func (n *Node) NewSyncEvent(newEvent Event) error {
+ n.syncEvents = append(n.syncEvents, newEvent)
+ return nil
+}
+
+func (n *Node) Activate() {
+ if len(n.asyncEvents) > 0 {
+ for _, event := range n.asyncEvents {
+ go event(*n.Channels)
+ }
+ }
+
+ if len(n.syncEvents) > 0 {
+ for _, event := range n.syncEvents {
+ event(*n.Channels)
+ }
}
}
View
@@ -10,19 +10,19 @@ var (
)
// Convenient Access for Message Acess
-func Start() []byte {
+func NodeStart() []byte {
return Messages["START"]
}
-func Exit() []byte {
+func NodeExit() []byte {
return Messages["EXIT"]
}
-func Ping() []byte {
+func NodePing() []byte {
return Messages["PING"]
}
-func Pong() []byte {
+func NodePong() []byte {
return Messages["PONG"]
}

0 comments on commit 3024262

Please sign in to comment.