Skip to content
This repository has been archived by the owner on Apr 19, 2022. It is now read-only.

Commit

Permalink
Merge pull request #189 from zline/master
Browse files Browse the repository at this point in the history
Common base class for exceptions; differentiating transient and fatal exceptions
  • Loading branch information
Wouter de Bie committed Feb 25, 2016
2 parents 214c307 + d41a1eb commit 1afbc4e
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 26 deletions.
19 changes: 11 additions & 8 deletions snakebite/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

from snakebite.platformutils import get_current_username
from snakebite.formatter import format_bytes
from snakebite.errors import RequestError
from snakebite.errors import RequestError, TransientException, FatalException
from snakebite.crc32c import crc

import google.protobuf.internal.encoder as encoder
Expand Down Expand Up @@ -139,7 +139,9 @@ def _buffer_bytes(self, n):
log.debug("Bytes read: %d, total: %d" % (len(bytes_read), self.buffer_length))
return n
if len(bytes_read) < n:
raise Exception("RpcBufferedReader only managed to read %s out of %s bytes" % (len(bytes_read), n))
# we'd like to distinguish transient (e.g. network-related) problems
# note: but this error could also be a logic error
raise TransientException("RpcBufferedReader only managed to read %s out of %s bytes" % (len(bytes_read), n))

def rewind(self, places):
'''Rewinds the current buffer to a position. Needed for reading varints,
Expand Down Expand Up @@ -184,7 +186,7 @@ def __init__(self, host, port, version, effective_user=None, use_sasl=False, hdf
self.hdfs_namenode_principal = hdfs_namenode_principal
if self.use_sasl:
if not _kerberos_available:
raise Exception("Kerberos libs not found. Please install snakebite using 'pip install snakebite[kerberos]'")
raise FatalException("Kerberos libs not found. Please install snakebite using 'pip install snakebite[kerberos]'")

kerberos = Kerberos()
self.effective_user = effective_user or kerberos.user_principal().name
Expand All @@ -196,7 +198,7 @@ def validate_request(self, request):

# Check the request is correctly initialized
if not request.IsInitialized():
raise Exception("Client request (%s) is missing mandatory fields" % type(request))
raise FatalException("Client request (%s) is missing mandatory fields" % type(request))

def get_connection(self, host, port):
'''Open a socket connection to a given host and port and writes the Hadoop header
Expand Down Expand Up @@ -241,7 +243,7 @@ def get_connection(self, host, port):
sasl = SaslRpcClient(self, hdfs_namenode_principal=self.hdfs_namenode_principal)
sasl_connected = sasl.connect()
if not sasl_connected:
raise Exception("SASL is configured, but cannot get connected")
raise TransientException("SASL is configured, but cannot get connected")

rpc_header = self.create_rpc_request_header()
context = self.create_connection_context()
Expand Down Expand Up @@ -486,7 +488,7 @@ def _close_socket(self):

def _read_bytes(self, n, depth=0):
if depth > self.MAX_READ_ATTEMPTS:
raise Exception("Tried to read %d more bytes, but failed after %d attempts" % (n, self.MAX_READ_ATTEMPTS))
raise TransientException("Tried to read %d more bytes, but failed after %d attempts" % (n, self.MAX_READ_ATTEMPTS))

bytes = self.sock.recv(n)
if len(bytes) < n:
Expand Down Expand Up @@ -580,7 +582,7 @@ def readBlock(self, length, pool_id, block_id, generation_stamp, offset, block_t
elif checksum_type in [self.CHECKSUM_CRC32C, self.CHECKSUM_CRC32]:
checksum_len = 4
else:
raise Exception("Checksum type %s not implemented" % checksum_type)
raise FatalException("Checksum type %s not implemented" % checksum_type)

total_read = 0
if block_op_response.status == 0: # datatransfer_proto.Status.Value('SUCCESS')
Expand Down Expand Up @@ -633,7 +635,8 @@ def readBlock(self, length, pool_id, block_id, generation_stamp, offset, block_t
if check_crc and checksum_type != self.CHECKSUM_NULL:
checksum_index = i * chunks_per_load + j
if checksum_index < len(checksums) and crc(chunk) != checksums[checksum_index]:
raise Exception("Checksum doesn't match")
# it makes sense to retry, so TransientError
raise TransientException("Checksum doesn't match")
load += chunk
total_read += len(chunk)
read_on_packet += len(chunk)
Expand Down
18 changes: 10 additions & 8 deletions snakebite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
InvalidInputException,
OutOfNNException,
RequestError,
)
FatalException, TransientException)
from snakebite.namenode import Namenode
from snakebite.service import RpcService

Expand Down Expand Up @@ -103,7 +103,7 @@ def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEF
:type hdfs_namenode_principal: string
'''
if hadoop_version < 9:
raise Exception("Only protocol versions >= 9 supported")
raise FatalException("Only protocol versions >= 9 supported")

self.host = host
self.port = port
Expand Down Expand Up @@ -533,7 +533,7 @@ def _handle_delete(self, path, node, recurse):
if result['result']:
result['message'] = ". Moved %s to %s" % (path, trash_path)
return result
raise Exception("Failed to move to trash: %s" % path)
raise FatalException("Failed to move to trash: %s" % path)
else:
request = client_proto.DeleteRequestProto()
request.src = path
Expand All @@ -547,7 +547,7 @@ def __should_move_to_trash(self, path):
if path.startswith(self.trash):
return False # Path already in trash
if posixpath.dirname(self.trash).startswith(path):
raise Exception("Cannot move %s to the trash, as it contains the trash" % path)
raise FatalException("Cannot move %s to the trash, as it contains the trash" % path)

return True

Expand Down Expand Up @@ -798,7 +798,7 @@ def getmerge(self, path, dst, newline=False, check_crc=False):
elif not load['error'] is '':
if os.path.isfile(temporary_target):
os.remove(temporary_target)
raise Exception(load['error'])
raise FatalException(load['error'])
if newline and load['response']:
f.write("\n")
yield {"path": dst, "response": '', "result": True, "error": load['error'], "source_path": path}
Expand Down Expand Up @@ -1145,7 +1145,7 @@ def _read_file(self, path, node, tail_only, check_crc, tail_length=1024):
successful_read = True
yield load
except Exception as e:
log.error(e)
log.getChild('transient').error(e)
if not location.id.storageID in failed_nodes:
failed_nodes.append(location.id.storageID)
successful_read = False
Expand All @@ -1157,7 +1157,7 @@ def _read_file(self, path, node, tail_only, check_crc, tail_length=1024):
if successful_read:
break
if successful_read is False:
raise Exception("Failure to read block %s" % block.b.blockId)
raise TransientException("Failure to read block %s" % block.b.blockId)

def _find_items(self, paths, processor, include_toplevel=False, include_children=False, recurse=False, check_nonexistence=False):
''' Request file info from the NameNode and call the processor on the node(s) returned
Expand Down Expand Up @@ -1388,7 +1388,9 @@ def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=Fal
self.hdfs_namenode_principal = hdfs_namenode_principal

if not namenodes:
raise OutOfNNException("List of namenodes is empty - couldn't create the client")
# Using InvalidInputException instead of OutOfNNException because the later is transient but current case
# is not.
raise InvalidInputException("List of namenodes is empty - couldn't create the client")
self.namenode = self._switch_namenode(namenodes)
self.namenode.next()

Expand Down
40 changes: 32 additions & 8 deletions snakebite/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,65 @@
# the License.


class ConnectionFailureException(Exception):
class SnakebiteException(Exception):
"""
Common base class for all snakebite exceptions.
"""
pass


class FatalException(SnakebiteException):
"""
FatalException indicates that retry of current operation alone would have the same effect.
"""
pass


class TransientException(SnakebiteException):
"""
TransientException indicates that retry of current operation could help.
"""
pass


class ConnectionFailureException(TransientException):
def __init__(self, msg):
super(ConnectionFailureException, self).__init__(msg)


class DirectoryException(Exception):
class DirectoryException(FatalException):
def __init__(self, msg):
super(DirectoryException, self).__init__(msg)


class FileAlreadyExistsException(Exception):
class FileAlreadyExistsException(FatalException):
def __init__(self, msg):
super(FileAlreadyExistsException, self).__init__(msg)


class FileException(Exception):
class FileException(FatalException):
def __init__(self, msg):
super(FileException, self).__init__(msg)


class FileNotFoundException(Exception):
class FileNotFoundException(FatalException):
def __init__(self, msg):
super(FileNotFoundException, self).__init__(msg)


class InvalidInputException(Exception):
class InvalidInputException(FatalException):
def __init__(self, msg):
super(InvalidInputException, self).__init__(msg)


class OutOfNNException(Exception):
class OutOfNNException(TransientException):
def __init__(self, msg):
super(OutOfNNException, self).__init__(msg)


class RequestError(Exception):
class RequestError(TransientException):
"""
Note: request error could be transient and could be fatal, depending on underlying error.
"""
def __init__(self, msg):
super(RequestError, self).__init__(msg)
5 changes: 3 additions & 2 deletions test/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from snakebite.client import HAClient, AutoConfigClient, Client
from snakebite.config import HDFSConfig
from snakebite.namenode import Namenode
from snakebite.errors import OutOfNNException, RequestError
from snakebite.errors import OutOfNNException, RequestError, InvalidInputException


class ClientTest(unittest2.TestCase):
original_hdfs_try_path = HDFSConfig.hdfs_try_paths
Expand Down Expand Up @@ -61,7 +62,7 @@ def test_wrapped_methods(self):

def test_empty_namenodes_haclient(self):
namenodes = ()
self.assertRaises(OutOfNNException, HAClient, namenodes)
self.assertRaises(InvalidInputException, HAClient, namenodes)

@patch('os.environ.get')
def test_empty_namenodes_autoclient(self, environ_get):
Expand Down

0 comments on commit 1afbc4e

Please sign in to comment.