
eventlet concurrency issues #478

Closed

akotulu opened this issue Sep 28, 2017 · 7 comments

Comments

@akotulu

akotulu commented Sep 28, 2017

Hey!

Good library, but you need to throw out eventlet and replace it with Python threads. Eventlet is a somewhat useless library (sorry, author): implementing 'threading' without using low-level system calls won't make it concurrent. You're adding an extra layer of complexity on top of the Python process's main-thread execution. Wrapping functions inside an object and calling them one after another will halt whenever the client uses an external library that never returns control back to the hub. The library would be useful if all the heavy lifting were done in C (or equivalent) with Python bindings on top, like the threading library does; yes, there are convenience methods missing, and that would be the place for your library. Today's concurrency comes from low-level timers: when one triggers, the current CPU state is saved to registers and, depending on priority, the next state is loaded and executed. Finally, we have discovered the real bottleneck: extra complexity at the low level which, if improved, may change many things.

When spawning a new container, the code halts at start() and the print statement is never reached, because kombu.mixins.run waits for the consumer at restart_limit.can_consume(_tokens):

self._pipe = ServiceContainer(ReportsService, config=self._config)
self._pipe.start()

print('wurx')

self._pipe.wait()

kombu.mixins.run

def run(self, _tokens=1):
    restart_limit = self.restart_limit
    errors = (self.connection.connection_errors +
              self.connection.channel_errors)
    while not self.should_stop:
        try:
            logging.getLogger('nameko').debug('kombu.mixins.run') # added line
            if restart_limit.can_consume(_tokens):
                for _ in self.consume(limit=None):  # pragma: no cover
                    pass
            else:
                sleep(restart_limit.expected_time(_tokens))
        except errors:
            warn(W_CONN_LOST, exc_info=1)

Here is a log showing what happens. The thread ID is the same for the spawned kombu thread, which locks up execution.

root@0e878e34558b:/var/reports/service# python3 service.py
2017-09-28 03:17:52,280 > 140068001568512 - DEBUG - 1
2017-09-28 03:17:52,282 > 140068001568512 - DEBUG - starting <ServiceContainer [reports_dispatch] at 0x7f641a8cbfd0>
2017-09-28 03:17:52,282 > 140068001568512 - DEBUG - ASDASDFASFSA<ServiceContainer [reports_dispatch] at 0x7f641a8cbfd0>
2017-09-28 03:17:52,282 > 140068001568512 - DEBUG - SpawningSet({<RpcConsumer at 0x7f641abaaef0>, <QueueConsumer at 0x7f641a8e3470>})
2017-09-28 03:17:52,284 > 140068001568512 - DEBUG - 6
2017-09-28 03:17:52,284 > 140068001568512 - DEBUG - registering provider <Rpc [reports_dispatch.kill] at 0x7f641a8cbf28> for <RpcConsumer at 0x7f641abaaef0>
2017-09-28 03:17:52,284 > 140068001568512 - DEBUG - 7
2017-09-28 03:17:52,292 > 140068001568512 - DEBUG - Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'capabilities': {'exch]
2017-09-28 03:17:52,293 > 140068001568512 - DEBUG - Open OK!
2017-09-28 03:17:52,293 > 140068001568512 - DEBUG - 6
2017-09-28 03:17:52,293 > 140068001568512 - DEBUG - registering provider <Rpc [reports_dispatch.run] at 0x7f641a8e34a8> for <RpcConsumer at 0x7f641abaaef0>
2017-09-28 03:17:52,294 > 140068001568512 - DEBUG - 7
2017-09-28 03:17:52,294 > 140068001568512 - DEBUG - 6
2017-09-28 03:17:52,294 > 140068001568512 - DEBUG - registering provider <Rpc [reports_dispatch.ps] at 0x7f641a8cbf60> for <RpcConsumer at 0x7f641abaaef0>
2017-09-28 03:17:52,294 > 140068001568512 - DEBUG - 7
2017-09-28 03:17:52,294 > 140068001568512 - DEBUG - registering provider <RpcConsumer at 0x7f641abaaef0> for <QueueConsumer at 0x7f641a8e3470>
2017-09-28 03:17:52,294 > 140068001568512 - DEBUG - 2
2017-09-28 03:17:52,294 > 140068001568512 - DEBUG - container startedw324324<ServiceContainer [reports_dispatch] at 0x7f641a8cbfd0>
2017-09-28 03:17:52,295 > 140068001568512 - DEBUG - SpawningSet({<RpcConsumer at 0x7f641abaaef0>, <QueueConsumer at 0x7f641a8e3470>})
2017-09-28 03:17:52,295 > 140068001568512 - DEBUG - quee start<QueueConsumer at 0x7f641a8e3470>
2017-09-28 03:17:52,295 > 140068001568512 - DEBUG - starting <QueueConsumer at 0x7f641a8e3470>
2017-09-28 03:17:52,295 > 140068001568512 - DEBUG - 144<RpcConsumer at 0x7f641abaaef0>

2017-09-28 03:17:52,296 > 140068001568512 - DEBUG - kombu.mixins.run

2017-09-28 03:17:52,303 > 140068001568512 - DEBUG - Start from server, version: 0.9, properties: {'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'capabilities': {'exch]
2017-09-28 03:17:52,304 > 140068001568512 - DEBUG - Open OK!
2017-09-28 03:17:52,304 > 140068001568512 - DEBUG - using channel_id: 1
2017-09-28 03:17:52,305 > 140068001568512 - DEBUG - Channel open
2017-09-28 03:17:52,305 > 140068001568512 - DEBUG - setting up consumers <QueueConsumer at 0x7f641a8e3470>
2017-09-28 03:17:52,307 > 140068001568512 - DEBUG - consumer started <QueueConsumer at 0x7f641a8e3470>
2017-09-28 03:17:52,307 > 140068001568512 - DEBUG - waiting for consumer ready <QueueConsumer at 0x7f641a8e3470>
2017-09-28 03:17:52,307 > 140068001568512 - DEBUG - START RELEASED QUQU<QueueConsumer at 0x7f641a8e3470>
2017-09-28 03:17:52,307 > 140068001568512 - DEBUG - started <QueueConsumer at 0x7f641a8e3470>
2017-09-28 03:17:52,307 > 140068001568512 - DEBUG - quee end<QueueConsumer at 0x7f641a8e3470>

The problem lies in this line of messaging.py:

self._gt = self.container.spawn_managed_thread(self.run)

kombu expects a thread which has its own process life cycle.

Some notes.

  • ServiceRunner currently supports only a static list of services, but in the real world everything changes (summer to winter, etc.); the runner should support adding and removing services at runtime.
  • The config argument to ServiceContainer should be optional (e.g. default to an empty dict).

Python used to be called a scripting language, since you write instructions for a virtual machine, and nothing has changed. You should learn Python, then move on to harder, more complex things like C and then asm, and make improvements at the lower level. As a hobby, extend and help improve your preferred language, knowing how it actually runs (this is addressed to the eventlet author).

@mattbennett
Member

Ignoring the disparaging comments about Eventlet, your problem is probably that you're using librabbitmq rather than pyamqp. The former is a C library, which means it is not automatically monkey-patched into the Eventlet scheduler. If you have it installed, kombu will prefer it unless you specify pyamqp:// in your AMQP URI.
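
For example (placeholder credentials), forcing the pure-Python transport is just a matter of the URI scheme in the config you pass to the container:

# pyamqp:// selects the pure-Python py-amqp transport; a bare amqp:// URI
# lets kombu fall back to the C-based librabbitmq if it is installed.
config = {'AMQP_URI': 'pyamqp://guest:guest@localhost'}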

Eventlet and the other greenthreading libraries work just fine and can outperform native Python threads if you work within their constraints.
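
To illustrate the constraint, here is a minimal sketch (plain Eventlet, nothing Nameko-specific): once the standard library is monkey-patched, a blocking call like time.sleep yields to the hub so other greenthreads make progress, whereas an unpatched C-level call never gives control back.

import eventlet
eventlet.monkey_patch()  # time.sleep now yields to the hub

import time

def worker(name):
    for i in range(3):
        print(name, i)
        time.sleep(0.1)  # patched: other greenthreads run while we "sleep"

greenthreads = [eventlet.spawn(worker, n) for n in ('a', 'b')]
for gt in greenthreads:
    gt.wait()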

Re: making the ServiceRunner dynamic, I disagree. It's simple enough to restart the process with a different configuration when the seasons change.

@akotulu
Author

akotulu commented Sep 28, 2017

Restarting the whole universe to add a new planet seems logical.

Packages installed in the Docker debian:stretch container:

apt-get install -y python3 python3-pip erlang rabbitmq-server
pip3 install pyodbc apscheduler nameko

Config:

AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10

LOGGING:
  version: 1
  handlers:
    console:
      class: logging.StreamHandler
      formatter: simple
    file:
      class: logging.handlers.TimedRotatingFileHandler
      level: DEBUG
      formatter: simple
      when: W0
      backupCount: 4
      filename: /var/log/reports.log

  loggers:
    nameko:
      level: DEBUG
      handlers: [console]
    amqp:
      level: DEBUG
      handlers: [console]

  formatters:
    simple:
      format: '%(asctime)s > %(thread)d - %(levelname)s - %(message)s'

Well, you should have a look around in the Eventlet code; I saw what it does.

@mattbennett
Member

I can't reproduce this. Working debian:stretch container example here: https://gist.github.com/mattbennett/53c0fb4bf8260b7e086ad0f817cdd005

The ServiceRunner mostly exists as a convenience and a dev tool. For production deployments I'd highly recommend using a runner per service, so each service gets an independent process and you can make use of multiple cores.

@akotulu
Author

akotulu commented Sep 30, 2017

I've provided an example project as a comment on your gist; it shows what I wanted to do, but I was forced to use another approach.

@mattbennett
Member

The code you've added to the gist isn't valid for several reasons. The reason that threads aren't working for you is that you've re-implemented the runner but omitted the eventlet monkey patch.

From the docs:

Nameko is built on top of the eventlet library, which provides concurrency via “greenthreads”. The concurrency model is co-routines with implicit yielding.

Implicit yielding relies on monkey patching the standard library, to trigger a yield when a thread waits on I/O. If you host services with nameko run on the command line, Nameko will apply the monkey patch for you.

Since you're not using nameko run, you need to apply the monkey patch yourself.
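
A minimal sketch of that, assuming a module and service class named like the ones in your log (service.py / ReportsService):

import eventlet
eventlet.monkey_patch()  # must happen before any other imports

from nameko.containers import ServiceContainer

from service import ReportsService  # assumed module/class, per your log

config = {'AMQP_URI': 'pyamqp://guest:guest@localhost'}

container = ServiceContainer(ReportsService, config=config)
container.start()
print('wurx')  # reachable now: the consumer thread yields back to the hub on I/O
container.wait()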

@akotulu
Author

akotulu commented Oct 1, 2017

Well, if you would be so kind, point me to where this so-called 'monkey patching' magic happens in the runner? Your runner starts the first service and waits until it receives an RPC call, then repeats for the next service, until all of them are 'loaded'.

@mattbennett
Member

It's here. To understand what it's doing you should read the eventlet docs, specifically the bit on patching.

The Nameko service running machinery is spread across three locations:

  • nameko.cli.run is the main entry point script. It finds and imports service classes, attaches signals, starts the ServiceRunner etc.
  • nameko.runners.ServiceRunner is a simple wrapper around multiple ServiceContainers
  • nameko.containers.ServiceContainer actually hosts the service class, manages entrypoint and dependency provider lifecycles, and spawns workers.

There is configuration-level support for using a custom ServiceContainer (the SERVICE_CONTAINER_CLS context key). If you want a custom runner or wrapper script, you'll need to implement it yourself. The eventlet monkeypatch should be applied as soon as possible, because any imports that happen beforehand will not be the patched versions.
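
A bare-bones wrapper in that spirit might look like this (a sketch only; the service import and config values are placeholders):

import eventlet
eventlet.monkey_patch()  # apply the patch before importing anything else

from nameko.runners import ServiceRunner

from service import ReportsService  # placeholder service class

config = {'AMQP_URI': 'pyamqp://guest:guest@localhost'}

runner = ServiceRunner(config)
runner.add_service(ReportsService)
runner.start()
try:
    runner.wait()
except KeyboardInterrupt:
    runner.stop()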
