Skip to content

Commit

Permalink
Add more tests stressing conccurent reading and writing on utp socket (
Browse files Browse the repository at this point in the history
…#474)

* Add more tests stressing concurrent reading and writing

* Fix bug when remote window dropped below packet size
  • Loading branch information
KonradStaniec committed Feb 10, 2022
1 parent 05ef9a8 commit 779d767
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 66 deletions.
10 changes: 9 additions & 1 deletion eth/utp/utp_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type
transport: DatagramTransport
utpRouter: UtpRouter[TransportAddress]

SendCallbackBuilder* = proc (d: DatagramTransport): SendCallback[TransportAddress] {.gcsafe, raises: [Defect].}

# This should probably be defined in TransportAddress module, as hash function should
# be consitent with equality function
# in nim zero arrays always have hash equal to 0, irrespectively of array size, to
Expand Down Expand Up @@ -78,6 +80,7 @@ proc new*(
address: TransportAddress,
socketConfig: SocketConfig = SocketConfig.init(),
allowConnectionCb: AllowConnectionCallback[TransportAddress] = nil,
sendCallbackBuilder: SendCallbackBuilder = nil,
rng = newRng()): UtpProtocol {.raises: [Defect, CatchableError].} =

doAssert(not(isNil(acceptConnectionCb)))
Expand All @@ -90,7 +93,12 @@ proc new*(
)

let ta = newDatagramTransport(processDatagram, udata = router, local = address)
router.sendCb = initSendCallback(ta)

if (sendCallbackBuilder == nil):
router.sendCb = initSendCallback(ta)
else:
router.sendCb = sendCallbackBuilder(ta)

UtpProtocol(transport: ta, utpRouter: router)

proc shutdownWait*(p: UtpProtocol): Future[void] {.async.} =
Expand Down
142 changes: 78 additions & 64 deletions eth/utp/utp_socket.nim
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ type

writeLoop: Future[void]

zeroWindowTimer: Moment
# timer which is started when peer max window drops below current packet size
zeroWindowTimer: Option[Moment]

# last measured delay between current local timestamp, and remote sent
# timestamp. In microseconds
Expand Down Expand Up @@ -287,7 +288,8 @@ const
allowedAckWindow*: uint16 = 3

# Timeout after which the send window will be reset to its minimal value after it dropped
# to zero. i.e when we received a packet from remote peer with `wndSize` set to 0.
# lower than our current packet size. i.e when we received a packet
# from remote peer with `wndSize` set to number <= current packet size
defaultResetWindowTimeout = seconds(15)

# If remote peer window drops to zero, then after some time we will reset it
Expand Down Expand Up @@ -446,10 +448,15 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
await socket.flushPackets()

if socket.isOpened():
let currentPacketSize = uint32(socket.getPacketSize())

if (socket.sendBufferTracker.maxRemoteWindow == 0 and currentTime > socket.zeroWindowTimer):
debug "Reset remote window to minimal value"
socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow)
if (socket.zeroWindowTimer.isSome() and currentTime > socket.zeroWindowTimer.unsafeGet()):
if socket.sendBufferTracker.maxRemoteWindow <= currentPacketSize:
socket.sendBufferTracker.updateMaxRemote(minimalRemoteWindow)
socket.zeroWindowTimer = none[Moment]()
debug "Reset remote window to minimal value",
minRemote = minimalRemoteWindow


if (currentTime > socket.rtoTimeout):
debug "CheckTimeouts rto timeout",
Expand Down Expand Up @@ -487,7 +494,7 @@ proc checkTimeouts(socket: UtpSocket) {.async.} =
# on timeout reset duplicate ack counter
socket.duplicateAck = 0

let currentPacketSize = uint32(socket.getPacketSize())


if (socket.curWindowPackets == 0 and socket.sendBufferTracker.maxWindow > currentPacketSize):
# there are no packets in flight even though there is place for more than whole packet
Expand Down Expand Up @@ -566,57 +573,59 @@ proc resetSendTimeout(socket: UtpSocket) =
socket.rtoTimeout = getMonoTimestamp().moment + socket.retransmitTimeout

proc handleDataWrite(socket: UtpSocket, data: seq[byte], writeFut: Future[WriteResult]): Future[void] {.async.} =
if writeFut.finished():
# write future was cancelled befere we got chance to process it, short circuit
# processing and move to next loop iteration
return
if writeFut.finished():
# write future was cancelled befere we got chance to process it, short circuit
# processing and move to next loop iteration
return

let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
var bytesWritten = 0

while i <= endIndex:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let payloadLength = uint32(len(dataSlice))
try:
await socket.sendBufferTracker.reserveNBytesWait(payloadLength)

if socket.curWindowPackets == 0:
socket.resetSendTimeout()

let pSize = socket.getPacketSize()
let endIndex = data.high()
var i = 0
var bytesWritten = 0
let wndSize = socket.getRcvWindowSize()

while i <= endIndex:
let lastIndex = i + pSize - 1
let lastOrEnd = min(lastIndex, endIndex)
let dataSlice = data[i..lastOrEnd]
let payloadLength = uint32(len(dataSlice))
try:
await socket.sendBufferTracker.reserveNBytesWait(payloadLength)
if socket.curWindowPackets == 0:
socket.resetSendTimeout()

let dataPacket =
dataPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
wndSize,
dataSlice,
socket.replayMicro
)
let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength)
socket.registerOutgoingPacket(outgoingPacket)
await socket.sendData(outgoingPacket.packetBytes)
except CancelledError as exc:
# write loop has been cancelled in the middle of processing due to the
# socket closing
# this approach can create partial write in case destroyin socket in the
# the middle of the write
doAssert(socket.state == Destroy)
if (not writeFut.finished()):
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
writeFut.complete(res)
# we need to re-raise exception so the outer loop will be properly cancelled too
raise exc
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1

# Before completeing future with success (as all data was sent sucessfuly)
# we need to check if user did not cancel write on his end
let dataPacket =
dataPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
wndSize,
dataSlice,
socket.replayMicro
)
let outgoingPacket = OutgoingPacket.init(encodePacket(dataPacket), 1, false, payloadLength)
socket.registerOutgoingPacket(outgoingPacket)
await socket.sendData(outgoingPacket.packetBytes)
except CancelledError as exc:
# write loop has been cancelled in the middle of processing due to the
# socket closing
# this approach can create partial write in when destroying the socket in the
# the middle of the write
doAssert(socket.state == Destroy)
if (not writeFut.finished()):
writeFut.complete(Result[int, WriteError].ok(bytesWritten))
let res = Result[int, WriteError].err(WriteError(kind: SocketNotWriteable, currentState: socket.state))
writeFut.complete(res)
# we need to re-raise exception so the outer loop will be properly cancelled too
raise exc
bytesWritten = bytesWritten + len(dataSlice)
i = lastOrEnd + 1

# Before completing the future with success (as all data was sent successfully)
# we need to check if user did not cancel write on his end
if (not writeFut.finished()):
writeFut.complete(Result[int, WriteError].ok(bytesWritten))

proc handleClose(socket: UtpSocket): Future[void] {.async.} =
try:
Expand Down Expand Up @@ -706,7 +715,7 @@ proc new[A](
sendBufferTracker: SendBufferTracker.new(0, 1024 * 1024, cfg.optSndBuffer, startMaxWindow),
# queue with infinite size
writeQueue: newAsyncQueue[WriteRequest](),
zeroWindowTimer: currentTime + cfg.remoteWindowResetTimeout,
zeroWindowTimer: none[Moment](),
socketKey: UtpSocketKey.init(to, rcvId),
slowStart: true,
fastTimeout: false,
Expand Down Expand Up @@ -1131,11 +1140,13 @@ proc generateAckPacket*(socket: UtpSocket): Packet =
else:
none[array[4, byte]]()

let bufferSize = socket.getRcvWindowSize()

ackPacket(
socket.seqNr,
socket.connectionIdSnd,
socket.ackNr,
socket.getRcvWindowSize(),
bufferSize,
socket.replayMicro,
bitmask
)
Expand Down Expand Up @@ -1175,7 +1186,8 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
seqNr = p.header.seqNr,
ackNr = p.header.ackNr,
timestamp = p.header.timestamp,
timestampDiff = p.header.timestampDiff
timestampDiff = p.header.timestampDiff,
remoteWindow = p.header.wndSize

let timestampInfo = getMonoTimestamp()

Expand Down Expand Up @@ -1255,7 +1267,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
let isPossibleDuplicatedOldPacket = pastExpected >= (int(uint16.high) + 1) - reorderBufferMaxSize

if (isPossibleDuplicatedOldPacket and p.header.pType != ST_STATE):
asyncSpawn socket.sendAck()
discard socket.sendAck()

debug "Got an invalid packet sequence number, too far off",
pastExpected = pastExpected
Expand Down Expand Up @@ -1311,13 +1323,14 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
let diff = uint32((socket.ourHistogram.getValue() - minRtt).microseconds())
socket.ourHistogram.shift(diff)

let currentPacketSize = uint32(socket.getPacketSize())
let (newMaxWindow, newSlowStartTreshold, newSlowStart) =
applyCongestionControl(
socket.sendBufferTracker.maxWindow,
socket.slowStart,
socket.slowStartTreshold,
socket.socketConfig.optSndBuffer,
uint32(socket.getPacketSize()),
currentPacketSize,
microseconds(actualDelay),
ackedBytes,
minRtt,
Expand All @@ -1336,14 +1349,15 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
slowStartTreshold = newSlowStartTreshold,
slowstart = newSlowStart

if (socket.sendBufferTracker.maxRemoteWindow == 0):
if (socket.zeroWindowTimer.isNone() and socket.sendBufferTracker.maxRemoteWindow <= currentPacketSize):
# when zeroWindowTimer will be hit and maxRemoteWindow still will be equal to 0
# then it will be reset to minimal value
socket.zeroWindowTimer = timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout
socket.zeroWindowTimer = some(timestampInfo.moment + socket.socketConfig.remoteWindowResetTimeout)

debug "Remote window size dropped to 0",
debug "Remote window size dropped below packet size",
currentTime = timestampInfo.moment,
resetZeroWindowTime = socket.zeroWindowTimer
resetZeroWindowTime = socket.zeroWindowTimer,
currentPacketSize = currentPacketSize

# socket.curWindowPackets == acks means that this packet acked all remaining packets
# including the sent fin packets
Expand Down Expand Up @@ -1488,7 +1502,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
# need improvement, as with this approach there is no direct control over
# how many concurrent tasks there are and how to cancel them when socket
# is closed
asyncSpawn socket.sendAck()
discard socket.sendAck()

# we got packet out of order
else:
Expand All @@ -1515,7 +1529,7 @@ proc processPacket*(socket: UtpSocket, p: Packet) {.async.} =
reorderCount = socket.reorderCount
# we send ack packet, as we reoreder count is > 0, so the eack bitmask will be
# generated
asyncSpawn socket.sendAck()
discard socket.sendAck()

of ST_STATE:
if (socket.state == SynSent and (not socket.connectionFuture.finished())):
Expand Down
3 changes: 2 additions & 1 deletion tests/utp/all_utp_tests.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ import
./test_utp_socket,
./test_utp_socket_sack,
./test_utp_router,
./test_clock_drift_calculator
./test_clock_drift_calculator,
./test_protocol_integration
Loading

0 comments on commit 779d767

Please sign in to comment.