Feat: Migrate windows idt plug-in to volatility 3 #976

Draft: wants to merge 4 commits into base: develop (showing changes from 2 commits)
279 changes: 279 additions & 0 deletions volatility3/framework/plugins/windows/idt.py
@@ -0,0 +1,279 @@
from typing import List

from volatility3.framework import interfaces
from volatility3.framework.renderers import TreeGrid, format_hints
from volatility3.framework.interfaces import plugins
from volatility3.framework.configuration import requirements
from volatility3.plugins.windows import modules

GDT_DESCRIPTORS = dict(enumerate([
Review comment (Member):

These can all be stored as data in the JSON ISF file, under the enums section. An example follows: the base type and size must be defined, and then each of the possible names and its associated value should be listed.

    "ETW_COMPRESSION_RESUMPTION_MODE": {
      "base": "int",
      "constants": {
        "EtwCompressionModeNoDisable": 1,
        "EtwCompressionModeNoRestart": 2,
        "EtwCompressionModeRestart": 0
      },
      "size": 4
    },
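
As a rough sketch of the suggestion above, the descriptor-name list could be converted into an entry of that shape. The enum name `GDT_DESCRIPTOR_TYPE`, the helper `build_enum`, and the `_<value>` suffix used to keep the repeated `<Reserved>` names unique are assumptions for illustration; they do not come from the PR or from the ISF schema.

```python
import json

# Descriptor names in table order, copied from the PR's GDT_DESCRIPTORS list.
GDT_DESCRIPTOR_NAMES = [
    "Data RO", "Data RO Ac", "Data RW", "Data RW Ac",
    "Data RO E", "Data RO EA", "Data RW E", "Data RW EA",
    "Code EO", "Code EO Ac", "Code RE", "Code RE Ac",
    "Code EO C", "Code EO CA", "Code RE C", "Code RE CA",
    "<Reserved>", "TSS16 Avl", "LDT", "TSS16 Busy",
    "CallGate16", "TaskGate", "Int Gate16", "TrapGate16",
    "<Reserved>", "TSS32 Avl", "<Reserved>", "TSS32 Busy",
    "CallGate32", "<Reserved>", "Int Gate32", "TrapGate32",
]


def build_enum(names, base="int", size=4):
    """Build an enum entry with base, constants and size (hypothetical helper)."""
    constants = {}
    for value, name in enumerate(names):
        # Constants are keyed by name, so repeated names need a unique suffix.
        key = name if names.count(name) == 1 else f"{name}_{value}"
        constants[key] = value
    return {"base": base, "constants": constants, "size": size}


# "GDT_DESCRIPTOR_TYPE" is a hypothetical enum name.
enums = {"GDT_DESCRIPTOR_TYPE": build_enum(GDT_DESCRIPTOR_NAMES)}
print(json.dumps(enums, indent=2))
```

Because the constants are keyed by name, the four `<Reserved>` slots cannot share a key, which is one detail to settle before moving the table into data.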

    "Data RO",
    "Data RO Ac",
    "Data RW",
    "Data RW Ac",
    "Data RO E",
    "Data RO EA",
    "Data RW E",
    "Data RW EA",
    "Code EO",
    "Code EO Ac",
    "Code RE",
    "Code RE Ac",
    "Code EO C",
    "Code EO CA",
    "Code RE C",
    "Code RE CA",
    "<Reserved>",
    "TSS16 Avl",
    "LDT",
    "TSS16 Busy",
    "CallGate16",
    "TaskGate",
    "Int Gate16",
    "TrapGate16",
    "<Reserved>",
    "TSS32 Avl",
    "<Reserved>",
    "TSS32 Busy",
    "CallGate32",
    "<Reserved>",
    "Int Gate32",
    "TrapGate32",
]))

class _KIDT():
    def __init__(self, idt_struct):
        self.idt = idt_struct
        self.Offset = idt_struct.Offset
        self.Selector = idt_struct.Selector
        self.Access = idt_struct.Access
        self.ExtendedOffset = idt_struct.ExtendedOffset

    @property
    def Address(self):
        if self.ExtendedOffset:
            return self.ExtendedOffset << 16 | self.Offset

        return 0


class _KGDT():
    def __init__(self, gdt_struct):
        self.gdt = gdt_struct
        self.LimitLow = gdt_struct.LimitLow
        self.BaseLow = gdt_struct.BaseLow
        self.HighWord = gdt_struct.HighWord

    @property
    def Type(self):
        """Get a string name of the descriptor type"""
        flag = self.HighWord.Bits.Type & 1 << 4
        typeval = self.HighWord.Bits.Type & ~(1 << 4)

        if flag == 0:
            typeval += 16

        return GDT_DESCRIPTORS.get(typeval, "UNKNOWN")

    @property
    def Base(self):
        """Get the base (start) of memory for this GDT"""
        return (self.BaseLow + ((self.HighWord.Bits.BaseMid +
                (self.HighWord.Bits.BaseHi << 8)) << 16))


class _KPCR():
    def __init__(self, kpcr_obj, ntkrnlmp, layer_name, symbol_table):
        self.kpcr = kpcr_obj
        self.ntkrnlmp = ntkrnlmp
        self.layer_name = layer_name
        self.symbol_table = symbol_table

    def idt_entries(self):
        base_idt = self.kpcr.IDT
Review comment (Contributor):

I believe this may only work for x86. On x64, the _KPCR finds its IDT through a member named IdtBase instead. That may be OK if you only plan to support 32-bit systems; however, the plugin's list of architectures includes Intel64.
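
One way to act on this is to select the member by architecture. The sketch below uses plain stand-in objects, and the helper name `idt_base_and_entry_type` is hypothetical. The `_KIDTENTRY64` entry type for x64 is an assumption that should be checked against the symbol tables in use; in a plugin, the 64-bit flag would come from the framework rather than being passed in by hand.

```python
from types import SimpleNamespace


def idt_base_and_entry_type(kpcr, is_64bit):
    """Return the IDT base address and the entry type name for the architecture."""
    if is_64bit:
        # Member name per the review comment; entry type name is an assumption.
        return kpcr.IdtBase, "_KIDTENTRY64"
    # 32-bit layout, as used by the PR.
    return kpcr.IDT, "_KIDTENTRY"


# Stand-ins for _KPCR objects (addresses are made up for the demo).
kpcr32 = SimpleNamespace(IDT=0x80B95400)
kpcr64 = SimpleNamespace(IdtBase=0xFFFFF80000B95080)

base32, type32 = idt_base_and_entry_type(kpcr32, is_64bit=False)
base64, type64 = idt_base_and_entry_type(kpcr64, is_64bit=True)
```

If only 32-bit systems are meant to be supported, the simpler fix is to drop "Intel64" from the requirement's architectures list.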

        idt_index = 0
        for idt_index in range(256):
            idt_offset = base_idt + 8 * idt_index
Review comment (Contributor):

A good shortcut here would be to use something similar to the code below (from the Windows handles.py file):

ptrs = ntkrnlmp.object(
    object_type="array",
    offset=table_addr,
    subtype=ntkrnlmp.get_type("pointer"),
    count=100,
)

You could just create an array of 256 _KIDTENTRY entries and it would result in a list of objects. In that case, you wouldn't have to create 256 objects manually, and you could also remove the hard-coded 8 on line 102, because that would get calculated from the size of the _KIDTENTRY.
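
Applied to the IDT, the suggestion might look like the sketch below. `FakeModule` is a stand-in that only mimics the `object` and `get_type` calls quoted above so the example runs without the framework; the real call returns framework objects, not tuples. The helper name `idt_entries` and passing `absolute=True` (carried over from the PR's existing calls) are assumptions.

```python
class FakeModule:
    """Stand-in for the kernel module object used in the PR."""

    def get_type(self, name):
        # The real call returns a type template; the name is enough here.
        return name

    def object(self, object_type, offset, subtype=None, count=None, absolute=False):
        # Return placeholder entries so the caller can iterate over them.
        return [(subtype, index) for index in range(count)]


def idt_entries(module, base_idt, count=256):
    """Yield (index, entry) pairs from a single array object."""
    entries = module.object(
        object_type="array",
        offset=base_idt,
        subtype=module.get_type("_KIDTENTRY"),
        count=count,
        absolute=True,
    )
    # No manual "base + 8 * index": the array derives the stride from the subtype.
    yield from enumerate(entries)


result = list(idt_entries(FakeModule(), 0x1000))
```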

            idt_struct = self.ntkrnlmp.object(
                object_type="_KIDTENTRY",
                layer_name=self.layer_name,
                offset=idt_offset,
                absolute=True
            )
            try:
                yield idt_index, _KIDT(idt_struct)
            except:

Check notice (Code scanning / CodeQL): Empty except. 'except' clause does nothing but pass and there is no explanatory comment.

Check notice (Code scanning / CodeQL): Except block handles 'BaseException'. Except block directly handles BaseException.

                pass

    def gdt_entries(self):
        base_gdt = self.kpcr.GDT

        # Since the real GDT size is read from a register, we'll just assume
        # that there are 128 entries (which is normal for most OS)
        for gdt_index in range(128):
Review comment (Contributor):

Same comment as above regarding the use of an array. The code here isn't wrong; it could just be simplified a bit, which would also remove some hard-coded values.

            gdt_offset = base_gdt + 8 * gdt_index
            gdt_struct = self.ntkrnlmp.object(
                object_type="_KGDTENTRY",
                layer_name=self.layer_name,
                offset=gdt_offset,
                absolute=True
            )

            try:
                yield gdt_index, _KGDT(gdt_struct)
            except:

Check notice (Code scanning / CodeQL): Empty except. 'except' clause does nothing but pass and there is no explanatory comment.

Check notice (Code scanning / CodeQL): Except block handles 'BaseException'. Except block directly handles BaseException.

                pass


class IDT(plugins.PluginInterface):
    """Lists the Interrupt Descriptor Table (IDT)"""

    _required_framework_version = (2, 0, 0)
    _version = (1, 0, 0)

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.PluginRequirement(
                name="modules", plugin=modules.Modules, version=(1, 0, 0)
            ),
        ]

    def get_module(self,
                   context: interfaces.context.ContextInterface,
                   layer_name: str,
                   symbol_table: str,
                   offset: int):
        try:
            mods = modules.Modules.list_modules(context, layer_name, symbol_table)

            for mod in mods:
                if mod.DllBase + mod.SizeOfImage >= offset and mod.DllBase <= offset:
                    return mod
        except:

Check notice (Code scanning / CodeQL): Empty except. 'except' clause does nothing but pass and there is no explanatory comment.

Review comment (Member):

Please figure out exactly which type of exception you're intending to handle, and catch only that one. This allows for other errors that might not have been envisaged to bubble up through the code and potentially be handled more appropriately by something higher up. This goes for all try/except blocks in this code.
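
A minimal sketch of what a narrowed handler could look like for the module lookup. It assumes the failure worth tolerating is an unreadable list entry; `InvalidAddressException` below is a local stand-in for the framework exception of that name, and `UnreadableModule` and `find_module` are hypothetical. Note that this sketch treats the module range as half-open, which differs slightly from the inclusive comparison in the PR.

```python
from types import SimpleNamespace


class InvalidAddressException(Exception):
    """Stand-in for volatility3.framework.exceptions.InvalidAddressException."""


class UnreadableModule:
    """Hypothetical list entry whose fields cannot be read from memory."""

    @property
    def DllBase(self):
        raise InvalidAddressException("page not available")


def find_module(modules, offset):
    """Return the first module whose address range contains offset, else None."""
    for mod in modules:
        try:
            start, size = mod.DllBase, mod.SizeOfImage
        except InvalidAddressException:
            # Only unreadable entries are skipped; any other error propagates.
            continue
        if start <= offset < start + size:
            return mod
    return None


mods = [UnreadableModule(), SimpleNamespace(DllBase=0x1000, SizeOfImage=0x2000)]
found = find_module(mods, 0x1800)
```

Anything else, such as a typo in an attribute name, still raises and is visible to the caller instead of being silently swallowed.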

Check notice (Code scanning / CodeQL): Except block handles 'BaseException'. Except block directly handles BaseException.

            pass

        return None

    @staticmethod
    def get_section_name(ntkrnlmp, layer_name, mod, addr):
        """Get the name of the PE section containing
        the specified address.

        @param ntkrnlmp: ntkrnlmp module object
        @param layer_name: kernel layer name
        @param mod: an _LDR_DATA_TABLE_ENTRY
        @param addr: virtual address to lookup

        @returns string PE section name
        """
        def name_array_to_str(name_array):
            name = ""
            for char in name_array:
                if char <= 0:
                    break
                name += chr(char)
            return name

        try:
            dos_header = ntkrnlmp.object(
                object_type="_IMAGE_DOS_HEADER",
                layer_name=layer_name,
                offset = mod.DllBase,
                absolute=True)
            nt_header = dos_header.get_nt_header()
        except ValueError:
            return ''

        for sec in nt_header.get_sections():
            if (addr > mod.DllBase + sec.VirtualAddress and
                    addr < sec.Misc.VirtualSize + (mod.DllBase + sec.VirtualAddress)):

                return name_array_to_str(sec.Name) or ""

        return ''

    def get_pcrs(self, ntkrnlmp, layer_name, symbol_table):
        # Get the number of processors
        cpu_count_offset = ntkrnlmp.get_symbol("KeNumberProcessors").address
        cpu_count = ntkrnlmp.object(
            object_type="unsigned int", layer_name=layer_name, offset=cpu_count_offset
        )

        for cpu_index in range(cpu_count):
            # Calculate the address of KiProcessorBlock
            KiProcessorBlock_addr = ntkrnlmp.get_symbol("KiProcessorBlock").address + cpu_index * 4
Review comment (Member):

This seems to be manually iterating an array of pointers, but it would probably be better to just instantiate an object on top of the KiProcessorBlock offset, which knows that it contains an array of pointers to... whatever type of structure the pointers point to (I don't recall off the top of my head). It also doesn't look like the target attribute is set, so these are likely just pointers to... something, meaning that when they're accessed, they're unlikely to work as expected.

Review comment (Member):

I just looked at context.object_from_symbol and we currently expect the symbol to contain type information in order to use it, but I might add a parameter allowing plugins to construct an object from a symbol name and a type, just so we can reuse most of the existing machinery. For now you'd need to get the symbol and then instantiate it at the symbol's address (somewhat as you're doing here).

Review comment (Member):

I've now created a branch to try to resolve this, #978 should allow an object_type parameter to be passed to module.object_from_symbol, with the same form as to module.object. Please give this a test and let me know on #978 if it functions as intended... 5:)
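
To illustrate why the element size should come from the pointer type: the fixed `cpu_index * 4` stride in the PR matches 4-byte pointers only, while 64-bit pointers are 8 bytes wide. The sketch below reads a pointer table from a plain bytes buffer standing in for kernel memory; `read_pointer_array` and the sample addresses are hypothetical, and this is not the framework's array machinery.

```python
import struct


def read_pointer_array(memory, table_offset, count, pointer_size):
    """Read `count` little-endian pointers of `pointer_size` bytes from `memory`."""
    fmt = {4: "<I", 8: "<Q"}[pointer_size]
    return [
        # The stride follows the pointer width instead of a hard-coded 4.
        struct.unpack_from(fmt, memory, table_offset + index * pointer_size)[0]
        for index in range(count)
    ]


# Two made-up 64-bit table entries, one per CPU.
memory = struct.pack("<2Q", 0xFFFFF80000001180, 0xFFFFD00000002180)

as_64bit = read_pointer_array(memory, 0, count=2, pointer_size=8)
# Reading the same table with a 4-byte stride yields halves of the first pointer.
as_32bit = read_pointer_array(memory, 0, count=2, pointer_size=4)
```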

            KiProcessorBlock = ntkrnlmp.object(
                object_type="pointer",
                layer_name=layer_name,
                offset=KiProcessorBlock_addr,
            )

            # Get kpcr object
            kpcr_offset = ntkrnlmp.get_type("_KPCR").relative_child_offset("PrcbData")
            kpcr = ntkrnlmp.object(
Review comment (Member):

It seems like you're trying to wrap the KPCR data into a custom object, rather than using a volatility object? Can I please strongly recommend that you consider defining a JSON file with the appropriate types for a _KPCR if the standard ones aren't suitable? You can even define an override class if there are specific methods or calculations you require (such as idt_entries or gdt_entries). This should reduce the complexity of the code and in turn store more information, which should be data, as data rather than embedding it into code.

I don't recall whether it was the KPCR or the KD_DEBUGGER_DATA that started getting encrypted by Microsoft, but if that is the case for the KPCR, we may want a more generic/general means to handle it, so all the code can live in a central location in the core.

Review comment (Contributor):

It was KD_DEBUGGER_DATA that gets encrypted now. AFAIK the _KPCR types are included in the ISF files from ntoskrnl, so they're already available.
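
The override-class idea can be pictured as follows: the computed `Address` lives on a class bound to the structure type, so no wrapper has to copy fields by hand. `StructStandIn` is a stand-in for a framework struct object and `KIDTENTRY` is a hypothetical override class; how such a class is registered with a symbol table is left out here and should be taken from the framework's existing Windows extensions.

```python
class StructStandIn:
    """Stand-in for a framework struct object: exposes fields as attributes."""

    def __init__(self, **fields):
        self.__dict__.update(fields)


class KIDTENTRY(StructStandIn):
    """Hypothetical override class adding a computed handler address."""

    @property
    def Address(self):
        # Combine the high and low 16-bit halves, as the PR's _KIDT wrapper does.
        if self.ExtendedOffset:
            return (self.ExtendedOffset << 16) | self.Offset
        return 0


entry = KIDTENTRY(Offset=0x1234, Selector=0x8, Access=0x8E00, ExtendedOffset=0x8054)
unused = KIDTENTRY(Offset=0, Selector=0, Access=0, ExtendedOffset=0)
```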

                object_type="_KPCR",
                layer_name=layer_name,
                offset=KiProcessorBlock - kpcr_offset,
                absolute=True)

            yield cpu_index, _KPCR(kpcr, ntkrnlmp, layer_name, symbol_table)

    def _generator(self):
        # Initialize the ntkrnlmp object and etc.
        kernel = self.context.modules[self.config["kernel"]]
        layer_name = kernel.layer_name
        symbol_table = kernel.symbol_table_name
        kvo = self.context.layers[layer_name].config["kernel_virtual_offset"]
Review comment (Member):

This should also be accessible as kernel.offset, I think.

        ntkrnlmp = self.context.module(symbol_table, layer_name=layer_name, offset=kvo)
Review comment (Member):

This should be the same as kernel?


        for cpu_index, kpcr in self.get_pcrs(ntkrnlmp, layer_name, symbol_table):
            gdt = dict((i * 8, sd) for i, sd in kpcr.gdt_entries())
            for idt_index, idt in kpcr.idt_entries():
                addr = idt.Address
                gdt_entry = gdt.get(idt.Selector, None)

                if gdt_entry is not None and "Code" in gdt_entry.Type:
                    addr += gdt_entry.Base

                module = self.get_module(self.context, layer_name, symbol_table, addr)

                if addr == 0:
                    module_name = "NOT USED"
                    sect_name = ''
                elif module:
                    module_name = module.BaseDllName.get_string()
                    sect_name = self.get_section_name(ntkrnlmp, layer_name, module, addr)
                else:
                    module_name = "UNKNOWN"
                    sect_name = ''
Review comment (Member):

Please use a class derived from BaseAbsentValue when indicating that data is unavailable or not relevant. This lets user interface designers know more about how to display the information on the medium where the output is shown.
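
A sketch of how the "NOT USED" and "UNKNOWN" strings could be replaced. The three classes below are local stand-ins named after the framework's absent-value classes so that the example runs on its own; `describe` is a hypothetical helper, and which absent value fits which case is a judgment call, not something the review prescribes.

```python
class BaseAbsentValue:
    """Stand-in for the framework's BaseAbsentValue."""


class NotApplicableValue(BaseAbsentValue):
    """Stand-in: the column does not apply to this row."""


class NotAvailableValue(BaseAbsentValue):
    """Stand-in: the data could not be determined."""


def describe(addr, module_name=None, section_name=None):
    """Return the (module, section) cells for an IDT handler address."""
    if addr == 0:
        # Unused entry: neither column is meaningful.
        return NotApplicableValue(), NotApplicableValue()
    if module_name is None:
        # Handler address is set, but no loaded module contains it.
        return NotAvailableValue(), NotAvailableValue()
    return module_name, section_name or NotAvailableValue()


unused_cells = describe(0)
unknown_cells = describe(0x80541234)
known_cells = describe(0x80541234, "ntoskrnl.exe", ".text")
```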


                yield (
                    0,
                    (
                        cpu_index,
                        hex(idt_index).replace("0x", "").upper(),
                        format_hints.Hex(idt.Selector),
                        format_hints.Hex(idt.Address),
                        module_name,
                        sect_name
                    )
                )

    def run(self):
        return TreeGrid(
            [
                ('CPU', int),
                ('Index', str),
Review comment (Member):

It isn't clear why Index is a str here, when it's actually already a number that is then converted into a string, with the 0x removed and set to uppercase. If it should be displayed as a number, please make it an int; if it should be a hex value, please make it hex. Again, this lets user interface designers know what kind of data it is and how to align it, and allows programs ingesting the data to use the actual value (for example if exported to CSV).
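
The point about keeping the value numeric can be shown with a small stand-in. `Hex` below mimics the idea behind `format_hints.Hex` (an int tagged for hexadecimal display); `render` and the sample row are hypothetical and do not reproduce the framework's renderers.

```python
import csv
import io


class Hex(int):
    """Stand-in for format_hints.Hex: an int tagged for hexadecimal display."""


def render(cell):
    """Choose the text presentation from the cell's type."""
    return f"0x{cell:x}" if isinstance(cell, Hex) else str(cell)


row = (0, Hex(0x2E), Hex(0x8))  # CPU, Index, Selector

# A text renderer shows hex-tagged cells in hexadecimal.
text_cells = [render(cell) for cell in row]

# A CSV exporter can still emit the underlying numbers.
buffer = io.StringIO()
csv.writer(buffer).writerow(int(cell) for cell in row)
csv_line = buffer.getvalue().strip()
```

With a pre-formatted string such as "2E", neither consumer could recover the number without guessing the base.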

                ('Selector', format_hints.Hex),
                ('Value', format_hints.Hex),
                ('Module', str),
                ('Section', str)
            ],
            self._generator()
        )