
stream_framework.tasks.follow_many hangs in celery for cassandra backend #170

Open · aruljothi opened this issue Feb 29, 2016 · 11 comments

@aruljothi

Friends,

I tried to port our Stream Framework backend from Redis to Cassandra and followed the instructions, including keyspace and model creation (sync_table). But when I try to create a follow relationship, or perform any Stream Framework operation that runs the follow_many task on Celery, the task just hangs.

I can see the following in the Celery logs:

  1. The task crossed the pre_run state (observed via the signal), but it never exits with a success or failure state.
  2. I see the following in the worker process logs after the task completed the pre_run state:
    SELECT * FROM mystream.user_feed WHERE "feed_id" = %(0)s LIMIT 5000
    Sending options message heartbeat on idle connection (139973351007568) cassandra
    Sending options message heartbeat on idle connection (139973368092368) cassandra
    Received options response on connection (139973368092368) from cassandra
    Received options response on connection (139973351007568) from cassandra

If I use CELERY_ALWAYS_EAGER, the task completes with a success state.
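For reference, the eager switch is just a settings flag (a minimal sketch using the Celery 3.x setting names; newer Celery releases spell it task_always_eager):

```python
# Celery config / Django settings (Celery 3.x names)
CELERY_ALWAYS_EAGER = True                   # run tasks inline in the calling process, no broker
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True    # re-raise task errors immediately instead of swallowing them
```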

Any help is greatly appreciated. Please let me know if you need any more details from my side.

Regards,
Aruljothi.S

@aruljothi
Author

Also, is there a user group where I can post clarifications and queries about stream-framework?

@tschellenbach
Owner

Are you using Celery with something other than RabbitMQ as the broker? Celery workers aren't always reliable with other broker backends, especially at high load. Are you seeing this issue in development or production?

@aruljothi
Author

I am using Celery with RabbitMQ, and it works perfectly fine with the Redis backend. Our app is at the POC stage and in development.

@aruljothi
Author

This link seems to explain the behaviour I'm encountering: http://stackoverflow.com/questions/24785299/python-cassandra-driver-operationtimeout-on-every-query-in-celery-task

BTW, if I strace the Celery process that hangs, it gives me the following:

```
strace -f -p 32 -s 10000
Process 32 attached
futex(0x2a3ba10, FUTEX_WAIT_PRIVATE, 0, NULL
```

Since you are already using Cassandra + Celery, I think I might be missing something in the setup. Any help is greatly appreciated.

@aruljothi
Author

It seems it has to do with sharing the Cassandra session across Celery worker processes. I had to put the following in celery.py to get it working. This also took care of keyspace creation and model sync in Cassandra:

```python
from celery.signals import worker_process_init, beat_init

def cassandra_init(*args, **kwargs):
    '''
    initialize cassandra for mystreamapp
    '''
    import os, importlib
    from stream_framework.storage.cassandra.connection import setup_connection, settings as stream_framework_settings
    from cassandra.cqlengine.connection import (cluster as cql_cluster, session as cql_session)
    from cassandra.cqlengine.management import sync_table, create_keyspace_simple
    from myproject.core.semaphore_obj_mgr import SemaphoreObjMgr

    key_space = stream_framework_settings.STREAM_DEFAULT_KEYSPACE
    feed_module = "myproject.mystreamapp.lib.gen_feed"
    feed_classes = ("MyStreamappFlatFeed", "MyStreamappAggregatedFeed", "MyStreamappUserFeed")
    feed_module_obj = importlib.import_module(feed_module)

    # this ensures a worker instance won't be using the shared cassandra session
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()
    # this ensures a worker instance will get a new cassandra session
    setup_connection()

    os.environ["CQLENG_ALLOW_SCHEMA_MANAGEMENT"] = "True"
    sp_mgr = SemaphoreObjMgr("chkcassandra")
    if sp_mgr.is_process_complete():
        # not required to be processed, may already have been processed by another worker process
        return True
    sp_mgr.acquire()
    create_keyspace_simple(name=key_space, replication_factor=1)
    for feed_cls in feed_classes:
        feed_cls_obj = getattr(feed_module_obj, feed_cls)
        timeline = feed_cls_obj.get_timeline_storage()
        sync_table(timeline.model)
    sp_mgr.release()

# Initialize worker context for both standard and periodic tasks.
worker_process_init.connect(cassandra_init)
beat_init.connect(cassandra_init)
```

@tbarbugli
Collaborator

That's right, I forgot about it. We should add this to the Celery install documentation.

@tbarbugli reopened this Mar 1, 2016
@tbarbugli
Collaborator

@aruljothi it's not a good idea to have the sync_table in the worker_process_init callback function ;)

@aruljothi
Author

@tbarbugli I felt that, but since I kept that part of the code inside a semaphore, it happens only once. What is your recommendation? The way I see it, it has to be called only once, right?

@tbarbugli
Collaborator

No idea how you implemented the semaphore, but I guess that only makes sure one process or one server runs that code at the same time :) Anyway, you should only run sync_table when something in your schema changes (the same way you apply Django schema migrations).
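A trimmed init callback along those lines might look like this (a sketch, assuming the same setup_connection helper used in the snippet above; keyspace creation and sync_table move out of the workers entirely):

```python
from celery.signals import worker_process_init, beat_init

def cassandra_init(*args, **kwargs):
    '''
    Give each worker process its own Cassandra session; driver connections
    created in the parent process do not survive os.fork().
    '''
    from cassandra.cqlengine.connection import (cluster as cql_cluster,
                                                session as cql_session)
    from stream_framework.storage.cassandra.connection import setup_connection

    # Drop any session inherited from the parent process.
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()
    setup_connection()

worker_process_init.connect(cassandra_init)
beat_init.connect(cassandra_init)
```

Schema management (create_keyspace_simple, sync_table) then runs once per schema change from a deploy script or management command, not on worker start-up.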


@aruljothi
Author

Got it. Currently I use a Redis lock, which is accessible across servers, to implement the semaphore. However, I understand the spirit of your feedback; I'll see where I can fit it in.

I have an unrelated query (I'll open a separate ticket if you want).

I created my feed as shown below, and it was working fine when I used the Redis backend, but with Cassandra it fails at the following location in stream_framework/storage/base.py (def serializer):

```python
85        serializer_instance = serializer_class(
86            activity_class=self.activity_class, **kwargs)
```

Here serializer_class is CassandraActivitySerializer (used in the feed class below), which expects a model parameter in its __init__(), but none is provided here. I think I am missing something. Can you please help me here?

```python
class MyAppFeed(CassandraFeed):
    '''
    Customizing CassandraFeed class for MyApp requirements
    '''
    activity_class = MyAppActivity
    activity_serializer = CassandraActivitySerializer
```
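One thing worth checking (an assumption on my part from reading the storage code, not a confirmed fix): the base storage's serializer property instantiates the class without a model, so a model-aware serializer like CassandraActivitySerializer may need to be wired where the timeline storage, which knows its column-family model, constructs it:

```python
class MyAppFeed(CassandraFeed):
    '''
    Customizing CassandraFeed class for MyApp requirements
    '''
    activity_class = MyAppActivity
    # Hypothetical fix: let the timeline storage build the serializer,
    # since only it can pass the model into __init__().
    timeline_serializer = CassandraActivitySerializer
```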

Regards,
Aruljothi.S

@aruljothi
Author

@tbarbugli I have opened another ticket (#171) for the above query; please review and help me with it.
