## Lesson 4: In-place instruction rewriting

**Objectives**: use OFRAK's Ghidra backend; use more filtering capabilities to find specific complex blocks and instructions; assemble an instruction using Keystone; rewrite an instruction in-place

In this section, we'll replace the `ret` instruction at the end of the `main` function with a jump back to the start of `main`, so that the binary loops forever instead of returning and exiting.

In [1]:
from ofrak import OFRAK
from ofrak_tutorial.helper_functions import create_hello_world_binary

create_hello_world_binary()

ofrak = OFRAK()

Using OFRAK Community License.


This time, we want to analyze the binary fully, down to the instruction level. We'll do that by using OFRAK's Ghidra backend. Let's create a more powerful OFRAK context:

In [2]:
import ofrak_ghidra

ofrak.injector.discover(ofrak_ghidra)

binary_analysis_context = await ofrak.create_ofrak_context()

Let's unpack recursively again, now that Ghidra is loaded into OFRAK.

In [3]:
root_resource = await binary_analysis_context.create_root_resource_from_file("hello_world")
ghidra_unpack_result = await root_resource.unpack_recursively()
print(f"components run: {sorted(ghidra_unpack_result.components_run)}")
print(f"{len(ghidra_unpack_result.resources_created)} resources created")
print(f"{len(ghidra_unpack_result.resources_modified)} resources modified")

openjdk version "11.0.23" 2024-04-16
OpenJDK Runtime Environment (build 11.0.23+9-post-Debian-1deb11u1)
OpenJDK 64-Bit Server VM (build 11.0.23+9-post-Debian-1deb11u1, mixed mode)


components run: [b'ApkIdentifier', b'CodeRegionUnpacker', b'ComplexBlockUnpacker', b'DecompilationAnalysisIdentifier', b'DeviceTreeBlobIdentifier', b'ElfDynamicSectionUnpacker', b'ElfPointerArraySectionUnpacker', b'ElfRelaUnpacker', b'ElfSymbolUnpacker', b'ElfUnpacker', b'GhidraAnalysisIdentifier', b'GhidraBasicBlockUnpacker', b'LinkableSymbolIdentifier', b'MagicDescriptionIdentifier', b'MagicMimeIdentifier', b'OpenWrtIdentifier', b'UbiIdentifier', b'UbifsIdentifier', b'Uf2FileIdentifier']
309 resources created
310 resources modified


Do we have instructions this time?

In [4]:
from ofrak.core import Instruction
from ofrak_tutorial.helper_functions import get_descendants_tags

assert Instruction in await get_descendants_tags(root_resource)

Good.

**Complex Blocks** in OFRAK are sets of basic blocks representing a logical unit of code. In particular, all functions are complex blocks.

How do we get the complex block corresponding to "main"?

In [5]:
from ofrak.core import ComplexBlock
from ofrak import ResourceFilter, ResourceAttributeValueFilter


async def get_main_complex_block(root_resource):
    return await root_resource.get_only_descendant_as_view(
        v_type=ComplexBlock,
        r_filter=ResourceFilter(
            attribute_filters=(ResourceAttributeValueFilter(ComplexBlock.Symbol, "main"),)
        ),
    )


main_cb = await get_main_complex_block(root_resource)
print(main_cb)

ComplexBlock(virtual_address=4198690, size=23, name='main')


`resource.get_only_descendant_as_view` is a shortcut to:
- get the only descendant matching the filter `r_filter` (asserting there is one and only one such descendant);
- get it as a resource view of `v_type` (in this case, `ComplexBlock`).
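
The "one and only one" semantics can be illustrated with a toy model. This is only a sketch on a hypothetical `ToyNode` tree, not OFRAK's implementation: the class, the helper `get_only_descendant`, and the `LookupError` it raises are all assumptions made for this example.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List


@dataclass
class ToyNode:
    """Hypothetical stand-in for a resource: a name plus child nodes."""

    name: str
    children: List["ToyNode"] = field(default_factory=list)

    def descendants(self) -> Iterator["ToyNode"]:
        # Walk the whole subtree, depth first, excluding the node itself.
        for child in self.children:
            yield child
            yield from child.descendants()


def get_only_descendant(root: ToyNode, matches: Callable[[ToyNode], bool]) -> ToyNode:
    """Return the single matching descendant, or fail if there are zero or several."""
    found = [node for node in root.descendants() if matches(node)]
    if len(found) != 1:
        raise LookupError(f"expected exactly one match, found {len(found)}")
    return found[0]


tree = ToyNode("elf", [ToyNode(".text", [ToyNode("main"), ToyNode("_start")])])
only_main = get_only_descendant(tree, lambda node: node.name == "main")
print(only_main.name)
```

Failing loudly on zero or multiple matches is what makes this kind of lookup safe to use when a script relies on there being a single `main`.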

Getting the only `ret` instruction in this complex block is a similar process:

In [6]:
async def get_complex_block_ret_instruction(complex_block):
    return await complex_block.resource.get_only_descendant_as_view(
        v_type=Instruction,
        r_filter=ResourceFilter(
            attribute_filters=(ResourceAttributeValueFilter(Instruction.Mnemonic, "ret"),)
        ),
    )


ret_instruction = await get_complex_block_ret_instruction(main_cb)
print(ret_instruction)

Instruction(virtual_address=4198712, size=1, disassembly='ret ', mnemonic='ret', operands='', mode=<InstructionSetMode.NONE: 0>)
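
The addresses in these outputs are printed in decimal. As a quick sanity check, the sketch below uses only the values shown in the two printed objects, converts them to hexadecimal, and confirms that `ret` is the last byte of `main`. The constant names are made up for this illustration.

```python
# Values copied from the printed ComplexBlock and Instruction above.
MAIN_ADDRESS, MAIN_SIZE = 4198690, 23
RET_ADDRESS, RET_SIZE = 4198712, 1

main_end = MAIN_ADDRESS + MAIN_SIZE  # first address past the end of main
ret_end = RET_ADDRESS + RET_SIZE  # first address past the ret instruction

print(f"main: {MAIN_ADDRESS:#x}..{main_end:#x}")  # main: 0x401122..0x401139
print(f"ret:  {RET_ADDRESS:#x}")  # ret:  0x401138

# ret ends exactly where main ends, so it is the final instruction of main.
ret_is_last_byte = ret_end == main_end
print(ret_is_last_byte)  # True
```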


Let's assemble our new instruction that loops to the start of the "main" complex block:

In [7]:
from ofrak.core import ProgramAttributes
from ofrak.service.assembler.assembler_service_keystone import KeystoneAssemblerService


async def get_looping_instruction(main_cb, ret_instruction, program_attributes) -> bytes:
    assembler_service = KeystoneAssemblerService()
    return await assembler_service.assemble(
        assembly=f"jmp {main_cb.virtual_address}",
        vm_addr=ret_instruction.virtual_address,
        program_attributes=program_attributes,
    )


program_attributes = await root_resource.analyze(ProgramAttributes)

await get_looping_instruction(main_cb, ret_instruction, program_attributes)

b'\xeb\xe8'
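
These two bytes can be checked by hand. A short x86 jump is the opcode `0xEB` followed by a signed 8-bit displacement, measured from the end of the jump instruction itself. The sketch below recomputes the encoding from the addresses printed earlier; `encode_short_jmp` is a hypothetical helper written for this illustration, not part of OFRAK or Keystone.

```python
import struct


def encode_short_jmp(source_address: int, target_address: int) -> bytes:
    """Encode an x86 short jump (opcode 0xEB plus a signed 8-bit displacement)."""
    instruction_size = 2  # one opcode byte and one displacement byte
    # The displacement is relative to the address right after the jump.
    displacement = target_address - (source_address + instruction_size)
    if not -128 <= displacement <= 127:
        raise ValueError(f"target out of range for a short jump: {displacement}")
    return bytes([0xEB]) + struct.pack("b", displacement)


# Addresses taken from the outputs above: the ret instruction and the start of main.
encoded = encode_short_jmp(source_address=4198712, target_address=4198690)
print(encoded.hex())  # ebe8
```

The displacement is 4198690 - (4198712 + 2) = -24, which is `0xe8` as a signed byte and matches the assembler's output.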

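Looks good. Note that this jump is two bytes long while `ret` is only one, so the patch also overwrites the byte that follows `ret`. That works for this binary, but check what comes after the instruction before applying the same trick elsewhere. Let's put this all together: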

In [8]:
import subprocess

from ofrak.core.binary import BinaryPatchModifier, BinaryPatchConfig


async def chase_tail(ofrak_context, input_filename, output_filename):
    # In a real script, we would run the two lines below... But let's be lazy and reuse
    # the already unpacked root_resource that we defined in the global scope.
    # root_resource = await ofrak_context.create_root_resource_from_file(input_filename)
    # await root_resource.unpack_recursively()
    main_cb = await get_main_complex_block(root_resource)
    ret_instruction = await get_complex_block_ret_instruction(main_cb)
    program_attributes = await root_resource.analyze(ProgramAttributes)
    looping_instruction = await get_looping_instruction(
        main_cb, ret_instruction, program_attributes
    )

    range_in_root = await ret_instruction.resource.get_data_range_within_root()
    await root_resource.run(
        BinaryPatchModifier,
        BinaryPatchConfig(
            offset=range_in_root.start,
            patch_bytes=looping_instruction,
        ),
    )

    await root_resource.pack()
    await root_resource.flush_data_to_disk(output_filename)


await chase_tail(binary_analysis_context, "hello_world", "hello_world_forever")
stdout = subprocess.run(
    "chmod +x hello_world_forever && timeout 1s ./hello_world_forever",
    shell=True,
    stdout=subprocess.PIPE,
).stdout.decode("utf-8")
print(stdout[0:70] + "...")

Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!
...


Someone is chasing its tail and never catching it 😹

[Next page](5_filesystem_modification.ipynb)