Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions rig/machine_control/scp_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import socket
import struct
import time
import select
from . import consts
from .packets import SCPPacket

Expand Down Expand Up @@ -165,7 +166,6 @@ def send_scp_burst(self, buffer_size, window_size,
"""
parameters_and_callbacks = iter(parameters_and_callbacks)

# Non-blocking and then the following is a busy loop.
self.sock.setblocking(False)

# Calculate the receive length, this should be the smallest power of
Expand All @@ -177,14 +177,14 @@ class TransmittedPacket(object):
"""A packet which has been transmitted and still awaits a response.
"""
__slots__ = ["callback", "packet", "n_tries",
"time_sent", "full_timeout"]
"timeout", "timeout_time"]

def __init__(self, callback, packet, extra_timeout):
def __init__(self, callback, packet, timeout):
self.callback = callback
self.packet = packet
self.full_timeout = extra_timeout
self.n_tries = 1
self.time_sent = time.time()
self.timeout = timeout
self.timeout_time = time.time() + self.timeout

queued_packets = True
outstanding_packets = {}
Expand All @@ -195,7 +195,7 @@ def __init__(self, callback, packet, extra_timeout):
# If there are fewer outstanding packets than the window can take
# and we still might have packets left to send then transmit a
# packet and add it to the list of outstanding packets.
if len(outstanding_packets) < window_size and queued_packets:
while len(outstanding_packets) < window_size and queued_packets:
try:
args = next(parameters_and_callbacks)
except StopIteration:
Expand Down Expand Up @@ -233,17 +233,28 @@ def __init__(self, callback, packet, extra_timeout):
# Actually send the packet
self.sock.send(outstanding_packets[seq].packet)

# Listen on the socket for an acknowledgement packet, there not be
# one.
try:
ack = self.sock.recv(receive_length)
except IOError:
# There wasn't a returned packet, we may spend quite some time
# here.
ack = None

# Process the received packet (if there is one)
if ack is not None:
# Listen on the socket for an acknowledgement packet, there may not
# be one.
if outstanding_packets:
timeout = min((o.timeout_time for o in
six.itervalues(outstanding_packets)
)) - time.time()
else:
timeout = 0.0
r, w, x = select.select([self.sock], [], [], max(timeout, 0.0))
# Process the received packet (if there is one).
while r:
# Note that 'r' is never changed so this while loop will either
# get skipped if r is empty or loop until 'break' of r is not.
# Since we may receive multiple packets at once, it is better
# to try and pull all out of the socket immediately rather than
# running around the parent loop again and incuring the
# 'select' cost.
try:
ack = self.sock.recv(receive_length)
except IOError:
break

# Extract the sequence number from the bytestring, iff possible
rc, seq = struct.unpack_from("<2H", ack,
consts.SDP_HEADER_LENGTH + 2)
Expand All @@ -258,7 +269,7 @@ def __init__(self, callback, packet, extra_timeout):
elif rc in self.error_codes:
raise self.error_codes[rc]
else:
raise Exception(
raise SCPError(
"Unhandled exception code {:#2x}".format(rc)
)
else:
Expand All @@ -277,9 +288,8 @@ def __init__(self, callback, packet, extra_timeout):
# Look through all the remaining outstanding packets, if any of
# them have timed out then we retransmit them.
current_time = time.time()
for seq, outstanding in six.iteritems(outstanding_packets):
if (current_time - outstanding.time_sent >
outstanding.full_timeout):
for outstanding in six.itervalues(outstanding_packets):
if outstanding.timeout_time < current_time:
# This packet has timed out, if we have sent it more than
# the given number of times then raise a timeout error for
# it.
Expand All @@ -289,7 +299,8 @@ def __init__(self, callback, packet, extra_timeout):
# Otherwise we retransmit it
self.sock.send(outstanding.packet)
outstanding.n_tries += 1
outstanding.time_sent = current_time
outstanding.timeout_time = (current_time +
outstanding.timeout)

def read(self, buffer_size, window_size, x, y, p, address, length_bytes):
"""Read a bytestring from an address in memory.
Expand Down
Loading