Skip to content

Commit

Permalink
Properly handle incoming peer url
Browse files Browse the repository at this point in the history
  • Loading branch information
KonradStaniec committed May 18, 2018
1 parent e77d4ef commit a306d97
Showing 1 changed file with 14 additions and 25 deletions.
39 changes: 14 additions & 25 deletions src/main/scala/io/iohk/ethereum/network/PeerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,39 +56,35 @@ class PeerActor[R <: HandshakeResult](

def waitingForInitialCommand: Receive = stashMessages orElse {
case HandleConnection(connection, remoteAddress) =>
val rlpxConnection = createRlpxConnection(remoteAddress, None, false)
val rlpxConnection = createRlpxConnection(remoteAddress, None)
rlpxConnection.ref ! RLPxConnectionHandler.HandleConnection(connection)
context become waitingForConnectionResult(rlpxConnection)

case ConnectTo(uri) =>
val rlpxConnection = createRlpxConnection(new InetSocketAddress(uri.getHost, uri.getPort), Some(uri), true)
val rlpxConnection = createRlpxConnection(new InetSocketAddress(uri.getHost, uri.getPort), Some(uri))
rlpxConnection.ref ! RLPxConnectionHandler.ConnectTo(uri)
context become waitingForConnectionResult(rlpxConnection)

case GetStatus => sender() ! StatusResponse(Idle)
}

def createRlpxConnection(remoteAddress: InetSocketAddress, uriOpt: Option[URI], isInitiator: Boolean): RLPxConnection = {
def createRlpxConnection(remoteAddress: InetSocketAddress, uriOpt: Option[URI]): RLPxConnection = {
val ref = rlpxConnectionFactory(context)
context watch ref
RLPxConnection(ref, remoteAddress, uriOpt, isInitiator)
RLPxConnection(ref, remoteAddress, uriOpt)
}
private def createUri(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, maybeUri: Option[URI]): URI = {
private def modifyOutGoingUri(remoteNodeId: ByteString, rlpxConnection: RLPxConnection, uri: URI): URI = {
val host = getHostName(rlpxConnection.remoteAddress.getAddress)
val port = rlpxConnection.remoteAddress.getPort

maybeUri.fold(new URI(s"enode://${Hex.toHexString(remoteNodeId.toArray)}@$host:$port?discport=$port")){uri =>
// this is outgoing connection, so query should not be null
val query = Option(uri.getQuery).getOrElse(s"discport=$port")
new URI(s"enode://${Hex.toHexString(remoteNodeId.toArray)}@$host:$port?$query")
}
val query = Option(uri.getQuery).getOrElse(s"discport=$port")
new URI(s"enode://${Hex.toHexString(remoteNodeId.toArray)}@$host:$port?$query")
}

def waitingForConnectionResult(rlpxConnection: RLPxConnection, numRetries: Int = 0): Receive =
handleTerminated(rlpxConnection, numRetries) orElse stashMessages orElse {
case RLPxConnectionHandler.ConnectionEstablished(remoteNodeId) =>
val uri = createUri(remoteNodeId, rlpxConnection, rlpxConnection.uriOpt)
processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = Some(uri)), numRetries)
val newUri = rlpxConnection.uriOpt.map(outGoingUri => modifyOutGoingUri(remoteNodeId, rlpxConnection, outGoingUri))
processHandshakerNextMessage(initHandshaker, rlpxConnection.copy(uriOpt = newUri), numRetries)

case RLPxConnectionHandler.ConnectionFailed =>
log.debug("Failed to establish RLPx connection")
Expand Down Expand Up @@ -150,12 +146,8 @@ class PeerActor[R <: HandshakeResult](
context become processingHandshaking(handshaker, rlpxConnection, newTimeout, numRetries)


//TODO EC-506 Do not create uri for incoming connection (there would be need to call rlpxConnection.isInitiator)
case Left(HandshakeSuccess(handshakeResult)) =>
rlpxConnection.uriOpt.foreach { uri =>
if (rlpxConnection.isInitiator)
knownNodesManager ! KnownNodesManager.AddKnownNode(uri)
}
rlpxConnection.uriOpt.foreach { uri =>knownNodesManager ! KnownNodesManager.AddKnownNode(uri) }
context become new HandshakedPeer(rlpxConnection, handshakeResult).receive
unstashAll()

Expand Down Expand Up @@ -190,13 +182,10 @@ class PeerActor[R <: HandshakeResult](
case Terminated(actor) if actor == rlpxConnection.ref =>
log.debug(s"Underlying rlpx connection with peer $peerId closed")
rlpxConnection.uriOpt match {
case Some(uri) if rlpxConnection.isInitiator && numRetries < peerConfiguration.connectMaxRetries =>
case Some(uri) if numRetries < peerConfiguration.connectMaxRetries =>
scheduleConnectRetry(uri, numRetries + 1)
case Some(uri) if rlpxConnection.isInitiator =>
context.parent ! PeerClosedConnection(peerAddress.getHostString, Disconnect.Reasons.Other)
knownNodesManager ! KnownNodesManager.RemoveKnownNode(uri)
context stop self
case Some(uri) =>
context.parent ! PeerClosedConnection(peerAddress.getHostString, Disconnect.Reasons.Other)
knownNodesManager ! KnownNodesManager.RemoveKnownNode(uri)
context stop self
case None =>
Expand All @@ -207,7 +196,7 @@ class PeerActor[R <: HandshakeResult](
def reconnect(uri: URI, numRetries: Int): Unit = {
log.debug("Trying to reconnect")
val address = new InetSocketAddress(uri.getHost, uri.getPort)
val newConnection = createRlpxConnection(address, Some(uri), true)
val newConnection = createRlpxConnection(address, Some(uri))
newConnection.ref ! RLPxConnectionHandler.ConnectTo(uri)
context become waitingForConnectionResult(newConnection, numRetries)
}
Expand Down Expand Up @@ -295,7 +284,7 @@ object PeerActor {
"rlpx-connection")
}

case class RLPxConnection(ref: ActorRef, remoteAddress: InetSocketAddress, uriOpt: Option[URI], isInitiator: Boolean) {
case class RLPxConnection(ref: ActorRef, remoteAddress: InetSocketAddress, uriOpt: Option[URI]) {
def sendMessage(message: MessageSerializable): Unit = {
ref ! RLPxConnectionHandler.SendMessage(message)
}
Expand Down

0 comments on commit a306d97

Please sign in to comment.