Skip to content

Commit

Permalink
Update MDM accound driven enrollment
Browse files Browse the repository at this point in the history
 - Deprecate profile based user enrollment
 - Add more tests
  • Loading branch information
np5 committed Feb 22, 2024
1 parent 0df94da commit 12af371
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 106 deletions.
2 changes: 1 addition & 1 deletion tests/mdm/test_dep_enrollment_public_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
@patch("zentral.core.queues.backends.kombu.EventQueues.post_event")
@patch("zentral.contrib.mdm.public_views.dep.verify_iphone_ca_signed_payload")
class MDMOTAEnrollmentPublicViewsTestCase(TestCase):
class MDMDEPEnrollmentPublicViewsTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.mbu = MetaBusinessUnit.objects.create(name=get_random_string(12))
Expand Down
48 changes: 35 additions & 13 deletions tests/mdm/test_setup_user_enrollment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.utils.crypto import get_random_string
from accounts.models import User
from zentral.contrib.inventory.models import MetaBusinessUnit
from .utils import force_push_certificate, force_scep_config, force_user_enrollment
from .utils import force_push_certificate, force_realm, force_scep_config, force_user_enrollment


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
Expand Down Expand Up @@ -57,7 +57,7 @@ def test_create_user_enrollment_get(self):
self.assertTemplateUsed(response, "mdm/userenrollment_form.html")
self.assertContains(response, "Create user enrollment")

def test_create_user_enrollment_post(self):
def test_create_user_enrollment_no_realm(self):
self._login("mdm.add_userenrollment", "mdm.view_userenrollment")
name = get_random_string(64)
push_certificate = force_push_certificate()
Expand All @@ -70,6 +70,24 @@ def test_create_user_enrollment_post(self):
"es-meta_business_unit": self.mbu.pk},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/userenrollment_form.html")
self.assertFormError(response.context["user_enrollment_form"], "realm", "This field is required")

def test_create_user_enrollment_post(self):
self._login("mdm.add_userenrollment", "mdm.view_userenrollment")
realm = force_realm()
name = get_random_string(64)
push_certificate = force_push_certificate()
scep_config = force_scep_config()
response = self.client.post(reverse("mdm:create_user_enrollment"),
{"ue-realm": realm.pk,
"ue-name": name,
"ue-scep_config": scep_config.pk,
"ue-scep_verification": "",
"ue-push_certificate": push_certificate.pk,
"es-meta_business_unit": self.mbu.pk},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/userenrollment_detail.html")
self.assertContains(response, name)
self.assertContains(response, push_certificate.name)
Expand All @@ -83,17 +101,17 @@ def test_create_user_enrollment_post(self):
# view User enrollment

def test_view_user_enrollment_redirect(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login_redirect(reverse("mdm:user_enrollment", args=(enrollment.pk,)))

def test_view_user_enrollment_permission_denied(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login()
response = self.client.get(reverse("mdm:user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 403)

def test_view_user_enrollment_no_extra_perms(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.view_userenrollment")
response = self.client.get(reverse("mdm:user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 200)
Expand All @@ -105,7 +123,7 @@ def test_view_user_enrollment_no_extra_perms(self):
self.assertNotContains(response, enrollment.scep_config.get_absolute_url())

def test_view_user_enrollment_extra_perms(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.view_userenrollment", "mdm.view_pushcertificate", "mdm.view_scepconfig")
response = self.client.get(reverse("mdm:user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 200)
Expand All @@ -119,41 +137,45 @@ def test_view_user_enrollment_extra_perms(self):
# update User enrollment

def test_update_user_enrollment_redirect(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login_redirect(reverse("mdm:update_user_enrollment", args=(enrollment.pk,)))

def test_update_user_enrollment_permission_denied(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login()
response = self.client.get(reverse("mdm:update_user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 403)

def test_update_user_enrollment_get(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.change_userenrollment")
response = self.client.get(reverse("mdm:update_user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/userenrollment_form.html")
self.assertContains(response, f"[USER] {enrollment.name}")

def test_update_user_enrollment_post(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.change_userenrollment", "mdm.view_userenrollment")
new_realm = force_realm()
new_name = get_random_string(64)
response = self.client.post(reverse("mdm:update_user_enrollment", args=(enrollment.pk,)),
{"ue-name": new_name,
{"ue-realm": new_realm.pk,
"ue-name": new_name,
"ue-scep_config": enrollment.scep_config.pk,
"ue-scep_verification": "on",
"ue-push_certificate": enrollment.push_certificate.pk,
"es-meta_business_unit": self.mbu.pk},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/userenrollment_detail.html")
self.assertContains(response, new_realm.name)
self.assertContains(response, new_name)
self.assertContains(response, enrollment.push_certificate.name)
self.assertContains(response, enrollment.scep_config.name)
self.assertContains(response, "with CSR verification")
enrollment = response.context["object"]
self.assertEqual(enrollment.realm, new_realm)
self.assertEqual(enrollment.name, new_name)

# list User enrollments
Expand All @@ -162,15 +184,15 @@ def test_list_user_enrollments_redirect(self):
self._login_redirect(reverse("mdm:enrollments"))

def test_list_user_enrollments_no_perm_empty(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login()
response = self.client.get(reverse("mdm:enrollments"))
self.assertEqual(response.status_code, 200)
self.assertNotContains(response, "1 User enrollment")
self.assertNotContains(response, enrollment.name)

def test_list_user_enrollments(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.view_userenrollment")
response = self.client.get(reverse("mdm:enrollments"))
self.assertEqual(response.status_code, 200)
Expand Down
161 changes: 161 additions & 0 deletions tests/mdm/test_user_enrollment_public_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import plistlib
from unittest.mock import Mock, patch
from django.test import TestCase, override_settings
from django.urls import reverse
from django.utils.crypto import get_random_string
from zentral.contrib.mdm.crypto import verify_signed_payload
from zentral.contrib.mdm.events import UserEnrollmentRequestEvent
from zentral.contrib.mdm.models import UserEnrollmentSession
from zentral.contrib.mdm.public_views.user import user_enroll_callback
from zentral.contrib.inventory.models import MetaBusinessUnit
from .utils import force_realm_user, force_user_enrollment


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
@patch("zentral.core.queues.backends.kombu.EventQueues.post_event")
class MDMUserEnrollmentPublicViewsTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.mbu = MetaBusinessUnit.objects.create(name=get_random_string(12))
cls.mbu.create_enrollment_business_unit()
cls.realm, cls.realm_user = force_realm_user()

def assertAbort(self, post_event, reason, **kwargs):
last_event = post_event.call_args.args[0]
self.assertIsInstance(last_event, UserEnrollmentRequestEvent)
self.assertEqual(last_event.payload["status"], "failure")
self.assertEqual(last_event.payload["reason"], reason)
for k, v in kwargs.items():
if k == "serial_number":
self.assertEqual(last_event.metadata.machine_serial_number, v)
else:
self.assertEqual(last_event.payload.get(k), v)

def assertSuccess(self, post_event, **kwargs):
last_event = post_event.call_args.args[0]
self.assertIsInstance(last_event, UserEnrollmentRequestEvent)
self.assertEqual(last_event.payload["status"], "success")
for k, v in kwargs.items():
self.assertEqual(last_event.payload.get(k), v)

# service discovery

def test_service_discovery(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
response = self.client.get(reverse("mdm_public:user_enrollment_service_discovery",
args=(enrollment.enrollment_secret.secret,)))
self.assertEqual(
response.json(),
{'Servers': [{'Version': 'mdm-byod',
'BaseURL': 'https://zentral' + reverse("mdm_public:enroll_user",
args=(enrollment.enrollment_secret.secret,))}]}
)

# enroll user view

def test_enroll_user_unknown_secret(self, post_event):
response = self.client.post(reverse("mdm_public:enroll_user", args=(get_random_string(12),)))
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "secret verification failed: 'unknown secret'")

def test_enroll_user_no_realm(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
enrollment.realm = None # Should never happen
enrollment.save()
response = self.client.post(reverse("mdm_public:enroll_user", args=(enrollment.enrollment_secret.secret,)))
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "This user enrollment has no realm")

def test_enroll_user_no_authorization(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
self.assertEqual(enrollment.userenrollmentsession_set.count(), 0)
response = self.client.post(reverse("mdm_public:enroll_user", args=(enrollment.enrollment_secret.secret,)))
self.assertEqual(response.status_code, 401)
self.assertEqual(enrollment.userenrollmentsession_set.count(), 1)
enrollment_session = enrollment.userenrollmentsession_set.first()
self.assertEqual(enrollment_session.status, "ACCOUNT_DRIVEN_START")
auth_url = "https://zentral" + reverse("mdm_public:authenticate_user",
args=(enrollment_session.enrollment_secret.secret,))
self.assertEqual(response.headers["WWW-Authenticate"], f'Bearer method="apple-as-web" url="{auth_url}"')

def test_enroll_user_invalid_access_token(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
_, realm_user = force_realm_user(self.realm)
enrollment_session = UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
self.assertIsNone(enrollment_session.access_token)
enrollment_session.set_account_driven_authenticated_status(realm_user)
self.assertIsNotNone(enrollment_session.access_token)
response = self.client.post(
reverse("mdm_public:enroll_user", args=(enrollment.enrollment_secret.secret,)),
headers={"Authorization": "Bearer " + get_random_string(12)}
)
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "Invalid access token")

def test_enroll_user(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
_, realm_user = force_realm_user(self.realm)
enrollment_session = UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
enrollment_session.set_account_driven_authenticated_status(realm_user)
response = self.client.post(
reverse("mdm_public:enroll_user", args=(enrollment.enrollment_secret.secret,)),
headers={"Authorization": f"Bearer {enrollment_session.access_token}"}
)
self.assertEqual(response.status_code, 200)
self.assertSuccess(post_event)
_, data = verify_signed_payload(response.content)
payload = plistlib.loads(data)
mdm_payload = [p for p in payload["PayloadContent"] if p["PayloadType"] == "com.apple.mdm"][0]
self.assertEqual(mdm_payload["AssignedManagedAppleID"], realm_user.email)
self.assertEqual(mdm_payload["EnrollmentMode"], "BYOD")

# authenticate user view

def test_authenticate_user_unknown_secret(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
response = self.client.get(reverse("mdm_public:authenticate_user", args=(get_random_string(12),)))
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "secret verification failed: 'unknown secret'")

def test_authenticate_user_no_realm(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
enrollment.realm = None # Should never happen
enrollment.save()
enrollment_session = UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
response = self.client.get(
reverse("mdm_public:authenticate_user", args=(enrollment_session.enrollment_secret.secret,))
)
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "This user enrollment has no realm")

def test_authenticate_user(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
enrollment_session = UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
self.assertIsNone(enrollment_session.access_token)
response = self.client.get(
reverse("mdm_public:authenticate_user", args=(enrollment_session.enrollment_secret.secret,))
)
self.assertEqual(response.status_code, 302)
realm = enrollment.realm
ras = realm.realmauthenticationsession_set.first()
self.assertEqual(response.url, f"/public/realms/{realm.pk}/ldap/{ras.pk}/login/")
self.assertEqual(ras.callback, "zentral.contrib.mdm.public_views.user.user_enroll_callback")
self.assertEqual(ras.callback_kwargs, {"user_enrollment_session_pk": enrollment_session.pk})
enrollment_session.refresh_from_db()
self.assertIsNone(enrollment_session.access_token)
self.assertEqual(enrollment_session.status, "ACCOUNT_DRIVEN_START")
# fake the realm auth
_, ras.user = force_realm_user(realm)
request = Mock()
request.session = self.client.session
response = user_enroll_callback(request, ras, enrollment_session.pk)
enrollment_session.refresh_from_db()
self.assertIsNotNone(enrollment_session.access_token)
self.assertEqual(enrollment_session.status, "ACCOUNT_DRIVEN_AUTHENTICATED")
self.assertEqual(response.status_code, 308)
self.assertEqual(
response.url,
"apple-remotemanagement-user-login://authentication-results?access-token="
+ enrollment_session.access_token
)
4 changes: 2 additions & 2 deletions tests/mdm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ def force_ota_enrollment(mbu, realm=None):
)


def force_user_enrollment(mbu, realm):
def force_user_enrollment(mbu, realm=None):
return UserEnrollment.objects.create(
push_certificate=force_push_certificate(),
realm=realm,
realm=realm or force_realm(),
scep_config=force_scep_config(),
name=get_random_string(12),
enrollment_secret=EnrollmentSecret.objects.create(meta_business_unit=mbu)
Expand Down
8 changes: 5 additions & 3 deletions zentral/contrib/mdm/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ class Meta:
"scep_config", "scep_verification",
"blueprint")


class UserEnrollmentEnrollForm(forms.Form):
managed_apple_id = forms.EmailField(label="Email", required=True)
def clean(self):
cleaned_data = super().clean()
if not cleaned_data.get("realm"):
self.add_error("realm", "This field is required")
return cleaned_data


class PushCertificateForm(forms.ModelForm):
Expand Down

0 comments on commit 12af371

Please sign in to comment.