fix incompatibilities with hadoop 0.20 (fixes GH-20)
klbostee committed Dec 13, 2010
1 parent 550528b commit d143163
Showing 4 changed files with 47 additions and 11 deletions.
10 changes: 10 additions & 0 deletions dumbo/backends/common.py
@@ -103,6 +103,12 @@ def __repr__(self):
         return repr(self.dump())
 
 
+class RunInfo(object):
+
+    def get_input_path(self):
+        return 'unknown'
+
+
 class Iteration(object):
 
     def __init__(self, prog, opts):
@@ -254,3 +260,7 @@ def get_mapredbase_class(self, opts):
     def get_joinkey_class(self, opts):
         """ Returns a suitable JoinKey class """
         return JoinKey
+
+    def get_runinfo_class(self, opts):
+        """ Returns a suitable RunInfo class """
+        return RunInfo
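
Note: the new RunInfo base class is a do-nothing default — get_input_path() answers 'unknown' unless a backend substitutes something smarter through the get_runinfo_class() hook above. A minimal sketch of how a caller might use the hook (the backend and opts names stand in for whatever the caller has in scope; an illustration, not code from this commit):

    # Ask the backend for its RunInfo implementation and instantiate it;
    # the base class simply reports 'unknown' as the input path.
    runinfo = backend.get_runinfo_class(opts)()
    print runinfo.get_input_path()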
37 changes: 28 additions & 9 deletions dumbo/backends/streaming.py
@@ -18,7 +18,7 @@
 import sys
 import re
 
-from dumbo.backends.common import Backend, Iteration, FileSystem
+from dumbo.backends.common import Backend, Iteration, FileSystem, RunInfo
 from dumbo.util import getopt, getopts, configopts, envdef, execute
 from dumbo.util import findhadoop, findjar, dumpcode, dumptext
 
@@ -36,6 +36,9 @@ def create_filesystem(self, opts):
         hadoopopt = getopt(opts, 'hadoop', delete=False)
         return StreamingFileSystem(findhadoop(hadoopopt[0]))
 
+    def get_runinfo_class(self, opts):
+        return StreamingRunInfo
+
 
 class StreamingIteration(Iteration):
 
@@ -201,14 +204,22 @@ def run(self):
         if addedopts['delinputs'] and addedopts['delinputs'][0] == 'yes':
             for (key, value) in self.opts:
                 if key == 'input':
-                    execute("%s/bin/hadoop dfs -rmr '%s'" % (hadoop, value))
+                    if os.path.exists(hadoop + "/bin/hdfs"):
+                        hdfs = hadoop + "/bin/hdfs"
+                    else:
+                        hdfs = hadoop + "/bin/hadoop"
+                    execute("%s dfs -rmr '%s'" % (hdfs, value))
         return retval
 
 
 class StreamingFileSystem(FileSystem):
 
     def __init__(self, hadoop):
         self.hadoop = hadoop
+        if os.path.exists(hadoop + "/bin/hdfs"):
+            self.hdfs = hadoop + "/bin/hdfs"
+        else:
+            self.hdfs = hadoop + "/bin/hadoop"
 
     def cat(self, path, opts):
         addedopts = getopts(opts, ['libjar'], delete=False)
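
Hadoop 0.20 started shipping a separate bin/hdfs launcher for filesystem commands, while older releases only provide bin/hadoop, which is why both spots above probe for bin/hdfs and fall back. The duplicated probe could be factored into a small helper along these lines (a hypothetical refactoring, not part of the commit):

    import os

    def find_dfs_launcher(hadoop):
        # Prefer the dedicated HDFS launcher from Hadoop 0.20+,
        # fall back to the all-in-one bin/hadoop script.
        hdfs = os.path.join(hadoop, 'bin', 'hdfs')
        if os.path.exists(hdfs):
            return hdfs
        return os.path.join(hadoop, 'bin', 'hadoop')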
@@ -220,7 +231,7 @@ def cat(self, path, opts):
                         shortcuts=dict(configopts('jars')))
         try:
             import typedbytes
-            ls = os.popen('%s %s/bin/hadoop dfs -ls %s' % (hadenv, self.hadoop, path))
+            ls = os.popen('%s %s dfs -ls %s' % (hadenv, self.hdfs, path))
             if sum(c in path for c in ("*", "?", "{")) > 0:
                 # cat each file separately when the path contains special chars
                 lineparts = (line.split()[-1] for line in ls)
@@ -247,21 +258,29 @@ def cat(self, path, opts):
         return 0
 
     def ls(self, path, opts):
-        return execute("%s/bin/hadoop dfs -ls '%s'" % (self.hadoop, path),
+        return execute("%s dfs -ls '%s'" % (self.hdfs, path),
                        printcmd=False)
 
     def exists(self, path, opts):
-        shellcmd = "%s/bin/hadoop dfs -stat '%s' >/dev/null 2>&1"
-        return 1 - int(execute(shellcmd % (self.hadoop, path), printcmd=False) == 0)
+        shellcmd = "%s dfs -stat '%s' >/dev/null 2>&1"
+        return 1 - int(execute(shellcmd % (self.hdfs, path), printcmd=False) == 0)
 
     def rm(self, path, opts):
-        return execute("%s/bin/hadoop dfs -rmr '%s'" % (self.hadoop, path),
+        return execute("%s dfs -rmr '%s'" % (self.hdfs, path),
                        printcmd=False)
 
     def put(self, path1, path2, opts):
-        return execute("%s/bin/hadoop dfs -put '%s' '%s'" % (self.hadoop, path1,
+        return execute("%s dfs -put '%s' '%s'" % (self.hdfs, path1,
                                                   path2), printcmd=False)
 
     def get(self, path1, path2, opts):
-        return execute("%s/bin/hadoop dfs -get '%s' '%s'" % (self.hadoop, path1,
+        return execute("%s dfs -get '%s' '%s'" % (self.hdfs, path1,
                                                   path2), printcmd=False)
 
 
+class StreamingRunInfo(RunInfo):
+
+    def get_input_path(self):
+        if os.environ.has_key('mapreduce_map_input_file'):
+            return os.environ['mapreduce_map_input_file']
+        return os.environ['map_input_file']
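
Hadoop 0.20 renamed the environment variable that streaming exports to each task from map_input_file to mapreduce_map_input_file, so StreamingRunInfo tries the new name first and falls back to the old one. A slightly more defensive variant (an illustration, not the committed code) would also survive when neither variable is set, e.g. outside a streaming task:

    import os

    def get_input_path():
        # Hadoop 0.20+ exports mapreduce_map_input_file; earlier
        # streaming releases export map_input_file.
        for var in ('mapreduce_map_input_file', 'map_input_file'):
            if var in os.environ:
                return os.environ[var]
        return 'unknown'  # same fallback as the RunInfo base class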
7 changes: 5 additions & 2 deletions dumbo/core.py
@@ -236,6 +236,7 @@ def run(mapper,
 
         mrbase_class = loadclassname(os.environ['dumbo_mrbase_class'])
         jk_class = loadclassname(os.environ['dumbo_jk_class'])
+        runinfo = loadclassname(os.environ['dumbo_runinfo_class'])()
 
         if iterarg == iter:
             if sys.argv[1].startswith('map'):
@@ -275,7 +276,7 @@ def run(mapper,
             if combconf:
                 combconf()
             if os.environ.has_key('dumbo_addpath'):
-                path = os.environ['map_input_file']
+                path = runinfo.get_input_path()
                 inputs = (((path, k), v) for (k, v) in inputs)
             if os.environ.has_key('dumbo_joinkeys'):
                 inputs = ((jk_class(k), v) for (k, v) in inputs)
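
With -addpath yes, each key is wrapped in a tuple with the path of the input file it came from, so mappers can tell records from different files apart. A toy run of the wrapping expression (the path and record values are made up for illustration):

    path = '/data/part-00000'
    inputs = iter([('word', 1)])
    inputs = (((path, k), v) for (k, v) in inputs)
    print list(inputs)  # [(('/data/part-00000', 'word'), 1)]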
@@ -420,7 +421,9 @@ def run(mapper,
         opts.append(('cmdenv', 'dumbo_mrbase_class=' + \
                      getclassname(backend.get_mapredbase_class(opts))))
         opts.append(('cmdenv', 'dumbo_jk_class=' + \
-                    getclassname(backend.get_joinkey_class(opts))))
+                     getclassname(backend.get_joinkey_class(opts))))
+        opts.append(('cmdenv', 'dumbo_runinfo_class=' + \
+                     getclassname(backend.get_runinfo_class(opts))))
         retval = backend.create_iteration(opts).run()
         if retval == 127:
             print >> sys.stderr, 'ERROR: Are you sure that "python" is on your path?'
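
The RunInfo class travels to the tasks by name: the submitter records its dotted name in a -cmdenv variable, and each task re-imports it (the hunk at -236 above does this via loadclassname). loadclassname presumably does something along these lines (a sketch, not the actual dumbo helper):

    import os

    name = os.environ['dumbo_runinfo_class']
    # e.g. 'dumbo.backends.streaming.StreamingRunInfo'
    module_name, _, class_name = name.rpartition('.')
    module = __import__(module_name, fromlist=[class_name])
    runinfo = getattr(module, class_name)()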
4 changes: 4 additions & 0 deletions dumbo/util.py
@@ -177,9 +177,13 @@ def findjar(hadoop, name):
     hadoop home directory and component base name (e.g. 'streaming')"""
 
     jardir_candidates = filter(os.path.exists, [
+        os.path.join(hadoop, 'mapred', 'build', 'contrib', name),
         os.path.join(hadoop, 'build', 'contrib', name),
+        os.path.join(hadoop, 'mapred', 'contrib', name, 'lib'),
         os.path.join(hadoop, 'contrib', name, 'lib'),
+        os.path.join(hadoop, 'mapred', 'contrib', name),
         os.path.join(hadoop, 'contrib', name),
+        os.path.join(hadoop, 'mapred', 'contrib'),
         os.path.join(hadoop, 'contrib')
     ])
     regex = re.compile(r'hadoop.*%s.*\.jar' % name)
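
The interleaved mapred/ entries cover the split-project layouts that appeared around Hadoop 0.20, where the MapReduce contrib jars live under a mapred/ subtree of the install; the existing candidate directories are then searched for a jar whose name matches the component. A toy illustration of the name matching (file names are made up):

    import re

    regex = re.compile(r'hadoop.*%s.*\.jar' % 'streaming')
    names = ['hadoop-0.20.2-streaming.jar', 'hadoop-examples-0.20.2.jar']
    print [n for n in names if regex.match(n)]  # ['hadoop-0.20.2-streaming.jar']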