import os
import io
import sys
import binascii
import logging

import luigi
import luigi.contrib.hdfs
import luigi.contrib.hadoop

import warcio, six
from warcio.archiveiterator import ArchiveIterator
from warcio.recordloader import ArcWarcRecord
import requests, urllib3, chardet, certifi, idna  # Needed for HTTP actions

logger = logging.getLogger('luigi-interface')

#
# This is a Python-based streaming Hadoop job for performing basic processing of WARCs,
# e.g. generating stats. However, all implementations tried (`warctools`, `warc` and `pywb`) require
# behaviour that is 'difficult' to support. The first two both use Python's gzip support,
# which requires seekable streams (e.g. `seek(offset, whence)` support). Python Wayback (`pywb`)
# does not appear to depend on that module, but requires `ffi` support for native calls, which
# makes deployment more difficult. Therefore, we use Java map-reduce jobs for WARC parsing, but
# we can generate simple line-oriented text files from the WARCs, after which streaming works
# just fine.
#
# Further experimentation with warcio shows that it works better, as it has very few dependencies.
#
# We also attempted to use Hadoop's built-in auto-gunzipping support, which is built into streaming mode.
# After some difficulties, this could be made to work, but it was unreliable, as different nodes would
# behave differently with respect to continuing when gunzipping concatenated gzip records.
#
class ExternalListFile(luigi.ExternalTask):
    """
    This ExternalTask defines the Target at the top of the task chain, i.e. resources that are overall inputs
    rather than generated by the tasks themselves.
    """
    input_file = luigi.Parameter()

    def complete(self):
        """
        Assume the files are present, as checking for them slows things down a lot,
        and if they are not there we'll soon find out.
        """
        return True

    def output(self):
        """
        Returns the target output for this task.
        In this case, it expects a file to be present in HDFS.

        :return: the target output for this task.
        :rtype: object (:py:class:`luigi.target.Target`)
        """
        return luigi.contrib.hdfs.HdfsTarget(self.input_file)

class ExternalFilesFromList(luigi.ExternalTask):
    """
    This ExternalTask defines the Targets at the top of the task chain, i.e. resources that are overall inputs
    rather than generated by the tasks themselves.
    """
    input_file = luigi.Parameter()
    from_local = luigi.BoolParameter(default=False)

    def output(self):
        """
        Returns the target outputs for this task, one per line of the input file.
        In this case, it expects the listed files to be present in HDFS (or locally, if `from_local` is set).

        :return: the target outputs for this task.
        :rtype: object (:py:class:`luigi.target.Target`)
        """
        with open(self.input_file, 'r') as f:
            for line in f:
                line = line.strip()
                if line:
                    if self.from_local:
                        #logger.debug("Yielding local target: %s" % line)
                        yield luigi.LocalTarget(path=line)
                    else:
                        #logger.debug("Yielding HDFS target: %s" % line)
                        yield luigi.contrib.hdfs.HdfsTarget(line, format=luigi.contrib.hdfs.format.PlainFormat)

# Special reader that wraps the input stream and counts the bytes read, so that the current
# position can be tell()'d even on streams (like Hadoop streaming's stdin) that don't support it:
class TellingReader():
    def __init__(self, stream):
        self.stream = stream
        self.pos = 0

    def read(self, size=None):
        #logger.warning("read()ing from current position: %i, size=%s" % (self.pos, size))
        chunk = self.stream.read(size)
        #if len(chunk) == 0:
        #    logger.warning("read() 0 bytes, current position: %i" % self.pos)
        #else:
        #    logger.warning("read() %s" % binascii.hexlify(chunk[:64]))
        self.pos += len(chunk)
        #logger.warning("read()ing current position now: %i" % self.pos)
        return chunk

    def readline(self, size=None):
        #logger.warning("readline()ing from current position: %i" % self.pos)
        line = self.stream.readline(size)
        #logger.warning("readline() %s" % line)
        self.pos += len(line)
        #logger.warning("readline()ing current position now: %i" % self.pos)
        return line

    def tell(self):
        #logger.debug("tell()ing current position: %i" % self.pos)
        return self.pos

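# A minimal usage sketch for TellingReader (the file name below is hypothetical; in the
# real job the wrapped stream is Hadoop streaming's stdin, which lacks tell() support):
#
#   with open('example.warc.gz', 'rb') as f:
#       reader = ArchiveIterator(TellingReader(f))
#       for record in reader:
#           print(record.rec_type, reader.get_record_offset())
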
class BinaryInputHadoopJobRunner(luigi.contrib.hadoop.HadoopJobRunner):
    """
    A job runner to use the UnsplittableInputFileFormat (based on DefaultHadoopJobRunner):
    """
    def __init__(self):
        config = luigi.configuration.get_config()
        streaming_jar = config.get('hadoop', 'streaming-jar')
        # Find our JAR:
        dir_path = os.path.dirname(os.path.realpath(__file__))
        jar_path = os.path.join(dir_path, "../../jars/warc-hadoop-recordreaders-3.0.0-SNAPSHOT-job.jar")
        # Setup:
        super(BinaryInputHadoopJobRunner, self).__init__(
            streaming_jar=streaming_jar,
            input_format="uk.bl.wa.hadoop.mapred.UnsplittableInputFileFormat",
            libjars=[jar_path])

class HadoopWarcReaderJob(luigi.contrib.hadoop.JobTask):
    """
    Specialisation of the usual Hadoop JobTask that is configured to parse WARC files.
    Should be sub-classed to make tasks that work with WARCs (see the example sub-class
    at the end of this file). This uses the stream directly, so data-locality is
    preserved (at least for the first chunk).

    WARNING: Not production ready! Using Hadoop streaming to push in the whole WARC requires reading the
    whole thing into RAM first, in order to submit it as a single key-value pair. This is slow and brittle
    in terms of RAM use.

    Parameters:
        input_file: The path of the file that contains the list of WARC files to process.
        from_local: Whether the paths refer to local files rather than files on HDFS.
        read_for_offset: Whether the WARC parser should read each whole record so it can populate the
            record.raw_offset and record.raw_length fields (good for CDX indexing). Enabling this
            means the reader will have consumed the content body, so your mapper will not have
            access to it.
    """
    input_file = luigi.Parameter()
    from_local = luigi.BoolParameter(default=False)
    read_for_offset = luigi.BoolParameter(default=False)

    kv_separator = '\t'

    def __init__(self, **kwargs):
        super(HadoopWarcReaderJob, self).__init__(**kwargs)

    def requires(self):
        return ExternalFilesFromList(self.input_file, from_local=self.from_local)

    def extra_files(self):
        return []

    def extra_modules(self):
        # Always needs to include the root packages of everything that's imported above,
        # except luigi (because luigi handles that itself):
        return [warcio, six, requests, urllib3, chardet, certifi, idna]

    def jobconfs(self):
        jcs = super(HadoopWarcReaderJob, self).jobconfs()
        jcs.append('stream.map.input.field.separator=%s' % self.kv_separator)
        #jcs.append('stream.map.input.ignoreKey=true')
        return jcs

    def job_runner(self):
        outputs = luigi.task.flatten(self.output())
        for output in outputs:
            if not isinstance(output, luigi.contrib.hdfs.HdfsTarget):
                logger.warning("Job is using one or more non-HdfsTarget outputs" +
                               " so it will be run in local mode")
                return luigi.contrib.hadoop.LocalJobRunner()
        else:
            return BinaryInputHadoopJobRunner()
    def run_mapper(self, stdin=sys.stdin, stdout=sys.stdout):
        """
        Run the mapper on the hadoop node.

        ANJ: Created a modified version to pass the raw stdin through to the mapper.
        """
        self.init_hadoop()
        self.init_mapper()
        outputs = self._map_input(stdin)
        if self.reducer == NotImplemented:
            self.writer(outputs, stdout)
        else:
            self.internal_writer(outputs, stdout)

    def read_key_from_stream(self, stream):
        # Read one character at a time until we hit the key-value separator,
        # accumulating the 'key' (i.e. the file name):
        c = ''
        name = []
        while c != self.kv_separator:
            name.append(c)
            c = stream.read(1)
        name = ''.join(name)
        logger.warning("Got file name '%s'..." % name)
        return name
    def _map_input(self, input_stream):
        """
        Iterate over the input and call the mapper for each item.
        If the job has a parser defined, the return values from the parser will
        be passed as arguments to the mapper.

        If the input is coded output from a previous run,
        the arguments will be split into key and value.

        ANJ: Modified to use the warcio parser instead of splitting lines.
        """
        # Wrap the stream in a handy Reader:
        wrapped_stream = TellingReader(input_stream)
        # Parse the start of the input stream, which is <filename>\t<filedata>
        name = self.read_key_from_stream(wrapped_stream)
        # Having consumed the 'key', reset the position counter so record offsets are
        # relative to the start of the WARC data, then read the payload:
        wrapped_stream.pos = 0
        try:
            reader = ArchiveIterator(wrapped_stream)
            logger.warning("Reader types: %s %s" % (reader.reader.decompressor, reader.reader.decomp_type))
            for record in reader:
                logger.warning("Got record type: %s %s %i" % (record.rec_type, record.content_type, record.length))
                logger.warning("Got record format and headers: %s %s %s" % (record.format, record.rec_headers, record.http_headers))
                #content = record.content_stream().read()
                #logger.warning("Record content: %s" % content[:128])
                #logger.warning("Record content as hex: %s" % binascii.hexlify(content[:128]))
                #logger.warning("Got record offset + length: %i %i" % (reader.get_record_offset(), reader.get_record_length()))
                if self.read_for_offset:
                    record.raw_offset = reader.get_record_offset()
                    record.raw_length = reader.get_record_length()
                for output in self.mapper(record):
                    yield output
        except Exception as e:
            logger.error("Exception while processing WARC file!")
            logger.exception("ArchiveIterator threw: %s", e)
        if self.final_mapper != NotImplemented:
            for output in self.final_mapper():
                yield output
        self._flush_batch_incr_counter()

    def mapper(self, record):
        # type: (ArcWarcRecord) -> [(str, str)]
        """ Override this method to implement your own ArcWarcRecord-reading mapper. """
        yield record.rec_type, 1
    def internal_reader(self, input_stream):
        """
        Reader which deserializes each part of a tab-separated string.
        Yields a tuple of python objects.
        """
        for input_line in input_stream:
            logger.warning("Processing LINE: %s" % input_line)
            logger.warning("Processing split LINE: %s" % input_line.split("\t"))
            logger.warning("USING: %s" % self.deserialize)
            # Skip lines with errors in them, arising from
            # https://github.com/webrecorder/warcio/blob/ed7ebfd7c518bb6ff7e43bfc5d76285ea6eeb8b0/warcio/bufferedreaders.py#L143
            if input_line.startswith('Error -3 while decompressing'):
                continue
            yield list(map(self.deserialize, input_line.split("\t")))
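
# A minimal sketch of how HadoopWarcReaderJob might be sub-classed. This example class,
# its name, and its output path are illustrative assumptions, not part of the original module:
class ExampleRecordTypeCounter(HadoopWarcReaderJob):
    """Example: counts WARC records by type (response, request, metadata, etc.)."""

    def output(self):
        # Hypothetical output location:
        return luigi.contrib.hdfs.HdfsTarget('/tmp/warc-record-type-counts.tsv')

    def mapper(self, record):
        # type: (ArcWarcRecord) -> [(str, int)]
        # Emit one count per record, keyed on the record type:
        yield record.rec_type, 1

    def reducer(self, key, values):
        # Sum the per-record counts for each record type:
        yield key, sum(values)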