Skip to content
This repository has been archived by the owner on Apr 19, 2022. It is now read-only.

Commit

Permalink
Merge pull request #153 from bolkedebruin/master
Browse files Browse the repository at this point in the history
Add SASL authentication
  • Loading branch information
Wouter de Bie committed Jul 28, 2015
2 parents 1ac3ed3 + dff11d6 commit 388f2ad
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 14 deletions.
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
argparse
protobuf>2.4.1
sasl
python-krbV
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ def run_tests(self):

install_requires = [
'protobuf>2.4.1',
'argparse']
'argparse',
'sasl',
'python-krbV']

tests_require = [
'tox',
Expand Down
25 changes: 22 additions & 3 deletions snakebite/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
import google.protobuf.internal.encoder as encoder
import google.protobuf.internal.decoder as decoder

# SASL
from snakebite.rpc_sasl import SaslRpcClient
from snakebite.kerberos import Kerberos

# Module imports

import logger
Expand Down Expand Up @@ -156,13 +160,14 @@ class SocketRpcChannel(RpcChannel):
RPC_HEADER = "hrpc"
RPC_SERVICE_CLASS = 0x00
AUTH_PROTOCOL_NONE = 0x00
AUTH_PROTOCOL_SASL = 0xDF
RPC_PROTOCOL_BUFFFER = 0x02


'''Socket implementation of an RpcChannel.
'''

def __init__(self, host, port, version, effective_user=None):
def __init__(self, host, port, version, effective_user=None, use_sasl=False):
'''SocketRpcChannel to connect to a socket server on a user defined port.
It possible to define version and effective user for the communication.'''
self.host = host
Expand All @@ -171,7 +176,12 @@ def __init__(self, host, port, version, effective_user=None):
self.call_id = -3 # First time (when the connection context is sent, the call_id should be -3, otherwise start with 0 and increment)
self.version = version
self.client_id = str(uuid.uuid4())
self.effective_user = effective_user or pwd.getpwuid(os.getuid())[0]
self.use_sasl = use_sasl
if self.use_sasl:
kerberos = Kerberos()
self.effective_user = effective_user or kerberos.user_principal().name
else:
self.effective_user = effective_user or pwd.getpwuid(os.getuid())[0]

def validate_request(self, request):
'''Validate the client request against the protocol file.'''
Expand Down Expand Up @@ -214,7 +224,16 @@ def get_connection(self, host, port):
self.write(self.RPC_HEADER) # header
self.write(struct.pack('B', self.version)) # version
self.write(struct.pack('B', self.RPC_SERVICE_CLASS)) # RPC service class
self.write(struct.pack('B', self.AUTH_PROTOCOL_NONE)) # serialization type (protobuf = 0)
if self.use_sasl:
self.write(struct.pack('B', self.AUTH_PROTOCOL_SASL)) # serialization type (protobuf = 0xDF)
else:
self.write(struct.pack('B', self.AUTH_PROTOCOL_NONE)) # serialization type (protobuf = 0)

if self.use_sasl:
sasl = SaslRpcClient(self)
sasl_connected = sasl.connect()
if not sasl_connected:
raise Exception("SASL is configured, but cannot get connected")

rpc_header = self.create_rpc_request_header()
context = self.create_connection_context()
Expand Down
21 changes: 14 additions & 7 deletions snakebite/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class Client(object):
3: "s"
}

def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEFAULT_VERSION, use_trash=False, effective_user=None):
def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEFAULT_VERSION, use_trash=False, effective_user=None, use_sasl=False):
'''
:param host: Hostname or IP address of the NameNode
:type host: string
Expand All @@ -96,19 +96,22 @@ def __init__(self, host, port=Namenode.DEFAULT_PORT, hadoop_version=Namenode.DEF
:type use_trash: boolean
:param effective_user: Effective user for the HDFS operations (default: None - current user)
:type effective_user: string
:param use_sasl: Use SASL authentication or not
:type use_sasl: boolean
'''
if hadoop_version < 9:
raise Exception("Only protocol versions >= 9 supported")

self.host = host
self.port = port
self.use_sasl = use_sasl
self.service_stub_class = client_proto.ClientNamenodeProtocol_Stub
self.service = RpcService(self.service_stub_class, self.port, self.host, hadoop_version, effective_user)
self.service = RpcService(self.service_stub_class, self.port, self.host, hadoop_version, effective_user, self.use_sasl)
self.use_trash = use_trash
self.trash = self._join_user_path(".Trash")
self._server_defaults = None

log.debug("Created client for %s:%s with trash=%s" % (host, port, use_trash))
log.debug("Created client for %s:%s with trash=%s and sasl=%s" % (host, port, use_trash, use_sasl))

def ls(self, paths, recurse=False, include_toplevel=False, include_children=True):
''' Issues 'ls' command and returns a list of maps that contain fileinfo
Expand Down Expand Up @@ -1362,7 +1365,7 @@ def _wrap_methods(cls):
else:
setattr(cls, name, cls._ha_return_method(meth))

def __init__(self, namenodes, use_trash=False, effective_user=None):
def __init__(self, namenodes, use_trash=False, effective_user=None, use_sasl=False):
'''
:param namenodes: Set of namenodes for HA setup
:type namenodes: list
Expand All @@ -1373,6 +1376,7 @@ def __init__(self, namenodes, use_trash=False, effective_user=None):
'''
self.use_trash = use_trash
self.effective_user = effective_user
self.use_sasl = use_sasl

if not namenodes:
raise OutOfNNException("List of namenodes is empty - couldn't create the client")
Expand All @@ -1387,7 +1391,8 @@ def _switch_namenode(self, namenodes):
namenode.port,
namenode.version,
self.use_trash,
self.effective_user)
self.effective_user,
self.use_sasl)
else:
msg = "Request tried and failed for all %d namenodes: " % len(namenodes)
for namenode in namenodes:
Expand Down Expand Up @@ -1466,16 +1471,18 @@ class AutoConfigClient(HAClient):
Different Hadoop distributions use different protocol versions. Snakebite defaults to 9, but this can be set by passing
in the ``hadoop_version`` parameter to the constructor.
'''
def __init__(self, hadoop_version=Namenode.DEFAULT_VERSION, effective_user=None):
def __init__(self, hadoop_version=Namenode.DEFAULT_VERSION, effective_user=None, use_sasl=False):
'''
:param hadoop_version: What hadoop protocol version should be used (default: 9)
:type hadoop_version: int
:param effective_user: Effective user for the HDFS operations (default: None - current user)
:type effective_user: string
:param use_sasl: Use SASL for authenication or not
:type use_sasl: boolean
'''

configs = HDFSConfig.get_external_config()
nns = [Namenode(c['namenode'], c['port'], hadoop_version) for c in configs]
if not nns:
raise OutOfNNException("Tried and failed to find namenodes - couldn't created the client!")
super(AutoConfigClient, self).__init__(nns, HDFSConfig.use_trash, effective_user)
super(AutoConfigClient, self).__init__(nns, HDFSConfig.use_trash, effective_user, HDFSConfig.use_sasl)
4 changes: 3 additions & 1 deletion snakebite/commandlineparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(self):
self._build_parent_parser()
self._add_subparsers()
self.namenodes = []
self.use_sasl = False

def _build_parent_parser(self):
#general options
Expand Down Expand Up @@ -289,6 +290,7 @@ def read_config(self):
self.namenodes.append(nn)
if self.__usetrash_unset():
self.args.usetrash = HDFSConfig.use_trash
self.use_sasl = HDFSConfig.use_sasl

if len(self.namenodes):
return
Expand Down Expand Up @@ -439,7 +441,7 @@ def setup_client(self):
use_trash = self.args.usetrash and not self.args.skiptrash
else:
use_trash = self.args.usetrash
self.client = HAClient(self.namenodes, use_trash)
self.client = HAClient(self.namenodes, use_trash, None, self.use_sasl)

def execute(self):
if self.args.help:
Expand Down
8 changes: 8 additions & 0 deletions snakebite/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

class HDFSConfig(object):
use_trash = False
use_sasl = False

@classmethod
def get_config_from_env(cls):
Expand Down Expand Up @@ -63,6 +64,13 @@ def read_core_config(cls, core_site_path):
if property.findall('name')[0].text == 'fs.trash.interval':
cls.use_trash = True

if property.findall('name')[0].text == 'hadoop.security.authentication':
log.debug("Got hadoop.security.authentication '%s'" % (property.findall('value')[0].text))
if property.findall('value')[0].text == 'kerberos':
cls.use_sasl = True
else:
cls.use_sasl = False

return config

@classmethod
Expand Down
41 changes: 41 additions & 0 deletions snakebite/kerberos.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015 Bolke de Bruin
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
'''
kerberos.py - A very light wrapper around krbV
This package contains a class to read a kerberos principal
May 2015
Bolke de Bruin (bolke@xs4all.nl)
'''

import krbV

class Kerberos:
def __init__(self):
self.ctx = krbV.default_context()
self.ccache = self.ctx.default_ccache()

def user_principal(self):
return self.ccache.principal()

135 changes: 135 additions & 0 deletions snakebite/rpc_sasl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015 Bolke de Bruin
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
'''
rpc_sasl.py - Implementation of SASL on top of Hadoop RPC.
This package contains a class providing a SASL authentication implementation
using Hadoop RPC as a transport. It was inspired the Hadoop Java classes.
May 2015
Bolke de Bruin (bolke@xs4all.nl)
'''

import struct
import sasl

from snakebite.protobuf.RpcHeader_pb2 import RpcRequestHeaderProto, RpcResponseHeaderProto, RpcSaslProto
import google.protobuf.internal.encoder as encoder

import logger

# Configure package logging
log = logger.getLogger(__name__)

def log_protobuf_message(header, message):
log.debug("%s:\n\n\033[92m%s\033[0m" % (header, message))

class SaslRpcClient:
def __init__(self, trans):
#self.sasl_client_factory = sasl_client_factory
self.sasl = None
#self.mechanism = mechanism
self._trans = trans

def _send_sasl_message(self, message):
rpcheader = RpcRequestHeaderProto()
rpcheader.rpcKind = 2 # RPC_PROTOCOL_BUFFER
rpcheader.rpcOp = 0
rpcheader.callId = -33 # SASL
rpcheader.retryCount = -1
rpcheader.clientId = b""

s_rpcheader = rpcheader.SerializeToString()
s_message = message.SerializeToString()

header_length = len(s_rpcheader) + encoder._VarintSize(len(s_rpcheader)) + len(s_message) + encoder._VarintSize(len(s_message))

self._trans.write(struct.pack('!I', header_length))
self._trans.write_delimited(s_rpcheader)
self._trans.write_delimited(s_message)

log_protobuf_message("Send out", message)

def _recv_sasl_message(self):
bytestream = self._trans.recv_rpc_message()
sasl_response = self._trans.parse_response(bytestream, RpcSaslProto)

return sasl_response

def connect(self):
negotiate = RpcSaslProto()
negotiate.state = 1
self._send_sasl_message(negotiate)

self.sasl = sasl.Client()
self.sasl.setAttr("service", "hdfs")
self.sasl.setAttr("host", self._trans.host)
self.sasl.init()

# do while true
while True:
res = self._recv_sasl_message()
# TODO: check mechanisms
if res.state == 1:
mechs = []
for auth in res.auths:
mechs.append(auth.mechanism)

log.debug("Available mechs: %s" % (",".join(mechs)))
s_mechs = str(",".join(mechs))
ret, chosen_mech, initial_response = self.sasl.start(s_mechs)
log.debug("Chosen mech: %s" % chosen_mech)

initiate = RpcSaslProto()
initiate.state = 2
initiate.token = initial_response

for auth in res.auths:
if auth.mechanism == chosen_mech:
auth_method = initiate.auths.add()
auth_method.mechanism = chosen_mech
auth_method.method = auth.method
auth_method.protocol = auth.protocol
auth_method.serverId = self._trans.host

self._send_sasl_message(initiate)
continue

if res.state == 3:
res_token = self._evaluate_token(res)
response = RpcSaslProto()
response.token = res_token
response.state = 4
self._send_sasl_message(response)
continue

if res.state == 0:
return True

def _evaluate_token(self, sasl_response):
ret, response = self.sasl.step(sasl_response.token)
if not ret:
raise Exception("Bad SASL results: %s" % (self.sasl.getError()))

return response

4 changes: 2 additions & 2 deletions snakebite/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

class RpcService(object):

def __init__(self, service_stub_class, port, host, hadoop_version, effective_user=None):
def __init__(self, service_stub_class, port, host, hadoop_version, effective_user=None,use_sasl=False):
self.service_stub_class = service_stub_class
self.port = port
self.host = host

# Setup the RPC channel
self.channel = SocketRpcChannel(host=self.host, port=self.port, version=hadoop_version, effective_user=effective_user)
self.channel = SocketRpcChannel(host=self.host, port=self.port, version=hadoop_version, effective_user=effective_user,use_sasl=use_sasl)
self.service = self.service_stub_class(self.channel)

# go through service_stub methods and add a wrapper function to
Expand Down

0 comments on commit 388f2ad

Please sign in to comment.