/
repository.py
155 lines (136 loc) · 6.27 KB
/
repository.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from __future__ import annotations
import os
import pathlib
import weakref
from typing import Any, Iterable
from urllib.parse import urlparse, urlunparse
import requests
import requests_toolbelt
import rich.progress
from unearth.auth import get_keyring_provider
from pdm import termui
from pdm.cli.commands.publish.package import PackageFile
from pdm.exceptions import PdmUsageError
from pdm.project import Project
from pdm.project.config import DEFAULT_REPOSITORIES
# Module-level keyring provider used by Repository to look up and save
# credentials. It may be None: every use below is guarded by an
# ``is not None`` check (or an assert).
keyring = get_keyring_provider()
class Repository:
    """Upload client bound to a single repository URL.

    Credentials are resolved once, at construction time, and attached to the
    HTTP session that is later used by :meth:`upload`.
    """

    def __init__(
        self,
        project: Project,
        url: str,
        username: str | None,
        password: str | None,
        ca_certs: str | None,
    ) -> None:
        """Build the HTTP session and resolve the credentials for *url*.

        Args:
            project: Project supplying the session factory and the UI object.
            url: Upload endpoint of the repository.
            username: Username, or ``None`` to have it determined automatically.
            password: Password or token, or ``None`` to have it determined
                automatically.
            ca_certs: Optional path to a CA bundle to install on the session.

        Raises:
            PdmUsageError: If no credentials can be determined and the
                terminal is not interactive.
        """
        self.url = url
        # ``self.ui`` must exist before ``_ensure_credentials`` runs: the OIDC
        # path reports progress through ``self.ui.echo``.
        self.ui = project.core.ui
        self.session = project.environment._build_session([])
        # Register the cleanup right away so the session is also closed when
        # credential resolution below raises.
        weakref.finalize(self, self.session.close)
        if ca_certs is not None:
            self.session.set_ca_certificates(pathlib.Path(ca_certs))
        self._credentials_to_save: tuple[str, str, str] | None = None
        username, password = self._ensure_credentials(username, password)
        self.session.auth = (username, password)

    def _ensure_credentials(self, username: str | None, password: str | None) -> tuple[str, str]:
        """Return a ``(username, password)`` pair, filling in whatever is missing.

        Resolution order: explicit username and password; a lone password
        (treated as a token for the ``__token__`` user); a token obtained via
        GitHub Actions OIDC; finally the keyring or an interactive prompt.

        Raises:
            PdmUsageError: If nothing above yields credentials and the
                terminal is not interactive.
        """
        netloc = urlparse(self.url).netloc
        if username and password:
            return username, password
        if password:
            return "__token__", password
        token = self._get_pypi_token_via_oidc()
        if token is not None:
            return "__token__", token
        if not termui.is_interactive():
            raise PdmUsageError("Username and password are required")
        username, password, save = self._prompt_for_credentials(netloc, username)
        if save and keyring is not None and termui.confirm("Save credentials to keyring?"):
            # Persisted only after a successful upload, see ``upload``.
            self._credentials_to_save = (netloc, username, password)
        return username, password

    def _get_pypi_token_via_oidc(self) -> str | None:
        """Try to mint an upload token through GitHub Actions OIDC.

        Returns:
            The minted token, or ``None`` when the GitHub Actions OIDC
            environment variables are absent or the exchange fails.
        """
        request_token = os.getenv("ACTIONS_ID_TOKEN_REQUEST_TOKEN")
        request_url = os.getenv("ACTIONS_ID_TOKEN_REQUEST_URL")
        if not request_token or not request_url:
            return None
        self.ui.echo("Getting PyPI token via GitHub Actions OIDC...")
        try:
            parsed_url = urlparse(self.url)
            audience_url = urlunparse(parsed_url._replace(path="/_/oidc/audience"))
            resp = self.session.get(audience_url)
            resp.raise_for_status()
            # The audience endpoint's JSON body is forwarded as the query
            # parameters of the GitHub token request.
            resp = self.session.get(
                request_url,
                params=resp.json(),
                headers={"Authorization": f"bearer {request_token}"},
            )
            resp.raise_for_status()
            oidc_token = resp.json()["value"]
            mint_token_url = urlunparse(parsed_url._replace(path="/_/oidc/github/mint-token"))
            resp = self.session.post(mint_token_url, json={"token": oidc_token})
            resp.raise_for_status()
            token = resp.json()["token"]
        except (requests.RequestException, KeyError, ValueError):
            # Best effort: a transport error, an undecodable body or a
            # response missing the expected key all fall back to the regular
            # credential flow instead of aborting.
            self.ui.echo("Failed to get PyPI token via GitHub Actions OIDC", err=True)
            return None
        if os.getenv("GITHUB_ACTIONS"):
            # tell GitHub Actions to mask the token in any console logs
            print(f"::add-mask::{token}")
        return token

    def _prompt_for_credentials(self, service: str, username: str | None) -> tuple[str, str, bool]:
        """Fetch credentials from the keyring, or ask the user for them.

        Returns:
            ``(username, password, newly_entered)``; the flag is ``False``
            when the pair came from the keyring and ``True`` when the
            password was typed in by the user.
        """
        if keyring is not None:
            cred = keyring.get_auth_info(service, username)
            if cred is not None:
                return cred[0], cred[1], False
        if username is None:
            username = termui.ask("[primary]Username")
        password = termui.ask("[primary]Password", password=True)
        return username, password, True

    def _save_credentials(self, service: str, username: str, password: str) -> None:
        """Store the credentials for *service* in the keyring."""
        assert keyring is not None
        self.ui.echo("Saving credentials to keyring")
        keyring.save_auth_info(service, username, password)

    @staticmethod
    def _convert_to_list_of_tuples(data: dict[str, Any]) -> list[tuple[str, Any]]:
        """Flatten *data* into ``(key, value)`` pairs.

        List and tuple values produce one pair per item, except for the
        ``gpg_signature`` key whose value is kept as a single pair.
        """
        result: list[tuple[str, Any]] = []
        for key, value in data.items():
            if isinstance(value, (list, tuple)) and key != "gpg_signature":
                result.extend((key, item) for item in value)
            else:
                result.append((key, value))
        return result

    def get_release_urls(self, packages: list[PackageFile]) -> Iterable[str]:
        """Return the release page URLs of *packages*.

        The result is a set; it is empty when the repository URL matches
        neither the default PyPI nor the default TestPyPI repository.
        """
        if self.url.startswith(DEFAULT_REPOSITORIES["pypi"].rstrip("/")):
            base = "https://pypi.org/"
        elif self.url.startswith(DEFAULT_REPOSITORIES["testpypi"].rstrip("/")):
            base = "https://test.pypi.org/"
        else:
            return set()
        return {f"{base}project/{package.metadata['name']}/{package.metadata['version']}/" for package in packages}

    def upload(self, package: PackageFile, progress: rich.progress.Progress) -> requests.Response:
        """Upload *package* to the repository, reporting through *progress*.

        Credentials pending in ``_credentials_to_save`` are written to the
        keyring once a response with a status code below 400 is received.

        Returns:
            The response of the upload request; redirects are not followed.
        """
        # Build a fresh mapping so the package's own metadata is not mutated.
        payload = {
            **package.metadata_dict,
            ":action": "file_upload",
            "protocol_version": "1",
        }
        field_parts = self._convert_to_list_of_tuples(payload)

        progress.live.console.print(f"Uploading [success]{package.base_filename}")

        with open(package.filename, "rb") as fp:
            field_parts.append(("content", (package.base_filename, fp, "application/octet-stream")))

            def on_upload(monitor: requests_toolbelt.MultipartEncoderMonitor) -> None:
                # ``job`` is bound below, before the request starts reading.
                progress.update(job, completed=monitor.bytes_read)

            monitor = requests_toolbelt.MultipartEncoderMonitor.from_fields(field_parts, callback=on_upload)
            job = progress.add_task("", total=monitor.len)
            # The file must stay open while the request body is streamed.
            resp = self.session.post(
                self.url,
                data=monitor,
                headers={"Content-Type": monitor.content_type},
                allow_redirects=False,
            )

        if resp.status_code < 400 and self._credentials_to_save is not None:
            self._save_credentials(*self._credentials_to_save)
            self._credentials_to_save = None
        return resp