from unittest import mock
from unittest.mock import MagicMock
import pytest
from pytest import fixture
from revokedefaultsg.app import RevokeDefaultSg, UnknownEventException
# Security group id carried by the good_event fixture; the tests expect the
# collaborators to be called with exactly this id.
TEST_SG = "sg-123"
# Minimal stand-ins for entries of a describe_security_groups response.
# Only GroupName is populated, which is the one field these tests set.
DEFAULT_GROUP = {"GroupName": "default"}
NOT_A_DEFAULT_GROUP = {"GroupName": "not default"}
@fixture
def good_event():
    """Return an event whose detail records an EC2 AuthorizeSecurityGroupIngress API call.

    The groupId under requestParameters must stay equal to TEST_SG, because
    the tests assert that this id is what gets extracted and acted on.
    """
    api_call_detail = {
        "eventVersion": "1.05",
        "eventTime": "2020-02-05T23:04:14Z",
        "eventSource": "ec2.amazonaws.com",
        "eventName": "AuthorizeSecurityGroupIngress",
        "eventID": "12345678-6466-4720-955e-e342e782d405",
        "eventType": "AwsApiCall",
        "requestParameters": {"groupId": "sg-123"},
    }
    return {
        "id": "12345678-b00a-ede7-937b-b4da1faf5b81",
        "detail": api_call_detail,
    }
@fixture
def bad_event():
    """Return an event with a non-EC2 source, a non-API-call type and no requestParameters.

    NOTE(review): three things differ from good_event at once (eventSource,
    eventType, missing requestParameters), so a test built on this fixture
    cannot tell which of them caused the rejection. A spoofed-source event
    that still carries a groupId would need its own fixture.
    """
    rejected_detail = {
        "eventVersion": "1.05",
        "eventTime": "2020-02-05T23:04:14Z",
        "eventSource": "barrista.arround.the.corner",
        "eventName": "AuthorizeSecurityGroupIngress",
        "eventID": "12345678-6466-4720-955e-e342e782d405",
        "eventType": "DrinkMoreCoffee",
    }
    return {
        "id": "12345678-b00a-ede7-937b-b4da1faf5b81",
        "detail": rejected_detail,
    }
@fixture
def unknown_event():
    """Return a payload with no "detail" key at all, i.e. not a recognisable event."""
    unrelated_payload = {"foo": "bar"}
    return unrelated_payload
@fixture()
def obj():
    """Return a RevokeDefaultSg instance whose logger and EC2 handles are mocks.

    __new__ is used so that __init__ never runs; presumably that is where the
    real AWS clients are created (confirm against revokedefaultsg.app).
    """
    handler = RevokeDefaultSg.__new__(RevokeDefaultSg)
    # Attach a fresh MagicMock for each attribute the tests exercise.
    for attribute in ("logger", "ec2_client", "ec2_resource"):
        setattr(handler, attribute, MagicMock())
    return handler
def test_should_process_event_and_revoke_if_default_sg(obj, good_event):
    """process_event must revoke and tag when the group is the default one."""
    # Stub both collaborators so only process_event's own dispatch is exercised.
    default_check = MagicMock(return_value=True)
    revoker = MagicMock()
    obj._is_default_sg = default_check
    obj._revoke_and_tag = revoker

    obj.process_event(good_event)

    default_check.assert_called_once_with(TEST_SG)
    revoker.assert_called_once_with(TEST_SG)
def test_should_process_event_and_do_nothing_if_non_default_sg(obj, good_event):
    """process_event must leave a non-default group untouched."""
    # The check still runs once, but a False answer must stop the remediation.
    default_check = MagicMock(return_value=False)
    revoker = MagicMock()
    obj._is_default_sg = default_check
    obj._revoke_and_tag = revoker

    obj.process_event(good_event)

    default_check.assert_called_once_with(TEST_SG)
    revoker.assert_not_called()
def test_should_extract_sg_id_from_good_event(obj, good_event):
    """_extract_sg_id returns the groupId carried by a well-formed event."""
    extracted_id = obj._extract_sg_id(good_event)
    assert extracted_id == TEST_SG
def test_should_find_default_sg(obj):
    """_is_default_sg is truthy when the described group is named "default"."""
    describe_response = {"SecurityGroups": [DEFAULT_GROUP]}
    obj.ec2_client.describe_security_groups.return_value = describe_response
    # NOTE(review): the arguments passed to describe_security_groups are not
    # asserted, so this does not prove the lookup is scoped to TEST_SG.
    is_default = obj._is_default_sg(TEST_SG)
    assert is_default
def test_should_find_non_default_sg(obj):
    """_is_default_sg is falsy when the described group has another name."""
    describe_response = {"SecurityGroups": [NOT_A_DEFAULT_GROUP]}
    obj.ec2_client.describe_security_groups.return_value = describe_response
    # Guards against remediating groups that are merely similar to "default".
    is_default = obj._is_default_sg(TEST_SG)
    assert not is_default
def test_should_tag_if_ingress_was_revoked(obj):
    """A group with ingress rules only must be tagged after _revoke_and_tag."""
    # Placeholder strings stand in for rule lists; only their truthiness matters here.
    group = MagicMock(ip_permissions="ingress", ip_permissions_egress=None)
    obj.ec2_resource.SecurityGroup.return_value = group

    obj._revoke_and_tag(TEST_SG)

    # NOTE(review): only tagging is asserted. The revoke call itself and the
    # tag contents (mock.ANY) are unchecked, so code that tags without
    # revoking would still pass this test.
    group.create_tags.assert_called_once_with(Tags=mock.ANY)
def test_should_tag_if_egress_was_revoked(obj):
    """A group with egress rules only must be tagged after _revoke_and_tag."""
    # Placeholder strings stand in for rule lists; only their truthiness matters here.
    group = MagicMock(ip_permissions=None, ip_permissions_egress="egress")
    obj.ec2_resource.SecurityGroup.return_value = group

    obj._revoke_and_tag(TEST_SG)

    # NOTE(review): as in the ingress case, the revoke call is not asserted,
    # only the audit tag that follows it.
    group.create_tags.assert_called_once_with(Tags=mock.ANY)
def test_should_not_tag_if_nothing_was_revoked(obj):
    """A group with no ingress and no egress rules must not be tagged."""
    # With nothing to revoke, tagging would record a remediation that never happened.
    group = MagicMock(ip_permissions=None, ip_permissions_egress=None)
    obj.ec2_resource.SecurityGroup.return_value = group

    obj._revoke_and_tag(TEST_SG)

    group.create_tags.assert_not_called()
def test_should_raise_exception_if_unknown_event(obj, unknown_event):
    """A payload without the expected event structure must be rejected."""
    expected_error = UnknownEventException
    # Rejection has to be an explicit exception rather than a silent None.
    with pytest.raises(expected_error):
        obj._extract_sg_id(unknown_event)
def test_should_raise_exception_if_bad_event(obj, bad_event):
    """An event with an unexpected source and type must be rejected."""
    expected_error = UnknownEventException
    # NOTE(review): bad_event also lacks requestParameters, so this passes
    # whether the source/type check or the missing groupId raises the error.
    with pytest.raises(expected_error):
        obj._extract_sg_id(bad_event)