-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
68 lines (60 loc) · 2.25 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
(ns npm-registry-follow.core
(:require [clj-http.client :as client]
[clojure.data.json :as json]))
(def ^:dynamic *registry-url*
  "Base URL of the registry database whose `_changes` feed is followed."
  "https://replicate.npmjs.com/registry")

;; NOTE: stream-url and last-seq-url are formatted once, when this namespace
;; is loaded, so rebinding *registry-url* afterwards does not affect them.
;; poll-url reads *registry-url* on every call.

(def stream-url
  "URL of the continuous changes feed, starting from now, without documents."
  (format "%s/_changes?include_docs=false&since=now&feed=continuous" *registry-url*))

(defn poll-url
  "Builds the URL that lists the changes recorded after sequence `since`."
  [since]
  (format "%s/_changes?since=%s" *registry-url* since))

(def last-seq-url
  "URL of a changes request that starts from now."
  (format "%s/_changes?since=now" *registry-url*))
;; read changes from the npm registry
;; calls `callback` for each change, with a string of the module name
;; returns a function that can be called to stop listening for changes
;; the stop function returns a future that completes once the stream has been closed
(defn listen-for-changes
  "Streams changes from the continuous feed at `stream-url`.

  The first connection is opened on the calling thread, so a failure to
  connect is thrown to the caller. The feed is then read line by line on a
  background future: every non-empty line is parsed as JSON and `callback` is
  invoked with the value of its `:id` key. An empty line makes the reader drop
  the current connection and open a fresh one (same reconnect-on-empty-line
  behaviour as before; presumably empty lines are feed heartbeats - confirm).

  Returns a zero-argument stop function. Calling it marks the listener as
  stopped and returns a future that closes whichever connection is currently
  open; deref that future to wait for the close. Unlike the previous
  implementation, the stop function keeps working after a reconnect."
  [callback]
  (let [open-stream #(clojure.java.io/input-stream (java.net.URL. stream-url))
        stopped?    (atom false)
        ;; Always holds the connection currently being read, so the stop
        ;; function can reach it even after one or more reconnects.
        current     (atom (open-stream))]
    (future
      (loop []
        (let [reconnect?
              ;; with-open closes the reader (and the stream beneath it)
              ;; before a replacement connection is opened.
              (with-open [rdr (clojure.java.io/reader @current)]
                (loop []
                  ;; readLine returns nil at end of stream, which ends the
                  ;; listener without reconnecting.
                  (when-let [line (.readLine ^java.io.BufferedReader rdr)]
                    (if (= line "")
                      true
                      (do
                        (callback (:id (json/read-json line true)))
                        (recur))))))]
          (when (and reconnect? (not @stopped?))
            (let [fresh (open-stream)]
              (reset! current fresh)
              (if @stopped?
                ;; stop raced with the reconnect and may have closed only the
                ;; previous stream; close the new one instead of reading it.
                (.close ^java.io.Closeable fresh)
                (recur)))))))
    (fn []
      (reset! stopped? true)
      (future (.close ^java.io.Closeable @current)))))
(defn get-changes-since
  "Fetches and parses the changes recorded after sequence `last-seq`.

  Returns the parsed JSON response. If fetching or parsing throws an
  Exception, returns a map with `:error?` set to true and the caught
  exception under `:exception` instead of rethrowing."
  [last-seq]
  (try
    (-> last-seq
        poll-url
        slurp
        json/read-json)
    (catch Exception ex
      {:exception ex
       :error?    true})))
(defn get-last-seq
  "Fetches `last-seq-url` and returns the `:last_seq` value of the parsed
  JSON response. Exceptions from the request or the parser propagate."
  []
  (let [body   (slurp last-seq-url)
        parsed (json/read-json body)]
    (get parsed :last_seq)))
(comment
(get-changes-since 1275194)
(get-last-seq))
;; also reads changes from the npm registry
;; calls `callback` for each change, with a string of the module name
;; returns a function that can be called to stop listening for changes
;; defaults to polling every 10 seconds
(defn poll-for-changes
  "Polls the registry for changes on a background future.

  Arguments:
    polling-interval - milliseconds to sleep between polls (defaults to
                       10 seconds in the one-argument arity).
    callback         - called with the `:id` of every item in the `:results`
                       of a successful poll; when a poll fails it is called
                       once with the error map returned by
                       `get-changes-since`.

  The starting sequence comes from `get-last-seq`. After each successful poll
  the cursor advances to the response's `:last_seq`. If the poll failed, or
  the response carries no `:last_seq`, the previous cursor is kept so that the
  next poll never asks for changes since nil.

  Returns a zero-argument stop function. The loop checks the flag once per
  iteration, so polling stops after the sleep already in progress."
  ([callback]
   (poll-for-changes (* 10 1000) callback))
  ([polling-interval callback]
   (let [poll? (atom true)]
     (future
       ;; The cursor is only ever touched by this thread, so it is carried as
       ;; a loop binding rather than held in an atom.
       (loop [last-seq (get-last-seq)]
         (when @poll?
           (let [res      (get-changes-since last-seq)
                 next-seq (if (:error? res)
                            (do
                              (callback res)
                              last-seq)
                            (do
                              (doseq [item (:results res)]
                                (callback (:id item)))
                              ;; Keep the old cursor if the response has no
                              ;; :last_seq.
                              (or (:last_seq res) last-seq)))]
             (Thread/sleep polling-interval)
             (recur next-seq)))))
     (fn [] (reset! poll? false)))))
(comment
(def stop (listen-for-changes #(println %)))
(stop)
(def stop2 (poll-for-changes #(println %)))
(stop2))