Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update MDM accound driven enrollment #931

Merged
merged 1 commit into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/mdm/test_dep_enrollment_public_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
@patch("zentral.core.queues.backends.kombu.EventQueues.post_event")
@patch("zentral.contrib.mdm.public_views.dep.verify_iphone_ca_signed_payload")
class MDMOTAEnrollmentPublicViewsTestCase(TestCase):
class MDMDEPEnrollmentPublicViewsTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.mbu = MetaBusinessUnit.objects.create(name=get_random_string(12))
Expand Down
48 changes: 35 additions & 13 deletions tests/mdm/test_setup_user_enrollment.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from django.utils.crypto import get_random_string
from accounts.models import User
from zentral.contrib.inventory.models import MetaBusinessUnit
from .utils import force_push_certificate, force_scep_config, force_user_enrollment
from .utils import force_push_certificate, force_realm, force_scep_config, force_user_enrollment


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
Expand Down Expand Up @@ -57,7 +57,7 @@ def test_create_user_enrollment_get(self):
self.assertTemplateUsed(response, "mdm/userenrollment_form.html")
self.assertContains(response, "Create user enrollment")

def test_create_user_enrollment_post(self):
def test_create_user_enrollment_no_realm(self):
self._login("mdm.add_userenrollment", "mdm.view_userenrollment")
name = get_random_string(64)
push_certificate = force_push_certificate()
Expand All @@ -70,6 +70,24 @@ def test_create_user_enrollment_post(self):
"es-meta_business_unit": self.mbu.pk},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/userenrollment_form.html")
self.assertFormError(response.context["user_enrollment_form"], "realm", "This field is required")

def test_create_user_enrollment_post(self):
self._login("mdm.add_userenrollment", "mdm.view_userenrollment")
realm = force_realm()
name = get_random_string(64)
push_certificate = force_push_certificate()
scep_config = force_scep_config()
response = self.client.post(reverse("mdm:create_user_enrollment"),
{"ue-realm": realm.pk,
"ue-name": name,
"ue-scep_config": scep_config.pk,
"ue-scep_verification": "",
"ue-push_certificate": push_certificate.pk,
"es-meta_business_unit": self.mbu.pk},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/userenrollment_detail.html")
self.assertContains(response, name)
self.assertContains(response, push_certificate.name)
Expand All @@ -83,17 +101,17 @@ def test_create_user_enrollment_post(self):
# view User enrollment

def test_view_user_enrollment_redirect(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login_redirect(reverse("mdm:user_enrollment", args=(enrollment.pk,)))

def test_view_user_enrollment_permission_denied(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login()
response = self.client.get(reverse("mdm:user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 403)

def test_view_user_enrollment_no_extra_perms(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.view_userenrollment")
response = self.client.get(reverse("mdm:user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 200)
Expand All @@ -105,7 +123,7 @@ def test_view_user_enrollment_no_extra_perms(self):
self.assertNotContains(response, enrollment.scep_config.get_absolute_url())

def test_view_user_enrollment_extra_perms(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.view_userenrollment", "mdm.view_pushcertificate", "mdm.view_scepconfig")
response = self.client.get(reverse("mdm:user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 200)
Expand All @@ -119,41 +137,45 @@ def test_view_user_enrollment_extra_perms(self):
# update User enrollment

def test_update_user_enrollment_redirect(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login_redirect(reverse("mdm:update_user_enrollment", args=(enrollment.pk,)))

def test_update_user_enrollment_permission_denied(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login()
response = self.client.get(reverse("mdm:update_user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 403)

def test_update_user_enrollment_get(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.change_userenrollment")
response = self.client.get(reverse("mdm:update_user_enrollment", args=(enrollment.pk,)))
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/userenrollment_form.html")
self.assertContains(response, f"[USER] {enrollment.name}")

def test_update_user_enrollment_post(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.change_userenrollment", "mdm.view_userenrollment")
new_realm = force_realm()
new_name = get_random_string(64)
response = self.client.post(reverse("mdm:update_user_enrollment", args=(enrollment.pk,)),
{"ue-name": new_name,
{"ue-realm": new_realm.pk,
"ue-name": new_name,
"ue-scep_config": enrollment.scep_config.pk,
"ue-scep_verification": "on",
"ue-push_certificate": enrollment.push_certificate.pk,
"es-meta_business_unit": self.mbu.pk},
follow=True)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "mdm/userenrollment_detail.html")
self.assertContains(response, new_realm.name)
self.assertContains(response, new_name)
self.assertContains(response, enrollment.push_certificate.name)
self.assertContains(response, enrollment.scep_config.name)
self.assertContains(response, "with CSR verification")
enrollment = response.context["object"]
self.assertEqual(enrollment.realm, new_realm)
self.assertEqual(enrollment.name, new_name)

# list User enrollments
Expand All @@ -162,15 +184,15 @@ def test_list_user_enrollments_redirect(self):
self._login_redirect(reverse("mdm:enrollments"))

def test_list_user_enrollments_no_perm_empty(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login()
response = self.client.get(reverse("mdm:enrollments"))
self.assertEqual(response.status_code, 200)
self.assertNotContains(response, "1 User enrollment")
self.assertNotContains(response, enrollment.name)

def test_list_user_enrollments(self):
enrollment = force_user_enrollment(self.mbu, None)
enrollment = force_user_enrollment(self.mbu)
self._login("mdm.view_userenrollment")
response = self.client.get(reverse("mdm:enrollments"))
self.assertEqual(response.status_code, 200)
Expand Down
161 changes: 161 additions & 0 deletions tests/mdm/test_user_enrollment_public_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import plistlib
from unittest.mock import Mock, patch
from django.test import TestCase, override_settings
from django.urls import reverse
from django.utils.crypto import get_random_string
from zentral.contrib.mdm.crypto import verify_signed_payload
from zentral.contrib.mdm.events import UserEnrollmentRequestEvent
from zentral.contrib.mdm.models import UserEnrollmentSession
from zentral.contrib.mdm.public_views.user import user_enroll_callback
from zentral.contrib.inventory.models import MetaBusinessUnit
from .utils import force_realm_user, force_user_enrollment


@override_settings(STATICFILES_STORAGE='django.contrib.staticfiles.storage.StaticFilesStorage')
@patch("zentral.core.queues.backends.kombu.EventQueues.post_event")
class MDMUserEnrollmentPublicViewsTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.mbu = MetaBusinessUnit.objects.create(name=get_random_string(12))
cls.mbu.create_enrollment_business_unit()
cls.realm, cls.realm_user = force_realm_user()

def assertAbort(self, post_event, reason, **kwargs):
last_event = post_event.call_args.args[0]
self.assertIsInstance(last_event, UserEnrollmentRequestEvent)
self.assertEqual(last_event.payload["status"], "failure")
self.assertEqual(last_event.payload["reason"], reason)
for k, v in kwargs.items():
if k == "serial_number":
self.assertEqual(last_event.metadata.machine_serial_number, v)
else:
self.assertEqual(last_event.payload.get(k), v)

def assertSuccess(self, post_event, **kwargs):
last_event = post_event.call_args.args[0]
self.assertIsInstance(last_event, UserEnrollmentRequestEvent)
self.assertEqual(last_event.payload["status"], "success")
for k, v in kwargs.items():
self.assertEqual(last_event.payload.get(k), v)

# service discovery

def test_service_discovery(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
response = self.client.get(reverse("mdm_public:user_enrollment_service_discovery",
args=(enrollment.enrollment_secret.secret,)))
self.assertEqual(
response.json(),
{'Servers': [{'Version': 'mdm-byod',
'BaseURL': 'https://zentral' + reverse("mdm_public:enroll_user",
args=(enrollment.enrollment_secret.secret,))}]}
)

# enroll user view

def test_enroll_user_unknown_secret(self, post_event):
response = self.client.post(reverse("mdm_public:enroll_user", args=(get_random_string(12),)))
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "secret verification failed: 'unknown secret'")

def test_enroll_user_no_realm(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
enrollment.realm = None # Should never happen
enrollment.save()
response = self.client.post(reverse("mdm_public:enroll_user", args=(enrollment.enrollment_secret.secret,)))
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "This user enrollment has no realm")

def test_enroll_user_no_authorization(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
self.assertEqual(enrollment.userenrollmentsession_set.count(), 0)
response = self.client.post(reverse("mdm_public:enroll_user", args=(enrollment.enrollment_secret.secret,)))
self.assertEqual(response.status_code, 401)
self.assertEqual(enrollment.userenrollmentsession_set.count(), 1)
enrollment_session = enrollment.userenrollmentsession_set.first()
self.assertEqual(enrollment_session.status, "ACCOUNT_DRIVEN_START")
auth_url = "https://zentral" + reverse("mdm_public:authenticate_user",
args=(enrollment_session.enrollment_secret.secret,))
self.assertEqual(response.headers["WWW-Authenticate"], f'Bearer method="apple-as-web" url="{auth_url}"')

def test_enroll_user_invalid_access_token(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
_, realm_user = force_realm_user(self.realm)
enrollment_session = UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
self.assertIsNone(enrollment_session.access_token)
enrollment_session.set_account_driven_authenticated_status(realm_user)
self.assertIsNotNone(enrollment_session.access_token)
response = self.client.post(
reverse("mdm_public:enroll_user", args=(enrollment.enrollment_secret.secret,)),
headers={"Authorization": "Bearer " + get_random_string(12)}
)
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "Invalid access token")

def test_enroll_user(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
_, realm_user = force_realm_user(self.realm)
enrollment_session = UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
enrollment_session.set_account_driven_authenticated_status(realm_user)
response = self.client.post(
reverse("mdm_public:enroll_user", args=(enrollment.enrollment_secret.secret,)),
headers={"Authorization": f"Bearer {enrollment_session.access_token}"}
)
self.assertEqual(response.status_code, 200)
self.assertSuccess(post_event)
_, data = verify_signed_payload(response.content)
payload = plistlib.loads(data)
mdm_payload = [p for p in payload["PayloadContent"] if p["PayloadType"] == "com.apple.mdm"][0]
self.assertEqual(mdm_payload["AssignedManagedAppleID"], realm_user.email)
self.assertEqual(mdm_payload["EnrollmentMode"], "BYOD")

# authenticate user view

def test_authenticate_user_unknown_secret(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
response = self.client.get(reverse("mdm_public:authenticate_user", args=(get_random_string(12),)))
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "secret verification failed: 'unknown secret'")

def test_authenticate_user_no_realm(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
enrollment.realm = None # Should never happen
enrollment.save()
enrollment_session = UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
response = self.client.get(
reverse("mdm_public:authenticate_user", args=(enrollment_session.enrollment_secret.secret,))
)
self.assertEqual(response.status_code, 400)
self.assertAbort(post_event, "This user enrollment has no realm")

def test_authenticate_user(self, post_event):
enrollment = force_user_enrollment(self.mbu, self.realm)
enrollment_session = UserEnrollmentSession.objects.create_from_user_enrollment(enrollment)
self.assertIsNone(enrollment_session.access_token)
response = self.client.get(
reverse("mdm_public:authenticate_user", args=(enrollment_session.enrollment_secret.secret,))
)
self.assertEqual(response.status_code, 302)
realm = enrollment.realm
ras = realm.realmauthenticationsession_set.first()
self.assertEqual(response.url, f"/public/realms/{realm.pk}/ldap/{ras.pk}/login/")
self.assertEqual(ras.callback, "zentral.contrib.mdm.public_views.user.user_enroll_callback")
self.assertEqual(ras.callback_kwargs, {"user_enrollment_session_pk": enrollment_session.pk})
enrollment_session.refresh_from_db()
self.assertIsNone(enrollment_session.access_token)
self.assertEqual(enrollment_session.status, "ACCOUNT_DRIVEN_START")
# fake the realm auth
_, ras.user = force_realm_user(realm)
request = Mock()
request.session = self.client.session
response = user_enroll_callback(request, ras, enrollment_session.pk)
enrollment_session.refresh_from_db()
self.assertIsNotNone(enrollment_session.access_token)
self.assertEqual(enrollment_session.status, "ACCOUNT_DRIVEN_AUTHENTICATED")
self.assertEqual(response.status_code, 308)
self.assertEqual(
response.url,
"apple-remotemanagement-user-login://authentication-results?access-token="
+ enrollment_session.access_token
)
4 changes: 2 additions & 2 deletions tests/mdm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ def force_ota_enrollment(mbu, realm=None):
)


def force_user_enrollment(mbu, realm):
def force_user_enrollment(mbu, realm=None):
return UserEnrollment.objects.create(
push_certificate=force_push_certificate(),
realm=realm,
realm=realm or force_realm(),
scep_config=force_scep_config(),
name=get_random_string(12),
enrollment_secret=EnrollmentSecret.objects.create(meta_business_unit=mbu)
Expand Down
8 changes: 5 additions & 3 deletions zentral/contrib/mdm/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ class Meta:
"scep_config", "scep_verification",
"blueprint")


class UserEnrollmentEnrollForm(forms.Form):
managed_apple_id = forms.EmailField(label="Email", required=True)
def clean(self):
cleaned_data = super().clean()
if not cleaned_data.get("realm"):
self.add_error("realm", "This field is required")
return cleaned_data


class PushCertificateForm(forms.ModelForm):
Expand Down
Loading
Loading