Skip to content
This repository has been archived by the owner on Apr 19, 2022. It is now read-only.

Commit

Permalink
Merge pull request #171 from bolkedebruin/ISSUE-167
Browse files Browse the repository at this point in the history
Issue 167: use dfs.namenode.principal to figure out correct service name
  • Loading branch information
Wouter de Bie committed Oct 1, 2015
2 parents 2e28a98 + 7850146 commit 6c25d7b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
6 changes: 6 additions & 0 deletions snakebite/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class HDFSConfig(object):
use_trash = False
use_sasl = False

hdfs_namenode_principal = None

@classmethod
def get_config_from_env(cls):
"""
Expand Down Expand Up @@ -87,6 +89,10 @@ def read_hdfs_config(cls, hdfs_site_path):
if property.findall('name')[0].text == 'fs.trash.interval':
cls.use_trash = True

if property.findall('name')[0].text == 'dfs.namenode.kerberos.principal':
log.debug("hdfs principal found: '%s'" % (property.findall('value')[0].text))
cls.hdfs_namenode_principal = property.findall('value')[0].text

return configs

core_try_paths = ('/etc/hadoop/conf/core-site.xml',
Expand Down
43 changes: 40 additions & 3 deletions snakebite/rpc_sasl.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@

import struct
import sasl
import re

from snakebite.protobuf.RpcHeader_pb2 import RpcRequestHeaderProto, RpcResponseHeaderProto, RpcSaslProto
from snakebite.config import HDFSConfig

import google.protobuf.internal.encoder as encoder

import logger
Expand Down Expand Up @@ -61,8 +64,8 @@ def _send_sasl_message(self, message):

s_rpcheader = rpcheader.SerializeToString()
s_message = message.SerializeToString()
header_length = len(s_rpcheader) + encoder._VarintSize(len(s_rpcheader)) + len(s_message) + encoder._VarintSize(len(s_message))

header_length = len(s_rpcheader) + encoder._VarintSize(len(s_rpcheader)) + len(s_message) + encoder._VarintSize(len(s_message))

self._trans.write(struct.pack('!I', header_length))
self._trans.write_delimited(s_rpcheader)
Expand All @@ -77,12 +80,15 @@ def _recv_sasl_message(self):
return sasl_response

def connect(self):
# use service name component from principal
service = re.split('[\/@]', str(HDFSConfig.hdfs_namenode_principal))[0]

negotiate = RpcSaslProto()
negotiate.state = 1
self._send_sasl_message(negotiate)

self.sasl = sasl.Client()
self.sasl.setAttr("service", "hdfs")
self.sasl.setAttr("service", service)
self.sasl.setAttr("host", self._trans.host)
self.sasl.init()

Expand Down Expand Up @@ -133,3 +139,34 @@ def _evaluate_token(self, sasl_response):

return response

def wrap(self, message):
ret, encoded = self.sasl.encode(message)
if not ret:
raise Exception("Cannot encode message: %s" % (self.sasl.getError()))

sasl_message = RpcSaslProto()
sasl_message.state = 5 # WRAP
sasl_message.token = encoded

self._send_sasl_message(sasl_message)

def unwrap(self):
response = self._recv_sasl_message()
if response.state != 5:
raise Exception("Server send non-wrapped response")

ret, decoded = self.sasl.decode(response.token)
if not ret:
raise Exception("Cannot decode message: %s" % (self.sasl.getError()))

return response

def use_wrap(self):
# SASL wrapping is only used if the connection has a QOP, and
# the value is not auth. ex. auth-int & auth-priv
ret, use_wrap = self.sasl.getSSF()
if not ret:
raise Exception("Cannot get negotiated security: %s" % (self.sasl.getError()))

return use_wrap

0 comments on commit 6c25d7b

Please sign in to comment.