Skip to content

Commit

Permalink
updates to fromSQS with increased logging for the console and some no…
Browse files Browse the repository at this point in the history
…t so pretty unlocking
  • Loading branch information
mikedewar committed Sep 4, 2014
1 parent f342d6f commit 47ecfbc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Expand Up @@ -3,3 +3,5 @@ WORKDIR /gopath/src/github.com/nytlabs/streamtools
ADD . /gopath/src/github.com/nytlabs/streamtools
RUN make clean
RUN make
RUN ["mkdir", "-p", "/gopath/bin"]
RUN ["ln", "-s", "/gopath/src/github.com/nytlabs/streamtools/build/st", "/gopath/bin/st"]
34 changes: 25 additions & 9 deletions st/library/fromSQS.go
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/goamz/goamz/aws"
"github.com/goamz/goamz/sqs"

"strconv"
"sync"

"github.com/nytlabs/streamtools/st/blocks" // blocks
Expand All @@ -16,52 +15,68 @@ import (

// lots of this code stolen brazenly from JP https://github.com/jprobinson
func (b *FromSQS) listener() {
log.Println("in listener")
b.lock.Lock()
// OK I KNOW that everything inside this lock is bad. This is a quick fix. Promise to do better in the future.
log.Println("in lock")
accessKey, ok := b.auth["AccessKey"].(string)
if !ok {
b.Error("could not assert AccessKey to a string")
b.listening = false
b.lock.Unlock()
return
}
accessSecret, ok := b.auth["AccessSecret"].(string)
if !ok {
b.Error("could not assert AccessSecret to a string")
b.listening = false
b.lock.Unlock()
return
}
queueName, ok := b.auth["QueueName"].(string)
if !ok {
b.Error("could not assert queue name to a string")
b.listening = false
b.lock.Unlock()
return
}
maxNstring, ok := b.auth["MaxNumberOfMessages"].(string)
maxN, ok := b.auth["MaxNumberOfMessages"].(string)
if !ok {
b.Error("could not assert MaxNumberOfMessages to a string")
b.listening = false
b.lock.Unlock()
return
}
log.Println("authenticating with aws")
auth := aws.Auth{AccessKey: accessKey, SecretKey: accessSecret}
sqsClient := sqs.New(auth, aws.USEast)
log.Println("getting SQS queue")
queue, err := sqsClient.GetQueue(queueName)
if err != nil {
b.Error(err)
b.listening = false
b.lock.Unlock()
return
}

maxN, err := strconv.Atoi(maxNstring)
if err != nil {
b.Error(err)
return
}

log.Println("setting listening flag")
b.listening = true
b.lock.Unlock()

params := map[string]string{
"WaitTimeSeconds":"1",
"MaxNumberOfMessages":maxN,
}

var resp *sqs.ReceiveMessageResponse
log.Println("starting read loop")
for {
select {
case <-b.stop:
log.Println("Exiting SQS read loop")
return
default:
resp, err = queue.ReceiveMessage(maxN)
resp, err = queue.ReceiveMessageWithParameters(params)
if err != nil {
b.Error(err)
}
Expand Down Expand Up @@ -150,6 +165,7 @@ func (b *FromSQS) Run() {
}

b.stopListening()
log.Println("starting new listener")
go b.listener()
case <-b.quit:
b.stopListening()
Expand Down

0 comments on commit 47ecfbc

Please sign in to comment.