Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rpc message order #311

Merged
merged 2 commits into from
Mar 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions substrateinterface/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@
logger = logging.getLogger(__name__)


def list_remove_iter(xs: list):
removed = False
def remove():
nonlocal removed
removed = True

i = 0
while i < len(xs):
removed = False
yield xs[i], remove
if removed:
xs.pop(i)
else:
i += 1


class KeypairType:
ED25519 = 0
SR25519 = 1
Expand Down Expand Up @@ -743,16 +759,13 @@ def rpc_request(self, method, params, result_handler=None):
subscription_id = None

while json_body is None:

self.__rpc_message_queue.append(json.loads(self.websocket.recv()))

# Search for subscriptions
for message in self.__rpc_message_queue:
for message, remove_message in list_remove_iter(self.__rpc_message_queue):

# Check if result message is matching request ID
if 'id' in message and message['id'] == request_id:

self.__rpc_message_queue.remove(message)
remove_message()

# Check if response has error
if 'error' in message:
Expand All @@ -769,11 +782,11 @@ def rpc_request(self, method, params, result_handler=None):
json_body = message

# Process subscription updates
for message in self.__rpc_message_queue:
for message, remove_message in list_remove_iter(self.__rpc_message_queue):
# Check if message is meant for this subscription
if 'params' in message and message['params']['subscription'] == subscription_id:

self.__rpc_message_queue.remove(message)
remove_message()

self.debug_message(f"Websocket result [{subscription_id} #{update_nr}]: {message}")

Expand All @@ -784,6 +797,10 @@ def rpc_request(self, method, params, result_handler=None):

update_nr += 1

# Read one more message to queue
if json_body is None:
self.__rpc_message_queue.append(json.loads(self.websocket.recv()))

else:

if result_handler:
Expand Down