forked from apache/tika
-
Notifications
You must be signed in to change notification settings - Fork 0
/
BatchProcessBuilder.java
305 lines (261 loc) · 12.3 KB
/
BatchProcessBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.batch.builders;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import javax.xml.parsers.DocumentBuilder;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import org.apache.tika.batch.BatchProcess;
import org.apache.tika.batch.ConsumersManager;
import org.apache.tika.batch.FileResource;
import org.apache.tika.batch.FileResourceCrawler;
import org.apache.tika.batch.Interrupter;
import org.apache.tika.batch.StatusReporter;
import org.apache.tika.exception.TikaException;
import org.apache.tika.util.ClassLoaderUtil;
import org.apache.tika.util.XMLDOMUtil;
import org.apache.tika.utils.XMLReaderUtils;
/**
 * Builds a {@link BatchProcess} from a combination of runtime arguments and the
 * config file.
 */
public class BatchProcessBuilder {

    public static final int DEFAULT_MAX_QUEUE_SIZE = 1000;
    public static final String MAX_QUEUE_SIZE_KEY = "maxQueueSize";
    public static final String NUM_CONSUMERS_KEY = "numConsumers";

    /** Attribute that names the builder implementation for each config element. */
    private static final String BUILDER_CLASS_KEY = "builderClass";

    /**
     * numConsumers is needed by both the crawler and the consumers. This utility method
     * is to be used to extract the number of consumers from a map of String key value pairs.
     * <p>
     * If the value is missing, is "default", is not a parseable integer or has a value &lt; 1,
     * then <code>AbstractConsumersBuilder</code>'s <code>getDefaultNumConsumers()</code>
     * is returned instead.
     *
     * @param attrs attributes from which to select the NUM_CONSUMERS_KEY
     * @return number of consumers
     */
    public static int getNumConsumers(Map<String, String> attrs) {
        return parseNumConsumers(attrs.get(NUM_CONSUMERS_KEY));
    }

    /**
     * Converts a raw numConsumers string into a usable consumer count.
     *
     * @param value raw attribute value; may be null
     * @return the parsed value if it is an integer &gt;= 1, otherwise
     *         <code>AbstractConsumersBuilder.getDefaultNumConsumers()</code>
     */
    private static int parseNumConsumers(String value) {
        if (value == null || value.equals("default")) {
            return AbstractConsumersBuilder.getDefaultNumConsumers();
        }
        int n = -1;
        try {
            n = Integer.parseInt(value);
        } catch (NumberFormatException ignored) {
            //deliberately ignored: an unparseable value falls back to the default below
        }
        return n < 1 ? AbstractConsumersBuilder.getDefaultNumConsumers() : n;
    }

    /**
     * Builds a BatchProcess from runtime arguments and a
     * input stream of a configuration file. With the exception of the QueueBuilder,
     * the builders choose how to adjudicate between
     * runtime arguments and the elements in the configuration file.
     * <p>
     * This does not close the InputStream!
     *
     * @param is                inputStream
     * @param runtimeAttributes incoming runtime attributes
     * @return batch process
     * @throws java.io.IOException if the stream cannot be read, or if obtaining the
     *                             document builder or parsing the XML fails (the
     *                             underlying exception is attached as the cause)
     */
    public BatchProcess build(InputStream is, Map<String, String> runtimeAttributes)
            throws IOException {
        Document doc;
        try {
            DocumentBuilder docBuilder = XMLReaderUtils.getDocumentBuilder();
            doc = docBuilder.parse(is);
        } catch (TikaException | SAXException e) {
            //surface parser/configuration failures through the declared checked type
            throw new IOException(e);
        }
        return build(doc.getDocumentElement(), runtimeAttributes);
    }

    /**
     * Builds a FileResourceBatchProcessor from runtime arguments and a
     * document node of a configuration file. With the exception of the QueueBuilder,
     * the builders choose how to adjudicate between
     * runtime arguments and the elements in the configuration file.
     * <p>
     * The "consumers" and "crawler" child elements are always handed to their builders;
     * the "reporter" and "interrupter" elements are optional and the corresponding
     * component is left null when the element is absent.
     *
     * @param docElement                document element of the xml config file
     * @param incomingRuntimeAttributes runtime arguments
     * @return FileResourceBatchProcessor
     */
    public BatchProcess build(Node docElement, Map<String, String> incomingRuntimeAttributes) {
        //key components
        long timeoutThresholdMillis =
                XMLDOMUtil.getLong("timeoutThresholdMillis", incomingRuntimeAttributes, docElement);
        long timeoutCheckPulseMillis = XMLDOMUtil
                .getLong("timeoutCheckPulseMillis", incomingRuntimeAttributes, docElement);
        long pauseOnEarlyTerminationMillis = XMLDOMUtil
                .getLong("pauseOnEarlyTerminationMillis", incomingRuntimeAttributes, docElement);
        int maxAliveTimeSeconds =
                XMLDOMUtil.getInt("maxAliveTimeSeconds", incomingRuntimeAttributes, docElement);

        /*
         * TODO: This is a bit smelly.  NumConsumers needs to be used by the crawler
         * and the consumers.  This copies the incomingRuntimeAttributes and then
         * supplies the numConsumers from the commandline (if it exists) or from the config file
         * At least this creates an unmodifiable defensive copy of incomingRuntimeAttributes...
         */
        Map<String, String> runtimeAttributes =
                setNumConsumersInRuntimeAttributes(docElement, incomingRuntimeAttributes);

        //build queue
        ArrayBlockingQueue<FileResource> queue = buildQueue(docElement, runtimeAttributes);

        //index the element children by name; if a name repeats, the last one wins
        NodeList children = docElement.getChildNodes();
        Map<String, Node> keyNodes = new HashMap<>();
        for (int i = 0; i < children.getLength(); i++) {
            Node child = children.item(i);
            if (child.getNodeType() == Node.ELEMENT_NODE) {
                keyNodes.put(child.getNodeName(), child);
            }
        }

        //build consumers
        ConsumersManager consumersManager =
                buildConsumersManager(keyNodes.get("consumers"), runtimeAttributes, queue);

        //build crawler
        FileResourceCrawler crawler =
                buildCrawler(queue, keyNodes.get("crawler"), runtimeAttributes);

        StatusReporter reporter = null;
        if (keyNodes.containsKey("reporter")) {
            reporter = buildReporter(crawler, consumersManager, keyNodes.get("reporter"),
                    runtimeAttributes);
        }

        Interrupter interrupter = null;
        if (keyNodes.containsKey("interrupter")) {
            interrupter =
                    buildInterrupter(keyNodes.get("interrupter"), pauseOnEarlyTerminationMillis,
                            runtimeAttributes);
        }

        BatchProcess proc = new BatchProcess(crawler, consumersManager, reporter, interrupter);

        //only values greater than -1 override whatever BatchProcess already has
        if (timeoutThresholdMillis > -1) {
            proc.setTimeoutThresholdMillis(timeoutThresholdMillis);
        }
        if (pauseOnEarlyTerminationMillis > -1) {
            proc.setPauseOnEarlyTerminationMillis(pauseOnEarlyTerminationMillis);
        }
        if (timeoutCheckPulseMillis > -1) {
            proc.setTimeoutCheckPulseMillis(timeoutCheckPulseMillis);
        }
        proc.setMaxAliveTimeSeconds(maxAliveTimeSeconds);
        return proc;
    }

    /**
     * Looks up the "builderClass" attribute for a config element, letting the
     * attribute mapping performed by <code>XMLDOMUtil.mapifyAttrs</code> combine the
     * element's attributes with the runtime attributes.
     *
     * @param node              config element to read
     * @param runtimeAttributes runtime attributes passed to the attribute mapping
     * @param elementName       element name used in the error message
     * @return the builder class name (never null)
     * @throws RuntimeException if no "builderClass" value is available
     */
    private static String requireBuilderClass(Node node, Map<String, String> runtimeAttributes,
                                              String elementName) {
        Map<String, String> attrs = XMLDOMUtil.mapifyAttrs(node, runtimeAttributes);
        String className = attrs.get(BUILDER_CLASS_KEY);
        if (className == null) {
            throw new RuntimeException(
                    "Need to specify class name in " + elementName + " element");
        }
        return className;
    }

    /** Instantiates the configured InterrupterBuilder and delegates to it. */
    private Interrupter buildInterrupter(Node node, long pauseOnEarlyTermination,
                                         Map<String, String> runtimeAttributes) {
        String className = requireBuilderClass(node, runtimeAttributes, "interrupter");
        InterrupterBuilder builder =
                ClassLoaderUtil.buildClass(InterrupterBuilder.class, className);
        return builder.build(node, pauseOnEarlyTermination, runtimeAttributes);
    }

    /** Instantiates the configured StatusReporterBuilder and delegates to it. */
    private StatusReporter buildReporter(FileResourceCrawler crawler,
                                         ConsumersManager consumersManager, Node node,
                                         Map<String, String> runtimeAttributes) {
        String className = requireBuilderClass(node, runtimeAttributes, "reporter");
        StatusReporterBuilder builder =
                ClassLoaderUtil.buildClass(StatusReporterBuilder.class, className);
        return builder.build(crawler, consumersManager, node, runtimeAttributes);
    }

    /**
     * Returns an unmodifiable copy of the incoming runtime attributes that is
     * guaranteed to contain NUM_CONSUMERS_KEY.
     * <p>
     * A value supplied at runtime is kept untouched. Otherwise the value is read from
     * the "numConsumers" attribute of the document element; if that attribute is
     * absent, unparseable or &lt; 1, the default number of consumers is used.
     *
     * @param docElement                document element of the xml config file
     * @param incomingRuntimeAttributes runtime arguments; not modified
     * @return unmodifiable copy including NUM_CONSUMERS_KEY
     */
    private Map<String, String> setNumConsumersInRuntimeAttributes(Node docElement,
            Map<String, String> incomingRuntimeAttributes) {
        Map<String, String> runtimeAttributes = new HashMap<>(incomingRuntimeAttributes);

        //if this is set at runtime use that value
        if (!runtimeAttributes.containsKey(NUM_CONSUMERS_KEY)) {
            //getNamedItem returns null when the attribute is not present on the element
            Node ncNode = docElement.getAttributes().getNamedItem(NUM_CONSUMERS_KEY);
            String configured = (ncNode == null) ? null : ncNode.getNodeValue();
            //TODO: should we have a max range check?
            runtimeAttributes.put(NUM_CONSUMERS_KEY,
                    Integer.toString(parseNumConsumers(configured)));
        }
        return Collections.unmodifiableMap(runtimeAttributes);
    }

    /**
     * Creates the shared queue. The capacity is taken from the runtime attributes if
     * MAX_QUEUE_SIZE_KEY is present there, otherwise from the same attribute on the
     * main element. DEFAULT_MAX_QUEUE_SIZE is used when no value is given, when it is
     * not a parseable integer, or when it is &lt; 1 (ArrayBlockingQueue rejects
     * capacities below 1).
     */
    private ArrayBlockingQueue<FileResource> buildQueue(Node docElement,
                                                        Map<String, String> runtimeAttributes) {
        String szString = runtimeAttributes.get(MAX_QUEUE_SIZE_KEY);
        if (szString == null) {
            Node szNode = docElement.getAttributes().getNamedItem(MAX_QUEUE_SIZE_KEY);
            if (szNode != null) {
                szString = szNode.getNodeValue();
            }
        }

        int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
        if (szString != null) {
            try {
                maxQueueSize = Integer.parseInt(szString);
            } catch (NumberFormatException ignored) {
                //deliberately ignored: keep the default capacity
            }
        }
        if (maxQueueSize < 1) {
            maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
        }
        return new ArrayBlockingQueue<>(maxQueueSize);
    }

    /** Instantiates the configured AbstractConsumersBuilder and delegates to it. */
    private ConsumersManager buildConsumersManager(Node node, Map<String, String> runtimeAttributes,
                                                   ArrayBlockingQueue<FileResource> queue) {
        String className = requireBuilderClass(node, runtimeAttributes, "consumers");
        AbstractConsumersBuilder builder =
                ClassLoaderUtil.buildClass(AbstractConsumersBuilder.class, className);
        return builder.build(node, runtimeAttributes, queue);
    }

    /** Instantiates the configured ICrawlerBuilder and delegates to it. */
    private FileResourceCrawler buildCrawler(ArrayBlockingQueue<FileResource> queue, Node node,
                                             Map<String, String> runtimeAttributes) {
        String className = requireBuilderClass(node, runtimeAttributes, "crawler");
        ICrawlerBuilder builder = ClassLoaderUtil.buildClass(ICrawlerBuilder.class, className);
        return builder.build(node, runtimeAttributes, queue);
    }
}