Commit 9695efb
Merge pull request #758 from p-l-/fix-mongo-top-addr
DB/Mongo: fix {nmap,view}.topvalues("addr")
p-l- committed Aug 26, 2019
2 parents 831aea4 + 538051f commit 9695efb
Showing 1 changed file with 49 additions and 24 deletions.
ivre/db/mongo.py (73 changes: 49 additions & 24 deletions)
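
The patch does two things. First, it adds a small log_pipeline() helper and calls it before every aggregation run in the module, so the MongoDB pipelines can be inspected when config.DEBUG_DB is set. Second, it fixes the $project specification built by topvalues("addr") in the nmap and view backends: the projection keys were written as '$addr_0'/'$addr_1', which MongoDB rejects, since projection keys are plain field names and may not start with '$'; the fix uses 'addr_0'/'addr_1'. Short illustrative sketches follow the relevant hunks below.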
@@ -89,6 +89,14 @@ def _old_array(*values, sep="###", convert_to_string=False):
     return {'$concat': result}
 
 
+def log_pipeline(pipeline):
+    """Simple function to log (when config.DEBUG_DB is set) a MongoDB
+    pipeline for the aggregation framework.
+
+    """
+    utils.LOGGER.debug("DB: MongoDB aggregation pipeline: %r", pipeline)
+
+
 class MongoDB(DB):
 
     schema_migrations_indexes = []
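
For reference, a minimal standalone sketch of what the new helper produces (the logger name and configuration here are assumptions for the sketch; IVRE sets up utils.LOGGER itself):

    import logging

    logging.basicConfig(level=logging.DEBUG)
    LOGGER = logging.getLogger("ivre")  # assumed name; stands in for ivre.utils.LOGGER

    def log_pipeline(pipeline):
        """Log a MongoDB aggregation pipeline, as in the patch."""
        LOGGER.debug("DB: MongoDB aggregation pipeline: %r", pipeline)

    log_pipeline([{"$match": {}}, {"$group": {"_id": None, "count": {"$sum": 1}}}])
    # With this setup, prints something like:
    # DEBUG:ivre:DB: MongoDB aggregation pipeline: [{'$match': {}}, {'$group': ...}]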
@@ -537,12 +545,12 @@ def _distinct(self, column, field, flt=None, sort=None, limit=None,
         """
         is_ipfield = field in self.ipaddr_fields
+        pipeline = self._distinct_pipeline(field, flt=flt, sort=sort,
+                                           limit=limit, skip=skip,
+                                           is_ipfield=is_ipfield)
+        log_pipeline(pipeline)
         cursor = self.set_limits(
-            self.db[column].aggregate(
-                self._distinct_pipeline(field, flt=flt, sort=sort, limit=limit,
-                                        skip=skip, is_ipfield=is_ipfield),
-                cursor={}
-            )
+            self.db[column].aggregate(pipeline, cursor={})
         )
         if is_ipfield:
             if self.mongodb_32_more:
@@ -588,11 +596,13 @@ def _features_port_list(self, flt, yieldall, use_service, use_product,
             pipeline.append(
                 {'$sort': OrderedDict([("_id", 1)])}
             )
+            log_pipeline(pipeline)
             for rec in self.db[
                     self.columns[self._features_column]
             ].aggregate(pipeline, cursor={}):
                 yield rec['_id']
         else:
+            log_pipeline(pipeline)
             for rec in self.db[
                     self.columns[self._features_column]
             ].aggregate(pipeline, cursor={}):
@@ -1559,6 +1569,7 @@ def getlocations(self, flt):
             {"$project": {"_id": 0, "coords": "$infos.loc.coordinates"}},
             {"$group": {"_id": "$coords", "count": {"$sum": 1}}},
         ]
+        log_pipeline(pipeline)
         return ({'_id': tuple(rec['_id'][::-1]), 'count': rec['count']}
                 for rec in col.aggregate(pipeline, cursor={}))

@@ -1703,6 +1714,7 @@ def get_mean_open_ports(self, flt):
                         "id": "$_id",
                         "mean": {"$multiply": ["$count", "$ports"]}}},
         ]
+        log_pipeline(aggr)
         return self.db[self.columns[self.column_hosts]].aggregate(aggr,
                                                                   cursor={})

@@ -1733,6 +1745,7 @@ def group_by_port(self, flt):
             {"$group": {"_id": "$ports",
                         "ids": {"$addToSet": "$_id"}}},
         ]
+        log_pipeline(aggr)
         return self.db[self.columns[self.column_hosts]].aggregate(aggr,
                                                                   cursor={})

@@ -2420,8 +2433,8 @@ def outputproc(x):
         elif field == "addr":
             specialproj = {
                 "_id": 0,
-                '$addr_0': 1,
-                '$addr_1': 1,
+                'addr_0': 1,
+                'addr_1': 1,
             }
             if self.mongodb_32_more:
                 specialflt = [{"$project": {field: ['$addr_0', '$addr_1']}}]
@@ -3363,6 +3376,7 @@ def outputproc(x):
             skip=skip, least=least, aggrflt=aggrflt,
             specialproj=specialproj, specialflt=specialflt,
         )
+        log_pipeline(pipeline)
         cursor = self.set_limits(
             self.db[self.columns[self.column_hosts]].aggregate(pipeline,
                                                                cursor={})
@@ -3495,6 +3509,8 @@ def diff_categories(self, category1, category2, flt=None,
                                 "port": "$ports.port"},
                         "categories": {"$push": "$categories"}}},
         ]
+        log_pipeline(pipeline)
+
         cursor = self.db[self.columns[self.column_hosts]].aggregate(
             pipeline, cursor={}
         )
@@ -4120,6 +4136,7 @@ def outputproc(x):
             }
         pipeline = self._topvalues(field, flt=flt, specialproj=specialproj,
                                    **kargs)
+        log_pipeline(pipeline)
         cursor = self.set_limits(
             self.db[self.columns[self.column_passive]].aggregate(
                 pipeline, cursor={},
@@ -4903,7 +4920,7 @@ def count(self, flt):
         destinations = 0
         flows = self.db[self.columns[self.column_flow]].count(flt)
         if flows > 0:
-            sources = next(self.db[self.columns[self.column_flow]].aggregate([
+            pipeline = [
                 {'$match': flt},
                 {
                     '$group': {
@@ -4920,24 +4937,30 @@ def count(self, flt):
                         '_id': None,
                         'count': {'$sum': 1}
                     }}
-            ]))['count']
+            ]
+            log_pipeline(pipeline)
+            sources = next(
+                self.db[self.columns[self.column_flow]].aggregate(pipeline)
+            )['count']
 
+            pipeline = [
+                {'$match': flt},
+                {
+                    '$group': {
+                        '_id': {
+                            'dst_addr_0': '$dst_addr_0',
+                            'dst_addr_1': '$dst_addr_1'
+                        },
+                    }
+                },
+                {'$group': {
+                    '_id': None,
+                    'count': {'$sum': 1}
+                }}
+            ]
+            log_pipeline(pipeline)
             destinations = next(
-                self.db[self.columns[self.column_flow]].aggregate([
-                    {'$match': flt},
-                    {
-                        '$group': {
-                            '_id': {
-                                'dst_addr_0': '$dst_addr_0',
-                                'dst_addr_1': '$dst_addr_1'
-                            },
-                        }
-                    },
-                    {'$group': {
-                        '_id': None,
-                        'count': {'$sum': 1}
-                    }}
-                ])
+                self.db[self.columns[self.column_flow]].aggregate(pipeline)
             )['count']
         return {'clients': sources, 'servers': destinations, 'flows': flows}
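
Two details worth noting in the rewritten count(): the chained $group stages implement a distinct count (the first collapses the flows to one document per distinct address pair, the second counts those documents), and the next(...) calls cannot raise StopIteration because the surrounding "if flows > 0:" guard guarantees the pipeline emits exactly one result document. The same pattern in isolation (database/collection names are assumptions; field names mirror the visible hunk):

    from pymongo import MongoClient

    coll = MongoClient().ivre.flows  # assumed names, for illustration only
    flt = {}                         # match everything for the demo

    pipeline = [
        {'$match': flt},
        # one output document per distinct (dst_addr_0, dst_addr_1) pair
        {'$group': {'_id': {'dst_addr_0': '$dst_addr_0',
                            'dst_addr_1': '$dst_addr_1'}}},
        # collapse those documents into a single {'_id': None, 'count': N}
        {'$group': {'_id': None, 'count': {'$sum': 1}}},
    ]
    if coll.count_documents(flt):
        destinations = next(coll.aggregate(pipeline))['count']
    else:
        destinations = 0  # empty match would make next() raise StopIteration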

@@ -5061,6 +5084,7 @@ def topvalues(self, flt, fields, collect_fields=None, sum_fields=None,
         if topnbr is not None:
             pipeline.append({"$limit": topnbr})
 
+        log_pipeline(pipeline)
         res = self.db[self.columns[self.column_flow]].aggregate(pipeline,
                                                                 cursor={})
         for entry in res:
@@ -5757,6 +5781,7 @@ def flow_daily(self, precision, flt, after=None, before=None):
                 '_id.second': 1
             }})
 
+        log_pipeline(pipeline)
         res = self.db[self.columns[self.column_flow]].aggregate(pipeline,
                                                                 cursor={})
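
Taken together, a hedged end-to-end sketch of what the patch enables (this assumes an IVRE installation with results in the MongoDB backend; config.DEBUG_DB is the setting named in log_pipeline()'s docstring, the topnbr parameter and the '_id'/'count' result fields follow IVRE's usual topvalues() conventions, and toggling the setting at runtime is shown only to illustrate intent):

    import logging

    from ivre import config, utils
    from ivre.db import db

    config.DEBUG_DB = True                 # the condition named in the new docstring
    utils.LOGGER.setLevel(logging.DEBUG)   # make the LOGGER.debug() calls visible

    # With the projection fix, topvalues("addr") runs instead of failing on
    # the '$'-prefixed keys; each aggregation pipeline is now logged first.
    for rec in db.view.topvalues("addr", topnbr=5):
        print(rec['_id'], rec['count'])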
