/
responses.py
177 lines (147 loc) · 6.48 KB
/
responses.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from __future__ import unicode_literals
import json
from base64 import b64encode
from datetime import datetime
import time
from moto.core.responses import BaseResponse
from .models import ecr_backends, DEFAULT_REGISTRY_ID
class ECRResponse(BaseResponse):
    """Response handlers for the ECR actions exposed by this module.

    Each handler reads its arguments from the JSON request body, delegates to
    the ECR backend selected by ``self.region`` and returns a JSON-encoded
    string. Actions without an implementation raise ``NotImplementedError``
    (see ``_not_implemented``).
    """

    @property
    def ecr_backend(self):
        """Return the entry of ``ecr_backends`` keyed by ``self.region``."""
        return ecr_backends[self.region]

    @property
    def request_params(self):
        """Return the request body decoded as a JSON object.

        An empty dict is returned when ``self.body`` cannot be decoded
        (wrong type or malformed JSON) or when it decodes to something other
        than a JSON object, so parameter lookups never fail on ``.get``.
        """
        try:
            params = json.loads(self.body)
        except (TypeError, ValueError):
            # TypeError: body is not str/bytes (e.g. None).
            # ValueError: body is not valid JSON.
            return {}
        # A top-level list/string/number has no named parameters to look up.
        if not isinstance(params, dict):
            return {}
        return params

    def _get_param(self, param, if_none=None):
        """Return the value of ``param`` from the JSON request body.

        :param param: key to look up in ``request_params``.
        :param if_none: value returned when the key is absent or null.
            Defaults to ``None``, which keeps the one-argument behaviour.
        """
        value = self.request_params.get(param)
        if value is None:
            return if_none
        return value

    def _not_implemented(self, action, method_name):
        """Raise ``NotImplementedError`` for an unsupported action.

        The error is raised only when ``self.is_not_dryrun(action)`` is
        truthy; otherwise the method returns ``None``.
        """
        if self.is_not_dryrun(action):
            raise NotImplementedError(
                "ECR.{} is not yet implemented".format(method_name)
            )

    def create_repository(self):
        """Create a repository; the name falls back to ``"default"``."""
        repository_name = self._get_param("repositoryName")
        if repository_name is None:
            repository_name = "default"
        repository = self.ecr_backend.create_repository(repository_name)
        return json.dumps({"repository": repository.response_object})

    def describe_repositories(self):
        """Describe repositories, optionally filtered by name and registry."""
        repository_names = self._get_param("repositoryNames")
        registry_id = self._get_param("registryId")
        repositories = self.ecr_backend.describe_repositories(
            repository_names=repository_names, registry_id=registry_id
        )
        return json.dumps({"repositories": repositories, "failures": []})

    def delete_repository(self):
        """Delete a repository and return its description."""
        repository_name = self._get_param("repositoryName")
        registry_id = self._get_param("registryId")
        repository = self.ecr_backend.delete_repository(repository_name, registry_id)
        return json.dumps({"repository": repository.response_object})

    def put_image(self):
        """Store an image manifest under the given repository and tag."""
        repository_name = self._get_param("repositoryName")
        image_manifest = self._get_param("imageManifest")
        image_tag = self._get_param("imageTag")
        image = self.ecr_backend.put_image(repository_name, image_manifest, image_tag)
        return json.dumps({"image": image.response_object})

    def list_images(self):
        """List the image identifiers of a repository."""
        repository_name = self._get_param("repositoryName")
        registry_id = self._get_param("registryId")
        images = self.ecr_backend.list_images(repository_name, registry_id)
        image_ids = [image.response_list_object for image in images]
        return json.dumps({"imageIds": image_ids})

    def describe_images(self):
        """Describe images of a repository, optionally limited to ``imageIds``."""
        repository_name = self._get_param("repositoryName")
        registry_id = self._get_param("registryId")
        image_ids = self._get_param("imageIds")
        images = self.ecr_backend.describe_images(
            repository_name, registry_id, image_ids
        )
        details = [image.response_describe_object for image in images]
        return json.dumps({"imageDetails": details})

    def batch_check_layer_availability(self):
        """Not implemented."""
        self._not_implemented(
            "BatchCheckLayerAvailability", "batch_check_layer_availability"
        )

    def batch_delete_image(self):
        """Delete the requested images and return the backend's reply as JSON."""
        repository_name = self._get_param("repositoryName")
        registry_id = self._get_param("registryId")
        image_ids = self._get_param("imageIds")
        response = self.ecr_backend.batch_delete_image(
            repository_name, registry_id, image_ids
        )
        return json.dumps(response)

    def batch_get_image(self):
        """Fetch the requested images and return the backend's reply as JSON."""
        repository_name = self._get_param("repositoryName")
        registry_id = self._get_param("registryId")
        image_ids = self._get_param("imageIds")
        accepted_media_types = self._get_param("acceptedMediaTypes")
        response = self.ecr_backend.batch_get_image(
            repository_name, registry_id, image_ids, accepted_media_types
        )
        return json.dumps(response)

    def can_paginate(self):
        """Not implemented."""
        self._not_implemented("CanPaginate", "can_paginate")

    def complete_layer_upload(self):
        """Not implemented."""
        self._not_implemented("CompleteLayerUpload", "complete_layer_upload")

    def delete_repository_policy(self):
        """Not implemented."""
        self._not_implemented("DeleteRepositoryPolicy", "delete_repository_policy")

    def generate_presigned_url(self):
        """Not implemented."""
        self._not_implemented("GeneratePresignedUrl", "generate_presigned_url")

    def get_authorization_token(self):
        """Return one synthetic authorization entry per requested registry.

        ``registryIds`` defaults to ``[DEFAULT_REGISTRY_ID]`` when absent or
        empty. For each registry the token is the base64 encoding of
        ``AWS:<registry_id>-auth-token``.
        """
        registry_ids = self._get_param("registryIds")
        if not registry_ids:
            registry_ids = [DEFAULT_REGISTRY_ID]

        # Fixed expiry: 2015-01-01 00:00 interpreted by time.mktime as local
        # time, so the epoch value depends on the host's timezone.
        expires_at = time.mktime(datetime(2015, 1, 1).timetuple())

        auth_data = []
        for registry_id in registry_ids:
            password = "{}-auth-token".format(registry_id)
            auth_token = b64encode("AWS:{}".format(password).encode("ascii")).decode()
            auth_data.append(
                {
                    "authorizationToken": auth_token,
                    "expiresAt": expires_at,
                    "proxyEndpoint": "https://{}.dkr.ecr.{}.amazonaws.com".format(
                        registry_id, self.region
                    ),
                }
            )
        return json.dumps({"authorizationData": auth_data})

    def get_download_url_for_layer(self):
        """Not implemented."""
        self._not_implemented("GetDownloadUrlForLayer", "get_download_url_for_layer")

    def get_paginator(self):
        """Not implemented."""
        self._not_implemented("GetPaginator", "get_paginator")

    def get_repository_policy(self):
        """Not implemented."""
        self._not_implemented("GetRepositoryPolicy", "get_repository_policy")

    def get_waiter(self):
        """Not implemented."""
        self._not_implemented("GetWaiter", "get_waiter")

    def initiate_layer_upload(self):
        """Not implemented."""
        self._not_implemented("InitiateLayerUpload", "initiate_layer_upload")

    def set_repository_policy(self):
        """Not implemented."""
        self._not_implemented("SetRepositoryPolicy", "set_repository_policy")

    def upload_layer_part(self):
        """Not implemented."""
        self._not_implemented("UploadLayerPart", "upload_layer_part")