Skip to content

Commit

Permalink
Fixed a bug in the tarred archive iterators that meant it could get s…
Browse files Browse the repository at this point in the history
…tuck in an infinite loop.

Sped up seeking to a particular archive+filename when reading tarred corpus subtypes.
Renamed "schedule" command -> "status", which is a little easier to understand!
  • Loading branch information
Mark Granroth-Wilding committed Apr 7, 2016
1 parent a77579a commit 2802ca0
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 19 deletions.
9 changes: 5 additions & 4 deletions src/python/pimlico/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from pimlico.core.modules.execute import execute_module, ModuleExecutionError


def schedule_cmd(pipeline, opts):
def status_cmd(pipeline, opts):
# Try deriving a schedule
print "Module execution schedule"
print "Module execution schedule with statuses"
for i, module_name in enumerate(pipeline.get_module_schedule(), start=1):
module = pipeline[module_name]
print " %d. %s" % (i, module_name)
Expand Down Expand Up @@ -70,8 +70,9 @@ def list_variants(pipeline, opts):
help="Check runtime dependencies for all modules. By default, these are not check as you might "
"be happy with them not all being satisfied at once")

schedule = subparsers.add_parser("schedule", help="Output a module execution schedule for the pipeline")
schedule.set_defaults(func=schedule_cmd)
status = subparsers.add_parser("status", help="Output a module execution schedule for the pipeline and execution "
"status for every module")
status.set_defaults(func=status_cmd)

run = subparsers.add_parser("run", help="Execute an individual pipeline module")
run.set_defaults(func=run_cmd)
Expand Down
25 changes: 20 additions & 5 deletions src/python/pimlico/datatypes/tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,26 +51,28 @@ def archive_iter(self, subsample=None, start_after=None):
# Prepare a temporary directory to extract everything to
tmp_dir = mkdtemp()

file_num = -1
if start_after is None:
# Don't wait to start
started = True
else:
# Start after we've skipped this number of docs or hit this (archive, doc name)
# Start after we've hit this (archive, doc name)
started = False

try:
for tar_name, tarball_filename in zip(self.tarballs, self.tar_filenames):
# If we're waiting for a particular archive/file, we can skip archives until we're in the right one
if not started and start_after[0] != tar_name:
continue

# Extract the tarball to the temp dir
with tarfile.open(tarball_filename, 'r') as tarball:
for tarinfo in tarball:
file_num += 1
filename = tarinfo.name

# Allow the first portion of the corpus to be skipped
if not started:
if (type(start_after) is int and file_num == start_after) or \
(start_after == (tar_name, filename)):
# We know we're in the right archive now, skip until we get to the requested file
if start_after[1] == filename:
# We've hit the condition for starting
# Skip this doc and start on the next
started = True
Expand All @@ -97,6 +99,15 @@ def archive_iter(self, subsample=None, start_after=None):
yield tar_name, filename, document
# Remove the file once we're done with it (when we request another)
os.remove(os.path.join(tmp_dir, filename))

# Catch the case where the archive/filename requested as a starting point wasn't found
# We only get here with started=False when we're in the right archive and have got through the
# the whole thing without finding the requested filename
if not started:
raise TarredCorpusIterationError(
"tried to start iteration over tarred corpus at document (%s, %s), but filename %s "
"wasn't found in archive %s" % (start_after[0], start_after[1], start_after[1], tar_name)
)
finally:
# Remove the temp dir
shutil.rmtree(tmp_dir)
Expand Down Expand Up @@ -265,3 +276,7 @@ def __len__(self):

class CorpusAlignmentError(Exception):
pass


class TarredCorpusIterationError(Exception):
pass
1 change: 1 addition & 0 deletions src/python/pimlico/modules/corpora/tar/exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class ModuleExecutor(BaseModuleExecutor):
def execute(self):
# Most of what we need to do is implemented by the filter version of this module, so reuse that
filter_datatype = TarredCorpusFilter(
self.info.pipeline,
self.info.get_input("documents"),
self.info.options["archive_size"],
archive_basename=self.info.options["archive_basename"]
Expand Down
20 changes: 10 additions & 10 deletions src/python/pimlico/modules/corpora/tar_filter/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
import random

from pimlico.core.modules.base import BaseModuleInfo
from pimlico.core.modules.execute import ModuleNotReadyError
from pimlico.datatypes.base import IterableCorpus
Expand Down Expand Up @@ -44,7 +45,7 @@ def tarballs(self):
# Work out how many digits to pad the archive numbers with in the filenames
digits = len("%d" % (total_archives-1))
# Prepare a formatter for archive numbers
archive_name_format = "%s-%%%sd" % (self.archive_basename, "0" * digits)
archive_name_format = "%s-%%%sd" % (self.archive_basename, "0%d" % digits)

self._tarballs = [archive_name_format % archive_num for archive_num in range(total_archives)]
return self._tarballs
Expand All @@ -59,16 +60,20 @@ def archive_iter(self, subsample=None, start_after=None):
# Don't wait to start
started = True
else:
# Start after we've skipped this number of docs or hit this (archive, doc name)
# Start after we've hit this (archive, doc name)
started = False

for file_num, (doc_name, doc) in enumerate(self.input_datatype):
for doc_name, doc in self.input_datatype:
current_archive_count += 1

# Check whether we've put enough files in the current archive to move onto the next
if current_archive_count == self.archive_size:
current_archive = min(len(tarballs), current_archive+1)
current_archive_count = 0

# Allow the first portion of the corpus to be skipped
if not started:
if (type(start_after) is int and file_num == start_after) or \
(start_after == (tarballs[current_archive], doc_name)):
if start_after == (tarballs[current_archive], doc_name):
# We've hit the condition for starting
# Skip this doc and start on the next
started = True
Expand All @@ -79,11 +84,6 @@ def archive_iter(self, subsample=None, start_after=None):
# Reject this file
continue

# Check whether we've put enough files in the current archive to move onto the next
if current_archive_count == self.archive_size:
current_archive = min(len(tarballs), current_archive+1)
current_archive_count = 0

yield tarballs[current_archive], doc_name, doc

def list_archive_iter(self):
Expand Down

0 comments on commit 2802ca0

Please sign in to comment.