Skip to content
This repository has been archived by the owner on Mar 31, 2019. It is now read-only.

Commit

Permalink
cleaning up watch function a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
zabawaba99 committed Nov 11, 2016
1 parent d64f22e commit 3a41b87
Showing 1 changed file with 82 additions and 66 deletions.
148 changes: 82 additions & 66 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package firego

import (
"bufio"
"bytes"
"encoding/json"
"errors"
"log"
"net/http"
"strings"
)

const (
Expand Down Expand Up @@ -36,7 +37,7 @@ type Event struct {
// Data that changed
Data interface{}

rawData string
rawData []byte
}

// Value converts the raw payload of the event into the given interface.
Expand All @@ -45,26 +46,22 @@ func (e Event) Value(v interface{}) error {
Data interface{} `json:"data"`
}
tmp.Data = &v
return json.Unmarshal([]byte(e.rawData), &tmp)
return json.Unmarshal(e.rawData, &tmp)
}

// StopWatching stops tears down all connections that are watching.
func (fb *Firebase) StopWatching() {
if fb.isWatching() {
fb.watchMtx.Lock()
defer fb.watchMtx.Unlock()

if fb.watching {
// flip the bit back to not watching
fb.watching = false
// signal connection to terminal
fb.stopWatching <- struct{}{}
// flip the bit back to not watching
fb.setWatching(false)
}
}

func (fb *Firebase) isWatching() bool {
fb.watchMtx.Lock()
v := fb.watching
fb.watchMtx.Unlock()
return v
}

func (fb *Firebase) setWatching(v bool) {
fb.watchMtx.Lock()
fb.watching = v
Expand All @@ -78,12 +75,14 @@ func (fb *Firebase) setWatching(v bool) {
// second call to this function without a call to fb.StopWatching
// will close the channel given and return nil immediately.
func (fb *Firebase) Watch(notifications chan Event) error {
if fb.isWatching() {
fb.watchMtx.Lock()
if fb.watching {
fb.watchMtx.Unlock()
close(notifications)
return nil
}
// set watching flag
fb.setWatching(true)
fb.watching = true
fb.watchMtx.Unlock()

stop := make(chan struct{})
events, err := fb.watch(stop)
Expand All @@ -93,30 +92,56 @@ func (fb *Firebase) Watch(notifications chan Event) error {

var closedManually bool

// monitor the stopWatching channel
// if we're told to stop, close the response Body
go func() {
<-fb.stopWatching

closedManually = true
close(stop)
stop <- struct{}{}
}()

go func() {
defer func() {
close(notifications)
close(stop)
}()

for event := range events {
if event.Type == EventTypeError && closedManually {
break
if closedManually {
return
}

notifications <- event
}

close(notifications)
}()

return nil
}

func readLine(rdr *bufio.Reader, prefix string) ([]byte, error) {
// read event: line
line, err := rdr.ReadBytes('\n')
if err != nil {
return nil, err
}

// empty line check for empty prefix
if len(prefix) == 0 {
line = bytes.TrimSpace(line)
if len(line) != 0 {
return nil, errors.New("expected empty line")
}
return line, nil
}

// check line has event prefix
if !bytes.HasPrefix(line, []byte(prefix)) {
return nil, errors.New("missing prefix")
}

// trim space
line = line[len(prefix):]
return bytes.TrimSpace(line), nil
}

func (fb *Firebase) watch(stop chan struct{}) (chan Event, error) {
// build SSE request
req, err := http.NewRequest("GET", fb.String(), nil)
Expand All @@ -137,59 +162,61 @@ func (fb *Firebase) watch(stop chan struct{}) (chan Event, error) {

go func() {
<-stop
defer resp.Body.Close()
resp.Body.Close()
}()

// start parsing response body
go func() {
defer func() {
resp.Body.Close()
close(notifications)
}()

// build scanner for response body
scanner := bufio.NewReader(resp.Body)
var scanErr error

scanning:
for scanErr == nil {
// split event string
// event: put
// data: {"path":"/","data":{"foo":"bar"}}

var evt string
var dat string

sendError := func(err error) {
notifications <- Event{
Type: EventTypeError,
Data: err,
}
}
for {
// scan for 'event:'
evt, scanErr = scanner.ReadString('\n')
if scanErr != nil {
break scanning
evt, err := readLine(scanner, "event: ")
if err != nil {
sendError(err)
return
}
evt = strings.TrimSuffix(evt, "\n")

// scan for 'data:'
dat, scanErr = scanner.ReadString('\n')
if scanErr != nil {
break scanning
dat, err := readLine(scanner, "data: ")
if err != nil {
sendError(err)
return
}
dat = strings.TrimSuffix(dat, "\n")

// strip off last '\n'
_, scanErr = scanner.ReadString('\n')
if scanErr != nil {
break scanning
// read the empty line
_, err = readLine(scanner, "")
if err != nil {
sendError(err)
return
}

// create a base event
event := Event{
Type: strings.Replace(evt, "event: ", "", 1),
rawData: strings.Replace(dat, "data: ", "", 1),
Type: string(evt),
Data: string(dat),
rawData: dat,
}

// should be reacting differently based off the type of event
switch event.Type {
case EventTypePut, EventTypePatch:
// we've got extra data we've got to parse
var data map[string]interface{}
if err := json.Unmarshal([]byte(strings.Replace(dat, "data: ", "", 1)), &data); err != nil {
scanErr = err
break scanning
if err := json.Unmarshal(event.rawData, &data); err != nil {
sendError(err)
return
}

// set the extra fields
Expand All @@ -207,27 +234,16 @@ func (fb *Firebase) watch(stop chan struct{}) (chan Event, error) {

// send the cancel event
notifications <- event
break scanning
return
case EventTypeAuthRevoked:
// The data for this event is a string indicating that a the credential has expired
// This event will be sent when the supplied auth parameter is no longer valid
event.Data = strings.Replace(dat, "data: ", "", 1)
notifications <- event
break scanning
return
case eventTypeRulesDebug:
log.Printf("Rules-Debug: %s\n%s\n", evt, dat)
}
}

if scanErr != nil {
notifications <- Event{
Type: EventTypeError,
Data: scanErr,
}
}

// cleanup routines
close(notifications)
}()
return notifications, nil
}

0 comments on commit 3a41b87

Please sign in to comment.