-
Notifications
You must be signed in to change notification settings - Fork 284
/
Copy pathemailobject.py
467 lines (410 loc) · 22.1 KB
/
emailobject.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
#!/usr/bin/env python3
from __future__ import annotations
import re
import logging
import ipaddress
import email.utils
from email import policy, message_from_bytes
from email.message import EmailMessage
from io import BytesIO
from pathlib import Path
from typing import cast, Any
from extract_msg import openMsg
from extract_msg.msg_classes import MessageBase
from extract_msg.attachments import AttachmentBase, SignedAttachment
from extract_msg.properties import FixedLengthProp
from RTFDE.exceptions import MalformedEncapsulatedRtf, NotEncapsulatedRtf # type: ignore
from RTFDE.deencapsulate import DeEncapsulator # type: ignore
from oletools.common.codepages import codepage2codec # type: ignore
from ..exceptions import InvalidMISPObject, MISPObjectException, NewAttributeError
from .abstractgenerator import AbstractMISPObjectGenerator
logger = logging.getLogger('pymisp')
class MISPMsgConverstionError(MISPObjectException):
    """Raised when an Outlook .msg message cannot be rebuilt as an eml message."""
class EMailObject(AbstractMISPObjectGenerator):
def __init__(self, filepath: Path | str | None = None, pseudofile: BytesIO | bytes | None = None,  # type: ignore[no-untyped-def]
             attach_original_email: bool = True, **kwargs) -> None:
    """Build an 'email' object from a file on disk or from an in-memory buffer.

    Args:
        filepath: Path of the email file to read. Takes precedence over `pseudofile`.
        pseudofile: Raw email content, either as `bytes` or as a `BytesIO`.
        attach_original_email: Stored on the instance and consulted by
            `generate_attributes` when deciding whether to attach the raw email.
        **kwargs: Forwarded unchanged to the parent generator.
    """
    super().__init__('email', **kwargs)
    # Parsing state; parse_email() and the msg conversion helpers fill these in.
    self.raw_emails: dict[str, BytesIO | None] = {'msg': None, 'eml': None}
    self.eml_from_msg: bool | None = None
    self.encapsulated_body: str | None = None
    self.attach_original_email = attach_original_email
    # Load the raw bytes, decode them, then derive the object's attributes.
    self.__pseudofile = self.create_pseudofile(filepath, pseudofile)
    parsed = self.parse_email()
    self.email = parsed
    self.generate_attributes()
def parse_email(self) -> EmailMessage:
"""Convert email into EmailMessage."""
content_in_bytes = self.__pseudofile.getvalue().strip()
eml = message_from_bytes(content_in_bytes,
_class=EmailMessage,
policy=policy.default)
eml = cast(EmailMessage, eml) # Only needed to quiet mypy
if len(eml) != 0:
self.raw_emails['eml'] = self.__pseudofile
return eml
else:
logger.debug("Email not in standard .eml format. Attempting to decode email from other formats.")
try: # Check for .msg formatted emails.
# Msg files have the same header signature as the CFB format
if content_in_bytes[:8] == b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1":
message = self._msg_to_eml(content_in_bytes)
if len(message) != 0:
self.eml_from_msg = True
self.raw_emails['msg'] = self.__pseudofile
self.raw_emails['msg'] = BytesIO(message.as_bytes())
return message
except ValueError as _e: # Exception
logger.debug("Email not in .msg format or is a corrupted .msg. Attempting to decode email from other formats.")
logger.debug(f"Error: {_e} ")
try:
if content_in_bytes[:3] == b'\xef\xbb\xbf': # utf-8-sig byte-order mark (BOM)
eml_bytes = content_in_bytes.decode("utf_8_sig").encode("utf-8")
eml = email.message_from_bytes(eml_bytes,
policy=policy.default)
eml = cast(EmailMessage, eml) # Only needed to quiet mypy
if len(eml) != 0:
self.raw_emails['eml'] = BytesIO(eml_bytes)
return eml
except UnicodeDecodeError:
pass
raise InvalidMISPObject("EmailObject does not know how to decode data passed to it. Object may not be an email. If this is an email please submit it as an issue to PyMISP so we can add support.")
@staticmethod
def create_pseudofile(filepath: Path | str | None = None,
pseudofile: BytesIO | bytes | None = None) -> BytesIO:
"""Creates a pseudofile using directly passed data or data loaded from file path.
"""
if filepath:
with open(filepath, 'rb') as f:
return BytesIO(f.read())
elif pseudofile and isinstance(pseudofile, BytesIO):
return pseudofile
elif pseudofile and isinstance(pseudofile, bytes):
return BytesIO(pseudofile)
else:
raise InvalidMISPObject('File buffer (BytesIO) or a path is required.')
def _msg_to_eml(self, msg_bytes: bytes) -> EmailMessage:
    """Turn the raw bytes of a .msg file into an EmailMessage.

    The bytes are opened with `openMsg`, split into headers, body descriptors
    and attachments, and then reassembled by `_build_eml`.
    """
    # NOTE: openMsg returns a MessageBase, not a MSGFile
    parsed_msg: MessageBase = openMsg(msg_bytes)  # type: ignore
    headers, bodies, attached = self._extract_msg_objects(parsed_msg)
    return self._build_eml(headers, bodies, attached)
def _extract_msg_objects(self, msg_obj: MessageBase) -> tuple[EmailMessage, dict[str, Any], list[AttachmentBase] | list[SignedAttachment]]:
"""Extracts email objects needed to construct an eml from a msg."""
message: EmailMessage = email.message_from_string(msg_obj.header.as_string(), policy=policy.default) # type: ignore
body = {}
if msg_obj.body is not None:
body['text'] = {"obj": msg_obj.body,
"subtype": 'plain',
"charset": "utf-8",
"cte": "base64"}
if msg_obj.htmlBody is not None:
try:
if isinstance(msg_obj.props['3FDE0003'], FixedLengthProp):
_html_encoding_raw = msg_obj.props['3FDE0003'].value
_html_encoding = codepage2codec(_html_encoding_raw)
else:
_html_encoding = msg_obj.stringEncoding
except KeyError:
_html_encoding = msg_obj.stringEncoding
body['html'] = {'obj': msg_obj.htmlBody.decode(),
"subtype": 'html',
"charset": _html_encoding,
"cte": "base64"}
if msg_obj.rtfBody is not None:
body['rtf'] = {"obj": msg_obj.rtfBody.decode(),
"subtype": 'rtf',
"charset": 'ascii',
"cte": "base64"}
try:
rtf_obj = DeEncapsulator(msg_obj.rtfBody)
rtf_obj.deencapsulate()
if (rtf_obj.content_type == "html") and (msg_obj.htmlBody is None):
self.encapsulated_body = 'text/html'
body['html'] = {"obj": rtf_obj.html,
"subtype": 'html',
"charset": rtf_obj.text_codec,
"cte": "base64"}
elif (rtf_obj.content_type == "text") and (msg_obj.body is None):
self.encapsulated_body = 'text/plain'
body['text'] = {"obj": rtf_obj.plain_text,
"subtype": 'plain',
"charset": rtf_obj.text_codec}
except NotEncapsulatedRtf:
logger.debug("RTF body in Msg object is not encapsualted.")
except MalformedEncapsulatedRtf:
logger.info("RTF body in Msg object contains encapsulated content, but it is malformed and can't be converted.")
attachments = msg_obj.attachments
return message, body, attachments
def _build_eml(self, message: EmailMessage, body: dict[str, Any], attachments: list[Any]) -> EmailMessage:
    """Constructs an eml file from objects extracted from a msg.

    Args:
        message: Message built from the msg headers. It is modified in place and returned.
        body: Body descriptors keyed by 'text', 'html' and/or 'rtf'. Each value is a dict
            that is expanded as keyword arguments into the EmailMessage content methods.
        attachments: Attachment objects from the msg. They are read through `cid`, `data`,
            `longFilename` and `getStringStream`, and handed to
            `_update_content_disp_properties`.

    Returns:
        The same `message` object with its payload rebuilt.

    Raises:
        MISPMsgConverstionError: If the message is not multipart and no body descriptor
            matches its content type.
    """
    # Order the body objects by increasing complexity and toss any missing objects
    body_objects: list[dict[str, Any]] = [i for i in [body.get('text'),
                                                      body.get('html'),
                                                      body.get('rtf')] if i is not None]
    # If this a non-multipart email then we only need to attach the payload
    if message.get_content_maintype() != 'multipart':
        for _body in body_objects:
            # Use the first body whose text/<subtype> equals the header's content type.
            if "text/{}".format(_body['subtype']) == message.get_content_type():
                message.set_content(**_body)
                return message
        raise MISPMsgConverstionError("Unable to find appropriate eml payload in message body.")
    # If multipart we are going to have to set the content type to null and build it back up.
    _orig_boundry = message.get_boundary()
    message.clear_content()
    # See if we are dealing with `related` inline content
    related_content = {}
    if isinstance(body.get('html', None), dict):
        _html = body.get('html', {}).get('obj')
        for attch in attachments:
            # An attachment counts as inline when the HTML references its content id.
            if _html.find(f"cid:{attch.cid}") != -1:
                # NOTE(review): '__substg1.0_370E' is presumably the attachment's MIME type
                # stream ("maintype/subtype") - confirm against extract_msg.
                _content_type = attch.getStringStream('__substg1.0_370E')
                maintype, subtype = _content_type.split("/", 1)
                # Keep both the source attachment and the kwargs used to add it.
                related_content[attch.cid] = (attch,
                                              {'obj': attch.data,
                                               "maintype": maintype,
                                               "subtype": subtype,
                                               "cid": attch.cid,
                                               "filename": attch.longFilename})
    if len(related_content) > 0:
        if body.get('text', None) is not None:
            # Text always goes first in an alternative, but we need the related object first
            body_text = body.get('text')
            if isinstance(body_text, dict):
                message.add_related(**body_text)
        else:
            body_html = body.get('html')
            if isinstance(body_html, dict):
                message.add_related(**body_html)
        for mime_items in related_content.values():
            if isinstance(mime_items[1], dict):
                message.add_related(**mime_items[1])
            if p := message.get_payload():
                # The part that was just added is the last one in the payload.
                if isinstance(p, list):
                    cur_attach = p[-1]
                else:
                    cur_attach = p
                self._update_content_disp_properties(mime_items[0], cur_attach)
        if body.get('text', None):
            # Now add the HTML as an alternative within the related obj
            if p := message.get_payload():
                # The first payload part is the text body added above.
                if isinstance(p, list):
                    related = p[0]
                else:
                    related = p
                related.add_alternative(**body.get('html'))
    else:
        # No inline content: every body becomes a plain alternative.
        for mime_dict in body_objects:
            # If encapsulated then don't attach RTF
            if self.encapsulated_body is not None:
                if mime_dict.get('subtype', "") == "rtf":
                    continue
            if isinstance(mime_dict, dict):
                message.add_alternative(**mime_dict)
    for attch in attachments:  # Add attachments at the end.
        # Skip attachments already embedded as related inline content.
        if attch.cid not in related_content.keys():
            _content_type = attch.getStringStream('__substg1.0_370E')
            maintype, subtype = _content_type.split("/", 1)
            message.add_attachment(attch.data,
                                   maintype=maintype,
                                   subtype=subtype,
                                   cid=attch.cid,
                                   filename=attch.longFilename)
            if p := message.get_payload():
                if isinstance(p, list):
                    cur_attach = p[-1]
                else:
                    cur_attach = p
                self._update_content_disp_properties(attch, cur_attach)
    if _orig_boundry is not None:
        message.set_boundary(_orig_boundry)  # Set back original boundary
    return message
@staticmethod
def _update_content_disp_properties(msg_attch: AttachmentBase, eml_attch: EmailMessage) -> None:
"""Set Content-Disposition params on binary eml objects
You currently have to set non-filename content-disp params by hand in python.
"""
attch_cont_disp_props = {'30070040': "creation-date",
'30080040': "modification-date"}
for num, name in attch_cont_disp_props.items():
try:
eml_attch.set_param(name,
email.utils.format_datetime(msg_attch.props.getValue(num)),
header='Content-Disposition')
except KeyError:
# It's fine if they don't have those values
pass
@property
def attachments(self) -> list[tuple[str | None, BytesIO]]:
to_return = []
try:
for attachment in self.email.iter_attachments():
content = attachment.get_content()
if isinstance(content, str):
content = content.encode()
to_return.append((attachment.get_filename(), BytesIO(content)))
except AttributeError:
# ignore bug in Python3.6, that cause exception for empty email body,
# see https://stackoverflow.com/questions/56391306/attributeerror-str-object-has-no-attribute-copy-when-parsing-multipart-emai
pass
return to_return
def generate_attributes(self) -> None:
# Attach original & Converted
if self.attach_original_email is not None:
self.add_attribute("eml", value="Full email.eml",
data=self.raw_emails.get('eml'),
comment="Converted from MSG format" if self.eml_from_msg else None)
if self.raw_emails.get('msg', None) is not None:
self.add_attribute("msg", value="Full email.msg",
data=self.raw_emails.get('msg'))
message = self.email
body: EmailMessage
if body := message.get_body(preferencelist=['plain']):
comment = f"{body.get_content_type()} body"
if self.encapsulated_body == body.get_content_type():
comment += " De-Encapsulated from RTF in original msg."
self.add_attribute("email-body",
body.get_content(),
comment=comment)
if body := message.get_body(preferencelist=['html']):
comment = f"{body.get_content_type()} body"
if self.encapsulated_body == body.get_content_type():
comment += " De-Encapsulated from RTF in original msg."
self.add_attribute("email-body",
body.get_content(),
comment=comment)
headers = [f"{k}: {v}" for k, v in message.items()]
if headers:
self.add_attribute("header", "\n".join(headers))
if "Date" in message and message['date'].datetime is not None:
self.add_attribute("send-date", message['date'].datetime)
if "To" in message:
self.__add_emails("to", message["To"])
if "Delivered-To" in message:
self.__add_emails("to", message["Delivered-To"])
if "From" in message:
self.__add_emails("from", message["From"])
if "Return-Path" in message:
realname, address = email.utils.parseaddr(message["Return-Path"])
self.add_attribute("return-path", address)
if "Reply-To" in message:
self.__add_emails("reply-to", message["reply-to"])
if "Bcc" in message:
self.__add_emails("bcc", message["Bcc"])
if "Cc" in message:
self.__add_emails("cc", message["Cc"])
if "Subject" in message:
self.add_attribute("subject", message["Subject"])
if "Message-ID" in message:
self.add_attribute("message-id", message["Message-ID"])
if "User-Agent" in message:
self.add_attribute("user-agent", message["User-Agent"])
boundary = message.get_boundary()
if boundary:
self.add_attribute("mime-boundary", boundary)
if "X-Mailer" in message:
self.add_attribute("x-mailer", message["X-Mailer"])
if "Thread-Index" in message:
self.add_attribute("thread-index", message["Thread-Index"])
self.__generate_received()
def __add_emails(self, typ: str, data: str, insert_display_names: bool = True) -> None:
addresses: list[dict[str, str]] = []
display_names: list[dict[str, str]] = []
for realname, address in email.utils.getaddresses([data]):
if address and realname:
addresses.append({"value": address, "comment": f"{realname} <{address}>"})
elif address:
addresses.append({"value": address})
else: # parsing failed, skip
continue
if realname:
display_names.append({"value": realname, "comment": f"{realname} <{address}>"})
for a in addresses:
self.add_attribute(typ, **a)
if insert_display_names and display_names:
try:
for d in display_names:
self.add_attribute(f"{typ}-display-name", **d)
except NewAttributeError:
# email object doesn't support display name for all email addrs
pass
def extract_matches(self, pattern: re.Pattern[str], text: str) -> list[tuple[str, ...]]:
"""Returns all regex matches for a given pattern in a text."""
return re.findall(pattern, text)
def add_ip_attribute(self, ip_candidate: str, received: str, seen_attributes: set[tuple[str, str]]) -> None:
"""Validates and adds an IP address to MISP if it's public and not already seen during extraction."""
try:
ip = ipaddress.ip_address(ip_candidate)
if not ip.is_private and ("received-header-ip", ip_candidate) not in seen_attributes:
self.add_attribute("received-header-ip", ip_candidate, comment=received)
seen_attributes.add(("received-header-ip", ip_candidate))
except ValueError:
pass # Invalid IPs are ignored
def add_hostname_attribute(self, hostname: str, received: str, seen_attributes: set[tuple[str, str]]) -> None:
"""Validates and adds a hostname to MISP if it contains a valid TLD-like format and is not already seen."""
if "." in hostname and not hostname.endswith(".") and len(hostname.split(".")[-1]) > 1:
if ("received-header-hostname", hostname) not in seen_attributes:
self.add_attribute("received-header-hostname", hostname, comment=received)
seen_attributes.add(("received-header-hostname", hostname))
def process_received_header(self, received: str, seen_attributes: set[tuple[str, str]]) -> None:
"""Processes a single 'Received' header and extracts hostnames and IPs."""
# Regex patterns
received_from_regex = re.compile(
r'from\s+([\w.-]+)' # Declared sending hostname
r'(?:\s+\(([^)]+)\))?' # Reverse DNS hostname inside parentheses
)
ipv4_regex = re.compile(
r'\[(?P<ipv4_brackets>(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\]' # IPv4 inside []
r'|\((?P<ipv4_parentheses>(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\)' # IPv4 inside ()
r'|(?<=\.\s)(?P<ipv4_after_domain>(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\.'
r'(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9]))\b' # IPv4 appearing after a domain.
)
ipv6_regex = re.compile(
r'\b(?:[a-fA-F0-9]{1,4}:[a-fA-F0-9]{1,4}(?::[a-fA-F0-9]{1,4}){0,6})\b'
)
# Extract hostnames
matches = self.extract_matches(received_from_regex, received)
for match in matches:
declared_sending_host = match[0].strip() if match[0] else None
reverse_dns_host = match[1].split()[0].strip("[]()").rstrip('.') if match[1] else None
if declared_sending_host:
clean_host = declared_sending_host.strip("[]()")
try:
ipaddress.ip_address(declared_sending_host)
self.add_ip_attribute(declared_sending_host, received, seen_attributes)
except ValueError:
self.add_hostname_attribute(declared_sending_host, received, seen_attributes)
if reverse_dns_host:
try:
ipaddress.ip_address(reverse_dns_host)
self.add_ip_attribute(reverse_dns_host, received, seen_attributes)
except ValueError:
self.add_hostname_attribute(reverse_dns_host, received, seen_attributes)
# Extract and add **only valid** IPv4 addresses
for ipv4_match in self.extract_matches(ipv4_regex, received):
ip_candidate = ipv4_match[0] or ipv4_match[1] or ipv4_match[2] # Select first non-empty match
if ip_candidate:
self.add_ip_attribute(ip_candidate, received, seen_attributes)
# Extract and add IPv6 addresses
for ipv6_match in self.extract_matches(ipv6_regex, received):
self.add_ip_attribute(ipv6_match, received, seen_attributes)
def __generate_received(self) -> None:
"""
Extracts public IP addresses and hostnames from "Received" email headers.
"""
received_items = self.email.get_all("Received")
if not received_items:
return
# Track added attributes to prevent duplicates (store as (type, value) tuples)
seen_attributes: set[tuple[str, str]] = set()
for received in received_items:
self.process_received_header(received, seen_attributes)