Skip to content

Commit

Permalink
make filesystem inference work for RangeDaily
Browse files Browse the repository at this point in the history
Brings RangeDaily feature set on par with RangeHourly, by
generalizing infer_bulk_complete_from_fs implementation.
  • Loading branch information
ulzha committed Feb 23, 2015
1 parent f004c35 commit 1aea123
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 48 deletions.
64 changes: 36 additions & 28 deletions luigi/tools/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,28 +374,29 @@ def _get_per_location_glob(tasks, outputs, regexes):
if m is None:
raise NotImplementedError("Couldn't deduce datehour representation in output path %r of task %s" % (p, t))

positions = [most_common((m.start(i), m.end(i)) for m in matches)[0] for i in range(1, 5)] # the most common position of every group is likely to be conclusive hit or miss
n_groups = len(matches[0].groups())
positions = [most_common((m.start(i), m.end(i)) for m in matches)[0] for i in range(1, n_groups + 1)] # the most common position of every group is likely to be conclusive hit or miss

glob = list(paths[0]) # TODO sanity check that it's the same for all paths?
glob = list(paths[0]) # FIXME sanity check that it's the same for all paths
for start, end in positions:
glob = glob[:start] + ['[0-9]'] * (end - start) + glob[end:]
return ''.join(glob).rsplit('/', 1)[0] # chop off the last path item (wouldn't need to if `hadoop fs -ls -d` equivalent were available)


def _get_filesystems_and_globs(task_cls):
def _get_filesystems_and_globs(datetime_to_task, datetime_to_re):
"""
Yields a (filesystem, glob) tuple per every output location of task_cls.
Yields a (filesystem, glob) tuple per every output location of task.
task_cls can have one or several FileSystemTarget outputs.
The task can have one or several FileSystemTarget outputs.
For convenience, task_cls can be a wrapper task,
For convenience, the task can be a luigi.WrapperTask,
in which case outputs of all its dependencies are considered.
"""
# probe some scattered datehours unlikely to all occur in paths, other than by being sincere datehour parameter's representations
# probe some scattered datetimes unlikely to all occur in paths, other than by being sincere datetime parameter's representations
# TODO limit to [self.start, self.stop) so messages are less confusing? Done trivially it can kill correctness
sample_datehours = [datetime(y, m, d, h) for y in range(2000, 2050, 10) for m in range(1, 4) for d in range(5, 8) for h in range(21, 24)]
regexes = [re.compile('(%04d).*(%02d).*(%02d).*(%02d)' % (d.year, d.month, d.day, d.hour)) for d in sample_datehours]
sample_tasks = [task_cls(d) for d in sample_datehours]
sample_datetimes = [datetime(y, m, d, h) for y in range(2000, 2050, 10) for m in range(1, 4) for d in range(5, 8) for h in range(21, 24)]
regexes = [re.compile(datetime_to_re(d)) for d in sample_datetimes]
sample_tasks = [datetime_to_task(d) for d in sample_datetimes]
sample_outputs = [flatten_output(t) for t in sample_tasks]

for o, t in zip(sample_outputs, sample_tasks):
Expand All @@ -407,7 +408,7 @@ def _get_filesystems_and_globs(task_cls):
if not isinstance(target, FileSystemTarget):
raise NotImplementedError("Output targets must be instances of FileSystemTarget; was %r for %r" % (target, t))

for o in zip(*sample_outputs): # transposed, so here we're iterating over logical outputs, not datehours
for o in zip(*sample_outputs): # transposed, so here we're iterating over logical outputs, not datetimes
glob = _get_per_location_glob(sample_tasks, o, regexes)
yield o[0].fs, glob

Expand All @@ -431,9 +432,9 @@ def _list_existing(filesystem, glob, paths):
return set(listing)


def infer_bulk_complete_from_fs(task_cls, finite_datehours):
def infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re):
"""
Efficiently determines missing datehours by filesystem listing.
Efficiently determines missing datetimes by filesystem listing.
The current implementation works for the common case of a task writing
output to a FileSystemTarget whose path is built using strftime with format
Expand All @@ -442,31 +443,28 @@ def infer_bulk_complete_from_fs(task_cls, finite_datehours):
(Eventually Luigi could have ranges of completion as first-class citizens.
Then this listing business could be factored away/be provided for
explicitly in target API or some kind of a history server.)
TODO support RangeDaily
"""
filesystems_and_globs_by_location = _get_filesystems_and_globs(task_cls)
paths_by_datehour = [[o.path for o in flatten_output(task_cls(d))] for d in finite_datehours]
filesystems_and_globs_by_location = _get_filesystems_and_globs(datetime_to_task, datetime_to_re)
paths_by_datetime = [[o.path for o in flatten_output(datetime_to_task(d))] for d in datetimes]
listing = set()
for (f, g), p in zip(filesystems_and_globs_by_location, zip(*paths_by_datehour)): # transposed, so here we're iterating over logical outputs, not datehours
for (f, g), p in zip(filesystems_and_globs_by_location, zip(*paths_by_datetime)): # transposed, so here we're iterating over logical outputs, not datetimes
listing |= _list_existing(f, g, p)

# quickly learn everything that's missing
missing_datehours = []
for d, p in zip(finite_datehours, paths_by_datehour):
missing_datetimes = []
for d, p in zip(datetimes, paths_by_datetime):
if not set(p) <= listing:
missing_datehours.append(d)
missing_datetimes.append(d)

return missing_datehours
return missing_datetimes


class RangeDaily(RangeDailyBase):
"""Efficiently produces a contiguous completed range of a daily recurring
task that takes a single DateParameter.
Falls back to infer it from output filesystem listing to facilitate the common
case usage.
(FIXME the latter not implemented yet)
Falls back to infer it from output filesystem listing to facilitate the
common case usage.
Convenient to use even from command line, like:
Expand All @@ -476,7 +474,13 @@ class RangeDaily(RangeDailyBase):
"""

def missing_datetimes(self, task_cls, finite_datetimes):
    """Return the members of ``finite_datetimes`` for which ``task_cls`` is not complete.

    ``task_cls.bulk_complete`` is asked first. When it raises
    ``NotImplementedError`` the answer is inferred from a filesystem listing
    via ``infer_bulk_complete_from_fs``, using a year/month/day regex to
    locate the date in the output paths.

    :param task_cls: task class taking a single date parameter.
    :param finite_datetimes: datetimes to check.
    :return: a set on the ``bulk_complete`` path; on the fallback path,
        whatever ``infer_bulk_complete_from_fs`` returns.
    """
    try:
        # Hand bulk_complete a real list: on Python 3 ``map`` yields a
        # one-shot iterator, which breaks implementations that call len(),
        # index, or iterate more than once. This also matches RangeHourly.
        parameters = list(map(self.datetime_to_parameter, finite_datetimes))
        complete_parameters = task_cls.bulk_complete(parameters)
        return set(finite_datetimes) - set(map(self.parameter_to_datetime, complete_parameters))
    except NotImplementedError:
        return infer_bulk_complete_from_fs(
            finite_datetimes,
            lambda d: task_cls(self.datetime_to_parameter(d)),
            lambda d: '(%04d).*(%02d).*(%02d)' % (d.year, d.month, d.day))


class RangeHourly(RangeHourlyBase):
Expand All @@ -485,7 +489,8 @@ class RangeHourly(RangeHourlyBase):
Benefits from bulk_complete information to efficiently cover gaps.
Falls back to infer it from output filesystem listing to facilitate the common case usage.
Falls back to infer it from output filesystem listing to facilitate the
common case usage.
Convenient to use even from command line, like:
Expand All @@ -498,4 +503,7 @@ def missing_datetimes(self, task_cls, finite_datetimes):
try:
return set(finite_datetimes) - set(map(self.parameter_to_datetime, task_cls.bulk_complete(list(map(self.datetime_to_parameter, finite_datetimes)))))
except NotImplementedError:
return infer_bulk_complete_from_fs(task_cls, finite_datetimes)
return infer_bulk_complete_from_fs(
finite_datetimes,
lambda d: task_cls(self.datetime_to_parameter(d)),
lambda d: '(%04d).*(%02d).*(%02d).*(%02d)' % (d.year, d.month, d.day, d.hour))
126 changes: 106 additions & 20 deletions test/range_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ def requires(self):
yield TaskB(dh=self.dh, complicator='no/worries') # str(self.dh) would complicate beyond working


def mock_listdir(_, glob):
for path in fnmatch.filter(mock_contents, glob + '*'):
yield path
def mock_listdir(contents):
    """Build a ``listdir`` stand-in that serves paths out of ``contents``.

    The returned callable ignores its first argument and lazily yields every
    entry of ``contents`` that matches ``glob + '*'`` under fnmatch rules.
    """
    def contents_listdir(_, glob):
        pattern = glob + '*'
        for hit in fnmatch.filter(contents, pattern):
            yield hit

    return contents_listdir


def mock_exists_always_true(_, _2):
Expand Down Expand Up @@ -455,7 +458,78 @@ def test_start_long_before_long_hours_back_and_with_long_hours_forward(self):
)


class FilesystemInferenceTest(unittest.TestCase):
    """Exercises the glob inference done by ``_get_filesystems_and_globs``."""

    @staticmethod
    def _date_re(d):
        # Regex locating year, month and day of ``d`` in an output path.
        return '(%04d).*(%02d).*(%02d)' % (d.year, d.month, d.day)

    @staticmethod
    def _datehour_re(d):
        # Regex locating year, month, day and hour of ``d`` in an output path.
        return '(%04d).*(%02d).*(%02d).*(%02d)' % (d.year, d.month, d.day, d.hour)

    def _test_filesystems_and_globs(self, datetime_to_task, datetime_to_re, expected):
        """Assert that inference yields exactly the ``expected`` (filesystem class, glob) pairs, in order."""
        inferred = list(_get_filesystems_and_globs(datetime_to_task, datetime_to_re))
        self.assertEqual(len(inferred), len(expected))
        for got, wanted in zip(inferred, expected):
            got_filesystem, got_glob = got
            wanted_filesystem_cls, wanted_glob = wanted
            self.assertTrue(isinstance(got_filesystem, wanted_filesystem_cls))
            self.assertEqual(got_glob, wanted_glob)

    def test_date_glob_successfully_inferred(self):
        # With a date-only regex just three groups are wildcarded; the
        # remaining digits of the sample path stay literal in the glob.
        wanted = [
            (MockFileSystem, '/n2000y01a05n/[0-9][0-9][0-9][0-9]_[0-9][0-9]-_-[0-9][0-9]aww/21mm01dara21'),
        ]
        self._test_filesystems_and_globs(
            lambda d: CommonDateTask(d),
            self._date_re,
            wanted)

    def test_datehour_glob_successfully_inferred(self):
        wanted = [
            (MockFileSystem, '/n2000y01a05n/[0-9][0-9][0-9][0-9]_[0-9][0-9]-_-[0-9][0-9]aww/21mm[0-9][0-9]dara21'),
        ]
        self._test_filesystems_and_globs(
            lambda d: CommonDateHourTask(d),
            self._datehour_re,
            wanted)

    def test_wrapped_datehour_globs_successfully_inferred(self):
        # A wrapper task contributes one glob per output of its dependencies.
        wanted = [
            (MockFileSystem, 'TaskA/[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]'),
            (MockFileSystem, 'TaskB/no/worries[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]'),
        ]
        self._test_filesystems_and_globs(
            lambda d: CommonWrapperTask(d),
            self._datehour_re,
            wanted)

    def test_inconsistent_output_datehour_glob_not_inferred(self):
        class InconsistentlyOutputtingDateHourTask(luigi.Task):
            dh = luigi.DateHourParameter()

            def output(self):
                # Output shape depends on hour parity: a single target for
                # even hours, a dict of two targets for odd hours.
                base = self.dh.strftime('/even/%Y%m%d%H')
                if self.dh.hour % 2 != 0:
                    return {
                        'spi': MockFile(base + '/something.spi'),
                        'spl': MockFile(base + '/something.spl'),
                    }
                return MockFile(base)

        with self.assertRaises(NotImplementedError):
            list(_get_filesystems_and_globs(
                lambda d: InconsistentlyOutputtingDateHourTask(d),
                self._datehour_re))

    def test_wrapped_inconsistent_datehour_globs_not_inferred(self):
        class InconsistentlyParameterizedWrapperTask(luigi.WrapperTask):
            dh = luigi.DateHourParameter()

            def requires(self):
                # TaskA is asked for the previous day, TaskB for the given datehour.
                yield TaskA(dh=self.dh - datetime.timedelta(days=1))
                yield TaskB(dh=self.dh, complicator='no/worries')

        with self.assertRaises(NotImplementedError):
            list(_get_filesystems_and_globs(
                lambda d: InconsistentlyParameterizedWrapperTask(d),
                self._datehour_re))


class RangeDailyTest(unittest.TestCase):

def test_bulk_complete_correctly_interfaced(self):
class BulkCompleteDailyTask(luigi.Task):
d = luigi.DateParameter()
Expand All @@ -480,26 +554,38 @@ def output(self):
actual = [t.task_id for t in task.requires()]
self.assertEqual(actual, expected)

@mock.patch('luigi.mock.MockFileSystem.listdir',
new=mock_listdir([
'/data/2014/p/v/z/2014_/_03-_-21octor/20/ZOOO',
'/data/2014/p/v/z/2014_/_03-_-23octor/20/ZOOO',
'/data/2014/p/v/z/2014_/_03-_-24octor/20/ZOOO',
]))
@mock.patch('luigi.mock.MockFileSystem.exists',
new=mock_exists_always_true)
def test_missing_tasks_correctly_required(self):
class SomeDailyTask(luigi.Task):
d = luigi.DateParameter()

class RangeHourlyTest(unittest.TestCase):
def output(self):
return MockFile(self.d.strftime('/data/2014/p/v/z/%Y_/_%m-_-%doctor/20/ZOOO'))

def _test_filesystems_and_globs(self, task_cls, expected):
actual = list(_get_filesystems_and_globs(task_cls))
self.assertEqual(len(actual), len(expected))
for (actual_filesystem, actual_glob), (expected_filesystem, expected_glob) in zip(actual, expected):
self.assertTrue(isinstance(actual_filesystem, expected_filesystem))
self.assertEqual(actual_glob, expected_glob)
task = RangeDaily(now=datetime_to_epoch(datetime.datetime(2016, 4, 1)),
of='SomeDailyTask',
start=datetime.date(2014, 3, 20),
task_limit=3,
days_back=3 * 365)
expected = [
'SomeDailyTask(d=2014-03-20)',
'SomeDailyTask(d=2014-03-22)',
'SomeDailyTask(d=2014-03-25)',
]
actual = [t.task_id for t in task.requires()]
self.assertEqual(actual, expected)

def test_successfully_inferred(self):
self._test_filesystems_and_globs(CommonDateHourTask, [
(MockFileSystem, '/n2000y01a05n/[0-9][0-9][0-9][0-9]_[0-9][0-9]-_-[0-9][0-9]aww/21mm[0-9][0-9]dara21'),
])
self._test_filesystems_and_globs(CommonWrapperTask, [
(MockFileSystem, 'TaskA/[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]'),
(MockFileSystem, 'TaskB/no/worries[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]'),
])

@mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir) # fishy to mock the mock, but MockFileSystem doesn't support globs yet
class RangeHourlyTest(unittest.TestCase):

@mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir(mock_contents)) # fishy to mock the mock, but MockFileSystem doesn't support globs yet
@mock.patch('luigi.mock.MockFileSystem.exists',
new=mock_exists_always_true)
def test_missing_tasks_correctly_required(self):
Expand All @@ -513,7 +599,7 @@ def test_missing_tasks_correctly_required(self):
actual = [t.task_id for t in task.requires()]
self.assertEqual(actual, expected_a)

@mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir)
@mock.patch('luigi.mock.MockFileSystem.listdir', new=mock_listdir(mock_contents))
@mock.patch('luigi.mock.MockFileSystem.exists',
new=mock_exists_always_true)
def test_missing_wrapper_tasks_correctly_required(self):
Expand Down

0 comments on commit 1aea123

Please sign in to comment.