From 03863fd4c68b67a804965b4bceab27e56dbc267a Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Mon, 2 Mar 2015 22:06:21 -0500 Subject: [PATCH 1/2] Implement RFC 6680 (Low-Level) This commit introduces optional support for RFC 6680 to the low-level API. The `get_name_attribute` and `set_name_attribute` methods adhere to the pseudo-API (which uses a single call to get/set multiple values on a single attribute) instead of the C bindings (which require the use of a state variable to get multiple values, and require multiple calls to add multiple values to a single attribute). Note that gss_display_name_ext is not tested, since it is not implemented by MIT krb5. Additionally, in order to test the get/set/delete attribute methods, you will have to install the demo greet plugin that comes with krb5. Otherwise, the tests will be skipped. Part of #4 Also-Authored-By: Simo Sorce --- gssapi/raw/__init__.py | 7 + gssapi/raw/ext_buffer_sets.pxd | 12 ++ gssapi/raw/ext_rfc6680.pyx | 305 ++++++++++++++++++++++++++++ gssapi/raw/ext_rfc6680_comp_oid.pyx | 18 ++ gssapi/raw/named_tuples.py | 9 + gssapi/tests/_utils.py | 25 +++ gssapi/tests/test_raw.py | 123 +++++++++++ setup.py | 6 +- 8 files changed, 504 insertions(+), 1 deletion(-) create mode 100644 gssapi/raw/ext_buffer_sets.pxd create mode 100644 gssapi/raw/ext_rfc6680.pyx create mode 100644 gssapi/raw/ext_rfc6680_comp_oid.pyx diff --git a/gssapi/raw/__init__.py b/gssapi/raw/__init__.py index 7ec7bc79..ee27a5f4 100644 --- a/gssapi/raw/__init__.py +++ b/gssapi/raw/__init__.py @@ -64,3 +64,10 @@ from gssapi.raw.ext_iov_mic import * # noqa except ImportError: pass + +# optional RFC 6680 support +try: + from gssapi.raw.ext_rfc6680 import * # noqa + from gssapi.raw.ext_rfc6680_comp_oid import * # noqa +except ImportError: + pass diff --git a/gssapi/raw/ext_buffer_sets.pxd b/gssapi/raw/ext_buffer_sets.pxd new file mode 100644 index 00000000..56dda543 --- /dev/null +++ b/gssapi/raw/ext_buffer_sets.pxd @@ -0,0 +1,12 @@ +from gssapi.raw.cython_types cimport * + +cdef extern from "python_gssapi.h": + ctypedef struct gss_buffer_set_desc: + size_t count + gss_buffer_desc *elements + ctypedef gss_buffer_set_desc* gss_buffer_set_t + + gss_buffer_set_t GSS_C_NO_BUFFER_SET + + OM_uint32 gss_release_buffer_set(OM_uint32 *min_stat, + gss_buffer_set_t *buffer_set) nogil diff --git a/gssapi/raw/ext_rfc6680.pyx b/gssapi/raw/ext_rfc6680.pyx new file mode 100644 index 00000000..8bf847bf --- /dev/null +++ b/gssapi/raw/ext_rfc6680.pyx @@ -0,0 +1,305 @@ +GSSAPI="BASE" # This ensures that a full module is generated by Cython + +from gssapi.raw.cython_types cimport * +from gssapi.raw.ext_buffer_sets cimport * +from gssapi.raw.names cimport Name +from gssapi.raw.oids cimport OID + +from gssapi.raw.misc import GSSError +from gssapi.raw.named_tuples import InquireNameResult, GetNameAttributeResult + +cdef extern from "gssapi/gssapi_ext.h": + OM_uint32 gss_display_name_ext(OM_uint32 *min_stat, gss_name_t name, + gss_OID name_type, + gss_buffer_t output_name) nogil + + OM_uint32 gss_inquire_name(OM_uint32 *min_stat, gss_name_t name, + int *name_is_mn, gss_OID *mech_type, + gss_buffer_set_t *attrs) nogil + + OM_uint32 gss_get_name_attribute(OM_uint32 *min_stat, gss_name_t name, + gss_buffer_t attr, int *authenticated, + int *complete, gss_buffer_t value, + gss_buffer_t display_value, + int *more) nogil + + OM_uint32 gss_set_name_attribute(OM_uint32 *min_stat, gss_name_t name, + int complete, gss_buffer_t attr, + gss_buffer_t value) nogil + + OM_uint32 gss_delete_name_attribute(OM_uint32 *min_stat, gss_name_t name, + gss_buffer_t attr) nogil + + OM_uint32 gss_export_name_composite(OM_uint32 *min_stat, gss_name_t name, + gss_buffer_t exported_name) nogil + + # GSS_C_NT_COMPOSITE_EXPORT lives in ext_rfc6680_comp_oid.pyx + + +def display_name_ext(Name name not None, OID name_type not None): + """ + Display the given Name using the given name type OID + + This method attempts to display the given Name using the syntax of + the given name type. If this is not possible, an appropriate error + will be raised. + + Args: + name (Name): the name to display + name_type (OID): the name type (see NameType) to use to + display the given name + + Returns: + bytes: the displayed name + + Raises: + OperationUnavailableError: the given name could not be displayed + using the given name type + """ + + # GSS_C_EMPTY_BUFFER + cdef gss_buffer_desc output_name = gss_buffer_desc(0, NULL) + + cdef OM_uint32 maj_stat, min_stat + + maj_stat = gss_display_name_ext(&min_stat, name.raw_name, + &name_type.raw_oid, &output_name) + + if maj_stat == GSS_S_COMPLETE: + name_text = output_name.value[:output_name.length] + gss_release_buffer(&min_stat, &output_name) + return name_text + else: + raise GSSError(maj_stat, min_stat) + + +def inquire_name(Name name not None, mech_name=True, attrs=True): + """ + Get information about a Name + + This method retrives information about the given name, including + the set of attribute names for the given name, as well as whether or + not the name is a Mechanism Name. Additionally, if the given name is + a Mechanism Name, the associated mechansim is returned as well. + + Args: + name (Name): the name about which to inquire + mech_name (bool): whether or not to retrieve if this name + is a mech_name (and the associate mechanism) + attrs (bool): whether or not to retrieve the attribute name list + + Returns: + InquireNameResult: the set of attribute names for the given name, + whether or not the name is a Mechanism Name, and potentially + the associated mechanism if it is a Mechanism Name + + Raises: + GSSError + """ + + cdef int *name_is_mn_ptr = NULL + cdef gss_OID *mn_mech_ptr = NULL + cdef gss_buffer_set_t *attr_names_ptr = NULL + + cdef gss_buffer_set_t attr_names = GSS_C_NO_BUFFER_SET + if attrs: + attr_names_ptr = &attr_names + + cdef int name_is_mn = 0 + cdef gss_OID mn_mech + if mech_name: + name_is_mn_ptr = &name_is_mn + mn_mech_ptr = &mn_mech + + cdef OM_uint32 maj_stat, min_stat + + maj_stat = gss_inquire_name(&min_stat, name.raw_name, name_is_mn_ptr, + mn_mech_ptr, attr_names_ptr) + + cdef int i + cdef OID py_mech = None + if maj_stat == GSS_S_COMPLETE: + py_attr_names = [] + + if attr_names != GSS_C_NO_BUFFER_SET: + for i in range(attr_names.count): + attr_name = attr_names.elements[i] + py_attr_names.append(attr_name.value[:attr_name.length]) + + gss_release_buffer_set(&min_stat, &attr_names) + + if name_is_mn: + py_mech = OID() + py_mech.raw_oid = mn_mech[0] + + return InquireNameResult(py_attr_names, name_is_mn, py_mech) + else: + raise GSSError(maj_stat, min_stat) + + +def set_name_attribute(Name name not None, attr not None, value not None, + bint complete=False): + """ + Set the value(s) of a Name attribute + + This method sets the value(s) of the given attribute on the given name. + + Note that this functionality more closely matches the pseudo-API + presented in RFC 6680, not the C API (which uses multiple calls to + add multiple values). However, multiple calls to this method will + continue adding values, so :func:`delete_name_attribute` must be + used in between calls to "clear" the values. + + Args: + name (Name): the Name on which to set the attribute + attr (bytes): the name of the attribute + value (list): a list of bytes objects to use as the value(s) + complete (bool): whether or not to mark this attribute's value + set as being "complete" + + Raises: + OperationUnavailableError: the given attribute name is unknown + or could not be set + """ + + cdef gss_buffer_desc attr_buff = gss_buffer_desc(len(attr), attr) + cdef gss_buffer_desc val_buff + + cdef OM_uint32 maj_stat, min_stat + + cdef size_t value_len = len(value) + cdef size_t i + for val in value: + val_buff = gss_buffer_desc(len(val), val) + i += 1 + if i == value_len: + maj_stat = gss_set_name_attribute(&min_stat, name.raw_name, + complete, &attr_buff, &val_buff) + else: + maj_stat = gss_set_name_attribute(&min_stat, name.raw_name, 0, + &attr_buff, &val_buff) + + if maj_stat != GSS_S_COMPLETE: + raise GSSError(maj_stat, min_stat) + + +def get_name_attribute(Name name not None, attr not None, more=None): + """ + Get the value(s) of a Name attribute + + This method retrieves the value(s) of the given attribute + for the given Name. + + Note that this functionality matches pseudo-API presented + in RFC 6680, not the C API (which uses a state variable and + multiple calls to retrieve multiple values). + + Args: + name (Name): the Name from which to get the attribute + attr (bytes): the name of the attribute + + Returns: + GetNameAttributeResult: the raw version of the value(s), + the human-readable version of the value(s), whether + or not the attribute was authenticated, and whether or + not the attribute's value set was marked as complete + + Raises: + OperationUnavailableError: the given attribute is unknown + or unset + """ + cdef gss_buffer_desc attr_buff = gss_buffer_desc(len(attr), attr) + + cdef gss_buffer_desc val_buff = gss_buffer_desc(0, NULL) + cdef gss_buffer_desc displ_val_buff = gss_buffer_desc(0, NULL) + cdef int complete + cdef int authenticated + + cdef int more_val = -1 + py_vals = [] + py_displ_vals = [] + + cdef OM_uint32 maj_stat, min_stat + + while more_val != 0: + maj_stat = gss_get_name_attribute(&min_stat, name.raw_name, + &attr_buff, + &authenticated, &complete, + &val_buff, &displ_val_buff, + &more_val) + + if maj_stat == GSS_S_COMPLETE: + py_vals.append(val_buff.value[:val_buff.length]) + py_displ_vals.append( + displ_val_buff.value[:displ_val_buff.length]) + + gss_release_buffer(&min_stat, &val_buff) + gss_release_buffer(&min_stat, &displ_val_buff) + else: + raise GSSError(maj_stat, min_stat) + + return GetNameAttributeResult(py_vals, py_displ_vals, authenticated, + complete) + + +def delete_name_attribute(Name name not None, attr not None): + """ + Remove an attribute from a Name + + This method removes an attribute from a Name. This method may be + used before :func:`set_name_attribute` clear the values of an attribute + before setting a new value (making the latter method work like a 'set' + operation instead of an 'add' operation). + + Note that the removal of certain attributes may not be allowed. + + Args: + name (Name): the name to remove the attribute from + attr (bytes): the name of the attribute + + Raises: + OperationUnavailableError + UnauthorizedError + """ + + cdef gss_buffer_desc attr_buff = gss_buffer_desc(len(attr), attr) + + cdef OM_uint32 maj_stat, min_stat + + maj_stat = gss_delete_name_attribute(&min_stat, name.raw_name, + &attr_buff) + + if maj_stat != GSS_S_COMPLETE: + raise GSSError(maj_stat, min_stat) + + +def export_name_composite(Name name not None): + """ + Export a name, preserving attribute information + + This method functions similarly to :func:`export_name`, except that + it preserves attribute information. The resulting bytes may be imported + using :func:`import_name` with the `NameType.composite_export` name type. + + Args: + name (Name): the name to export + + Returns: + bytes: the exported composite name + + Raises: + GSSError + """ + + cdef gss_buffer_desc res = gss_buffer_desc(0, NULL) + + cdef OM_uint32 maj_stat, min_stat + + maj_stat = gss_export_name_composite(&min_stat, name.raw_name, &res) + + if maj_stat == GSS_S_COMPLETE: + py_res = res.value[:res.length] + gss_release_buffer(&min_stat, &res) + return py_res + else: + raise GSSError(maj_stat, min_stat) diff --git a/gssapi/raw/ext_rfc6680_comp_oid.pyx b/gssapi/raw/ext_rfc6680_comp_oid.pyx new file mode 100644 index 00000000..f5b617b2 --- /dev/null +++ b/gssapi/raw/ext_rfc6680_comp_oid.pyx @@ -0,0 +1,18 @@ +GSSAPI="BASE" # This ensures that a full module is generated by Cython + +from gssapi.raw.cython_types cimport gss_OID +from gssapi.raw.cython_converters cimport c_make_oid + +from gssapi.raw import types as gsstypes + + +# NB(directxman12): this is placed in separate file since the +# GSS_C_NT_COMPOSITE_EXPORT constant didn't appear in MIT +# krb5 until 1.11. However, due to the way that support was +# written for composite tokens, simply using GSS_C_NT_EXPORT_NAME +# will work in prior version which contain support for RFC 6680 +cdef extern from "python_gssapi_ext.h": + gss_OID GSS_C_NT_COMPOSITE_EXPORT + + +gsstypes.NameType.composite_export = c_make_oid(GSS_C_NT_COMPOSITE_EXPORT) diff --git a/gssapi/raw/named_tuples.py b/gssapi/raw/named_tuples.py index ecd54fe4..519b596d 100644 --- a/gssapi/raw/named_tuples.py +++ b/gssapi/raw/named_tuples.py @@ -55,3 +55,12 @@ IOVUnwrapResult = namedtuple('IOVUnwrapResult', ['encrypted', 'qop']) + + +InquireNameResult = namedtuple('InquireNameResult', + ['attrs', 'is_mech_name', 'mech']) + + +GetNameAttributeResult = namedtuple('GetNamedAttributeResult', + ['values', 'display_values', + 'authenticated', 'complete']) diff --git a/gssapi/tests/_utils.py b/gssapi/tests/_utils.py index 4a9d7087..602e1099 100644 --- a/gssapi/tests/_utils.py +++ b/gssapi/tests/_utils.py @@ -120,3 +120,28 @@ def _find_plugin_dirs_src(search_path): return options_raw.split('\n') else: return None + + +_KRB_PREFIX = None + + +def _requires_krb_plugin(plugin_type, plugin_name): + global _KRB_PREFIX + if _KRB_PREFIX is None: + _KRB_PREFIX = get_output("krb5-config --prefix") + + plugin_path = os.path.join(_KRB_PREFIX, 'lib/krb5/plugins', + plugin_type, '%s.so' % plugin_name) + + def make_krb_plugin_test(func): + def krb_plugin_test(self, *args, **kwargs): + if not os.path.exists(plugin_path): + self.skipTest("You do not have the GSSAPI {type}" + "plugin {name} installed".format( + type=plugin_type, name=plugin_name)) + else: + func(self, *args, **kwargs) + + return krb_plugin_test + + return make_krb_plugin_test diff --git a/gssapi/tests/test_raw.py b/gssapi/tests/test_raw.py index eb7d71ae..37f6372e 100644 --- a/gssapi/tests/test_raw.py +++ b/gssapi/tests/test_raw.py @@ -9,6 +9,7 @@ import gssapi.raw as gb import gssapi.raw.misc as gbmisc from gssapi.tests._utils import _extension_test, _minversion_test +from gssapi.tests._utils import _requires_krb_plugin from gssapi.tests import k5test as kt @@ -112,6 +113,128 @@ def test_display_name(self): out_type.shouldnt_be_none() out_type.should_be(gb.NameType.hostbased_service) + # NB(directxman12): we don't test display_name_ext because the krb5 mech + # doesn't actually implement it + + @_extension_test('rfc6680', 'RFC 6680') + def test_inquire_name_not_mech_name(self): + base_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + inquire_res = gb.inquire_name(base_name) + + inquire_res.shouldnt_be_none() + + inquire_res.is_mech_name.should_be_false() + inquire_res.mech.should_be_none() + + @_extension_test('rfc6680', 'RFC 6680') + def test_inquire_name_mech_name(self): + base_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + mech_name = gb.canonicalize_name(base_name, gb.MechType.kerberos) + + inquire_res = gb.inquire_name(mech_name) + inquire_res.shouldnt_be_none() + + inquire_res.is_mech_name.should_be_true() + inquire_res.mech.should_be_a(gb.OID) + inquire_res.mech.should_be(gb.MechType.kerberos) + + @_extension_test('rfc6680', 'RFC 6680') + @_extension_test('rfc6680_comp_oid', 'RFC 6680 (COMPOSITE_EXPORT OID)') + def test_import_export_name_composite_no_attrs(self): + base_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + + canon_name = gb.canonicalize_name(base_name, + gb.MechType.kerberos) + exported_name = gb.export_name_composite(canon_name) + + exported_name.should_be_a(bytes) + + imported_name = gb.import_name(exported_name, + gb.NameType.composite_export) + + imported_name.should_be_a(gb.Name) + + # NB(directxman12): the greet_client plugin only allows for one value + + @_extension_test('rfc6680', 'RFC 6680') + @_requires_krb_plugin('authdata', 'greet_client') + def test_inquire_name_with_attrs(self): + base_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + canon_name = gb.canonicalize_name(base_name, gb.MechType.kerberos) + gb.set_name_attribute(canon_name, b'urn:greet:greeting', + [b'some greeting']) + + inquire_res = gb.inquire_name(canon_name) + inquire_res.shouldnt_be_none() + + inquire_res.attrs.should_be_a(list) + inquire_res.attrs.should_be([b'urn:greet:greeting']) + + @_extension_test('rfc6680', 'RFC 6680') + @_requires_krb_plugin('authdata', 'greet_client') + def test_basic_get_set_delete_name_attributes_no_auth(self): + base_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + canon_name = gb.canonicalize_name(base_name, gb.MechType.kerberos) + + gb.set_name_attribute(canon_name, b'urn:greet:greeting', + [b'some other val'], complete=True) + + get_res = gb.get_name_attribute(canon_name, b'urn:greet:greeting') + get_res.shouldnt_be_none() + + get_res.values.should_be_a(list) + get_res.values.should_be([b'some other val']) + + get_res.display_values.should_be_a(list) + get_res.display_values.should_be(get_res.values) + + get_res.complete.should_be_true() + get_res.authenticated.should_be_false() + + gb.delete_name_attribute(canon_name, b'urn:greet:greeting') + + # NB(directxman12): the code below currently segfaults due to the way + # that krb5 and the krb5 greet plugin is written + # gb.get_name_attribute.should_raise( + # gb.exceptions.OperationUnavailableError, canon_name, + # 'urn:greet:greeting') + + @_extension_test('rfc6680', 'RFC 6680') + @_requires_krb_plugin('authdata', 'greet_client') + def test_import_export_name_composite(self): + base_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + canon_name = gb.canonicalize_name(base_name, gb.MechType.kerberos) + gb.set_name_attribute(canon_name, b'urn:greet:greeting', [b'some val']) + + exported_name = gb.export_name_composite(canon_name) + + exported_name.should_be_a(bytes) + + # TODO(directxman12): when you just import a token as composite, + # appears as this name whose text is all garbled, since it contains + # all of the attributes, etc, but doesn't properly have the attributes. + # Once it's canonicalized, the attributes reappear. However, if you + # just import it as normal export, the attributes appear directly. + # It is thus unclear as to what is going on + + # imported_name_raw = gb.import_name(exported_name, + # gb.NameType.composite_export) + # imported_name = gb.canonicalize_name(imported_name_r, + # gb.MechType.kerberos) + + imported_name = gb.import_name(exported_name, gb.NameType.export) + + imported_name.should_be_a(gb.Name) + + get_res = gb.get_name_attribute(imported_name, b'urn:greet:greeting') + get_res.values.should_be([b'some val']) + def test_compare_name(self): service_name1 = gb.import_name(TARGET_SERVICE_NAME) service_name2 = gb.import_name(TARGET_SERVICE_NAME) diff --git a/setup.py b/setup.py index 0e1ee1f7..9b7d0454 100755 --- a/setup.py +++ b/setup.py @@ -62,7 +62,7 @@ def _get_output(*args, **kwargs): compile_args = get_output('krb5-config --cflags gssapi') link_args = link_args.split() -compile_args = compile_args.split() + ['-g'] +compile_args = compile_args.split() ENABLE_SUPPORT_DETECTION = \ (os.environ.get('GSSAPI_SUPPORT_DETECT', 'true').lower() == 'true') @@ -211,6 +211,10 @@ def gssapi_modules(lst): extension_file('dce', 'gss_wrap_iov'), extension_file('iov_mic', 'gss_get_mic_iov'), + # see ext_rfc6680_comp_oid for more information on this split + extension_file('rfc6680', 'gss_display_name_ext'), + extension_file('rfc6680_comp_oid', 'GSS_C_NT_COMPOSITE_EXPORT'), + # see ext_password{,_add}.pyx for more information on this split extension_file('password', 'gss_acquire_cred_with_password'), extension_file('password_add', 'gss_add_cred_with_password'), From 16f0102ce2798634373c69a51158c0bdeee24574 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Wed, 4 Mar 2015 22:38:23 -0500 Subject: [PATCH 2/2] Implement RFC 6680 (High-Level) This commit introduces optional support for RFC 6680 to the high-level API. For attribute access, a new property named `attributes` was introduced to the Name class. This presents a `MutableMapping` interface to the Name's attributes. When iterables are assigned to attributes (not including strings and bytes), they are considered to be multiple values to be assigned to the attribute. Additionally, attribute names (but not values) are automatically encoded if they are in text (and not bytes) form For inquiry, appropriate properties were added to the `Name` class (`is_mech_name` and `mech`). `display_as` may be used to call `display_name_ext`, and a `composite` argument was introduced to both the `export` method and the constructor. Closes #4 --- gssapi/names.py | 206 +++++++++++++++++++++++++++++--- gssapi/tests/test_high_level.py | 100 ++++++++++++++++ 2 files changed, 291 insertions(+), 15 deletions(-) diff --git a/gssapi/names.py b/gssapi/names.py index 0f244739..b5cad0b9 100644 --- a/gssapi/names.py +++ b/gssapi/names.py @@ -1,9 +1,15 @@ +import collections + import six from gssapi.raw import names as rname from gssapi.raw import NameType +from gssapi.raw import named_tuples as tuples from gssapi import _utils +rname_rfc6680 = _utils.import_gssapi_extension('rfc6680') +rname_rfc6680_comp_oid = _utils.import_gssapi_extension('rfc6680_comp_oid') + class Name(rname.Name): """GSSAPI Name @@ -20,9 +26,45 @@ class Name(rname.Name): text of the name. """ - __slots__ = () + __slots__ = ('_attr_obj') + + def __new__(cls, base=None, name_type=None, token=None, + composite=False): + if token is not None: + if composite: + if rname_rfc6680 is None: + raise NotImplementedError( + "Your GSSAPI implementation does not support RFC 6680 " + "(the GSSAPI naming extensions)") + + if rname_rfc6680_comp_oid is not None: + base_name = rname.import_name(token, + NameType.composite_export) + displ_name = rname.display_name(base_name, name_type=True) + if displ_name.name_type == NameType.composite_export: + # NB(directxman12): there's a bug in MIT krb5 <= 1.13 + # where GSS_C_NT_COMPOSITE_EXPORT doesn't trigger + # immediate import logic. However, we can just use + # the normal GSS_C_NT_EXPORT_NAME in this case. + base_name = rname.import_name(token, NameType.export) + else: + # NB(directxman12): some older versions of MIT krb5 don't + # have support for the GSS_C_NT_COMPOSITE_EXPORT, but do + # support composite tokens via GSS_C_NT_EXPORT_NAME. + base_name = rname.import_name(token, NameType.export) + else: + base_name = rname.import_name(token, NameType.export) + elif isinstance(base, rname.Name): + base_name = base + else: + if isinstance(base, six.text_type): + base = base.encode(_utils._get_encoding()) + + base_name = rname.import_name(base, name_type) - def __new__(cls, base=None, name_type=None, token=None): + return super(Name, cls).__new__(cls, base_name) + + def __init__(self, base=None, name_type=None, token=None, composite=False): """Create or import a GSSAPI name The constructor either creates or imports a GSSAPI name. @@ -32,7 +74,8 @@ def __new__(cls, base=None, name_type=None, token=None): high-level object. If the `token` argument is used, the name will be imported using - the token. + the token. If the token was exported as a composite token, + pass `composite=True`. Otherwise, a new name will be created, using the `base` argument as the string and the `name_type` argument to denote the name type. @@ -43,17 +86,10 @@ def __new__(cls, base=None, name_type=None, token=None): BadMechanismError """ - if token is not None: - base_name = rname.import_name(token, NameType.export) - elif isinstance(base, rname.Name): - base_name = base + if rname_rfc6680 is not None: + self._attr_obj = _NameAttributeMapping(self) else: - if isinstance(base, six.text_type): - base = base.encode(_utils._get_encoding()) - - base_name = rname.import_name(base, name_type) - - return super(Name, cls).__new__(cls, base_name) + self._attr_obj = None def __str__(self): if issubclass(str, six.text_type): @@ -71,6 +107,30 @@ def __bytes__(self): # Python 3 -- someone asked for bytes return rname.display_name(self, name_type=False).name + def display_as(self, name_type): + """ + Display the current name as the given name type. + + This method attempts to display the current Name using + the syntax of the given NameType, if possible. + + Args: + name_type (OID): the NameType to use to display the given name + + Returns: + str: the displayed name + + Raises: + OperationUnavailableError + """ + + if rname_rfc6680 is None: + raise NotImplementedError("Your GSSAPI implementation does not " + "support RFC 6680 (the GSSAPI naming " + "extensions)") + return rname_rfc6680.display_name_ext(self, name_type).encode( + _utils.get_encoding()) + @property def name_type(self): """Get the name type of this name""" @@ -92,7 +152,7 @@ def __repr__(self): return "Name({name}, {name_type})".format(name=disp_res.name, name_type=disp_res.name_type) - def export(self): + def export(self, composite=False): """Export the name This method exports the name into a byte string which can then be @@ -107,7 +167,15 @@ def export(self): BadNameError """ - return rname.export_name(self) + if composite: + if rname_rfc6680 is None: + raise NotImplementedError("Your GSSAPI implementation does " + "not support RFC 6680 (the GSSAPI " + "naming extensions)") + + return rname_rfc6680.export_name_composite(self) + else: + return rname.export_name(self) def canonicalize(self, mech): """Canonicalize a name with respect to a mechanism @@ -134,3 +202,111 @@ def __copy__(self): def __deepcopy__(self, memo): return type(self)(rname.duplicate_name(self)) + + def _inquire(self, **kwargs): + """Inspect the name for information + + This method inspects the name for information. + + If no keyword arguments are passed, all available information + is returned. Otherwise, only the keyword arguments that + are passed and set to `True` are returned. + + Args: + mech_name (bool): get whether this is a mechanism name, + and, if so, the associated mechanism + attrs (bool): get the attributes names for this name + + Returns: + InquireNameResult: the results of the inquiry, with unused + fields set to None + + Raises: + GSSError + """ + + if rname_rfc6680 is None: + raise NotImplementedError("Your GSSAPI implementation does not " + "support RFC 6680 (the GSSAPI naming " + "extensions)") + + if not kwargs: + default_val = True + else: + default_val = False + + attrs = kwargs.get('attrs', default_val) + mech_name = kwargs.get('mech_name', default_val) + + return rname_rfc6680.inquire_name(self, mech_name=mech_name, + attrs=attrs) + + @property + def is_mech_name(self): + return self._inquire(mech_name=True).is_mech_name + + @property + def mech(self): + return self._inquire(mech_name=True).mech + + @property + def attributes(self): + if self._attr_obj is None: + raise NotImplementedError("Your GSSAPI implementation does not " + "support RFC 6680 (the GSSAPI naming " + "extensions)") + + return self._attr_obj + + +class _NameAttributeMapping(collections.MutableMapping): + + """Provides dict-like access to RFC 6680 Name attributes.""" + def __init__(self, name): + self._name = name + + def __getitem__(self, key): + if isinstance(key, six.text_type): + key = key.encode(_utils._get_encoding()) + + res = rname_rfc6680.get_name_attribute(self._name, key) + return tuples.GetNameAttributeResult(frozenset(res.values), + frozenset(res.display_values), + res.authenticated, + res.complete) + + def __setitem__(self, key, value): + if isinstance(key, six.text_type): + key = key.encode(_utils._get_encoding()) + + rname_rfc6680.delete_name_attribute(self._name, key) + + if isinstance(value, tuples.GetNameAttributeResult): + complete = value.complete + value = value.values + elif isinstance(value, tuple) and len(value) == 2: + complete = value[1] + value = value[0] + else: + complete = False + + if (isinstance(value, (six.string_types, bytes)) or + not isinstance(value, collections.Iterable)): + # NB(directxman12): this allows us to easily assign a single + # value, since that's a common case + value = [value] + + rname_rfc6680.set_name_attribute(self._name, key, value, + complete=complete) + + def __delitem__(self, key): + if isinstance(key, six.text_type): + key = key.encode(_utils._get_encoding()) + + rname_rfc6680.delete_name_attribute(self._name, key) + + def __iter__(self): + return iter(self._name._inquire(attrs=True).attrs) + + def __len__(self): + return len(self._name._inquire(attrs=True).attrs) diff --git a/gssapi/tests/test_high_level.py b/gssapi/tests/test_high_level.py index d7c84fd5..abb6b267 100644 --- a/gssapi/tests/test_high_level.py +++ b/gssapi/tests/test_high_level.py @@ -15,6 +15,7 @@ from gssapi import _utils as gssutils from gssapi import exceptions as excs from gssapi.tests._utils import _extension_test, _minversion_test +from gssapi.tests._utils import _requires_krb_plugin from gssapi.tests import k5test as kt @@ -395,6 +396,45 @@ def test_create_from_token(self): name2.shouldnt_be_none() name2.name_type.should_be(gb.NameType.kerberos_principal) + @_extension_test('rfc6680', 'RFC 6680') + def test_create_from_composite_token_no_attrs(self): + name1 = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + exported_name = name1.canonicalize( + gb.MechType.kerberos).export(composite=True) + name2 = gssnames.Name(token=exported_name, composite=True) + + name2.shouldnt_be_none() + + @_extension_test('rfc6680', 'RFC 6680') + @_requires_krb_plugin('authdata', 'greet_client') + def test_create_from_composite_token_with_attrs(self): + name1 = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + + canon_name = name1.canonicalize(gb.MechType.kerberos) + canon_name.attributes['urn:greet:greeting'] = b'some val' + + exported_name = canon_name.export(composite=True) + + # TODO(directxman12): when you just import a token as composite, + # appears as this name whose text is all garbled, since it contains + # all of the attributes, etc, but doesn't properly have the attributes. + # Once it's canonicalized, the attributes reappear. However, if you + # just import it as normal export, the attributes appear directly. + # It is thus unclear as to what is going on + # name2_raw = gssnames.Name(token=exported_name, composite=True) + # name2 = name2_raw.canonicalize(gb.MechType.kerberos) + + name2 = gssnames.Name(token=exported_name) + + name2.shouldnt_be_none() + + name2.attributes['urn:greet:greeting'].values.should_be( + set([b'some val'])) + name2.attributes['urn:greet:greeting'].complete.should_be_true() + name2.attributes['urn:greet:greeting'].authenticated.should_be_false() + def test_to_str(self): name = gssnames.Name(SERVICE_PRINCIPAL, gb.NameType.kerberos_principal) @@ -455,6 +495,66 @@ def test_copy(self): name1.should_be(name2) + # NB(directxman12): we don't test display_name_ext because the krb5 mech + # doesn't actually implement it + + @_extension_test('rfc6680', 'RFC 6680') + def test_is_mech_name(self): + name = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + + name.is_mech_name.should_be_false() + + canon_name = name.canonicalize(gb.MechType.kerberos) + + canon_name.is_mech_name.should_be_true() + canon_name.mech.should_be_a(gb.OID) + canon_name.mech.should_be(gb.MechType.kerberos) + + @_extension_test('rfc6680', 'RFC 6680') + def test_export_name_composite_no_attrs(self): + name = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + canon_name = name.canonicalize(gb.MechType.kerberos) + exported_name = canon_name.export(composite=True) + + exported_name.should_be_a(bytes) + + @_extension_test('rfc6680', 'RFC 6680') + @_requires_krb_plugin('authdata', 'greet_client') + def test_export_name_composite_with_attrs(self): + name = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + canon_name = name.canonicalize(gb.MechType.kerberos) + canon_name.attributes['urn:greet:greeting'] = b'some val' + exported_name = canon_name.export(composite=True) + + exported_name.should_be_a(bytes) + + @_extension_test('rfc6680', 'RFC 6680') + @_requires_krb_plugin('authdata', 'greet_client') + def test_basic_get_set_del_name_attribute_no_auth(self): + name = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + canon_name = name.canonicalize(gb.MechType.kerberos) + + canon_name.attributes['urn:greet:greeting'] = (b'some val', True) + canon_name.attributes['urn:greet:greeting'].values.should_be( + set([b'some val'])) + canon_name.attributes['urn:greet:greeting'].complete.should_be_true() + (canon_name.attributes['urn:greet:greeting'].authenticated + .should_be_false()) + + del canon_name.attributes['urn:greet:greeting'] + + # NB(directxman12): for some reason, the greet:greeting handler plugin + # doesn't properly delete itself -- it just clears the value + # If we try to get its value now, we segfault (due to an issue with + # greet:greeting's delete). Instead, just try setting the value again + # canon_name.attributes.should_be_empty(), which would normally give + # an error. + canon_name.attributes['urn:greet:greeting'] = b'some other val' + class SecurityContextTestCase(_GSSAPIKerberosTestCase): def setUp(self):