
Is query_api() thread-safe? #429

Open
rojikada opened this issue Apr 8, 2022 · 6 comments
Labels
question Further information is requested

Comments

rojikada commented Apr 8, 2022

Steps to reproduce:

  1. Create a single connection with InfluxDBClient and obtain query_api() from it.
  2. Run multiple threads that reuse this same connection (a minimal sketch of this pattern follows the list).
  3. An error is raised (see the traceback below).
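For illustration, a minimal sketch of that sharing pattern, independent of the scheduler setup shown later in the thread; the URL, token, org, bucket, and Flux query are placeholders, not taken from the original report:

from concurrent.futures import ThreadPoolExecutor

from influxdb_client import InfluxDBClient

# One client and one QueryApi instance shared by every worker thread.
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
query_api = client.query_api()

def read(_):
    # Each thread reuses the same client and its underlying connection pool.
    return query_api.query('from(bucket: "my-bucket") |> range(start: -1m)')

with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(read, range(32)))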

Expected behavior:
I would expect the client to be thread-safe.

Actual behavior:
An error occurred during a read.

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 277, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
    result = self.query_api.query_data_frame(query, data_frame_index=["_time"])
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/query_api.py", line 116, in query_data_frame
    _generator = self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index)
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/client/query_api.py", line 140, in query_data_frame_stream
    response = self._query_api.post_query(org=org, query=self._create_query(query, self.default_dialect),
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/query_service.py", line 260, in post_query
    (data) = self.post_query_with_http_info(**kwargs)  # noqa: E501
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/service/query_service.py", line 340, in post_query_with_http_info
    return self.api_client.call_api(
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 340, in call_api
    return self._call_api(resource_path, method,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 170, in __call_api
    response_data = self.request(
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/api_client.py", line 385, in request
    return self.rest_client.POST(url,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/rest.py", line 300, in POST
    return self.request("POST", url,
  File "/usr/local/lib/python3.8/dist-packages/influxdb_client/rest.py", line 179, in request
    r = self.pool_manager.request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/request.py", line 78, in request
    return self.request_encode_body(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/request.py", line 170, in request_encode_body
    return self.urlopen(method, url, **extra_kw)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/poolmanager.py", line 375, in urlopen
    response = conn.urlopen(method, u.request_uri, **kw)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 755, in urlopen
    retries = retries.increment(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/util/retry.py", line 507, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 699, in urlopen
    httplib_response = self._make_request(
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 445, in _make_request
    six.raise_from(e, None)
  File "<string>", line 3, in raise_from
  File "/usr/local/lib/python3.8/dist-packages/urllib3/connectionpool.py", line 440, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.8/http/client.py", line 1348, in getresponse
    response.begin()
  File "/usr/lib/python3.8/http/client.py", line 316, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.8/http/client.py", line 277, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
  File "/usr/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/usr/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
urllib3.exceptions.ProtocolError: ('Connection aborted.', OSError(9, 'Bad file descriptor'))

Specifications:

  • Client Version: 1.16.0
  • InfluxDB Version: InfluxDB 2.0
  • Platform: Ubuntu 20.04.4 LTS
rojikada closed this as completed Apr 8, 2022
rojikada (Author) commented Apr 8, 2022

Ah, I thought urllib3 was not thread-safe, but it is; urllib is the one that is not.

rojikada reopened this Apr 8, 2022
bednar (Contributor) commented Apr 12, 2022

Hi @rojikada,

thanks for using our client.

urllib3 should be thread-safe for the client's use case. Thread-safety problems with urllib3 arise when connections are opened to more hosts than the number of pools configured in PoolManager (currently 4).

Regards
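As a hedged illustration of the pool sizing mentioned above (assuming a reasonably recent client version that supports the connection_pool_maxsize keyword; the URL, token, and org are placeholders), the number of reusable connections can be raised when constructing the client:

from influxdb_client import InfluxDBClient

# connection_pool_maxsize controls how many connections urllib3 keeps for reuse;
# raising it can help when many concurrent requests share one client.
client = InfluxDBClient(
    url="http://localhost:8086",
    token="my-token",
    org="my-org",
    connection_pool_maxsize=25,
)
query_api = client.query_api()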

bednar added the question label Apr 12, 2022
powersj (Contributor) commented Aug 5, 2022

Hi @rojikada - were you able to resolve your issue? If not, could you share some code that reproduces it for us to look at?

Thanks!

rojikada (Author) commented Aug 9, 2022

Hello, I currently don't have any shareable code. The main pattern was APScheduler running multiple threads, all of them making requests.

powersj (Contributor) commented Aug 10, 2022

Hi @rojikada,

I would be happy to keep digging into this, but I will need a reproducer of some sort; otherwise I am only guessing at what you are doing :)

Let me know if you could share something. Thanks!

powersj added the waiting for response label Aug 23, 2022
rojikada (Author) commented Dec 9, 2022

Hello, I no longer work on this; we fixed it by putting locks around the calls (a sketch of that follows the example below).

But here is a basic example of what we were doing:

from time import sleep

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from influxdb_client import InfluxDBClient


class Test(object):
    def __init__(self):
        # One client and one QueryApi instance shared by all jobs
        # (connection parameters elided).
        self.client = InfluxDBClient(...)
        self.query_api = self.client.query_api()

    def test_read(self):
        query = 'some random data query'
        self.query_api.query_data_frame(query)


t = Test()
scheduler = BackgroundScheduler(executors={'default': ProcessPoolExecutor(4)})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.add_job(t.test_read, 'interval', seconds=1, kwargs={})
scheduler.start()

while True:
    sleep(2)

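A minimal sketch of the lock-based workaround mentioned above (the class name and connection parameters are placeholders, not the original code): serialize access to the shared QueryApi with a threading.Lock.

import threading

from influxdb_client import InfluxDBClient


class LockedReader:
    def __init__(self):
        self.client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
        self.query_api = self.client.query_api()
        self._lock = threading.Lock()

    def test_read(self, query):
        # Only one thread at a time uses the shared client connection.
        with self._lock:
            return self.query_api.query_data_frame(query)

Note that a threading.Lock only coordinates threads within a single process; it does not apply across the worker processes of a ProcessPoolExecutor.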
telegraf-tiger bot removed the waiting for response label Dec 9, 2022