-
-
Notifications
You must be signed in to change notification settings - Fork 50
/
portalocker.py
144 lines (125 loc) · 5.08 KB
/
portalocker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import os
import typing
from . import constants, exceptions
# Alias for readability. Due to import recursion issues we cannot do:
# from .constants import LockFlags
LockFlags = constants.LockFlags
if os.name == 'nt': # pragma: no cover
import msvcrt
import pywintypes
import win32con
import win32file
import winerror
__overlapped = pywintypes.OVERLAPPED()
def lock(file_: typing.Union[typing.IO, int], flags: LockFlags):
# Windows locking does not support locking through `fh.fileno()` so
# we cast it to make mypy and pyright happy
file_ = typing.cast(typing.IO, file_)
mode = 0
if flags & LockFlags.NON_BLOCKING:
mode |= win32con.LOCKFILE_FAIL_IMMEDIATELY
if flags & LockFlags.EXCLUSIVE:
mode |= win32con.LOCKFILE_EXCLUSIVE_LOCK
# Save the old position so we can go back to that position but
# still lock from the beginning of the file
savepos = file_.tell()
if savepos:
file_.seek(0)
os_fh = msvcrt.get_osfhandle(file_.fileno()) # type: ignore
try:
win32file.LockFileEx(os_fh, mode, 0, -0x10000, __overlapped)
except pywintypes.error as exc_value:
# error: (33, 'LockFileEx', 'The process cannot access the file
# because another process has locked a portion of the file.')
if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
raise exceptions.AlreadyLocked(
exceptions.LockException.LOCK_FAILED,
exc_value.strerror,
fh=file_,
) from exc_value
else:
# Q: Are there exceptions/codes we should be dealing with
# here?
raise
finally:
if savepos:
file_.seek(savepos)
def unlock(file_: typing.IO):
try:
savepos = file_.tell()
if savepos:
file_.seek(0)
os_fh = msvcrt.get_osfhandle(file_.fileno()) # type: ignore
try:
win32file.UnlockFileEx(
os_fh,
0,
-0x10000,
__overlapped,
)
except pywintypes.error as exc:
if exc.winerror != winerror.ERROR_NOT_LOCKED:
# Q: Are there exceptions/codes we should be
# dealing with here?
raise
finally:
if savepos:
file_.seek(savepos)
except OSError as exc:
raise exceptions.LockException(
exceptions.LockException.LOCK_FAILED,
exc.strerror,
fh=file_,
) from exc
elif os.name == 'posix': # pragma: no cover
import errno
import fcntl
# The locking implementation.
# Expected values are either fcntl.flock() or fcntl.lockf(),
# but any callable that matches the syntax will be accepted.
LOCKER = fcntl.flock
def lock(file_: typing.Union[typing.IO, int], flags: LockFlags):
# Locking with NON_BLOCKING without EXCLUSIVE or SHARED enabled results
# in an error
if (flags & LockFlags.NON_BLOCKING) and not flags & (
LockFlags.SHARED | LockFlags.EXCLUSIVE
):
raise RuntimeError(
'When locking in non-blocking mode the SHARED '
'or EXCLUSIVE flag must be specified as well',
)
try:
LOCKER(file_, flags)
except OSError as exc_value:
# Python can use one of several different exception classes to
# represent timeout (most likely is BlockingIOError and IOError),
# but these errors may also represent other failures. On some
# systems, `IOError is OSError` which means checking for either
# IOError or OSError can mask other errors.
# The safest check is to catch OSError (from which the others
# inherit) and check the errno (which should be EACCESS or EAGAIN
# according to the spec).
if exc_value.errno in (errno.EACCES, errno.EAGAIN):
# A timeout exception, wrap this so the outer code knows to try
# again (if it wants to).
raise exceptions.AlreadyLocked(
exc_value,
fh=file_,
) from exc_value
else:
# Something else went wrong; don't wrap this so we stop
# immediately.
raise exceptions.LockException(
exc_value,
fh=file_,
) from exc_value
except EOFError as exc_value:
# On NFS filesystems, flock can raise an EOFError
raise exceptions.LockException(
exc_value,
fh=file_,
) from exc_value
def unlock(file_: typing.IO):
LOCKER(file_.fileno(), LockFlags.UNBLOCK)
else: # pragma: no cover
raise RuntimeError('PortaLocker only defined for nt and posix platforms')