import datetime
from typing import Any
from typing import Dict
from typing import List
from intezer_sdk.analyses_results import AnalysesHistoryResult
from intezer_sdk.api import IntezerApiClient
from intezer_sdk.api import get_global_api
# Paging defaults used when a caller does not pass limit/offset; both are
# forwarded to the server unchanged (no client-side bounds check).
DEFAULT_LIMIT: int = 100
DEFAULT_OFFSET: int = 0
# Request paths for the three analyses-history endpoints; each is handed to
# AnalysesHistoryResult together with the api client and the filter dict.
FILE_ANALYSES_REQUEST: str = '/analyses/history'
URL_ANALYSES_REQUEST: str = '/url-analyses/history'
ENDPOINT_ANALYSES_REQUEST: str = '/endpoint-analyses/history'
def query_file_analyses_history(*,
                                start_date: datetime.datetime,
                                end_date: datetime.datetime,
                                api: IntezerApiClient = None,
                                aggregated_view: bool = None,
                                sources: List[str] = None,
                                verdicts: List[str] = None,
                                file_hash: str = None,
                                family_names: List[str] = None,
                                file_name: str = None,
                                limit: int = DEFAULT_LIMIT,
                                offset: int = DEFAULT_OFFSET
                                ) -> AnalysesHistoryResult:
    """
    Query for file analyses history.

    Builds the filter shared by every history query (dates, aggregation,
    sources, verdicts, paging), adds the file-specific filters and wraps the
    request path, client and filter in an ``AnalysesHistoryResult``.

    :param start_date: Date to query from. Converted with ``datetime.timestamp()``,
        so a naive value is interpreted in the local timezone of the calling
        process; pass timezone-aware datetimes for a stable query window.
    :param end_date: Date to query until (same conversion as ``start_date``).
    :param api: Instance of Intezer API for request server; defaults to the
        global client.
    :param aggregated_view: Should the result be aggregated by latest hash.
        Only sent when not ``None``.
    :param sources: Filter the analyses by its source. Omitted when empty.
    :param verdicts: Filter by the analysis's verdict. Omitted when empty.
    :param file_hash: Filter by the file's hash, in one of the following formats:
        SHA256, SHA1 or MD5. Forwarded as given; the format is not checked here.
    :param family_names: Filter by the analysis's malicious family name.
    :param file_name: Filter by the uploaded file's name.
    :param limit: Number of analyses returned by the query.
    :param offset: Number of analyses to skip before beginning to return the analyses.
    :return: File query result from server as Results iterator.
    :raises: Whatever ``assert_on_premise_above_v22_10`` raises for an
        unsupported on-premise version (presumably; defined on the client).
    """
    client = api or get_global_api()
    client.assert_on_premise_above_v22_10()

    filters = generate_analyses_history_filter(
        start_date=start_date,
        end_date=end_date,
        aggregated_view=aggregated_view,
        sources=sources,
        verdicts=verdicts,
        limit=limit,
        offset=offset
    )
    # File-only filters; falsy values (None, '' or []) add no constraint and are
    # left out of the request, in the same order as the keys below.
    file_only = (
        ('hash', file_hash),
        ('family_names', family_names),
        ('file_name', file_name),
    )
    filters.update({key: value for key, value in file_only if value})
    return AnalysesHistoryResult(FILE_ANALYSES_REQUEST, client, filters)
def query_endpoint_analyses_history(*,
                                    start_date: datetime.datetime,
                                    end_date: datetime.datetime,
                                    api: IntezerApiClient = None,
                                    aggregated_view: bool = None,
                                    sources: List[str] = None,
                                    verdicts: List[str] = None,
                                    limit: int = DEFAULT_LIMIT,
                                    offset: int = DEFAULT_OFFSET
                                    ) -> AnalysesHistoryResult:
    """
    Query for endpoint analyses history.

    Endpoint history has no type-specific filters, so the request body is
    exactly the shared filter built by ``generate_analyses_history_filter``.

    :param start_date: Date to query from. Converted with ``datetime.timestamp()``,
        so a naive value is interpreted in the local timezone of the calling
        process; pass timezone-aware datetimes for a stable query window.
    :param end_date: Date to query until (same conversion as ``start_date``).
    :param api: Instance of Intezer API for request server; defaults to the
        global client.
    :param aggregated_view: Should the result be aggregated by latest computer.
        Only sent when not ``None``.
    :param sources: Filter the analyses by its source. Omitted when empty.
    :param verdicts: Filter by the analysis's verdict. Omitted when empty.
    :param limit: Number of analyses returned by the query.
    :param offset: Number of analyses to skip before beginning to return the analyses.
    :return: Endpoint query result from server as Results iterator.
    :raises: Whatever ``assert_on_premise_above_v22_10`` raises for an
        unsupported on-premise version (presumably; defined on the client).
    """
    client = api or get_global_api()
    client.assert_on_premise_above_v22_10()

    shared_filters = generate_analyses_history_filter(start_date=start_date,
                                                      end_date=end_date,
                                                      aggregated_view=aggregated_view,
                                                      sources=sources,
                                                      verdicts=verdicts,
                                                      limit=limit,
                                                      offset=offset)
    return AnalysesHistoryResult(ENDPOINT_ANALYSES_REQUEST, client, shared_filters)
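def query_url_analyses_history(*,
                               start_date: datetime.datetime,
                               end_date: datetime.datetime,
                               api: IntezerApiClient = None,
                               sources: List[str] = None,
                               verdicts: List[str] = None,
                               sub_verdicts: List[str] = None,
                               did_download_file: bool = None,
                               submitted_url: str = None,
                               aggregated_view: bool = False,
                               limit: int = DEFAULT_LIMIT,
                               offset: int = DEFAULT_OFFSET
                               ) -> AnalysesHistoryResult:
    """
    Query for url analyses history.

    Builds the filter shared by every history query, adds the URL-specific
    filters and wraps the request path, client and filter in an
    ``AnalysesHistoryResult``.

    :param start_date: Date to query from. Converted with ``datetime.timestamp()``,
        so a naive value is interpreted in the local timezone of the calling
        process; pass timezone-aware datetimes for a stable query window.
    :param end_date: Date to query until (same conversion as ``start_date``).
    :param api: Instance of Intezer API for request server; defaults to the
        global client.
    :param sources: Filter the analyses by its source. Omitted when empty.
    :param verdicts: Filter by the analysis's verdict. Omitted when empty.
    :param sub_verdicts: Filter by the analysis's sub-verdict. Omitted when empty.
    :param did_download_file: Filter by whether the analyzed URL downloaded a
        file. Tri-state: ``None`` (default) applies no filter, ``True`` and
        ``False`` are both sent to the server.
    :param submitted_url: Filter by specific url. Forwarded as given; the value
        is not validated or normalized here.
    :param aggregated_view: Should the result be aggregated by latest url.
        Unlike the file/endpoint queries this defaults to ``False`` and is
        therefore always sent.
    :param limit: Number of analyses returned by the query.
    :param offset: Number of analyses to skip before beginning to return the analyses.
    :return: URL query result from server as Results iterator.
    :raises: Whatever ``assert_on_premise_above_v22_10`` raises for an
        unsupported on-premise version (presumably; defined on the client).
    """
    api = api or get_global_api()
    api.assert_on_premise_above_v22_10()
    filters = generate_analyses_history_filter(
        start_date=start_date,
        end_date=end_date,
        aggregated_view=aggregated_view,
        sources=sources,
        verdicts=verdicts,
        limit=limit,
        offset=offset
    )
    # did_download_file is a tri-state flag (None = no filter), so an explicit
    # False must reach the server; a plain truthiness test dropped it and made
    # "only analyses that did not download a file" impossible to express.
    if did_download_file is not None:
        filters['did_download_file'] = did_download_file
    if submitted_url:
        filters['submitted_url'] = submitted_url
    if sub_verdicts:
        filters['sub_verdicts'] = sub_verdicts
    return AnalysesHistoryResult(URL_ANALYSES_REQUEST, api, filters)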
def generate_analyses_history_filter(*,
                                     start_date: datetime.datetime,
                                     end_date: datetime.datetime,
                                     aggregated_view: bool = None,
                                     sources: List[str] = None,
                                     verdicts: List[str] = None,
                                     limit: int = DEFAULT_LIMIT,
                                     offset: int = DEFAULT_OFFSET
                                     ) -> Dict[str, Any]:
    """
    Build the request filter shared by all analyses-history queries.

    :param start_date: Date to query from. Converted with ``datetime.timestamp()``
        and truncated to whole seconds; a naive value is interpreted in the
        local timezone of the calling process, so timezone-aware datetimes
        give a reproducible window.
    :param end_date: Date to query until (same conversion as ``start_date``).
    :param aggregated_view: Included only when not ``None``, so ``False`` is sent.
    :param sources: Included only when non-empty.
    :param verdicts: Included only when non-empty.
    :param limit: Number of analyses returned by the query; forwarded unchanged.
    :param offset: Number of analyses to skip before returning; forwarded unchanged.
    :return: Dict with ``start_date``, ``end_date``, ``limit`` and ``offset``
        always present, plus the optional keys above. No range check is done
        on the dates or paging values; the server is expected to reject bad input.
    """
    filters: Dict[str, Any] = {
        'start_date': int(start_date.timestamp()),
        'end_date': int(end_date.timestamp()),
        'limit': limit,
        'offset': offset,
    }
    # Tri-state flag: only None means "no filter".
    if aggregated_view is not None:
        filters['aggregated_view'] = aggregated_view
    # Empty lists constrain nothing, so they are dropped like None.
    list_filters = (('sources', sources), ('verdicts', verdicts))
    filters.update({name: values for name, values in list_filters if values})
    return filters