-
-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
bzr.py
100 lines (79 loc) · 2.72 KB
/
bzr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""Bazaar-related utilities."""
import csv
import re
from io import StringIO
from readthedocs.projects.exceptions import RepositoryError
from readthedocs.vcs_support.base import BaseVCS, VCSVersion
class Backend(BaseVCS):
"""Bazaar VCS backend."""
supports_tags = True
fallback_branch = ''
def update(self, identifier=None):
super().update(identifier=identifier)
if self.repo_exists():
return self.up()
return self.clone()
def repo_exists(self):
try:
code, _, _ = self.run('bzr', 'status', record=False)
return code == 0
except RepositoryError:
return False
def up(self):
self.run('bzr', 'revert')
return self.run('bzr', 'up')
def clone(self):
self.make_clean_working_dir()
try:
self.run("bzr", "checkout", self.repo_url, ".")
except RepositoryError:
raise RepositoryError(RepositoryError.CLONE_ERROR())
@property
def tags(self):
try:
code, stdout, stderr = self.run('bzr', 'tags', record_as_success=True)
return self.parse_tags(stdout)
except RepositoryError:
# error (or no tags found)
return []
def parse_tags(self, data):
"""
Parses output of bzr tags.
Example:
0.1.0 171
0.1.1 173
0.1.2 174
0.2.0-pre-alpha 177
Can't forget about poorly formatted tags or tags that lack revisions,
such as:
3.3.0-rc1 ?
tag with spaces 123
"""
# parse the lines into a list of tuples (commit-hash, tag ref name)
# StringIO below is expecting Unicode data, so ensure that it gets it.
if not isinstance(data, str):
data = str(data)
squashed_data = re.sub(r' +', ' ', data)
raw_tags = csv.reader(StringIO(squashed_data), delimiter=' ')
vcs_tags = []
for row in raw_tags:
name = ' '.join(row[:-1])
commit = row[-1]
if commit != '?':
vcs_tags.append(VCSVersion(self, commit, name))
return vcs_tags
@property
def commit(self):
_, stdout, _ = self.run('bzr', 'revno')
return stdout.strip()
def checkout(self, identifier=None):
super().checkout()
if not identifier:
return self.up()
try:
code, stdout, stderr = self.run('bzr', 'switch', identifier)
return code, stdout, stderr
except RepositoryError:
raise RepositoryError(
RepositoryError.FAILED_TO_CHECKOUT.format(identifier),
)