From 881fc75ebaa71c3d281eb5c59029f6472244fa75 Mon Sep 17 00:00:00 2001 From: Solly Ross Date: Mon, 22 Dec 2014 14:43:34 -0500 Subject: [PATCH] Clean up "Not Yet Implemented" tests This commit adds in tests for places that were previously marked as "Not Yet Implemented". Additionally, a couple of mistypes were discovered and fixed in heretofore untested methods. Close #22 --- gssapi/creds.py | 5 +- gssapi/raw/sec_contexts.pyx | 8 +- gssapi/tests/test_high_level.py | 139 +++++++++++++++++++++++++------- gssapi/tests/test_raw.py | 36 ++++++++- 4 files changed, 150 insertions(+), 38 deletions(-) diff --git a/gssapi/creds.py b/gssapi/creds.py index ceee9a2b..ad425c6c 100644 --- a/gssapi/creds.py +++ b/gssapi/creds.py @@ -170,8 +170,8 @@ def store(self, store=None, usage='both', mech=None, raise NotImplementedError("Your GSSAPI implementation does " "not have support for RFC 5588") - return rcred_cred_store.store_cred(self, usage, mech, - overwrite, set_default) + return rcred_rfc5588.store_cred(self, usage, mech, + overwrite, set_default) else: if rcred_cred_store is None: raise NotImplementedError("Your GSSAPI implementation does " @@ -332,6 +332,7 @@ def add(self, desired_name, desired_mech, usage='both', raise NotImplementedError("Your GSSAPI implementation does " "not have support for manipulating " "credential stores") + store = _encode_dict(store) res = rcred_cred_store.add_cred_from(store, self, desired_name, desired_mech, usage, diff --git a/gssapi/raw/sec_contexts.pyx b/gssapi/raw/sec_contexts.pyx index c3397560..48270e2f 100644 --- a/gssapi/raw/sec_contexts.pyx +++ b/gssapi/raw/sec_contexts.pyx @@ -310,7 +310,7 @@ def accept_sec_context(input_token not None, Creds acceptor_cred=None, free(bdng) cdef Name on = Name() - cdef Creds oc = Creds() + cdef Creds oc = None cdef OID py_mech_type if maj_stat == GSS_S_COMPLETE or maj_stat == GSS_S_CONTINUE_NEEDED: if output_ttl == GSS_C_INDEFINITE: @@ -319,7 +319,11 @@ def accept_sec_context(input_token not None, Creds acceptor_cred=None, output_ttl_py = output_ttl on.raw_name = initiator_name - oc.raw_creds = delegated_cred + + if delegated_cred is not NULL: + oc = Creds() + oc.raw_creds = delegated_cred + if mech_type is not NULL: py_mech_type = OID() py_mech_type.raw_oid = mech_type[0] diff --git a/gssapi/tests/test_high_level.py b/gssapi/tests/test_high_level.py index d2f3ad7e..ad1836b2 100644 --- a/gssapi/tests/test_high_level.py +++ b/gssapi/tests/test_high_level.py @@ -16,7 +16,6 @@ from gssapi import exceptions as excs from gssapi.tests._utils import _extension_test from gssapi.tests import k5test as kt -from gssapi._utils import import_gssapi_extension TARGET_SERVICE_NAME = b'host' @@ -153,7 +152,34 @@ def test_acquire_by_method(self, str_name, kwargs): @_extension_test('rfc5588', 'RFC 5588') def test_store_acquire(self): - self.skipTest("Not Yet Implemented") + # we need to acquire a forwardable ticket + svc_princ = SERVICE_PRINCIPAL.decode("UTF-8") + self.realm.kinit(svc_princ, flags=['-k', '-f']) + + target_name = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + + client_creds = gsscreds.Credentials(usage='initiate') + client_ctx = gssctx.SecurityContext( + name=target_name, creds=client_creds, + flags=gb.RequirementFlag.delegate_to_peer) + + client_token = client_ctx.step() + + server_creds = gsscreds.Credentials(usage='accept') + server_ctx = gssctx.SecurityContext(creds=server_creds) + server_ctx.step(client_token) + + deleg_creds = server_ctx.delegated_creds + deleg_creds.shouldnt_be_none() + + store_res = deleg_creds.store(usage='initiate', set_default=True) + store_res.usage.should_be('initiate') + store_res.mech_types.should_include(gb.MechType.kerberos) + + reacquired_creds = gsscreds.Credentials(desired_name=deleg_creds.name, + usage='initiate') + reacquired_creds.shouldnt_be_none() @_extension_test('cred_store', 'credentials store') def test_store_into_acquire_from(self): @@ -181,7 +207,10 @@ def test_store_into_acquire_from(self): retrieved_creds.shouldnt_be_none() def test_create_from_other(self): - self.skipTest("Not Yet Implemented") + raw_creds = gb.acquire_cred(None, cred_usage='accept').creds + + high_level_creds = gsscreds.Credentials(raw_creds) + high_level_creds.usage.should_be('accept') @true_false_perms('name', 'lifetime', 'usage', 'mechs') def test_inquire(self, str_name, kwargs): @@ -235,11 +264,41 @@ def test_inquire_by_mech(self, str_name, kwargs): resp.usage.should_be_none() def test_add(self): - self.skipTest("Not Yet Implemented") + input_creds = gsscreds.Credentials(gb.Creds()) + name = gssnames.Name(SERVICE_PRINCIPAL) + new_creds = input_creds.add(name, gb.MechType.kerberos, + usage='initiate') + + new_creds.shouldnt_be_none() + new_creds.should_be_a(gsscreds.Credentials) + + @_extension_test('cred_store', 'credentials store') + def test_store_into_add_from(self): + CCACHE = 'FILE:{tmpdir}/other_ccache'.format(tmpdir=self.realm.tmpdir) + KT = '{tmpdir}/other_keytab'.format(tmpdir=self.realm.tmpdir) + store = {'ccache': CCACHE, 'keytab': KT} + + princ_name = 'service/cs@' + self.realm.realm + self.realm.addprinc(princ_name) + self.realm.extract_keytab(princ_name, KT) + self.realm.kinit(princ_name, None, ['-k', '-t', KT]) + + initial_creds = gsscreds.Credentials(desired_name=None, + usage='initiate') - # NB(directxman12): we don't test add_cred_from because it really requires - # multiple mechanism support, which would mean something - # like requiring NTLM libraries + store_res = initial_creds.store(store, overwrite=True) + + store_res.mech_types.shouldnt_be_none() + store_res.mech_types.shouldnt_be_empty() + store_res.usage.should_be('initiate') + + name = gssnames.Name(princ_name) + input_creds = gsscreds.Credentials(gb.Creds()) + retrieved_creds = input_creds.add(name, gb.MechType.kerberos, + store=store) + + retrieved_creds.shouldnt_be_none() + retrieved_creds.should_be_a(gsscreds.Credentials) @_extension_test('cred_imp_ext', 'credentials import-export') def test_export(self): @@ -267,37 +326,54 @@ def test_pickle_unpickle(self): @exist_perms(lifetime=30, desired_mechs=[gb.MechType.kerberos], usage='initiate') + @_extension_test('s4u', 'S4U') def test_impersonate(self, str_name, kwargs): - if import_gssapi_extension('s4u') is None: - self.skipTest("The S4U GSSAPI extension is not supported " - "by your GSSAPI implementation") - else: - target_name = gssnames.Name(TARGET_SERVICE_NAME, - gb.NameType.hostbased_service) - # TODO(directxman12): make this use the high-level SecurityContext - client_ctx_resp = gb.init_sec_context(target_name) - client_token = client_ctx_resp[3] - del client_ctx_resp # free everything but the token + target_name = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + # TODO(directxman12): make this use the high-level SecurityContext + client_ctx_resp = gb.init_sec_context(target_name) + client_token = client_ctx_resp[3] + del client_ctx_resp # free everything but the token - server_name = self.name - server_creds = gsscreds.Credentials(desired_name=server_name, - usage='both') - server_ctx_resp = gb.accept_sec_context(client_token, - acceptor_cred=server_creds) + server_name = self.name + server_creds = gsscreds.Credentials(desired_name=server_name, + usage='both') + server_ctx_resp = gb.accept_sec_context(client_token, + acceptor_cred=server_creds) - imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs) + imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs) - imp_creds.shouldnt_be_none() - imp_creds.should_be_a(gsscreds.Credentials) + imp_creds.shouldnt_be_none() + imp_creds.should_be_a(gsscreds.Credentials) @_extension_test('s4u', 'S4U') def test_add_with_impersonate(self): - self.skipTest("Not Yet Implemented") + target_name = gssnames.Name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + client_ctx = gssctx.SecurityContext(name=target_name) + client_token = client_ctx.step() + + server_creds = gsscreds.Credentials(usage='both') + server_ctx = gssctx.SecurityContext(creds=server_creds, usage='accept') + server_ctx.step(client_token) + + # use empty creds to test here + input_creds = gsscreds.Credentials(gb.Creds()) + new_creds = input_creds.add(server_ctx.initiator_name, + gb.MechType.kerberos, + impersonator=server_creds, + usage='initiate') + + new_creds.shouldnt_be(None) + new_creds.should_be_a(gsscreds.Credentials) class NamesTestCase(_GSSAPIKerberosTestCase): def test_create_from_other(self): - self.skipTest("Not Yet Implemented") + raw_name = gb.import_name(SERVICE_PRINCIPAL) + high_level_name = gssnames.Name(raw_name) + + bytes(high_level_name).should_be(SERVICE_PRINCIPAL) def test_create_from_name_no_type(self): name = gssnames.Name(SERVICE_PRINCIPAL) @@ -398,11 +474,14 @@ def setUp(self): def _create_client_ctx(self, **kwargs): return gssctx.SecurityContext(name=self.target_name, **kwargs) - def test_process_token(self): - self.skipTest("Not Yet Implemented") + # NB(directxman12): we skip testing process_context_token, because there is + # no concrete, non-deprecated was to obtain an "async" + # token def test_create_from_other(self): - self.skipTest("Not Yet Implemented") + raw_client_ctx, raw_server_ctx = self._create_completed_contexts() + high_level_ctx = gssctx.SecurityContext(raw_client_ctx) + high_level_ctx.target_name.should_be(self.target_name) @exist_perms(desired_lifetime=30, flags=[], mech_type=gb.MechType.kerberos, diff --git a/gssapi/tests/test_raw.py b/gssapi/tests/test_raw.py index 174f74c7..a8d30417 100644 --- a/gssapi/tests/test_raw.py +++ b/gssapi/tests/test_raw.py @@ -227,9 +227,8 @@ def test_inquire_context(self): is_open.should_be_a(bool) is_open.should_be_true() - def test_process_context_token(self): - # TODO(directxman12): figure out how to write a test for this - self.skipTest("Not Yet Implemented") + # NB(directxman12): We don't test `process_context_token` because + # there is no clear non-deprecated way to test it @_extension_test('s4u', 'S4U') def test_add_cred_impersonate_name(self): @@ -296,7 +295,36 @@ def test_acquire_creds_impersonate_name(self): @_extension_test('rfc5588', 'RFC 5588') def test_store_cred_acquire_cred(self): - self.skipTest("Not Yet Implemented") + # we need to acquire a forwardable ticket + svc_princ = SERVICE_PRINCIPAL.decode("UTF-8") + self.realm.kinit(svc_princ, flags=['-k', '-f']) + + target_name = gb.import_name(TARGET_SERVICE_NAME, + gb.NameType.hostbased_service) + + client_creds = gb.acquire_cred(None, cred_usage='initiate').creds + client_ctx_resp = gb.init_sec_context( + target_name, cred=client_creds, + flags=gb.RequirementFlag.delegate_to_peer) + + client_token = client_ctx_resp[3] + + server_creds = gb.acquire_cred(None, cred_usage='accept').creds + server_ctx_resp = gb.accept_sec_context(client_token, + acceptor_cred=server_creds) + + deleg_creds = server_ctx_resp.delegated_creds + deleg_creds.shouldnt_be_none() + store_res = gb.store_cred(deleg_creds, cred_usage='initiate', + set_default=True) + + store_res.shouldnt_be_none() + store_res.usage.should_be('initiate') + store_res.mech_types.should_include(gb.MechType.kerberos) + + deleg_name = gb.inquire_cred(deleg_creds).name + acq_resp = gb.acquire_cred(deleg_name, cred_usage='initiate') + acq_resp.shouldnt_be_none() @_extension_test('cred_store', 'credentials store') def test_store_cred_into_acquire_cred(self):