-
Notifications
You must be signed in to change notification settings - Fork 156
/
utils.py
220 lines (168 loc) Β· 6.22 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
import functools
import os
import sys
import uuid
from inspect import getfullargspec
from typing import Callable
from . import exceptions
# This is only a subset of the chars allowed per the spec. In particular `@` is
# not included, because there are some servers that (incorrectly) encode it to
# `%40` when it's part of a URL path, and reject or "repair" URLs that contain
# `@` in the path. So it's better to just avoid it.
SAFE_UID_CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_.-+"
# Unique sentinel object. NOTE(review): no use of it is visible in this part
# of the file; presumably other modules import it to tell "no value given"
# apart from ``None`` -- confirm before removing.
_missing = object()
def expand_path(p: str) -> str:
    """Expand a leading ``~`` (user home) in ``p`` and normalise the result.

    :param p: The path to process.
    :returns: The path after ``os.path.expanduser`` followed by
        ``os.path.normpath`` (redundant separators and ``.``/``..`` segments
        collapsed).
    """
    return os.path.normpath(os.path.expanduser(p))
def split_dict(d: dict, f: Callable):
    """Partition ``d`` into two new dicts according to a predicate on keys.

    :param d: The mapping to split; it is not modified.
    :param f: Called exactly once per key, in iteration order.
    :returns: ``(matching, rest)`` where ``matching`` holds the items whose
        key makes ``f`` truthy and ``rest`` holds all other items.
    """
    matching, rest = {}, {}
    for key, value in d.items():
        # Pick the destination first so the predicate runs once per key.
        bucket = matching if f(key) else rest
        bucket[key] = value
    return matching, rest
def uniq(s):
    """Lazily yield the items of ``s`` with repeats dropped, keeping the order
    of first appearance.

    A plain ``set`` usually does the job as well; this variant exists because
    a stable order can make debugging easier. Items must be hashable.
    """
    already_seen = set()
    for item in s:
        if item in already_seen:
            continue
        already_seen.add(item)
        yield item
def get_etag_from_file(f):
    """Compute an etag for a file given either its path or an open file object.

    Anything with a ``read`` attribute is treated as a file object: it is
    flushed (and additionally fsync'ed on Windows) before being stat'ed, so
    that the reported modification time reflects pending writes.

    :param f: A filesystem path or a file-like object backed by a real file
        descriptor.
    :returns: A string of the form ``"<mtime>;<inode>"``, with the mtime
        rendered using nine decimal places.
    """
    if not hasattr(f, "read"):
        info = os.stat(f)
    else:
        f.flush()  # Only this is necessary on Linux
        if sys.platform == "win32":
            os.fsync(f.fileno())  # Apparently necessary on Windows
        info = os.fstat(f.fileno())

    # Prefer the nanosecond timestamp; fall back to float seconds when the
    # stat result does not provide it.
    timestamp = getattr(info, "st_mtime_ns", None)
    if timestamp is None:
        timestamp = info.st_mtime

    inode = info.st_ino
    return f"{timestamp:.9f};{inode}"
def get_storage_init_specs(cls, stop_at=object):
    """Collect the argspecs of ``cls.__init__`` and of the ``__init__``
    methods further up its inheritance chain.

    How the chain is followed is controlled by an optional
    ``_traverse_superclass`` attribute on each ``__init__``:

    - missing or ``True``: continue with the first class after ``cls`` in its
      MRO (or, if that class's ``__init__`` exposes ``__objclass__``, with the
      class named there);
    - any other truthy value: continue with that value as the next class;
    - falsy: stop after this class.

    :param cls: The class to start from.
    :param stop_at: Traversal ends (contributing nothing) once this class is
        reached.
    :returns: A tuple of ``inspect.getfullargspec`` results, starting with the
        one for ``cls``.
    """
    if cls is stop_at:
        return ()

    initializer = cls.__init__
    own_spec = getfullargspec(initializer)
    hint = getattr(initializer, "_traverse_superclass", True)

    if not hint:
        return (own_spec,)

    if hint is True:  # noqa
        # Only the first MRO entry after ``cls`` is consulted.
        parent = next(
            getattr(base.__init__, "__objclass__", base) for base in cls.__mro__[1:]
        )
    else:
        parent = hint

    return (own_spec,) + get_storage_init_specs(parent, stop_at=stop_at)
def get_storage_init_args(cls, stop_at=object):
    """
    Gather the argument names accepted when initializing ``cls``. This relies
    on every class's ``__init__`` forwarding its remaining arguments to
    ``super().__init__``.

    Only positional-or-keyword parameters (``spec.args``) are considered; the
    first one (``self``) is skipped.

    :param cls: The class to inspect.
    :param stop_at: Passed through to ``get_storage_init_specs``.
    :returns: (all, required), where ``all`` is a set of all arguments the
        class can take, and ``required`` is the subset of arguments the class
        requires (those without a default value).
    """
    accepted, mandatory = set(), set()
    for spec in get_storage_init_specs(cls, stop_at=stop_at):
        names = spec.args
        accepted.update(names[1:])
        # Defaults always belong to the trailing parameters, so everything
        # before them (minus ``self``) is mandatory.
        default_count = len(spec.defaults) if spec.defaults else 0
        mandatory.update(names[1 : len(names) - default_count])
    return accepted, mandatory
def checkdir(path: str, create: bool = False, mode: int = 0o750) -> None:
    """Ensure that ``path`` is a directory.

    :param path: The path to check.
    :param create: Whether to create the directory (and all parent directories)
        if it does not exist.
    :param mode: Mode to create missing directories with.
    :raises OSError: If ``path`` exists but is not a directory.
    :raises exceptions.CollectionNotFound: If ``path`` does not exist and
        ``create`` is false.
    """
    if os.path.isdir(path):
        return

    if os.path.exists(path):
        raise OSError(f"{path} is not a directory.")

    if not create:
        raise exceptions.CollectionNotFound(f"Directory {path} does not exist.")

    os.makedirs(path, mode)
def checkfile(path, create=False):
    """
    Check whether ``path`` is a file.

    The parent directory is validated first via ``checkdir``. A ``path``
    without any directory component (a bare filename) is resolved against the
    current directory instead of an empty directory name.

    :param path: The path to check.
    :param create: Whether to create the file's parent directories, and the
        file itself (empty), if they do not exist.
    :raises OSError: If ``path`` (or its parent) exists but is of the wrong
        kind.
    :raises exceptions.CollectionNotFound: If the file or its parent directory
        does not exist and ``create`` is false.
    """
    # ``os.path.dirname("name.ext")`` is "", which is never a directory and
    # cannot be created; such a path lives in the current directory.
    parent = os.path.dirname(path) or os.curdir
    checkdir(parent, create=create)

    if os.path.isfile(path):
        return

    if os.path.exists(path):
        raise OSError(f"{path} is not a file.")

    if not create:
        raise exceptions.CollectionNotFound(f"File {path} does not exist.")

    # Opening for writing is enough to create an empty file.
    with open(path, "wb"):
        pass
class cached_property:
    """A read-only ``@property`` whose getter runs at most once per instance.

    The computed value is stored in the instance's ``__dict__`` under the
    getter's name, so it only works for methods of instances that have a
    ``__dict__``.
    """

    def __init__(self, fget, doc=None):
        """
        :param fget: The getter function to wrap.
        :param doc: Optional docstring; falls back to the getter's own.
        """
        self.fget = fget
        self.__doc__ = doc if doc else fget.__doc__
        self.__module__ = fget.__module__
        self.__name__ = fget.__name__

    def __get__(self, obj, cls):
        # Accessed on the class itself: hand back the descriptor.
        if obj is None:  # pragma: no cover
            return self
        value = self.fget(obj)
        # Storing under the getter's name means later lookups of that
        # attribute find the instance value before reaching this descriptor.
        obj.__dict__[self.__name__] = value
        return value
def href_safe(ident, safe=SAFE_UID_CHARS):
    """Return whether every character of ``ident`` occurs in ``safe``.

    :param ident: The identifier to check.
    :param safe: The collection of permitted characters.
    :returns: ``True`` if ``ident`` contains no character outside ``safe``
        (which includes the empty string), ``False`` otherwise.
    """
    used_chars = set(ident)
    allowed_chars = set(safe)
    return used_chars <= allowed_chars
def generate_href(ident=None, safe=SAFE_UID_CHARS):
    """
    Produce an identifier that is safe to use in URLs, storage hrefs or UIDs.

    :param ident: A candidate identifier, or nothing.
    :param safe: The characters considered safe.
    :returns: ``ident`` unchanged if it is non-empty and consists only of safe
        characters, otherwise the string form of a fresh random UUID.
    """
    if ident and href_safe(ident, safe):
        return ident
    return str(uuid.uuid4())
def synchronized(lock=None):
    """Build a decorator that runs the decorated function while holding a lock.

    :param lock: A context manager used as the lock. When omitted, a new
        ``threading.Lock`` is created, shared by every function decorated with
        the returned decorator.
    :returns: A decorator; the wrapped function keeps the original's metadata.
    """
    if lock is None:
        import threading

        lock = threading.Lock()

    def decorate(func):
        @functools.wraps(func)
        def locked_call(*args, **kwargs):
            with lock:
                result = func(*args, **kwargs)
            return result

        return locked_call

    return decorate
def open_graphical_browser(url, new=0, autoraise=True):
    """Open ``url`` in a graphical web browser.

    Works like `webbrowser.open`, except that text-mode (CLI) browsers are
    never tried. Launching those is undesirable when vdirsyncer runs on a
    server; in that situation it is better to copy the URL into a local
    browser, or to use the URL-yanking features of the terminal emulator.

    :param url: The URL to open.
    :param new: Passed on to the browser's ``open`` method.
    :param autoraise: Passed on to the browser's ``open`` method.
    :raises RuntimeError: If no remaining browser reports success.
    """
    import webbrowser

    text_browsers = {"www-browser", "links", "links2", "elinks", "lynx", "w3m"}

    # Python 3.7: the list of candidates is only filled in on demand.
    if webbrowser._tryorder is None:
        webbrowser.register_standard_browsers()

    for candidate in webbrowser._tryorder:
        if candidate not in text_browsers:
            opened = webbrowser.get(candidate).open(url, new, autoraise)
            if opened:
                return

    raise RuntimeError("No graphical browser found. Please open the URL manually.")