# Welcome to this tutorial notebook!

This notebook contains the code necessary to produce the output described in our analysis of CVE-2018-8653.

Executing all the cells will produce an interactive widget showing the allocations and deallocations that occur throughout a scenario exhibiting the vulnerability.


You can execute the cells that contain python code using the Ctrl+Enter (execute and stay in same cell) or the Shift+Enter (execute and go to next cell) shortcuts.

Please execute the cells in order, as they depend on each other.

If you just want to see and use the widget, you can execute all cells in a single click by using the `Cell > Run All` menu entry.

If this is the first time that you are using the REVEN API, we recommend you start with our [guided tour](./guided_tour.ipynb) notebook.

In [None]:
# necessary imports

from UAF_IE.percent import percent
import reven2
from enum import Enum
from UAF_IE.utils import table_line, display_table

# for widgets
from ipywidgets import interactive_output, Layout
from IPython.display import display, HTML
import ipywidgets as widgets

In [None]:
# some constants

# Windows NTSTATUS error codes. get_gcblock_from_object compares the value read
# from RAX after GcBlockFactory::PblkAlloc against these, to skip calls whose
# result is an error code rather than the base address of a block.
STATUS_ACCESS_VIOLATION = 0xC0000005
STATUS_BUFFER_TOO_SMALL = 0xC0000023
STATUS_NO_MEMORY = 0xC0000017
# Layout of a GcBlock: pvars of PVAR_SIZE bytes are laid out back to back from
# PVARS_OFFSET up to BLOCK_SIZE (see track_pvar_alloc_free_for_gcblock).
BLOCK_SIZE = 0x970  # Size of a single GcBlock
PVAR_SIZE = 0x18  # Size of a single pvar inside a GcBlock
PVARS_OFFSET = 0x28  # offset after which the pvars are contained inside a GcBlock

In [None]:
# The data structures

class PvarStatus(Enum):
    """
    Indicate the various possible statuses of a pvar.

    fill_gcblock_accesses derives the status from the 16-bit value written at
    the pvar address (the raw pvar type).
    """
    free = 0  # the raw type written is 0
    used = 1  # the raw type is non-zero and the pvar is not marked
    marked = 2  # the 0x800 bit of the raw type has just been set

class AllocEvent:
    """
    An allocation/deallocation event recorded for a pvar.

    Attributes:
      transition: transition at which the event happened (its `id` is shown
                  in the textual representation).
      pvar_type:  raw type value of the pvar after the event.
      status:     status of the pvar after the event.
      is_alloc:   True when the event is an allocation.
      is_dealloc: True when the event is a deallocation.
    """
    def __init__(self, transition, pvar_type, status, is_alloc=False, is_dealloc=False):
        self.transition, self.pvar_type, self.status = transition, pvar_type, status
        self.is_alloc, self.is_dealloc = is_alloc, is_dealloc

    def str_event(self):
        """
        Return "Allocated", "Deallocated" or an empty string for other events.
        The allocation flag takes precedence if both flags are set.
        """
        if self.is_alloc:
            return "Allocated"
        if self.is_dealloc:
            return "Deallocated"
        return ""

    def __repr__(self):
        template = "[{}][{}]Type: {}, Status: {}"
        return template.format(self.transition.id, self.str_event(),
                               self.pvar_type, self.status)

class AllocTrack:
    """
    History of the allocation/deallocation events of one pvar.

    Attributes:
      pvar_address: address of the tracked pvar.
      events:       list of events recorded for this pvar, initially empty.
    """

    def __init__(self, pvar_address):
        self.events = []
        self.pvar_address = pvar_address

    def __repr__(self):
        # Address left-aligned on 18 columns, then a tab and the event list.
        address_text = hex(self.pvar_address)
        return "{0:<18}\t{1}".format(address_text, self.events)


def format_gc_block_alloc_track(x):
    """
    Format an iterable of tracks as text: a header line with the column
    titles, followed by one line per element of `x`.
    """
    column_titles = (
        "Pvar Address",
        "Tr Allocation",
        "Tr Free",
        "Current Status",
        "Variable type",
        "Previous Alloc/Free",
    )
    header = "{0:<18}\t{1:<18}\t{2:<18}\t{3:<18}\t{4:<18}\t{5}\n".format(*column_titles)
    rows = ["{0}\n".format(track) for track in x]
    return header + "".join(rows)

class GcBlock:
    """
    A GcBlock together with the tracks of all the pvars it contains.

    Attributes:
      base_address:   base address of the block.
      tr_alloc:       transition of the allocation call, or None.
      tr_alloc_done:  transition at which the allocation call has completed.
      tr_free:        transition of the call freeing the block, or None.
      gc_alloc_track: list of per-pvar tracks, initially empty.
    """
    def __init__(self, base_address, tr_alloc, tr_alloc_done,
                 tr_free):
        self.gc_alloc_track = []
        self.base_address = base_address
        self.tr_alloc, self.tr_alloc_done = tr_alloc, tr_alloc_done
        self.tr_free = tr_free

    def is_allocated(self):
        """True when an allocation transition is known."""
        return self.tr_alloc is not None

    def is_freed(self):
        """True when both an allocation and a free transition are known."""
        if self.tr_alloc is None:
            return False
        return self.tr_free is not None

    def __repr__(self):
        alloc_id = None
        if self.tr_alloc is not None:
            alloc_id = self.tr_alloc.id
        free_id = None
        if self.tr_free is not None:
            free_id = self.tr_free.id
        template = "Allocated at {0}\tFreed at {1}\tSize {2}\nAll allocations:{3}\n"
        return template.format(alloc_id, free_id, hex(BLOCK_SIZE),
                               format_gc_block_alloc_track(self.gc_alloc_track))


In [None]:
def _find_jscript_symbol(rvn, pattern):
    """
    Return the first symbol matching `pattern` with the jscript.dll binary hint.

    Raises RuntimeError when no symbol matches, instead of letting a bare
    StopIteration escape from `next()`.
    """
    symbol = next(rvn.ossi.symbols(pattern=pattern, binary_hint="jscript.dll"), None)
    if symbol is None:
        raise RuntimeError("Symbol {} not found in jscript.dll".format(pattern))
    return symbol


def get_gcblock_from_object(rvn, obj_addr):
    """
    From a given object address, retrieve the associated GcBlock object,
    with its deallocation and allocation transitions.

    Parameters:
      rvn:      the REVEN server object giving access to the trace and symbols.
      obj_addr: address of an object expected to live inside a GcBlock.

    Returns a GcBlock whose `gc_alloc_track` is still empty, or None when no
    call to GcBlockFactory::PblkAlloc returned a block containing `obj_addr`.

    Raises RuntimeError when a required symbol cannot be found, when several
    allocations return a block containing `obj_addr`, or when the block is
    freed more than once.
    """
    pblkalloc_symbol = _find_jscript_symbol(rvn, "GcBlockFactory::PblkAlloc")

    tr_alloc = None
    tr_alloc_done = None
    base_address = None
    for ctx in rvn.trace.search.symbol(pblkalloc_symbol):
        call_tr = ctx.transition_before()
        # Only keep entries into the symbol that come from a call instruction.
        if call_tr.instruction is None or call_tr.instruction.mnemonic != 'call':
            continue
        # NOTE(review): percent() comes from UAF_IE.percent and is not shown
        # here; presumably it returns the transition where the call returns.
        result_tr = percent(rvn, call_tr)
        result_ctx = result_tr.context_after()
        tmp_block_base_address = result_ctx.read(reven2.arch.x64.rax, reven2.types.USize)
        # RAX holding an error status means that no block was returned.
        if tmp_block_base_address in (STATUS_ACCESS_VIOLATION,
                                      STATUS_BUFFER_TOO_SMALL,
                                      STATUS_NO_MEMORY):
            continue
        if 0 <= obj_addr - tmp_block_base_address < BLOCK_SIZE:
            # Fill the GcBlock object
            base_address = tmp_block_base_address

            if tr_alloc is not None:
                raise RuntimeError("Unsupported allocation of an already allocated block")
            tr_alloc = call_tr
            tr_alloc_done = result_tr

    if base_address is None:
        return None
    print("Found GCBlock at address {:#x}".format(base_address))
    print("Looking for transition where the GCBlock has been freed, if any...")
    freeblk_symbol = _find_jscript_symbol(rvn, "GcBlockFactory::FreeBlk")

    tr_free = None
    for ctx in rvn.trace.search.symbol(freeblk_symbol):
        call_tr = ctx.transition_before()

        if call_tr.instruction is None or call_tr.instruction.mnemonic != 'call':
            continue

        # in this case, we want to find the free of our block, whose address is in RDX
        free_base_addr = ctx.read(reven2.arch.x64.rdx, reven2.types.USize)

        if free_base_addr != base_address:
            continue

        if tr_free is not None:
            raise RuntimeError("Unsupported multiple frees of a block")

        tr_free = call_tr
        print("GCBlock is freed at transition {}".format(tr_free.id))
    return GcBlock(base_address, tr_alloc, tr_alloc_done, tr_free)


def get_alloc_track_index(gcblock, pvar_address):
    """
    Return the position, in `gcblock.gc_alloc_track`, of the first track whose
    address is `pvar_address`, or None when the address is None or unknown.
    """
    if pvar_address is None:
        return None
    matching_positions = (position
                          for position, track in enumerate(gcblock.gc_alloc_track)
                          if pvar_address == track.pvar_address)
    return next(matching_positions, None)


In [None]:
def fill_gcblock_accesses(gcblock, accesses):
    """
    Collect events that happen on all pvars of a gcblock.

    For each access that starts exactly at the address of a tracked pvar, the
    16-bit value at that address is read before and after the access, and the
    resulting status change (if any) is appended to the events of that pvar.

    Parameters:
      gcblock:  block whose `gc_alloc_track` already lists the pvars to track.
      accesses: iterable of memory accesses; None entries and accesses without
                a virtual address are skipped.

    Returns `gcblock`, whose tracks have been updated in place.
    """
    # Index the tracks by address once instead of scanning the list for every
    # access; setdefault keeps the first track should an address be repeated.
    tracks_by_address = {}
    for track in gcblock.gc_alloc_track:
        tracks_by_address.setdefault(track.pvar_address, track)

    for access in accesses:
        # This check must come before any attribute access on `access`.
        if access is None:
            continue

        if access.virtual_address is None:
            # ignoring physical access
            continue

        pvar = tracks_by_address.get(access.virtual_address.offset)
        if pvar is None:
            continue

        ctx_b = access.transition.context_before()
        ctx_a = access.transition.context_after()

        # Get the two values and analyze the operation
        pvar_logical_address = access.virtual_address
        vt_before = ctx_b.read(pvar_logical_address, reven2.types.U16)
        vt_after = ctx_a.read(pvar_logical_address, reven2.types.U16)

        # object was already free
        if vt_before == vt_after == 0:
            pvar.events.append(AllocEvent(access.transition, vt_after, PvarStatus.free))
            continue

        # did not change object status
        if vt_before == vt_after:
            continue

        # _ -> free
        if vt_after == 0:
            # not_free -> free (vt_before cannot be 0 here, but the test is
            # kept explicit)
            if vt_before != 0:
                pvar.events.append(AllocEvent(access.transition, vt_after, PvarStatus.free,
                                              is_dealloc=True))
        # _ -> not_free
        else:
            # free -> not_free
            if vt_before == 0:
                pvar.events.append(AllocEvent(access.transition, vt_after, PvarStatus.used,
                                              is_alloc=True))
            # not_free -> not_free, marking status changed.
            # This is deliberately not an `elif`: a write going from 0 to
            # exactly 0x800 records both the allocation and the marking.
            if (vt_before ^ vt_after) == 0x800:
                # became marked
                if vt_after & 0x800 != 0:
                    pvar.events.append(AllocEvent(access.transition, vt_after,
                                                  PvarStatus.marked))
                # became used
                if vt_before & 0x800 != 0:
                    pvar.events.append(AllocEvent(access.transition, vt_after,
                                                  PvarStatus.used))

    return gcblock


def track_pvar_alloc_free_for_gcblock(rvn, gcblock):
    """
    Register one track per pvar slot of the GcBlock and gather the memory
    accesses found for all of these slots.

    A new AllocTrack is appended to `gcblock.gc_alloc_track` for every slot,
    so calling this twice on the same block registers the slots twice.

    Returns the list of accesses sorted by transition id.
    """
    collected = []

    for offset in range(PVARS_OFFSET, BLOCK_SIZE, PVAR_SIZE):
        pvar_address = gcblock.base_address + offset
        gcblock.gc_alloc_track.append(AllocTrack(pvar_address))
        collected.extend(get_memory_access_pvar(rvn, gcblock, pvar_address))

    collected.sort(key=lambda access: access.transition.id)

    return collected


def get_memory_access_pvar(rvn, gcblock, pvar_address):
    """
    Query the write accesses to the 8 bytes starting at `pvar_address`.

    The search starts where the allocation of the block completed (or at the
    first transition of the trace when the allocation is unknown) and stops
    where the block is freed (or at the last transition of the trace when the
    block is not freed).
    """
    if gcblock.is_allocated():
        first_transition = gcblock.tr_alloc_done
    else:
        first_transition = rvn.trace.transition(0)

    if gcblock.is_freed():
        last_transition = gcblock.tr_free
    else:
        last_transition = rvn.trace.transition(rvn.trace.transition_count - 1)

    return rvn.trace.memory_accesses(
        pvar_address,
        8,
        from_transition=first_transition,
        to_transition=last_transition,
        operation=reven2.memhist.MemoryAccessOperation.Write,
    )




In [None]:
def build_allocations(rvn, pvar_addr):
    """
    Build the full allocation history of the GcBlock containing a pvar.

    Starting from the address of a single pvar, this locates the enclosing
    gcblock, gathers the accesses to every pvar of that block, and turns them
    into allocation/deallocation/marking events, printing its progress.

    Returns the populated GcBlock object, which can be displayed with the
    `render_frame` and `display_gcblock` functions.

    Raises RuntimeError when no GcBlock contains `pvar_addr`.
    """
    start_message = "Getting GCBlock from object at address {:#x}, this may take a while..."
    print(start_message.format(pvar_addr))
    block = get_gcblock_from_object(rvn, pvar_addr)
    if block is None:
        raise RuntimeError("Object doesn't appear to be in a GCBlock")

    print("Tracking all accesses to the pvars inside of the block...")
    block_accesses = track_pvar_alloc_free_for_gcblock(rvn, block)

    print("Collecting the corresponding Allocation/Free/Marking events...")
    fill_gcblock_accesses(block, block_accesses)

    print("All done.")
    return block

In [None]:
# Some helper functions to navigate the events

def pvar_next_change(transition, pvar):
    """
    Return the earliest event of `pvar` strictly after `transition`,
    or None when there is no such event.
    """
    def by_transition(event):
        return event.transition

    try:
        later_events = [event for event in pvar.events
                        if event.transition > transition]
        return min(later_events, key=by_transition)
    except ValueError:
        # min() of an empty sequence: nothing happens after `transition`
        return None
    
def next_change(transition, gcblock):
    """
    Return the earliest event, over all the pvars of `gcblock`, strictly after
    `transition`, or None when no pvar changes afterwards.
    """
    def by_transition(event):
        return event.transition

    try:
        candidates = []
        for pvar in gcblock.gc_alloc_track:
            event = pvar_next_change(transition, pvar)
            if event is not None:
                candidates.append(event)
        return min(candidates, key=by_transition)
    except ValueError:
        # min() of an empty sequence: no pvar has a later event
        return None

def pvar_previous_change(transition, pvar):
    """
    Return the latest event of `pvar` strictly before `transition`,
    or None when there is no such event.
    """
    def by_transition(event):
        return event.transition

    try:
        earlier_events = [event for event in pvar.events
                          if event.transition < transition]
        return max(earlier_events, key=by_transition)
    except ValueError:
        # max() of an empty sequence: nothing happened before `transition`
        return None
    
def previous_change(transition, gcblock):
    """
    Return the latest event, over all the pvars of `gcblock`, strictly before
    `transition`, or None when no pvar changed before it.
    """
    def by_transition(event):
        return event.transition

    try:
        candidates = []
        for pvar in gcblock.gc_alloc_track:
            event = pvar_previous_change(transition, pvar)
            if event is not None:
                candidates.append(event)
        return max(candidates, key=by_transition)
    except ValueError:
        # max() of an empty sequence: no pvar has an earlier event
        return None

In [None]:
def render_frame(transition, gcblock):
    """
    Display a table representing all pvars inside of a GcBlock at a specific
    transition.

    Each row shows the address of a pvar and the transition, raw type and
    status of its last event at or before `transition`. The row is shown in
    green when that event happened exactly at `transition`, and filled with
    "unknown" when the pvar has no event yet.

    Returns the string "Block is not allocated yet." without displaying
    anything when `transition` is before the allocation of the block;
    otherwise displays the table and returns None.
    """
    if transition < gcblock.tr_alloc:
        return "Block is not allocated yet."

    rows = []
    for pvar in gcblock.gc_alloc_track:
        address_cell = "@{:#x}".format(pvar.pvar_address)
        # Select the last event at or before `transition` with an inclusive
        # comparison instead of building `transition + 1`, which may not exist
        # when `transition` is the last one of the trace. `not a > b` is used
        # because `>` is the comparison already relied upon elsewhere in this
        # notebook.
        last_event = max(
            (event for event in pvar.events if not event.transition > transition),
            key=lambda event: event.transition,
            default=None)
        if last_event is None:
            rows.append(table_line(cells=[address_cell,
                                          "<em>unknown</em>",
                                          "<em>unknown</em>", "<em>unknown</em>"]))
        else:
            rows.append(table_line(cells=[address_cell,
                                          last_event.transition.id, last_event.pvar_type,
                                          last_event.status],
                                   style=("color:green" if
                                          last_event.transition == transition
                                          else None)))

    display_table(headers=["Pvar Address", "Last Change Transition", "Type", "Status"],
                  html_lines="".join(rows),
                  title="GCBlock @{:#x} state".format(gcblock.base_address),
                  style="width: 100%;")

In [None]:
def display_gcblock(gcblock, server):
    """
    Show an interactive widget to browse the state of the pvars of a gcblock.

    The widget is made of the table drawn by `render_frame`, a slider selecting
    the transition (from the allocation of the block to its free, or to the
    end of the trace when it is never freed), and two buttons jumping to the
    previous/next event, repeated "Event speed" times.
    """
    if gcblock.tr_free is not None:
        last_transition_id = gcblock.tr_free.id
    else:
        last_transition_id = server.trace.transition_count - 1
    slider = widgets.IntSlider(min=gcblock.tr_alloc.id,
                               max=last_transition_id,
                               layout=Layout(width='95%', height='80px'))

    event_count = widgets.BoundedIntText(value=1, min=1, max=100, step=1,
                                         description="Event speed: ")
    event_count.layout.width = "10em"

    def jump_events(find_change):
        # Follow `find_change` as many times as requested, staying in place
        # whenever there is no further event in that direction.
        position = slider.value
        for _step in range(event_count.value):
            found = find_change(server.trace.transition(position), gcblock)
            if found is not None:
                position = found.transition.id
        slider.value = position

    previous_button = widgets.Button(description="< Previous")
    previous_button.on_click(lambda _button: jump_events(previous_change))

    next_button = widgets.Button(description="Next >")
    next_button.on_click(lambda _button: jump_events(next_change))

    vbox = widgets.VBox([event_count, slider])
    vbox.layout.width = "100%"
    vbox.layout.align_items = "center"
    hbox = widgets.HBox([previous_button, vbox, next_button])

    def show_frame(Transition):
        # The parameter name must match the key of the controls dictionary.
        return render_frame(server.trace.transition(Transition), gcblock)

    out = interactive_output(show_frame, {'Transition': slider})
    out.layout.overflow = "visible"
    display(out, hbox)

End of the preparation code, below is the "main" of our script.

In [None]:
# Open the trace and build the allocation events of its GcBlock into `gcblock`

# Executing this cell may take some time, as it is building all
# allocation events for the entire trace. You can see whether it is executing by looking 
# at the "In [ ]:" on the left of the cell. If it displays a star ("[*]"),
# it is still running. If it displays a number (e.g. [13]), it is done executing.

# Connect to the REVEN server at 127.0.0.1, port 1337.
s = reven2.RevenServer("127.0.0.1", 1337)
# 0x1a7c113f358 is passed as the address of a pvar; the GcBlock containing it
# is the one that gets tracked.
# NOTE(review): presumably this address is specific to the analyzed trace -
# confirm before reusing the notebook on another trace.
gcblock = build_allocations(s, 0x1a7c113f358)

In [None]:
# Widget for the first trace, execute to display the widget
display_gcblock(gcblock, s)

# If you don't see the buttons and the slider to interact with the widget, use the
# rightmost scrollbar to access them

In the widget above, the table indicates the status of the GCBlock at a transition between the transition where the GCBlock was allocated and the transition where it was freed.

Each row of the table indicates the address of a pvar inside the GcBlock, along with the last transition at which it changed, the raw type of the PVAR, and its status, which is inferred from the raw type (for instance, a raw type of 0 means that the PVAR is free).

You can select the transition by moving the slider from left to right, or, if you want a precise transition, double-clicking on the transition number to the right of the slider.

Clicking the `< Previous` and `Next >` buttons allows you to move to the `n`th previous/next event that affects the state of the GCBlock, where `n` is the value of `Event speed`. The last changed line will appear green in the table. You may need to scroll to see which line changed.

This is the end of this notebook.

Thank you for following along!