Skip to content

Commit

Permalink
hdfs refactor: Move out HDFSCliError
Browse files Browse the repository at this point in the history
This is probably the first commit in a while to actually build and pass
tests. I won't remeld my commits to the enormous merge conflict head
ache it would cause.
  • Loading branch information
Tarrasch committed May 8, 2015
1 parent 82639ab commit cae0da5
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 24 deletions.
3 changes: 2 additions & 1 deletion luigi/contrib/hdfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@

# clients
from luigi.contrib.hdfs import clients as hdfs_clients
from luigi.contrib.hdfs import error as hdfs_error
from luigi.contrib.hdfs import snakebite_client as hdfs_snakebite_client
from luigi.contrib.hdfs import hadoopcli_clients as hdfs_hadoopcli_clients
HDFSCliError = hdfs_clients.HDFSCliError
HDFSCliError = hdfs_error.HDFSCliError
call_check = hdfs_hadoopcli_clients.HdfsClient.call_check
list_path = hdfs_snakebite_client.SnakebiteHdfsClient.list_path
HdfsClient = hdfs_hadoopcli_clients.HdfsClient
Expand Down
15 changes: 0 additions & 15 deletions luigi/contrib/hdfs/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,6 @@
logger = logging.getLogger('luigi-interface')


class HDFSCliError(Exception):

def __init__(self, command, returncode, stdout, stderr):
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
msg = ("Command %r failed [exit code %d]\n" +
"---stdout---\n" +
"%s\n" +
"---stderr---\n" +
"%s" +
"------------") % (command, returncode, stdout, stderr)
super(HDFSCliError, self).__init__(msg)


def get_autoconfig_client(show_warnings=True):
"""
Creates the client as specified in the `client.cfg` configuration.
Expand Down
36 changes: 36 additions & 0 deletions luigi/contrib/hdfs/error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
#
# Copyright 2012-2015 Spotify AB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
The implementations of the hdfs clients. The hadoop cli client and the
snakebite client.
"""


class HDFSCliError(Exception):

def __init__(self, command, returncode, stdout, stderr):
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
msg = ("Command %r failed [exit code %d]\n" +
"---stdout---\n" +
"%s\n" +
"---stderr---\n" +
"%s" +
"------------") % (command, returncode, stdout, stderr)
super(HDFSCliError, self).__init__(msg)
12 changes: 6 additions & 6 deletions luigi/contrib/hdfs/hadoopcli_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from luigi.target import FileAlreadyExists, FileSystem
from luigi.contrib.hdfs.config import load_hadoop_cmd
from luigi.contrib.hdfs import config as hdfs_config
from luigi.contrib.hdfs import clients as hdfs_clients
from luigi.contrib.hdfs import error as hdfs_error
import logging
import subprocess
import datetime
Expand Down Expand Up @@ -64,7 +64,7 @@ def call_check(command):
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True, universal_newlines=True)
stdout, stderr = p.communicate()
if p.returncode != 0:
raise hdfs_clients.HDFSCliError(command, p.returncode, stdout, stderr)
raise hdfs_error.HDFSCliError(command, p.returncode, stdout, stderr)
return stdout

def exists(self, path):
Expand All @@ -84,7 +84,7 @@ def exists(self, path):
for line in stderr.split('\n'):
if not_found_re.match(line):
return False
raise hdfs_clients.HDFSCliError(cmd, p.returncode, stdout, stderr)
raise hdfs_error.HDFSCliError(cmd, p.returncode, stdout, stderr)

def rename(self, path, dest):
parent_dir = os.path.dirname(dest)
Expand Down Expand Up @@ -185,7 +185,7 @@ def mkdir(self, path, parents=True, raise_if_exists=False):
(['-p'] if parents else []) +
[path])
self.call_check(cmd)
except hdfs_clients.HDFSCliError as ex:
except hdfs_error.HDFSCliError as ex:
if "File exists" in ex.stderr:
if raise_if_exists:
raise FileAlreadyExists(ex.stderr)
Expand Down Expand Up @@ -249,7 +249,7 @@ def mkdir(self, path):
"""
try:
self.call_check(load_hadoop_cmd() + ['fs', '-mkdir', path])
except hdfs_clients.HDFSCliError as ex:
except hdfs_error.HDFSCliError as ex:
if "File exists" in ex.stderr:
raise FileAlreadyExists(ex.stderr)
else:
Expand Down Expand Up @@ -285,4 +285,4 @@ def exists(self, path):
elif p.returncode == 1:
return False
else:
raise hdfs_clients.HDFSCliError(cmd, p.returncode, stdout, stderr)
raise hdfs_error.HDFSCliError(cmd, p.returncode, stdout, stderr)
4 changes: 2 additions & 2 deletions luigi/contrib/hdfs/snakebite_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


from luigi.contrib.hdfs import config as hdfs_config
from luigi.contrib.hdfs import clients as hdfs_clients
from luigi.contrib.hdfs import error as hdfs_error
from luigi.contrib.hdfs import hadoopcli_clients as hdfs_hadoopcli_clients
from luigi import six
import luigi.contrib.target
Expand Down Expand Up @@ -90,7 +90,7 @@ def exists(self, path):
try:
return self.get_bite().test(path, exists=True)
except Exception as err: # IGNORE:broad-except
raise hdfs_clients.HDFSCliError("snakebite.test", -1, str(err), repr(err))
raise hdfs_error.HDFSCliError("snakebite.test", -1, str(err), repr(err))

def rename(self, path, dest):
"""
Expand Down

0 comments on commit cae0da5

Please sign in to comment.