/
base.py
169 lines (141 loc) · 6.97 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_log import log as logging
from oslo_utils.fixture import uuidsentinel as uuids
from nova import context as nova_context
from nova import exception
from nova import test
from nova.tests.unit import policy_fixture
LOG = logging.getLogger(__name__)
class BasePolicyTest(test.TestCase):
# NOTE(gmann): Set this flag to True if you would like to tests the
# new behaviour of policy without deprecated rules.
# This means you can simulate the phase when policies completely
# switch to new behaviour by removing the support of old rules.
without_deprecated_rules = False
# Add rules here other than base rules which need to override
# to remove the deprecated rules.
# For Example:
# rules_without_deprecation{
# "os_compute_api:os-deferred-delete:restore":
# "rule:system_admin_or_owner"}
rules_without_deprecation = {}
def setUp(self):
super(BasePolicyTest, self).setUp()
self.policy = self.useFixture(policy_fixture.RealPolicyFixture())
self.admin_project_id = uuids.admin_project_id
self.project_id = uuids.project_id
self.project_id_other = uuids.project_id_other
# all context are with implied roles.
self.legacy_admin_context = nova_context.RequestContext(
user_id="legacy_admin", project_id=self.admin_project_id,
roles=['admin', 'member', 'reader'])
# system scoped users
self.system_admin_context = nova_context.RequestContext(
user_id="admin",
roles=['admin', 'member', 'reader'], system_scope='all')
self.system_member_context = nova_context.RequestContext(
user_id="member",
roles=['member', 'reader'], system_scope='all')
self.system_reader_context = nova_context.RequestContext(
user_id="reader", roles=['reader'], system_scope='all')
self.system_foo_context = nova_context.RequestContext(
user_id="foo", roles=['foo'], system_scope='all')
# project scoped users
self.project_admin_context = nova_context.RequestContext(
user_id="project_admin", project_id=self.project_id,
roles=['admin', 'member', 'reader'])
self.project_member_context = nova_context.RequestContext(
user_id="project_member", project_id=self.project_id,
roles=['member', 'reader'])
self.project_reader_context = nova_context.RequestContext(
user_id="project_reader", project_id=self.project_id,
roles=['reader'])
self.project_foo_context = nova_context.RequestContext(
user_id="project_foo", project_id=self.project_id,
roles=['foo'])
self.other_project_member_context = nova_context.RequestContext(
user_id="other_project_member",
project_id=self.project_id_other,
roles=['member', 'reader'])
self.all_contexts = [
self.legacy_admin_context, self.system_admin_context,
self.system_member_context, self.system_reader_context,
self.system_foo_context,
self.project_admin_context, self.project_member_context,
self.project_reader_context, self.other_project_member_context,
self.project_foo_context,
]
if self.without_deprecated_rules:
# To simulate the new world, remove deprecations by overriding
# rules which has the deprecated rules.
self.rules_without_deprecation.update({
"system_admin_or_owner":
"rule:system_admin_api or rule:project_member_api",
"system_or_project_reader":
"rule:system_reader_api or rule:project_reader_api",
"system_admin_api":
"role:admin and system_scope:all",
"system_reader_api":
"role:reader and system_scope:all",
})
self.policy.set_rules(self.rules_without_deprecation,
overwrite=False)
def common_policy_check(self, authorized_contexts,
unauthorized_contexts, rule_name,
func, req, *arg, **kwarg):
# NOTE(brinzhang): When fatal=False is passed as a parameter
# in context.can(), we cannot get the desired ensure_raises().
# At this time, we can call ensure_return() to assert the func's
# response to ensure that changes are right.
fatal = kwarg.pop('fatal', True)
authorized_response = []
unauthorize_response = []
self.assertEqual(len(self.all_contexts),
len(authorized_contexts) + len(
unauthorized_contexts),
"Few context are missing. check all contexts "
"mentioned in self.all_contexts are tested")
def ensure_return(req, *args, **kwargs):
return func(req, *arg, **kwargs)
def ensure_raises(req, *args, **kwargs):
exc = self.assertRaises(
exception.PolicyNotAuthorized, func, req, *arg, **kwarg)
self.assertEqual(
"Policy doesn't allow %s to be performed." %
rule_name, exc.format_message())
# Verify all the context having allowed scope and roles pass
# the policy check.
for context in authorized_contexts:
LOG.info("Testing authorized context: %s", context)
req.environ['nova.context'] = context
args1 = copy.deepcopy(arg)
kwargs1 = copy.deepcopy(kwarg)
if not fatal:
authorized_response.append(
ensure_return(req, *args1, **kwargs1))
else:
func(req, *args1, **kwargs1)
# Verify all the context not having allowed scope or roles fail
# the policy check.
for context in unauthorized_contexts:
LOG.info("Testing unauthorized context: %s", context)
req.environ['nova.context'] = context
args1 = copy.deepcopy(arg)
kwargs1 = copy.deepcopy(kwarg)
if not fatal:
unauthorize_response.append(
ensure_return(req, *args1, **kwargs1))
else:
ensure_raises(req, *args1, **kwargs1)
return authorized_response, unauthorize_response