Permalink
Browse files

Playing around with tcpdup

  • Loading branch information...
1 parent 0de42e6 commit f42f9b54a969a8dd478ceddc70e0545603111023 @mrb committed Jan 7, 2013
Showing with 85 additions and 58 deletions.
  1. +6 −2 examples/syncasync/main.go
  2. +79 −56 examples/tcpudp/main.go
View
8 examples/syncasync/main.go
@@ -33,12 +33,16 @@ func main() {
// You can add a lot of events to a node. They interact with each other in
// different ways. 3 "asyncLogger" Event functions are instantiated here.
- node.NewAsyncEvents(makeAsyncLogger("a0"), makeAsyncLogger("a1"), makeAsyncLogger("a2"))
+ node.NewAsyncEvents(makeAsyncLogger("a0"),
+ makeAsyncLogger("a1"),
+ makeAsyncLogger("a2"))
// These three Events are different - they're "sync". They'll get executed
// in order and their scheduling is controlled by an external function. In
// this case, that's the watcher Event.
- node.NewSyncEvents(makeSyncLogger("s0"), makeSyncLogger("s1"), makeSyncLogger("s2"))
+ node.NewSyncEvents(makeSyncLogger("s0"),
+ makeSyncLogger("s1"),
+ makeSyncLogger("s2"))
// "Activate" is Sylvester's word for "start the flow of data."
graph.Activate()
View
135 examples/tcpudp/main.go
@@ -4,102 +4,125 @@ import (
syl "github.com/mrb/sylvester"
conn "github.com/mrb/sylvester/connections"
"log"
+ "net"
"os"
- "reflect"
)
func main() {
graph := syl.NewGraph()
input := graph.NewNode()
- input.NewAsyncEvent(Retryer)
- input.NewAsyncEvent(UDPbyteReader)
+ input.NewAsyncEvent(tcpConnectAndRead)
output := graph.NewNode()
- output.NewAsyncEvent(Retryer)
- output.NewAsyncEvent(TCPbyteWriter)
+ output.NewAsyncEvent(udpConnectAndWrite)
- errorHandler := graph.NewNode()
- errorHandler.NewAsyncEvent(ErrorHandler)
+ supervisor := graph.NewNode()
+ supervisor.NewAsyncEvent(makeWatcher(input, output))
graph.NewEdge(input, output)
- graph.NewEdge(input, errorHandler)
- graph.NewEdge(output, errorHandler)
- graph.NewEdge(errorHandler, input)
- graph.NewEdge(errorHandler, output)
graph.Activate()
- select {
- case <-graph.Control:
- log.Print("Received Exit Signal, exiting")
- os.Exit(0)
- }
+ <-graph.Control
+ log.Print("Received Exit Signal, exiting")
+ os.Exit(0)
}
-func Retryer(c syl.Channels) {
+func tcpConnectAndRead(c syl.Channels) {
+ tcp, tcpConnector := maketcpConnecter()
+ go tcpConnector(c)
+
+ tcpReader := maketcpReader(tcp)
+ go tcpReader(c)
+
for {
select {
- case sig := <-c.Control:
- log.Print("!", sig)
+ case <-c.Control:
+ tcpConnectAndRead(c)
}
}
}
-func TCPbyteWriter(c syl.Channels) {
- udp, err := conn.UdpConnect("localhost:2323")
- if err != nil {
- er := syl.NewEventError(c.NodeId, err)
- log.Print(reflect.TypeOf(er))
- c.Error <- er
- }
+func maketcpConnecter() (tcp *net.TCPConn, event syl.Event) {
+ event = func(c syl.Channels) {
+ var err error
+ tcp, err = conn.TcpConnect("localhost:2323")
- for {
- select {
- case data := <-c.Data:
- log.Printf("Writing %d bytes to UDP", len(data))
- udp.Write(data)
+ if err != nil {
+ c.Error <- err
}
}
+
+ return tcp, event
}
-func UDPbyteReader(c syl.Channels) {
- tcp, err := conn.TcpConnect("localhost:2322")
- if err != nil {
- c.Error <- syl.NewEventError(c.NodeId, err)
- return
+func maketcpReader(tcp *net.TCPConn) (event syl.Event) {
+ return func(c syl.Channels) {
+ data := make([]byte, 512)
+
+ for {
+ log.Print("Reading from TCP...")
+ dlen, err := tcp.Read(data)
+ if err != nil {
+ log.Print(err)
+ c.Error <- syl.NewEventError(c.NodeId, err)
+ return
+ }
+ log.Printf("...read %d bytes from TCP", dlen)
+
+ c.Data <- data[0:dlen]
+ }
}
+}
- data := make([]byte, 512)
+func udpConnectAndWrite(c syl.Channels) {
+ udp, udpConnector := makeudpConnecter()
+ go udpConnector(c)
+
+ udpWriter := makeudpWriter(udp)
+ go udpWriter(c)
for {
- log.Print("Reading from TCP...")
- dlen, err := tcp.Read(data)
+ select {}
+ }
+}
+
+func makeudpConnecter() (udp *net.UDPConn, event syl.Event) {
+ event = func(c syl.Channels) {
+ var err error
+ udp, err = conn.UdpConnect("localhost:2322")
if err != nil {
- c.Error <- syl.NewEventError(c.NodeId, err)
+ c.Error <- err
return
}
- log.Printf("...read %d bytes from TCP", dlen)
+ }
- c.Data <- data[0:dlen]
+ return udp, event
+}
+
+func makeudpWriter(udp *net.UDPConn) (event syl.Event) {
+ return func(c syl.Channels) {
+ for {
+ select {
+ case data := <-c.Data:
+ log.Printf("Writing %d bytes to UDP", len(data))
+ udp.Write(data)
+ }
+ }
}
}
-func ErrorHandler(c syl.Channels) {
- e := 0
- r := 4
- for {
- select {
- case err := <-c.Error:
- log.Print("?", err.Error())
- //if err.Error() == conn.ErrTCPConnection {
- log.Print(e, r)
- e += 1
- if e > r {
- os.Exit(3)
+func makeWatcher(input *syl.Node, output *syl.Node) syl.Event {
+ return func(c syl.Channels) {
+ for {
+ select {
+ case <-input.Error:
+ log.Print("IOE")
+ input.Control.Next()
+ case <-output.Error:
+ output.Control.Next()
}
- c.Control <- []byte{0, 0}
- //}
}
}
}

0 comments on commit f42f9b5

Please sign in to comment.