Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions mypy/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,24 @@ class IPCBase:
connection = None # type: _IPCHandle

def __init__(self, name: str, timeout: Optional[float]) -> None:
self.READ_SIZE = 100000
self.name = name
self.timeout = timeout

def read(self) -> bytes:
def read(self, size: int = 100000) -> bytes:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is worth putting in the interface, since it doesn't modify any visible behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just felt it was more fitting to put it in the function scope since it is only used for reads. To me it makes the code easier to understand (one less instance attribute to think about in the other methods).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, sure

"""Read bytes from an IPC connection until its empty."""
bdata = bytearray()
if sys.platform == 'win32':
while True:
ov, err = _winapi.ReadFile(self.connection, self.READ_SIZE, overlapped=True)
ov, err = _winapi.ReadFile(self.connection, size, overlapped=True)
# TODO: remove once typeshed supports Literal types
assert isinstance(ov, _winapi.Overlapped)
assert isinstance(err, int)
try:
if err != 0:
assert err == _winapi.ERROR_IO_PENDING
if err == _winapi.ERROR_IO_PENDING:
timeout = int(self.timeout * 1000) if self.timeout else _winapi.INFINITE
res = _winapi.WaitForSingleObject(ov.event, timeout)
assert res == _winapi.WAIT_OBJECT_0
if res != _winapi.WAIT_OBJECT_0:
raise IPCException("Bad result from I/O wait: {}".format(res))
except BaseException:
ov.cancel()
raise
Expand All @@ -77,11 +76,14 @@ def read(self) -> bytes:
if err == 0:
# we are done!
break
elif err == _winapi.ERROR_MORE_DATA:
# read again
continue
elif err == _winapi.ERROR_OPERATION_ABORTED:
raise IPCException("ReadFile operation aborted.")
else:
while True:
more = self.connection.recv(self.READ_SIZE)
more = self.connection.recv(size)
if not more:
break
bdata.extend(more)
Expand All @@ -96,16 +98,18 @@ def write(self, data: bytes) -> None:
assert isinstance(ov, _winapi.Overlapped)
assert isinstance(err, int)
try:
if err != 0:
assert err == _winapi.ERROR_IO_PENDING
if err == _winapi.ERROR_IO_PENDING:
timeout = int(self.timeout * 1000) if self.timeout else _winapi.INFINITE
res = _winapi.WaitForSingleObject(ov.event, timeout)
assert res == _winapi.WAIT_OBJECT_0
if res != _winapi.WAIT_OBJECT_0:
raise IPCException("Bad result from I/O wait: {}".format(res))
elif err != 0:
raise IPCException("Failed writing to pipe with error: {}".format(err))
except BaseException:
ov.cancel()
raise
bytes_written, err = ov.GetOverlappedResult(True)
assert err == 0
assert err == 0, err
assert bytes_written == len(data)
except WindowsError as e:
raise IPCException("Failed to write with error: {}".format(e.winerror))
Expand Down
2 changes: 1 addition & 1 deletion mypy/test/testipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def server(msg: str, q: 'Queue[str]') -> None:
class IPCTests(TestCase):
def test_transaction_large(self) -> None:
queue = Queue() # type: Queue[str]
msg = 't' * 100001 # longer than the max read size of 100_000
msg = 't' * 200000 # longer than the max read size of 100_000
p = Process(target=server, args=(msg, queue), daemon=True)
p.start()
connection_name = queue.get()
Expand Down