Skip to content

Commit

Permalink
Merge pull request #1440 from p-l-/fix-mongodb-cursor-timeout-view
Browse files: browse the repository at this point in the history
DB/Mongo/db2view: attempt to retry on cursor timeout
  • Loading branch information
p-l- committed Oct 10, 2022
2 parents: e7d1df5 + d8fa7b2; commit: 2165407
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 41 deletions.
1 change: 1 addition & 0 deletions ivre/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class DB:
datetime_fields = []
list_fields = []
text_fields = []
cursor_timeout_exceptions = ()

def __init__(self):
self.argparser = ArgumentParser(add_help=False)
Expand Down
3 changes: 2 additions & 1 deletion ivre/db/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@


import bson
from pymongo.errors import BulkWriteError
from pymongo.errors import BulkWriteError, CursorNotFound
import pymongo

try:
Expand Down Expand Up @@ -149,6 +149,7 @@ class MongoDB(DB):
schema_latest_versions: List[int] = []
hint_indexes: List[Dict[str, List[IndexKey]]] = []
no_limit = 0
cursor_timeout_exceptions = (CursorNotFound,)

def __init__(self, url):
super().__init__()
Expand Down
97 changes: 57 additions & 40 deletions ivre/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,18 +782,27 @@ def passive_to_view(flt, category=None):
function.
"""
for rec in db.passive.get(flt, sort=[("addr", 1)]):
if rec.get("schema_version") != PASSIVE_SCHEMA_VERSION:
utils.LOGGER.warning(
"Will not handle record with schema_version %d (%d needed) [%r]",
rec.get("schema_version", 0),
PASSIVE_SCHEMA_VERSION,
rec,
)
continue
outrec = passive_record_to_view(rec, category=category)
if outrec is not None:
yield outrec
done = False
skip = 0
while not done:
try:
for rec in db.passive.get(flt, sort=[("addr", 1)], skip=skip):
if rec.get("schema_version") != PASSIVE_SCHEMA_VERSION:
utils.LOGGER.warning(
"Will not handle record with schema_version %d (%d needed) [%r]",
rec.get("schema_version", 0),
PASSIVE_SCHEMA_VERSION,
rec,
)
skip += 1
continue
outrec = passive_record_to_view(rec, category=category)
if outrec is not None:
yield outrec
skip += 1
done = True
except db.passive.cursor_timeout_exceptions:
pass


def from_passive(flt, category=None):
Expand Down Expand Up @@ -848,34 +857,42 @@ def from_nmap(flt, category=None):
cur_addr = None
cur_rec = None
result = None
for rec in db.nmap.get(flt, sort=[("addr", 1)]):
if rec.get("schema_version") != ACTIVE_SCHEMA_VERSION:
utils.LOGGER.warning(
"Will not handle record with schema_version %d (%d needed) [%r]",
rec.get("schema_version", 0),
ACTIVE_SCHEMA_VERSION,
rec,
)
continue
if "addr" not in rec:
continue
rec = nmap_record_to_view(rec, category=category)
if cur_addr is None:
cur_addr = rec["addr"]
cur_rec = rec
continue
if cur_addr != rec["addr"]:
result = cur_rec
cur_rec = rec
cur_addr = rec["addr"]
set_auto_tags(result)
yield result
else:
cur_rec = db.view.merge_host_docs(cur_rec, rec)
continue
if cur_rec is not None:
set_auto_tags(cur_rec)
yield cur_rec
done = False
skip = 0
while not done:
try:
for rec in db.nmap.get(flt, sort=[("addr", 1)], skip=skip):
if rec.get("schema_version") != ACTIVE_SCHEMA_VERSION:
utils.LOGGER.warning(
"Will not handle record with schema_version %d (%d needed) [%r]",
rec.get("schema_version", 0),
ACTIVE_SCHEMA_VERSION,
rec,
)
skip += 1
continue
if "addr" not in rec:
skip += 1
continue
rec = nmap_record_to_view(rec, category=category)
if cur_addr is None:
cur_addr = rec["addr"]
cur_rec = rec
elif cur_addr == rec["addr"]:
cur_rec = db.view.merge_host_docs(cur_rec, rec)
else:
result = cur_rec
cur_rec = rec
cur_addr = rec["addr"]
set_auto_tags(result)
yield result
skip += 1
if cur_rec is not None:
set_auto_tags(cur_rec)
yield cur_rec
done = True
except db.nmap.cursor_timeout_exceptions:
pass


def to_view(itrs):
Expand Down

0 comments on commit 2165407

Please sign in to comment.