# Advanced Operating Systems: Lab 2 - Inter-Process Communication (IPC) - DTrace functions

This notebook provides sample code that collects data from DTrace probes. It handles not only aggregations but also output produced by the `trace()` and `printf()` D functions.

## Import the DTrace module

As in previous labs, first import the `python-dtrace` module:

In [None]:
from dtrace import DTraceConsumerThread
import subprocess

## Define a DTrace convenience function

Next, define the `dtrace_synchronous()` function with an additional argument `out`. The `out` argument is a function that is called whenever DTrace prints output, e.g. with `trace()` or `printf()`. The `walker` argument is a function that is called to collect aggregations. The `walker` and `out` arguments can be used together.
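
To make the two callback shapes concrete, the following minimal sketch calls a walker and an out function by hand with made-up values, in place of a real DTrace session. The signatures follow the ones used later in this notebook; the names `example_walker` and `example_out` and the sample values are illustrative assumptions only.

```python
# Illustrative callbacks; the names and the sample data are hypothetical.
aggregated = {}
printed = []

def example_walker(action, identifier, keys, value):
    # Receives aggregation data: a list of keys and the aggregated value.
    aggregated[tuple(keys)] = value

def example_out(value):
    # Receives raw bytes produced by trace() or printf().
    printed.append(value.decode("ascii"))

# Simulate what DTrace would deliver (no probes are enabled here).
example_walker(None, 0, ["read"], 3)
example_out(b"128")

print(aggregated)
print(printed)
```

Both callbacks only record what they receive, so the collected data can be inspected after the traced command has finished.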

In [None]:
def dtrace_synchronous(script, walker, out, cmdline):
    """
    script - a D script
    walker - a routine to receive data from aggregations
    out - a routine to receive data from output
    cmdline - a command to run
    """
    
    # Create a separate thread to run the DTrace instrumentation
    dtrace_thread = DTraceConsumerThread(script,
                                     walk_func=walker,
                                     out_func=out,
                                     chew_func=lambda v: None,
                                     chewrec_func=lambda v: None,
                                     sleep=1)
    
    # Start the DTrace instrumentation
    dtrace_thread.start()

    # Display header to indicate that the command has started
    print("## Starting ", cmdline)

    output_dtrace = subprocess.run(cmdline.split(" "))
        
    # The benchmark has completed - stop the DTrace instrumentation
    dtrace_thread.stop()
    dtrace_thread.join()

    # Display footer to indicate that the benchmarking has finished
    if output_dtrace.returncode == 0:
        print("## Finished ", cmdline)
    elif output_dtrace.returncode == 64: # EX_USAGE
        print("## Invalid command", cmdline)
    else:
        print("## Failed with the exit code {}".format(output_dtrace.returncode))
        
    # Explicitly free DTrace resources.
    # Python's Garbage Collector would free DTrace resources when
    # dtrace_thread is reassigned, e.g. when the cell is re-executed.
    # This could be confusing when analysing the kernel from a terminal
    # and the notebook at the same time.
    del dtrace_thread

## Collect aggregations and `printf()` outputs

As an example, to collect system-call counts (as in Advanced Operating Systems: Lab 1 - Getting Started with Kernel Tracing) and, at the same time, debug state transitions of one side of a pipe, we define two actions: one that aggregates system-call counts and one that prints the pipe state.

Our `out` function, `ipc_out`, receives the bytes of one output line at a time and must decode the printed information itself. In contrast, the `walker` function, `ipc_walker`, receives a list of keys together with the aggregated value.

Note that `ipc_script` is only an example D script. It should be extended with appropriate predicates so that it traces only information relevant to our benchmark.
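
One possible way to narrow the `fbt::pipe_read:entry` probe is to reuse the `execname` predicate that the system-call clause already has. The sketch below is an assumption about what such an extension could look like, not the lab's reference solution; the variable name `ipc_script_filtered` is hypothetical.

```python
# Hypothetical variant of ipc_script: the pipe_read clause gets the same
# execname predicate as the syscall clause, so pipe reads issued by other
# processes are not reported.
ipc_script_filtered = """
fbt::pipe_read:entry
/execname == "ipc-benchmark"/
{
    printf("%u",
        (unsigned int)((struct pipe *)args[0]->f_data)->pipe_state);
}

syscall:::entry
/execname == "ipc-benchmark"/
{
    @syscalls[probefunc] = count();
}
"""
```

Further predicates may still be needed depending on which part of the benchmark is of interest.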

In [None]:
ipc_script = """
fbt::pipe_read:entry
{
    printf("%u",
        (unsigned int)((struct pipe *)args[0]->f_data)->pipe_state);
}

syscall:::entry
/execname == "ipc-benchmark"/
{
    @syscalls[probefunc] = count();
}
"""

from collections import defaultdict
syscall_count_values = defaultdict(int)
pipe_reads = []

def ipc_walker(action, identifier, keys, value):
    """
    action -- a type of action (sum, avg, ...)
    identifier -- the id
    keys -- list of keys
    value -- the value
    """
    syscall_count_values[keys[0]] += value

def ipc_out(value):
    """
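    value -- the printed output, as a bytes object
    """
    # Decode the raw bytes and split them into space-separated fields
    fields = value.decode('ascii').split(' ')
    # The script prints a single field: the pipe_state bitmask
    pipe_reads.append({'state': int(fields[0])})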

dtrace_synchronous(ipc_script, ipc_walker, ipc_out, "ipc/ipc-benchmark -j -v -i pipe -b {} -t {} 2proc".format(2**14, 2**15))

for name, count in syscall_count_values.items():
    print("Number of {} calls: {}".format(name, count))

pipe_states = [
    [0x004, "PIPE_ASYNC"],
    [0x008, "PIPE_WANTR"],
    [0x010, "PIPE_WANTW"],
    [0x020, "PIPE_WANT"],
    [0x040, "PIPE_SEL"],
    [0x080, "PIPE_EOF"],
    [0x100, "PIPE_LOCKFL"],
    [0x200, "PIPE_LWANT"],
    [0x400, "PIPE_DIRECTW"],
    [0x800, "PIPE_DIRECTOK"],
]
for pipe_read in pipe_reads:
    print("rpipe->pipe_state={}".format([x[1] for x in pipe_states if x[0] & pipe_read['state']]))
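
The bitmask decoding in the loop above can also be tried in isolation, without running DTrace. The following is a minimal sketch: the flag values are copied from the `pipe_states` table, while the names `PIPE_FLAGS` and `decode_pipe_state` and the sample value `0x088` are hypothetical and chosen only for illustration.

```python
# Flag values copied from the pipe_states table in this notebook.
PIPE_FLAGS = [
    (0x004, "PIPE_ASYNC"),
    (0x008, "PIPE_WANTR"),
    (0x010, "PIPE_WANTW"),
    (0x020, "PIPE_WANT"),
    (0x040, "PIPE_SEL"),
    (0x080, "PIPE_EOF"),
    (0x100, "PIPE_LOCKFL"),
    (0x200, "PIPE_LWANT"),
    (0x400, "PIPE_DIRECTW"),
    (0x800, "PIPE_DIRECTOK"),
]

def decode_pipe_state(state):
    """Return the names of all flags whose bit is set in state."""
    return [name for bit, name in PIPE_FLAGS if state & bit]

# 0x088 has both PIPE_WANTR (0x008) and PIPE_EOF (0x080) set.
print(decode_pipe_state(0x088))  # prints ['PIPE_WANTR', 'PIPE_EOF']
```

A state of `0` yields an empty list, which is how a read with no flags set would be displayed.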
