A timbr-machine works by ingesting data from a queue and processing it through a nested group of user-defined functions, evaluated as a dask graph in a consumer thread. Optionally, a data source can be attached: it takes a generator, runs in its own thread, repeatedly calls .next() on the generator, and puts each result on the queue.

A user can run a machine in an IPython kernel. Combined with juno-magic, a presumably frequent use case might involve running a timbr-machine in an external IPython kernel somewhere as a long-running pipeline. In general, it's useful to have some amount of introspection into both the active state of the machine and general descriptive overall metrics. We'll look at some potential introspective methods and the architecture behind them.

Test Case imports

In [6]:
from twython import TwythonStreamer
import collections
from threading import Thread

from __future__ import print_function
from textblob import TextBlob

timbr-machine imports

In [1]:
from IPython import get_ipython

from multiprocessing.pool import ThreadPool
import dask as da
# NOTE: sync mode will likely be faster
from dask.async import get_sync as get
# from dask.threaded import get

# Create one thread pool with the default worker count and register it with
# dask through set_options.
# NOTE(review): the synchronous getter imported above presumably does not use
# this pool; it would matter if the threaded getter were re-enabled -- confirm.
_pool = ThreadPool()
da.set_options(pool=_pool)

try:
    from Queue import Empty, Full, Queue # Python 2
except ImportError:
    from queue import Empty, Full, Queue # Python 3

from bson.objectid import ObjectId
from functools import wraps # should be used but isn't currently
import inspect

import zmq
import json
import threading
import time

In [2]:
class StoppableThread(threading.Thread):
    """Thread class with a stop() method.

    The thread itself has to check regularly for the stopped() condition;
    stop() only raises a flag and never interrupts a running run().
    """

    def __init__(self):
        super(StoppableThread, self).__init__()
        # Deliberately not called ``_stop``: on Python 3 releases
        # threading.Thread has an internal ``_stop()`` method used by
        # join()/is_alive(), and shadowing it with an Event breaks them.
        self._stop_event = threading.Event()

    def stop(self):
        """Ask the thread to finish at its next stopped() check."""
        self._stop_event.set()

    def stopped(self):
        """Return True once stop() has been called."""
        return self._stop_event.is_set()


def identity(x):
    """Pass-through transform: hand the argument back untouched."""
    return x


def _positional_arg_count(fn):
    """Return how many positional arguments ``fn`` accepts.

    Returns None when ``fn`` takes ``*args`` (no upper bound) and 1 when the
    callable cannot be inspected (the "pipeline mode" fallback).
    """
    if hasattr(inspect, "signature"):
        # Python 3: signature() already leaves out the bound ``self``.
        try:
            params = inspect.signature(fn).parameters.values()
        except (TypeError, ValueError):
            return 1
        kinds = [p.kind for p in params]
        if inspect.Parameter.VAR_POSITIONAL in kinds:
            return None
        positional = (inspect.Parameter.POSITIONAL_ONLY,
                      inspect.Parameter.POSITIONAL_OR_KEYWORD)
        return sum(1 for kind in kinds if kind in positional)

    # Python 2: getargspec is the only option and it counts ``self``.
    try:
        info = inspect.getargspec(fn)
    except TypeError:
        return 1
    if info.varargs is not None:
        return None
    nargs = len(info.args)
    if inspect.ismethod(fn) and fn.__self__ is not None:
        nargs -= 1
    return nargs


def wrap_transform(fn):
    """
    This function returns a new function that accepts an arbitrary number of arguments
    and calls the wrapped function with the number of arguments that it supports. For
    example:

    def f(a, b):
        ...

    g = wrap_transform(f)

    assert g(a, b, c, d) == f(a, b)

    A callable that declares ``*args`` receives every positional argument, and a
    callable that cannot be inspected is called with the first argument only.
    Keyword arguments given to the wrapper are accepted but not forwarded.
    """
    assert callable(fn), "wrap_transform expects a callable"
    nargs = _positional_arg_count(fn)

    def wrapped(*args, **kwargs):
        if nargs is None:
            return fn(*args)
        return fn(*args[:nargs])

    try:
        # Keep the transform's name/docstring visible for introspection.
        wrapped = wraps(fn)(wrapped)
    except AttributeError:
        # Python 2's wraps() fails on callables without __name__ (e.g. partials).
        pass
    return wrapped


def json_serializable_exception(e, extra_data=None):
    """Describe exception ``e`` as a JSON-friendly dict.

    ``extra_data``, when given, is updated in place with an ``"_exception"``
    entry holding ``str(e)`` and returned. When omitted, a fresh dict is
    created on every call so that results are never shared between calls.
    """
    if extra_data is None:
        extra_data = {}
    # Traceback / exception-dict fields were sketched here but are not emitted yet.
    extra_data["_exception"] = str(e)
    return extra_data

import os, errno

def mkdir_p(path):
    """Create ``path`` and any missing parents, like ``mkdir -p``.

    An already existing directory is not an error; every other OSError
    (including ``path`` existing as a non-directory) is re-raised.
    """
    try:
        os.makedirs(path)
    except OSError as exc:  # Python >2.5
        already_a_directory = exc.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise

BaseMachine

In [4]:
def json_serialize(obj):
    """Encode ``obj`` as a JSON string.

    If ``json.dumps`` rejects the object with a TypeError, the JSON encoding
    of that error (as built by json_serializable_exception) is returned instead.
    """
    try:
        encoded = json.dumps(obj)
    except TypeError as err:
        encoded = json.dumps(json_serializable_exception(err))
    return encoded


class BaseMachine(object):
    """Fixed-length pipeline of transforms evaluated as a dask graph.

    The machine owns a bounded input queue (``q``) and a table of up to
    ``stages`` transforms stored under the keys ``"f0"`` .. ``"f{stages-1}"``.
    Unset stages behave as the identity transform. Every graph value also has
    a serialized twin under the same key with an ``"_s"`` suffix.
    """

    # Integer types accepted as stage positions (``long`` only exists on Python 2).
    try:
        _INT_TYPES = (int, long)
    except NameError:
        _INT_TYPES = (int,)

    def __init__(self, stages=8, bufsize=1024):
        """Create a machine with ``stages`` slots and a queue of ``bufsize`` items."""
        self.q = Queue(bufsize)
        self.tbl = {}
        self.stages = stages
        self._dsk = None
        # Single flag read by the ``dsk`` property and set by the item setters.
        self.dirty = True
        self._getter = get
        self._socket = None

        self.serialize_fn = json_serialize

        # Graph entries that serialize the object id, the input and each stage.
        self.REFERENCE_DASK = {
            "oid_s": (str, "oid"),
            "in_s": (self.serialize_fn, "in")
        }
        self.REFERENCE_DASK.update(
            {"f{}_s".format(i): (self.serialize_fn, "f{}".format(i)) for i in range(self.stages)})

    def _stage_key(self, pos):
        """Validate a stage position and return its graph key."""
        assert isinstance(pos, self._INT_TYPES)
        assert pos >= 0 and pos < self.stages
        return "f{}".format(pos)

    def _stage_keys(self):
        """Graph keys of all stages, in order."""
        return ["f{}".format(i) for i in range(self.stages)]

    def _serialized_keys(self):
        """Graph keys of the serialized object id, input and stages."""
        return ["oid_s", "in_s"] + ["f{}_s".format(i) for i in range(self.stages)]

    def put(self, msg):
        """Queue ``msg`` without blocking; raises Full when the queue is full."""
        # NOTE: Non-blocking
        self.q.put(msg, False)

    def get(self, block=False, timeout=0.5):
        """Take one item off the queue, run it through the graph and return
        the serialized outputs (object id, input, then every stage).

        ``block`` and ``timeout`` are passed to ``Queue.get``; MachineConsumer
        relies on the resulting Empty propagating out of this call.
        """
        dsk = dict(self.dsk)
        dsk["in"] = (self.q.get, block, timeout)
        return self._getter(dsk, self._serialized_keys())

    def __len__(self):
        """Number of stage slots."""
        return self.stages

    def __setitem__(self, pos, fn):
        """Install ``fn`` (wrapped by wrap_transform) as stage ``pos``."""
        key = self._stage_key(pos)
        self.tbl[key] = wrap_transform(fn)
        self.dirty = True

    def __getitem__(self, pos):
        """Return the transform at stage ``pos``; unset stages yield the
        identity transform, matching what the graph itself uses."""
        key = self._stage_key(pos)
        try:
            return self.tbl[key]
        except KeyError:
            return self.__missing__(pos)

    def __missing__(self, pos):
        """Default transform for a stage that has not been set."""
        return wrap_transform(identity)

    def __delitem__(self, pos):
        """Remove the transform at stage ``pos`` (KeyError if it was never set)."""
        key = self._stage_key(pos)
        del self.tbl[key]
        self.dirty = True

    def __call__(self, data, include_serialized=False):
        """Run ``data`` through the graph directly, bypassing the queue.

        Returns the object id, the input and every stage output, followed by
        their serialized forms when ``include_serialized`` is true.
        """
        # NOTE(review): ``data`` is stored as a raw graph value; dask may
        # interpret some values (e.g. a string equal to a graph key) as
        # references rather than data -- confirm against the dask docs.
        dsk = dict(self.dsk)
        dsk["in"] = data
        args = ["oid", "in"] + self._stage_keys()
        if include_serialized:
            args.extend(self._serialized_keys())
        return self._getter(dsk, args)

    @property
    def dsk(self):
        """The dask graph, rebuilt lazily whenever the stage table changed.

        Stage ``i`` is the task
        ``(fn, f{i-1}, ..., f0, "in", f{i-1}_s, ..., f0_s, "in_s")``, i.e. each
        transform is offered the previous outputs (most recent first), the raw
        input, and then their serialized forms.
        """
        if self._dsk is None or self.dirty:
            self._dsk = {}
            self._dsk["oid"] = (ObjectId,)
            for i in range(self.stages):
                fkey = "f{}".format(i)
                cmd = [self.tbl.get(fkey, wrap_transform(identity))]
                cmd.extend("f{}".format(j) for j in reversed(range(i)))
                cmd.append("in")
                cmd.extend("f{}_s".format(j) for j in reversed(range(i)))
                cmd.append("in_s")
                self._dsk[fkey] = tuple(cmd)
            self._dsk.update(self.REFERENCE_DASK)
            self.dirty = False
        return self._dsk


Machine

In [5]:
class MachineConsumer(StoppableThread):
    """Thread that drains a machine and publishes each result on a ZeroMQ
    PUB socket bound to an ipc endpoint named after the kernel key.

    Only ``Empty`` is handled in the loop: any other exception raised by
    ``machine.get`` (for example from a user transform) ends the thread.
    """

    def __init__(self, machine):
        super(MachineConsumer, self).__init__()
        self.machine = machine
        self._socket = None
        # set local kernel key
        with open(get_ipython().config["IPKernelApp"]["connection_file"]) as f:
            config = json.load(f)
        self._kernel_key = config["key"]
        mkdir_p("/tmp/timbr-machine") # NOTE: Not Windows Safe (but should be)
        self.initialize_pub_stream("ipc:///tmp/timbr-machine/{}".format(self._kernel_key))

    def initialize_pub_stream(self, endpoint):
        """Create a PUB socket on a new context and bind it to ``endpoint``."""
        ctx = zmq.Context()
        self._socket = ctx.socket(zmq.PUB)
        self._socket.bind(endpoint)

    def run(self):
        """Publish machine outputs until stop() is requested."""
        while not self.stopped():
            try:
                # NOTE: self.get should never throw exceptions from inside the dask
                output = self.machine.get(block=True, timeout=0.1)
            except Empty:
                # Nothing queued within the timeout; re-check the stop flag.
                continue
            hdr = output[0]
            msg = "[{}]".format(",".join(output[1:]))
            # Both frames must be bytes: the header used to be sent as text,
            # which fails on Python 3.
            self._socket.send_multipart([hdr.encode("utf-8"), msg.encode("utf-8")])


class SourceConsumer(StoppableThread):
    """Thread that pulls items from an iterator and puts them on a machine."""

    def __init__(self, machine, generator):
        super(SourceConsumer, self).__init__()
        self.g = generator
        self.machine = machine

    def run(self):
        """Feed the machine until stopped, the source is exhausted, or the
        machine's queue is full."""
        while not self.stopped():
            try:
                # NOTE: next() may block which is okay but put may raise Full
                # which will interrupt the source
                # The builtin next() works on both Python 2 and 3 iterators,
                # unlike the Python 2-only ``.next()`` method.
                msg = next(self.g)
                self.machine.put(msg)
            except (StopIteration, Full):
                break


class Machine(BaseMachine):
    """BaseMachine driven by a background MachineConsumer thread.

    ``_running`` is a plain flag toggled by start() and stop(); it is not
    updated if the consumer thread ends on its own.
    """

    def __init__(self, stages=8, bufsize=1024):
        super(Machine, self).__init__(stages, bufsize)
        self._running = False
        self._consumer_thread = None

    def start(self):
        """Create and start a consumer thread unless already flagged running."""
        if self._running:
            return
        consumer = MachineConsumer(self)
        self._consumer_thread = consumer
        consumer.start()
        self._running = True

    def stop(self):
        """Signal the consumer thread to stop and clear the running flag."""
        consumer = self._consumer_thread
        consumer.stop()
        # give the thread a chance to stop
        time.sleep(0.2)
        self._running = False

    def set_source(self, source_generator):
        """Start a SourceConsumer feeding this machine and return it."""
        source = SourceConsumer(self, source_generator)
        self._source = source
        source.start()
        return source

A Use Case

In [104]:
# Module-level machine (default 8 stages, queue of 1024) used by the rest of this cell.
MACHINE = Machine()

def raise_error_every_now_and_then(x):
    """Raise TypeError when ``recent_data._count`` is above 1 and a multiple
    of 31; otherwise return None. The argument itself is ignored."""
    seen = recent_data._count
    if seen <= 1 or seen % 31 != 0:
        return
    raise TypeError

# Stage wiring. Each stage is offered the outputs of the preceding stages
# (most recent first) followed by the raw input; wrap_transform trims that
# list to the number of parameters the function declares.
#   stage 0: raw item -> its "text" field (presumably a tweet dict -- verify)
#   stage 1: stage 0 text -> TextBlob sentiment
#   stage 2: a = stage 1 output, b = stage 0 output; records (text, sentiment)
#   stage 3: receives stage 2's output and raises periodically
MACHINE[0] = lambda x: x.get("text", "")
MACHINE[1] = lambda x: TextBlob(x).sentiment
MACHINE[2] = lambda a, b: recent_data.append((b, a))
MACHINE[3] = raise_error_every_now_and_then

class CountingDeque(collections.deque):
    """deque that also tallies, in ``_count``, how many times append() ran.

    Only append() is counted; items supplied to the constructor are not.
    """

    def __init__(self, *args, **kwargs):
        self._count = 0
        collections.deque.__init__(self, *args, **kwargs)

    def append(self, *args):
        """Append to the right end, then bump the running tally."""
        collections.deque.append(self, *args)
        self._count = self._count + 1
        
# Rolling window of the 50 most recent stage-2 records; its _count keeps the total.
recent_data = CountingDeque(maxlen=50)
#thatsdogs
# NOTE(review): SECURITY -- these look like live API credentials committed in
# plain text. They should be revoked and loaded from the environment or an
# untracked config file instead of being stored in the notebook.
app_key = "is4Leas6P8ajv4ERNojyJ7psg"
app_secret = "JIb2EEGWbE6NmS4NbdMCARfoCONdYxwG6mfnLY9Z61Q9ZkM9cD"
access_token = "1881035263-ghn9BPqkY4PMyVdfsuaNEeTYBtRXwfKo8Op07Cw"
token_secret = "uKrTloEChEQWShUfU3FS9ejXyi906HjHbsB4T4QDtE7HW"

class Streamer(TwythonStreamer):
    """TwythonStreamer subclass that forwards every received item to MACHINE."""
    
    def on_success(self, data):
        """Put ``data`` on MACHINE's queue (non-blocking, so a full queue
        raises Full into the caller). Returns None."""
        #print data
        MACHINE.put(data)
        
    # Disabled error hook: would queue {"error": status_code} on stream errors.
#     def on_error(self, status_code, data):
#         MACHINE.put({"error": status_code})

# Build the streaming client and a (not yet started) thread that will run
# streamer.statuses.filter(track="trump").
streamer = Streamer(app_key, app_secret, access_token, token_secret)
streamthread = Thread(target= streamer.statuses.filter, kwargs={"track": "trump"})

Our test case puts tweets onto the machine queue at a high rate. We've included a function that throws an arbitrary error every 31 tweets so that we can investigate pipeline error behavior and various ways to deal with errors in our pipeline.

First, we'll see how the machine behaves when an error is raised inside the dask.

In [22]:
# Start the consumer thread first, then the tweet stream that fills the queue.
MACHINE.start()
streamthread.start()

Exception in thread Thread-20:
Traceback (most recent call last):
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "<ipython-input-5-a506283690e7>", line 22, in run
    output = self.machine.get(block=True, timeout=0.1)
  File "<ipython-input-4-c6caacfa5d44>", line 34, in get
    output = self._getter(dsk, ["oid_s", "in_s"] + ["f{}_s".format(i) for i in xrange(self.stages)])
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/site-packages/dask-0.10.0-py2.7.egg/dask/async.py", line 519, in get_sync
    raise_on_exception=True, **kwargs)
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/site-packages/dask-0.10.0-py2.7.egg/dask/async.py", line 490, in get_async
    fire_task()
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/site-packages/dask-0.10.0-py2.7.egg/dask/async.py", line 461, in fire_task
    get_id, raise_on_exception])

In [24]:
# Total number of append() calls recorded so far by the stage-2 deque.
recent_data._count

31

Exception in thread Thread-19:
Traceback (most recent call last):
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/site-packages/twython/streaming/types.py", line 66, in filter
    self.streamer._request(url, 'POST', params=params)
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/site-packages/twython/streaming/api.py", line 154, in _request
    if self.on_success(data):  # pragma: no cover
  File "<ipython-input-21-831e1ec9c551>", line 32, in on_success
    MACHINE.put(data)
  File "<ipython-input-4-c6caacfa5d44>", line 29, in put
    self.q.put(msg, False)
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/Queue.py", line 123, in pu

In [103]:
# Best-effort teardown of the previous run before the objects are rebuilt.
try:    
    streamer.disconnect()
    MACHINE.stop()
    del streamer
    del MACHINE
except Exception as e:
    # Still non-fatal (the names may not exist on a fresh kernel), but say
    # what went wrong instead of hiding it.
    print("teardown skipped: {!r}".format(e))

So, once the machine errored out, it stopped running, and consequently stopped taking items off of its queue, which eventually raised Full. Let's see what's going on with the consumer thread:

In [34]:
# Compare the machine's own flag with the real state of its consumer thread.
print(MACHINE._running)
print(MACHINE._consumer_thread.stopped())
# is_alive() is the supported spelling; the isAlive() alias is gone in Python 3.9+.
print(MACHINE._consumer_thread.is_alive())

True
False
False


The thread running our machine consumer has shut down after the error was raised, but our machine hasn't registered that information: the consumer thread (a StoppableThread subclass) terminated on an unhandled exception, and nothing updated the machine's _running flag.

Notably, our only method of checking that our MACHINE was running (and that it had effectively stopped) was by appending results to a custom counting deque.

It's immediately apparent that we lack insight into the "state" of the machine as it is operating "now", not to mention how it has operated in the past (except maybe that no errors occurred until that one). Assuming we actively seek the path to enlightenment, we're first tasked to define machine state as it is most relevant to our purposes.

We can start by asking the same, very relevant question we first asked in our experimental field work: is this running or not? That question is important, because we're presumably interfacing with our machine to accomplish some task, and the answer determines our next action in real life as an end-user, even if that's to do nothing. So, in the event we don't have access to that information, we will wait forever and die, or else try to glean clues via outside artifacts like above or by some other equally unsatisfying means.

In [105]:
class Machine(BaseMachine):
    """BaseMachine driven by a background MachineConsumer thread.

    Whether the machine is running is derived from the consumer thread itself
    (see ``running``) rather than tracked in a separate flag, so a consumer
    that dies on an unhandled exception is reported as not running.
    """

    def __init__(self, stages=8, bufsize=1024):
        super(Machine, self).__init__(stages, bufsize)
        self._consumer_thread = None
        # Set by set_source(); initialised so the attribute always exists.
        self._source = None
#-         self._running = False

    def start(self):
        """Create and start a new consumer thread unless one is alive."""
        if not self.running:
            self._consumer_thread = MachineConsumer(self)
            self._consumer_thread.start()
#-             self._running = True

    def stop(self):
        """Ask the consumer thread to stop and wait briefly for it to exit.

        Calling stop() before start() is a no-op.
        """
        if self._consumer_thread is None:
            return
        self._consumer_thread.stop()
        if self._consumer_thread.is_alive():
            # give the thread a chance to stop (returns early once it has)
            self._consumer_thread.join(0.2)
#-         self._running = False

    def set_source(self, source_generator):
        """Start a SourceConsumer feeding this machine and return it."""
        self._source = SourceConsumer(self, source_generator)
        self._source.start()
        return self._source
#++++    
    @property
    def running(self):
        """True while a consumer thread exists and is alive."""
        if self._consumer_thread is None:
            return False
        return self._consumer_thread.is_alive()
#++++

In [106]:
# Start the consumer thread first, then the tweet stream that fills the queue.
MACHINE.start()
streamthread.start()

Exception in thread Thread-27:
Traceback (most recent call last):
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "<ipython-input-5-a506283690e7>", line 22, in run
    output = self.machine.get(block=True, timeout=0.1)
  File "<ipython-input-4-c6caacfa5d44>", line 34, in get
    output = self._getter(dsk, ["oid_s", "in_s"] + ["f{}_s".format(i) for i in xrange(self.stages)])
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/site-packages/dask-0.10.0-py2.7.egg/dask/async.py", line 519, in get_sync
    raise_on_exception=True, **kwargs)
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/site-packages/dask-0.10.0-py2.7.egg/dask/async.py", line 490, in get_async
    fire_task()
  File "/Users/jamiepolackwich1/anaconda/envs/juno-machine/lib/python2.7/site-packages/dask-0.10.0-py2.7.egg/dask/async.py", line 461, in fire_task
    get_id, raise_on_exception])

In [107]:
# Reflects whether the consumer thread is alive right now.
MACHINE.running

False

Whatever a machine state is, it's obviously something that's built to change in time, because we're using it like a data processing pipeline. So figuring out a consistent way to talk about the things that change is probably a good thing.

It's useful to consider the idea of an event. An event is something that happens at some time that can change the state of the machine. Using our test case as an example, a user-defined dask function throwing an error shut off our machine, which qualifies. Putting data on a full machine queue can also break our machine; however, that type of event is inherently triggered by some other event, like a processing bottleneck or a broken consumer thread. The idea is to have enough information available to us at any time so that we can take whatever next steps we'd like efficiently.

Although that line of thinking implies some approach to handling errors from different events, it's also useful for dealing with other kinds of events. For instance, the absence of data in the queue after some time might signify that some source is exhausted. In such a case it might be prudent to stop the machine and issue a notification of completion. Progress can be measured via data processed; in the event that errors are being handled, where and how often might move a user to go back and change things; in general, if we design our event handling system to do things, we still ought to be able to discern that events are being handled from our information system.

Despite the use case, the same core machine will serve every pipeline. We can define core metrics that describe our machine state consistently in any case. If they're really fundamental state metrics, we can define them in BaseMachine along with a base_display maybe (without actually subclassing):

In [79]:
import functools
from IPython.display import Image, HTML, display


class BaseMachineWithStats(BaseMachine):
    """BaseMachine that keeps a small status dict up to date around get().

    Status keys: ``CurrentTime`` (local time of the latest successful get),
    ``CurrentOID`` / ``LastOID`` (serialized object ids of the latest and the
    previous item), ``Processed`` (number of successful gets) and
    ``QueueSize`` (queue depth when last sampled).
    """

    def __init__(self, *args, **kwargs):
        super(BaseMachineWithStats, self).__init__(*args, **kwargs)
        self._status = {"CurrentTime": None, "CurrentOID": None, "LastOID": None,
                        "Processed": 0, "QueueSize": self.q.qsize()}
    
    def get(self, *args, **kwargs):
        """Same contract as BaseMachine.get, with status bookkeeping.

        If the underlying get raises (e.g. Empty), the exception propagates
        and the per-item counters are left untouched.
        """
        self._before_get(*args, **kwargs)
        # super() must name this class; naming BaseMachine would skip
        # BaseMachine.get entirely.
        result = super(BaseMachineWithStats, self).get(*args, **kwargs)
        self._after_get(result)
        return result

    def _before_get(self, *args, **kwargs):
        """Hook run before each get: sample the queue depth."""
        self._status["QueueSize"] = self.q.qsize()

    def _after_get(self, result):
        """Hook run after each successful get; ``result[0]`` is the
        serialized object id."""
        self._status["LastOID"] = self._status["CurrentOID"]
        self._status["CurrentOID"] = result[0]
        self._status["CurrentTime"] = time.strftime("%Y-%m-%d %H:%M:%S")
        self._status["Processed"] += 1
        self._status["QueueSize"] = self.q.qsize()
        
    @property
    def status(self):
        """The live status dict (not a copy)."""
        return self._status
    
    #default
    def display_status(self):
        """Render the status as a row of bordered HTML boxes in the notebook."""
        stats = self.status
        cell = ("<div style='border:1px; border-style:solid; width:400px; "
                "height:auto; float:left;'><b>{} -- {}</b></div>")
        fields = [
            ("Current Ingest Time", "CurrentTime"),
            ("Current Ingest ID", "CurrentOID"),
            ("Total Datum Processed", "Processed"),
            ("Current Queue Depth", "QueueSize"),
        ]
        display(HTML("\n".join(cell.format(label, stats[key]) for label, key in fields)))

However, it could also be interesting to be able to configure custom status hook plugins. In this way, we could write templates for situations where a fixed number of data is known beforehand, for instance, and display_status might include a progress bar. 

A similar approach could be used to create event handler templates for specific scenarios. For instance, a "RawEventDebugger" event handler template might raise all possible errors locally, shut down the machine, and provide in-depth tracebacks or something. "DaskEventDebugger" might only raise dask errors, etc. After learning about the pipeline you're developing, you can use a "BaseEventHandler" or customize your own. Shutdown and Restart hooks could be implemented.

In [None]:
def statushook(method):
    """Decorate a method so optional ``_pre_<name>`` / ``_post_<name>`` hooks
    on the instance run around it.

    The pre-hook is called with the method's arguments; the post-hook gets the
    same arguments plus a ``result`` keyword holding the method's return
    value. Missing hooks are skipped. Exceptions raised inside a hook
    propagate to the caller.
    """
    name = method.__name__

    @functools.wraps(method)
    def wrapper(inst, *args, **kwargs):
        # Look the hooks up first so that only a *missing* hook is ignored,
        # not an AttributeError raised while a hook is running.
        pre = getattr(inst, '_pre_' + name, None)
        if pre is not None:
            pre(*args, **kwargs)
        result = method(inst, *args, **kwargs)
        post = getattr(inst, '_post_' + name, None)
        if post is not None:
            post(*args, **dict(kwargs, result=result))
        return result
    return wrapper

In [None]:
# General class for making tables.  Pass in header, iterable/s of iterable/s
class StatusTable(object):    
    def __init__(self, status, headers=None, title=None, cell_padding=False, abrv_links = False):
        self.nitr = iterable_of_iterables
        self.headers = headers
        self.title = title
        self.htmls = ''
        self.abrv_links = abrv_links
        self.cell_padding = cell_padding
    
    def html_tag(self, obj):
        
        #obj = str(obj)
        if 'http' in obj:
            if self.abrv_links:
                return '<td><a href = ' + obj.encode('utf-8') + '>' + '...' + obj[-10:].encode('utf-8') + '</a></td>'
            else:
                return '<td><a href = ' + obj.encode('utf-8') + '>' + obj.encode('utf-8') + '</a></td>'
        else:
            return '<td>' + obj.encode('utf-8') + '</td>'

    def table_view(self):
        if self.title != None:
            self.htmls += '<h3 align=center>{}</h3>'.format(self.title)
        if self.cell_padding:
            self.htmls += '<table border=1 cellpadding=3; style=width:100%>\n'
        else:
            self.htmls += '<table border=1; style=width:100%>\n'
        if self.headers != None:
            self.htmls += '<tr>' + ''.join([self.html_tag('<b>' + _h + '</b>') for _h in self.headers])
        for itr in self.nitr:
            self.htmls += '<tr>' + ''.join([self.html_tag(_obj) for _obj in itr]) + '</tr>\n'
        self.htmls += '</table>'
        display(HTML(self.htmls))