In [5]:
from bluesky.run_engine import RunEngine
import asyncio
try:
    from asyncio import current_task
except ImportError:
    # handle py < 3,7
    from asyncio.tasks import Task
    current_task = Task.current_task
    del Task

class RunEngineBessy(RunEngine):
    """
    A run engine with an extra command that allows any method of an object to be called
    """    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.register_command("run_func", self._run_func)

    async def _run_func(self, msg):
        """
        run a method of an object passing it args and kwargs, wait for it to complete

        Expected message object is

            Msg('run_func', obj, *args, **kwargs)

        where arguments are passed through to `obj.<method_name>(*args, **kwargs)`.
        """
        
        method_name = msg.args[0]
        args = msg.args[1:]
        kwargs = dict(msg.kwargs)
        group = kwargs.pop('group', None)
        self._movable_objs_touched.add(msg.obj)

        #find the method name
        if hasattr(msg.obj, method_name):
            method = getattr(msg.obj,method_name)
            #check it's callable
            if callable(method):
                ret = method(args, **kwargs)

                p_event = asyncio.Event(loop=self._loop_for_kwargs)
                pardon_failures = self._pardon_failures

                def done_callback(status=None):
                    self.log.debug("The object %r reports set is done "
                                "with status %r", msg.obj, ret.success)
                    self._loop.call_soon_threadsafe(
                        self._status_object_completed, ret, p_event, pardon_failures)

                try:
                    ret.add_callback(done_callback)
                except AttributeError:
                    # for ophyd < v0.8.0
                    ret.finished_cb = done_callback
                self._groups[group].add(p_event.wait)
                self._status_objs[group].add(ret)

                return ret




In [6]:
#create an instance of it
RE = RunEngineBessy({})



In [10]:
#make a test device with some method

from ophyd.status import DeviceStatus
from ophyd import Device
import time

class TestPrintDevice(Device):

    def print_it(self, string):

        sta = DeviceStatus(self)
        print(string)

        time.sleep(10)              #just to show that the "wait" Msg will wait
        sta.set_finished()
        return sta


test_print_device = TestPrintDevice(name="test_print_device")

In [11]:
#make the run engine call that method, waiting on it to complete
from bluesky import Msg
from bluesky.preprocessors import run_wrapper, ensure_generator

plan = ensure_generator([Msg("run_func",test_print_device,"print_it", "Hello World",group = 'test_group'),Msg("wait",group="test_group")])


RE(run_wrapper(plan))

('Hello World',)


  p_event = asyncio.Event(loop=self._loop_for_kwargs)


('78576525-d87a-41d8-8ffe-d94ee3c79afe',)