Skip to content

Commit

Permalink
Merge pull request #92 from intezer/feat/url-latest-analysis
Browse files Browse the repository at this point in the history
Feat/url latest analysis
  • Loading branch information
matany90 committed May 8, 2023
2 parents 7d693c0 + 51e6f3c commit d67541a
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 3 deletions.
3 changes: 2 additions & 1 deletion CHANGES
@@ -1,5 +1,6 @@
Unreleased
1.16.7
____
- Support latest analysis for URL analysis
- Add support for proxies
- Remove deprecated edr assessment routes

Expand Down
15 changes: 15 additions & 0 deletions examples/url_latest_analysis.py
@@ -0,0 +1,15 @@
import sys
from pprint import pprint

from intezer_sdk import api
from intezer_sdk.analysis import UrlAnalysis


def get_url_latest_analysis(url: str):
api.set_global_api('<api-key>')
analysis = UrlAnalysis.from_latest_analysis(url)
if analysis:
pprint(analysis.result())

if __name__ == '__main__':
get_url_latest_analysis(*sys.argv[1:])
2 changes: 1 addition & 1 deletion intezer_sdk/__init__.py
@@ -1 +1 @@
__version__ = '1.16.6'
__version__ = '1.16.7'
2 changes: 1 addition & 1 deletion intezer_sdk/analyses_history.py
Expand Up @@ -167,7 +167,7 @@ def generate_analyses_history_filter(*,
'offset': offset
}
if aggregated_view is not None:
base_filter['aggregate_view'] = aggregated_view
base_filter['aggregated_view'] = aggregated_view
if sources:
base_filter['sources'] = sources
if verdicts:
Expand Down
33 changes: 33 additions & 0 deletions intezer_sdk/analysis.py
@@ -1,6 +1,7 @@
import datetime
import logging
import os
import re
from http import HTTPStatus
from typing import BinaryIO
from typing import IO
Expand All @@ -17,6 +18,7 @@
from intezer_sdk import operation
from intezer_sdk._api import IntezerApi
from intezer_sdk._util import deprecated
from intezer_sdk.analyses_history import query_url_analyses_history
from intezer_sdk.api import IntezerApiClient
from intezer_sdk.api import get_global_api
from intezer_sdk.base_analysis import Analysis
Expand Down Expand Up @@ -309,6 +311,15 @@ def get_file_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> Optiona
def get_analysis_by_id(analysis_id: str, api: IntezerApi = None) -> Optional[FileAnalysis]:
return get_file_analysis_by_id(analysis_id, api)

def _clean_url(url: str) -> str:
"""
Remove http:// or https:// or www. from the beginning of the URL,
and / from the end of the URL.
"""
url = re.sub(r'^https?://(www\.)?', '', url)
url = re.sub(r'\/$', '', url)

return url

class UrlAnalysis(Analysis):
"""
Expand Down Expand Up @@ -349,6 +360,28 @@ def from_analysis_id(cls, analysis_id: str, api: IntezerApiClient = None) -> Opt
response = IntezerApi(api or get_global_api()).get_url_analysis_response(analysis_id, True)
return cls._create_analysis_from_response(response, api, analysis_id)

@classmethod
def from_latest_analysis(cls,
url: str,
days_threshold_for_latest_analysis: int = 1,
api: IntezerApiClient = None) -> Optional['UrlAnalysis']:
now = datetime.datetime.now()
yesterday = now - datetime.timedelta(days=days_threshold_for_latest_analysis)

analysis_history_url_result = query_url_analyses_history(start_date=yesterday,
end_date=now,
aggregated_view=True,
api=api)
all_analyses_reports = analysis_history_url_result.all()

analyses_ids = [report['analysis_id'] for report in all_analyses_reports
if _clean_url(url) in (_clean_url(report['scanned_url']), _clean_url(report['submitted_url']))]

if not analyses_ids:
return None

return cls.from_analysis_id(analyses_ids[0])

def _set_report(self, report: dict):
super()._set_report(report)
if not self.url:
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/test_url_analysis.py
Expand Up @@ -201,3 +201,47 @@ def test_url_analysis_doesnt_reference_file_analysis_when_not_exists(self):

# Assert
self.assertIsNone(analysis.downloaded_file_analysis)

def test_get_url_latest_analysis(self):
# Arrange
url = 'https://intezer.com'
analysis_id = str(uuid.uuid4())
get_analysis_result = {'analysis_id': analysis_id, 'submitted_url': url}
fetch_history_result = {'analyses': [{'analysis_id': analysis_id, 'scanned_url': url, 'submitted_url': url}],
'total_count': 1}

with responses.RequestsMock() as mock:
mock.add('POST',
url=self.full_url + '/url-analyses/history',
status=200,
json=fetch_history_result)
mock.add('GET',
url='{}/url/{}'.format(self.full_url, analysis_id),
status=200,
json={'result': get_analysis_result, 'status': 'succeeded'})


# Act
analysis = UrlAnalysis.from_latest_analysis(url)

# Assert
self.assertEqual(analysis.url, url)

def test_get_url_latest_analysis_analyses_not_found(self):
# Arrange
url = 'https://intezer.com'
fetch_history_result = {'analyses': [],
'total_count': 0}

with responses.RequestsMock() as mock:
mock.add('POST',
url=self.full_url + '/url-analyses/history',
status=200,
json=fetch_history_result)


# Act
analysis = UrlAnalysis.from_latest_analysis(url)

# Assert
self.assertIsNone(analysis)

0 comments on commit d67541a

Please sign in to comment.