-
Notifications
You must be signed in to change notification settings - Fork 10
/
core.clj
571 lines (516 loc) · 23.9 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
(ns skyscraper.core
(:require
[clj-http.client :as http]
[clj-http.conn-mgr :as http-conn]
[clj-http.core :as http-core]
[clj-http.headers :as http-headers]
[clojure.edn :as edn]
[clojure.set :refer [intersection]]
[clojure.string :as string]
[net.cgrand.enlive-html :as enlive]
[reaver]
[skyscraper.cache :as cache]
[skyscraper.context :as context]
[skyscraper.db :as sqlite]
[skyscraper.traverse :as traverse]
[taoensso.timbre :refer [debugf infof warnf errorf]])
(:import [java.net URL]
[java.nio.charset Charset]))
;;; Directories

(def output-dir
  "All Skyscraper output, either temporary or final, goes under here."
  (format "%s/skyscraper-data/" (System/getProperty "user.home")))

(def html-cache-dir
  "Local copies of downloaded HTML files go here."
  (format "%scache/html/" output-dir))

(def processed-cache-dir
  "Cache storing the interim results of processing HTML files."
  (format "%scache/processed/" output-dir))
;;; Micro-templating framework
(defn- format-template
  "Fills in a template string with moving parts from m. template should be
  a string containing 'variable names' starting with colons; these names
  are extracted, converted to keywords and looked up in m, which should be
  a map (or a function taking keywords and returning strings).

  Example:
  ```clojure
  (format-template \":group/:user/index\" {:user \"joe\", :group \"admins\"})
  ;=> \"admins/joe/index\"
  ```"
  [template m]
  (let [token-re #":[a-z-]+"
        ;; Extract the :names in order of appearance, dropping the colon.
        field-keys (->> (re-seq token-re template)
                        (map #(keyword (subs % 1))))
        ;; Turn the template into a format string with one %s per token.
        fmt-string (string/replace template token-re "%s")]
    (apply format fmt-string (map m field-keys))))
;;; Cache
(defn- sanitize-cache
  "Converts a cache argument to the processor to a CacheBackend if it
  isn't one already."
  [value cache-dir]
  (cond
    (string? value) (cache/fs value)     ; a directory to use for an fs cache
    (true? value)   (cache/fs cache-dir) ; fs cache in the default location
    (not value)     (cache/null)         ; nil/false: caching disabled
    :else           value))              ; assume it's already a CacheBackend
;;; Defining processors
;; Populated by `defprocessor`; read by `run-processor` and the pipeline
;; handlers. `defonce` preserves registrations across namespace reloads.
(defonce
  ^{:doc "The global registry of processors: an atom containing a map from
  keywords naming processors to the processor definitions."}
  processors (atom {}))
(defmacro with-processor-definitions
  "Runs body with processors defined as defs, restoring previous definitions
  afterwards.

  Note: don't use this unless you're skyscraper.dev."
  [defs & body]
  ;; Snapshot the global registry, swap in `defs` for the dynamic extent of
  ;; `body`, and restore the snapshot even if `body` throws.
  `(let [previous# @processors]
     (try
       (reset! processors ~defs)
       ~@body
       (finally (reset! processors previous#)))))
(defn- default-process-fn
  "Fallback :process-fn used when a processor doesn't define one: yields a
  single child context flagged as unimplemented, carrying the raw resource
  and the parent context for inspection."
  [resource context]
  (vector {::unimplemented true
           ::resource resource
           ::context context}))
(defn defprocessor
  "Registers a processor named `name` with arguments `args`.

  `name` should be a keyword. `args`, optional keys and values, may include:

  - `:process-fn` – a function that takes a resource and a parent context, and returns a
    sequence of child contexts (corresponding to the scraped resource). Alternatively,
    it can return one context only, in which case it will be wrapped in a sequence.
  - `:cache-template` – a string specifying the template for cache keys. Ignored when
    `:cache-key-fn` is specified.
  - `:cache-key-fn` – a function taking the context and returning the cache key. Overrides
    `:cache-template`. Useful when mere templating does not suffice.
  - `:url-fn` – a one-argument function taking the context and returning the URL to visit.
    By default, Skyscraper just extracts the value under the `:url` key from the context.
  - `:updatable` – a boolean (false by default). When true, the pages accessed by this
    processor are considered to change often. When Skyscraper is run in update mode (see
    below), these pages will be re-downloaded and re-processed even if they had been present
    in the HTML or processed caches, respectively.
  - `:parse-fn` – a custom function that will be used to produce Enlive resources from
    downloaded documents. This can be useful, for instance, if you want to use reaver rather
    than Enlive; if you are scraping something other than HTMLs (e.g., PDFs via a custom
    parser); or when you’re scraping malformed HTML and need an interim fixup step before
    parsing.
  - `:skyscraper.db/columns` – a vector of keys that are supposed to exist in the resulting
    contexts; the corresponding values will be emitted as a database row when `:db` or
    `:db-file` is supplied as a scrape argument.
  - `:skyscraper.db/key-columns` – a vector of keys that, when
    supplied, will be used to upsert records to database and treated as
    a unique key to match existing database records against."
  [name & {:as args}]
  ;; Registering under an existing name overwrites the previous definition,
  ;; so re-evaluating a defprocessor form updates the registry in place.
  (swap! processors assoc name (merge {:name name, :process-fn default-process-fn} args)))
(defn- get-option
  "Some options can be specified either in the processor definition or
  during scraping; in this case, the per-processor one takes precedence.
  Returns `default` when neither source supplies the option.

  Uses `if-some` rather than `or` so that an explicit `false` value is
  respected instead of being treated as absent; only `nil` means
  \"not supplied\"."
  ([context options k] (get-option context options k nil))
  ([context options k default]
   (if-some [processor-value (get-in context [::current-processor k])]
     processor-value
     (if-some [scrape-value (get options k)]
       scrape-value
       default))))
(defn- ensure-distinct-seq
  "If x is a single context (a map), returns a vector containing only x;
  otherwise treats x as a sequence and returns it with duplicates removed.
  The deduplicated sequence is realized eagerly via doall."
  [x]
  (if (map? x) [x] (doall (distinct x))))
(defn run-processor
  "Runs a processor named by processor-name on document."
  ([processor-name document] (run-processor processor-name document {}))
  ([processor-name document context]
   ;; Unregistered names fall back to a stub definition so that scraping
   ;; doesn't blow up on a missing processor.
   (let [processor (or (get @processors processor-name)
                       {:name processor-name, :process-fn default-process-fn})
         process-fn (:process-fn processor)]
     (ensure-distinct-seq (process-fn document context)))))
(defn allows?
  "True if all keys in m1 that are also in m2 have equal values in both maps.
  Vacuously true when the maps share no keys."
  [m1 m2]
  (let [shared-keys (intersection (set (keys m1)) (set (keys m2)))]
    (every? #(= (get m1 %) (get m2 %)) shared-keys)))
(defn- filter-contexts
  "If `:only` was supplied in `options`, returns `contexts` filtered by it
  as specified in the docstring of `scrape`, else returns all contexts."
  [{:keys [only]} contexts]
  (if-not only
    contexts
    (let [predicate (if (fn? only)
                      only
                      ;; A map (or seq of maps) acts as a template: keep a
                      ;; context when it's compatible with any of them.
                      (let [templates (ensure-distinct-seq only)]
                        (fn [context]
                          (some #(allows? % context) templates))))]
      (filter predicate contexts))))
(defn merge-urls
  "Fills the missing parts of new-url (which can be either absolute,
  root-relative, or relative) with corresponding parts from url
  (an absolute URL) to produce a new absolute URL."
  [url new-url]
  (if-not (string/starts-with? new-url "?")
    ;; Delegate relative-URL resolution to java.net.URL.
    (str (URL. (URL. url) new-url))
    ;; Query-only targets: drop the old query string and append the new one.
    (let [stripped (string/replace url #"\?.*" "")]
      (str stripped new-url))))
(defn- merge-contexts
  "Given two contexts, `old` as passed to a processor as input, and
  `new` as returned by the processor, returns a merged context that
  will be fed to child processors."
  [old new]
  ;; Strip the parent's internal (skyscraper-namespaced) keys so they
  ;; don't leak into children.
  (let [preserve (context/dissoc-internal old)
        ;; A (possibly relative) :url in the child is resolved against the
        ;; parent's URL; nil when the child supplies no :url.
        new-url (if-let [u (:url new)]
                  (merge-urls (:url old) u))
        new (if new-url
              (assoc new :url new-url)
              new)]
    ;; Child values win over inherited parent values on key conflicts.
    (merge preserve new)))
(defn- string-resource
  "Returns an Enlive resource for a HTML snippet passed as a string."
  [s]
  ;; Enlive parses from a Reader; wrap the string so no temp file is needed.
  (enlive/html-resource (java.io.StringReader. s)))
(defn parse-string
  "Parses `body`, a byte-array, as a string encoded with
  content-type provided in `headers`. If `try-html?` is true,
  tries to look for encoding in the <meta http-equiv> tag
  in `body`."
  ([headers ^bytes body _context] (parse-string headers body _context false))
  ([headers ^bytes body _context try-html?]
   (let [stream1 (java.io.ByteArrayInputStream. body)
         body-map (http/parse-html stream1)
         ;; <meta http-equiv> declarations in the document itself may carry
         ;; the charset; merged on top, they override the HTTP headers.
         additional-headers (if try-html?
                              (http/get-headers-from-body body-map)
                              {})
         all-headers (merge headers additional-headers)
         content-type (get all-headers "content-type")
         ;; Multi-valued headers may be represented as vectors; use the
         ;; first value in that case.
         content-type (cond-> content-type
                        (vector? content-type) first)]
     ;; Decode the raw bytes with the charset clj-http detects from the
     ;; content-type (falling back to its default when absent).
     (String. body (Charset/forName (http/detect-charset content-type))))))
(defn parse-enlive
  "Parses a byte array as a Enlive resource."
  [headers body _context]
  ;; Decode bytes to a string first (honouring <meta> charset hints),
  ;; then hand the string to Enlive.
  (string-resource (parse-string headers body _context true)))
(defn parse-reaver
  "Parses a byte array as a JSoup/Reaver document."
  [headers body _context]
  ;; Decode bytes to a string first (honouring <meta> charset hints),
  ;; then hand the string to reaver/JSoup.
  (reaver/parse (parse-string headers body _context true)))
;;; Scraping
(defn- extract-namespaced-keys
  "Filters `m`, returning a map with only the keys whose namespace is `ns`;
  the namespace is stripped from the keys of the result."
  [ns m]
  (reduce-kv (fn [acc k v]
               (if (= ns (namespace k))
                 (assoc acc (keyword (name k)) v)
                 acc))
             {}
             m))
;; The scraping engine is implemented on top of skyscraper.traverse,
;; but each step (download, parse, run processor, store in cache) is
;; decomposed into several stages collectively known as a "pipeline".
;; Steps in the pipeline normally run from left to right, mostly
;; sequentially (except for `download-handler` which is async), and
;; after the last step, we return to the first one. The current stage
;; is stored as `::stage` in the context. A handler can override
;; the next one by setting `::next-stage`.
(defn- make-pipeline
  "Returns a list of symbols naming functions that implement the pipeline steps.
  The download step depends on :download-mode (:async or :sync)."
  [{:keys [download-mode]}]
  ;; `case` deliberately throws on an unrecognized download-mode.
  (let [downloader (case download-mode
                     :async `download-handler
                     :sync `sync-download-handler)]
    [`init-handler
     `check-cache-handler
     downloader
     `store-cache-handler
     `process-handler
     `split-handler]))
(defn- advance-pipeline
  "Advances `context` to the next stage in `pipeline`."
  [pipeline context]
  ;; The next stage is, in order of precedence:
  ;; 1. an explicit ::next-stage override set by a handler,
  ;; 2. the element following the current ::stage in `pipeline`,
  ;; 3. the beginning of the pipeline, when the context has both a
  ;;    :processor and a :url (a fresh context emitted by a processor).
  (let [next-stage (or (::next-stage context)
                       (->> pipeline
                            (drop-while #(not= % (::stage context)))
                            second)
                       (when (and (:processor context) (:url context))
                         (first pipeline)))]
    ;; A context naming a processor but lacking a URL cannot be advanced;
    ;; warn so the processor bug is visible.
    (when (and (:processor context) (not (:url context)))
      (warnf "Encountered context with processor but no URL: %s" (context/describe context)))
    (if next-stage
      (-> context
          (dissoc ::next-stage)
          ;; download-handler is the only callback-based (async) stage; all
          ;; other stages are dispatched through the synchronous sync-handler.
          (assoc ::stage next-stage
                 ::traverse/handler (if (= next-stage `download-handler)
                                      `download-handler
                                      `sync-handler)
                 ::traverse/call-protocol (if (= next-stage `download-handler)
                                            :callback
                                            :sync)))
      ;; No next stage: this is a leaf context; strip internal bookkeeping.
      (context/dissoc-leaf-keys context))))
(defn- init-handler
  "Sets up context with `::current-processor` and `::cache-key`."
  [context options]
  ;; The processor definition is merged on top of the scrape options, so a
  ;; per-processor :cache-template/:cache-key-fn wins over a global one.
  (let [{:keys [cache-template cache-key-fn]} (merge options (@processors (:processor context)))
        ;; Without an explicit :cache-key-fn, derive the key from
        ;; :cache-template; when neither is present, the key is nil and the
        ;; context will not be cached.
        cache-key-fn (or cache-key-fn
                         #(when cache-template
                            (format-template cache-template %)))]
    [(assoc context
            ::current-processor (@processors (:processor context))
            ::cache-key (cache-key-fn context))]))
(defn- updatable?
  "Should we redownload the given context even if we have it cached?"
  [context]
  (let [setting (-> context ::current-processor :updatable)]
    ;; :updatable may be a predicate on the context rather than a flag.
    (if (fn? setting)
      (setting context)
      setting)))
(defn- use-cache?
  "True unless update mode forces a re-download/re-process for this context."
  [context options]
  (or (not (:update options))
      (not (updatable? context))))

(defn- maybe-retrieve-from-http-cache
  "When a context's cache-key exists in the HTML cache, fetches the
  associated data as a map of :body and :headers. Returns nil on a cache
  miss, when the context has no cache key, or when update mode bypasses
  the cache."
  [context options]
  (when (use-cache? context options)
    (when-let [key (::cache-key context)]
      (when-let [item (cache/load-blob (:html-cache options) key)]
        {:body (:blob item), :headers (:meta item)}))))

(defn- maybe-retrieve-from-processed-cache
  "Likewise, for the processed cache; the cached blob is read back as EDN."
  [context options]
  (when (use-cache? context options)
    (when-let [key (::cache-key context)]
      (when-let [item (cache/load-blob (:processed-cache options) key)]
        (edn/read-string (String. (:blob item) "UTF-8"))))))
(defn- check-cache-handler
  "If context is cached, loads the cached data and skips [[download-handler]],
  otherwise returns it as-is."
  [context options]
  (let [processed-result (maybe-retrieve-from-processed-cache context options)
        cached-response (maybe-retrieve-from-http-cache context options)]
    (cond
      ;; :uncached-only mode prunes anything we've already scraped.
      (and (:uncached-only options) (or processed-result cached-response))
      #_=> []
      ;; Processed-cache hit: skip download AND processing; jump straight
      ;; to splitting the precomputed child contexts.
      processed-result
      #_=> [(assoc context
                   ::new-items (map (partial merge-contexts context) processed-result)
                   ::next-stage `split-handler)]
      ;; HTML-cache hit: skip download only; processing still runs.
      cached-response
      #_=> [(assoc context
                   ::response cached-response
                   ::next-stage `process-handler)]
      ;; Cache miss: fall through to the download stage.
      :otherwise
      #_=> [context])))
(defn- wait
  "If ms-or-fn is a number, Thread/sleep that many milliseconds, otherwise
  assume that it's a zero-argument function, call it and sleep for the resulting
  number. Does nothing when ms-or-fn is nil/false."
  [ms-or-fn]
  (when ms-or-fn
    (Thread/sleep (if (number? ms-or-fn)
                    ms-or-fn
                    (ms-or-fn)))))
(defn signal-error
  "Call this function from `download-error-handler` to cause scraping to signal an error."
  [error context]
  ;; skyscraper.traverse interprets a ::traverse/error entry as a failure.
  (vector {:skyscraper.traverse/error error
           :skyscraper.traverse/context context}))
(defn respond-with
  "Call this function from `download-error-handler` to continue scraping as if download had succeeded."
  [response {:keys [pipeline]} context]
  (let [advanced (-> (advance-pipeline pipeline context)
                     (assoc ::response response))
        cookies (:cookies response)]
    ;; Propagate any cookies set by the server into the context so that
    ;; child requests carry them.
    [(if cookies
       (update advanced :http/cookies merge cookies)
       advanced)]))
(defn default-download-error-handler
  "By default, when clj-http returns an error (e.g., when the server returns 4xx or 5xx),
  Skyscraper will call this function to determine what to do next.
  This handler causes Skyscraper to retry up to `retries` times for 5xx status codes,
  and to throw an exception otherwise."
  [error options context]
  (let [{:keys [status]} (ex-data error)
        ;; Retry on server errors (5xx) and on timeouts, which surface as
        ;; exceptions whose message mentions "timed out" rather than carrying
        ;; a status. (str ...) guards against a nil message.
        retry? (or (and status (>= status 500))
                   (re-find #"timed out" (str (.getMessage error))))
        ;; ::retry tracks how many attempts this context has already had.
        retry (inc (or (::retry context) 0))]
    (if (and retry? (<= retry (:retries options)))
      (do
        (warnf "[download] Unexpected error %s, retry %s, context %s" error retry (context/describe context))
        ;; Re-emit the context so it flows through the pipeline and gets
        ;; downloaded again.
        [(assoc context ::retry retry)])
      (do
        (warnf "[download] Unexpected error %s, giving up, context %s" error (context/describe context))
        (signal-error error context)))))
(defn- download-handler
  "Asynchronously downloads the page specified by context."
  [context {:keys [connection-manager download-semaphore sleep] :as options} callback]
  (debugf "Running download-handler: %s" (:processor context))
  ;; http/* keys in the context (e.g. :http/cookies) become clj-http
  ;; request options.
  (let [req (merge {:method :get, :url (:url context)}
                   (extract-namespaced-keys "http" context))
        success-fn (fn [response]
                     (debugf "[download] Downloaded %s" (:url context))
                     ;; Free the connection slot before handing control back
                     ;; to the traversal engine.
                     (.release download-semaphore)
                     (callback (respond-with response options context)))
        error-fn (fn [error]
                   (.release download-semaphore)
                   (let [error-handler (:download-error-handler options)]
                     (callback (error-handler error options context))))]
    (debugf "[download] Waiting")
    ;; The semaphore caps simultaneous downloads at :max-connections.
    (.acquire download-semaphore)
    (infof "[download] Downloading %s" (:url context))
    (let [req (merge {:async? true,
                      :connection-manager connection-manager}
                     req (get-option context options :http-options))
          request-fn (or (:request-fn options)
                         http/request)]
      ;; Optional throttling before issuing the request.
      (wait sleep)
      (request-fn req
                  success-fn
                  error-fn))))
(defn- sync-download-handler
  "Synchronous version of download-handler."
  [context {:keys [connection-manager sleep pipeline] :as options}]
  (let [req (merge {:method :get, :url (:url context), :connection-manager connection-manager}
                   ;; http/* keys in the context become clj-http options.
                   (extract-namespaced-keys "http" context)
                   (get-option context options :http-options))
        request-fn (or (:request-fn options)
                       http/request)]
    (try
      (infof "[download] Downloading %s" (:url context))
      ;; Optional throttling before issuing the request.
      (wait sleep)
      (let [resp (request-fn req)]
        (debugf "[download] Downloaded %s" (:url context))
        ;; Advance past the download stage here (sync-handler deliberately
        ;; skips advancing for this stage), attach the response, and merge
        ;; any server-set cookies into the context for child requests.
        [(cond-> (advance-pipeline pipeline context)
           true (assoc ::response resp)
           (:cookies resp) (update :http/cookies merge (:cookies resp)))])
      (catch Exception error
        (let [error-handler (:download-error-handler options)]
          (error-handler error options context))))))
(defn- store-cache-handler
  "Assuming context has downloaded data, stores it in HTML cache if
  applicable and returns it unmodified."
  [context options]
  ;; Contexts without a cache key are simply not cached; the save is a
  ;; side effect and the context passes through untouched.
  (when-let [key (::cache-key context)]
    (cache/save-blob (:html-cache options) key (get-in context [::response :body]) (get-in context [::response :headers])))
  [context])
(defn- process-handler
  "Runs the processor specified by context on itself. Returns a single context
  with the processor results as `::new-items`."
  [context options]
  (let [parse (get-option context options :parse-fn)
        {:keys [headers body]} (::response context)
        ;; Normalize headers into clj-http's case-insensitive header map
        ;; before handing them to the parse function.
        document (parse (into (http-headers/header-map) headers) body context)
        processor-name (:processor context)
        result (run-processor processor-name document context)]
    ;; Cache the processed result as EDN so later runs can skip both
    ;; download and processing (see check-cache-handler).
    (when-let [key (::cache-key context)]
      (cache/save-blob (:processed-cache options) key (.getBytes (pr-str result) "UTF-8") nil))
    [(assoc context ::new-items (map (partial merge-contexts context) result))]))
(defn- split-handler
  "Extracts `::new-items` out of the supplied contexts and prunes the scraping
  tree if necessary."
  [context options]
  (let [children (for [item (::new-items context)]
                   (assoc item ::stage `split-handler))]
    (filter-contexts options children)))
(defn- sync-handler
  "A handler that runs the squashed pipeline."
  [{:keys [::stage processor] :as context} options]
  (debugf "Running sync-handler: %s %s" stage processor)
  ;; ::stage holds a namespace-qualified symbol produced by make-pipeline;
  ;; resolve it to the stage function at runtime.
  (let [f (ns-resolve *ns* stage)
        results (f context options)
        ;; sync-download-handler advances the pipeline itself (it attaches
        ;; cookies after advancing), so don't advance its results again.
        maybe-advance-pipeline (if (= stage `sync-download-handler)
                                 (fn [pipeline context] context)
                                 advance-pipeline)]
    (map (partial maybe-advance-pipeline (:pipeline options)) results)))
(defn initialize-seed
  "Ensures the seed is a seq and sets up internal keys.

  Accepts either a single context map or a sequence of contexts; each one
  is advanced to the first pipeline stage."
  [{:keys [pipeline]} seed]
  ;; A single map is wrapped in a vector; sequences are deduplicated.
  (mapv (partial advance-pipeline pipeline) (ensure-distinct-seq seed)))
(def default-options
  "Default scraping options."
  {;; Maximum number of concurrent HTTP requests (semaphore permits).
   :max-connections 10,
   ;; How many times default-download-error-handler retries before giving up.
   :retries 5,
   ;; Passed to clj-http's connection-manager constructors.
   :conn-mgr-options {},
   ;; Downloaded documents are parsed as Enlive resources unless overridden.
   :parse-fn parse-enlive,
   ;; Use clj-http's asynchronous API by default.
   :download-mode :async,
   :download-error-handler default-download-error-handler,
   ;; Baseline clj-http request options; :as :byte-array ensures parse
   ;; functions receive raw bytes and can do their own charset detection.
   :http-options {:redirect-strategy :lax,
                  :as :byte-array,
                  :socket-timeout 30000,
                  :connection-timeout 30000}})
(defn initialize-options
  "Initializes scraping options, ensuring that the caches are
  instances of [[CacheBackend]], and a db is present if `:db-file`
  was supplied."
  [options]
  (let [options (merge default-options options)
        html-cache (sanitize-cache (:html-cache options) html-cache-dir)
        processed-cache (sanitize-cache (:processed-cache options) processed-cache-dir)]
    (merge options
           (sqlite/initialize-db-options options)
           {:pipeline (make-pipeline options)
            :enhance? ::new-items
            :html-cache html-cache
            :processed-cache processed-cache
            ;; Close both caches when traversal ends; the try/finally
            ;; guarantees the processed cache is closed even if closing
            ;; the HTML cache throws.
            :on-end #(try
                       (.close html-cache)
                       (finally
                         (.close processed-cache)))
            ;; The connection-manager flavour must match the download mode.
            ;; (Note: "reuseable" is the actual spelling of clj-http's
            ;; async constructor.)
            :connection-manager (case (:download-mode options)
                                  :sync (http-conn/make-reusable-conn-manager (:conn-mgr-options options))
                                  :async (http-conn/make-reuseable-async-conn-manager (:conn-mgr-options options)))
            ;; Caps concurrent downloads; acquired/released by download-handler.
            :download-semaphore (java.util.concurrent.Semaphore. (:max-connections options))})))
(defn scrape
  "Runs scraping on seed (an initial context or sequence of contexts), returning
  a lazy sequence of leaf contexts.

  `options` may include the ones supported by [[skyscraper.traverse/launch]],
  as well as:

  - `:conn-mgr-options` – Skyscraper will create a clj-http connection manager
    with these options (a sync or async one, depending on `:download-mode`)
    and use it across all HTTP requests it makes.
    See [[clj-http.conn-mgr/make-reusable-conn-manager]] and
    [[clj-http.conn-mgr/make-reusable-async-conn-manager]] for details on the
    options you can specify here.
  - `:db` – a clojure.java.jdbc compatible db-spec that, when passed, will
    cause scraping to generate a SQL database of results. See
    `doc/db.md` for a walkthrough. Only supports SQLite.
  - `:db-file` – an alternative to `:db`, a filename or path that will
    be used to construct a SQLite db-spec.
  - `:ignore-db-keys` – if true, Skyscraper will insert (instead of upserting)
    rows into the DB specified by `:db` or `:db-file`, as if none of the
    processors specified `:skyscraper.db/key-columns`. Defaults to true
    if the DB didn't exist.
  - `:download-error-handler` – a function called when clj-http returns an
    error when downloading; see `doc/error-handling.md` for details.
  - `:download-mode` – can be `:async` (default) or `:sync`. When async,
    Skyscraper will use clj-http's asynchronous mode to make HTTP requests.
  - `:html-cache` – the HTTP cache to use. Can be an instance of `CacheBackend`,
    a string (meaning a directory to use for a filesystem cache), `nil` or `false`
    (meaning no cache), or `true` (meaning a filesystem cache in the default
    location, [[html-cache-dir]]). Defaults to `nil`.
  - `:http-options` – a map of additional options that will be passed to
    [[clj-http.client/request]].
  - `:max-connections` – maximum number of HTTP requests that can be active
    at any time.
  - `:only` – prunes the scrape tree to only include matching contexts; this can be
    a map (specifying to only include records whose values, if present, coincide with
    the map) or a predicate (meaning to filter contexts on it).
  - `:parse-fn` – a function that takes 3 arguments: a map of HTTP headers,
    a byte array containing the downloaded document, and the context,
    and returns a parsed representation of that document. Skyscraper provides
    [[parse-string]], [[parse-enlive]], and [[parse-reaver]] out of the box.
    Defaults to [[parse-enlive]].
  - `:processed-cache` – the processed cache to use. Same possible values as
    for `:html-cache`. Defaults to `nil`.
  - `:request-fn` – the HTTP request function to use. Defaults to [[clj-http.client/request]].
    Skyscraper relies on the API of clj-http, so only override this if you
    know what you're doing.
  - `:retries` – maximum number of times that Skyscraper will retry downloading
    a page until it gives up. Defaults to 5.
  - `:sleep` – sleep this many milliseconds before each request, or a niladic fn
    that returns a number of milliseconds. Useful for throttling. It's probably
    best to set `:parallelism` to 1 together with this.
  - `:uncached-only` – prune the scrape tree, yielding only the nodes that haven't been
    scraped yet. See `doc/updates.md`.
  - `:update` – run in update mode (see `doc/updates.md`)."
  [seed & {:as options}]
  (let [options (initialize-options options)
        seed (initialize-seed options seed)]
    (traverse/leaf-seq seed options)))
(defn scrape!
  "Like scrape, but eager: terminates after scraping has succeeded. Returns nil.
  Pass `:db`, `:db-file`, `:leaf-chan`, or `:item-chan` to access scraped data.
  `options` are the same as in [[scrape]]."
  [seed & {:as options}]
  (let [options (initialize-options options)
        seed (initialize-seed options seed)]
    (traverse/traverse! seed options)))