This repository has been archived by the owner on Apr 29, 2023. It is now read-only.
/
sqs.clj
143 lines (115 loc) · 4.3 KB
/
sqs.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
(ns oc.lib.sqs
"
A component to consume messages from an SQS queue with a long poll and pass them off to a handler, deleting them if
they are processed successfully (no exception) by the handler.
https://github.com/stuartsierra/component
"
  (:require [com.stuartsierra.component :as component]
            [com.climate.squeedo.sqs-consumer :as sqs]
            [clojure.core.async :as async]
            [clojure.java.io :as io]
            [clojure.string :as string]
            [amazonica.aws.s3 :as s3]
            [cheshire.core :as json]
            [taoensso.timbre :as timbre])
  (:import [java.util.zip GZIPInputStream]))
(defn ack
  "Signal that handling of `message` is complete by putting it on `done-channel`.

  Returns the result of the underlying `clojure.core.async/put!` call."
  [done-channel message]
  (-> done-channel
      (async/put! message)))
(defn log-handler
  "Invoke `handler` with `message` and `done-channel`, returning its result.

  Any `Exception` thrown by the handler is logged at error level and then
  re-thrown unchanged, so the caller still sees the failure."
  [handler message done-channel]
  (try
    (handler message done-channel)
    (catch Exception failure
      ;; Log first, then propagate the very same exception instance.
      (timbre/error failure)
      (throw failure))))
;; Component wrapping a queue consumer.
;;
;; Fields:
;;   sqs-creds       - stored on the record; not passed to `sqs/start-consumer` here
;;   sqs-queue       - queue identifier handed to `sqs/start-consumer`
;;   message-handler - function handed to `sqs/start-consumer` to process messages
;;
;; While started, the running consumer is kept under the :retriever key.
(defrecord SQSListener [sqs-creds sqs-queue message-handler]
  :load-ns true ; needed for Eastwood linting

  ;; Implement the Lifecycle protocol
  component/Lifecycle

  (start [component]
    (if (:retriever component)
      ;; Already started: starting again would create a second consumer and
      ;; overwrite the reference to the first, which could then never be stopped.
      component
      (do
        (timbre/info "Starting SQSListener")
        (assoc component :retriever (sqs/start-consumer sqs-queue message-handler)))))

  (stop [component]
    (timbre/info "Stopping SQSListener")
    ;; Only stop a consumer if one was actually started.
    (when-let [consumer (:retriever component)]
      (sqs/stop-consumer consumer))
    (dissoc component :retriever)))
(defn sqs-listener
  "Construct an SQSListener component (not yet started).

  Two arities:
  - a single map with keys :sqs-creds, :sqs-queue and :message-handler; the
    values are checked by preconditions (map, string and fn respectively)
    before delegating to the positional arity;
  - three positional arguments, used as given with no validation.

  In both cases the supplied handler is wrapped with `log-handler`."
  ([{:keys [sqs-creds sqs-queue message-handler]}]
   {:pre [(map? sqs-creds)
          (string? sqs-queue)
          (fn? message-handler)]}
   (sqs-listener sqs-creds sqs-queue message-handler))
  ([sqs-creds sqs-queue message-handler]
   (let [wrapped-handler (partial log-handler message-handler)
         fields {:sqs-creds sqs-creds
                 :sqs-queue sqs-queue
                 :message-handler wrapped-handler}]
     (map->SQSListener fields))))
(defn- read-from-s3
  "Fetch the S3 object referenced by `record` and parse its content.

  `record` supplies the bucket name at [:s3 :bucket :name] and the object key
  at [:s3 :object :key]. The object content is read as a gzip stream, line by
  line, and the lines are re-joined with newlines. The resulting text is
  parsed with `read-string`; if that throws, it is parsed as JSON with
  keyword keys instead.

  All streams opened here are closed before returning, including on error."
  [record]
  (let [bucket (get-in record [:s3 :bucket :name])
        ;; NOTE(review): S3 event notifications URL-encode object keys; keys
        ;; with spaces or special characters may need decoding -- confirm.
        object-key (get-in record [:s3 :object :key])
        ;; with-open closes the reader and the underlying S3 content stream
        ;; (in reverse order) so the connection is released. string/join is
        ;; eager, so the lazy line-seq is fully consumed before the close.
        s3-parsed (with-open [^java.io.Closeable content (:object-content (s3/get-object bucket object-key))
                              ^java.io.Closeable reader (io/reader (GZIPInputStream. content))]
                    (string/join "\n" (line-seq reader)))]
    (try
      ;; NOTE(review): clojure.core/read-string can evaluate code embedded in
      ;; its input (#= forms) while *read-eval* is true. If bucket contents are
      ;; not fully trusted, consider clojure.edn/read-string instead.
      (read-string s3-parsed)
      (catch Exception _
        (json/parse-string s3-parsed true)))))
(defn- sns-embedded-records
  "Return the non-empty seq of :Records found inside the JSON-encoded :Message
  string of `parsed-msg`, or nil when :Message is not a string, is not valid
  JSON, or carries no records."
  [parsed-msg]
  (let [inner (:Message parsed-msg)]
    (when (string? inner)
      ;; A :Message that is plain text (not JSON) must not blow up the caller;
      ;; treat it as carrying no records.
      (let [payload (try
                      (json/parse-string inner true)
                      (catch Exception _ nil))]
        (seq (:Records payload))))))

(defn read-message-body
  "Parse a raw message body `msg` and return a sequence of message payloads.

  `msg` is parsed as JSON (keyword keys); if that throws, `read-string` is
  used instead. Then:
  - if the parsed message has a non-empty :Records (S3 to SQS), each record is
    read from S3 and the (lazy) sequence of results is returned;
  - if it has a string :Message that is itself JSON with a non-empty :Records
    (S3 to SNS), those records are read from S3 the same way;
  - otherwise a one-element vector holding the parsed message is returned."
  [msg]
  (let [parsed-msg (try
                     (json/parse-string msg true)
                     (catch Exception _
                       ;; NOTE(review): clojure.core/read-string can evaluate
                       ;; code embedded in its input while *read-eval* is true.
                       ;; If queue contents are not fully trusted, consider
                       ;; clojure.edn/read-string instead.
                       (read-string msg)))]
    ;; Direct records take priority; the :Message envelope is only inspected
    ;; (and parsed once) when there are none.
    (if-let [records (or (seq (:Records parsed-msg))
                         (sns-embedded-records parsed-msg))]
      (map read-from-s3 records)
      [parsed-msg])))
(defn __no-op__
  "Ignore: needed for Eastwood linting.

  Exists only so the `component` namespace alias is referenced from a function
  body. Returns an empty system map."
  []
  ;; system-map takes alternating key/value arguments; passing a single `{}`
  ;; is an odd argument count, so call it with no arguments instead.
  (component/system-map))
;; REPL scratchpad. Nothing inside this `comment` form is evaluated when the
;; namespace is loaded; the forms are meant to be sent to a REPL one at a time,
;; in order.
(comment
;; Load the namespaces used below. Inside this block `sqs` is aliased to this
;; library (oc.lib.sqs) and `sqs2` to amazonica's SQS client.
(require '[environ.core :refer (env)])
(require '[com.stuartsierra.component :as component])
(require '[amazonica.aws.sqs :as sqs2])
(require '[oc.lib.sqs :as sqs] :reload)
;; Credentials are read through environ's `env`.
(def access-creds {:access-key (env :aws-access-key-id)
:secret-key (env :aws-secret-access-key)})
;; Placeholder value; replace before use.
(def sqs-queue "replace-me")
(defn test-handler
"Handler for testing purposes. Users of this lib will write their own handler."
[message done-channel]
(println "Got message:\n" message)
(println "Oops!")
;; Division by zero throws here, so the ack below is never reached --
;; presumably deliberate, to exercise the error-logging wrapper.
(/ 1 0)
(sqs/ack done-channel message))
(defn system
"System for testing purposes. Users of this lib will define their own system."
[config-options]
(let [{:keys [sqs-creds sqs-queue sqs-msg-handler]} config-options]
(component/system-map
:sqs (sqs/sqs-listener sqs-creds sqs-queue sqs-msg-handler))))
;; Build (but do not start) a system wired with the test handler.
(def repl-system (system {:sqs-queue sqs-queue
:sqs-msg-handler test-handler
:sqs-creds access-creds}))
;; Test starting a consumer
(alter-var-root #'repl-system component/start)
;; Send a message to the queue for the running consumer to pick up.
(sqs2/send-message access-creds sqs-queue "Hello World!")
;; Stop the consumer
(alter-var-root #'repl-system component/stop)
)