Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The transport layer should not know about events #2420

Merged
merged 1 commit into from Sep 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 5 additions & 26 deletions raiden/network/transport/matrix.py
Expand Up @@ -22,7 +22,6 @@
from matrix_client.errors import MatrixError, MatrixRequestError
from matrix_client.user import User

from raiden import messages
from raiden.constants import ID_TO_NETWORKNAME
from raiden.exceptions import (
InvalidAddress,
Expand All @@ -46,8 +45,7 @@
from raiden.network.utils import get_http_rtt
from raiden.raiden_service import RaidenService
from raiden.storage.serialize import JSONSerializer
from raiden.transfer import events as transfer_events, views
from raiden.transfer.mediated_transfer import events as mediated_transfer_events
from raiden.transfer import views
from raiden.transfer.queue_identifier import QueueIdentifier
from raiden.transfer.state import (
NODE_NETWORK_REACHABLE,
Expand Down Expand Up @@ -202,7 +200,7 @@ def start(
self.greenlets = [self._client.sync_thread]

self._client.set_presence_state(UserPresence.ONLINE.value)
self._send_queued_messages() # uses property instead of initial_queues
self._send_queued_messages(initial_queues)

self.log.info('TRANSPORT STARTED', config=self._config)

Expand Down Expand Up @@ -674,12 +672,9 @@ def send_delivered_for(message: SignedMessage):
self.log.warning('Exception while processing message', exc_info=True)
return

def _send_queued_messages(self):
for queue_identifier, events in self._queueids_to_queues.items():
node_address = self._raiden_service.address
for event in events:
message = _event_to_message(event, node_address)
self._raiden_service.sign(message)
def _send_queued_messages(self, queueids_to_queues):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to adapt all uses of _queueids_to_queues to iterate over Messages instead of SendMessageEvents, or possibly just plain in test:

  • _receive_delivered
  • _send_with_retry
  • send_async

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow, what are you referring to?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods I enumerated above use _queueids_to_queues as the values being List[SendMessageEvent], specifically iterating over the events and comparing the message_identifier of the message with the attribute of the same name of the event. If now the QueueIdsToQueues is an alias for Dict[QueueIdentifier, List[Message]], you need to update the type alias and ensure these places where this mapping is used comply with this new signature. If the Message instance passed to transport is the same inside the various versions/instances of this map, or all Message instances implements correctly equality comparison, you may want to optimize these lookups by using something like message in self._queueids_to_queues[queue_identifier], instead of the any iterator.

for queue_identifier, messages in queueids_to_queues.items():
for message in messages:
self.start_health_check(queue_identifier.recipient)
self.send_async(queue_identifier, message)

Expand Down Expand Up @@ -1069,19 +1064,3 @@ def _get_room_id_for_address(self, address: Address) -> Optional[str]:
self._set_room_id_for_address(address, None)
return None
return room_id


def _event_to_message(event, node_address):
eventtypes_to_messagetype = {
mediated_transfer_events.SendBalanceProof: messages.Secret,
mediated_transfer_events.SendLockedTransfer: messages.LockedTransfer,
mediated_transfer_events.SendRefundTransfer: messages.RefundTransfer,
mediated_transfer_events.SendRevealSecret: messages.RevealSecret,
mediated_transfer_events.SendSecretRequest: messages.SecretRequest,
transfer_events.SendDirectTransfer: messages.DirectTransfer,
transfer_events.SendProcessed: messages.Processed,
}
message_class = eventtypes_to_messagetype.get(type(event))
if message_class is None:
raise TypeError(f'Event type {type(event)} is not supported.')
return message_class.from_event(event)
18 changes: 7 additions & 11 deletions raiden/network/transport/udp/udp_transport.py
Expand Up @@ -14,8 +14,9 @@
RaidenShuttingDown,
UnknownAddress,
)

from raiden.message_handler import on_message
from raiden.messages import Delivered, Message, Ping, Pong, decode, message_from_sendevent
from raiden.messages import Delivered, Message, Ping, Pong, decode
from raiden.network.transport.udp import healthcheck
from raiden.network.transport.udp.udp_utils import (
event_first_of,
Expand Down Expand Up @@ -207,7 +208,7 @@ def __init__(self, discovery, udpsocket, throttle_policy, config):
def start(
self,
raiden: RaidenService,
queueids_to_queues: typing.Dict[QueueIdentifier, typing.List[Event]],
queueids_to_queues: typing.Dict[QueueIdentifier, typing.List[Message]],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use QueueIdsToQueues here, and change the type there

):
if not self.event_stop.ready():
raise RuntimeError('UDPTransport started while running')
Expand All @@ -221,15 +222,10 @@ def start(
self.server.set_handle(self._receive)

for queue_identifier, queue in queueids_to_queues.items():
encoded_queue = list()

for sendevent in queue:
message = message_from_sendevent(sendevent, raiden.address)
raiden.sign(message)
encoded = message.encode()

encoded_queue.append((encoded, sendevent.message_identifier))

encoded_queue = [
(message.encode(), message.message_identifier)
for message in queue
]
self.init_queue_for(queue_identifier, encoded_queue)

self.server.start()
Expand Down
27 changes: 20 additions & 7 deletions raiden/raiden_service.py
Expand Up @@ -16,7 +16,8 @@
from raiden.connection_manager import ConnectionManager
from raiden.constants import SNAPSHOT_STATE_CHANGES_COUNT
from raiden.exceptions import InvalidAddress, RaidenRecoverableError, RaidenShuttingDown
from raiden.messages import LockedTransfer, SignedMessage
from raiden.messages import LockedTransfer, SignedMessage, message_from_sendevent

from raiden.network.blockchain_service import BlockChainService
from raiden.network.proxies import SecretRegistry, TokenNetworkRegistry
from raiden.storage import serialize, sqlite, wal
Expand Down Expand Up @@ -314,12 +315,24 @@ def start(self):

self.alarm.start()

queueids_to_queues = views.get_all_messagequeues(chain_state)
# repopulate identifier_to_results for pending transfers
for queue_messages in queueids_to_queues.values():
for message in queue_messages:
if isinstance(message, SendDirectTransfer):
self.identifier_to_results[message.payment_identifier] = AsyncResult()
events_queues = views.get_all_messagequeues(chain_state)
queueids_to_queues = dict()

for queue_identifier, event_queue in events_queues.items():

# repopulate identifier_to_results for pending transfers
for event in event_queue:
if type(event) == SendDirectTransfer:
self.identifier_to_results[event.payment_identifier] = AsyncResult()

message_queue = []
for event in event_queue:
message = message_from_sendevent(event, self.address)
self.sign(message)
message_queue.append(message)

queueids_to_queues[queue_identifier] = message_queue

self.transport.start(self, queueids_to_queues)

# exceptions on these subtasks should crash the app and bubble up
Expand Down