Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
Added contrib.simulate.RunAnywayTarget with test in test/simulate_tes…
…t.py RunAnywayTarget: added new test, checking tasks output RunAnywayTarget: added test and a reset function called in each test Fixed coding style and documentation errors RunAnywayTarget: Made some changes to the custom temp directory and fixed some problems related to travis' tests RunAnywayTarget: complied with the sphinx documentation syntax Fixed unused import and used absolute paths for custom temp dirs in simulate_test RunAnywayTarget: changed the way it works, now deleting files at the end of the execution Removed a test that made no sense with the new changes, and adapted the other ones Revamped luigi.contrib.simulate module using a shared value containing a PID as a unique identifier Changed tests for luigi.contrib.simulate and removed the inconsistent ones Fixed test_output in simulate_test, and removed Travis-specific file removal Potentially bypassed tests for environments that don't allow file creation in a temporary directory Fixed coding style and added is_writable decorator in simulate_test Fixed is_writable decorator, now creating parent directories and handling exceptions Made changes to the RunAnywayTarget (changed comments, used task_id) and added a test run in another process
- Loading branch information
Showing
2 changed files
with
222 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Copyright 2012-2015 Spotify AB | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
""" | ||
A module containing classes used to simulate certain behaviors | ||
""" | ||
|
||
from multiprocessing import Value | ||
import tempfile | ||
import hashlib | ||
import logging | ||
import os | ||
|
||
import luigi | ||
|
||
logger = logging.getLogger('luigi-interface') | ||
|
||
|
||
class RunAnywayTarget(luigi.Target):
    """
    A target used to make a task run every time it is called.

    Usage:

    Pass `self` as the first argument in your task's `output`:

    .. code-block:: python

        def output(self):
            return RunAnywayTarget(self)

    And then mark it as `done` in your task's `run`:

    .. code-block:: python

        def run(self):
            # Your task execution
            # ...
            self.output().done()  # will then be considered as "existing"
    """

    # Specify the location of the temporary folder storing the state files. Subclass to change this value
    temp_dir = os.path.join(tempfile.gettempdir(), 'luigi-simulate')
    temp_time = 24 * 3600  # seconds

    # Unique value (PID of the first encountered target) to separate temporary files between executions and
    # avoid deletion collision
    unique = Value('i', 0)

    def __init__(self, task_obj):
        """
        Bind the target to a task and, for the first target created, record the PID and purge stale state.

        :param task_obj: the task owning this target; only its ``task_id`` attribute is read.
        """
        self.task_id = task_obj.task_id

        # Cheap unlocked check first, then re-check under the lock so that only one
        # target performs the initialisation.
        if self.unique.value == 0:
            with self.unique.get_lock():
                if self.unique.value == 0:
                    self.unique.value = os.getpid()  # The PID will be unique for every execution of the pipeline
                    self._delete_old_dirs()

    def _delete_old_dirs(self):
        """
        Remove sub-directories of ``temp_dir`` whose modification time is older than ``temp_time`` seconds.

        This is best-effort housekeeping: filesystem errors (e.g. a directory removed
        concurrently by another execution) are logged and never propagated.
        """
        import shutil
        import time

        if not os.path.isdir(self.temp_dir):
            return

        limit = time.time() - self.temp_time
        try:
            entries = os.listdir(self.temp_dir)
        except OSError as e:
            logger.warning('Could not list temporary directory %s: %s', self.temp_dir, e)
            return

        for fn in entries:
            path = os.path.join(self.temp_dir, fn)
            try:
                if os.path.isdir(path) and os.stat(path).st_mtime < limit:
                    shutil.rmtree(path)
                    logger.debug('Deleted temporary directory %s', path)
            except OSError as e:
                logger.warning('Could not delete temporary directory %s: %s', path, e)

    def get_path(self):
        """
        Returns a temporary file path based on a MD5 hash generated with the task's name and its arguments
        """
        md5_hash = hashlib.md5(self.task_id.encode()).hexdigest()
        logger.debug('Hash %s corresponds to task %s', md5_hash, self.task_id)

        # Layout: <temp_dir>/<value of `unique`>/<md5 of task_id>
        return os.path.join(self.temp_dir, str(self.unique.value), md5_hash)

    def exists(self):
        """
        Checks if the file exists
        """
        return os.path.isfile(self.get_path())

    def done(self):
        """
        Creates temporary file to mark the task as `done`
        """
        logger.info('Marking %s as done', self.task_id)

        fn = self.get_path()
        os.makedirs(os.path.dirname(fn), exist_ok=True)
        # An empty file is enough: only its presence is checked by `exists`.
        with open(fn, 'w'):
            pass
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
# -*- coding: utf-8 -*- | ||
# | ||
# Copyright 2012-2015 Spotify AB | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
|
||
from helpers import unittest | ||
import luigi | ||
from luigi.contrib.simulate import RunAnywayTarget | ||
|
||
from multiprocessing import Process | ||
import os | ||
import tempfile | ||
|
||
|
||
def temp_dir():
    """
    Return the directory used by these tests for temporary files.

    When the ``TRAVIS`` environment variable equals ``'true'`` the absolute path of a
    ``simulate-tmp`` directory next to this file is returned; otherwise a
    ``luigi-simulate`` directory under the system temporary directory.
    """
    on_travis = os.getenv('TRAVIS') == 'true'
    if not on_travis:
        return os.path.join(tempfile.gettempdir(), 'luigi-simulate')

    here = os.path.dirname(__file__)
    return os.path.abspath(os.path.join(here, 'simulate-tmp'))
|
||
|
||
def is_writable():
    """
    Build a decorator that skips a test when the temporary directory is not writable.

    The check creates the directory returned by ``temp_dir()`` (including parents),
    then creates and removes a probe file inside it.

    :return: the result of ``unittest.skipIf``, to be applied to a test method.
    """
    d = temp_dir()
    fn = os.path.join(d, 'luigi-simulate-write-test')
    writable = True
    try:
        os.makedirs(d, exist_ok=True)
        with open(fn, 'w'):
            pass
        os.remove(fn)
    except OSError:
        # Only filesystem failures mean "not writable"; anything else
        # (KeyboardInterrupt, programming errors) must propagate.
        writable = False

    return unittest.skipIf(not writable, 'Can\'t write to temporary directory')
|
||
|
||
class PathRunAnywayTarget(RunAnywayTarget):
    """RunAnywayTarget whose state files are stored under the tests' temporary directory."""

    # Evaluated once, when the class body is executed.
    temp_dir = temp_dir()
|
||
|
||
class TaskA(luigi.Task):
    """
    Test task that appends a ``<ClassName>=<i>`` line to a shared log file each time it runs.
    """

    i = luigi.IntParameter(default=0)

    def output(self):
        """Return a fresh PathRunAnywayTarget bound to this task."""
        return PathRunAnywayTarget(self)

    def run(self):
        """Record this execution in the log file, then mark the output as done."""
        log_path = os.path.join(temp_dir(), 'luigi-simulate-test.tmp')
        os.makedirs(os.path.dirname(log_path), exist_ok=True)

        # Append mode: every task of the pipeline writes to the same file.
        entry = '{0}={1}\n'.format(type(self).__name__, self.i)
        with open(log_path, 'a') as handle:
            handle.write(entry)

        self.output().done()
|
||
|
||
class TaskB(TaskA):
    """TaskA variant that depends on ``TaskA(i=10)``."""

    def requires(self):
        """Return the single upstream task."""
        upstream = TaskA(i=10)
        return upstream
|
||
|
||
class TaskC(TaskA):
    """TaskA variant that depends on ``TaskA(i=5)``."""

    def requires(self):
        """Return the single upstream task."""
        upstream = TaskA(i=5)
        return upstream
|
||
|
||
class TaskD(TaskA):
    """TaskA variant that depends on ``TaskB()``, ``TaskC()`` and ``TaskA(i=20)``."""

    def requires(self):
        """Return the list of upstream tasks."""
        upstream = [
            TaskB(),
            TaskC(),
            TaskA(i=20),
        ]
        return upstream
|
||
|
||
class TaskWrap(luigi.WrapperTask):
    """Wrapper task used as the pipeline entry point; requires ``TaskA()`` and ``TaskD()``."""

    def requires(self):
        """Return the list of upstream tasks."""
        upstream = [
            TaskA(),
            TaskD(),
        ]
        return upstream
|
||
|
||
def reset():
    """
    Set the shared ``unique`` value of the targets back to 0.

    Intended to force tasks to be executed again, because multiple pipelines are
    executed inside of the same process.
    """
    shared = TaskA().output().unique
    with shared.get_lock():
        shared.value = 0
|
||
|
||
class RunAnywayTargetTest(unittest.TestCase):
    """Tests checking that tasks using RunAnywayTarget are executed by a pipeline run."""

    @is_writable()
    def test_output(self):
        """Run the whole pipeline and check that every task logged exactly one line."""
        reset()

        fn = os.path.join(temp_dir(), 'luigi-simulate-test.tmp')

        # The tasks append to this file, so a leftover from an aborted run
        # would pollute the comparison below.
        if os.path.exists(fn):
            os.remove(fn)

        luigi.build([TaskWrap()], local_scheduler=True)
        with open(fn, 'r') as f:
            data = f.read().strip().split('\n')

        # Clean up before asserting so a failure does not leave the file behind.
        os.remove(fn)

        # Execution order is not fixed; compare as sorted lists.
        reference = ['TaskA=0', 'TaskA=10', 'TaskA=20', 'TaskA=5', 'TaskB=0', 'TaskC=0', 'TaskD=0']
        self.assertEqual(sorted(data), sorted(reference))

    @is_writable()
    def test_output_again(self):
        """Run `test_output` once more in a child process and require it to succeed."""
        # Running the test in another process because the PID is used to determine if the target exists
        p = Process(target=self.test_output)
        p.start()
        p.join()

        # A failing assertion in the child only shows up as a non-zero exit code.
        self.assertEqual(p.exitcode, 0, 'test_output failed in the child process')