This repository has been archived by the owner on Aug 27, 2023. It is now read-only.
/
simple.py
327 lines (282 loc) · 11 KB
/
simple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
""" Views for simple pip interaction """
import logging
import posixpath
import pkg_resources
from pyramid.httpexceptions import HTTPBadRequest, HTTPConflict, HTTPFound, HTTPNotFound
from pyramid.view import view_config
from pyramid_duh import addslash, argify
from pyramid_rpc.xmlrpc import xmlrpc_method
from pypicloud.route import Root, SimplePackageResource, SimpleResource
from pypicloud.util import get_packagetype, normalize_name, parse_filename
LOG = logging.getLogger(__name__)
@view_config(context=Root, request_method="POST", subpath=(), renderer="json")
@view_config(context=SimpleResource, request_method="POST", subpath=(), renderer="json")
@argify
def upload(
    request, content, name=None, version=None, summary=None, requires_python=None
):
    """
    Handle a POSTed upload command.

    ``content`` is the uploaded file field; its ``filename`` and ``file``
    attributes are handed to the package database. When either ``name`` or
    ``version`` is missing, both are derived from the uploaded filename;
    otherwise the supplied name is normalized.

    Returns the result of ``request.db.upload`` on success, the result of
    ``request.forbid()`` when the caller lacks write permission, an
    ``HTTPConflict`` when the database raises ``ValueError``, or an
    ``HTTPBadRequest`` for any action other than ``file_upload``.
    """
    action = request.param(":action", "file_upload")

    # Uploads coming straight from the web UI carry no explicit name/version,
    # so fall back to whatever the filename encodes.
    if name is not None and version is not None:
        name = normalize_name(name)
    else:
        name, version = parse_filename(content.filename)

    if action != "file_upload":
        return HTTPBadRequest("Unknown action '%s'" % action)

    if not request.access.has_permission(name, "write"):
        return request.forbid()

    try:
        return request.db.upload(
            content.filename,
            content.file,
            name=name,
            version=version,
            summary=summary,
            # Collapse empty values (e.g. "") to None.
            requires_python=requires_python or None,
        )
    except ValueError as err:
        return HTTPConflict(*err.args)
@xmlrpc_method(endpoint="pypi")
@xmlrpc_method(endpoint="pypi_slash")
def search(request, criteria, query_type):
    """
    Answer search requests coming from pip.

    Serves XML RPC calls to the "pypi" endpoint (configured as /pypi/) whose
    method is "search". Returns the search summary of every match the caller
    has "read" permission on.
    """
    can_access = request.access.has_permission
    return [
        match.search_summary()
        for match in request.db.search(criteria, query_type)
        if can_access(match.name, "read")
    ]
@view_config(
    context=SimpleResource, request_method="GET", subpath=(), renderer="simple.jinja2"
)
@addslash
def simple(request):
    """
    Render the list of all unique package names.

    Only names the requester has "read" permission on are included.

    Returns a dict with a single ``"pkgs"`` key holding the list of visible
    package names, for consumption by the template.
    """
    can_access = request.access.has_permission
    # Build a fresh list rather than deleting from the one the database
    # returned: this keeps the pass linear (repeated ``del names[i]`` is
    # quadratic) and leaves the database's own list object untouched.
    visible = [name for name in request.db.distinct() if can_access(name, "read")]
    return {"pkgs": visible}
def _package_versions(context, request):
    """
    Dispatch a package-listing request to the handler for the configured
    fallback mode.

    ``request.registry.fallback`` selects between the redirect, cache and
    plain-serve handlers; for the first two, ``always_show_upstream`` picks
    the variant that also lists upstream files.
    """
    registry = request.registry
    mode = registry.fallback
    if mode == "redirect":
        handler = (
            _simple_redirect_always_show
            if registry.always_show_upstream
            else _simple_redirect
        )
    elif mode == "cache":
        handler = (
            _simple_cache_always_show
            if registry.always_show_upstream
            else _simple_cache
        )
    else:
        handler = _simple_serve
    return handler(context, request)
@view_config(
    context=SimplePackageResource,
    request_method="GET",
    subpath=(),
    renderer="package.jinja2",
)
@addslash
def package_versions(context, request):
    """
    Render the download links for every version of a package.

    Thin view wrapper: the real work is delegated to ``_package_versions``,
    whose result is returned as-is.
    """
    listing = _package_versions(context, request)
    return listing
@view_config(
    context=SimplePackageResource,
    name="json",
    request_method="GET",
    subpath=(),
    renderer="json",
)
def package_versions_json(context, request):
    """
    Render the package versions in JSON format.

    Builds a dict with ``info``, ``releases`` (files grouped by the version
    string parsed from each filename) and, when at least one file exists,
    ``info["version"]`` and ``urls`` describing the newest version.

    If ``_package_versions`` returns anything other than a dict, that value
    is returned unchanged.
    """
    pkgs = _package_versions(context, request)
    if not isinstance(pkgs, dict):
        return pkgs

    releases = {}
    response = {
        "info": {
            "name": context.name,
            "license": "",
            "classifiers": [],
        },
        "releases": releases,
    }

    max_version = None
    # Raw version string of the newest release, exactly as used to key
    # ``releases``. Kept separately because str(max_version) is the
    # *normalized* form and may not match the key taken from the filename.
    max_version_key = None
    for filename, pkg in pkgs["pkgs"].items():
        _, version_str = parse_filename(filename)
        version = pkg_resources.parse_version(version_str)
        if max_version is None or version > max_version:
            max_version = version
            max_version_key = version_str

        release = {
            "filename": filename,
            "packagetype": get_packagetype(filename),
            # Prefer the url without the hash fragment when one is provided.
            "url": pkg.get("non_hashed_url", pkg["url"]),
            "requires_python": pkg["requires_python"],
        }
        if pkg.get("hash_sha256"):
            release["digests"] = {"md5": pkg["hash_md5"], "sha256": pkg["hash_sha256"]}
            release["md5_digest"] = pkg["hash_md5"]

        releases.setdefault(version_str, []).append(release)

    if max_version is not None:
        response["info"]["version"] = str(max_version)
        # Look up by the raw key so the newest release's files are found even
        # when the filename carries a non-normalized version string.
        response["urls"] = releases.get(max_version_key, [])
    return response
def get_fallback_packages(request, package_name, redirect=True):
    """
    Get all package versions for a package from the upstream locator.

    :param request: The current request; its ``access``, ``locator`` and
        (when ``redirect`` is false) ``app_url`` are used.
    :param package_name: Name of the package to look up.
    :param redirect: When true, each entry keeps the release's own url. When
        false, the url is rebuilt with ``request.app_url`` so that it points
        at this application's "api/package" route instead.
    :returns: Dict mapping filename to a dict with ``url``,
        ``requires_python``, ``hash_sha256`` and ``hash_md5``. Empty when the
        requester lacks the "fallback" permission for the package.
    """
    # Check permission first so that callers who would get nothing back do
    # not trigger an upstream lookup.
    if not request.access.has_permission(package_name, "fallback"):
        return {}

    pkgs = {}
    for release in request.locator.get_releases(package_name):
        url = release["url"]
        filename = posixpath.basename(url)
        if not redirect:
            url = request.app_url("api", "package", release["name"], filename)
        digests = release["digests"]
        pkgs[filename] = {
            "url": url,
            "requires_python": release["requires_python"],
            "hash_sha256": digests.get("sha256"),
            "hash_md5": digests.get("md5"),
        }
    return pkgs
def packages_to_dict(request, packages):
    """
    Build the filename -> metadata mapping consumed by the templates.

    Each entry carries the package url, the same url with any ``#sha256=``
    fragment removed (handy for the JSON api), and the ``requires_python``,
    ``hash_sha256`` and ``hash_md5`` values from the package's data.
    """
    result = {}
    for pkg in packages:
        full_url = pkg.get_url(request)
        # Everything before the first "#sha256=" marker; the whole url when
        # the marker is absent.
        bare_url = full_url.partition("#sha256=")[0]
        metadata = pkg.data
        result[pkg.filename] = {
            "url": full_url,
            "non_hashed_url": bare_url,
            "requires_python": metadata.get("requires_python"),
            "hash_sha256": metadata.get("hash_sha256"),
            "hash_md5": metadata.get("hash_md5"),
        }
    return result
def _pkg_response(pkgs):
"""Take a package mapping and return either a dict for jinja or a 404"""
if pkgs:
return {"pkgs": pkgs}
else:
return HTTPNotFound("No packages found")
def _redirect(context, request):
    """
    Build a 302 response pointing at the fallback index for this package.

    When ``fallback_base_url`` is configured, the request path is appended to
    it; otherwise the package name is appended to ``fallback_url`` with a
    trailing slash.
    """
    registry = request.registry
    base_url = registry.fallback_base_url
    if base_url:
        target = "%s/%s" % (base_url.rstrip("/"), request.path.lstrip("/"))
    else:
        target = "%s/%s/" % (registry.fallback_url.rstrip("/"), context.name)
    return HTTPFound(location=target)
def _simple_redirect(context, request):
    """
    Service /simple with fallback=redirect.

    Packages stored locally are listed for readers; everything else is
    redirected upstream, except that anonymous users without read access to
    a locally stored package are asked to log in.
    """
    pkg_name = normalize_name(context.name)
    stored = request.db.all(pkg_name)
    if not stored:
        return _redirect(context, request)
    if request.access.has_permission(pkg_name, "read"):
        return _pkg_response(packages_to_dict(request, stored))
    if request.is_authenticated:
        return _redirect(context, request)
    return request.request_login()
def _simple_redirect_always_show(context, request):
    """
    Service /simple with fallback=redirect and always_show_upstream enabled.

    Behaves like the plain redirect handler, except that for readable,
    locally stored packages the listing also contains the upstream files;
    locally stored files win when filenames collide.
    """
    pkg_name = normalize_name(context.name)
    stored = request.db.all(pkg_name)
    if not stored:
        return _redirect(context, request)

    if not request.access.has_permission(pkg_name, "read"):
        if request.is_authenticated:
            return _redirect(context, request)
        return request.request_login()

    listing = get_fallback_packages(request, context.name)
    # Locally stored entries replace upstream ones with the same filename.
    listing.update(packages_to_dict(request, stored))
    return _pkg_response(listing)
def _simple_cache(context, request):
    """
    Service /simple with fallback=cache.

    Readers get the locally stored files when there are any; otherwise, if
    they may update the cache, they get the upstream files with urls pointing
    back at this application. Every other case ends in a 404 for
    authenticated users and a login request for anonymous ones.
    """
    pkg_name = normalize_name(context.name)
    if request.access.has_permission(pkg_name, "read"):
        stored = request.db.all(pkg_name)
        if stored:
            return _pkg_response(packages_to_dict(request, stored))
        if request.access.can_update_cache():
            upstream = get_fallback_packages(request, context.name, False)
            return _pkg_response(upstream)

    # Reached when read access is missing, or when nothing is stored and the
    # requester may not populate the cache.
    if request.is_authenticated:
        return HTTPNotFound("No packages found named %r" % pkg_name)
    return request.request_login()
def _simple_cache_always_show(context, request):
    """
    Service /simple with fallback=cache and always_show_upstream enabled.

    Readers see upstream files merged with the locally stored ones (local
    entries win on filename collisions). Requesters who may update the cache
    get upstream urls routed through this application; authenticated ones who
    may not get direct upstream urls, or a redirect when nothing is stored.
    Anonymous users lacking a required permission are asked to log in.
    """
    pkg_name = normalize_name(context.name)
    if not request.access.has_permission(pkg_name, "read"):
        if request.is_authenticated:
            return _redirect(context, request)
        return request.request_login()

    stored = request.db.all(pkg_name)
    may_cache = request.access.can_update_cache()

    # Without cache rights an anonymous requester is always sent to log in,
    # whether or not anything is stored locally.
    if not may_cache and not request.is_authenticated:
        return request.request_login()

    if not stored:
        if not may_cache:
            return _redirect(context, request)
        return _pkg_response(get_fallback_packages(request, context.name, False))

    if may_cache:
        # Route upstream downloads through this application.
        listing = get_fallback_packages(request, context.name, False)
    else:
        listing = get_fallback_packages(request, context.name)
    # Locally stored entries replace upstream ones with the same filename.
    listing.update(packages_to_dict(request, stored))
    return _pkg_response(listing)
def _simple_serve(context, request):
    """
    Service /simple with fallback=none.

    Readers get the locally stored files (or a 404 when there are none).
    Without read access, authenticated users get a 404 and anonymous users
    are asked to log in.
    """
    pkg_name = normalize_name(context.name)
    if request.access.has_permission(pkg_name, "read"):
        stored = request.db.all(pkg_name)
        return _pkg_response(packages_to_dict(request, stored))
    if request.is_authenticated:
        return HTTPNotFound("No packages found named %r" % pkg_name)
    return request.request_login()