-
Notifications
You must be signed in to change notification settings - Fork 822
/
misc.py
90 lines (70 loc) · 2.86 KB
/
misc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import errno
import logging
import os
from typing import Iterable, Tuple
from ..exceptions import PostgresException
logger = logging.getLogger(__name__)
def postgres_version_to_int(pg_version: str) -> int:
"""Convert the server_version to integer
>>> postgres_version_to_int('9.5.3')
90503
>>> postgres_version_to_int('9.3.13')
90313
>>> postgres_version_to_int('10.1')
100001
>>> postgres_version_to_int('10') # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
PostgresException: 'Invalid PostgreSQL version format: X.Y or X.Y.Z is accepted: 10'
>>> postgres_version_to_int('9.6') # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
PostgresException: 'Invalid PostgreSQL version format: X.Y or X.Y.Z is accepted: 9.6'
>>> postgres_version_to_int('a.b.c') # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
PostgresException: 'Invalid PostgreSQL version: a.b.c'
"""
try:
components = list(map(int, pg_version.split('.')))
except ValueError:
raise PostgresException('Invalid PostgreSQL version: {0}'.format(pg_version))
if len(components) < 2 or len(components) == 2 and components[0] < 10 or len(components) > 3:
raise PostgresException('Invalid PostgreSQL version format: X.Y or X.Y.Z is accepted: {0}'.format(pg_version))
if len(components) == 2:
# new style version numbers, i.e. 10.1 becomes 100001
components.insert(1, 0)
return int(''.join('{0:02d}'.format(c) for c in components))
def postgres_major_version_to_int(pg_version: str) -> int:
"""
>>> postgres_major_version_to_int('10')
100000
>>> postgres_major_version_to_int('9.6')
90600
"""
return postgres_version_to_int(pg_version + '.0')
def parse_lsn(lsn: str) -> int:
t = lsn.split('/')
return int(t[0], 16) * 0x100000000 + int(t[1], 16)
def parse_history(data: str) -> Iterable[Tuple[int, int, str]]:
for line in data.split('\n'):
values = line.strip().split('\t')
if len(values) == 3:
try:
yield int(values[0]), parse_lsn(values[1]), values[2]
except (IndexError, ValueError):
logger.exception('Exception when parsing timeline history line "%s"', values)
def format_lsn(lsn: int, full: bool = False) -> str:
template = '{0:X}/{1:08X}' if full else '{0:X}/{1:X}'
return template.format(lsn >> 32, lsn & 0xFFFFFFFF)
def fsync_dir(path: str) -> None:
if os.name != 'nt':
fd = os.open(path, os.O_DIRECTORY)
try:
os.fsync(fd)
except OSError as e:
# Some filesystems don't like fsyncing directories and raise EINVAL. Ignoring it is usually safe.
if e.errno != errno.EINVAL:
raise
finally:
os.close(fd)