Skip to content

Commit

Permalink
ETCM-196: Allow clock drift in expiry checks.
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Oct 15, 2020
1 parent 04d3469 commit 7949ddc
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 8 deletions.
Expand Up @@ -5,6 +5,10 @@ import scala.concurrent.duration._
case class DiscoveryConfig(
// How long in the future to set message expiration.
messageExpiration: FiniteDuration,
// Allow incoming messages to be expired by this amount, accounting for the fact
// the the senders clock might run late (or ours is early) and may have sent the
// expiry to what already seems like the past.
maxClockDrift: FiniteDuration,
// Timeout for individual requests.
requestTimeout: FiniteDuration,
// Timeout for collecting multiple potential Neighbors responses.
Expand All @@ -16,6 +20,7 @@ case class DiscoveryConfig(
object DiscoveryConfig {
val default = DiscoveryConfig(
messageExpiration = 60.seconds,
maxClockDrift = Duration.Zero,
requestTimeout = 3.seconds,
kademliaTimeout = 7.seconds,
kademliaBucketSize = 16
Expand Down
Expand Up @@ -50,11 +50,12 @@ object DiscoveryNetwork {
import Payload._

private val expirationMillis = config.messageExpiration.toMillis

private val maxClockDriftMillis = config.maxClockDrift.toMillis
private val currentTimeMillis = clock.realTime(MILLISECONDS)

private val maxNeighborsPerPacket = getMaxNeighborsPerPacket

// This is only sent in Ping packets and is basically ignored by nodes.
private val localNodeAddress = toNodeAddress(peerGroup.processAddress)

/** Start a fiber that accepts incoming channels and starts a dedicated fiber
Expand Down Expand Up @@ -108,7 +109,7 @@ object DiscoveryNetwork {
// Not relevant on the server channel.
Task.unit

case p: Payload.HasExpiration[_] if p.isExpired(timestamp) =>
case p: Payload.HasExpiration[_] if isExpired(p, timestamp) =>
Task(logger.debug(s"Ignoring expired request from ${channel.to}"))

case p: Payload.Request =>
Expand Down Expand Up @@ -184,6 +185,7 @@ object DiscoveryNetwork {
Task(logger.error(s"Error handling incoming request: $ex"))
}

/** Serialize the payload to binary and sign the packet. */
private def pack(payload: Payload): Task[Packet] =
Packet
.pack(payload, privateKey)
Expand All @@ -192,6 +194,7 @@ object DiscoveryNetwork {
packet => Task.pure(packet)
)

/** Set a future expiration time on the payload. */
private def setExpiration(payload: Payload): Task[Payload] = {
payload match {
case p: Payload.HasExpiration[_] =>
Expand All @@ -201,6 +204,18 @@ object DiscoveryNetwork {
}
}

/** Check whether an incoming packet is expired. According to the spec anyting with
* an absolute expiration timestamp in the past is expired, however it's a known
* issue that clock drift among nodes leads to dropped messages. Therefore we have
* the option to set an acceptable leeway period as well.
*
* For example if another node sets the expiration of its message 1 minute in the future,
* but our clock is 90 seconds ahead of time, we already see it as expired. Setting
* our expiration time to 1 hour wouldn't help in this case.
*/
private def isExpired(payload: HasExpiration[_], now: Long): Boolean =
payload.expiration < now - maxClockDriftMillis

override val ping = (to: A) =>
(localEnrSeq: Option[ENRSeq]) =>
peerGroup.client(to).use { channel =>
Expand Down Expand Up @@ -276,7 +291,7 @@ object DiscoveryNetwork {
// Not relevant on the client channel.
Task.pure(None)

case p: Payload.HasExpiration[_] if p.isExpired(timestamp) =>
case p: Payload.HasExpiration[_] if isExpired(p, timestamp) =>
Task(logger.debug(s"Ignoring expired response from ${channel.to}")).as(None)

case p: Payload.Response =>
Expand Down
Expand Up @@ -20,9 +20,7 @@ object Payload {
trait HasExpiration[T <: Payload] {
// Absolute UNIX timestamp.
def expiration: Long
def withExpiration(e: Long): T
def isExpired(now: Long): Boolean =
expiration < now
def withExpiration(at: Long): T
}

case class Ping(
Expand Down
Expand Up @@ -587,7 +587,7 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {

override val test = for {
_ <- network.startHandling(
handleWithNone.withEffect {
handleWithSome.withEffect {
Task { called = true }
}
)
Expand All @@ -605,6 +605,28 @@ class DiscoveryNetworkSpec extends AsyncFlatSpec with Matchers {
}
}

it should s"not respond to $rpc if the request is expired but within the clock drift" in test {
new GenericRPCFixture {

override lazy val config = defaultConfig.copy(
maxClockDrift = 15.seconds
)

override val test = for {
_ <- network.startHandling(handleWithSome)
channel <- peerGroup.createServerChannel(from = remoteAddress)
(request: Payload) = requestMap(rpc) match {
case p: Payload.HasExpiration[_] => p.withExpiration(System.currentTimeMillis - 5000)
case p => p
}
_ <- channel.sendPayloadToSUT(request, remotePrivateKey)
msg <- channel.nextMessageFromSUT()
} yield {
msg should not be empty
}
}
}

it should s"forward the caller to the $rpc handler" in test {
new GenericRPCFixture {
def assertCaller(caller: (PublicKey, InetSocketAddress)) = Task {
Expand Down Expand Up @@ -743,7 +765,8 @@ object DiscoveryNetworkSpec extends Matchers {
requestTimeout = 100.millis,
messageExpiration = 60.seconds,
kademliaTimeout = 250.millis,
kademliaBucketSize = 16
kademliaBucketSize = 16,
maxClockDrift = Duration.Zero
)

trait Fixture {
Expand Down

0 comments on commit 7949ddc

Please sign in to comment.