Permalink
Browse files

First running code version

  • Loading branch information...
1 parent a7a2ec7 commit ec48f10ac003fb39139e932b426ad89ea2d373f7 @pmyteh committed Feb 15, 2014
Showing with 114 additions and 68 deletions.
  1. +20 −9 warc/warc.py
  2. +81 −45 warctika.py
  3. +13 −14 warctikad.py
View
@@ -136,6 +136,10 @@ def __repr__(self):
class WARCRecord(object):
"""The WARCRecord object represents a WARC Record.
"""
+ # TODO: Separate out subclasses for the different record types, with
+ # appropriate easy-access methods (warcinfo record body as a dict; separate
+ # body and header parts for response methods etc.) and methods for
+ # re-writing these to file again.
def __init__(self, header=None, payload=None, headers={}, defaults=True):
"""Creates a new WARC record.
"""
@@ -253,17 +257,24 @@ def get_underlying_mimetype(self):
"""Return the MIME type of the underlying content, for a given WARC
response or resource record. If no type recorded, return None"""
try:
- if record.type == 'response':
- # TODO: Test thoroughly!
- # Stuff the response payload into an HTTPMessage then ask for
- # the Content-Type.
- # Alternative: RegExp for the first case-insensitive
+ if self.type == 'response':
+ # RegExp for the first case-insensitive
# content-type in the payload, returning the rest of the line
# if there. If not, return None.
- return httplib.HTTPMessage(StringIO(self.payload)).gettype()
+ try:
+ headers = re.split(u'\n\n',
+ self.payload,
+ maxsplit=1
+ )[0]
+ return re.search(r'^Content-Type: (.*)$',
+ headers,
+ re.IGNORECASE | re.MULTILINE
+ ).group(1)
+ except Exception:
+ return None
elif self.type == 'resource':
try:
- return record['Content-Type']
+ return self['Content-Type']
except KeyError:
return None
else:
@@ -276,7 +287,7 @@ def get_underlying_content(self):
"""Return the underlying content for response and resource records.
i.e., just the resource file or HTTP body"""
if self.is_http_response():
- return re.split(u'\n\n', self.body, maxsplit=1)[1]
+ return re.split(u'\n\n', self.payload, maxsplit=1)[1]
elif self.type == 'resource':
return self.payload
else:
@@ -418,7 +429,7 @@ def read_record(self):
return None
self.current_payload = FilePart(fileobj, header.content_length)
- record = WARCRecord(header, self.current_payload, defaults=False)
+ record = WARCRecord(header, self.current_payload.read(), defaults=False)
return record
def _read_payload(self, fileobj, content_length):
View
@@ -27,12 +27,13 @@
#####
#import sys
-#import os
+import os
+import time
import pyinotify
import warc
import re
-# TODO: Check if necessary
-#import requests
+import requests
+import copy
#####
@@ -61,7 +62,8 @@ class WARCTikaProcessor:
matching lengthy and variable content-types such as the
application/vnd.openxmlformats-officedocument.* types."""
def __init__(
- self, tikaurl='http://localhost:9998/tika',
+ self,
+ tikaurl='http://localhost:9998/tika',
mimemappings=[
# Content-Types taken from a crawl of .gov.uk.
# It is astonishing what junk some web servers will supply
@@ -76,7 +78,7 @@ def __init__(
'application/msword'),
(r'^application/vnd\.openxmlformats-officedocument',
None),
- (r'^(text)|(application)/(rtf)|(richtext)$',
+ (r'^((text)|(application))/((rtf)|(richtext))$',
'text/rtf'),
(r'^application/vnd\.oasis\.opendocument',
None),
@@ -91,7 +93,8 @@ def __init__(
"produce plain text formats for storage. These processed files "
"have been stored as WARC conversion records: ")
for item in self._mimemappings:
- self._description += '"'+item[0]+'": '+str(item[1])+'. '
+ self._description += item[0]+'; '
+ self._description = self._description[:-2]+'.'
print "Initialised WARCTikaProcessor"
def process(self, infn, outfn):
@@ -101,48 +104,68 @@ def process(self, infn, outfn):
outwarc = warc.WARCFile(outfn, 'wb')
print "Processing %s to %s." % (infn, outfn)
for record in inwarc:
- print "Processing "+record.type
- try:
- if record.type == 'warcinfo':
- try:
- record.payload['description'] += (self._description)
- except KeyError:
- record.payload['description'] == self._description
- # TODO: Need to recalculate the length etc.
- elif record.type == 'response' or record.type == 'resource':
- if 'WARC-Segment-Number' in record.header:
- raise Exception("Segmented record. Skipping.")
- record = self.generate_new_record(record)
- # If 'metadata', 'request', 'revisit', 'continuation',
- # 'conversion' or something exotic, we can't do anything more
- # interesting than immediately re-writing it to the new file
- else:
- pass
- except Exception as e:
- print ("Warning: WARCTikaProcessor.process() failed on "+
- record.record_id+": "+str(e)+
- "\n\tWriting old record to new WARC.")
- finally:
- outwarc.write_record(warc.WARCRecord(header=record.header,
- payload=record.payload, # XXX or record.payload.read() or something
- defaults=False))
+# print "Processing "+record.type
+# try:
+ if record.type == 'warcinfo':
+ self.add_description_to_warcinfo(record)
+ elif record.type == 'response' or record.type == 'resource':
+ if 'WARC-Segment-Number' in record.header:
+ raise Exception("Segmented record. Skipping.")
+ record = self.generate_new_record(record)
+ # If 'metadata', 'request', 'revisit', 'continuation',
+ # 'conversion' or something exotic, we can't do anything more
+ # interesting than immediately re-writing it to the new file
+ else:
+ pass
+# except Exception as e:
+# print ("Warning: WARCTikaProcessor.process() failed on "+
+# record.header.record_id+": "+str(e.args)+", "+
+# str(e.message)+"\n\tWriting old record to new WARC.")
+# finally:
+ outwarc.write_record(
+ warc.WARCRecord(header=record.header,
+ payload=record.payload, # XXX or record.payload.read() or something
+ defaults=False))
inwarc.close()
outwarc.close()
+ def add_description_to_warcinfo(self, record):
+ """Add a description of our mangling to a warcinfo record's decription
+ tag, creating it if necessary"""
+ if record.type != 'warcinfo':
+ raise Exception("Non-warcinfo record passed to "
+ "add_description_to_warcinfo")
+
+ match = re.search(r'^(description: .*)$', record.payload, re.I | re.M)
+ if match:
+ record.payload = (record.payload[:match.end(0)-1] +
+ '. ' +
+ self._description +
+ record.payload[match.end(0):])
+ else:
+ record.payload = ("description: "+self._description+"\n"+
+ record.payload)
+# print record.payload
+
+ # Recalculate the record length
+ record.header['Content-Length'] = str(len(record.payload))
+
+
def generate_new_record(self, inrecord):
"""Produce and return a WARC conversion record based on the given
input WARC record. If conversion is not possible, return the
input record."""
# Check if handleable:
- i
if not inrecord.is_http_response() and not inrecord.type == 'resource':
return inrecord
inmimetype = inrecord.get_underlying_mimetype()
- inmimetype = self.checkmimetype(inmimetype)
- outcontent = self.tikaise(inrecord.get_underlying_content(),
- inmimetype)
- outheader = self.generate_cv_header(inrecord.h, outcontent)
+ inmimetype = self.make_canonical_mimetype(inmimetype)
+ if not inmimetype:
+ # Content-Type should not be Tikaised
+ return inrecord
+ outcontent = self.tikaise(inrecord.get_underlying_content(), inmimetype)
+ outheader = self.generate_cv_header(inrecord.header)
# defaults=true ensures (amongst other things) that the content-length
# field is regenerated.
# print outcontent, str(outcontent)
@@ -154,26 +177,32 @@ def tikaise(self, content, mimetype):
# TODO: Consider carefully whether to send Tika the filename to help
# guess the MIME type, which can be done by setting the (unofficial)
# {'File-Name': string} header.
- resp = requests.put(self._tikaurl, data=content,
- headers={'Content-Type': mimetype})
+ try:
+ resp = requests.put(self._tikaurl, data=content,
+ headers={'Content-Type': mimetype})
+ except requests.ConnectionError:
+ print "Unable to connect to Tika; will wait and retry."
+ time.sleep(120)
if resp.status_code != 200:
raise Exception("Bad response code from Tika ("
+resp.status_code+")")
- return resp # XXX: check that this is correct
+ return resp.content
# def strip_header(self, obj):
# """Strips the first HTTP/WARC header from an object, returning the
# rest of the object."""
# return re.split(u'\n\n', obj, maxsplit=1)[1]
- def check_mimetype(self, mimetype):
+ def make_canonical_mimetype(self, mimetype):
"""Return a canonical mimetype if mimetype matches our list to process,
else False.
None is always processable, but Tika will need to guess the type."""
if mimetype is None:
# Note: we can make Tika guess the Content-Type without assistance
# by setting it to the root type 'application/octet-stream'.
- return 'application/octet-stream'
+ # return 'application/octet-stream'
+ # Leave it as it is.
+ return False
for tup in self._mimemappings:
if re.search(tup[0], mimetype, re.IGNORECASE):
if tup[1] is None:
@@ -187,7 +216,7 @@ def generate_cv_header(self, oldheader):
kinds of digests as these will be automatically produced
by the WARC record creation process with defaults=True."""
# Build new header based upon the old header and new content
- d = oldheader.copy()
+ d = copy.copy(oldheader)
# Not valid in conversion records
d.pop('WARC-Concurrent-To', None)
@@ -205,11 +234,15 @@ def generate_cv_header(self, oldheader):
# As we're throwing away the old record, this isn't sensible.
#d['WARC-Refers-To'] = d['WARC-Record-ID']
#d['WARC-Record-ID'] = "<urn:uuid:%s>" % uuid.uuid1()
- return warc.Header(d)
+ return d
class WARCNotifyHandler(pyinotify.ProcessEvent):
"""Handler for pyinotify created/deleted WARC notifications."""
def my_init(self, warcprocessor=None,
+ # Note that this does not match ".open" files
+ # so we need not worry about heritrix files
+ # in production (as long as we pick them up
+ # when they move to their final filename.
oldsuffix='.warc.gz',
newsuffix='-ViaTika.warc.gz'):
if not warcprocessor:
@@ -220,6 +253,7 @@ def my_init(self, warcprocessor=None,
def process_IN_CREATE(self, event):
print "IN_CREATE called for "+event.pathname
# If the new file is a WARC, but not a tikaed one, process it
+ # TODO: Check that this is not an
if (event.pathname.endswith(self.oldsuffix) and not
event.pathname.endswith(self.newsuffix)):
try:
@@ -233,8 +267,10 @@ def process_IN_CREATE(self, event):
"file "+event.pathname+": "+str(e)+str(e.args)+
"\n\tGiving up on it.")
raise e
- def process_IN_MOVETO(self, event):
- print "IN_MOVETO called for "+event.pathname
+ os.remove(event.pathname)
+ print "Finished handling", event.pathname
+ def process_IN_MOVE_TO(self, event):
+ print "IN_MOVE_TO called for "+event.pathname
# Treat files moved as a creation.
self.process_IN_CREATE(event)
View
@@ -1,11 +1,5 @@
#!/usr/bin/env python
-"""Watch a directory for new WARC files, then process them by extracting
-non-text content with Apache Tika and re-writing a WARC file with
-transformation records in place of the original.
-
-Requirements: TikaJAXRS running on a given port and auto-reloaded.
-
-Copyright 2014 Tom Nicholls
+"""Copyright 2014 Tom Nicholls
This work is available under the terms of the GNU General Purpose Licence
This program is free software: you can redistribute it and/or modify
@@ -46,10 +40,10 @@
# watched events
# TODO: Consider if we also want IN_CLOSE_WRITE (depends on the order that
# heritrix finishes writing, closes and renames the file.
-mask = pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO
+mask = pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO | pyinotify.IN_MOVED_FROM
wm.add_watch(dirname, mask)
warcprocessor = warctika.WARCTikaProcessor()
-oldsuffix = 'warc.gz'
+oldsuffix = '.warc.gz'
newsuffix = '-ViaTika.warc.gz'
handler = warctika.WARCNotifyHandler(warcprocessor=warcprocessor,
oldsuffix=oldsuffix,
@@ -59,19 +53,24 @@
# On first run, loop through watched directory and handle all existing
# files, in case we restarted part-way through a crawl.
for fn in os.listdir(dirname):
+ infn = dirname+"/"+fn
+ outfn = re.sub(oldsuffix+'$', newsuffix, infn)
if fn.endswith(oldsuffix) and not fn.endswith(newsuffix):
- print "Processing existing file:"+dirname+"/"+fn
+# if os.path.exists(outfn):
+# print "Existing file", infn, "has already been processed. Skipping."
+# continue
+ print "Processing existing file:", infn
# try:
- print dirname
- warcprocessor.process(
- infn=dirname+"/"+fn,
- outfn=re.sub(oldsuffix+'$', newsuffix, dirname+"/"+fn) )
+ warcprocessor.process(infn=infn, outfn=outfn)
+ os.remove(infn)
# except Exception as e:
+# XXX cleanup: delete -ViaTika.warc.gz file if present.
# print ("Warning: Startup processor failed to process "+
# "file "+fn+": "+str(e)+str(e.args)+
# "\n\tGiving up on it.")
# raise e
+print "Finished processing existing files. Now watching for new WARC files."
# Run forever
notifier.loop()

0 comments on commit ec48f10

Please sign in to comment.