From 1533cd996422994329857c11385b5ed91439a40a Mon Sep 17 00:00:00 2001 From: John Lauer Date: Sun, 27 Jul 2014 19:20:07 -0700 Subject: [PATCH] Buffer flow plugins added. TinyG has a plugin. Added ability for buffer flow plugins. There is a new buffer flow plugin for TinyG that watches the {"qr":NN} response. When it sees the qr value go below 12 it pauses its own sending and queues up whatever is still coming in on the Websocket. This is fine because we've got plenty of RAM on the websocket server. The {"qr":NN} value is still sent back on the websocket as soon as it was before, so the host application should see no real difference as to how it worked before. The difference now though is that the serial sending knows to check if sending is paused to the serial port and queue. This makes sure no buffer overflows ever occur. The reason this was becoming important is that the lag time between the qr response and the sending of Gcode was too distant and this buffer flow needs resolution around 5ms. Normal latency on the Internet is like 20ms to 200ms, so it just wasn't fast enough. If the Javascript hosting the websocket was busy processing other events, then this lag time became even worse. So, now the Serial Port JSON Server simply helps out by lots of extra buffering. Go ahead and pound it even harder with more serial commands and see it fly. Former-commit-id: e3b08b3f77cf660f60438b87291016341e63386b --- README.md | 43 +++++++++++++++++++++- bufferflow.go | 37 +++++++++++++++++++ bufferflow_dummypause.go | 33 +++++++++++++++++ bufferflow_grbl.go | 1 + bufferflow_tinyg.go | 78 ++++++++++++++++++++++++++++++++++++++++ dummy.go | 57 +++++++++++++++++++++++++++-- hub.go | 28 +++++++++------ main.go | 4 +-- serial.go | 15 ++++---- seriallist_windows.go | 1 + serialport.go | 65 ++++++++++++++++++++++++++++++--- 11 files changed, 336 insertions(+), 26 deletions(-) create mode 100644 bufferflow.go create mode 100644 bufferflow_dummypause.go create mode 100644 bufferflow_grbl.go create mode 100644 bufferflow_tinyg.go diff --git a/README.md b/README.md index 755798f..c853ea9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ serial-port-json-server ======================= -Version 1.2 +Version 1.3 A serial port JSON websocket & web server that runs from the command line on Windows, Mac, Linux, Raspberry Pi, or Beagle Bone that lets you communicate with your serial @@ -35,6 +35,47 @@ great use case. Finally you can write web apps that interact with a user's local Thanks go to gary.burd.info for the websocket example in Go. Thanks also go to tarm/goserial for the serial port base implementation. +How to Build +--------- +1. Install Go (http://golang.org/doc/install) +2. If you're on a Mac, install Xcode from the Apple Store. + If you're on Windows, Linux, Raspberry Pi, or Beagle Bone you are all set. +3. Get go into your path so you can run "go" from any directory: + On Linux, Mac, Raspberry Pi, Beagle Bone Black + export PATH=$PATH:/usr/local/go/bin + On Windows, use the Environment Variables dialog by right-click My Computer +4. Define your GOPATH variable. This is your personal working folder for all your +Go code. This is important because you will be retrieving several projects +from Github and Go needs to know where to download all the files and where to +build the directory structure. On my Windows computer I created a folder called +C:\Users\John\go and set GOPATH=C:\Users\John\go +5. Change directory into your GOPATH +6. Type "go get github.com/johnlauer/serial-port-json-server". This will retrieve +this Github project and all dependent projects. +7. Then change direcory into github.com\johnlauer\serial-port-json-server. +8. Type "go build" when you're inside that directory and it will create a binary +called serial-port-json-server +9. Run it by typing ./serial-port-json-server + + +Changes in 1.3 +- Added ability for buffer flow plugins. There is a new buffer flow plugin + for TinyG that watches the {"qr":NN} response. When it sees the qr value + go below 12 it pauses its own sending and queues up whatever is still coming + in on the Websocket. This is fine because we've got plenty of RAM on the + websocket server. The {"qr":NN} value is still sent back on the websocket as + soon as it was before, so the host application should see no real difference + as to how it worked before. The difference now though is that the serial sending + knows to check if sending is paused to the serial port and queue. This makes + sure no buffer overflows ever occur. The reason this was becoming important is + that the lag time between the qr response and the sending of Gcode was too distant + and this buffer flow needs resolution around 5ms. Normal latency on the Internet + is like 20ms to 200ms, so it just wasn't fast enough. If the Javascript hosting + the websocket was busy processing other events, then this lag time became even + worse. So, now the Serial Port JSON Server simply helps out by lots of extra + buffering. Go ahead and pound it even harder with more serial commands and see + it fly. + Changes in 1.2 - Added better error handling - Removed forcibly adding a newline to the serial data being sent to the port. This diff --git a/bufferflow.go b/bufferflow.go new file mode 100644 index 0000000..7fd8b69 --- /dev/null +++ b/bufferflow.go @@ -0,0 +1,37 @@ +package main + +import ( +//"log" +//"time" +) + +type Bufferflow interface { + BlockUntilReady() // implement this method + OnIncomingData(data string) // implement this method + //Name string + //Port string + //myvar mytype string + //pause bool // keep track if we're paused from sending + //buffertype string // is it tinyg, grbl, or other? +} + +/* +// this method is a method of the struct above +func (b *bufferflow) blockUntilReady() { + log.Printf("Blocking until ready. Buffertype is:%v\n", b.buffertype) + //time.Sleep(3000 * time.Millisecond) + if b.buffertype == "dummypause" { + buf := bufferflow_dummypause{Name: "blah"} + buf.blockUntilReady() + } + log.Printf("Done blocking. Buffertype is:%v\n", b.buffertype) +} + +func (b *bufferflow) onIncomingData(data) { + log.Printf("onIncomingData. data:%v", data) + if b.buffertype == "dummypause" { + buf := bufferflow_dummypause{Name: "blah"} + buf.waitUntilReady() + } +} +*/ diff --git a/bufferflow_dummypause.go b/bufferflow_dummypause.go new file mode 100644 index 0000000..2af1c63 --- /dev/null +++ b/bufferflow_dummypause.go @@ -0,0 +1,33 @@ +package main + +import ( + "log" + "time" +) + +type BufferflowDummypause struct { + Name string + Port string + NumLines int + Paused bool +} + +func (b *BufferflowDummypause) Init() { +} + +func (b *BufferflowDummypause) BlockUntilReady() { + log.Printf("BlockUntilReady() start. numLines:%v\n", b.NumLines) + log.Printf("buffer:%v\n", b) + //for b.Paused { + log.Println("We are paused. Yeilding send.") + time.Sleep(3000 * time.Millisecond) + //} + log.Printf("BlockUntilReady() end\n") +} + +func (b *BufferflowDummypause) OnIncomingData(data string) { + log.Printf("OnIncomingData() start. data:%v\n", data) + b.NumLines++ + //time.Sleep(3000 * time.Millisecond) + log.Printf("OnIncomingData() end. numLines:%v\n", b.NumLines) +} diff --git a/bufferflow_grbl.go b/bufferflow_grbl.go new file mode 100644 index 0000000..85f0393 --- /dev/null +++ b/bufferflow_grbl.go @@ -0,0 +1 @@ +package main \ No newline at end of file diff --git a/bufferflow_tinyg.go b/bufferflow_tinyg.go new file mode 100644 index 0000000..d92d5b1 --- /dev/null +++ b/bufferflow_tinyg.go @@ -0,0 +1,78 @@ +package main + +import ( + "log" + "regexp" + "strconv" + "time" +) + +type BufferflowTinyg struct { + Name string + Port string + Paused bool + StopSending int + StartSending int + sem chan int +} + +var ( + // the regular expression to find the qr value + re, _ = regexp.Compile("\"qr\":(\\d+)") +) + +func (b *BufferflowTinyg) Init() { + b.StartSending = 16 + b.StopSending = 14 + b.sem = make(chan int) +} + +func (b *BufferflowTinyg) BlockUntilReady() { + log.Printf("BlockUntilReady() start\n") + //log.Printf("buffer:%v\n", b) + if b.Paused { + //<-b.sem // will block until told from OnIncomingData to go + + for b.Paused { + //log.Println("We are paused. Yeilding send.") + time.Sleep(5 * time.Millisecond) + } + + } else { + // still yeild a bit cuz seeing we need to let tinyg + // have a chance to respond + time.Sleep(15 * time.Millisecond) + } + log.Printf("BlockUntilReady() end\n") +} + +func (b *BufferflowTinyg) OnIncomingData(data string) { + //log.Printf("OnIncomingData() start. data:%v\n", data) + if re.Match([]byte(data)) { + // we have a qr value + //log.Printf("Found a qr value:%v", re) + res := re.FindStringSubmatch(data) + qr, err := strconv.Atoi(res[1]) + if err != nil { + log.Printf("Got error converting qr value. huh? err:%v\n", err) + } else { + log.Printf("The qr val is:\"%v\"", qr) + if qr <= b.StopSending { + b.Paused = true + + log.Println("Paused sending gcode") + } else if qr >= b.StartSending { + b.Paused = false + //b.sem <- 1 // send channel a val to trigger the unblocking in BlockUntilReady() + log.Println("Started sending gcode again") + } else { + log.Println("In a middle state where we're paused sending gcode but watching for the buffer to get high enough to start sending again") + } + } + } + // Look for {"qr":28} + // Actually, if we hit qr:10, stop sending + // when hit qr:16 start again + //time.Sleep(3000 * time.Millisecond) + //log.Printf("OnIncomingData() end.\n") +} diff --git a/dummy.go b/dummy.go index ae1c63d..79bb57d 100644 --- a/dummy.go +++ b/dummy.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "log" "time" ) @@ -17,7 +18,59 @@ func (d *dummy) run() { for { //h.broadcast <- message log.Print("dummy data") - h.broadcast <- []byte("dummy data") - time.Sleep(15000 * time.Millisecond) + //h.broadcast <- []byte("dummy data") + time.Sleep(8000 * time.Millisecond) + h.broadcast <- []byte("list") + + // open com4 (tinyg) + h.broadcast <- []byte("open com4 115200 tinyg") + time.Sleep(1000 * time.Millisecond) + + // send some commands + //h.broadcast <- []byte("send com4 ?\n") + //time.Sleep(3000 * time.Millisecond) + h.broadcast <- []byte("send com4 {\"qr\":\"\"}\n") + h.broadcast <- []byte("send com4 g21 g90\n") // mm + //h.broadcast <- []byte("send com4 {\"qr\":\"\"}\n") + //h.broadcast <- []byte("send com4 {\"sv\":0}\n") + //time.Sleep(3000 * time.Millisecond) + for i := 0.0; i < 10.0; i = i + 0.001 { + h.broadcast <- []byte("send com4 G1 X" + fmt.Sprintf("%.3f", i) + " F100\n") + time.Sleep(10 * time.Millisecond) + } + /* + h.broadcast <- []byte("send com4 G1 X1\n") + h.broadcast <- []byte("send com4 G1 X2\n") + h.broadcast <- []byte("send com4 G1 X3\n") + h.broadcast <- []byte("send com4 G1 X4\n") + h.broadcast <- []byte("send com4 G1 X5\n") + h.broadcast <- []byte("send com4 G1 X6\n") + h.broadcast <- []byte("send com4 G1 X7\n") + h.broadcast <- []byte("send com4 G1 X8\n") + h.broadcast <- []byte("send com4 G1 X9\n") + h.broadcast <- []byte("send com4 G1 X10\n") + h.broadcast <- []byte("send com4 G1 X1\n") + h.broadcast <- []byte("send com4 G1 X2\n") + h.broadcast <- []byte("send com4 G1 X3\n") + h.broadcast <- []byte("send com4 G1 X4\n") + h.broadcast <- []byte("send com4 G1 X5\n") + h.broadcast <- []byte("send com4 G1 X6\n") + h.broadcast <- []byte("send com4 G1 X7\n") + h.broadcast <- []byte("send com4 G1 X8\n") + h.broadcast <- []byte("send com4 G1 X9\n") + h.broadcast <- []byte("send com4 G1 X10\n") + h.broadcast <- []byte("send com4 G1 X1\n") + h.broadcast <- []byte("send com4 G1 X2\n") + h.broadcast <- []byte("send com4 G1 X3\n") + h.broadcast <- []byte("send com4 G1 X4\n") + h.broadcast <- []byte("send com4 G1 X5\n") + h.broadcast <- []byte("send com4 G1 X6\n") + h.broadcast <- []byte("send com4 G1 X7\n") + h.broadcast <- []byte("send com4 G1 X8\n") + h.broadcast <- []byte("send com4 G1 X9\n") + h.broadcast <- []byte("send com4 G1 X10\n") + */ + break } + log.Println("dummy process exited") } diff --git a/hub.go b/hub.go index 8373a26..1d069b1 100644 --- a/hub.go +++ b/hub.go @@ -43,20 +43,20 @@ func (h *hub) run() { delete(h.connections, c) close(c.send) case m := <-h.broadcast: - log.Print("Got a broadcast") + //log.Print("Got a broadcast") //log.Print(m) //log.Print(len(m)) if len(m) > 0 { //log.Print(string(m)) //log.Print(h.broadcast) checkCmd(m) - log.Print("-----") + //log.Print("-----") for c := range h.connections { select { case c.send <- m: - log.Print("did broadcast to ") - log.Print(c.ws.RemoteAddr()) + //log.Print("did broadcast to ") + //log.Print(c.ws.RemoteAddr()) //c.send <- []byte("hello world") default: delete(h.connections, c) @@ -66,15 +66,15 @@ func (h *hub) run() { } } case m := <-h.broadcastSys: - log.Print("Got a system broadcast") - log.Print(string(m)) - log.Print("-----") + //log.Print("Got a system broadcast") + //log.Print(string(m)) + //log.Print("-----") for c := range h.connections { select { case c.send <- m: - log.Print("did broadcast to ") - log.Print(c.ws.RemoteAddr()) + //log.Print("did broadcast to ") + //log.Print(c.ws.RemoteAddr()) //c.send <- []byte("hello world") default: delete(h.connections, c) @@ -111,7 +111,15 @@ func checkCmd(m []byte) { go spErr("Problem converting baud rate " + args[2]) return } - go spHandlerOpen(args[1], baud) + // pass in buffer type now as string. if user does not + // ask for a buffer type pass in empty string + bufferAlgorithm := "" + if len(args) > 3 { + // cool. we got a buffer type request + buftype := strings.Replace(args[3], "\n", "", -1) + bufferAlgorithm = buftype + } + go spHandlerOpen(args[1], baud, bufferAlgorithm) } else if strings.HasPrefix(sl, "close") { diff --git a/main.go b/main.go index f36afae..b1d7395 100644 --- a/main.go +++ b/main.go @@ -1,4 +1,4 @@ -// Version 1.2 +// Version 1.3 // Supports Windows, Linux, Mac, and Raspberry Pi, Beagle Bone Black package main @@ -13,7 +13,7 @@ import ( ) var ( - version = "1.2" + version = "1.3" addr = flag.String("addr", ":8989", "http service address") assets = flag.String("assets", defaultAssetPath(), "path to assets") //homeTempl *template.Template diff --git a/serial.go b/serial.go index 4b4a275..f7c1607 100644 --- a/serial.go +++ b/serial.go @@ -1,4 +1,4 @@ -// Supports Windows, Linux, Mac, and Raspberry Pi +// Supports Windows, Linux, Mac, BeagleBone Black, and Raspberry Pi package main @@ -61,7 +61,7 @@ func (sh *serialhub) run() { select { case p := <-sh.register: log.Print("Registering a port: ", p.portConf.Name) - h.broadcastSys <- []byte("{\"Cmd\" : \"Open\", \"Desc\" : \"Got register/open on port.\", \"Port\" : \"" + p.portConf.Name + "\", \"Baud\" : " + strconv.Itoa(p.portConf.Baud) + " }") + h.broadcastSys <- []byte("{\"Cmd\" : \"Open\", \"Desc\" : \"Got register/open on port.\", \"Port\" : \"" + p.portConf.Name + "\", \"Baud\" : " + strconv.Itoa(p.portConf.Baud) + ", \"BufferType\" : \"" + p.BufferType + "\" }") //log.Print(p.portConf.Name) sh.ports[p] = true case p := <-sh.unregister: @@ -70,17 +70,18 @@ func (sh *serialhub) run() { delete(sh.ports, p) close(p.send) case wr := <-sh.write: - log.Print("Got a write to a port") - log.Print("Port: ", string(wr.p.portConf.Name)) + //log.Print("Got a write to a port") + //log.Print("Port: ", string(wr.p.portConf.Name)) //log.Print(wr.p) //log.Print("Data is ") //log.Print(wr.d) - log.Print("Data:" + string(wr.d)) - log.Print("-----") + //log.Print("Data:" + string(wr.d)) + //log.Print("-----") select { case wr.p.send <- wr.d: //log.Print("Did write to serport") - h.broadcastSys <- []byte("{\"Cmd\" : \"Write\", \"Desc\" : \"Did write on port.\", \"Port\" : \"" + wr.p.portConf.Name + "\"}") + //h.broadcastSys <- []byte("{\"Cmd\" : \"Write\", \"Desc\" : \"Did write on port.\", \"Port\" : \"" + wr.p.portConf.Name + "\"}") + h.broadcastSys <- []byte("{\"Cmd\" : \"Write\", \"Desc\" : \"Queued write on port.\", \"Port\" : \"" + wr.p.portConf.Name + "\"}") default: sh.unregister <- wr.p //delete(sh.ports, wr.p) diff --git a/seriallist_windows.go b/seriallist_windows.go index d2e28b7..4d6f104 100644 --- a/seriallist_windows.go +++ b/seriallist_windows.go @@ -47,6 +47,7 @@ func getListViaWmiPnpEntity() ([]OsSerialPort, os.SyscallError) { log.Println("Got result from oleutil.CallMethod") if err2 != nil { // we got back an error or empty list + log.Printf("Got an error back from oleutil.CallMethod. err:%v", err2) return nil, err } diff --git a/serialport.go b/serialport.go index 06dd8da..8881516 100644 --- a/serialport.go +++ b/serialport.go @@ -20,11 +20,16 @@ type serport struct { // Buffered channel of outbound messages. send chan []byte + + // Do we have an extra channel/thread to watch our buffer? + BufferType string + //bufferwatcher *BufferflowDummypause + bufferwatcher Bufferflow } type SpPortMessage struct { - P string - D string + P string // the port, i.e. com22 + D string // the data, i.e. G0 X0 Y0 } func (p *serport) reader() { @@ -41,6 +46,13 @@ func (p *serport) reader() { //log.Print("The data i will convert to json is:") //log.Print(data) + // give the data to our bufferflow so it can do it's work + // to read/translate the data to see if it wants to block + // writes to the serialport. each bufferflow type will decide + // this on its own based on its logic, i.e. tinyg vs grbl vs others + //p.b.bufferwatcher..OnIncomingData(data) + p.bufferwatcher.OnIncomingData(data) + //m := SpPortMessage{"Alice", "Hello"} m := SpPortMessage{p.portConf.Name, data} //log.Print("The m obj struct is:") @@ -102,9 +114,25 @@ func (p *serport) reader() { p.portIo.Close() } +// this method runs as its own thread because it's instantiated +// as a "go" method. so if it blocks inside, it is ok func (p *serport) writer() { + // this for loop blocks on p.send until that channel + // sees something come in for data := range p.send { + + // we want to block here if we are being asked + // to pause. the problem is, how do we unblock + //bufferBlockUntilReady(p.bufferwatcher) + p.bufferwatcher.BlockUntilReady() + n2, err := p.portIo.Write(data) + + // if we get here, we were able to write successfully + // to the serial port because it blocks until it can write + + h.broadcastSys <- []byte("{\"Cmd\" : \"WriteComplete\", \"Bytes\" : " + strconv.Itoa(n2) + ", \"Desc\" : \"Completed write on port.\", \"Port\" : \"" + p.portConf.Name + "\"}") + log.Print("Just wrote ", n2, " bytes to serial: ", string(data)) //log.Print(n2) //log.Print(" bytes to serial: ") @@ -122,7 +150,7 @@ func (p *serport) writer() { p.portIo.Close() } -func spHandlerOpen(portname string, baud int) { +func spHandlerOpen(portname string, baud int, buftype string) { log.Print("Inside spHandler") @@ -153,7 +181,36 @@ func spHandlerOpen(portname string, baud int) { return } log.Print("Opened port successfully") - p := &serport{send: make(chan []byte, 256), portConf: conf, portIo: sp} + //p := &serport{send: make(chan []byte, 256), portConf: conf, portIo: sp} + p := &serport{send: make(chan []byte, 256*100), portConf: conf, portIo: sp, BufferType: buftype} + + // if user asked for a buffer watcher, i.e. tinyg/grbl then attach here + if buftype != "" { + + if buftype == "tinyg" { + bw := &BufferflowTinyg{Name: "no name needed"} + bw.Init() + p.bufferwatcher = bw + } + //p.bufferwatcher := &bufferflow{buffertype: buftype} + //p.bufferwatcher.buffertype = buftype + + // could open the buffer thread here, or do it when this + // port is registered. the buffer thread will watch the writer + // and the reader. it will look at the content and decide + // if a pause must occur + + } else { + // for now, just use a dummy pause type bufferflow object + // to test artificially a delay on the serial port write + //p.bufferwatcher.buffertype = "dummypause" + //p.bufferwatcher.BlockUntilReady() + bw := &BufferflowDummypause{Name: "blah"} + p.bufferwatcher = bw + //p.bufferwatcher.Name = "blah2" + + } + sh.register <- p defer func() { sh.unregister <- p }() go p.writer()