Skip to content

Commit

Permalink
function to uniquely identify feed entries is now configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
pschirmacher committed Dec 3, 2014
1 parent fd2f302 commit b0365aa
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 25 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject feedworker "0.1.0-SNAPSHOT"
(defproject feedworker "0.1.0"
:description "muncher of feeds"
:url "https://github.com/innoq/feedworker"
:license {:name "Apache 2.0"
Expand Down
56 changes: 32 additions & 24 deletions src/feedworker/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -91,40 +91,37 @@
:entry entry
:processed-on (java.util.Date.)})

(defn unique-id
  "Builds an identifier for `entry` by printing it with `pr-str` and passing
  the printed form to `pandect/sha1`."
  [entry]
  ;; TODO find more efficient way
  (-> entry
      pr-str
      pandect/sha1))

;; Runs `handler` on a single entry and afterwards reports the entry's
;; identifier to the processing strategy via `after-processing`.
;; Returns the handler's return value, or the caught Exception object when the
;; handler threw (the exception is logged, not rethrown).
;; NOTE(review): this hunk shows superseded lines next to their replacements
;; (the bare `let [result` opener vs. the one that first binds
;; entry-identifier, and the `unique-id` vs. `entry-identifier` call to
;; after-processing) - only one line of each pair belongs in compiled source.
(defn process-entry [entry handler worker-id conf processing-strategy]
(let [result (try
;; the identifier function is read from this worker's config under ::entry-identifier
(let [entry-identifier (-> conf :workers worker-id ::entry-identifier)
result (try
(handler entry worker-id conf)
(catch Exception e
;; log the failure and use the exception itself as the result
(log "failed processing" (pr-str entry) e)
e))]
;; runs whether the handler returned normally or threw
(after-processing processing-strategy (unique-id entry))
(after-processing processing-strategy (entry-identifier entry))
result))

;; Concatenates the :entries of all feed pages and keeps the leading run of
;; entries whose identifier the processing strategy does not report as
;; already processed; collection stops at the first entry that was processed.
;; NOTE(review): both the superseded two-argument signature (identifying
;; entries with `unique-id`) and its replacement, which receives the
;; identifier function as a third argument, are shown in this hunk.
(defn find-unprocessed-entries [feed-pages processing-strategy]
(defn find-unprocessed-entries [feed-pages processing-strategy entry-identifier]
(->> feed-pages
(mapcat :entries)
;; take-while, not filter: everything after the first processed entry is dropped
(take-while #(not (already-processed? processing-strategy (unique-id %))))))
(take-while #(not (already-processed? processing-strategy (entry-identifier %))))))

;; Processes the entries of a feed that have not been processed yet, walking
;; them in reverse of the order returned by find-unprocessed-entries.
;; Each entry is re-checked with start-processing?, timed under the metric
;; "<worker-id>.entry.processing.timer" and handed to process-entry.
;; A :break result marks the entry for retry and ends the loop; any other
;; result is traced and the loop continues with the remaining entries.
;; The loop also ends when start-processing? returns a falsey value, because
;; `recur` is only reached inside that `when`.
;; NOTE(review): the superseded body (using `unique-id`) is shown first,
;; followed by its replacement, which wraps the same logic in a `let` that
;; reads the worker's ::entry-identifier function from conf.
(defn process-feed [processing-strategy tracer feed-pages handler worker-id conf]
(when-let [entries (seq (find-unprocessed-entries feed-pages processing-strategy))] ;; find entries which so far have not been processed
(log (str "found " (count entries) " entry/entries for processing in feed " (-> conf :workers worker-id :url)))
(loop [[entry & remaining] (reverse entries)] ;; process entries starting from the oldest one
(when (start-processing? processing-strategy (unique-id entry)) ;; check again to make sure no one else processed the entry
;; (and possibly persist the fact that the entry has been processed)
(let [timer (.timer (::metrics-registry conf) (str (name worker-id) ".entry.processing.timer"))
result (time! timer
(process-entry entry handler worker-id conf processing-strategy))]
(if (= :break result)
(mark-for-retry processing-strategy (unique-id entry)) ;; break out of loop and process entry again on next run
(do
(trace tracer (unique-id entry) (trace-msg result entry))
(when (seq remaining)
(recur remaining)))))))))
;; look up the identifier function once; it is reused for every entry below
(let [entry-identifier (-> conf :workers worker-id ::entry-identifier)]
(when-let [entries (seq (find-unprocessed-entries feed-pages processing-strategy entry-identifier))] ;; find entries which so far have not been processed
(log (str "found " (count entries) " entry/entries for processing in feed " (-> conf :workers worker-id :url)))
(loop [[entry & remaining] (reverse entries)] ;; process entries starting from the oldest one
(when (start-processing? processing-strategy (entry-identifier entry)) ;; check again to make sure no one else processed the entry
;; (and possibly persist the fact that the entry has been processed)
(let [timer (.timer (::metrics-registry conf) (str (name worker-id) ".entry.processing.timer"))
result (time! timer
(process-entry entry handler worker-id conf processing-strategy))]
(if (= :break result)
(mark-for-retry processing-strategy (entry-identifier entry)) ;; break out of loop and process entry again on next run
(do
(trace tracer (entry-identifier entry) (trace-msg result entry))
(when (seq remaining)
(recur remaining))))))))))

(defn metric-name [worker suffix]
(-> (::id worker)
Expand Down Expand Up @@ -165,6 +162,15 @@
(.mkdirs dir)
(assoc worker ::dir dir)))))

(defn unique-id
  "Default entry identifier: prints `entry` with `pr-str` and feeds the
  resulting string to `pandect/sha1`."
  [entry]
  (let [printed (pr-str entry)]
    (pandect/sha1 printed)))

(defn create-entry-identifiers
  "Attaches an ::entry-identifier function to every worker in `conf`.

  A worker's own :entry-identifier is used when it is set to a truthy value;
  otherwise `unique-id` is used. Returns the result of `map-workers` applied
  to the updated workers."
  [conf]
  (map-workers conf
               (fn [worker]
                 ;; `or` rather than a `get` default: the `get` default only
                 ;; applies when the key is absent, so a key that is present
                 ;; but nil would be stored as nil and fail once the
                 ;; identifier is invoked on an entry.
                 (let [identifier (or (:entry-identifier worker) unique-id)]
                   (assoc worker ::entry-identifier identifier)))))

(defn create-processing-strategies [conf]
(map-workers conf
(fn [worker]
Expand Down Expand Up @@ -312,6 +318,7 @@
create-metrics-registry
create-worker-dirs
create-processing-strategies
create-entry-identifiers
create-feed-loaders
create-tracers
create-cleanups
Expand Down Expand Up @@ -341,8 +348,9 @@
{:dilbert {:url "http://feed.dilbert.com/dilbert/most_popular?format=xml"
:handler example-handler
:processing-strategy :at-most-once
;; optional: :entry-identifier :uri
:interval 10000}}
:processed-entries-dir "processedentries"
:cleanup {:keep 10 :max 200}
:metrics {:http {:port 8080
:path "/feedworker/status"}}})
:path "/feedworker/metrics"}}})

0 comments on commit b0365aa

Please sign in to comment.