-
Notifications
You must be signed in to change notification settings - Fork 166
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #690 from tenable/was-integration
Was integration
- Loading branch information
Showing
10 changed files
with
1,173 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
.. automodule:: tenable.io.was.api | ||
|
||
|
||
.. automodule:: tenable.io.was.iterator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
""" | ||
WAS | ||
=== | ||
The following methods allow for interaction into the Tenable.io | ||
:devportal:`WAS <was>` API endpoints. | ||
Methods available on ``tio.was``: | ||
.. rst-class:: hide-signature | ||
.. autoclass:: WasAPI | ||
:members: | ||
""" | ||
|
||
from tenable.io.base import TIOEndpoint, TIOIterator | ||
from tenable.io.was.iterator import WasIterator | ||
from typing import Any, Dict, Tuple | ||
|
||
|
||
class WasAPI(TIOEndpoint): | ||
""" | ||
This class contains methods related to WAS. | ||
""" | ||
|
||
def export(self, **kwargs) -> WasIterator: | ||
""" | ||
Export Web Application Scan Results based on filters applied. | ||
Args: | ||
single_filter (tuple): | ||
A single filter to apply to the scan configuration search. This is a tuple with three elements - | ||
field, operator, and value in that order. | ||
and_filter (list): An array of filters that must all be satisfied. This is a list of tuples with three elements - | ||
field, operator, and value in that order. | ||
or_filter (list): An array of filters where at least one must be satisfied. This is a list of tuples with three elements - | ||
field, operator, and value in that order. | ||
Returns: | ||
WasIterator | ||
Examples: | ||
Passing AND filter to the API | ||
>>> was_iterator = tio.was.export( | ||
... and_filter=[ | ||
... ("scans_started_at", "gte", "2023/03/24"), | ||
... ("scans_status", "contains", ["completed"]) | ||
... ] | ||
... ) | ||
... | ||
... for finding in was_iterator: | ||
... print(finding) | ||
""" | ||
|
||
# Get scan configuration iterator. | ||
scan_config = self._search_scan_configurations(**kwargs) | ||
|
||
# Iterate through the scan configs and collect the parent scan IDs and the finalized_at param. | ||
# This finalized_at property belonging to the parent will be passed down to its children's findings. | ||
parent_scan_ids_with_finalized_at = [_parent_id_with_finalized_at(sc) for sc in scan_config if sc] | ||
|
||
# initialize parent_scan_ids_with_finalized_at if it's empty. | ||
if not parent_scan_ids_with_finalized_at: | ||
parent_scan_ids_with_finalized_at = [] | ||
|
||
self._log.debug(f"We have {len(parent_scan_ids_with_finalized_at)} parent scan ID(s) to process.") | ||
|
||
# Fetch the target scans info for all the above parent scan IDs, and flatten it. | ||
# We need to flatten because, each parent ID will have multiple target scans. | ||
self._log.debug(f"Fetching Target scan IDs for {len(parent_scan_ids_with_finalized_at)} parent scan ID(s)") | ||
target_scans = [scan for p in parent_scan_ids_with_finalized_at for scan in | ||
self._get_target_scan_ids_for_parent(p)] | ||
|
||
# Iterate through the target scans info and collect the target scan IDs. | ||
target_scan_ids_with_parent_finalized_at = [_target_id_with_parent_finalized_at(ts) for ts in target_scans if ts] | ||
self._log.debug(f"We have {len(target_scan_ids_with_parent_finalized_at)} target scan(s) to process.") | ||
|
||
return WasIterator( | ||
api=self._api.was, | ||
target_scan_ids=target_scan_ids_with_parent_finalized_at | ||
) | ||
|
||
def download_scan_report(self, scan_uuid: str) -> Dict: | ||
""" | ||
Downloads the individual target scan results. | ||
Args: | ||
scan_uuid (str) UUID of the scan, whose report to download. | ||
""" | ||
return self._api.get( | ||
path=f"was/v2/scans/{scan_uuid}/report", | ||
headers={ | ||
"Content-Type": "application/json" | ||
} | ||
).json() | ||
|
||
def _search_scan_configurations(self, **kwargs) -> TIOIterator: | ||
""" | ||
Returns a list of web application scan configurations based on the provided filter parameters. | ||
""" | ||
payload = dict() | ||
|
||
# Either single_filter should be passed alone. Or, any or all of these [and_filter, or_filter] can be passed. | ||
if "single_filter" in kwargs and (("and_filter" in kwargs) or ("or_filter" in kwargs)): | ||
raise AttributeError("single_filter cannot be passed alongside and_filter or or_filter.") | ||
|
||
if "single_filter" in kwargs: | ||
payload = _tuple_to_filter(kwargs["single_filter"]) | ||
|
||
if "and_filter" in kwargs: | ||
payload["AND"] = [_tuple_to_filter(t) for t in kwargs["and_filter"]] | ||
|
||
if "or_filter" in kwargs: | ||
payload["OR"] = [_tuple_to_filter(t) for t in kwargs["or_filter"]] | ||
|
||
self._log.debug(f"Fetching the scan configuration information with filters: {payload} ...") | ||
|
||
return TIOIterator( | ||
self._api, | ||
_limit=self._check('limit', 200, int), | ||
_offset=self._check('offset', 0, int), | ||
_query=dict(), | ||
_path='was/v2/configs/search', | ||
_method="POST", | ||
_payload=payload, | ||
_resource='items' | ||
) | ||
|
||
def _get_target_scan_ids_for_parent(self, parent: dict) -> Dict: | ||
""" | ||
Returns the vulns by target scans of the given parent scan ID. | ||
""" | ||
# This method does not have an iterator and is not public as the API it invokes has not been publicly documented. | ||
# However, the API is in use in the Tenable.io UI. | ||
|
||
parent_scan_id = parent["parent_scan_id"] | ||
parent_finalized_at = parent["parent_finalized_at"] | ||
|
||
offset = 0 | ||
limit = 200 | ||
|
||
# initialize the flattened responses list | ||
flattened_list = [] | ||
|
||
while True: | ||
# Fetch the page | ||
response = self._api.post( | ||
path=f"was/v2/scans/{parent_scan_id}/vulnerabilities/by-targets/search?limit={limit}&offset={offset}" | ||
).json() | ||
|
||
# Collect the items; flatten; and write to the flattened list (extend). | ||
items_in_response = response["items"] | ||
items = [{ | ||
"items": item, | ||
"parent_finalized_at": parent_finalized_at | ||
} for item in items_in_response] | ||
|
||
flattened_list.extend(items) | ||
|
||
# Increment the page number by limit | ||
offset += limit | ||
|
||
if not items_in_response: | ||
self._log.debug(f"Stopping the iteration as we encountered an empty response from the API.") | ||
break | ||
|
||
self._log.debug(f"Parent ID: {parent_scan_id} has {len(flattened_list)} target ID(s).") | ||
|
||
return flattened_list | ||
|
||
|
||
def _tuple_to_filter(t: Tuple[str, str, Any]) -> Dict: | ||
""" | ||
Accepts a tuple with three elements, and returns a filter object. | ||
""" | ||
return {"field": t[0], "operator": t[1], "value": t[2]} | ||
|
||
|
||
def _parent_id_with_finalized_at(scan_config: dict): | ||
return { | ||
"parent_scan_id": scan_config["last_scan"]["scan_id"], | ||
"parent_finalized_at": scan_config["last_scan"]["finalized_at"] | ||
} | ||
|
||
|
||
def _target_id_with_parent_finalized_at(target_scan: dict): | ||
return { | ||
"target_scan_id": target_scan["items"]["scan"]["scan_id"], | ||
"parent_finalized_at": target_scan["parent_finalized_at"] | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
from typing import Any, List, Dict | ||
from restfly import APIIterator | ||
|
||
|
||
class WasIterator(APIIterator): | ||
""" | ||
The WAS iterator can be used to handle downloading and processing of the data from | ||
WAS export. | ||
Attributes: | ||
target_scan_ids: | ||
The target Scan IDs for the specified parent Scan ID. | ||
page (list[dict]): | ||
The current chunk of data. | ||
""" | ||
target_scan_ids: List[str] | ||
page: List[Dict] | ||
|
||
def __init__(self, api, target_scan_ids, **kwargs): | ||
self.target_scan_ids = target_scan_ids | ||
super().__init__(api, **kwargs) | ||
|
||
def _get_page(self) -> None: | ||
""" | ||
Get the scan results for the next target scan ID. | ||
""" | ||
current_target_scan = self.target_scan_ids.pop() | ||
current_target_scan_id = current_target_scan["target_scan_id"] | ||
current_parent_finalized_at = current_target_scan["parent_finalized_at"] | ||
self._log.debug(f"Getting the data for target ID: {current_target_scan_id} with finalized at: {current_parent_finalized_at}") | ||
|
||
scan_result = self._api.download_scan_report(current_target_scan_id) | ||
self.page = [_enriched_finding_object(scan_result, f, current_parent_finalized_at) for f in scan_result["findings"]] | ||
|
||
self._log.debug(f"Target ID: {current_target_scan_id} has {len(self.page)} finding(s).") | ||
|
||
# Corner case - if the chunk is empty, request for another page. | ||
if len(self.page) < 1: | ||
self._get_page() | ||
|
||
def next(self) -> Any: | ||
""" | ||
Ask for the next record in the current page. | ||
""" | ||
|
||
# If we've worked through all the items in the current page, get a new page. | ||
if (len(self.target_scan_ids) > 0) and (len(self.page) == 0): | ||
self._get_page() | ||
|
||
# Stop iteration when there are no more target IDs to process and the current page | ||
# has no more elements to process. | ||
elif (len(self.target_scan_ids) == 0) and (len(self.page) == 0): | ||
raise StopIteration() | ||
|
||
return self.page.pop() | ||
|
||
|
||
def _enriched_finding_object(page: Dict, finding: Dict, current_parent_finalized_at: str): | ||
""" | ||
Attaches config and scan info to each finding object. | ||
Note: This adjustment is done to enable integration with Splunk. | ||
""" | ||
return { | ||
"finding": finding, | ||
"parent_scan": { | ||
"finalized_at": current_parent_finalized_at | ||
}, | ||
"config": { | ||
"config_id": page["config"]["config_id"], | ||
"name": page["config"]["name"], | ||
"description": page["config"]["description"], | ||
}, | ||
"scan": page["scan"], | ||
} |
Empty file.
71 changes: 71 additions & 0 deletions
71
tests/io/was/cassettes/test_ensure_iterator_does_not_break_on_no_data_found.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
interactions: | ||
- request: | ||
body: '{"AND": [{"field": "scans_started_at", "operator": "gte", "value": "2100/03/23"}, | ||
{"field": "scans_status", "operator": "contains", "value": ["completed"]}]}' | ||
headers: | ||
Accept: | ||
- '*/*' | ||
Accept-Encoding: | ||
- gzip, deflate | ||
Connection: | ||
- keep-alive | ||
Content-Length: | ||
- '157' | ||
Content-Type: | ||
- application/json | ||
User-Agent: | ||
- Integration/1.0 (pytest; pytenable-automated-testing; Build/unknown) pyTenable/1.4.12 | ||
(Restfly/1.4.7; Python/3.10.6; Darwin/arm64) | ||
X-APIKeys: | ||
- accessKey=TIO_ACCESS_KEY;secretKey=TIO_SECRET_KEY | ||
method: POST | ||
uri: https://cloud.tenable.com/was/v2/configs/search?limit=200&offset=0 | ||
response: | ||
body: | ||
string: !!binary | | ||
H4sIAAAAAAAAAxzGQQrEIAwF0Lv8tQwyy1yllCHUWALVFJOuxLsX5q3exM2ndg61DpoIC75AOcFq | ||
dYl/L20aoG/OCW4jQNtE5yYgHNarnv557sIh5ceBBBtFBghF/MDaV4KGNAdt+3oBAAD//wMAD3nI | ||
IXIAAAA= | ||
headers: | ||
CF-Cache-Status: | ||
- DYNAMIC | ||
CF-RAY: | ||
- 7b084be58c843b68-IAD | ||
Cache-Control: | ||
- no-store | ||
Connection: | ||
- keep-alive | ||
Content-Encoding: | ||
- gzip | ||
Content-Type: | ||
- application/json; charset=utf-8 | ||
Date: | ||
- Fri, 31 Mar 2023 11:36:47 GMT | ||
Expect-CT: | ||
- enforce, max-age=86400 | ||
Pragma: | ||
- no-cache | ||
Server: | ||
- cloudflare | ||
Set-Cookie: | ||
- nginx-cloud-site-id=us-2b; path=/; HttpOnly; SameSite=Strict; Secure | ||
Strict-Transport-Security: | ||
- max-age=63072000; includeSubDomains | ||
Transfer-Encoding: | ||
- chunked | ||
Vary: | ||
- origin | ||
X-Content-Type-Options: | ||
- nosniff | ||
X-Frame-Options: | ||
- DENY | ||
X-Gateway-Site-ID: | ||
- service-nginx-router-us-east-1-prod-86955c7f46-6fnlw | ||
X-Request-Uuid: | ||
- 613a7ed65e7f80cbf34f707f8c26c20e | ||
X-Xss-Protection: | ||
- 1; mode=block | ||
status: | ||
code: 200 | ||
message: OK | ||
version: 1 |
Oops, something went wrong.