Skip to content

Commit

Permalink
update root activities on parser tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
VincentVW committed Jul 28, 2016
1 parent 25297ae commit 9e003f5
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 21 deletions.
Empty file removed OIPA/task_queue/scheduler.py
Empty file.
80 changes: 59 additions & 21 deletions OIPA/task_queue/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from rq import Queue, Connection, Worker
from rq.job import Job
from redis import Redis
from django.conf import settings


redis_conn = Redis()

Expand All @@ -31,16 +33,10 @@ def start_worker(queue_name):

@job
def stop_worker(queue_name):
    """
    Request a graceful stop of the most recently registered worker
    listening on `queue_name`. No-op when no matching worker exists.
    """
    # NOTE(review): previous code indexed an undefined name `worker`
    # (NameError) and ignored `queue_name`; the earlier commented-out
    # implementation filtered workers by their first queue, so restore
    # that intent and guard against an empty worker list.
    matching = [
        w for w in Worker.all(connection=redis_conn)
        if w.queues and w.queues[0].name == queue_name
    ]
    if matching:
        matching[-1].request_stop()


###############################
Expand Down Expand Up @@ -80,38 +76,63 @@ def delete_all_tasks_from_queue(queue_name):

@job
def force_parse_all_existing_sources():
    """
    First parse all organisation sources, then all activity sources
    (force re-parse, ignoring unchanged-source checks).

    When ROOT_ORGANISATIONS is configured, finish by queueing a refresh
    of the searchable activities.
    """
    queue = django_rq.get_queue("parser")
    # `.all()` before `.filter()` is redundant on a Django manager.
    # type=2 first — per the docstring these are the organisation files.
    for e in IatiXmlSource.objects.filter(type=2):
        queue.enqueue(force_parse_source_by_url, args=(e.source_url,), timeout=14400)

    for e in IatiXmlSource.objects.filter(type=1):
        queue.enqueue(force_parse_source_by_url, args=(e.source_url,), timeout=14400)

    if settings.ROOT_ORGANISATIONS:
        queue.enqueue(update_searchable_activities, timeout=14400)


@job
def add_new_sources_from_registry_and_parse_all():
    """
    Fetch newly registered sources from the IATI registry API, then
    queue a parse of every known source.
    """
    default_queue = django_rq.get_queue("default")
    default_queue.enqueue(get_new_sources_from_iati_api, timeout=14400)
    default_queue.enqueue(parse_all_existing_sources, timeout=14400)


@job
def parse_all_existing_sources():
    """
    First parse all organisation sources, then all activity sources.

    When ROOT_ORGANISATIONS is configured, finish by queueing a refresh
    of the searchable activities.
    """
    queue = django_rq.get_queue("parser")
    # `.all()` before `.filter()` is redundant on a Django manager.
    # type=2 first — per the docstring these are the organisation files.
    for e in IatiXmlSource.objects.filter(type=2):
        queue.enqueue(parse_source_by_url, args=(e.source_url,), timeout=14400)

    for e in IatiXmlSource.objects.filter(type=1):
        queue.enqueue(parse_source_by_url, args=(e.source_url,), timeout=14400)

    if settings.ROOT_ORGANISATIONS:
        queue.enqueue(update_searchable_activities, timeout=14400)


@job
def parse_all_sources_by_publisher_ref(org_ref):
    """
    Queue a parse of every source that belongs to the publisher with
    organisation id `org_ref`.
    """
    parser_queue = django_rq.get_queue("parser")
    sources = IatiXmlSource.objects.filter(publisher__org_id=org_ref)
    for source in sources:
        parser_queue.enqueue(parse_source_by_url, args=(source.source_url,), timeout=14400)

    if settings.ROOT_ORGANISATIONS:
        parser_queue.enqueue(update_searchable_activities, timeout=14400)


@job
def force_parse_by_publisher_ref(org_ref):
    """
    Queue a forced re-parse of every source that belongs to the
    publisher with organisation id `org_ref`.
    """
    parser_queue = django_rq.get_queue("parser")
    sources = IatiXmlSource.objects.filter(publisher__org_id=org_ref)
    for source in sources:
        parser_queue.enqueue(force_parse_source_by_url, args=(source.source_url,), timeout=14400)

    if settings.ROOT_ORGANISATIONS:
        parser_queue.enqueue(update_searchable_activities, timeout=14400)


@job
Expand Down Expand Up @@ -233,12 +254,29 @@ def update_city_data():
######## SEARCHABLE ACTIVITIES TASKS ########
###############################


@job
def update_searchable_activities():
    """
    Run the `set_searchable_activities` management command once the
    parser queue is idle (apart from this task itself).

    Polls the parser workers every 30 seconds; raises ValueError if
    other jobs are still running after 30 minutes.
    """
    import time
    from django.core import management

    # 60 polls * 30 s sleep = 30 minutes, matching the error message
    # below (previously the threshold was 3 polls, i.e. ~90 seconds).
    max_polls = 60
    polls = 0
    workers = Worker.all(connection=redis_conn)

    while True:
        has_other_jobs = False
        for w in workers:
            # only consider workers dedicated to the parser queue
            if w.queues[0].name == "parser":
                current_job = w.get_current_job()
                # any job other than this very task counts as "busy"
                if current_job and current_job.description != 'task_queue.tasks.update_searchable_activities()':
                    has_other_jobs = True

        if not has_other_jobs:
            management.call_command('set_searchable_activities', verbosity=0, interactive=False)
            # done — previously there was no exit here, so the loop
            # re-ran the management command forever
            return
        if polls >= max_polls:
            raise ValueError('Waited for 30 min, still other jobs running on parser queue so failing the update_searchable_activities task')
        polls += 1
        time.sleep(30)

0 comments on commit 9e003f5

Please sign in to comment.