-
Notifications
You must be signed in to change notification settings - Fork 284
/
Copy pathpeobject.py
237 lines (207 loc) · 11.5 KB
/
peobject.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/usr/bin/env python3
from __future__ import annotations
import logging
from base64 import b64encode
from datetime import datetime
from hashlib import md5, sha1, sha256, sha512
from io import BytesIO
from pathlib import Path
from typing import Any
from . import FileObject
from .abstractgenerator import AbstractMISPObjectGenerator
from ..exceptions import InvalidMISPObject
import lief
import lief.PE
try:
import pydeep # type: ignore
HAS_PYDEEP = True
except ImportError:
HAS_PYDEEP = False
logger = logging.getLogger('pymisp')
def make_pe_objects(lief_parsed: lief.PE.Binary,
                    misp_file: FileObject,
                    standalone: bool = True,
                    default_attributes_parameters: dict[str, Any] | None = None) -> tuple[FileObject, PEObject, list[PESectionObject]]:
    """Build the PE object (and its section objects) for an already parsed binary.

    A ``PEObject`` is created from ``lief_parsed`` and ``misp_file`` gets an
    ``includes`` reference pointing at it.

    :param lief_parsed: PE binary already parsed by lief.
    :param misp_file: file object that receives the reference to the PE object.
    :param standalone: forwarded to ``PEObject``.
    :param default_attributes_parameters: forwarded to ``PEObject``; ``None``
        (the default) is treated as an empty dict.
    :return: ``(misp_file, pe_object, pe_sections)`` where ``pe_sections`` is a
        new list holding the section objects of ``pe_object``.
    """
    # A fresh dict per call: a ``{}`` default would be one object shared by every call.
    if default_attributes_parameters is None:
        default_attributes_parameters = {}
    pe_object = PEObject(parsed=lief_parsed, standalone=standalone,
                         default_attributes_parameters=default_attributes_parameters)
    misp_file.add_reference(pe_object.uuid, 'includes', 'PE indicators')
    # Copy, so the caller's list is independent from pe_object.sections.
    pe_sections = list(pe_object.sections)
    return misp_file, pe_object, pe_sections
class PEObject(AbstractMISPObjectGenerator):
    """MISP ``pe`` object populated from a PE binary parsed with lief.

    After construction the instance exposes ``sections`` (list of
    ``PESectionObject``), ``certificates`` (list of ``PECertificate``) and
    ``signers`` (list of ``PESigners``), each of them referenced from this
    object.
    """

    __pe: lief.PE.Binary

    def __init__(self, parsed: lief.PE.Binary | None = None,  # type: ignore[no-untyped-def]
                 filepath: Path | str | None = None,
                 pseudofile: BytesIO | bytes | list[int] | None = None,
                 **kwargs) -> None:
        """Creates an PE object, with lief

        The input is taken from the first truthy source, checked in this
        order: ``pseudofile``, ``filepath``, ``parsed``.

        :param parsed: binary already parsed by lief.
        :param filepath: path handed to ``lief.PE.parse``.
        :param pseudofile: in-memory binary, as ``BytesIO``, ``bytes`` or a list of ints.
        :param kwargs: forwarded to the parent constructor.
        :raises InvalidMISPObject: if no usable source is given, if
            ``pseudofile`` has an unsupported type, if lief cannot parse the
            input, or if ``parsed`` is not a ``lief.PE.Binary``.
        """
        super().__init__('pe', **kwargs)
        if not HAS_PYDEEP:
            logger.warning("pydeep is missing, please install pymisp this way: pip install pymisp[fileobjects]")
        if pseudofile:
            if isinstance(pseudofile, BytesIO):
                p = lief.PE.parse(obj=pseudofile)
            elif isinstance(pseudofile, bytes):
                p = lief.PE.parse(raw=list(pseudofile))
            elif isinstance(pseudofile, list):
                p = lief.PE.parse(raw=pseudofile)
            else:
                raise InvalidMISPObject(f'Pseudo file can be BytesIO or bytes got {type(pseudofile)}')
            if not p:
                raise InvalidMISPObject('Unable to parse pseudofile')
            self.__pe = p
        elif filepath:
            if p := lief.PE.parse(filepath):
                self.__pe = p
            else:
                raise InvalidMISPObject(f'Unable to parse {filepath}')
        elif parsed:
            # Got an already parsed blob
            if isinstance(parsed, lief.PE.Binary):
                self.__pe = parsed
            else:
                raise InvalidMISPObject(f'Not a lief.PE.Binary: {type(parsed)}')
        else:
            # Without this guard the binary is never set and generate_attributes()
            # fails later with an unrelated AttributeError.
            raise InvalidMISPObject('One of parsed, filepath or pseudofile is required to create a PE object')
        self.generate_attributes()

    def _is_exe(self) -> bool:
        """Return True for an executable image that is neither a DLL nor a driver."""
        if self._is_dll() or self._is_driver():
            return False
        return self.__pe.header.has_characteristic(lief.PE.Header.CHARACTERISTICS.EXECUTABLE_IMAGE)

    def _is_dll(self) -> bool:
        """Return True if the PE header carries the DLL characteristic."""
        return self.__pe.header.has_characteristic(lief.PE.Header.CHARACTERISTICS.DLL)

    def _is_driver(self) -> bool:
        """Return True if the binary imports one of the known system libraries."""
        # List from pefile
        system_dlls = {'ntoskrnl.exe', 'hal.dll', 'ndis.sys', 'bootvid.dll', 'kdcom.dll'}
        return any(imp.lower() in system_dlls for imp in self.__pe.libraries)

    def _get_pe_type(self) -> str:
        """Return ``'dll'``, ``'driver'``, ``'exe'`` or ``'unknown'`` (checked in that order)."""
        if self._is_dll():
            return 'dll'
        if self._is_driver():
            return 'driver'
        if self._is_exe():
            return 'exe'
        return 'unknown'

    def generate_attributes(self) -> None:
        """Populate the object from the parsed binary.

        Adds the general attributes, the version resource strings (when
        present), one ``PESectionObject`` per kept section, and one
        ``PECertificate`` / ``PESigners`` object per certificate / signer of
        every signature.
        """
        # Function-scope import: only this method needs it.
        from datetime import timezone

        self.add_attribute('type', value=self._get_pe_type())
        # General information
        self.add_attribute('entrypoint-address', value=self.__pe.entrypoint)
        # datetime.utcfromtimestamp() is deprecated since Python 3.12. Convert in
        # UTC and drop the tzinfo so the ISO string stays naive, as before.
        compiled_at = datetime.fromtimestamp(self.__pe.header.time_date_stamps, tz=timezone.utc).replace(tzinfo=None)
        self.add_attribute('compilation-timestamp', value=compiled_at.isoformat())
        self.add_attribute('imphash', value=lief.PE.get_imphash(self.__pe, lief.PE.IMPHASH_MODE.PEFILE))
        self.add_attribute('authentihash', value=self.__pe.authentihash_sha256.hex())

        # Version resource: only the first langcode item is used.
        r_manager = self.__pe.resources_manager
        if isinstance(r_manager, lief.PE.ResourcesManager):
            version = r_manager.version
            if isinstance(version, lief.PE.ResourceVersion) and version.string_file_info is not None:
                langcode_item = version.string_file_info.langcode_items[0]
                fileinfo = dict(langcode_item.items.items())
                self.add_attribute('original-filename', value=fileinfo.get('OriginalFilename'))
                self.add_attribute('internal-filename', value=fileinfo.get('InternalName'))
                self.add_attribute('file-description', value=fileinfo.get('FileDescription'))
                self.add_attribute('file-version', value=fileinfo.get('FileVersion'))
                self.add_attribute('product-name', value=fileinfo.get('ProductName'))
                self.add_attribute('product-version', value=fileinfo.get('ProductVersion'))
                self.add_attribute('company-name', value=fileinfo.get('CompanyName'))
                self.add_attribute('legal-copyright', value=fileinfo.get('LegalCopyright'))
                self.add_attribute('lang-id', value=langcode_item.key)

        # Sections
        self.sections = []
        if self.__pe.sections:
            # pos counts the kept sections only; skipped ones do not get a position.
            pos = 0
            for section in self.__pe.sections:
                if not section.name and not section.size:
                    # Skip section if name is none AND size is 0.
                    continue
                s = PESectionObject(section, standalone=self._standalone, default_attributes_parameters=self._default_attributes_parameters)
                self.add_reference(s.uuid, 'includes', f'Section {pos} of PE')
                section_end = section.virtual_address + section.virtual_size
                if section.virtual_address <= self.__pe.entrypoint < section_end:
                    # The entrypoint lies inside this section.
                    if isinstance(section.name, bytes):
                        section_name = section.name.decode()
                    else:
                        section_name = section.name
                    self.add_attribute('entrypoint-section-at-position', value=f'{section_name}|{pos}')
                pos += 1
                self.sections.append(s)
        self.add_attribute('number-sections', value=len(self.sections))

        # Signatures
        self.certificates = []
        self.signers = []
        for sign in self.__pe.signatures:
            for c in sign.certificates:
                cert_obj = PECertificate(c)
                self.add_reference(cert_obj.uuid, 'signed-by')
                self.certificates.append(cert_obj)
            for s_info in sign.signers:
                signer_obj = PESigners(s_info)
                self.add_reference(signer_obj.uuid, 'signed-by')
                self.signers.append(signer_obj)
class PECertificate(AbstractMISPObjectGenerator):
    """MISP ``x509`` object built from one certificate of a PE signature."""

    def __init__(self, certificate: lief.PE.x509, **kwargs) -> None:  # type: ignore[no-untyped-def]
        """Create the object and immediately generate its attributes.

        :param certificate: certificate as exposed by lief.
        :param kwargs: forwarded to the parent constructor (they used to be
            accepted but silently ignored).
        """
        super().__init__('x509', **kwargs)
        self.__certificate = certificate
        self.generate_attributes()

    def generate_attributes(self) -> None:
        """Add issuer, serial number, validity, version, subject, algorithm and raw content."""
        cert = self.__certificate
        self.add_attribute('issuer', value=cert.issuer)
        self.add_attribute('serial-number', value=cert.serial_number)
        # The validity bounds are only used when they have exactly six fields,
        # read as year, month, day, hour, minute, second.
        if len(cert.valid_from) == 6:
            self.add_attribute('validity-not-before', value=datetime(*cert.valid_from))
        if len(cert.valid_to) == 6:
            self.add_attribute('validity-not-after', value=datetime(*cert.valid_to))
        self.add_attribute('version', value=cert.version)
        self.add_attribute('subject', value=cert.subject)
        self.add_attribute('signature_algorithm', value=cert.signature_algorithm)
        self.add_attribute('raw-base64', value=b64encode(cert.raw))
class PESigners(AbstractMISPObjectGenerator):
    """MISP ``authenticode-signerinfo`` object built from one signer of a PE signature."""

    def __init__(self, signer: lief.PE.SignerInfo, **kwargs) -> None:  # type: ignore[no-untyped-def]
        """Create the object and immediately generate its attributes.

        :param signer: signer information as exposed by lief.
        :param kwargs: forwarded to the parent constructor (they used to be
            accepted but silently ignored).
        """
        super().__init__('authenticode-signerinfo', **kwargs)
        self.__signer = signer
        self.generate_attributes()

    def generate_attributes(self) -> None:
        """Add the signer fields, plus program name and URL when opus info is present."""
        signer = self.__signer
        self.add_attribute('issuer', value=signer.issuer)
        self.add_attribute('serial-number', value=signer.serial_number)
        self.add_attribute('version', value=signer.version)
        self.add_attribute('digest_algorithm', value=str(signer.digest_algorithm))
        self.add_attribute('encryption_algorithm', value=str(signer.encryption_algorithm))
        self.add_attribute('digest-base64', value=b64encode(signer.encrypted_digest))
        # Optional attribute: only a truthy result contributes program-name and url.
        info: lief.PE.SpcSpOpusInfo = signer.get_attribute(lief.PE.Attribute.TYPE.SPC_SP_OPUS_INFO)  # type: ignore[assignment]
        if info:
            self.add_attribute('program-name', value=info.program_name)
            self.add_attribute('url', value=info.more_info)
class PESectionObject(AbstractMISPObjectGenerator):
    """MISP ``pe-section`` object describing one section of a PE binary."""

    def __init__(self, section: lief.PE.Section, **kwargs) -> None:  # type: ignore[no-untyped-def]
        """Creates an PE Section object. Object generated by PEObject.

        :param section: section as exposed by lief.
        :param kwargs: forwarded to the parent constructor. PEObject passes
            ``standalone`` and ``default_attributes_parameters`` here; they
            used to be silently dropped.
        """
        super().__init__('pe-section', **kwargs)
        self.__section = section
        self.__data = bytes(self.__section.content)
        self.generate_attributes()

    def generate_attributes(self) -> None:
        """Add name and size, then entropy, hashes and ssdeep for non-empty sections."""
        section = self.__section
        self.add_attribute('name', value=section.name)
        self.add_attribute('size-in-bytes', value=section.size)
        if int(section.size) <= 0:
            # Nothing more is recorded for an empty section.
            return

        # zero-filled sections can create too many correlations
        has_entropy = float(section.entropy) > 0
        to_ids = has_entropy
        disable_correlation = not has_entropy
        self.add_attribute('entropy', value=section.entropy)
        for relation, hasher in (('md5', md5), ('sha1', sha1), ('sha256', sha256), ('sha512', sha512)):
            self.add_attribute(relation, value=hasher(self.__data).hexdigest(),
                               disable_correlation=disable_correlation, to_ids=to_ids)

        if HAS_PYDEEP and has_entropy:
            if section.name == '.rsrc':
                # ssdeep of .rsrc creates too many correlations
                disable_correlation = True
                to_ids = False
            self.add_attribute('ssdeep', value=pydeep.hash_buf(self.__data).decode(),
                               disable_correlation=disable_correlation, to_ids=to_ids)