diff --git a/Dockerfile b/Dockerfile index 95e35356..09b277d6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,3 +3,5 @@ WORKDIR /gopath/src/github.com/nytlabs/streamtools ADD . /gopath/src/github.com/nytlabs/streamtools RUN make clean RUN make +RUN ["mkdir", "-p", "/gopath/bin"] +RUN ["ln", "-s", "/gopath/src/github.com/nytlabs/streamtools/build/st", "/gopath/bin/st"] diff --git a/st/library/fromSQS.go b/st/library/fromSQS.go index 92d5032d..14692c4d 100644 --- a/st/library/fromSQS.go +++ b/st/library/fromSQS.go @@ -7,7 +7,6 @@ import ( "github.com/goamz/goamz/aws" "github.com/goamz/goamz/sqs" - "strconv" "sync" "github.com/nytlabs/streamtools/st/blocks" // blocks @@ -16,52 +15,68 @@ import ( // lots of this code stolen brazenly from JP https://github.com/jprobinson func (b *FromSQS) listener() { + log.Println("in listener") b.lock.Lock() + // OK I KNOW that everything inside this lock is bad. This is a quick fix. Promise to do better in the future. + log.Println("in lock") accessKey, ok := b.auth["AccessKey"].(string) if !ok { b.Error("could not assert AccessKey to a string") + b.listening = false + b.lock.Unlock() return } accessSecret, ok := b.auth["AccessSecret"].(string) if !ok { b.Error("could not assert AccessSecret to a string") + b.listening = false + b.lock.Unlock() return } queueName, ok := b.auth["QueueName"].(string) if !ok { b.Error("could not assert queue name to a string") + b.listening = false + b.lock.Unlock() return } - maxNstring, ok := b.auth["MaxNumberOfMessages"].(string) + maxN, ok := b.auth["MaxNumberOfMessages"].(string) if !ok { b.Error("could not assert MaxNumberOfMessages to a string") + b.listening = false + b.lock.Unlock() return } + log.Println("authenticating with aws") auth := aws.Auth{AccessKey: accessKey, SecretKey: accessSecret} sqsClient := sqs.New(auth, aws.USEast) + log.Println("getting SQS queue") queue, err := sqsClient.GetQueue(queueName) if err != nil { b.Error(err) + b.listening = false + b.lock.Unlock() return } - maxN, err := strconv.Atoi(maxNstring) - if err != nil { - b.Error(err) - return - } - + log.Println("setting listening flag") b.listening = true b.lock.Unlock() + params := map[string]string{ + "WaitTimeSeconds":"1", + "MaxNumberOfMessages":maxN, + } + var resp *sqs.ReceiveMessageResponse + log.Println("starting read loop") for { select { case <-b.stop: log.Println("Exiting SQS read loop") return default: - resp, err = queue.ReceiveMessage(maxN) + resp, err = queue.ReceiveMessageWithParameters(params) if err != nil { b.Error(err) } @@ -150,6 +165,7 @@ func (b *FromSQS) Run() { } b.stopListening() + log.Println("starting new listener") go b.listener() case <-b.quit: b.stopListening()