Skip to content

Commit

Permalink
Added ConnectRaw method.
Browse files Browse the repository at this point in the history
  • Loading branch information
jbrukh committed Feb 26, 2012
1 parent a2c5aa6 commit a4b6112
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 107 deletions.
42 changes: 24 additions & 18 deletions examples/abort/abort.go
Expand Up @@ -5,44 +5,50 @@ import (
"fmt"
"os"
"time"
"flag"
)

const SERIAL_PORT = "/dev/tty.MindBand"
const DEFAULT_PORT = "/dev/tty.MindBand2"
var serialPort *string = flag.String("port", DEFAULT_PORT, "the serial port for the device")

func signalHandler(disconnect chan<- bool, ch chan byte) {
func init() {
flag.Parse()
}

func signalHandler(d *goneuro.Device, data chan byte) {
println("sleeping 10 seconds...")
time.Sleep(2 * 1e9)
time.Sleep(10 * time.Second)
println("disconnecting...")
disconnect <- true
d.Disconnect()
println("closing ch")
close(ch)
close(data)
}

func main() {
// collect meditation on a channel
ch := make(chan byte, 512)
data := make(chan byte, 512)
listener := &goneuro.ThinkGearListener{
Meditation: func(b byte) {
ch <- b
data <- b
},
}

// open the device
disconnect, err := goneuro.Connect(SERIAL_PORT, listener)
if err != nil {
println("couldn't connect to device")
os.Exit(1)
}
println("getting device and connecting...")
d := goneuro.NewDevice(*serialPort)
if err := d.Connect(listener); err != nil {
fmt.Println(err)
os.Exit(1)
}
d.Engage()

// listen for Ctrl-C
go signalHandler(disconnect, ch)
go signalHandler(d, data)

// wait for and print values indefinitely
for {
b, ok := <-ch
b, ok := <-data
if !ok {
println("will die in 2")
time.Sleep(1e9 * 2)
println("will die in 5")
time.Sleep(1e9 * 5)
break // we are done
}
fmt.Println("Meditation: ", b)
Expand Down
22 changes: 16 additions & 6 deletions examples/streamer/streamer.go
Expand Up @@ -4,9 +4,16 @@ import (
"fmt"
"github.com/jbrukh/goneuro"
"time"
"flag"
"os"
)

const SERIAL_PORT = "/dev/tty.MindBand2"
const DEFAULT_PORT = "/dev/tty.MindBand2"
var serialPort *string = flag.String("port", DEFAULT_PORT, "the serial port for the device")

func init() {
flag.Parse()
}

func main() {
data := make(chan int16)
Expand All @@ -15,12 +22,15 @@ func main() {
data <- int16(a)<<8 | int16(b)
},
}
_, err := goneuro.Connect(SERIAL_PORT, listener)
if err != nil {
fmt.Println(err)
return
}

d := goneuro.NewDevice(*serialPort)
if err := d.Connect(listener); err != nil {
os.Exit(1)
}
println("sleeping 5 seconds")
time.Sleep(5*time.Second)
println("engaging")
d.Engage()
startNanos := time.Now()
for {
fmt.Println(time.Now().Sub(startNanos).Nanoseconds(), <-data)
Expand Down
149 changes: 66 additions & 83 deletions goneuro.go
Expand Up @@ -14,7 +14,7 @@ import (
"fmt"
"io"
"os"
"github.com/jbrukh/atomic"
"sync"
)

// Approx the number of data points to be
Expand Down Expand Up @@ -123,7 +123,7 @@ func parseByteStream(device io.ReadCloser, pparser payloadParser, conn <-chan bo
// check for exit
select {
case v, ok := <-conn:
if ok & !v {
if ok && !v {
return // disconnect when "false" sent
} else if ok && v {
engaged = true
Expand Down Expand Up @@ -171,16 +171,16 @@ func parseByteStream(device io.ReadCloser, pparser payloadParser, conn <-chan bo
println("checksum has failed: ", checksum, "expected: ", stated)
continue
}
if payloadParser != nil {
payloadParser(&payload)
if pparser != nil {
pparser(&payload)
}
}
println("done with parsing")
}

// fullPayloadParser delivers a payload parser with the
// given listener
func fullPayloadParser(listener *ThinkGearListener) {
func fullPayloadParser(listener *ThinkGearListener) payloadParser {
return func(payloadPtr *[]byte) {
parseFullPayload(payloadPtr, listener)
}
Expand All @@ -189,7 +189,7 @@ func fullPayloadParser(listener *ThinkGearListener) {
// rawPayloadParser delivers a payload parser that only
// parses raw signal and ignores everything else, and then
// delivers the raw signal on a channel
func rawPayloadParser(output chan float64) {
func rawPayloadParser(output chan<- float64) payloadParser {
return func(payloadPtr *[]byte) {
parseRawPayload(payloadPtr, output)
}
Expand Down Expand Up @@ -286,8 +286,8 @@ func parseRawPayload(payloadPtr *[]byte, output chan<- float64) {
// get the data
output <- float64(int16(payload[inx+2])<<8 | int16(payload[inx+3]))
} else {
println("raw signal did not have 2 bytes")
break
println("raw signal did not have 2 bytes")
break
}
nextRow(4)
default:
Expand All @@ -297,17 +297,18 @@ func parseRawPayload(payloadPtr *[]byte, output chan<- float64) {
}

// Device represents a NeuroSky/ThinkGear device
type Device {
conn chan<- bool
type Device struct {
conn chan bool
Port string
connected *atomic.AtomicValue
connected bool
lock *sync.Mutex
}

func (d *Device) New(serialPort string) *Device {
func NewDevice(serialPort string) *Device {
return &Device{
conn: make(chan bool)
conn: make(chan bool),
Port: serialPort,
connected: atomic.NewWithValue(false)
lock: new(sync.Mutex),
}
}

Expand All @@ -317,17 +318,22 @@ func (d *Device) New(serialPort string) *Device {
// If the device is not connected, then this call
// will have no effect.
func (d *Device) Engage() {
if (d.connected.Get().(bool)) {
conn <- true
d.lock.Lock()
defer d.lock.Unlock()
if (d.connected) {
d.conn <- true
}
}

// Disconnect will disconnect from the device and
// close the serial port. If the device is not
// connected, this call will have no effect.
func (d *Device) Disconnect() {
if (d.connected.Get().(bool)) {
conn <- false
d.lock.Lock() // otherwise, multiple calls here can block forever
defer d.lock.Unlock()
if (d.connected) {
d.conn <- false
d.connected = false
}
}

Expand All @@ -342,22 +348,47 @@ func (d *Device) Disconnect() {
//
// If the device is already connected, then this call
// will have no effect.
//
// The serial port is typically a string of the form
//
// /dev/tty.MindBand
//
// or whatever you set up in your systems Bluetooth
// options for the device. Note that the various
// portions of the ThinkGearListener will be triggered
// synchronously to parsing, so it may be desirable
// in certain situations for the user to throw data
// onto channels for serial, asynchronous processing.
// If you do use a channel, make sure that this channel
// is asynchronous, or you can still hold up processing.
func (d *Device) Connect(listener *ThinkGearListener) (err error) {
if (d.connected.Get().(bool)) {
return errors.New("device is already connected")
d.lock.Lock() // multiple connects on the same device will block
defer d.lock.Unlock()

device, err := d.connect()
if err != nil {
return
}
// start spinning the data stream on another thread
// and wait for Engage() call
go parseByteStream(device, fullPayloadParser(listener), d.conn)
return
}

// connect will connect to the serial port and set internal
// state of the Device appropriately. This method probably
// needs to be synchronized externally.
func (d *Device) connect() (device io.ReadCloser, err error) {
if (d.connected) {
return nil, errors.New("device is already connected")
}
var device io.ReadCloser
device, err = os.Open(d.Port)
if err != nil {
str := fmt.Sprintf("device problem: %s", err)
return errors.New(str)
return nil, errors.New(str)
}
d.connected.Set(true)
d.connected = true
println("connected: ", d.Port)

// start spinning the data stream on another thread
// and wait for Engage() call
go parseByteStream(device, fullPayloadParser(listener), d.conn)
return
}

Expand All @@ -370,64 +401,16 @@ func (d *Device) Connect(listener *ThinkGearListener) (err error) {
//
// If the device is already connected, then this call
// will have no effect.
func (d *Device) ConnectRaw(output chan<- float64) {
if (d.connected.Get().(bool)) {
return
}
}
func (d *Device) ConnectRaw(output chan<- float64) (err error){
d.lock.Lock() // multiple connects on the same device will block
defer d.lock.Unlock()

// Connect to the device over the serial port
// and start parsing data; the serial port
// is typically a string of the form
//
// /dev/tty.MindBand
//
// or whatever you set up in your systems Bluetooth
// options for the device. Note that the various
// portions of the ThinkGearListener will be triggered
// synchronously to parsing, so it may be desirable
// in certain situations for the user to throw data
// onto channels for serial, asynchronous processing.
// If you do use a channel, make sure that this channel
// is asynchronous, or you can still hold up processing.
//
// This method will return a send-only channel
// for the purposes of ceasing the connection. In
// order to close the connection, send true to
// the disconnect channel.
func Connect(serialPort string, listener *ThinkGearListener) (conn chan<- bool, err error) {
var device io.ReadCloser
device, err = os.Open(serialPort)
device, err := d.connect()
if err != nil {
str := fmt.Sprintf("device problem: %s", err)
return nil, errors.New(str)
return
}
println("connected: ", serialPort)

// create the disconnect channel
ch := make(chan bool)

// go and process this this stream asynchronously
// until the user sends a signal to disconnect
go parseByteStream(device, fullPayloadParser(listener), ch)

disconnect = ch // cast to send-only
// start spinning the data stream on another thread
// and wait for Engage() call
go parseByteStream(device, rawPayloadParser(output), d.conn)
return
}

// ConnectRaw streams just the raw data on a channel;
// this is provided as a convenience method
func ConnectRaw(serialPort string) (disconnect chan<- bool, data <-chan float64, err error) {
ch := make(chan float64, WINDOW_SIZE)
listener := &ThinkGearListener{
RawSignal: func(a, b byte) {
ch <- float64(int16(a)<<8 | int16(b))
},
}
disconnect, err = Connect(serialPort, listener)
if err != nil {
return
}
data = ch
return
}

0 comments on commit a4b6112

Please sign in to comment.