-
Notifications
You must be signed in to change notification settings - Fork 1
/
fetcher.clj
69 lines (60 loc) · 3.07 KB
/
fetcher.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
(ns common-crawl-utils.fetcher
(:require [clojure.core.async :refer [<!! >! chan close! go thread]]
[clojure.tools.logging :as log]
[common-crawl-utils.constants :as constants]
[common-crawl-utils.coordinates :as coordinates]
[common-crawl-utils.utils :as utils]
[org.httpkit.client :as http])
(:import (java.io ByteArrayInputStream)
(java.util Scanner)
(java.util.zip GZIPInputStream)))
(defn- get-range-header
  "Builds the value of an HTTP `Range` header for a coordinate.

  `offset` and `length` are read from the coordinate map as decimal strings.
  HTTP byte ranges are inclusive on both ends, so the last requested byte is
  `offset + length - 1`.

  Values are parsed as longs so that offsets or lengths above
  `Integer/MAX_VALUE` do not throw `NumberFormatException`."
  [{:keys [offset length]}]
  (let [start (Long/parseLong offset)
        size  (Long/parseLong length)]
    (format "bytes=%s-%s" start (dec (+ start size)))))
(defn- read-content
  "Decompresses a gzipped record given as the byte array `body` and splits it
  into three parts, returned as a map:

    :warc   - text up to the first blank line (CRLF CRLF)
    :header - text between the first and the second blank line
    :html   - everything that remains after `:header`

  The remainder is read with the `\\A` delimiter, which does not consume the
  blank line that follows `:header`, so `:html` starts with that separator.

  Bytes are decoded explicitly as UTF-8 instead of the JVM's platform-default
  charset, so the result does not depend on the host's locale settings.
  Closing the Scanner (via `with-open`) also closes the wrapped streams."
  [body]
  (with-open [rdr (-> body (ByteArrayInputStream.) (GZIPInputStream.) (Scanner. "UTF-8"))]
    {:warc   (.next (.useDelimiter rdr "\r\n\r\n"))
     :header (.next (.useDelimiter rdr "\r\n\r\n"))
     :html   (.next (.useDelimiter rdr "\\A"))}))
(defn fetch-single-coordinate-content
  "Requests the byte range described by `coordinate` from the file named by
  its `:filename` (resolved against `constants/cc-s3-base-url`) and blocks
  until the request completes.

  When the response carries no error and has status 206, the result is the
  coordinate without `:error` and with `:content` set to the parsed body.
  In every other case the result is the coordinate with `:error` set to
  `(utils/get-http-error response)`."
  [coordinate]
  (let [request     {:url     (str constants/cc-s3-base-url (:filename coordinate))
                     :method  :get
                     :headers {"range" (get-range-header coordinate)}
                     :as      :byte-array
                     :timeout constants/http-timeout}
        on-response (fn [{:keys [body error status] :as response}]
                      ;; Only a partial-content reply without a transport
                      ;; error counts as success.
                      (if (and (nil? error) (= status 206))
                        (-> coordinate
                            (dissoc :error)
                            (assoc :content (read-content body)))
                        (assoc coordinate :error (utils/get-http-error response))))]
    ;; Dereferencing makes this call synchronous.
    (deref (http/request request on-response))))
(defn fetch-content
  "Fetches coordinates from the Common Crawl Index Server together with their
  content from AWS.

  Takes a `query` map, described in
  https://github.com/webrecorder/pywb/wiki/CDX-Server-API#api-reference

  Additionally, the `:cdx-api` query key can specify the index server endpoint.
  If `:cdx-api` is not provided, the endpoint of the most recent crawl is used;
  it can be accessed with `(common-crawl-utils.config/get-most-recent-cdx-api)`.

  Coordinates that already carry an `:error` are passed through untouched;
  content is requested only for the others.

  ;; To fetch all content for host from most recent crawl
  (fetch-content {:url \"http://www.cnn.com\" :matchType \"host\"})

  ;; To fetch limited number of coordinates with content
  (take 10 (fetch-content {:url \"http://www.cnn.com\" :matchType \"host\"}))"
  [query]
  (let [with-content (fn [{error :error :as coordinate}]
                       (if (nil? error)
                         (fetch-single-coordinate-content coordinate)
                         coordinate))]
    (map with-content (coordinates/fetch query))))
(defn fetch-content-async
  "Asynchronous variant of `fetch-content`: coordinates for `query` are read
  from a channel, their content is fetched, and the results are put on
  `content-chan`, which is returned.

  Recognised keys in `query` (besides the ones used by
  `coordinates/fetch-async`):

    :coordinate-chan - channel the coordinates are delivered on (default: a
                       fresh unbuffered channel)
    :content-chan    - channel the results are put on (default: a fresh
                       unbuffered channel)
    :close?          - when true (the default) `content-chan` is closed once
                       the coordinate channel is exhausted

  Coordinates that already carry an `:error` are forwarded unchanged.

  Fetching runs on a fixed number of dedicated threads through
  `pipeline-blocking`, because `fetch-single-coordinate-content` blocks while
  waiting for the HTTP response and must not run inside `go` blocks. The
  pipeline closes `content-chan` only after every in-flight result has been
  delivered, so no results are lost when the coordinate channel closes.
  Results are delivered in the order their coordinates were received."
  [{:keys [coordinate-chan content-chan close?]
    :as   query
    :or   {coordinate-chan (chan)
           content-chan    (chan)
           close?          true}}]
  (let [coordinate-chan (coordinates/fetch-async (assoc query :coordinate-chan coordinate-chan :close? close?))
        ;; Number of concurrent blocking fetches.
        parallelism     8
        fetch-one       (fn [{error :error :as coordinate}]
                          (cond->> coordinate (nil? error) (fetch-single-coordinate-content)))
        ;; `done` closes after the pipeline has drained `coordinate-chan` and,
        ;; when `close?` is true, closed `content-chan`.
        done            (clojure.core.async/pipeline-blocking parallelism
                                                              content-chan
                                                              (map fetch-one)
                                                              coordinate-chan
                                                              close?)]
    (when close?
      (thread
        (<!! done)
        (log/debugf "Closed content channel for query `%s`" (select-keys query coordinates/cdx-params))))
    content-chan))