From d6d94d61c349a151ac12f099537f6402ec8454c4 Mon Sep 17 00:00:00 2001 From: Ma1icious Date: Thu, 29 Jun 2023 16:28:27 +0800 Subject: [PATCH 1/3] Feat: Migrate windows idt plug-in to volatility 3 --- volatility3/framework/plugins/windows/idt.py | 283 +++++++++++++++++++ 1 file changed, 283 insertions(+) create mode 100644 volatility3/framework/plugins/windows/idt.py diff --git a/volatility3/framework/plugins/windows/idt.py b/volatility3/framework/plugins/windows/idt.py new file mode 100644 index 000000000..96f936c7b --- /dev/null +++ b/volatility3/framework/plugins/windows/idt.py @@ -0,0 +1,283 @@ +from typing import List + +from volatility3.framework import interfaces +from volatility3.framework.renderers import TreeGrid, format_hints +from volatility3.framework.interfaces import plugins +from volatility3.framework.configuration import requirements +from volatility3.plugins.windows import modules + +GDT_DESCRIPTORS = dict(enumerate([ + "Data RO", + "Data RO Ac", + "Data RW", + "Data RW Ac", + "Data RO E", + "Data RO EA", + "Data RW E", + "Data RW EA", + "Code EO", + "Code EO Ac", + "Code RE", + "Code RE Ac", + "Code EO C", + "Code EO CA", + "Code RE C", + "Code RE CA", + "", + "TSS16 Avl", + "LDT", + "TSS16 Busy", + "CallGate16", + "TaskGate", + "Int Gate16", + "TrapGate16", + "", + "TSS32 Avl", + "", + "TSS32 Busy", + "CallGate32", + "", + "Int Gate32", + "TrapGate32", +])) + +class _KIDT(): + def __init__(self, idt_struct): + self.idt = idt_struct + self.Offset = idt_struct.Offset + self.Selector = idt_struct.Selector + self.Access = idt_struct.Access + self.ExtendedOffset = idt_struct.ExtendedOffset + + @property + def Address(self): + if self.ExtendedOffset: + return self.ExtendedOffset << 16 | self.Offset + + return 0 + + +class _KGDT(): + def __init__(self, gdt_struct): + self.gdt = gdt_struct + self.LimitLow = gdt_struct.LimitLow + self.BaseLow = gdt_struct.BaseLow + self.HighWord = gdt_struct.HighWord + + @property + def Type(self): + """Get a string name of the descriptor type""" + flag = self.HighWord.Bits.Type & 1 << 4 + typeval = self.HighWord.Bits.Type & ~(1 << 4) + + if flag == 0: + typeval += 16 + + return GDT_DESCRIPTORS.get(typeval, "UNKNOWN") + + @property + def Base(self): + """Get the base (start) of memory for this GDT""" + return (self.BaseLow + ((self.HighWord.Bits.BaseMid + + (self.HighWord.Bits.BaseHi << 8)) << 16)) + + +class _KPCR(): + def __init__(self, kpcr_obj, ntkrnlmp, layer_name, symbol_table): + self.kpcr = kpcr_obj + self.ntkrnlmp = ntkrnlmp + self.layer_name = layer_name + self.symbol_table = symbol_table + + def idt_entries(self): + base_idt = self.kpcr.IDT.dereference() + idt_index = 0 + for idt_index in range(256): + idt_offset = base_idt.vol.offset + 8 * idt_index + idt_struct = self.ntkrnlmp.object( + object_type="_KIDTENTRY", + layer_name=self.layer_name, + offset=idt_offset, + absolute=True + ) + try: + yield idt_index, _KIDT(idt_struct) + except: + pass + + def gdt_entries(self): + base_gdt = self.kpcr.GDT.dereference() + + # Since the real GDT size is read from a register, we'll just assume + # that there are 128 entries (which is normal for most OS) + for gdt_index in range(128): + gdt_offset = base_gdt.vol.offset + 8 * gdt_index + gdt_struct = self.ntkrnlmp.object( + object_type="_KGDTENTRY", + layer_name=self.layer_name, + offset=gdt_offset, + absolute=True + ) + + try: + yield gdt_index, _KGDT(gdt_struct) + except: + pass + + +class IDT(plugins.PluginInterface): + """Lists the Interrupt Descriptor Table (IDT)""" + + _required_framework_version = (2, 0, 0) + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.PluginRequirement( + name="modules", plugin=modules.Modules, version=(1, 0, 0) + ), + ] + + def get_module(self, + context: interfaces.context.ContextInterface, + layer_name: str, + symbol_table: str, + offset: int): + try: + mods = modules.Modules.list_modules(context, layer_name, symbol_table) + + for mod in mods: + if mod.DllBase + mod.SizeOfImage >= offset and mod.DllBase <= offset: + return mod + except: + pass + + return None + + @staticmethod + def get_section_name(ntkrnlmp, layer_name, mod, addr): + """Get the name of the PE section containing + the specified address. + + @param ntkrnlmp: ntkrnlmp module object + @param layer_name: kernel layer name + @param mod: an _LDR_DATA_TABLE_ENTRY + @param addr: virtual address to lookup + + @returns string PE section name + """ + def name_array_to_str(name_array): + name = "" + for char in name_array: + if char <= 0: + break + name += chr(char) + return name + + try: + dos_header = ntkrnlmp.object( + object_type="_IMAGE_DOS_HEADER", + layer_name=layer_name, + offset = mod.DllBase, + absolute=True) + nt_header = dos_header.get_nt_header() + except ValueError: + return '' + + for sec in nt_header.get_sections(): + if (addr > mod.DllBase + sec.VirtualAddress and + addr < sec.Misc.VirtualSize + (mod.DllBase + sec.VirtualAddress)): + + return name_array_to_str(sec.Name) or "" + + return '' + + def get_pcrs(self, ntkrnlmp, layer_name, symbol_table): + # Get the number of processors + cpu_count_offset = ntkrnlmp.get_symbol("KeNumberProcessors").address + cpu_count = ntkrnlmp.object( + object_type="unsigned int", layer_name=layer_name, offset=cpu_count_offset + ) + + for cpu_index in range(cpu_count): + # Calculate the address of KiProcessorBlock + KiProcessorBlock_addr = ntkrnlmp.get_symbol("KiProcessorBlock").address + cpu_index * 4 + KiProcessorBlock = ntkrnlmp.object( + object_type="pointer", + layer_name=layer_name, + offset=KiProcessorBlock_addr, + ) + + # Get kpcrb's offset + kpcrb = KiProcessorBlock.dereference() + kpcrb_offset = kpcrb.vol.offset + + # Get kpcr object + kpcr_offset = ntkrnlmp.get_type("_KPCR").relative_child_offset("PrcbData") + kpcr = ntkrnlmp.object( + object_type="_KPCR", + layer_name=layer_name, + offset=kpcrb_offset - kpcr_offset, + absolute=True) + + yield cpu_index, _KPCR(kpcr, ntkrnlmp, layer_name, symbol_table) + + def _generator(self): + # Initialize the ntkrnlmp object and etc. + kernel = self.context.modules[self.config["kernel"]] + layer_name = kernel.layer_name + symbol_table = kernel.symbol_table_name + kvo = self.context.layers[layer_name].config["kernel_virtual_offset"] + ntkrnlmp = self.context.module(symbol_table, layer_name=layer_name, offset=kvo) + + for cpu_index, kpcr in self.get_pcrs(ntkrnlmp, layer_name, symbol_table): + gdt = dict((i * 8, sd) for i, sd in kpcr.gdt_entries()) + for idt_index, idt in kpcr.idt_entries(): + addr = idt.Address + gdt_entry = gdt.get(idt.Selector, None) + + if gdt_entry is not None and "Code" in gdt_entry.Type: + addr += gdt_entry.Base + + module = self.get_module(self.context, layer_name, symbol_table, addr) + + if addr == 0: + module_name = "NOT USED" + sect_name = '' + elif module: + module_name = module.BaseDllName.get_string() + sect_name = self.get_section_name(ntkrnlmp, layer_name, module, addr) + else: + module_name = "UNKNOWN" + sect_name = '' + + yield ( + 0, + ( + cpu_index, + hex(idt_index).replace("0x", "").upper(), + format_hints.Hex(idt.Selector), + format_hints.Hex(idt.Address), + module_name, + sect_name + ) + ) + + def run(self): + return TreeGrid( + [ + ('CPU', int), + ('Index', str), + ('Selector', format_hints.Hex), + ('Value', format_hints.Hex), + ('Module', str), + ('Section', str) + ], + self._generator() + ) \ No newline at end of file From 85c918e84c3768ef05a42505bbbc7af84c8bb1f6 Mon Sep 17 00:00:00 2001 From: Ma1icious Date: Thu, 29 Jun 2023 16:38:42 +0800 Subject: [PATCH 2/3] Fix: remove unnecessary calls in windows.idt.IDT --- volatility3/framework/plugins/windows/idt.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/volatility3/framework/plugins/windows/idt.py b/volatility3/framework/plugins/windows/idt.py index 96f936c7b..2d8bfa7e5 100644 --- a/volatility3/framework/plugins/windows/idt.py +++ b/volatility3/framework/plugins/windows/idt.py @@ -90,10 +90,10 @@ def __init__(self, kpcr_obj, ntkrnlmp, layer_name, symbol_table): self.symbol_table = symbol_table def idt_entries(self): - base_idt = self.kpcr.IDT.dereference() + base_idt = self.kpcr.IDT idt_index = 0 for idt_index in range(256): - idt_offset = base_idt.vol.offset + 8 * idt_index + idt_offset = base_idt + 8 * idt_index idt_struct = self.ntkrnlmp.object( object_type="_KIDTENTRY", layer_name=self.layer_name, @@ -106,12 +106,12 @@ def idt_entries(self): pass def gdt_entries(self): - base_gdt = self.kpcr.GDT.dereference() + base_gdt = self.kpcr.GDT # Since the real GDT size is read from a register, we'll just assume # that there are 128 entries (which is normal for most OS) for gdt_index in range(128): - gdt_offset = base_gdt.vol.offset + 8 * gdt_index + gdt_offset = base_gdt + 8 * gdt_index gdt_struct = self.ntkrnlmp.object( object_type="_KGDTENTRY", layer_name=self.layer_name, @@ -213,17 +213,13 @@ def get_pcrs(self, ntkrnlmp, layer_name, symbol_table): layer_name=layer_name, offset=KiProcessorBlock_addr, ) - - # Get kpcrb's offset - kpcrb = KiProcessorBlock.dereference() - kpcrb_offset = kpcrb.vol.offset # Get kpcr object kpcr_offset = ntkrnlmp.get_type("_KPCR").relative_child_offset("PrcbData") kpcr = ntkrnlmp.object( object_type="_KPCR", layer_name=layer_name, - offset=kpcrb_offset - kpcr_offset, + offset=KiProcessorBlock - kpcr_offset, absolute=True) yield cpu_index, _KPCR(kpcr, ntkrnlmp, layer_name, symbol_table) From 1b64953ad5bc1afa96e975679aa86a9b61871567 Mon Sep 17 00:00:00 2001 From: Ma1icious Date: Fri, 7 Jul 2023 14:48:10 +0800 Subject: [PATCH 3/3] Fix[Win_IDT]: Use black to format the code --- volatility3/framework/plugins/windows/idt.py | 158 ++++++++++--------- 1 file changed, 87 insertions(+), 71 deletions(-) diff --git a/volatility3/framework/plugins/windows/idt.py b/volatility3/framework/plugins/windows/idt.py index 2d8bfa7e5..d35ec1e42 100644 --- a/volatility3/framework/plugins/windows/idt.py +++ b/volatility3/framework/plugins/windows/idt.py @@ -6,42 +6,47 @@ from volatility3.framework.configuration import requirements from volatility3.plugins.windows import modules -GDT_DESCRIPTORS = dict(enumerate([ - "Data RO", - "Data RO Ac", - "Data RW", - "Data RW Ac", - "Data RO E", - "Data RO EA", - "Data RW E", - "Data RW EA", - "Code EO", - "Code EO Ac", - "Code RE", - "Code RE Ac", - "Code EO C", - "Code EO CA", - "Code RE C", - "Code RE CA", - "", - "TSS16 Avl", - "LDT", - "TSS16 Busy", - "CallGate16", - "TaskGate", - "Int Gate16", - "TrapGate16", - "", - "TSS32 Avl", - "", - "TSS32 Busy", - "CallGate32", - "", - "Int Gate32", - "TrapGate32", -])) - -class _KIDT(): +GDT_DESCRIPTORS = dict( + enumerate( + [ + "Data RO", + "Data RO Ac", + "Data RW", + "Data RW Ac", + "Data RO E", + "Data RO EA", + "Data RW E", + "Data RW EA", + "Code EO", + "Code EO Ac", + "Code RE", + "Code RE Ac", + "Code EO C", + "Code EO CA", + "Code RE C", + "Code RE CA", + "", + "TSS16 Avl", + "LDT", + "TSS16 Busy", + "CallGate16", + "TaskGate", + "Int Gate16", + "TrapGate16", + "", + "TSS32 Avl", + "", + "TSS32 Busy", + "CallGate32", + "", + "Int Gate32", + "TrapGate32", + ] + ) +) + + +class _KIDT: def __init__(self, idt_struct): self.idt = idt_struct self.Offset = idt_struct.Offset @@ -55,15 +60,15 @@ def Address(self): return self.ExtendedOffset << 16 | self.Offset return 0 - -class _KGDT(): + +class _KGDT: def __init__(self, gdt_struct): self.gdt = gdt_struct self.LimitLow = gdt_struct.LimitLow self.BaseLow = gdt_struct.BaseLow self.HighWord = gdt_struct.HighWord - + @property def Type(self): """Get a string name of the descriptor type""" @@ -74,21 +79,22 @@ def Type(self): typeval += 16 return GDT_DESCRIPTORS.get(typeval, "UNKNOWN") - + @property def Base(self): """Get the base (start) of memory for this GDT""" - return (self.BaseLow + ((self.HighWord.Bits.BaseMid + - (self.HighWord.Bits.BaseHi << 8)) << 16)) + return self.BaseLow + ( + (self.HighWord.Bits.BaseMid + (self.HighWord.Bits.BaseHi << 8)) << 16 + ) -class _KPCR(): +class _KPCR: def __init__(self, kpcr_obj, ntkrnlmp, layer_name, symbol_table): self.kpcr = kpcr_obj self.ntkrnlmp = ntkrnlmp self.layer_name = layer_name self.symbol_table = symbol_table - + def idt_entries(self): base_idt = self.kpcr.IDT idt_index = 0 @@ -98,7 +104,7 @@ def idt_entries(self): object_type="_KIDTENTRY", layer_name=self.layer_name, offset=idt_offset, - absolute=True + absolute=True, ) try: yield idt_index, _KIDT(idt_struct) @@ -116,7 +122,7 @@ def gdt_entries(self): object_type="_KGDTENTRY", layer_name=self.layer_name, offset=gdt_offset, - absolute=True + absolute=True, ) try: @@ -143,12 +149,14 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] name="modules", plugin=modules.Modules, version=(1, 0, 0) ), ] - - def get_module(self, + + def get_module( + self, context: interfaces.context.ContextInterface, layer_name: str, symbol_table: str, - offset: int): + offset: int, + ): try: mods = modules.Modules.list_modules(context, layer_name, symbol_table) @@ -157,21 +165,22 @@ def get_module(self, return mod except: pass - + return None - + @staticmethod def get_section_name(ntkrnlmp, layer_name, mod, addr): - """Get the name of the PE section containing - the specified address. + """Get the name of the PE section containing + the specified address. @param ntkrnlmp: ntkrnlmp module object @param layer_name: kernel layer name - @param mod: an _LDR_DATA_TABLE_ENTRY - @param addr: virtual address to lookup - + @param mod: an _LDR_DATA_TABLE_ENTRY + @param addr: virtual address to lookup + @returns string PE section name """ + def name_array_to_str(name_array): name = "" for char in name_array: @@ -184,20 +193,22 @@ def name_array_to_str(name_array): dos_header = ntkrnlmp.object( object_type="_IMAGE_DOS_HEADER", layer_name=layer_name, - offset = mod.DllBase, - absolute=True) + offset=mod.DllBase, + absolute=True, + ) nt_header = dos_header.get_nt_header() except ValueError: return '' for sec in nt_header.get_sections(): - if (addr > mod.DllBase + sec.VirtualAddress and - addr < sec.Misc.VirtualSize + (mod.DllBase + sec.VirtualAddress)): - + if ( + addr > mod.DllBase + sec.VirtualAddress + and addr < sec.Misc.VirtualSize + (mod.DllBase + sec.VirtualAddress) + ): return name_array_to_str(sec.Name) or "" return '' - + def get_pcrs(self, ntkrnlmp, layer_name, symbol_table): # Get the number of processors cpu_count_offset = ntkrnlmp.get_symbol("KeNumberProcessors").address @@ -207,20 +218,23 @@ def get_pcrs(self, ntkrnlmp, layer_name, symbol_table): for cpu_index in range(cpu_count): # Calculate the address of KiProcessorBlock - KiProcessorBlock_addr = ntkrnlmp.get_symbol("KiProcessorBlock").address + cpu_index * 4 + KiProcessorBlock_addr = ( + ntkrnlmp.get_symbol("KiProcessorBlock").address + cpu_index * 4 + ) KiProcessorBlock = ntkrnlmp.object( object_type="pointer", layer_name=layer_name, offset=KiProcessorBlock_addr, ) - + # Get kpcr object kpcr_offset = ntkrnlmp.get_type("_KPCR").relative_child_offset("PrcbData") kpcr = ntkrnlmp.object( object_type="_KPCR", layer_name=layer_name, offset=KiProcessorBlock - kpcr_offset, - absolute=True) + absolute=True, + ) yield cpu_index, _KPCR(kpcr, ntkrnlmp, layer_name, symbol_table) @@ -248,7 +262,9 @@ def _generator(self): sect_name = '' elif module: module_name = module.BaseDllName.get_string() - sect_name = self.get_section_name(ntkrnlmp, layer_name, module, addr) + sect_name = self.get_section_name( + ntkrnlmp, layer_name, module, addr + ) else: module_name = "UNKNOWN" sect_name = '' @@ -261,8 +277,8 @@ def _generator(self): format_hints.Hex(idt.Selector), format_hints.Hex(idt.Address), module_name, - sect_name - ) + sect_name, + ), ) def run(self): @@ -273,7 +289,7 @@ def run(self): ('Selector', format_hints.Hex), ('Value', format_hints.Hex), ('Module', str), - ('Section', str) + ('Section', str), ], - self._generator() - ) \ No newline at end of file + self._generator(), + )