Skip to content

Commit

Permalink
Allow enable/disable of autoretry
Browse files Browse the repository at this point in the history
  • Loading branch information
mmaelzer committed Dec 23, 2015
1 parent 065ba0e commit b015152
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions cam.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ type Camera struct {
Password string // optional password for basic authentication
Log bool // should log
LastFrame *Frame
AutoRetry bool // should automatically retry
body io.ReadCloser // a reference to the http response body
listeners []chan Frame // slice of channels returned from the Subscribe method
mutex sync.Mutex
locked bool // lock to prevent multiple keepalive goroutines
}

// A Frame is a container for jpeg data from a Camera
Expand Down Expand Up @@ -97,7 +99,12 @@ func (cam *Camera) logf(t string, l ...interface{}) {
}

func (cam *Camera) keepalive() {
if cam.locked {
return
}
cam.locked = true
time.Sleep(time.Second * 10)
cam.locked = false
if cam.LastFrame != nil &&
time.Since(cam.LastFrame.Timestamp) > time.Second*10 {
cam.stop()
Expand All @@ -109,8 +116,11 @@ func (cam *Camera) keepalive() {
// read will read data from the response until eof or the response
// body is closed
func (cam *Camera) read(mr *multipart.Reader) {
// Never stop reading
defer func() {
if !cam.AutoRetry {
return
}

cam.logf("[%s] Reconnecting", cam.Name)
err := cam.start()
for err != nil {
Expand All @@ -123,7 +133,9 @@ func (cam *Camera) read(mr *multipart.Reader) {
start := time.Now()
frames := 0

go cam.keepalive()
if cam.AutoRetry {
go cam.keepalive()
}

for i := 0; true; i++ {
part, err := mr.NextPart()
Expand Down Expand Up @@ -186,11 +198,9 @@ func (cam *Camera) Subscribe() (<-chan Frame, error) {
if len(cam.listeners) == 0 {
err = cam.start()
}
go func() {
cam.mutex.Lock()
cam.listeners = append(cam.listeners, l)
cam.mutex.Unlock()
}()
cam.mutex.Lock()
cam.listeners = append(cam.listeners, l)
cam.mutex.Unlock()
return l, err
}

Expand All @@ -200,22 +210,20 @@ func (cam *Camera) Subscribe() (<-chan Frame, error) {
func (cam *Camera) Unsubscribe(unsub <-chan Frame) bool {
for i, l := range cam.listeners {
if unsub == l {
go func() {
if len(cam.listeners) == 1 {
cam.stop()
cam.mutex.Lock()
cam.listeners = make([]chan Frame, 0)
cam.mutex.Unlock()
} else {
cam.mutex.Lock()
cam.listeners = append(
cam.listeners[:i],
cam.listeners[i+1:]...,
)
cam.mutex.Unlock()
}
close(l)
}()
if len(cam.listeners) == 1 {
cam.stop()
cam.mutex.Lock()
cam.listeners = make([]chan Frame, 0)
cam.mutex.Unlock()
} else {
cam.mutex.Lock()
cam.listeners = append(
cam.listeners[:i],
cam.listeners[i+1:]...,
)
cam.mutex.Unlock()
}
close(l)
return true
}
}
Expand Down

0 comments on commit b015152

Please sign in to comment.