-
Notifications
You must be signed in to change notification settings - Fork 240
/
__init__.py
353 lines (283 loc) · 11.6 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
"""History
"""
import csv
import logging
import re
from abc import ABC, abstractmethod
from pathlib import Path
from typing import List, Optional, Set, Union
import semver
import tomlkit
from dotty_dict import Dotty
from ..errors import ImproperConfigurationError
from ..helpers import LoggedFunction
from ..settings import config
from ..vcs_helpers import get_commit_log, get_formatted_tag, get_last_version
from .logs import evaluate_version_bump # noqa
from .parser_angular import parse_commit_message as angular_parser # noqa isort:skip
from .parser_emoji import parse_commit_message as emoji_parser # noqa isort:skip
from .parser_scipy import parse_commit_message as scipy_parser # noqa isort:skip
from .parser_tag import parse_commit_message as tag_parser # noqa isort:skip
# Module-level logger, named after this module, used by every helper below.
logger = logging.getLogger(__name__)
class VersionDeclaration(ABC):
    """
    Abstract description of one place where the version number is stored.

    Concrete subclasses implement `parse()` and `replace()`. The static
    `from_*` factories build the matching subclass from a ``"path:spec"``
    configuration string, split on the first ``:``.
    """

    def __init__(self, path: Union[str, Path]):
        # Always keep the location as a `Path`, whatever the caller passed.
        self.path = Path(path)

    @staticmethod
    def from_toml(config_str: str):
        """
        Build a `TomlVersionDeclaration` from a ``"path:key"`` string.

        :param config_str: File path and key, separated by the first ``:``.
        :raises ValueError: If `config_str` contains no ``:``.
        """
        file_path, toml_key = config_str.split(":", 1)
        return TomlVersionDeclaration(file_path, toml_key)

    @staticmethod
    def from_variable(config_str: str):
        """
        Build a `PatternVersionDeclaration` from a ``"path:variable"`` string.

        The generated pattern matches the variable name, a ``:`` or ``=``
        separator, and a quoted version number.

        :param config_str: File path and variable name, separated by the
            first ``:``.
        :raises ValueError: If `config_str` contains no ``:``.
        """
        file_path, variable = config_str.split(":", 1)
        version_group = PatternVersionDeclaration.version_regex
        assignment_regex = rf'{variable} *[:=] *["\']{version_group}["\']'
        return PatternVersionDeclaration(file_path, assignment_regex)

    @staticmethod
    def from_pattern(config_str: str):
        """
        Build a `PatternVersionDeclaration` from a ``"path:regex"`` string.

        The regex part is passed through `str.format`, so a ``{version}``
        placeholder is replaced by the version-number group.

        :param config_str: File path and regular expression, separated by the
            first ``:``.
        :raises ValueError: If `config_str` contains no ``:``.
        """
        file_path, template = config_str.split(":", 1)
        expanded = template.format(version=PatternVersionDeclaration.version_regex)
        return PatternVersionDeclaration(file_path, expanded)

    @abstractmethod
    def parse(self) -> Set[str]:
        """
        Return every version found in this source.

        A source may match in several places, hence the set. Normally it
        holds a single element (the same version everywhere); checking that
        is the caller's responsibility.
        """

    @abstractmethod
    def replace(self, new_version: str):
        """
        Write `new_version` into this source.

        Implementations read the underlying file, replace each occurrence of
        the version, then write the updated file.

        :param new_version: The new version number as a string
        """
class TomlVersionDeclaration(VersionDeclaration):
    """
    A version number stored under a key of a TOML file.

    The parsed document is wrapped in a `Dotty`, and `key` is looked up
    through that wrapper.
    """

    def __init__(self, path, key):
        super().__init__(path)
        self.key = key

    def _read(self) -> Dotty:
        """Parse the file with tomlkit and wrap the result in a `Dotty`."""
        raw_text = self.path.read_text()
        return Dotty(tomlkit.loads(raw_text))

    def parse(self) -> Set[str]:
        """
        Return the configured version.

        :return: A one-element set holding the value stored under `key`, or
            an empty set when the key is absent.
        """
        document = self._read()
        if self.key not in document:
            return set()
        return {document.get(self.key)}

    def replace(self, new_version: str) -> None:
        """
        Store `new_version` under `key` and write the file back.

        The file is left untouched when the key is not present.

        :param new_version: The new version number as a string
        """
        document = self._read()
        if self.key not in document:
            return
        document[self.key] = new_version
        self.path.write_text(tomlkit.dumps(document))
class PatternVersionDeclaration(VersionDeclaration):
    """
    A version number located in a file by a regular expression.

    The class can both read the version number from the file and rewrite the
    file with a new one. Instances for the configured locations are created
    by `load_version_declarations()`.
    """

    # Capturing group for "X.Y" or "X.Y.Z".
    version_regex = r"(\d+\.\d+(?:\.\d+)?)"

    def __init__(self, path: str, pattern: str):
        """
        :param path: File containing the version number.
        :param pattern: Regular expression whose first group captures the
            version to read or replace.
        """
        super().__init__(path)
        self.pattern = pattern

    def parse(self) -> Set[str]:
        """
        Return every version captured by the pattern in the file.

        The pattern may match in several places, hence the set. Normally it
        holds a single element (the same version everywhere); checking that
        is the caller's responsibility.
        """
        text = self.path.read_text()
        found = set()
        for match in re.finditer(self.pattern, text, re.MULTILINE):
            found.add(match.group(1))

        logger.debug(
            f"Parsing current version: path={self.path!r} pattern={self.pattern!r} num_matches={len(found)}"
        )
        return found

    def replace(self, new_version: str):
        """
        Rewrite the file with `new_version` in place of each captured version.

        Only the text of group 1 is replaced; the rest of each match is
        written back unchanged.

        :param new_version: The new version number as a string
        """
        original_text = self.path.read_text()

        def rebuild_match(match):
            # Keep the text around group 1 and swap only the group itself.
            whole = match.string
            prefix = whole[match.start():match.start(1)]
            suffix = whole[match.end(1):match.end()]
            return prefix + new_version + suffix

        updated_text, count = re.subn(
            self.pattern, rebuild_match, original_text, flags=re.MULTILINE
        )

        logger.debug(
            f"Writing new version number: path={self.path!r} pattern={self.pattern!r} num_matches={count!r}"
        )
        self.path.write_text(updated_text)
def get_prerelease_pattern() -> str:
    """
    Build the marker that identifies a prerelease in a version string.

    :return: The configured ``prerelease_tag`` wrapped as ``-<tag>.``
    """
    tag = config.get("prerelease_tag")
    return "-" + tag + "."
@LoggedFunction(logger)
def get_current_version_by_tag(omit_pattern=None) -> str:
    """
    Look up the current version through `get_last_version`.

    :param omit_pattern: Forwarded unchanged to `get_last_version`.
    :return: The version reported by `get_last_version`, or ``"0.0.0"`` when
        it reports nothing.
    """
    latest = get_last_version(omit_pattern=omit_pattern)
    if not latest:
        logger.debug("no version found, returning default of v0.0.0")
        return "0.0.0"
    return latest
@LoggedFunction(logger)
def get_current_version_by_config_file(omit_pattern=None) -> str:
    """
    Read the current version from the configured version declarations.

    :param omit_pattern: Accepted for signature parity with
        `get_current_version_by_tag`; not used by this function.
    :return: A string with the current version number
    :raises ImproperConfigurationError: if either no versions are found, or
        multiple versions are found.
    """
    # Gather what every declaration reports, then merge into one set.
    parsed_sets = [declaration.parse() for declaration in load_version_declarations()]
    versions = set.union(*parsed_sets)

    if not versions:
        raise ImproperConfigurationError(
            "no versions found in the configured locations"
        )

    if len(versions) > 1:
        version_strs = ", ".join(map(repr, versions))
        raise ImproperConfigurationError(f"found conflicting versions: {version_strs}")

    (version,) = versions
    logger.debug(f"Regex matched version: {version}")
    return version
def get_current_version(prerelease_version: bool = False) -> str:
    """
    Get current version from tag or version variable, depending on configuration.

    Unless `prerelease_version` is true, versions carrying the prerelease
    marker are skipped.

    :param prerelease_version: When true, no prerelease marker is filtered out.
    :return: A string with the current version number
    """
    if prerelease_version:
        omit_pattern = None
    else:
        omit_pattern = get_prerelease_pattern()

    if config.get("version_source") == "tag":
        return get_current_version_by_tag(omit_pattern)

    configured = get_current_version_by_config_file(omit_pattern)
    if not omit_pattern or omit_pattern not in configured:
        return configured

    # The configured version is a prerelease: fall back to the one before it.
    return get_previous_version(configured)
@LoggedFunction(logger)
def get_new_version(
    current_version: str, level_bump: str, prerelease: bool = False
) -> str:
    """
    Compute the next version from the current one with semver.

    :param current_version: The version the package has now.
    :param level_bump: The part of the version number to bump: `'major'`,
        `'minor'` or `'patch'`. A falsy value leaves the version unchanged.
    :param prerelease: Whether a prerelease suffix should be appended.
    :return: A string with the next version number.
    """
    if level_bump:
        parsed = semver.VersionInfo.parse(current_version)
        new_version = str(parsed.next_version(part=level_bump))
    else:
        logger.debug("No bump requested, using input version")
        new_version = current_version

    if not prerelease:
        return new_version

    logger.debug("Prerelease requested")
    latest_including_prereleases = get_current_version(prerelease_version=True)

    if get_prerelease_pattern() in latest_including_prereleases:
        logger.debug("Previouse prerelease detected, increment prerelease version")
        # The prerelease counter is the last dot-separated field.
        last_field = latest_including_prereleases.rsplit(".", 1)[-1]
        prerelease_num = int(last_field) + 1
    else:
        logger.debug("No previouse prerelease detected, starting from 0")
        prerelease_num = 0

    return new_version + get_prerelease_pattern() + str(prerelease_num)
@LoggedFunction(logger)
def get_previous_version(
    version: str, omit_pattern: Optional[str] = None
) -> Optional[str]:
    """
    Return the version prior to the given version.

    The commit log is scanned for a commit whose message contains `version`.
    After that commit, the first message that starts with a version number
    (optionally prefixed with ``v``) supplies the result. If no such message
    is found, the answer comes from `get_last_version`.

    :param version: A string with the version number.
    :param omit_pattern: If given, commit messages containing this substring
        are skipped; it is also forwarded to `get_last_version`.
    :return: A string with the previous version number, or whatever
        `get_last_version` returns when the commit log yields nothing.
    """
    found_version = False
    for commit_hash, commit_message in get_commit_log():
        logger.debug(f"Checking commit {commit_hash}")
        if version in commit_message:
            found_version = True
            logger.debug(f'Found version in commit "{commit_message}"')
            continue

        if not found_version:
            continue

        if omit_pattern and omit_pattern in commit_message:
            continue

        # The dots must be escaped: an unescaped "." matches any character,
        # so a message starting with e.g. "12345" or "1-2-3" would be taken
        # for a version number.
        matches = re.match(r"v?(\d+\.\d+\.\d+)", commit_message)
        if matches:
            logger.debug(f"Version matches regex {commit_message}")
            return matches.group(1)

    # NOTE(review): the list presumably names versions/tags to skip — confirm
    # against `get_last_version`.
    return get_last_version(
        [version, get_formatted_tag(version)], omit_pattern=omit_pattern
    )
@LoggedFunction(logger)
def set_new_version(new_version: str) -> bool:
    """
    Write the new version number to every configured location.

    :param new_version: The new version number as a string.
    :return: `True` once every declaration has been updated.
    """
    targets = load_version_declarations()
    for target in targets:
        target.replace(new_version)
    return True
def load_version_declarations() -> List[VersionDeclaration]:
    """
    Build the `VersionDeclaration` objects named in the configuration.

    The options ``version_variable``, ``version_pattern`` and
    ``version_toml`` are read in that order. Each may be a list or a
    comma-separated string.

    :return: The declarations, in configuration order.
    :raises ImproperConfigurationError: if none of the three options yields a
        declaration.
    """

    def split_option(raw):
        # Unset or empty option: contributes nothing.
        if not raw:
            return
        if isinstance(raw, list):
            yield from raw
            return
        # Split by commas, but go through the csv reader so that the user
        # can escape commas if necessary.
        yield from next(csv.reader([raw]))

    factories = (
        ("version_variable", VersionDeclaration.from_variable),
        ("version_pattern", VersionDeclaration.from_pattern),
        ("version_toml", VersionDeclaration.from_toml),
    )

    declarations = []
    for option_name, build in factories:
        for entry in split_option(config.get(option_name)):
            declarations.append(build(entry))

    if not declarations:
        raise ImproperConfigurationError(
            "must specify either 'version_variable', 'version_pattern' or 'version_toml'"
        )

    return declarations