Skip to content

Commit

Permalink
Add new option to send dropped files to MISP
Browse files Browse the repository at this point in the history
Note that we need pymisp==2.4.111.2 which fixes a bug, see MISP/PyMISP#321
  • Loading branch information
zaphodef committed Jul 25, 2019
1 parent b79b244 commit 4f6d811
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cuckoo/private/cwd/conf/reporting.conf
Expand Up @@ -28,7 +28,7 @@ url = {{ reporting.misp.url }}
apikey = {{ reporting.misp.apikey }}

# The various modes describe which information should be submitted to MISP,
# separated by whitespace. Available modes: maldoc ipaddr hashes url.
# separated by whitespace. Available modes: maldoc ipaddr hashes url dropped_files.
mode = {{ reporting.misp.mode }}

distribution = {{ reporting.misp.distribution }}
Expand Down
39 changes: 39 additions & 0 deletions cuckoo/reporting/misp.py
Expand Up @@ -16,6 +16,7 @@

log = logging.getLogger(__name__)


class MISP(Report):
"""Enrich MISP with Cuckoo results."""

Expand Down Expand Up @@ -70,6 +71,40 @@ def domain_ipaddr(self, results, event):
self.misp.add_domains_ips(event, domains)
self.misp.add_ipdst(event, sorted(list(ipaddrs)))

def dropped_files(self, results, event):
"""
Add all the dropped files as MISP attributes.
"""
from pymisp import MISPEvent
for entry in results.get("dropped", []):
self.misp.upload_sample(
filename=entry.get("name"),
filepath_or_bytes=entry.get("path"),
event_id=event["Event"]["id"],
category="External analysis",
comment="Dropped file",
)

# Load the event from MISP (we cannot use event as it
# does not contain the sample uploaded above, nor it is
# a MISPEvent but a simple dict)
e = MISPEvent()
e.from_dict(Event=self.misp.get_event(event["Event"]["id"])["Event"])
dropped_file_obj = e.objects[-1]

# Add the real location of the dropped file (during the analysis)
real_filepath = entry.get("filepath")
dropped_file_obj.add_attribute("fullpath", real_filepath)

# Add Yara matches if any
for match in entry.get("yara", []):
desc = match["meta"]["description"]
dropped_file_obj.add_attribute("text",
value=desc, comment="Yara match")

# Update the event
self.misp.update_event(event["Event"]["id"], e)

def family(self, results, event):
for config in results.get("metadata", {}).get("cfgextr", []):
self.misp.add_detection_name(
Expand Down Expand Up @@ -194,6 +229,7 @@ def run(self, results):
filepath_or_bytes=self.task["target"],
event_id=event["Event"]["id"],
category="External analysis",
comment="Sample run",
)

self.signature(results, event)
Expand All @@ -207,4 +243,7 @@ def run(self, results):
if "ipaddr" in mode:
self.domain_ipaddr(results, event)

if "dropped_files" in mode:
self.dropped_files(results, event)

self.family(results, event)
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -208,7 +208,7 @@ def do_setup(**kwargs):
"pillow==3.2",
"pyelftools==0.24",
"pyguacamole==0.6",
"pymisp==2.4.106",
"pymisp==2.4.111.2",
"pymongo==3.0.3",
"python-dateutil==2.4.2",
"python-magic==0.4.12",
Expand Down
59 changes: 59 additions & 0 deletions tests/test_reporting.py
Expand Up @@ -265,6 +265,65 @@ def test_misp_sample_hashes():
comment="File submitted to Cuckoo"
)

def test_misp_dropped_files():
r = MISP()
r.misp = mock.MagicMock()

r.misp.upload_sample_files.return_value = None
r.misp.update_event.return_value = None
r.misp.get_event.return_value = {
"Event": {
'info': 'test',
'Object': [
{"name": "file"}
]
}
}
r.dropped_files({
"dropped": [
{
"path": "tests/files/foo.txt",
"filepath": "/tmp/foo.txt",
"name": "foo.txt",
"yara": [
{
"meta": {
"description": "Test"
}
}
],
}
]
} , {
"Event": {
"id": "0"
}
})

r.misp.upload_sample.assert_called_once_with(
filename="foo.txt", filepath_or_bytes="tests/files/foo.txt",
event_id="0", category="External analysis",
comment="Dropped file"
)

r.misp.update_event.assert_called_once()
params, dict_params = r.misp.update_event.call_args
event_id, event = params
assert event_id == "0"
obj = event.objects[-1]

assert obj.has_attributes_by_relation(["fullpath"])
attr = obj.get_attributes_by_relation("fullpath")[0]
assert 'value' in attr
assert attr.value == "/tmp/foo.txt"

assert obj.has_attributes_by_relation(["text"])
attr = obj.get_attributes_by_relation("text")[0]
assert 'value' in attr
assert attr.value == "Test"
assert 'comment' in attr
assert attr.comment == "Yara match"

def test_misp_signatures():
r = MISP()
r.misp = mock.MagicMock()
Expand Down

0 comments on commit 4f6d811

Please sign in to comment.