This repository has been archived by the owner on Apr 19, 2022. It is now read-only.

Merge pull request #192 from garthcn/nn-principal
Handle hdfs_namenode_principal properly and refactor auto configuration logic
ravwojdyla committed Feb 25, 2016
2 parents d8735e5 + c41803d commit 214c307
Showing 9 changed files with 120 additions and 91 deletions.
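The user-visible change: the NameNode's Kerberos principal is now threaded explicitly through the client and RPC layers instead of being read from HDFSConfig class attributes. A minimal sketch of the new keyword argument (hostname, port, and principal below are made-up placeholders; ls is the usual snakebite listing call):

    from snakebite.client import Client

    # The SASL layer only uses the service component of the principal
    # (the text before the first '/'), "hdfs" in this example.
    client = Client('namenode.example.com', port=8020, use_sasl=True,
                    hdfs_namenode_principal='hdfs/namenode.example.com@EXAMPLE.COM')
    for entry in client.ls(['/']):
        print entry['path']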
5 changes: 3 additions & 2 deletions snakebite/channel.py
@@ -171,7 +171,7 @@ class SocketRpcChannel(RpcChannel):
     '''Socket implementation of an RpcChannel.
     '''
 
-    def __init__(self, host, port, version, effective_user=None, use_sasl=False):
+    def __init__(self, host, port, version, effective_user=None, use_sasl=False, hdfs_namenode_principal=None):
         '''SocketRpcChannel to connect to a socket server on a user defined port.
         It possible to define version and effective user for the communication.'''
         self.host = host
@@ -181,6 +181,7 @@ def __init__(self, host, port, version, effective_user=None, use_sasl=False):
         self.version = version
         self.client_id = str(uuid.uuid4())
         self.use_sasl = use_sasl
+        self.hdfs_namenode_principal = hdfs_namenode_principal
         if self.use_sasl:
             if not _kerberos_available:
                 raise Exception("Kerberos libs not found. Please install snakebite using 'pip install snakebite[kerberos]'")
@@ -237,7 +238,7 @@ def get_connection(self, host, port):
         self.write(struct.pack('B', self.AUTH_PROTOCOL_NONE))  # serialization type (protobuf = 0)
 
         if self.use_sasl:
-            sasl = SaslRpcClient(self)
+            sasl = SaslRpcClient(self, hdfs_namenode_principal=self.hdfs_namenode_principal)
             sasl_connected = sasl.connect()
             if not sasl_connected:
                 raise Exception("SASL is configured, but cannot get connected")
21 changes: 15 additions & 6 deletions snakebite/client.py
@@ -85,7 +85,7 @@ class Client(object):
         3: "s"
     }
 
-    def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEFAULT_VERSION, use_trash=False, effective_user=None, use_sasl=False):
+    def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEFAULT_VERSION, use_trash=False, effective_user=None, use_sasl=False, hdfs_namenode_principal=None):
         '''
         :param host: Hostname or IP address of the NameNode
         :type host: string
@@ -99,15 +99,18 @@ def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEFAULT_VERSION, use_trash=False, effective_user=None, use_sasl=False):
         :type effective_user: string
         :param use_sasl: Use SASL authentication or not
         :type use_sasl: boolean
+        :param hdfs_namenode_principal: Kerberos principal to use for HDFS
+        :type hdfs_namenode_principal: string
         '''
         if hadoop_version < 9:
             raise Exception("Only protocol versions >= 9 supported")
 
         self.host = host
         self.port = port
         self.use_sasl = use_sasl
+        self.hdfs_namenode_principal = hdfs_namenode_principal
         self.service_stub_class = client_proto.ClientNamenodeProtocol_Stub
-        self.service = RpcService(self.service_stub_class, self.port, self.host, hadoop_version, effective_user, self.use_sasl)
+        self.service = RpcService(self.service_stub_class, self.port, self.host, hadoop_version, effective_user, self.use_sasl, self.hdfs_namenode_principal)
         self.use_trash = use_trash
         self.trash = self._join_user_path(".Trash")
         self._server_defaults = None
@@ -1366,18 +1369,23 @@ def _wrap_methods(cls):
         else:
             setattr(cls, name, cls._ha_return_method(meth))
 
-    def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=False):
+    def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=False, hdfs_namenode_principal=None):
         '''
         :param namenodes: Set of namenodes for HA setup
         :type namenodes: list
         :param use_trash: Use a trash when removing files.
        :type use_trash: boolean
         :param effective_user: Effective user for the HDFS operations (default: None - current user)
         :type effective_user: string
+        :param use_sasl: Use SASL authentication or not
+        :type use_sasl: boolean
+        :param hdfs_namenode_principal: Kerberos principal to use for HDFS
+        :type hdfs_namenode_principal: string
         '''
         self.use_trash = use_trash
         self.effective_user = effective_user
         self.use_sasl = use_sasl
+        self.hdfs_namenode_principal = hdfs_namenode_principal
 
         if not namenodes:
             raise OutOfNNException("List of namenodes is empty - couldn't create the client")
@@ -1393,7 +1401,8 @@ def _switch_namenode(self, namenodes):
                                  namenode.version,
                                  self.use_trash,
                                  self.effective_user,
-                                 self.use_sasl)
+                                 self.use_sasl,
+                                 self.hdfs_namenode_principal)
         else:
             msg = "Request tried and failed for all %d namenodes: " % len(namenodes)
             for namenode in namenodes:
@@ -1483,7 +1492,7 @@ def __init__(self, hadoop_version=Namenode.DEFAULT_VERSION, effective_user=None,
         '''
 
         configs = HDFSConfig.get_external_config()
-        nns = [Namenode(c['namenode'], c['port'], hadoop_version) for c in configs]
+        nns = [Namenode(nn['namenode'], nn['port'], hadoop_version) for nn in configs['namenodes']]
         if not nns:
             raise OutOfNNException("Tried and failed to find namenodes - couldn't created the client!")
-        super(AutoConfigClient, self).__init__(nns, HDFSConfig.use_trash, effective_user, HDFSConfig.use_sasl)
+        super(AutoConfigClient, self).__init__(nns, configs.get('use_trash', False), effective_user, configs.get('use_sasl', False), configs.get('hdfs_namenode_principal', None))
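With get_external_config now returning a single dict instead of a bare namenode list, AutoConfigClient picks up trash, SASL, and principal settings in one place. A usage sketch (the actual values depend on whatever core-site.xml and hdfs-site.xml are discovered via HADOOP_HOME or the default paths):

    from snakebite.client import AutoConfigClient

    # Namenodes, use_trash, use_sasl and hdfs_namenode_principal are all
    # taken from the discovered Hadoop XML configuration.
    client = AutoConfigClient()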
25 changes: 12 additions & 13 deletions snakebite/commandlineparser.py
@@ -267,6 +267,7 @@ def __use_cl_port_first(self, alt):
         return self.args.port if self.args.port else alt
 
     def read_config(self):
+        self.configs = HDFSConfig.get_external_config()
 
         # Try to retrieve namenode config from within CL arguments
         if self._read_config_cl():
@@ -280,17 +281,15 @@ def read_config(self):
         elif os.path.exists('/etc/snakebiterc'):
             self._read_config_snakebiterc('/etc/snakebiterc')
         else:
-            # Try to read the configuration for HDFS configuration files
-            configs = HDFSConfig.get_external_config()
-            # if configs exist and contain something
-            if configs:
-                for config in configs:
+            # if configs from HDFS config files exist and contain something
+            if self.configs:
+                for config in self.configs['namenodes']:
                     nn = Namenode(config['namenode'],
                                   self.__use_cl_port_first(config['port']))
                     self.namenodes.append(nn)
                 if self.__usetrash_unset():
-                    self.args.usetrash = HDFSConfig.use_trash
-                self.use_sasl = HDFSConfig.use_sasl
+                    self.args.usetrash = self.configs['use_trash']
+                self.use_sasl = self.configs['use_sasl']
 
         if len(self.namenodes):
             return
@@ -315,7 +314,7 @@ def read_config(self):
             sys.exit(1)
 
     def _read_config_snakebiterc(self, path = os.path.join(os.path.expanduser('~'), '.snakebiterc')):
-        old_version_info = "You're are using snakebite %s with Trash support together with old snakebiterc, please update/remove your %s file. By default Trash is %s." % (path, version(), 'disabled' if not HDFSConfig.use_trash else 'enabled')
+        old_version_info = "You're are using snakebite %s with Trash support together with old snakebiterc, please update/remove your %s file. By default Trash is %s." % (path, version(), 'disabled' if not self.configs['use_trash'] else 'enabled')
         with open(path) as config_file:
             configs = json.load(config_file)
 
@@ -331,7 +330,7 @@ def _read_config_snakebiterc(self, path = os.path.join(os.path.expanduser('~'), '.snakebiterc')):
                 # commandline setting has higher priority
                 print_info(old_version_info)
                 # There's no info about Trash in version 1, use default policy:
-                self.args.usetrash = HDFSConfig.use_trash
+                self.args.usetrash = self.configs['use_trash']
         elif isinstance(configs, dict):
             # Version 2: {}
             # Can be either new configuration or just one namenode
@@ -346,7 +345,7 @@ def _read_config_snakebiterc(self, path = os.path.join(os.path.expanduser('~'), '.snakebiterc')):
 
             if self.__usetrash_unset():
                 # commandline setting has higher priority
-                self.args.usetrash = configs.get("use_trash", HDFSConfig.use_trash)
+                self.args.usetrash = configs.get("use_trash", self.configs['use_trash'])
         else:
             # config is a single namenode - no HA
             self.namenodes.append(Namenode(configs['namenode'],
@@ -355,7 +354,7 @@ def _read_config_snakebiterc(self, path = os.path.join(os.path.expanduser('~'), '.snakebiterc')):
             if self.__usetrash_unset():
                 # commandline setting has higher priority
                 print_info(old_version_info)
-                self.args.usetrash = HDFSConfig.use_trash
+                self.args.usetrash = self.configs['use_trash']
         else:
             print_error_exit("Config retrieved from %s is corrupted! Remove it!" % path)
 
@@ -404,7 +403,7 @@ def _read_config_cl(self):
             self.namenodes.append(Namenode(self.args.namenode, self.args.port))
             # we got the info from CL -> check if use_trash is set - if not use default policy:
             if self.__usetrash_unset():
-                self.args.usetrash = HDFSConfig.use_trash
+                self.args.usetrash = self.configs['use_trash']
             return True
         else:
             return False
@@ -441,7 +440,7 @@ def setup_client(self):
             use_trash = self.args.usetrash and not self.args.skiptrash
         else:
             use_trash = self.args.usetrash
-        self.client = HAClient(self.namenodes, use_trash, None, self.use_sasl)
+        self.client = HAClient(self.namenodes, use_trash, None, self.use_sasl, self.configs['hdfs_namenode_principal'])
 
     def execute(self):
         if self.args.help:
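setup_client now forwards the principal cached in self.configs. The equivalent direct construction, with hypothetical hosts and principal (assuming the Namenode helper lives in snakebite.namenode, as in the rest of the codebase):

    from snakebite.client import HAClient
    from snakebite.namenode import Namenode

    namenodes = [Namenode('nn1.example.com', 8020),
                 Namenode('nn2.example.com', 8020)]
    client = HAClient(namenodes, use_trash=True, effective_user=None,
                      use_sasl=True,
                      hdfs_namenode_principal='hdfs/nn1.example.com@EXAMPLE.COM')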
75 changes: 45 additions & 30 deletions snakebite/config.py
@@ -9,11 +9,6 @@
 
 
 class HDFSConfig(object):
-    use_trash = False
-    use_sasl = False
-
-    hdfs_namenode_principal = None
-
     @classmethod
     def get_config_from_env(cls):
         """
@@ -23,18 +18,21 @@ def get_config_from_env(cls):
         Returns list of dicts - list of namenode representations
         """
         core_path = os.path.join(os.environ['HADOOP_HOME'], 'conf', 'core-site.xml')
-        configs = cls.read_core_config(core_path)
+        core_configs = cls.read_core_config(core_path)
 
         hdfs_path = os.path.join(os.environ['HADOOP_HOME'], 'conf', 'hdfs-site.xml')
-        tmp_config = cls.read_hdfs_config(hdfs_path)
-
-        if tmp_config:
-            # if config exists in hdfs - it's HA config, update configs
-            configs = tmp_config
+        hdfs_configs = cls.read_hdfs_config(hdfs_path)
 
-        if not configs:
+        if (not core_configs) and (not hdfs_configs):
             raise Exception("No config found in %s nor in %s" % (core_path, hdfs_path))
 
+        configs = {
+            'use_trash': hdfs_configs.get('use_trash', core_configs.get('use_trash', False)),
+            'use_sasl': core_configs.get('use_sasl', False),
+            'hdfs_namenode_principal': hdfs_configs.get('hdfs_namenode_principal', None),
+            'namenodes': hdfs_configs.get('namenodes', []) or core_configs.get('namenodes', [])
+        }
+
         return configs
 
     @staticmethod
@@ -51,47 +49,57 @@ def read_hadoop_config(hdfs_conf_path):
 
     @classmethod
     def read_core_config(cls, core_site_path):
-        config = []
+        configs = {}
 
+        namenodes = []
         for property in cls.read_hadoop_config(core_site_path):
 
            # fs.default.name is the key name for the file system on EMR clusters
             if property.findall('name')[0].text in ('fs.defaultFS', 'fs.default.name'):
                 parse_result = urlparse(property.findall('value')[0].text)
                 log.debug("Got namenode '%s' from %s" % (parse_result.geturl(), core_site_path))
 
-                config.append({"namenode": parse_result.hostname,
+                namenodes.append({"namenode": parse_result.hostname,
                                "port": parse_result.port if parse_result.port
                                else Namenode.DEFAULT_PORT})
 
             if property.findall('name')[0].text == 'fs.trash.interval':
-                cls.use_trash = True
+                configs['use_trash'] = True
 
             if property.findall('name')[0].text == 'hadoop.security.authentication':
                 log.debug("Got hadoop.security.authentication '%s'" % (property.findall('value')[0].text))
                 if property.findall('value')[0].text == 'kerberos':
-                    cls.use_sasl = True
+                    configs['use_sasl'] = True
                 else:
-                    cls.use_sasl = False
+                    configs['use_sasl'] = False
 
-        return config
+        if namenodes:
+            configs['namenodes'] = namenodes
+
+        return configs
 
     @classmethod
     def read_hdfs_config(cls, hdfs_site_path):
-        configs = []
+        configs = {}
 
+        namenodes = []
         for property in cls.read_hadoop_config(hdfs_site_path):
             if property.findall('name')[0].text.startswith("dfs.namenode.rpc-address"):
                 parse_result = urlparse("//" + property.findall('value')[0].text)
                 log.debug("Got namenode '%s' from %s" % (parse_result.geturl(), hdfs_site_path))
-                configs.append({"namenode": parse_result.hostname,
+                namenodes.append({"namenode": parse_result.hostname,
                                "port": parse_result.port if parse_result.port
                                else Namenode.DEFAULT_PORT})
 
             if property.findall('name')[0].text == 'fs.trash.interval':
-                cls.use_trash = True
+                configs['use_trash'] = True
 
             if property.findall('name')[0].text == 'dfs.namenode.kerberos.principal':
                 log.debug("hdfs principal found: '%s'" % (property.findall('value')[0].text))
-                cls.hdfs_namenode_principal = property.findall('value')[0].text
+                configs['hdfs_namenode_principal'] = property.findall('value')[0].text
 
+        if namenodes:
+            configs['namenodes'] = namenodes
+
         return configs
 
@@ -117,16 +125,23 @@ def get_external_config(cls):
         cls.core_try_paths = (core_path,) + cls.core_try_paths
 
         # Try to find other paths
-        configs = []
+        core_configs = {}
         for core_conf_path in cls.core_try_paths:
-            configs = cls.read_core_config(core_conf_path)
-            if configs:
+            core_configs = cls.read_core_config(core_conf_path)
+            if core_configs:
                 break
 
+        hdfs_configs = {}
         for hdfs_conf_path in cls.hdfs_try_paths:
-            tmp_config = cls.read_hdfs_config(hdfs_conf_path)
-            if tmp_config:
-                # if there is hdfs-site data available return it
-                return tmp_config
+            hdfs_configs = cls.read_hdfs_config(hdfs_conf_path)
+            if hdfs_configs:
+                break
+
+        configs = {
+            'use_trash': hdfs_configs.get('use_trash', core_configs.get('use_trash', False)),
+            'use_sasl': core_configs.get('use_sasl', False),
+            'hdfs_namenode_principal': hdfs_configs.get('hdfs_namenode_principal', None),
+            'namenodes': hdfs_configs.get('namenodes', []) or core_configs.get('namenodes', [])
+        }
 
         return configs
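Both get_config_from_env and get_external_config now converge on the same dict shape: hdfs-site.xml wins over core-site.xml for use_trash and namenodes, while use_sasl only ever comes from core-site.xml (hadoop.security.authentication) and the principal only from hdfs-site.xml. For a Kerberized HA cluster the result might look like this (values are illustrative):

    from snakebite.config import HDFSConfig

    configs = HDFSConfig.get_external_config()
    # {'use_trash': True,     # fs.trash.interval was present
    #  'use_sasl': True,      # hadoop.security.authentication == 'kerberos'
    #  'hdfs_namenode_principal': 'hdfs/nn1.example.com@EXAMPLE.COM',
    #  'namenodes': [{'namenode': 'nn1.example.com', 'port': 8020},
    #                {'namenode': 'nn2.example.com', 'port': 8020}]}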
5 changes: 3 additions & 2 deletions snakebite/rpc_sasl.py
@@ -48,11 +48,12 @@ def log_protobuf_message(header, message):
     log.debug("%s:\n\n\033[92m%s\033[0m" % (header, message))
 
 class SaslRpcClient:
-    def __init__(self, trans):
+    def __init__(self, trans, hdfs_namenode_principal=None):
         #self.sasl_client_factory = sasl_client_factory
         self.sasl = None
         #self.mechanism = mechanism
         self._trans = trans
+        self.hdfs_namenode_principal = hdfs_namenode_principal
 
     def _send_sasl_message(self, message):
         rpcheader = RpcRequestHeaderProto()
@@ -81,7 +82,7 @@ def _recv_sasl_message(self):
 
     def connect(self):
         # use service name component from principal
-        service = re.split('[\/@]', str(HDFSConfig.hdfs_namenode_principal))[0]
+        service = re.split('[\/@]', str(self.hdfs_namenode_principal))[0]
 
         negotiate = RpcSaslProto()
         negotiate.state = 1
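connect() only needs the service component of the principal, i.e. the text before the first '/' or '@'. The behaviour of the re.split call, with a made-up principal:

    import re

    service = re.split('[\/@]', str('hdfs/nn1.example.com@EXAMPLE.COM'))[0]
    # -> 'hdfs'

    # With the default hdfs_namenode_principal=None, str(None) splits to the
    # string 'None', so SASL setups should always pass a real principal.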
4 changes: 2 additions & 2 deletions snakebite/service.py
@@ -19,13 +19,13 @@
 
 class RpcService(object):
 
-    def __init__(self, service_stub_class, port, host, hadoop_version, effective_user=None,use_sasl=False):
+    def __init__(self, service_stub_class, port, host, hadoop_version, effective_user=None,use_sasl=False, hdfs_namenode_principal=None):
         self.service_stub_class = service_stub_class
         self.port = port
         self.host = host
 
         # Setup the RPC channel
-        self.channel = SocketRpcChannel(host=self.host, port=self.port, version=hadoop_version, effective_user=effective_user,use_sasl=use_sasl)
+        self.channel = SocketRpcChannel(host=self.host, port=self.port, version=hadoop_version, effective_user=effective_user, use_sasl=use_sasl, hdfs_namenode_principal=hdfs_namenode_principal)
         self.service = self.service_stub_class(self.channel)
 
         # go through service_stub methods and add a wrapper function to
4 changes: 2 additions & 2 deletions test/client_test.py
@@ -10,8 +10,8 @@
 from snakebite.errors import OutOfNNException, RequestError
 
 class ClientTest(unittest2.TestCase):
-    original_hdfs_try_path = set(HDFSConfig.hdfs_try_paths)
-    original_core_try_path = set(HDFSConfig.core_try_paths)
+    original_hdfs_try_path = HDFSConfig.hdfs_try_paths
+    original_core_try_path = HDFSConfig.core_try_paths
 
     def setUp(self):
         # Make sure HDFSConfig is in vanilla state
