avoid cursor timeout in fix links command #2382

Merged
2 changes: 1 addition & 1 deletion superdesk/eve_backend.py
@@ -157,7 +157,7 @@ def get_from_mongo(self, endpoint_name, req, lookup):
"""
req.if_modified_since = None
backend = self._backend(endpoint_name)
cursor, _ = backend.find(endpoint_name, req, lookup)
cursor, _ = backend.find(endpoint_name, req, lookup, perform_count=False)
self._cursor_hook(cursor=cursor, req=req)
return cursor

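The count that `backend.find()` normally performs is a separate query over the same filter, and this caller discards it anyway (the `_` in the unpacking), so `perform_count=False` simply drops redundant work on big collections. A rough pymongo-level illustration of the difference, independent of Eve internals and purely for explanation (database and collection names are placeholders):

from pymongo import MongoClient

db = MongoClient()["superdesk"]  # illustrative connection only

# What an enabled count roughly amounts to: an extra query before fetching.
total = db.archive.count_documents({})  # unused by get_from_mongo()
cursor = db.archive.find({})

# What the change keeps: just the cursor, with no counting pass.
cursor = db.archive.find({})
for doc in cursor:
    pass  # iterate; the total is never needed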
27 changes: 24 additions & 3 deletions superdesk/services.py
@@ -8,16 +8,15 @@
 # AUTHORS and LICENSE files distributed with this source code, or
 # at https://www.sourcefabric.org/superdesk/license

-from typing import Dict, Any
 import logging

-from typing import Union
+from typing import Dict, Any, Union
 from flask import current_app as app, json
 from eve.utils import ParsedRequest, config
 from eve.methods.common import resolve_document_etag


-log = logging.getLogger(__name__)
+logger = logging.getLogger(__name__)


 class BaseService:
@@ -129,6 +128,28 @@ def find_and_modify(self, query, update, **kwargs):
     def get_all(self):
         return self.get_from_mongo(None, {}).sort("_id")

+    def get_all_batch(self, size=500, max_iterations=10000):
+        """Get all items using multiple queries.
+
+        When processing a big collection and doing something time consuming you
+        might get a mongo cursor timeout; this avoids it by fetching `size` items
+        into memory and closing the cursor in between batches.
+        """
+        last_id = None
+        for i in range(max_iterations):
+            if last_id is not None:
+                lookup = {"_id": {"$gt": last_id}}
+            else:
+                lookup = {}
+            items = list(self.get_from_mongo(req=None, lookup=lookup).sort("_id").limit(size))
+            if not len(items):
+                break
+            for item in items:
+                yield item
+                last_id = item["_id"]
+        else:
+            logger.warning("Not enough iterations for resource %s", self.datasource)
+
     def _validator(self, skip_validation=False):
         resource_def = app.config["DOMAIN"][self.datasource]
         schema = resource_def["schema"]
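A minimal usage sketch for the new generator, assuming a registered "archive" resource; fix_item() is a placeholder for any slow per-item work and is not part of this change. Because get_all_batch() pulls each page into memory with list() and then re-queries with `_id > last_id`, no server-side cursor sits idle while the slow work runs, which is exactly what causes the Mongo cursor timeout that get_all() can hit.

import superdesk


def fix_item(item):
    """Placeholder for slow per-item work (e.g. rewriting attachment links)."""
    print(item["_id"])


def fix_everything():
    service = superdesk.get_resource_service("archive")
    for item in service.get_all_batch(size=500):
        # Each batch was already materialized inside get_all_batch(), so the
        # slow call below never keeps a MongoDB cursor open between items.
        fix_item(item)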
2 changes: 1 addition & 1 deletion superdesk/storage/fix_links.py
@@ -52,7 +52,7 @@ def run(self, prefix, resource, dry_run):
                 continue
             print("Updating resource", res)
             service = superdesk.get_resource_service(res)
-            for item in service.get_all():
+            for item in service.get_all_batch():
                 orig = copy.deepcopy(item)
                 hrefs = {}
                 updates = self.get_updates(item, prefix, hrefs)
13 changes: 13 additions & 0 deletions tests/datalayer_tests.py
@@ -52,3 +52,16 @@ def test_find_one_type(self):
         item = self.app.data.find_one("archive", req=None, guid="foo")
         self.assertIsNotNone(item)
         self.assertEqual("archive", item.get("_type"))
+
+    def test_get_all_batch(self):
+        SIZE = 500
+        items = []
+        for i in range(SIZE):
+            items.append({"_id": "test-{:04d}".format(i)})
+        service = superdesk.get_resource_service("archive")
+        service.create(items)
+        counter = 0
+        for item in service.get_all_batch(size=5):
+            assert item["_id"] == "test-{:04d}".format(counter)
+            counter += 1
+        assert counter == SIZE