Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EN-1912: add service control policy support #384

Merged
merged 2 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
173 changes: 173 additions & 0 deletions functional_tests/aws/organizations/scp/test_create_template.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
from __future__ import annotations

from unittest import IsolatedAsyncioTestCase

from functional_tests.conftest import IAMBIC_TEST_DETAILS
from iambic.core.models import ProposedChangeType
from iambic.plugins.v0_1_0.aws.models import Tag
from iambic.plugins.v0_1_0.aws.organizations.scp.models import (
AwsScpPolicyTemplate,
PolicyTargetProperties,
)
from iambic.plugins.v0_1_0.aws.organizations.scp.template_generation import (
get_template_dir,
)

from .utils import generate_policy_template


class CreatePolicyTestCase(IsolatedAsyncioTestCase):
templates: list[AwsScpPolicyTemplate] = []

async def asyncSetUp(self):
self.policy_dir = get_template_dir(IAMBIC_TEST_DETAILS.template_dir_path)

self.org_account = next(
filter(
lambda acc: acc.organization_account,
IAMBIC_TEST_DETAILS.config.aws.accounts,
)
)

self.org_client = await self.org_account.get_boto3_client("organizations")

self.accounts = [
acc
for acc in IAMBIC_TEST_DETAILS.config.aws.accounts
if acc.organization_account is False
]

async def asyncTearDown(self):
for template in self.templates:
template.deleted = True
await template.apply(IAMBIC_TEST_DETAILS.config.aws)

async def test_create_template(self):
client = self.org_client
policy_template = await generate_policy_template(
IAMBIC_TEST_DETAILS.template_dir_path,
self.org_account,
) # type: ignore

policy_template.write()

changes = await policy_template.apply(IAMBIC_TEST_DETAILS.config.aws)

self.check_no_exception_seen(changes)

policy_template = AwsScpPolicyTemplate.load(str(policy_template.file_path))

policy = (
client.describe_policy(PolicyId=policy_template.properties.policy_id)
.get("Policy")
.get("PolicySummary")
)

self.check_policy_changes(policy_template, policy)

self.templates.append(policy_template)

async def test_create_template_with_targets(self):
client = self.org_client
policy_template = await generate_policy_template(
IAMBIC_TEST_DETAILS.template_dir_path,
self.org_account,
) # type: ignore

if not policy_template.properties.targets:
policy_template.properties.targets = PolicyTargetProperties() # type: ignore

policy_template.properties.targets.accounts += [
account.account_id for account in self.accounts
]

policy_template.write()

changes = await policy_template.apply(IAMBIC_TEST_DETAILS.config.aws)

self.check_no_exception_seen(changes)

policy_template = AwsScpPolicyTemplate.load(str(policy_template.file_path))

policy = (
client.describe_policy(PolicyId=policy_template.properties.policy_id)
.get("Policy")
.get("PolicySummary")
)

self.check_policy_changes(policy_template, policy)
self.check_targets(policy_template, IAMBIC_TEST_DETAILS.config.aws.accounts)

self.templates.append(policy_template)

async def test_create_template_with_tags_and_targets(self):
client = self.org_client
policy_template = await generate_policy_template(
IAMBIC_TEST_DETAILS.template_dir_path,
self.org_account,
) # type: ignore

if not policy_template.properties.targets:
policy_template.properties.targets = PolicyTargetProperties() # type: ignore

policy_template.properties.targets.accounts += [
account.account_id for account in self.accounts
]

if not policy_template.properties.tags:
policy_template.properties.tags = []

policy_template.properties.tags += [
Tag(key="created_by", value="functional_test") # type: ignore
]

policy_template.write()

changes = await policy_template.apply(IAMBIC_TEST_DETAILS.config.aws)

self.check_no_exception_seen(changes)

policy_template = AwsScpPolicyTemplate.load(str(policy_template.file_path))

policy = (
client.describe_policy(PolicyId=policy_template.properties.policy_id)
.get("Policy")
.get("PolicySummary")
)

self.check_policy_changes(policy_template, policy)
self.check_targets(policy_template, IAMBIC_TEST_DETAILS.config.aws.accounts)
self.check_tags(policy_template)

self.templates.append(policy_template)

def check_policy_changes(self, policy_template, policy):
self.assertEquals(policy.get("Name"), policy_template.properties.policy_name)
self.assertEquals(
policy.get("Description"), policy_template.properties.description
)

def check_no_exception_seen(self, changes):
self.assertEquals(len(changes.exceptions_seen), 0)
self.assertEquals(
changes.proposed_changes[0].proposed_changes[0].change_type,
ProposedChangeType.CREATE,
)

def check_targets(self, template, accounts):
account_ids = sorted([account.account_id for account in self.accounts])

targets = self.org_client.list_targets_for_policy(
PolicyId=template.properties.policy_id
).get("Targets")

self.assertEquals(
sorted([target.get("TargetId") for target in targets]), account_ids
)

def check_tags(self, template):
listed_tags = self.org_client.list_tags_for_resource(
ResourceId=template.properties.policy_id
).get("Tags")

self.assertIn("created_by", [tag.get("Key") for tag in listed_tags])
68 changes: 68 additions & 0 deletions functional_tests/aws/organizations/scp/test_template_expiration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import annotations

import os
from datetime import datetime, timedelta, timezone
from unittest import IsolatedAsyncioTestCase

from functional_tests.aws.organizations.scp.utils import generate_policy_template
from functional_tests.conftest import IAMBIC_TEST_DETAILS
from iambic.core.models import ProposedChangeType
from iambic.core.utils import remove_expired_resources
from iambic.plugins.v0_1_0.aws.organizations.scp.models import AwsScpPolicyTemplate
from iambic.plugins.v0_1_0.aws.organizations.scp.template_generation import (
get_template_dir,
)


class ExpirationPolicyTestCase(IsolatedAsyncioTestCase):
templates: list[AwsScpPolicyTemplate] = []

async def asyncSetUp(self):
self.policy_dir = get_template_dir(IAMBIC_TEST_DETAILS.template_dir_path)

self.org_account = next(
filter(
lambda acc: acc.organization_account,
IAMBIC_TEST_DETAILS.config.aws.accounts,
)
)

self.org_client = await self.org_account.get_boto3_client("organizations")

async def test_expire_policy_template(self):
template = await generate_policy_template(
IAMBIC_TEST_DETAILS.template_dir_path,
self.org_account,
) #

template.expires_at = datetime.now(timezone.utc) - timedelta(days=1)
template.write()

self.assertFalse(template.deleted)
await remove_expired_resources(
template, template.resource_type, template.resource_id
)
self.assertTrue(template.deleted)

async def test_delete_policy(self):
template = await generate_policy_template(
IAMBIC_TEST_DETAILS.template_dir_path,
self.org_account,
) #

template.write()

changes = await template.apply(IAMBIC_TEST_DETAILS.config.aws)

template.deleted = True
template.write()

changes = await template.apply(IAMBIC_TEST_DETAILS.config.aws)

self.assertFalse(changes.proposed_changes[0].exceptions_seen)
self.assertFalse(os.path.exists(template.file_path))

self.assertEquals(
changes.proposed_changes[0].proposed_changes[0].change_type,
ProposedChangeType.DELETE,
)
Loading
Loading