-
Notifications
You must be signed in to change notification settings - Fork 266
/
trigger_analyzer.py
166 lines (140 loc) · 6.23 KB
/
trigger_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from os import path
from auth import permissions
from data import model
from util import dockerfileparse
def is_parent(context, dockerfile_path):
    """
    This checks whether the context is a parent of the dockerfile_path.

    Both values are normalized with ``os.path.normpath`` and forced to start and end with the
    path separator, so relative and absolute spellings compare the same way. Only the directory
    part of ``dockerfile_path`` takes part in the comparison.

    :param context: build context directory, e.g. ``/`` or ``subdir``.
    :param dockerfile_path: path of the Dockerfile, file name included.
    :return: True when the directory holding the Dockerfile is the context itself or lies below
        it; False otherwise, including when either argument is empty or None.
    """
    # A missing value (None as well as "") cannot describe a parent/child relationship. Bail out
    # here so os.path is never handed a non-string, which would raise TypeError.
    if not context or not dockerfile_path:
        return False

    def _as_rooted_dir(raw_path):
        # Normalize, then make sure the result both starts and ends with the separator.
        normalized = path.normpath(raw_path)
        if not normalized.startswith(path.sep):
            normalized = path.sep + normalized
        # The trailing separator keeps "/app" from being treated as a parent of "/application".
        if not normalized.endswith(path.sep):
            normalized += path.sep
        return normalized

    dockerfile_dir = _as_rooted_dir(path.dirname(dockerfile_path))
    return dockerfile_dir.startswith(_as_rooted_dir(context))
class TriggerAnalyzer(object):
    """
    This analyzes triggers and returns the appropriate trigger and robot view to the frontend.

    ``analyze_trigger`` loads the Dockerfile through the handler, validates the configured
    context / Dockerfile path and classifies the Dockerfile's base image. Every outcome is
    reported as the dictionary built by ``analyze_view``.
    """

    def __init__(
        self, handler, namespace_name, server_hostname, new_config_dict, admin_org_permission
    ):
        """
        Store the collaborators used by the analysis.

        :param handler: object exposing ``load_dockerfile_contents()``.
        :param namespace_name: namespace the trigger belongs to; robots are listed from it.
        :param server_hostname: registry hostname; base images starting with
            ``<server_hostname>/`` are looked up as repositories of this registry.
        :param new_config_dict: trigger configuration; the ``context`` and ``dockerfile_path``
            keys are read.
        :param admin_org_permission: truthy when robots may be listed; echoed back as
            ``is_admin`` in every view.
        """
        self.handler = handler
        self.namespace_name = namespace_name
        self.server_hostname = server_hostname
        self.new_config_dict = new_config_dict
        self.admin_org_permission = admin_org_permission

    @staticmethod
    def _not_child_message(context, dockerfile_path):
        """
        Build the error text reported when the Dockerfile does not live under the context.
        """
        # The Dockerfile path fills the first placeholder and the context the second.
        return "Dockerfile, %s, is not a child of the context, %s." % (dockerfile_path, context)

    def analyze_trigger(self):
        """
        Inspect the trigger's Dockerfile and return the view describing the result.

        The returned view's ``status`` is one of ``"warning"``, ``"error"``, ``"publicbase"`` or
        ``"requiresrobot"``; warnings and errors carry an explanatory ``message``.

        :return: the dictionary produced by ``analyze_view``.
        """
        # Load the contents of the Dockerfile.
        contents = self.handler.load_dockerfile_contents()
        if not contents:
            return self.analyze_view(
                self.namespace_name,
                None,
                "warning",
                message=(
                    "Specified Dockerfile path for the trigger was not found on the main "
                    "branch. This trigger may fail."
                ),
            )

        # Parse the contents of the Dockerfile.
        parsed = dockerfileparse.parse_dockerfile(contents)
        if not parsed:
            return self.analyze_view(
                self.namespace_name,
                None,
                "error",
                message="Could not parse the Dockerfile specified",
            )

        # Check whether the dockerfile_path is correct. The check only applies when a context
        # has been configured.
        context = self.new_config_dict.get("context")
        dockerfile_path = self.new_config_dict.get("dockerfile_path")
        if context and not is_parent(context, dockerfile_path):
            return self.analyze_view(
                self.namespace_name,
                None,
                "error",
                message=self._not_child_message(context, dockerfile_path),
            )

        # Determine the base image (i.e. the FROM) for the Dockerfile.
        base_image = parsed.get_base_image()
        if not base_image:
            return self.analyze_view(
                self.namespace_name, None, "warning", message="No FROM line found in the Dockerfile"
            )

        # Check to see if the base image lives in Quay.
        quay_registry_prefix = "%s/" % self.server_hostname
        if not base_image.startswith(quay_registry_prefix):
            return self.analyze_view(self.namespace_name, None, "publicbase")

        # Lookup the repository in Quay. After the registry prefix exactly two path segments
        # (namespace and repository) are expected.
        result = str(base_image)[len(quay_registry_prefix) :].split("/", 2)
        if len(result) != 2:
            msg = '"%s" is not a valid Quay repository path' % base_image
            return self.analyze_view(self.namespace_name, None, "warning", message=msg)

        (base_namespace, base_repository) = result
        found_repository = model.repository.get_repository(base_namespace, base_repository)
        if not found_repository:
            return self.analyze_view(
                self.namespace_name,
                None,
                "error",
                message='Repository "%s" referenced by the Dockerfile was not found' % base_image,
            )

        # If the repository is private and the user cannot see that repo, then
        # mark it as not found.
        can_read = False
        if base_namespace == self.namespace_name:
            # NOTE(review): the permission object is used for its truthiness rather than through
            # an explicit method call; confirm that this evaluates the permission check.
            can_read = permissions.ReadRepositoryPermission(base_namespace, base_repository)

        is_public = found_repository.visibility.name == "public"
        if not is_public and not can_read:
            return self.analyze_view(
                self.namespace_name,
                None,
                "error",
                message='Repository "%s" referenced by the Dockerfile was not found' % base_image,
            )

        if is_public:
            return self.analyze_view(base_namespace, base_repository, "publicbase")

        return self.analyze_view(base_namespace, base_repository, "requiresrobot")

    def analyze_view(self, image_namespace, image_repository, status, message=None):
        """
        Build the response dictionary for an analysis outcome.

        Robots are only listed when ``admin_org_permission`` is truthy and ``image_namespace``
        equals the trigger's namespace; otherwise ``robots`` is an empty list.

        :param image_namespace: namespace reported under ``namespace``.
        :param image_repository: repository reported under ``name``; may be None.
        :param status: status string reported under ``status``.
        :param message: optional explanatory text reported under ``message``.
        :return: dict with the keys ``namespace``, ``name``, ``robots``, ``status``, ``message``
            and ``is_admin``.
        """
        # Retrieve the list of robots and mark whether they have read access already.
        robots = []
        if self.admin_org_permission and self.namespace_name == image_namespace:
            if image_repository is not None:
                perm_query = model.user.get_all_repo_users_transitive(
                    image_namespace, image_repository
                )
                user_ids_with_permission = {user.id for user in perm_query}
            else:
                # Without a repository there is nothing a robot could already have access to.
                user_ids_with_permission = set()

            def robot_view(robot):
                assert robot.username.startswith(self.namespace_name + "+"), (
                    "Expected robot under namespace %s, Found: %s"
                    % (self.namespace_name, robot.username)
                )
                return {
                    "name": robot.username,
                    "kind": "user",
                    "is_robot": True,
                    "can_read": robot.id in user_ids_with_permission,
                }

            robots = [
                robot_view(robot) for robot in model.user.list_namespace_robots(self.namespace_name)
            ]

        return {
            "namespace": image_namespace,
            "name": image_repository,
            "robots": robots,
            "status": status,
            "message": message,
            "is_admin": self.admin_org_permission,
        }