-
-
Notifications
You must be signed in to change notification settings - Fork 400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Malware bazaar ingestor #2259
Malware bazaar ingestor #2259
Changes from 18 commits
e4f450f
8ed18f1
1d27bf5
6497a98
5ac7d17
b3d20a3
e359d83
87d9a3d
570c62e
e636124
90e30e6
9913052
30ca00a
8088cf4
5eda409
718ff36
ee48aaf
01bf119
0682935
bb6e436
ff7cf12
a60eab2
570b047
33fc56a
bb5e1c6
2164e00
a44d70e
d530b74
63544a4
33ffb01
8d946bb
ff7fa2a
f419a5a
9e7e6b3
1892e4d
7f7b814
75a450c
ad63bc0
7edd869
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import base64 | ||
import logging | ||
import traceback | ||
import typing | ||
|
@@ -7,6 +8,7 @@ | |
import requests | ||
from billiard.exceptions import SoftTimeLimitExceeded | ||
from django.conf import settings | ||
from django.core.files import File | ||
from django.utils import timezone | ||
from django.utils.functional import cached_property | ||
from requests import HTTPError | ||
|
@@ -121,9 +123,23 @@ | |
self.report.save() | ||
|
||
def after_run_success(self, content: typing.Any): | ||
# exhaust generator | ||
if isinstance(content, typing.Generator): | ||
content = list(content) | ||
self.report.report = content | ||
# avoiding JSON serialization errors for types: File and bytes | ||
report_content = content | ||
if isinstance(report_content, typing.List): | ||
if all(isinstance(n, File) for n in report_content): | ||
report_content = [ | ||
base64.b64encode(f.read()).decode("utf-8") for f in report_content | ||
] | ||
if all(isinstance(n, bytes) for n in report_content): | ||
report_content = [ | ||
base64.b64encode(b).decode("utf-8") for b in report_content | ||
] | ||
|
||
self.content = content | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not necessary |
||
self.report.report = report_content | ||
self.report.status = self.report.Status.SUCCESS.value | ||
self.report.save(update_fields=["status", "report"]) | ||
|
||
|
@@ -268,7 +284,7 @@ | |
if url and url.startswith("http"): | ||
if settings.STAGE_CI or settings.MOCK_CONNECTIONS: | ||
return True | ||
logger.info(f"healthcheck url {url} for {self}") | ||
try: | ||
# momentarily set this to False to | ||
# avoid fails for https services | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -57,13 +57,13 @@ def before_run(self): | |
def after_run_success(self, content): | ||
super().after_run_success(content) | ||
self._config: IngestorConfig | ||
# exhaust generator | ||
deque( | ||
self._config.create_jobs( | ||
# every job created from an ingestor | ||
content, | ||
self.content, | ||
TLP.CLEAR.value, | ||
self._user, | ||
self._config.delay, | ||
), | ||
maxlen=0, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. def after_run_success(self, content):
pre_parsing_content = content # <------------
super().after_run_success(content)
self._config: IngestorConfig
deque(
self._config.create_jobs(
# every job created from an ingestor
pre_parsing_content, # <-----------
TLP.CLEAR.value,
self._user,
self._config.delay,
),
maxlen=0,
)
``` |
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
import io | ||
import logging | ||
from datetime import datetime | ||
from typing import Any, Iterable | ||
from unittest.mock import patch | ||
|
||
import pyzipper | ||
import requests | ||
|
||
from api_app.ingestors_manager.classes import Ingestor | ||
from api_app.ingestors_manager.exceptions import IngestorRunException | ||
from tests.mock_utils import MockUpResponse, if_mock_connections | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class MalwareBazaar(Ingestor): | ||
url: str | ||
hours: int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add as comment the same description that you added in the |
||
signatures: str | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the |
||
|
||
def run(self) -> Iterable[Any]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function imho should be split in 3 parts:
|
||
# extract file hashes per signature | ||
hashes = set() | ||
now = datetime.now() | ||
federicofantini marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for signature in self.signatures: | ||
result = requests.post( | ||
self.url, | ||
data={"query": "get_siginfo", "signature": signature, "limit": 100}, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add a timeout to the post request? Just in case |
||
result.raise_for_status() | ||
content = result.json() | ||
logger.info(f"Malware bazaar data for signature {signature} is {content}") | ||
if content["query_status"] != "ok": | ||
raise IngestorRunException( | ||
f"Query status is invalid: {content['query_status']}" | ||
) | ||
if not isinstance(content["data"], list): | ||
raise IngestorRunException(f"Content {content} not expected") | ||
|
||
for elem in content["data"]: | ||
first_seen = datetime.strptime(elem["first_seen"], "%Y-%m-%d %H:%M:%S") | ||
diff = int((now - first_seen).total_seconds()) // 3600 | ||
if elem["signature"] == signature and diff <= self.hours: | ||
hashes.add(elem["sha256_hash"]) | ||
|
||
last_hours_str = ( | ||
"Last hour" if self.hours == 1 else f"Last {self.hours} hours" | ||
) | ||
logger.info( | ||
f"{last_hours_str} {signature} samples: " | ||
f"{len(hashes)}/{len(content['data'])}" | ||
) | ||
|
||
# download sample and create new analysis | ||
for h in hashes: | ||
logger.info(f"Downloading sample {h}") | ||
sample_archive = requests.post( | ||
self.url, | ||
data={ | ||
"query": "get_file", | ||
"sha256_hash": h, | ||
}, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a timeout |
||
sample_archive.raise_for_status() | ||
logger.info(f"Correctly downloaded sample {h}") | ||
with pyzipper.AESZipFile(io.BytesIO(sample_archive.content)) as zf: | ||
zf.setpassword(b"infected") | ||
files = zf.namelist() | ||
if files and len(files) == 1: | ||
sample = zf.read(files[0]) | ||
|
||
yield sample | ||
|
||
@classmethod | ||
def _monkeypatch(cls): | ||
patches = [ | ||
if_mock_connections( | ||
patch( | ||
"requests.post", | ||
return_value=MockUpResponse( | ||
{ | ||
"query_status": "ok", | ||
"data": [ | ||
{ | ||
"sha256_hash": "c5c810beaf075f8fee52146b381b0f94a6" | ||
"e303fada3bce12bcc07fbfa07ba07e", | ||
"sha3_384_hash": "bdd25a594b5a5d8ab14b00c04ee75d6a" | ||
"476bf2a7df49223284eebfac82be107a" | ||
"b94ffaae294ef4cf0a1c23a206e1fbd9", | ||
"sha1_hash": "3fea40223c02a15678912a29147d2b32d05c" | ||
"46df", | ||
"md5_hash": "dc591fd6d108b50bd9aa1f3dce2f3fe4", | ||
"first_seen": "2024-04-11 12:35:10", | ||
"last_seen": None, | ||
"file_name": "17128389081d4616ae42b2693f5ea6783112" | ||
"f41cb2ee5184f49d983f8bf833df0b0e97b4" | ||
"29449.dat-decoded", | ||
"file_size": 240128, | ||
"file_type_mime": "application/x-dosexec", | ||
"file_type": "exe", | ||
"reporter": "abuse_ch", | ||
"anonymous": 0, | ||
"signature": "AgentTesla", | ||
"imphash": "f34d5f2d4577ed6d9ceec516c1f5a744", | ||
"tlsh": "T17534FD037E88EB15E5A87E3782EF6C2413B2B0C" | ||
"71633C60B6F49AF6518516426D7E72D", | ||
"telfhash": None, | ||
"gimphash": None, | ||
"ssdeep": "3072:z+ymieCL2QfOdb/TmqtbqRFP55EMX+CWQ:" | ||
"z+ymieCLPfOdbqq9qRFvXJW", | ||
"dhash_icon": None, | ||
"tags": ["AgentTesla", "base64-decoded", "exe"], | ||
"code_sign": [], | ||
"intelligence": { | ||
"clamav": None, | ||
"downloads": "338", | ||
"uploads": "1", | ||
"mail": None, | ||
}, | ||
} | ||
], | ||
}, | ||
200, | ||
), | ||
), | ||
patch( | ||
"requests.post", | ||
return_value=MockUpResponse( | ||
{}, content=b"AgentTesla malware downloaded!", status_code=200 | ||
), | ||
), | ||
) | ||
] | ||
return super()._monkeypatch(patches=patches) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,14 +12,11 @@ | |
|
||
|
||
class ThreatFox(Ingestor): | ||
url: str | ||
days: int | ||
|
||
BASE_URL = "https://threatfox-api.abuse.ch/api/v1/" | ||
|
||
def run(self) -> Iterable[Any]: | ||
result = requests.post( | ||
self.BASE_URL, json={"query": "get_iocs", "days": self.days} | ||
) | ||
result = requests.post(self.url, json={"query": "get_iocs", "days": self.days}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you change the URL? Why someone should be able to change the URL to something differently? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed the |
||
result.raise_for_status() | ||
content = result.json() | ||
logger.info(f"Threatfox data is {content}") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
# Generated by Django 4.2.11 on 2024-04-09 15:19 | ||
|
||
import datetime | ||
|
||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
dependencies = [ | ||
("ingestors_manager", "0017_4_change_primary_key"), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name="ingestorconfig", | ||
name="delay", | ||
field=models.DurationField( | ||
default=datetime.timedelta, | ||
help_text="Expects data in the format 'DD HH:MM:SS'", | ||
), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
from django.db import migrations | ||
from django.db.models.fields.related_descriptors import ( | ||
ForwardManyToOneDescriptor, | ||
ForwardOneToOneDescriptor, | ||
ManyToManyDescriptor, | ||
) | ||
|
||
plugin = {'id': 2, 'python_module': {'health_check_schedule': None, 'update_schedule': {'minute': '0', 'hour': '*', 'day_of_week': '*', 'day_of_month': '*', 'month_of_year': '*'}, 'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'schedule': {'minute': '0', 'hour': '*', 'day_of_week': '*', 'day_of_month': '*', 'month_of_year': '*'}, 'periodic_task': {'crontab': {'minute': '0', 'hour': '*', 'day_of_week': '*', 'day_of_month': '*', 'month_of_year': '*'}, 'name': 'MalwarebazaarIngestor', 'task': 'intel_owl.tasks.execute_ingestor', 'kwargs': '{"config_name": "MalwareBazaar"}', 'queue': 'default', 'enabled': True}, 'user': {'username': 'MalwarebazaarIngestor', 'first_name': '', 'last_name': '', 'email': ''}, 'name': 'MalwareBazaar', 'description': 'MalwareBazaar ingestor', 'disabled': False, 'soft_time_limit': 60, 'routing_key': 'default', 'health_check_status': True, 'maximum_jobs': 30, 'delay': '00:00:30', 'health_check_task': None, 'playbook_to_execute': 2, 'model': 'ingestors_manager.IngestorConfig'} | ||
|
||
params = [{'python_module': {'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'name': 'url', 'type': 'str', 'description': 'API endpoint', 'is_secret': False, 'required': True}, {'python_module': {'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'name': 'hours', 'type': 'int', 'description': 'Download samples that are up to X hours old', 'is_secret': False, 'required': True}, {'python_module': {'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'name': 'signatures', 'type': 'list', 'description': 'Download samples from chosen signatures (aka malware families)', 'is_secret': True, 'required': True}] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it make sense that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe you dont't want to revealt that information, dunno. Anyway I changed that as NOT secret parameter |
||
|
||
values = [{'parameter': {'python_module': {'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'name': 'url', 'type': 'str', 'description': 'API endpoint', 'is_secret': False, 'required': True}, 'analyzer_config': None, 'connector_config': None, 'visualizer_config': None, 'ingestor_config': 'MalwareBazaar', 'pivot_config': None, 'for_organization': False, 'value': 'https://mb-api.abuse.ch/api/v1/', 'updated_at': '2024-04-11T14:42:19.308887Z', 'owner': None}, {'parameter': {'python_module': {'module': 'malware_bazaar.MalwareBazaar', 'base_path': 'api_app.ingestors_manager.ingestors'}, 'name': 'hours', 'type': 'int', 'description': 'Download samples that are up to X hours old', 'is_secret': False, 'required': True}, 'analyzer_config': None, 'connector_config': None, 'visualizer_config': None, 'ingestor_config': 'MalwareBazaar', 'pivot_config': None, 'for_organization': False, 'value': 1, 'updated_at': '2024-04-11T14:42:19.337793Z', 'owner': None}] | ||
|
||
|
||
def _get_real_obj(Model, field, value): | ||
def _get_obj(Model, other_model, value): | ||
if isinstance(value, dict): | ||
real_vals = {} | ||
for key, real_val in value.items(): | ||
real_vals[key] = _get_real_obj(other_model, key, real_val) | ||
value = other_model.objects.get_or_create(**real_vals)[0] | ||
# it is just the primary key serialized | ||
else: | ||
if isinstance(value, int): | ||
if Model.__name__ == "PluginConfig": | ||
value = other_model.objects.get(name=plugin["name"]) | ||
else: | ||
value = other_model.objects.get(pk=value) | ||
else: | ||
value = other_model.objects.get(name=value) | ||
return value | ||
|
||
if ( | ||
type(getattr(Model, field)) | ||
in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor] | ||
and value | ||
): | ||
other_model = getattr(Model, field).get_queryset().model | ||
value = _get_obj(Model, other_model, value) | ||
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value: | ||
other_model = getattr(Model, field).rel.model | ||
value = [_get_obj(Model, other_model, val) for val in value] | ||
return value | ||
|
||
def _create_object(Model, data): | ||
mtm, no_mtm = {}, {} | ||
for field, value in data.items(): | ||
value = _get_real_obj(Model, field, value) | ||
if type(getattr(Model, field)) is ManyToManyDescriptor: | ||
mtm[field] = value | ||
else: | ||
no_mtm[field] = value | ||
try: | ||
o = Model.objects.get(**no_mtm) | ||
except Model.DoesNotExist: | ||
o = Model(**no_mtm) | ||
o.full_clean() | ||
o.save() | ||
for field, value in mtm.items(): | ||
attribute = getattr(o, field) | ||
if value is not None: | ||
attribute.set(value) | ||
return False | ||
return True | ||
|
||
def migrate(apps, schema_editor): | ||
Parameter = apps.get_model("api_app", "Parameter") | ||
PluginConfig = apps.get_model("api_app", "PluginConfig") | ||
python_path = plugin.pop("model") | ||
Model = apps.get_model(*python_path.split(".")) | ||
if not Model.objects.filter(name=plugin["name"]).exists(): | ||
exists = _create_object(Model, plugin) | ||
if not exists: | ||
for param in params: | ||
_create_object(Parameter, param) | ||
for value in values: | ||
_create_object(PluginConfig, value) | ||
|
||
|
||
|
||
def reverse_migrate(apps, schema_editor): | ||
python_path = plugin.pop("model") | ||
Model = apps.get_model(*python_path.split(".")) | ||
Model.objects.get(name=plugin["name"]).delete() | ||
|
||
|
||
|
||
class Migration(migrations.Migration): | ||
atomic = False | ||
dependencies = [ | ||
('api_app', '0062_alter_parameter_python_module'), | ||
('ingestors_manager', '0018_ingestorconfig_delay'), | ||
] | ||
|
||
operations = [ | ||
migrations.RunPython( | ||
migrate, reverse_migrate | ||
) | ||
] | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elif
we do not want to check the content 2 times