Skip to content

Commit

Permalink
[SD-5516]Fixing to query so that scheduled items don't block publishi…
Browse files Browse the repository at this point in the history
…ng of normal items.
  • Loading branch information
Mayur Dhamanwala authored and petrjasek committed Sep 27, 2016
1 parent bde416b commit 6fe05bd
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 43 deletions.
26 changes: 17 additions & 9 deletions apps/publish/enqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license

from apps.archive.common import ITEM_OPERATION, ARCHIVE, get_utc_schedule, insert_into_versions
from apps.archive.common import ITEM_OPERATION, ARCHIVE, insert_into_versions
from apps.legal_archive.commands import import_into_legal_archive
from apps.publish.enqueue.enqueue_published import EnqueuePublishedService
import cProfile
Expand All @@ -17,7 +17,7 @@
import superdesk
from superdesk.celery_task_utils import get_lock_id
from superdesk.lock import lock, unlock
from superdesk.metadata.item import ITEM_STATE, CONTENT_STATE, PUBLISH_SCHEDULE
from superdesk.metadata.item import ITEM_STATE, CONTENT_STATE, PUBLISH_SCHEDULE, SCHEDULE_SETTINGS

from apps.publish.enqueue.enqueue_corrected import EnqueueCorrectedService
from apps.publish.enqueue.enqueue_killed import EnqueueKilledService
Expand Down Expand Up @@ -132,12 +132,23 @@ def enqueue_item(published_item):

def get_published_items():
"""
Returns a list of items marked for publishing.
Get all items with queue state: "pending" that are not scheduled or scheduled time has lapsed.
"""
query = {QUEUE_STATE: PUBLISH_STATE.PENDING}
query = {
QUEUE_STATE: PUBLISH_STATE.PENDING,
'$or': [
{
ITEM_STATE: {'$ne': CONTENT_STATE.SCHEDULED}
},
{
ITEM_STATE: CONTENT_STATE.SCHEDULED,
'{}.utc_{}'.format(SCHEDULE_SETTINGS, PUBLISH_SCHEDULE): {'$lte': utcnow()}
}
]
}
request = ParsedRequest()
request.sort = 'publish_sequence_no'
request.max_results = 100
request.max_results = 200
return list(get_resource_service(PUBLISHED).get_from_mongo(req=request, lookup=query))


Expand All @@ -147,13 +158,10 @@ def enqueue_items(published_items):
:param list published_items: the list of items marked for publishing
"""
failed_items = {}
current_utc = utcnow()

for queue_item in published_items:
try:
schedule_utc_datetime = get_utc_schedule(queue_item, PUBLISH_SCHEDULE)
if not schedule_utc_datetime or schedule_utc_datetime < current_utc:
enqueue_item(queue_item)
enqueue_item(queue_item)
except:
logger.exception('Failed to queue item {}'.format(queue_item.get('_id')))
failed_items[str(queue_item.get('_id'))] = queue_item
Expand Down
81 changes: 47 additions & 34 deletions apps/publish/publish_content_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from datetime import timedelta

from apps.publish import init_app
from apps.publish.enqueue import enqueue_items
from apps.publish.enqueue import enqueue_items, get_published_items
from superdesk import config
from superdesk.publish.publish_content import get_queue_items
from superdesk.tests import TestCase
Expand Down Expand Up @@ -75,6 +75,46 @@ class PublishContentTests(TestCase):
"_updated": "2015-04-20T05:04:25.000Z",
"item_id": '2'}]

published_items = [
{
"_id": 1,
"item_id": "1",
"_created": utcnow(),
"_updated": utcnow(),
"queue_state": "pending",
"state": "published"
},
{
"_id": 2,
"item_id": "2",
"_created": utcnow(),
"_updated": utcnow(),
"queue_state": "pending",
"state": "scheduled",
"publish_schedule": utcnow() - timedelta(minutes=5),
"schedule_settings": {
"utc_publish_schedule": utcnow() - timedelta(minutes=5),
"timezone": "UTC",
"utc_embargo": None
}
},
{
"_id": 3,
"item_id": "3",
"_created": utcnow(),
"_updated": utcnow(),
"queue_state": "pending",
"state": "scheduled",
"publish_schedule": utcnow() + timedelta(minutes=5),
"schedule_settings": {
"utc_publish_schedule": utcnow() + timedelta(minutes=5),
"timezone": "UTC",
"utc_embargo": None
}
}

]

def setUp(self):
with self.app.app_context():
init_app(self.app)
Expand All @@ -97,36 +137,9 @@ def test_enqueue_item_not_scheduled(self, *mocks):
enqueue_items(queue_items)
fake_enqueue_item.assert_called_with(queue_items[0])

@mock.patch('apps.publish.enqueue.enqueue_item')
def test_enqueue_item_scheduled_in_future(self, *mocks):
fake_enqueue_item = mocks[0]
queue_items = [
{
'_id': '1', 'item_id': 'item_1', 'queue_state': 'pending',
'state': 'scheduled',
'publish_schedule': utcnow() + timedelta(minutes=20),
'schedule_settings': {
'utc_publish_schedule': utcnow() + timedelta(minutes=20),
'time_zone': None
}
}
]
enqueue_items(queue_items)
assert not fake_enqueue_item.called, 'method should not have been called'

@mock.patch('apps.publish.enqueue.enqueue_item')
def test_enqueue_item_scheduled_elapsed(self, *mocks):
fake_enqueue_item = mocks[0]
queue_items = [
{
'_id': '1', 'item_id': 'item_1', 'queue_state': 'pending',
'state': 'scheduled',
'publish_schedule': utcnow() + timedelta(minutes=-2),
'schedule_settings': {
'utc_publish_schedule': utcnow() + timedelta(minutes=-2),
'time_zone': None
}
}
]
enqueue_items(queue_items)
fake_enqueue_item.assert_called_with(queue_items[0])
def test_get_enqueue_items(self):
self.app.data.insert('published', self.published_items)
items = get_published_items()
self.assertEqual(2, len(items))
ids = [item[config.ID_FIELD] for item in items]
self.assertNotIn(3, ids)

0 comments on commit 6fe05bd

Please sign in to comment.