
You are using an asynchronous protocol as a synchronous protocol #82

Closed
unconst opened this issue Feb 19, 2021 · 10 comments

Comments

@unconst

unconst commented Feb 19, 2021

Note: the web-socket responses do not arrive in order, and this is not checked in rpc_request.

For example, here are the logs of the JSON responses from my chain:

websocket send {'jsonrpc': '2.0', 'method': 'chain_getRuntimeVersion', 'params': [None], 'id': 476}
websocket recv {'jsonrpc': '2.0', 'result': {'digest': {'logs': ['0x0661757261202f04081000000000', '0x0561757261010118d950da221b075187a225bacd8213239e3b6272ae2645651741e54a120f0b6c51e9a09927dda327a09fffe545b79d4cff45ba5836c974fc92519365554e1582']}, 'extrinsicsRoot': '0x9a080ec74c7c8dbead85bdab1806d6ec4024a54fa144f4611fb0b15e096e7a16', 'number': '0x1f4a4', 'parentHash': '0xb7b4f1c3af4002229e41a909d674cef33fe7ef21ceb09a07d3adcf73efdbef26', 'stateRoot': '0x41a4734bcc0cce98bf14ac522ff71dc4b6146c75674767c1fb8a167aa05ea07c'}, 'id': 475}

@unconst
Author

unconst commented Feb 19, 2021

Note the id numbers here: the request was sent with id 476, but the response that came back carries id 475.

@unconst
Author

unconst commented Feb 19, 2021

This can be band-aid fixed by adding the following loop:

while True:
    json_body = json.loads(self.websocket.recv())
    # Discard any message that does not carry the id of the request we just sent
    if 'id' not in json_body:
        continue
    if json_body['id'] != self.request_id:
        continue
    break

However, the protocol is then no longer used asynchronously: you can't run queries in parallel, because any responses that don't match the current request id are simply dropped.

@arjanz
Member

arjanz commented Feb 22, 2021

When we made this library we intended to use it in a synchronous context, so we were aware we couldn't subscribe to data, etc. However, we assumed the messages would arrive in order and overlooked the check for the request_id. Thanks for pointing this out!

Eventually I think we want to provide a synchronous and an asynchronous version, depending on the use-case, so subscriptions would also be possible.

If you want credit for this fix, we would be happy to merge a PR; otherwise I'll add it myself (and add a unit test to simulate the message-order issue).

@unconst
Author

unconst commented Feb 22, 2021

Unfortunately (I've found out) the above will not work, because the messages following the first do not include the "id" field.
In other words, the first message returns a result with the id, but the follow-up messages attached to that extrinsic, i.e. the messages with the inBlock and finalized status, will not have an 'id'.

Here is an example
Send:
-> {"jsonrpc": "2.0", "method": "author_submitAndWatchExtrinsic", "params": ["0x6d028472b87f1c96959319917938fcec0b6d5db9f3f12f0dbcb9c25bc70b5ea5c7f71b01aeabe40cb84debd5507fc2dd6018f9a5c43875773755aa7ed87866751e0a14507e4d3a205e339cb9d53a8fb13467374ef4c7e3c35239c25d3cd9496abec2af810000000703353561bf0000000000000000000000009b1f04009ca86e2971878474b6534e860dbc2baed65a5af64454563c39831970c2c2301e"], "id": 19}

Recv:
<- {'jsonrpc': '2.0', 'result': 'KUEtgGTjTlkAzcEu', 'id': 19}
<- {"jsonrpc":"2.0","method":"author_extrinsicUpdate","params":{"result":"ready","subscription":"KUEtgGTjTlkAzcEu"}}

You can see that the first response returns a tag (the subscription handle), and that tag can then be found in the "params" of the follow-up messages.
However, note that you cannot access that tag directly, since it is nested behind a key, in this case "subscription".
I'm not sure whether "subscription" is universal or specific to my query here, since my request was a subscribe call.
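
To make the matching problem concrete, here is a minimal sketch, assuming the handle returned in the first reply's result is reused verbatim as params['subscription'] in later updates (matches_request is a hypothetical helper, not library code):

from typing import Optional


def matches_request(message: dict, request_id: int, subscription_id: Optional[str]) -> bool:
    # First reply: carries the request 'id'; its 'result' is the subscription handle.
    if message.get('id') == request_id:
        return True
    # Follow-up updates: no 'id'; the handle is nested under params['subscription'].
    params = message.get('params', {})
    return subscription_id is not None and params.get('subscription') == subscription_id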

@arjanz
Member

arjanz commented Feb 22, 2021

I understand the issue. What we could do, though, is 'park' all messages that don't contain the expected id but do contain a subscription identifier, in a dict of lists (for example {'KUEtgGTjTlkAzcEu': [message_1, message_2, ...], ...}), and the moment we retrieve the message with the expected id = 19, process the parked messages with the same subscription id.

What I understand is that the result KUEtgGTjTlkAzcEu is a globally unique subscription reference, and updates about that subscription will contain the same string.

I'll have a closer look at it tomorrow
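
Roughly what that parking approach could look like, as a sketch only (recv_until_id is a hypothetical helper, not necessarily how the library will implement it):

import json


def recv_until_id(websocket, request_id):
    # Buffer subscription updates that arrive before the reply carrying the
    # expected request id, keyed by their subscription handle.
    parked = {}
    while True:
        message = json.loads(websocket.recv())
        if message.get('id') == request_id:
            # For a subscribe call the 'result' is the subscription handle;
            # return the reply together with any updates that arrived early.
            return message, parked.get(message.get('result'), [])
        subscription_id = message.get('params', {}).get('subscription')
        if subscription_id is not None:
            parked.setdefault(subscription_id, []).append(message)
        # Replies for other request ids are dropped in this sketch; a real
        # implementation would need to route them to their own callers.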

@arjanz arjanz added this to Needs triage in Substrate compatibility via automation Mar 15, 2021
@arjanz arjanz moved this from Needs triage to In progress in Substrate compatibility Mar 15, 2021
@arjanz
Member

arjanz commented Mar 29, 2021

Since the latest release (https://github.com/polkascan/py-substrate-interface/releases/tag/v0.12.1) subscription support has been improved, so these issues shouldn't occur anymore. I would appreciate it if you could confirm this.

@arjanz
Member

arjanz commented Mar 31, 2021

I ran some multi-threaded tests, and although there could still be threading issues in some extreme cases, I think the current state of the library is usable in a multi-threaded environment:

from threading import Thread
from substrateinterface import SubstrateInterface

import logging
logging.basicConfig(level=logging.DEBUG)

substrate = SubstrateInterface(
    url='wss://rpc.polkadot.io'
)


def check_events():
    def subscription_handler(obj, update_nr, subscription_id):
        # Do something with the update
        print('Events:', obj.value)

        # The execution will block until an arbitrary value is returned, which will be the result of the `query`
        if update_nr > 5:
            return {'updates': update_nr, 'message': 'processed events'}

    result = substrate.query("System", "Events", [], subscription_handler=subscription_handler)


def check_blocks():
    def block_subscription_handler(obj, update_nr, subscription_id):
        print(f"New block #{obj['header']['number']} produced by {obj['author']}")

        if update_nr > 5:
            return {'updates': update_nr, 'message': 'processed heads'}

    result = substrate.subscribe_block_headers(block_subscription_handler, include_author=True, finalized_only=True)


if __name__ == "__main__":
    
    t1 = Thread(target=check_events)
    t2 = Thread(target=check_blocks)

    t1.start()
    print('Thread 1 started')
    t2.start()
    print('Thread 2 started')

    t1.join()
    print('Thread 1 joined')
    t2.join()
    print('Thread 2 joined')

@arjanz arjanz closed this as completed Apr 6, 2021
Substrate compatibility automation moved this from In progress to Closed Apr 6, 2021
@stolpa4

stolpa4 commented Oct 7, 2021

@arjanz I'm trying to use your library for a simple Python wallet.

Some time ago I found the SubstrateInterface.init_runtime method. As I see it now, it mutates the SubstrateInterface instance, so it is definitely not intended to be used in a multi-threaded environment.

Right now I'm having issues trying to retrieve account balances from a public node for multiple accounts in parallel.
So my guess is that the library is not thread-safe when working with metadata. Is that correct?

@stolpa4

stolpa4 commented Oct 7, 2021

Here's a simple example.

from multiprocessing.pool import ThreadPool
from threading import Lock

from substrateinterface import SubstrateInterface


def get_globals():
    s = SubstrateInterface('wss://westend-rpc.polkadot.io')
    addrs = ['5HpLdCTNBQDjFomqpG2XWadgB4zHTuqQqNHhUyYbett7k1RR',
             '5HbwGTqpEw8LeUaCMtunWmCfFnJSN1gcnVjuqxAoDyuSc2rE',
             '5HpLdCTNBQDjFomqpG2XWadgB4zHTuqQqNHhUyYbett7k1RR',
             '5DXomcfWBhckmx8N9jG7GuVzJcTQpREC5hYoCteD6KcwnacY',
             '5FFD5XPNCuGYcZpKg8YcS6EyCRrudFAP1ycUWfnUeueJMWpc']
    l = Lock()
    return s, addrs, l


def get_balances(s, addrs):
    return [get_balance(s, addr) for addr in addrs]


def get_balance(s, addr, lock=None):
    if lock: lock.acquire()
    try:
        return s.query(module='System', storage_function='Account', params=[addr])['data']['free'].decode()
    finally:
        if lock: lock.release()


def get_balances_threading(s, addrs, lock=None):
    # hangs
    with ThreadPool(len(addrs)) as pool:
        return list(pool.imap(lambda addr: get_balance(s, addr, lock=lock), addrs))


if __name__ == '__main__':
    s, addrs, lock = get_globals()
    print('Get balances:')
    print(get_balances(s, addrs))
    print('Get balances threading with lock:')
    print(get_balances_threading(s, addrs, lock=lock))
    print('Get balances threading without lock:')
    print(get_balances_threading(s, addrs))

The code prints this to the terminal:

Get balances:
[13617492357141157, 139084399998549, 13617492357141157, 556199966539, 12054620958223]
Get balances threading with lock:
[13617492357141157, 139084399998549, 13617492357141157, 556199966539, 12054620958223]
Get balances threading without lock:
# HERE THE PROGRAM HANGS

So it seems to me that some context-specific attributes don't allow using the same SubstrateInterface instance from multiple threads.
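
For what it's worth, a common workaround in this situation is to give each thread its own SubstrateInterface instance, so no runtime/metadata state is shared. A rough sketch along the lines of the example above (get_balance_isolated and get_balances_isolated are hypothetical helpers; opening a connection per call is of course more expensive):

from multiprocessing.pool import ThreadPool

from substrateinterface import SubstrateInterface


def get_balance_isolated(addr):
    # A dedicated instance (and thus a dedicated websocket connection) per call,
    # so no runtime/metadata state is shared between threads.
    s = SubstrateInterface('wss://westend-rpc.polkadot.io')
    return s.query(module='System', storage_function='Account',
                   params=[addr])['data']['free'].decode()


def get_balances_isolated(addrs):
    # Same ThreadPool pattern as above, but with one instance per call.
    with ThreadPool(len(addrs)) as pool:
        return list(pool.imap(get_balance_isolated, addrs))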

@vtexier
Contributor

vtexier commented May 27, 2022

Some time ago I found the SubstrateInterface.init_runtime method. As I see it now, it mutates the SubstrateInterface instance, so it is definitely not intended to be used in a multi-threaded environment.

Exactly, substrate-interface is not thread-safe. I lost some time before figuring that out.

@arjanz, maybe you should mention this explicitly in the README.
