Fixed various things that Landscape was complaining about

Erik Bernhardsson committed Jan 28, 2015
1 parent 1939a06 commit 80d5ffd
Showing 12 changed files with 91 additions and 78 deletions.
8 changes: 4 additions & 4 deletions examples/top_artists.py
@@ -29,7 +29,7 @@ class Streams(luigi.Task):

def run(self):
with self.output().open('w') as output:
- for i in xrange(1000):
+ for _ in xrange(1000):
output.write('{} {} {}\n'.format(
random.randint(0, 999),
random.randint(0, 999),
@@ -59,10 +59,10 @@ def requires(self):
def run(self):
artist_count = defaultdict(int)

- for input in self.input():
- with input.open('r') as in_file:
+ for t in self.input():
+ with t.open('r') as in_file:
for line in in_file:
- timestamp, artist, track = line.strip().split()
+ _, artist, track = line.strip().split()
artist_count[artist] += 1

with self.output().open('w') as out_file:
4 changes: 2 additions & 2 deletions examples/wordcount.py
@@ -23,8 +23,8 @@ def output(self):

def run(self):
count = {}
- for file in self.input(): # The input() method is a wrapper around requires() that returns Target objects
- for line in file.open('r'): # Target objects are a file system/format abstraction and this will return a file stream object
+ for f in self.input(): # The input() method is a wrapper around requires() that returns Target objects
+ for line in f.open('r'): # Target objects are a file system/format abstraction and this will return a file stream object
for word in line.strip().split():
count[word] = count.get(word, 0) + 1

2 changes: 1 addition & 1 deletion luigi/configuration.py
@@ -76,7 +76,7 @@ def getintdict(self, section):
except NoSectionError:
return {}

- def set(self, section, option, value):
+ def set(self, section, option, value=None):
if not ConfigParser.has_section(self, section):
ConfigParser.add_section(self, section)

4 changes: 3 additions & 1 deletion luigi/contrib/target.py
@@ -20,8 +20,10 @@ class CascadingClient(object):
'rename_dont_move',
]

- def __init__(self, clients, method_names=ALL_METHOD_NAMES):
+ def __init__(self, clients, method_names=None):
self.clients = clients
+ if method_names is None:
+ method_names = self.ALL_METHOD_NAMES

for method_name in method_names:
new_method = self._make_method(method_name)
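
The pattern applied here is worth spelling out: a default argument is evaluated once, when the function is defined, so pointing it at a class attribute (or a mutable literal) binds the value too early, which is what Landscape flags. The usual fix is a None sentinel resolved inside the body. A minimal sketch of the idiom, using a hypothetical class rather than the real CascadingClient:

    class Client(object):
        ALL_METHOD_NAMES = ['exists', 'rename']

        def __init__(self, method_names=None):
            # Resolve the default at call time instead of definition time.
            if method_names is None:
                method_names = self.ALL_METHOD_NAMES
            self.method_names = list(method_names)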
69 changes: 38 additions & 31 deletions luigi/hadoop.py
@@ -34,6 +34,7 @@
import mrrunner
import json
import glob
+ import abc

logger = logging.getLogger('luigi-interface')

@@ -45,12 +45,12 @@ def attach(*packages):
_attached_packages.extend(packages)


- def dereference(file):
-     if os.path.islink(file):
+ def dereference(f):
+     if os.path.islink(f):
# by joining with the dirname we are certain to get the absolute path
- return dereference(os.path.join(os.path.dirname(file), os.readlink(file)))
+ return dereference(os.path.join(os.path.dirname(f), os.readlink(f)))
else:
- return file
+ return f


def get_extra_files(extra_files):
@@ -66,8 +66,8 @@ def get_extra_files(extra_files):
if os.path.isdir(src):
src_prefix = os.path.join(src, '')
for base, dirs, files in os.walk(src):
- for file in files:
-     f_src = os.path.join(base, file)
+ for f in files:
+     f_src = os.path.join(base, f)
f_src_stripped = f_src[len(src_prefix):]
f_dst = os.path.join(dst, f_src_stripped)
result.append((f_src, f_dst))
@@ -327,13 +327,15 @@ class HadoopJobRunner(JobRunner):
TODO: add code to support Elastic Mapreduce (using boto) and local execution.
'''

- def __init__(self, streaming_jar, modules=[], streaming_args=[], libjars=[], libjars_in_hdfs=[], jobconfs={}, input_format=None, output_format=None):
+ def __init__(self, streaming_jar, modules=None, streaming_args=None, libjars=None, libjars_in_hdfs=None, jobconfs=None, input_format=None, output_format=None):
+ def get(x, default):
+     return x is not None and x or default
self.streaming_jar = streaming_jar
- self.modules = modules
- self.streaming_args = streaming_args
- self.libjars = libjars
- self.libjars_in_hdfs = libjars_in_hdfs
- self.jobconfs = jobconfs
+ self.modules = get(modules, [])
+ self.streaming_args = get(streaming_args, [])
+ self.libjars = get(libjars, [])
+ self.libjars_in_hdfs = get(libjars_in_hdfs, [])
+ self.jobconfs = get(jobconfs, {})
self.input_format = input_format
self.output_format = output_format
self.tmp_dir = False
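
The old defaults were mutable ([] and {}), and Python evaluates defaults a single time, so every HadoopJobRunner would have shared the same list and dict objects. The commit switches them to None and resolves them through the small get helper; note that the x is not None and x or default spelling also falls back to the default for falsy values such as an empty list, which makes no practical difference here since the defaults are empty containers anyway. A standalone illustration of the shared-default pitfall (not luigi code):

    def append_bad(item, bucket=[]):
        # The same list object is reused by every call.
        bucket.append(item)
        return bucket

    def append_good(item, bucket=None):
        # A fresh list is created per call unless one is passed in.
        if bucket is None:
            bucket = []
        bucket.append(item)
        return bucket

    append_bad(1); append_bad(2)    # second call returns [1, 2]
    append_good(1); append_good(2)  # second call returns [2]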
@@ -396,7 +399,7 @@ def run_job(self, job):
dst_tmp = '%s_%09d' % (dst.replace('/', '_'), random.randint(0, 999999999))
files += ['%s#%s' % (src, dst_tmp)]
# -files doesn't support subdirectories, so we need to create the dst_tmp -> dst manually
- job._add_link(dst_tmp, dst)
+ job.add_link(dst_tmp, dst)

if files:
arglist += ['-files', ','.join(files)]
@@ -438,7 +441,7 @@ def run_job(self, job):
# submit job
create_packages_archive(packages, self.tmp_dir + '/packages.tar')

- job._dump(self.tmp_dir)
+ job.dump(self.tmp_dir)

run_and_track_hadoop_job(arglist)

@@ -478,20 +481,20 @@ class LocalJobRunner(JobRunner):
def __init__(self, samplelines=None):
self.samplelines = samplelines

- def sample(self, input, n, output):
-     for i, line in enumerate(input):
+ def sample(self, input_stream, n, output):
+     for i, line in enumerate(input_stream):
if n is not None and i >= n:
break
output.write(line)

- def group(self, input):
+ def group(self, input_stream):
output = StringIO.StringIO()
lines = []
- for i, line in enumerate(input):
+ for i, line in enumerate(input_stream):
parts = line.rstrip('\n').split('\t')
blob = md5(str(i)).hexdigest() # pseudo-random blob to make sure the input isn't sorted
lines.append((parts[:-1], blob, line))
- for k, _, line in sorted(lines):
+ for _, _, line in sorted(lines):
output.write(line)
output.seek(0)
return output
@@ -507,28 +510,28 @@ def run_job(self, job):
if job.reducer == NotImplemented:
# Map only job; no combiner, no reducer
map_output = job.output().open('w')
- job._run_mapper(map_input, map_output)
+ job.run_mapper(map_input, map_output)
map_output.close()
return

job.init_mapper()
# run job now...
map_output = StringIO.StringIO()
- job._run_mapper(map_input, map_output)
+ job.run_mapper(map_input, map_output)
map_output.seek(0)

if job.combiner == NotImplemented:
reduce_input = self.group(map_output)
else:
combine_input = self.group(map_output)
combine_output = StringIO.StringIO()
- job._run_combiner(combine_input, combine_output)
+ job.run_combiner(combine_input, combine_output)
combine_output.seek(0)
reduce_input = self.group(combine_output)

job.init_reducer()
reduce_output = job.output().open('w')
- job._run_reducer(reduce_input, reduce_output)
+ job.run_reducer(reduce_input, reduce_output)
reduce_output.close()


@@ -546,6 +549,10 @@ class BaseHadoopJobTask(luigi.Task):
_counter_dict = {}
task_id = None

+ @abc.abstractmethod
+ def job_runner(self):
+     pass

def jobconfs(self):
jcs = []
jcs.append('mapred.job.name=%s' % self.task_id)
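
The new job_runner stub is decorated with abc.abstractmethod. The decorator only blocks instantiation when the class uses abc.ABCMeta as its metaclass, which is not visible in this hunk; without it, the decoration is effectively documentation. A minimal Python 2 sketch of the enforced case, with hypothetical class names:

    import abc

    class BaseTask(object):
        __metaclass__ = abc.ABCMeta

        @abc.abstractmethod
        def job_runner(self):
            pass

    class LocalTask(BaseTask):
        def job_runner(self):
            return 'local'

    LocalTask().job_runner()  # returns 'local'
    # BaseTask() raises TypeError: can't instantiate an abstract class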
@@ -729,7 +736,7 @@ def extra_files(self):
'''
return []

- def _add_link(self, src, dst):
+ def add_link(self, src, dst):
if not hasattr(self, '_links'):
self._links = []
self._links.append((src, dst))
@@ -753,9 +760,9 @@ def _setup_links(self):
'Missing files for distributed cache: ' +
', '.join(missing))

- def _dump(self, dir=''):
+ def dump(self, directory=''):
"""Dump instance to file."""
- file_name = os.path.join(dir, 'job-instance.pickle')
+ file_name = os.path.join(directory, 'job-instance.pickle')
if self.__module__ == '__main__':
d = pickle.dumps(self)
module_name = os.path.basename(sys.argv[0]).rsplit('.', 1)[0]
@@ -789,7 +796,7 @@ def _reduce_input(self, inputs, reducer, final=NotImplemented):
yield output
self._flush_batch_incr_counter()

- def _run_mapper(self, stdin=sys.stdin, stdout=sys.stdout):
+ def run_mapper(self, stdin=sys.stdin, stdout=sys.stdout):
"""Run the mapper on the hadoop node."""
self.init_hadoop()
self.init_mapper()
@@ -799,14 +806,14 @@ def _run_mapper(self, stdin=sys.stdin, stdout=sys.stdout):
else:
self.internal_writer(outputs, stdout)

- def _run_reducer(self, stdin=sys.stdin, stdout=sys.stdout):
+ def run_reducer(self, stdin=sys.stdin, stdout=sys.stdout):
"""Run the reducer on the hadoop node."""
self.init_hadoop()
self.init_reducer()
outputs = self._reduce_input(self.internal_reader((line[:-1] for line in stdin)), self.reducer, self.final_reducer)
self.writer(outputs, stdout)

- def _run_combiner(self, stdin=sys.stdin, stdout=sys.stdout):
+ def run_combiner(self, stdin=sys.stdin, stdout=sys.stdout):
self.init_hadoop()
self.init_combiner()
outputs = self._reduce_input(self.internal_reader((line[:-1] for line in stdin)), self.combiner, self.final_combiner)
@@ -815,8 +822,8 @@ def _run_combiner(self, stdin=sys.stdin, stdout=sys.stdout):
def internal_reader(self, input_stream):
"""Reader which uses python eval on each part of a tab separated string.
Yields a tuple of python objects."""
- for input in input_stream:
-     yield map(eval, input.split("\t"))
+ for input_line in input_stream:
+     yield map(eval, input_line.split("\t"))

def internal_writer(self, outputs, stdout):
"""Writer which outputs the python repr for each item"""
20 changes: 10 additions & 10 deletions luigi/hive.py
@@ -74,7 +74,7 @@ class HiveClient(object): # interface
__metaclass__ = abc.ABCMeta

@abc.abstractmethod
- def table_location(self, table, database='default', partition={}):
+ def table_location(self, table, database='default', partition=None):
"""
Returns location of db.table (or db.table.partition). partition is a dict of partition key to
value.
@@ -87,7 +87,7 @@ def table_schema(self, table, database='default'):
pass

@abc.abstractmethod
- def table_exists(self, table, database='default', partition={}):
+ def table_exists(self, table, database='default', partition=None):
"""
Returns true iff db.table (or db.table.partition) exists. partition is a dict of partition key to
value.
@@ -104,9 +104,9 @@ class HiveCommandClient(HiveClient):

""" Uses `hive` invocations to find information """

- def table_location(self, table, database='default', partition={}):
+ def table_location(self, table, database='default', partition=None):
cmd = "use {0}; describe formatted {1}".format(database, table)
- if partition:
+ if partition is not None:
cmd += " PARTITION ({0})".format(self.partition_spec(partition))

stdout = run_hive_cmd(cmd)
@@ -115,8 +115,8 @@ def table_location(self, table, database='default', partition={}):
if "Location:" in line:
return line.split("\t")[1]

- def table_exists(self, table, database='default', partition={}):
-     if not partition:
+ def table_exists(self, table, database='default', partition=None):
+     if partition is None:
stdout = run_hive_cmd('use {0}; show tables like "{1}";'.format(database, table))

return stdout and table in stdout
@@ -157,18 +157,18 @@ def table_schema(self, table, database='default'):

class MetastoreClient(HiveClient):

- def table_location(self, table, database='default', partition={}):
+ def table_location(self, table, database='default', partition=None):
with HiveThriftContext() as client:
- if partition:
+ if partition is not None:
partition_str = self.partition_spec(partition)
thrift_table = client.get_partition_by_name(database, table, partition_str)
else:
thrift_table = client.get_table(database, table)
return thrift_table.sd.location

- def table_exists(self, table, database='default', partition={}):
+ def table_exists(self, table, database='default', partition=None):
with HiveThriftContext() as client:
- if not partition:
+ if partition is None:
return table in client.get_all_tables(database)
else:
return partition in self._existing_partitions(table, database, client)
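
Across hive.py the partition={} defaults become partition=None, and the truthiness guards become explicit None checks. One behavioural nuance: under if partition is not None, an explicitly passed empty dict now takes the partitioned branch, whereas the old if partition treated it like no partition at all. A rough sketch of the shape of these methods, not the real HiveCommandClient:

    def table_cmd(table, database='default', partition=None):
        cmd = 'use %s; describe formatted %s' % (database, table)
        if partition is not None:  # an explicit {} takes this branch too
            spec = ', '.join("%s='%s'" % kv for kv in sorted(partition.items()))
            cmd += ' PARTITION (%s)' % spec
        return cmd

    table_cmd('streams')
    # "use default; describe formatted streams"
    table_cmd('streams', partition={'day': '2015-01-28'})
    # "use default; describe formatted streams PARTITION (day='2015-01-28')"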
9 changes: 3 additions & 6 deletions luigi/mock.py
@@ -56,7 +56,7 @@ def listdir(self, path):
return [s for s in self.get_all_data().keys()
if s.startswith(path)]

- def mkdir(self, path):
+ def mkdir(self, path, parents=True, raise_if_exists=False):
"""mkdir is a noop"""
pass

@@ -81,9 +81,6 @@ def rename(self, path, fail_if_exists=False):
contents = self.fs.get_all_data().pop(self._fn)
self.fs.get_all_data()[path] = contents

- def move_dir(self, path):
-     self.move(path, raise_if_exists=True)

@property
def path(self):
return self._fn
@@ -107,8 +104,8 @@ def close(self2):
self.fs.get_all_data()[fn] = self2.getvalue()
StringIO.StringIO.close(self2)

- def __exit__(self, type, value, traceback):
-     if not type:
+ def __exit__(self, exc_type, exc_val, exc_tb):
+     if not exc_type:
self.close()

def __enter__(self):
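
Renaming the parameters avoids shadowing the built-in type; the three-argument signature itself is fixed by the context-manager protocol, where the first argument is None on a clean exit. A small standalone example of that protocol, not the mock target itself:

    class Managed(object):
        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # exc_type is None when the with-block finished without raising.
            if exc_type is None:
                print 'clean exit'
            return False  # do not swallow exceptions

    with Managed():
        pass  # prints 'clean exit'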
10 changes: 5 additions & 5 deletions luigi/mrrunner.py
@@ -39,11 +39,11 @@ def __init__(self, job=None):

def run(self, kind, stdin=sys.stdin, stdout=sys.stdout):
if kind == "map":
- self.job._run_mapper(stdin, stdout)
+ self.job.run_mapper(stdin, stdout)
elif kind == "combiner":
- self.job._run_combiner(stdin, stdout)
+ self.job.run_combiner(stdin, stdout)
elif kind == "reduce":
- self.job._run_reducer(stdin, stdout)
+ self.job.run_reducer(stdin, stdout)
else:
raise Exception('weird command: %s' % kind)

@@ -64,7 +64,7 @@ def print_exception(exc):
print >> sys.stderr, 'luigi-exc-hex=%s' % tb.encode('hex')


- def main(args=sys.argv, stdin=sys.stdin, stdout=sys.stdout, print_exception=print_exception):
+ def main(args=None, stdin=sys.stdin, stdout=sys.stdout, print_exception=print_exception):
"""Run either the mapper or the reducer from the class instance in the file "job-instance.pickle".
Arguments:
@@ -75,7 +75,7 @@ def main(args=sys.argv, stdin=sys.stdin, stdout=sys.stdout, print_exception=prin
# Set up logging.
logging.basicConfig(level=logging.WARN)

- kind = args[1]
+ kind = args is not None and args[1] or sys.argv[1]
Runner().run(kind, stdin=stdin, stdout=stdout)
except Exception as exc:
# Dump encoded data that we will try to fetch using mechanize
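
Using sys.argv as a default argument binds it once, when the function is defined, and Landscape flags it as a dangerous default; the commit defaults args to None and falls back to sys.argv at call time via an and/or expression. A more common spelling of the same idea, shown here only as a sketch:

    import sys

    def main(args=None):
        if args is None:
            args = sys.argv  # read at call time, not at import time
        kind = args[1]       # 'map', 'combiner' or 'reduce'
        return kind

    # main(['mrrunner.py', 'map']) returns 'map'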