Skip to content

Commit

Permalink
Merge pull request #738 from vruello/flows_timeslot_precision_clean
Browse files Browse the repository at this point in the history
Flow: timeslots related features with MongoDB backend
  • Loading branch information
p-l- committed Aug 24, 2019
2 parents aeedbe1 + 08ea3bf commit 5d47349
Show file tree
Hide file tree
Showing 12 changed files with 774 additions and 145 deletions.
4 changes: 4 additions & 0 deletions ivre/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@
# When recording flow times, record the whole range from start_time to end_time
# rather than only the endpoints.
# NOTE: this option is experimental and possibly useless in practice.
FLOW_TIME_FULL_RANGE = True
# When recording flow times, represents the beginning of the first timeslot,
# expressed as a Unix timestamp shifted to local time.
# 0 means that the first timeslot starts at 1970-01-01 00:00 (local time).
FLOW_TIME_BASE = 0
# Store high level protocols metadata in flows. It may take much more space.
FLOW_STORE_METADATA = True

Expand Down
128 changes: 122 additions & 6 deletions ivre/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2500,11 +2500,9 @@ def date_round(cls, date):

@classmethod
def from_filters(cls, filters, limit=None, skip=0, orderby="", mode=None,
timeline=False):
timeline=False, after=None, before=None, precision=None):
"""
Returns a flow.Query object representing the given filters
Note: limit, skip, orderby, mode and timeline are IGNORED. They are
present only for compatibility reasons with neo4j backend.
This should be inherited by backend specific classes
"""
query = flow.Query()
Expand All @@ -2519,16 +2517,134 @@ def _get_timeslots(cls, start_time, end_time):
Returns an array of timeslots included between start_time and end_time
"""
times = []
time = cls.date_round(start_time)
end_timeslot = cls.date_round(end_time)
while time <= end_timeslot:
first_timeslot = cls._get_timeslot(
start_time, config.FLOW_TIME_PRECISION, config.FLOW_TIME_BASE
)
time = first_timeslot['start']
last_timeslot = cls._get_timeslot(
end_time, config.FLOW_TIME_PRECISION, config.FLOW_TIME_BASE
)
end_time = last_timeslot['start']
while time <= end_time:
d = OrderedDict()
d['start'] = time
d['duration'] = config.FLOW_TIME_PRECISION
times.append(d)
time += timedelta(seconds=config.FLOW_TIME_PRECISION)
return times

@staticmethod
def _get_timeslot(time, precision, base):
    """Return the timeslot containing `time`.

    The timeslot is an OrderedDict {'start': datetime,
    'duration': precision}; its start is aligned on `precision`
    seconds, shifted by `base`, with the alignment computed in
    local time (via utils.tz_offset).
    """
    local_ts = utils.datetime2timestamp(time)
    local_ts += utils.tz_offset(local_ts)
    # Distance (in seconds) from the previous slot boundary.
    offset = ((local_ts % precision) - base) % precision
    slot_ts = local_ts - offset
    # Convert the aligned timestamp back from local time.
    slot_ts -= utils.tz_offset(slot_ts)
    slot = OrderedDict()
    slot["start"] = datetime.fromtimestamp(slot_ts)
    slot["duration"] = precision
    return slot

def reduce_precision(self, new_precision, flt=None,
                     before=None, after=None, current_precision=None):
    """Change the precision of flow timeslots to <new_precision>.

    Only flows honoring these constraints are updated:
    - the filter <flt>, if specified;
    - seen before <before>, if specified;
    - seen after <after>, if specified;
    - timeslots must currently have <current_precision>, if specified.
    <base> represents the timestamp of the base point.
    When <current_precision> is given, <new_precision> must be both a
    multiple of and greater than <current_precision>; timeslots not
    respecting these rules are left untouched.
    """
    raise NotImplementedError("Only available with MongoDB backend.")

def list_precisions(self):
    """List the timeslot precisions present in the database."""
    raise NotImplementedError("Only available with MongoDB backend.")

def count(self, flt):
    """Count clients, servers and flows matching the given filter.

    Returns a dict {'client': nb_clients, 'servers': nb_servers',
    'flows': nb_flows}.
    """
    raise NotImplementedError

def flow_daily(self, precision, flt, after=None, before=None):
    """Yield daily flow statistics.

    Each yielded element is a dict:
    {
        flows: [("proto/dport", count), ...]
        time_in_day: time
    }
    """
    raise NotImplementedError

def to_graph(self, flt, limit=None, skip=None, orderby=None, mode=None,
             timeline=False, after=None, before=None):
    """Return a graph of the matching flows as a dict
    {"nodes": [], "edges": []}.
    """
    raise NotImplementedError

def to_iter(self, flt, limit=None, skip=None, orderby=None, mode=None,
            timeline=False, after=None, before=None, precision=None):
    """Return a generator yielding dicts
    {"src": src, "dst": dst, "flow": flow} for the matching flows.
    """
    raise NotImplementedError

def host_details(self, node_id):
    """Return details about the host with the given address.

    Details is a dict:
    {
        in_flows: set() => incoming flows (proto, dport),
        out_flows: set() => outgoing flows (proto, dport),
        elt: {} => data about the host
        clients: set() => hosts which talked to this host
        servers: set() => hosts which this host talked to
    }
    """
    raise NotImplementedError

def flow_details(self, flow_id):
    """Return details about the flow with the given ObjectId.

    Details is a dict:
    {
        elt: {} => basic data about the flow,
        meta: [] => meta entries corresponding to the flow
    }
    """
    raise NotImplementedError

def topvalues(self, flt, fields, collect_fields=None, sum_fields=None,
              limit=None, skip=None, least=False, topnbr=10):
    """Return the top values honoring the given `query` for the given
    fields list `fields`.

    Aggregated records are counted and sorted by the sum of
    `sum_fields` (by default, by their number of occurrences), and the
    `collect_fields` fields of each original entry are stored in the
    aggregated records as a list.
    Return format:
    {
        fields: (field_1_value, field_2_value, ...),
        count: count,
        collected: [
            (collect_1_value, collect_2_value, ...),
            ...
        ]
    }
    Collected fields are unique.
    """
    raise NotImplementedError


class MetaDB(object):

Expand Down

0 comments on commit 5d47349

Please sign in to comment.