KafkaKeyedToCrawlFeed.java
/**
*
*/
package uk.bl.wap.crawler.postprocessor;

import static org.archive.modules.CoreAttributeConstants.A_HERITABLE_KEYS;

import java.io.UnsupportedEncodingException;
import java.net.IDN;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.httpclient.URIException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.archive.crawler.spring.SheetOverlaysManager;
import org.archive.modules.CrawlURI;
import org.archive.modules.deciderules.DecideRule;
import org.archive.spring.KeyedProperties;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
 * Sub-class that builds a CrawlRequest rather than the usual Crawl Log output.
 *
 * Based on: @see AMQPPublishProcessor, @see KafkaKeyedCrawlLogFeed and
 * @see KafkaUrlReceiver
 *
 * @author Andrew Jackson <Andrew.Jackson@bl.uk>
 */
public class KafkaKeyedToCrawlFeed extends KafkaKeyedCrawlLogFeed {

    /** If true, only emit candidate URIs that are accepted by the crawl scope. */
    private boolean emitInScopeOnly = false;

    public boolean isEmitInScopeOnly() {
        return emitInScopeOnly;
    }

    public void setEmitInScopeOnly(boolean emitInScopeOnly) {
        this.emitInScopeOnly = emitInScopeOnly;
    }

    /** If true, emit the out-links of each CrawlURI; if false, emit the CrawlURI itself. */
    private boolean emitOutlinks = true;

    public boolean isEmitOutlinks() {
        return emitOutlinks;
    }

    public void setEmitOutlinks(boolean emitOutlinks) {
        this.emitOutlinks = emitOutlinks;
    }

    /** The crawl scope, consulted when emitInScopeOnly is set. */
    protected DecideRule scope;

    public DecideRule getScope() {
        return this.scope;
    }

    @Autowired
    public void setScope(DecideRule scope) {
        this.scope = scope;
    }

    protected SheetOverlaysManager sheetOverlaysManager;

    public SheetOverlaysManager getSheetOverlaysManager() {
        return sheetOverlaysManager;
    }

    @Autowired
    public void setSheetOverlaysManager(
            SheetOverlaysManager sheetOverlaysManager) {
        this.sheetOverlaysManager = sheetOverlaysManager;
    }

    /** Optional feed used to log URIs that were discarded as out of scope. */
    protected KafkaKeyedDiscardedFeed discardedUriFeed;

    public KafkaKeyedDiscardedFeed getDiscardedUriFeed() {
        return discardedUriFeed;
    }

    @Autowired
    public void setDiscardedUriFeed(KafkaKeyedDiscardedFeed discardedUriFeed) {
        this.discardedUriFeed = discardedUriFeed;
    }

    /** If true, discarded (out-of-scope) URIs are also sent to the discardedUriFeed. */
    private boolean discardedUriFeedEnabled = false;

    public boolean isDiscardedUriFeedEnabled() {
        return discardedUriFeedEnabled;
    }

    public void setDiscardedUriFeedEnabled(boolean discardedUriFeedEnabled) {
        this.discardedUriFeedEnabled = discardedUriFeedEnabled;
    }
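
    /*
     * A minimal wiring sketch (illustrative only): this processor would
     * typically be declared as a Spring bean in the crawl job configuration.
     * The bean id, topic name and property values below are assumptions, not
     * part of this class; the 'topic' and Kafka connection settings are
     * expected to come from the KafkaKeyedCrawlLogFeed base class, and the
     * scope and sheet-overlays manager are auto-wired.
     *
     *   <bean id="toCrawlFeed"
     *         class="uk.bl.wap.crawler.postprocessor.KafkaKeyedToCrawlFeed">
     *     <property name="topic" value="uris.tocrawl" />
     *     <property name="emitOutlinks" value="true" />
     *     <property name="emitInScopeOnly" value="true" />
     *     <property name="discardedUriFeedEnabled" value="false" />
     *   </bean>
     */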
    /**
     * Constructs the JSON message to send.
     *
     * @param source the parent URI from which the candidate was discovered
     *            (if null, curi is used as its own source)
     * @param curi the candidate URI the message describes
     * @return the message to send.
     * @see CrawlURI#inheritFrom(CrawlURI)
     */
    protected JSONObject buildJsonMessage(CrawlURI source, CrawlURI curi) {
        // If we have no source, use self:
        if (source == null) {
            source = curi;
        }
        // Set up object concerning the new URI:
        JSONObject message = new JSONObject().put("url", curi.toString());
        message.put("isSeed", curi.isSeed());
        message.put("forceFetch", curi.forceFetch());
        message.put("hop", curi.getLastHop());
        message.put("method", "GET");
        message.put("headers", curi.getData().get("customHttpRequestHeaders"));
        /*
         * if (getExtraFields() != null) {
         *     for (String k : getExtraFields().keySet()) {
         *         message.put(k, getExtraFields().get(k));
         *     }
         * }
         */
        // Also store metadata about the parent URI:
        message.put("parentUrl", source.getURI());
        HashMap<String, Object> metadata = new HashMap<String, Object>();
        metadata.put("pathFromSeed", source.getPathFromSeed());
        @SuppressWarnings("unchecked")
        Set<String> heritableKeys = (Set<String>) source.getData()
                .get(A_HERITABLE_KEYS);
        HashMap<String, Object> heritableData = new HashMap<String, Object>();
        if (heritableKeys != null) {
            for (String key : heritableKeys) {
                heritableData.put(key, source.getData().get(key));
            }
        }
        metadata.put("heritableData", heritableData);
        message.put("parentUrlMetadata", metadata);
        return message;
    }
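
    /*
     * For illustration only, a message built above takes roughly this shape.
     * The URLs and values are invented; the field names match the code above,
     * "headers" only appears when custom HTTP request headers have been set,
     * and "heritableData" is empty when the parent carries no heritable keys.
     *
     *   {
     *     "url": "http://example.com/page",
     *     "isSeed": false,
     *     "forceFetch": false,
     *     "hop": "L",
     *     "method": "GET",
     *     "parentUrl": "http://example.com/",
     *     "parentUrlMetadata": {
     *       "pathFromSeed": "L",
     *       "heritableData": { }
     *     }
     *   }
     */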
    protected byte[] buildMessage(CrawlURI source, CrawlURI curi) {
        try {
            return buildJsonMessage(source, curi).toString().getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
    /**
     * This cache is used to avoid excessive duplication of discovered URLs.
     * Note, however, that it will interfere with re-crawling dynamics over
     * short timescales (under five minutes).
     */
    private Cache<String, Boolean> recentlySentCache = CacheBuilder
            .newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).softValues()
            .maximumSize(2000).build();
    public void sendToKafka(String topic, CrawlURI curi, CrawlURI candidate) {
        // Check if this URL has been sent recently:
        Boolean recentlySent = recentlySentCache
                .getIfPresent(candidate.getURI());
        if (recentlySent == null) {
            // Make a suitable key:
            String key = this.getKeyForCrawlURI(candidate);
            // Send:
            logger.finer("Sending a message wrapping URI: " + candidate
                    + " to topic " + topic);
            byte[] message = buildMessage(curi, candidate);
            ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<String, byte[]>(
                    topic, key, message);
            kafkaProducer().send(producerRecord, stats);
            recentlySentCache.put(candidate.getURI(), true);
        }
    }
    @Override
    protected boolean shouldEmit(CrawlURI candidate) {
        // Drop HTTP URIs that appear to be malformed:
        if (candidate.getURI().startsWith("http")) {
            try {
                String idn_host = IDN.toASCII(candidate.getUURI().getHost());
                logger.finest("Parsed URI and host successfully: " + idn_host);
            } catch (URIException e) {
                logger.warning("Could not parse URI: " + candidate.getURI());
                return false;
            } catch (IllegalArgumentException e) {
                logger.warning(
                        "Could not parse host from: " + candidate.getURI());
                return false;
            }
        }
        // Otherwise, the usual rules:
        return super.shouldEmit(candidate);
    }
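
    /*
     * For example (illustrative, not exhaustive): an http(s) URI whose host
     * cannot be IDN-encoded (e.g. it contains illegal characters) is rejected
     * here, while non-http schemes such as 'data:' or 'mailto:' skip the host
     * check and are left to super.shouldEmit(...) to filter.
     */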
    @Override
    protected void innerProcess(CrawlURI curi) throws InterruptedException {
        if (emitOutlinks) {
            // Process all outlinks:
            Collection<CrawlURI> outLinks = curi.getOutLinks();
            // Record the set of Strings in case the same URIs are extracted
            // with different hop paths or contexts:
            Collection<String> sentURIs = new LinkedHashSet<String>();
            // Record what's been sent so it can then be removed:
            Collection<CrawlURI> toRemove = new LinkedHashSet<CrawlURI>();
            // Iterate through the outlinks:
            for (CrawlURI candidate : outLinks) {
                // Load the sheet overlays so we apply recrawl frequencies etc.
                // (as done in CandidatesProcessor.runCandidateChain()):
                candidate.setFullVia(curi);
                sheetOverlaysManager.applyOverlaysTo(candidate);
                try {
                    KeyedProperties.clearOverridesFrom(curi);
                    KeyedProperties.loadOverridesFrom(candidate);
                    // Route most via Kafka, not prerequisites:
                    if (!candidate.isPrerequisite()) {
                        // Discard malformed or 'data:' or 'mailto:' URLs:
                        if (this.shouldEmit(candidate)) {
                            // Avoid re-sending the same URI a lot:
                            if (!sentURIs.contains(candidate.getURI())) {
                                // Only emit URLs that are in scope, if
                                // configured to do so:
                                if (this.emitInScopeOnly) {
                                    // This part needs the
                                    // sheets/keyed-properties setup right:
                                    if (this.getScope().accepts(candidate)) {
                                        // Pass to Kafka queue:
                                        sendToKafka(getTopic(), curi,
                                                candidate);
                                    } else {
                                        // (optionally) log discarded URLs for
                                        // analysis:
                                        if (discardedUriFeedEnabled) {
                                            discardedUriFeed
                                                    .doInnerProcess(candidate);
                                        }
                                    }
                                } else {
                                    // Ignore scope rules and emit all
                                    // non-prerequisites:
                                    sendToKafka(getTopic(), curi, candidate);
                                }
                                // Record this diverted URL string so it will
                                // only be sent once:
                                sentURIs.add(candidate.getURI());
                            } else {
                                logger.finest(
                                        "Not emitting CrawlURI (appears to have been sent already): "
                                                + candidate.getURI());
                            }
                        } else {
                            logger.finest(
                                    "Not emitting CrawlURI (appears to be invalid): "
                                            + candidate.getURI());
                        }
                        // Remove all Crawl URIs except pre-requisites so only
                        // they will be queued directly:
                        toRemove.add(candidate);
                    } else {
                        logger.finest(
                                "Not emitting pre-requisite CrawlURI (will be enqueued locally): "
                                        + candidate.getURI());
                    }
                } finally {
                    KeyedProperties.clearOverridesFrom(candidate);
                    KeyedProperties.loadOverridesFrom(curi);
                }
            }
            // And remove re-routed candidates from the candidates list:
            for (CrawlURI candidate : toRemove) {
                outLinks.remove(candidate);
            }
        } else {
            // Treat the CrawlURI itself instead:
            CrawlURI candidate = curi;
            int statusAfterCandidateChain = candidate.getFetchStatus();
            // If the URL appears to be in-scope:
            if (statusAfterCandidateChain >= 0) {
                // Discard malformed or 'data:' or 'mailto:' URLs:
                if (this.shouldEmit(candidate)) {
                    // Pass to Kafka queue:
                    sendToKafka(getTopic(), curi.getFullVia(), candidate);
                } else {
                    logger.finest(
                            "Not emitting CrawlURI (appears to be invalid): "
                                    + candidate.getURI());
                }
            } else {
                // (optionally) log discarded URLs for analysis:
                if (discardedUriFeedEnabled) {
                    discardedUriFeed.doInnerProcess(candidate);
                }
            }
        }
    }
}