# Search for elements and addresses in heap dump mem files

In [1]:
import os
import json
import glob
from dataclasses import dataclass
import graphviz

In [2]:

@dataclass
class ProgramParams:
    """
    Wrapper class for program parameters.
    """
    XXD_LINE_BLOCK_BYTE_SIZE = 16
    POINTER_BYTE_SIZE = 8 # 64-bit, ex: C0 03 7B 09 2A 56 00 00

    TEST_JSON_TEST_FILE_PATH = os.environ['HOME'] + "/Documents/code/phdtrack/phdtrack_project_3/data/302-1644391327.json"
    TEST_HEAP_DUMP_RAW_FILE_PATH = os.environ['HOME'] + "/Documents/code/phdtrack/phdtrack_project_3/data/302-1644391327-heap.raw"
    TEST_DATA_DIR = os.environ['HOME'] + "/Documents/code/phdtrack/phdtrack_project_3/data/graphs"
    #TEST_GRAPH_DATA_FILENAME = "graph_302-1644391327.gv"
    TEST_GRAPH_DATA_FILENAME = "467-1644391327-heap.gv"

    DATA_DIR_PATH = os.environ['HOME'] + "/Documents/code/phdtrack/phdtrack_data/Training/Training/scp/V_7_8_P1/16"
    ENDIANNESS = "little"

    def __init__(self, **kwargs):
        if (
            self.check_path_exists(self.TEST_JSON_TEST_FILE_PATH) and
            self.check_path_exists(self.TEST_HEAP_DUMP_RAW_FILE_PATH) and
            self.check_path_exists(self.TEST_DATA_DIR) and
            self.check_path_exists(self.DATA_DIR_PATH)
        ):
            print("Program paths are OK.")
        else:
            print("Program paths are NOT OK.")
            exit(1)
    
    def check_path_exists(self, path: str):
        """
        Check if the path exists. Return True if it exists, False otherwise.
        """
        if not os.path.exists(path):
            print('WARNING: Path does not exist: %s' % path)
            return False
        return True



PARAMS = ProgramParams()

Program paths are OK.


In [3]:
# read the JSON file and get all pair of addresses and keys
@dataclass
class KeyData:
    """
    Wrapper class for key data.
    """
    name: str
    key: bytes
    addr: bytes
    len: int
    real_len: int

heap_start_addr = None
addr_key_pairs: dict[int, KeyData] = {} # key addr (int in base 16 - hex) -> key data (KeyData)

with open(PARAMS.TEST_JSON_TEST_FILE_PATH, 'r') as f:
    json_data = json.load(f)

    heap_start_addr = bytes.fromhex(json_data["HEAP_START"])
    
    for json_key_name in json_data:
        # match json key names that start with 'KEY_' and are followed by a single letter
        if json_key_name.startswith('KEY_') and len(json_key_name) == 5:
            real_key_addr = bytes.fromhex(json_data[json_key_name + "_ADDR"])
            addr_key_pairs[int.from_bytes(real_key_addr, byteorder='big', signed=False)] = KeyData(
                name=json_key_name,
                key=bytes.fromhex(json_data[json_key_name]),
                addr=real_key_addr,
                len=int(json_data[json_key_name + "_LEN"]),
                real_len=int(json_data[json_key_name + "_REAL_LEN"])
            )
            # print(
            #     'addr: ', hex(int.from_bytes(real_key_addr, byteorder='big', signed=False)), 
            #     'real key addr: ', json_data[json_key_name + "_ADDR"]
            # )

# print nb of keys
print("Nb of keys: %d" % len(addr_key_pairs))

Nb of keys: 6


In [4]:
# read the heap dump file and search for the keys
with open(PARAMS.TEST_HEAP_DUMP_RAW_FILE_PATH, 'rb') as f:
    heap_dump = f.read()

    # split the heap dump into lines of 16 bytes
    heap_dump_lines = [heap_dump[i:i+PARAMS.XXD_LINE_BLOCK_BYTE_SIZE] for i in range(0, len(heap_dump), PARAMS.XXD_LINE_BLOCK_BYTE_SIZE)]
    
    # print first 5 lines
    for i in range(5):
        print(heap_dump_lines[i].hex())
    
    print("Number of dump lines: %d" % len(heap_dump_lines), "of size:", PARAMS.XXD_LINE_BLOCK_BYTE_SIZE, "bytes")

    # go to known key addresses and check if the key is there
    for key_addr in addr_key_pairs:
        key_data = addr_key_pairs[key_addr]

        # get the line index of the key address
        # WARN: Need to divide the line index by 16 because the heap dump is in bytes
        # and line addresses is the address of the first byte of the line.
        # so each line address is 16 bytes apart.
        line_index = (int.from_bytes(key_data.addr, byteorder='big', signed=False) - int.from_bytes(heap_start_addr, byteorder='big', signed=False)) // PARAMS.XXD_LINE_BLOCK_BYTE_SIZE
        print("key name:", key_data.name, "index:", line_index, "index in hex:", hex(line_index))
        if (heap_dump_lines[line_index] == key_data.key):
            print("Key found: %s" % key_data.name)
        else:
            print("Key NOT found: %s" % key_data.name)
        
    

00000000000000005102000000000000
02040706070704070504070204060106
06070107060702020201000000000001
03010001000000000000000000000001
00000000030200000000000000000000
Number of dump lines: 17408 of size: 16 bytes
key name: KEY_A index: 5537 index in hex: 0x15a1
Key found: KEY_A
key name: KEY_B index: 4533 index in hex: 0x11b5
Key found: KEY_B
key name: KEY_C index: 5546 index in hex: 0x15aa
Key found: KEY_C
key name: KEY_D index: 4537 index in hex: 0x11b9
Key found: KEY_D
key name: KEY_E index: 6069 index in hex: 0x17b5
Key found: KEY_E
key name: KEY_F index: 3620 index in hex: 0xe24
Key found: KEY_F


In [4]:
def remove_unique_vertice_graphs(pointers_to_values: dict[int, int]):
    """
    Remove graphs that have only one vertice.
    """
    cleaned_pointers_to_values = {}

    for pointer_addr in pointers_to_values:
        pointer_value = pointers_to_values[pointer_addr]
        if (
            pointer_value in pointers_to_values.keys() or # the pointed pointer is used somewhere else
            pointer_addr in pointers_to_values.values() # the pointer is pointed to by another pointer
        ):
            cleaned_pointers_to_values[pointer_addr] = pointer_value
    
    return cleaned_pointers_to_values

In [5]:
# addr conversions
def addr_to_index(addr: int, min_addr: int) -> int:
    """
    Convert an address to an integer index inside block list.
    """
    index: int = (addr - min_addr) // PARAMS.POINTER_BYTE_SIZE
    return index

def index_to_addr(index: int, min_addr: int) -> int:
    """
    Convert an integer index inside block list to an address.
    """
    addr: int = index * PARAMS.POINTER_BYTE_SIZE + min_addr
    return addr

In [6]:
def get_end_graph_data_str(
    pointers_to_values: dict[int, int], 
    dump_blocks: list[bytes],
    min_addr: int
):
    """
    Add to the end of each graph, the last data value pointed that is not a pointer.
    """
    pointer_to_data: dict[int, bytes] = {}

    for pointer_addr in pointers_to_values:
        pointer_value = pointers_to_values[pointer_addr]
        if (
            not (pointer_value in pointers_to_values.keys()) and # the pointed pointer is used somewhere else
            pointer_addr in pointers_to_values.values() # the pointer is pointed to by another pointer
        ):
            data_index = addr_to_index(pointer_value, min_addr)
            data = dump_blocks[data_index]

            # save the data
            pointer_to_data[pointer_addr] = data
    
    return pointer_to_data

In [7]:
def convert_bytes_to_potential_formats(data: bytes) -> dict[str, str]:
    """
    Given a byte array, convert it to different formats.
    :return: A dictionary with the format name as key and the converted data as value.
    """
    format_to_formated_data: dict[str, str] = {}

    # string formats conversions
    string_formats = ['utf-8', 'ascii']

    def __try_to_apply_format(string_format: str, data: bytes):
        try:
            format_to_formated_data[string_format] = data.decode(string_format)
        except UnicodeDecodeError:
            pass
    
    for string_format in string_formats:
        __try_to_apply_format(string_format, data)

    # int format conversions
    format_to_formated_data['int big-endian'] = str(int.from_bytes(data, byteorder='big', signed=False))
    format_to_formated_data['int little-endian'] = str(int.from_bytes(data, byteorder='little', signed=False))

    # hex format conversion
    format_to_formated_data['hex'] = data.hex()
    
    return format_to_formated_data

In [8]:
def generate_pointer_vertices_string(pointer_to_pointer: dict[int, int]):
    """
    Generate the graphviz string of the graphs, composed of vertices of pointers only.
    :param pointers_to_values: The dict of pointers to values. Should be cleaned from unique vertice graphs.
    :param file_path: Modified file string.
    """
    # check if graphs is empty
    if not pointer_to_pointer or len(pointer_to_pointer) == 0:
        print("No graphs to write to file.")
        return ""

    # generate the graphviz string
    pointer_vertices_str = ""

    # iterate over dict
    for pointer_addr in pointer_to_pointer:
        pointer_value = pointer_to_pointer[pointer_addr]
        pointer_vertices_str += "    \"%s\" -> \"%s\";\n" % (pointer_addr, pointer_value)

    return pointer_vertices_str

In [9]:
def generate_data_vertices_string(pointer_to_data: dict[int, bytes]):
    """
    Generate the graphviz string of the graphs, composed of vertices of pointer to data only.
    """
    # check if dict is empty
    if not pointer_to_data or len(pointer_to_data) == 0:
        print("No data vertices to write to file.")
        return ""

    # generate the graphviz string
    data_vertices_str = ""

    # iterate over dict
    for pointer_addr in pointer_to_data:
        data = pointer_to_data[pointer_addr]
        format_to_formated_data = convert_bytes_to_potential_formats(data)
        for data_format, data_str in format_to_formated_data.items():
            data_vertices_str += "    \"%s\" -> \"%s\" [label=\"%s\"];\n" % (pointer_addr, data_str, data_format)

    return data_vertices_str


In [10]:
def write_graphs_to_file(
    file_path: str,
    pointer_to_pointer: dict[int, int],
    pointer_to_data: dict[int, bytes],
):
    """
    Write the graphs to a file.
    :param pointers_to_values: The dict of pointers to values. Should be cleaned from unique vertice graphs.
    :param file_path: The path to the file to write to.
    """
    graph_file_as_string = ""

    # add graph header
    graph_file_as_string += "digraph \"%s\" {\n" % str(os.path.basename(file_path)).replace("-heap.raw", "")

    # add vertices
    graph_file_as_string += generate_pointer_vertices_string(pointer_to_pointer)
    graph_file_as_string += generate_data_vertices_string(pointer_to_data)

    # end of graph
    graph_file_as_string += "}"

    # finally write the graphs info to file
    with open(file_path, 'w') as graph_file:
        graph_file.write(graph_file_as_string)

In [None]:
import networkx as nx

def create_nx_graph(
    pointer_to_pointer: dict[int, int],
    pointer_to_data: dict[int, bytes]
):
    """
    Create a networkx graph with associated data in end leaf vertices.
    """
    G = nx.DiGraph()

    # add pointer to pointer vertices
    for pointer_addr, pointer_val in pointer_to_pointer.items():
        G.add_edge(pointer_addr, pointer_val)
    
    # add pointer to data vertices
    for pointer_addr, data in pointer_to_data.items():
        G.add_edge(pointer_addr, data)

    print("Number of vertices:", G.number_of_nodes())


def get_biggest_clique():
    


In [11]:
# follow the pointers and build the graphs
def follow_pointers_and_build_graph(
    raw_heap_dump_file_path: str, 
    pointer_byte_size=PARAMS.POINTER_BYTE_SIZE,
    debug=False
):
    pointers_to_values: dict[int, int] = {}

    with open(raw_heap_dump_file_path, 'rb') as f:
        heap_dump = f.read()

        # split the heap dump into lines of POINTER_BYTE_SIZE bytes
        heap_dump_lines = [heap_dump[i:i+pointer_byte_size] for i in range(0, len(heap_dump), pointer_byte_size)]
        
        # print some lines
        if debug: 
            for i in range(100, 105):
                print(heap_dump_lines[i].hex(), "int value:", int.from_bytes(heap_dump_lines[i], byteorder=PARAMS.ENDIANNESS, signed=False))
        
            print("Number of dump lines: %d" % len(heap_dump_lines), "of size:", pointer_byte_size, "bytes")
        

        # get HEAP_START from the JSON file
        heap_start_addr = None
        with open(raw_heap_dump_file_path.replace("-heap.raw", ".json"), 'r') as json_file:
            json_data = json.load(json_file)
            heap_start_addr = bytes.fromhex(json_data["HEAP_START"])
        assert heap_start_addr is not None

        # get the min and max address of the heap
        min_addr = int.from_bytes(heap_start_addr, byteorder='big', signed=False) # HEAP_START
        max_addr = min_addr + len(heap_dump_lines) * pointer_byte_size

        if debug:
            print("min_addr: %d, hex min_addr: %s" % (min_addr, hex(min_addr)))
            print("max_addr: %s, hex max_addr: %s" % (hex(min_addr), hex(max_addr)))

        # go through all the potential pointers in the heap dump
        counter = 0
        for i, potential_ptr in enumerate(heap_dump_lines):
            potential_ptr_int = int.from_bytes(potential_ptr, byteorder=PARAMS.ENDIANNESS, signed=False)
            if (
                potential_ptr_int <= max_addr and 
                potential_ptr_int > 0 and 
                potential_ptr_int % 16 == 0 and
                potential_ptr_int >= min_addr
            ):
                print("found potential_ptr_int: %d, hex potential_ptr_int: %s" % (potential_ptr_int, hex(potential_ptr_int)))

            # check is the potential pointer is in range of the heap
            if potential_ptr_int >= min_addr and potential_ptr_int <= max_addr:
                current_ptr_addr = index_to_addr(i, min_addr)

                # add potential pointer to dict
                pointers_to_values[current_ptr_addr] = potential_ptr_int
                
                counter += 1

        if counter > 0:
            pointers_to_values = remove_unique_vertice_graphs(pointers_to_values)
            end_pointers_to_data = get_end_graph_data_str(pointers_to_values, heap_dump_lines, min_addr) # data values at end of pointer graphs

            # open .gv file
            save_file_path = os.path.join(
                PARAMS.TEST_DATA_DIR, 
                str(os.path.basename(raw_heap_dump_file_path)).replace('.raw', '.gv')
            )

            # save the graph to file
            write_graphs_to_file(save_file_path, pointers_to_values, end_pointers_to_data)

            print("Writing graph to file: %s done." % PARAMS.TEST_DATA_DIR + PARAMS.TEST_GRAPH_DATA_FILENAME)
            print("Nb of found potential pointers: %d" % counter)
            print("Nb of non-unique-vertice-graph vertices : %d" % len(pointers_to_values))
        else:
            print("No potential pointers found in heap dump file: %s" % raw_heap_dump_file_path)


In [12]:
# follow the pointers and build the graphs for all files
file_paths = glob.glob(os.path.join(PARAMS.DATA_DIR_PATH, '*.raw'), recursive=False)
file_paths: list[str] = list(set(file_paths)) # remove duplicates
print("Nb of files to process: %d" % len(file_paths))

# print first 4 file_paths
for i in range(4):
    print(file_paths[i])

count = 0 # limiter for testing
for file_path in file_paths:
    if count == 0:
        print("Processing file: %s" % file_path)
        follow_pointers_and_build_graph(file_path, debug=True)
    count += 1

Nb of files to process: 1065
/home/onyr/Documents/code/phdtrack/phdtrack_data/Training/Training/scp/V_7_8_P1/16/518-1644391327-heap.raw
/home/onyr/Documents/code/phdtrack/phdtrack_data/Training/Training/scp/V_7_8_P1/16/1013-1644391327-heap.raw
/home/onyr/Documents/code/phdtrack/phdtrack_data/Training/Training/scp/V_7_8_P1/16/31516-1644391327-heap.raw
/home/onyr/Documents/code/phdtrack/phdtrack_data/Training/Training/scp/V_7_8_P1/16/32271-1644391327-heap.raw
Processing file: /home/onyr/Documents/code/phdtrack/phdtrack_data/Training/Training/scp/V_7_8_P1/16/518-1644391327-heap.raw
a01404c7f1550000 int value: 94496914412704
e01404c7f1550000 int value: 94496914412768
001504c7f1550000 int value: 94496914412800
201504c7f1550000 int value: 94496914412832
401504c7f1550000 int value: 94496914412864
Number of dump lines: 34816 of size: 8 bytes
min_addr: 94496914411520, hex min_addr: 0x55f1c7041000
max_addr: 0x55f1c7041000, hex max_addr: 0x55f1c7085000
found potential_ptr_int: 94496914472448, hex

In [85]:
# read a .gv file and display it
test_gv_filepath = "/home/onyr/Documents/code/phdtrack/phdtrack_project_3/data/graphs/30266-1644391327-heap.gv"
with open(test_gv_filepath, 'r') as f:
    graph = f.read()
    graph_png_file_path = str(os.path.join(PARAMS.TEST_DATA_DIR, PARAMS.TEST_GRAPH_DATA_FILENAME)).replace('.gv', '.png')
    s = graphviz.Source(graph)
    s.render(outfile=graph_png_file_path, format='png', view=True)


dot: graph is too large for cairo-renderer bitmaps. Scaling by 0.247573 to fit


In [None]:
from graphviz import Graph

g = graphviz.Digraph()

# Add the vertices "A" and "B" to the graph
g.node("A-a")
g.node("B-b")
g.node("C-c")

# Add an edge from "A" to "B" with the label "utf-8"
g.edge("A-a", "B-b", label="utf-8")
g.edge("B-b", "C-c", label="utf 8")

# Print the graph
print(g.source)

for edge in g:
    print(edge)


digraph {
	"A-a"
	"B-b"
	"C-c"
	"A-a" -> "B-b" [label="utf-8"]
	"B-b" -> "C-c" [label="utf 8"]
}

digraph {

	"A-a"

	"B-b"

	"C-c"

	"A-a" -> "B-b" [label="utf-8"]

	"B-b" -> "C-c" [label="utf 8"]

}

