Skip to content

Commit

Permalink
Refactored to support paginated audit log api. #770
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveMcGrath committed May 14, 2024
1 parent c57002d commit 64f0f5a
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 80 deletions.
94 changes: 77 additions & 17 deletions tenable/io/audit_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,57 @@
Audit Log
=========
The following methods allow for interaction into the Tenable Vulnerability Management
:devportal:`audit log <audit-log>` API endpoints.
The following methods allow for interaction into the Tenable Vulnerability
Management :devportal:`audit log <audit-log>` API endpoints.
Methods available on ``io.audit_log``:
.. rst-class:: hide-signature
.. autoclass:: AuditLogAPI
:members:
'''
from typing_extensions import Literal
from typing import Tuple, Optional, Dict
from copy import copy
from .base import TIOEndpoint
from restfly.iterator import APIIterator


class AuditLogIterator(APIIterator):
'''
AuditLog Iterator
'''
_next_token: str = None
_payload: Dict

def _get_page(self) -> None:
'''
Request the next page of data
'''
payload = copy(self._payload)
print(self._next_token)
payload['next'] = self._next_token

if self.num_pages >= 1 and self._next_token is None:
raise StopIteration()
resp = self._api.get('audit-log/v1/events', params=payload, box=True)
self._next_token = resp.pagination.get('next')
self.page = resp.events
self.total = resp.pagination.total


class AuditLogAPI(TIOEndpoint):
def events(self, *filters, **kw):
_box = True
_path = 'audit-log/v1/events'

def events(self,
*filters: Tuple[str, str, str],
limit: int = 1000,
filter_type: Literal['and', 'or'] = 'and',
sort: Optional[str] = None,
token: Optional[str] = '0',
return_json: bool = False
):
'''
Retrieve audit logs from Tenable Vulnerability Management.
Expand All @@ -29,27 +67,49 @@ def events(self, *filters, **kw):
- ``('date', 'gt', '2017-07-05')``
- ``('date', 'lt', '2017-07-07')``
- ``('actor_id', 'match', '6000a811-8422-4096-83d3-e4d44f44b97d')``
- ``('target_id', 'match', '6000a811-8422-4096-83d3-e4d44f44b97d')``
- ``('actor_id', 'match', '6000a811-8422-4096-83d3-e4d44f7d')``
- ``('target_id', 'match', '6000a811-8422-4096-83d3-e4d447d')``
limit (int, optional):
The limit of how many events to return. The API will default to
50 unless otherwise specified.
The limit of how many events to return. The API will default
to 50 unless otherwise specified.
filter_type (str, optional):
if multiple filters are present, how should we combine the
filters? Supported values are `and` or `or`. If left
unspecified, the default is `and`.
sort (str, optional):
Should any soerting be performed on the resulting data? The
format is `FIELD_NAME:DIRECTION`. For example, supplying
`received:desc` would sort the results by the received field in
descencing order.
token (str, optional):
The `next` token to request the next page.
return_json (bool, optional):
Should we return the JSON response instead of iterable?
Returns:
:obj:`list`:
:obj:`AuditLogIterator`:
List of event records
Examples:
>>> events = tio.audit_log.events(
... ('date', 'gt', '2018-01-01'), limit=100)
>>> events = tio.audit_log.events(('date', 'gt', '2018-01-01'))
>>> for e in events:
... pprint(e)
'''
return self._api.get('audit-log/v1/events', params={
'f': ['{}.{}:{}'.format(
self._check('filter_field_name', f[0], str),
self._check('filter_operator', f[1], str),
self._check('filter_value', f[2], str)) for f in filters],
'limit': self._check('limit', kw['limit'], int) if 'limit' in kw else 50
}).json()['events']
payload = {
'f': [f'{f[0]}.{f[1]}:{f[2]}' for f in filters],
'ft': filter_type,
'limit': limit,
'next': token,
'sort': sort
}
if return_json:
return self._get(params=payload)
return AuditLogIterator(self._api,
_payload=payload,
_next_token=token
)
151 changes: 88 additions & 63 deletions tests/io/test_audit_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,72 +2,97 @@
test audit-log
'''
import pytest
from tenable.errors import ForbiddenError
from tests.checker import check
import responses
from responses.matchers import query_param_matcher
from copy import copy

@pytest.mark.vcr()
def test_auditlog_event_field_name_typeerror(api):
'''
test to raise exception when type of filter_name param does not match the expected type.
'''
with pytest.raises(TypeError):
api.audit_log.events((1, 'gt', '2018-01-01'))

@pytest.mark.vcr()
def test_auditlog_event_filter_operator_typeerror(api):
'''
test to raise exception when type of filter_operator param does not match the expected type.
'''
with pytest.raises(TypeError):
api.audit_log.events(('date', 1, '2018-01-01'))
@pytest.fixture
def event():
return {
'id': '8ca42afc7d4f42c19a731bc7bdac1efd',
'action': 'user.authenticate.password',
'cud': 'u',
'actor': {
'id': '84bba2d4-42a8-4fee-a259-15cd4b7dbddc',
'name': 'user@company.com'
},
'target': {
'id': '84bba2d4-42a8-4fee-a259-15cd4b7dbddc',
'name': 'user@company.com',
'type': 'User'
},
'description': None,
'is_anonymous': None,
'is_failure': False,
'fields': [
{
'key': 'X-Forwarded-For',
'value': '104.12.225.249, 104.12.225.249'
}, {
'key': 'X-Request-Uuid',
'value': 'abc123:abc123:abc123:abc123'
}
],
'received': '2022-05-24T19:09:47.982Z'
}

@pytest.mark.vcr()
def test_auditlog_event_filter_value_typeerror(api):
'''
test to raise exception when type of filter_value param does not match the expected type.
'''
with pytest.raises(TypeError):
api.audit_log.events(('date', 'gt', 1))

@pytest.mark.vcr()
def test_auditlog_event_limit_typeerror(api):
'''
test to raise exception when type of limit param does not match the expected type.
'''
with pytest.raises(TypeError):
api.audit_log.events(limit='nope')
@responses.activate
def test_audit_log_json(api, event):
resp = {
'pagination': {
'offset': 0,
'limit': 1000,
'count': 100,
'total': 100,
},
'events': [event for _ in range(100)]
}
responses.get('https://cloud.tenable.com/audit-log/v1/events',
json=resp
)
assert resp == api.audit_log.events(return_json=True)

@pytest.mark.vcr()
def test_auditlog_events_standard_user_permissionerror(stdapi):
'''
test to raise exception when standard_user tries to get audit log.
'''
with pytest.raises(ForbiddenError):
stdapi.audit_log.events()

@pytest.mark.vcr()
def test_auditlog_events(api):
'''
test to get audit log
'''
events = api.audit_log.events(('date', 'gt', '2018-01-01'), limit=100)
assert isinstance(events, list)
event = events[-1]
check(event, 'action', str)
check(event, 'actor', dict)
check(event['actor'], 'id', 'uuid')
check(event['actor'], 'name', str, allow_none=True)
check(event, 'crud', str)
check(event, 'description', str, allow_none=True)
check(event, 'fields', list)
for field in event['fields']:
check(field, 'key', str)
check(field, 'value', str)
check(event, 'id', str)
check(event, 'is_anonymous', bool, allow_none=True)
check(event, 'is_failure', bool, allow_none=True)
check(event, 'received', 'datetime')
check(event, 'target', dict)
check(event['target'], 'id', str)
check(event['target'], 'name', str, allow_none=True)
check(event['target'], 'type', str)
@responses.activate
def test_audit_log_iter(api, event):
with responses.RequestsMock() as rsps:
rsps.get('https://cloud.tenable.com/audit-log/v1/events',
json={
'pagination': {
'offset': 0,
'limit': 1000,
'count': 1000,
'total': 2000,
'next': 'abc123',
},
'events': [event for _ in range(1000)]
},
match=[query_param_matcher({
'next': '0',
'ft': 'and',
'limit': 1000
})]
)
rsps.get('https://cloud.tenable.com/audit-log/v1/events',
json={
'pagination': {
'offset': 0,
'limit': 1000,
'count': 1000,
'total': 2000,
},
'events': [event for _ in range(1000)]
},
match=[query_param_matcher({
'next': 'abc123',
'ft': 'and',
'limit': 1000
})]
)
events = api.audit_log.events()
for e in events:
assert e == event
assert events.total == 2000
assert events.count == 2000

0 comments on commit 64f0f5a

Please sign in to comment.