Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 34 additions & 31 deletions lib/mysql/connector/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def __init__(self):
self._compressed_packet_number = -1
self._packet_queue = deque()
self.recvsize = 8192
self._resi_bytes = None

@property
def next_packet_number(self):
Expand Down Expand Up @@ -316,13 +317,22 @@ def recv_py26_plain(self):

def _split_zipped_payload(self, packet_bunch):
"""Split compressed payload"""
if self._resi_bytes is not None:
packet_bunch = self._resi_bytes + packet_bunch
self._resi_bytes = None
while packet_bunch:
if len(packet_bunch) < 3:
self._resi_bytes = packet_bunch
return
if PY2:
payload_length = struct.unpack_from(
"<I",
packet_bunch[0:3] + b'\x00')[0] # pylint: disable=E0602
else:
payload_length = struct.unpack("<I", packet_bunch[0:3] + b'\x00')[0]
if payload_length > len(packet_bunch) - 4:
self._resi_bytes = packet_bunch
return

self._packet_queue.append(packet_bunch[0:payload_length + 4])
packet_bunch = packet_bunch[payload_length + 4:]
Expand All @@ -336,14 +346,15 @@ def recv_compressed(self):
except IndexError:
pass

header = bytearray(b'')
packets = []
try:
abyte = self.sock.recv(1)
while abyte and len(header) < 7:
header += abyte
while True:
header = bytearray(b'')
packets = []
try:
abyte = self.sock.recv(1)
while header:
while abyte and len(header) < 7:
header += abyte
abyte = self.sock.recv(1)

if len(header) < 7:
raise errors.InterfaceError(errno=2013)

Expand Down Expand Up @@ -372,32 +383,24 @@ def recv_compressed(self):

packets.append((payload_length, zip_payload))

if zip_payload_length <= 16384:
# We received the full compressed packet
break

# Get next compressed packet
header = init_bytearray(b'')
abyte = self.sock.recv(1)
while abyte and len(header) < 7:
header += abyte
abyte = self.sock.recv(1)
except IOError as err:
raise errors.OperationalError(
errno=2055, values=(self.get_address(), _strioerror(err)))

except IOError as err:
raise errors.OperationalError(
errno=2055, values=(self.get_address(), _strioerror(err)))
# Compressed packet can contain more than 1 MySQL packets
# We decompress and make one so we can split it up
tmp = init_bytearray(b'')
for payload_length, payload in packets:
# payload_length can not be 0; this was previously handled
if PY2:
tmp += zlib.decompress(buffer(payload)) # pylint: disable=E0602
else:
tmp += zlib.decompress(payload)
self._split_zipped_payload(tmp)
del tmp

# Compressed packet can contain more than 1 MySQL packets
# We decompress and make one so we can split it up
tmp = init_bytearray(b'')
for payload_length, payload in packets:
# payload_length can not be 0; this was previously handled
if PY2:
tmp += zlib.decompress(buffer(payload)) # pylint: disable=E0602
else:
tmp += zlib.decompress(payload)
self._split_zipped_payload(tmp)
del tmp
if len(self._packet_queue) != 0:
break

try:
pkt = self._packet_queue.popleft()
Expand Down