In most cases, one can simply wrap a sensor in the SensorAsOutputThing
class and not worry about the details of how to implement output things. There
are also several pre-defined readers under thingflow.adapters
that can
obtain events from external sources like message brokers, flat files, and
databases.
The most likly reason for implmenting a new OutputThing is that you want to create a new adapter type that does not exist in the standard ThingFlow library. We will walk through the details in this document.
When implmenting an output thing, one subclasses from
thingflow.base.OutputThing
. To emit a new event, the subclass calls the
_dispatch_next
method with the event and port name. To signal an error or
completion of the event stream,
one calls _dispatch_error
or _dispatch_completed
, respectively. The
base class implementation of these methods are responsible for calling the
on_next
, on_error
, and on_completed
methods for each of the
connected things.
The code to call these _dispatch
methods goes into a well-known method to be
called by the scheduler. The specific method depends how the output thing will
interact with the scheduler. There are two
cases supported by ThingFlow and three associated mixin-classes that define
the methods:
DirectOutputThingMixin
defines an_observe
method that can be called directly by the scheduler either in the main thread (viaScheduler.schedule_period()
orScheduler.schedule_recurring()
) or in a separate thread (viaScheduler.schedule_periodic_on_separate_thread()
).EventLoopOutputThingMixin
is used for an output thing that has its own separate event loop. This is run in a separate thread and the connected input things are called in the main thread.
OK, with all that out of the way, let us define a simple OutputThing. We will
create a simple CSV-formatted spreadsheet file reader. Each row in the
file corresponds to an event. Here is the class definition (found in
examples/simple_csv_reader.py
):
import csv
from thingflow.base import OutputThing, DirectOutputThingMixin,\
SensorEvent, FatalError
class SimpleCsvReader(OutputThing, DirectOutputThingMixin):
def __init__(self, filename, has_header_row=True):
super().__init__() # Make sure the output_thing class is initialized
self.filename = filename
self.file = open(filename, 'r', newline='')
self.reader = csv.reader(self.file)
if has_header_row:
# swallow up the header row so it is not passed as data
try:
self.reader.__next__()
except Exception as e:
raise FatalError("Problem reading header row of csv file %s: %s" %
(filename, e))
def _observe(self):
try:
row = self.reader.__next__()
event = SensorEvent(ts=float(row[0]), sensor_id=row[1],
val=float(row[2]))
self._dispatch_next(event)
except StopIteration:
self.file.close()
self._dispatch_completed()
except FatalError:
self._close()
raise
except Exception as e:
self.file.close()
self._dispatch_error(e)
The SimpleCsvReader
class subclasses from both OutputThing
and
DirectOutputThingMixin
. Subclassing from OutputThing
provides the
machinery needed to register connections and propagate events to downstream
input things. DirectOutputThingMixin
defines an empty _observe()
method and
indicates that the scheduler should call _observe()
to dispatch events
whenever the reader has been scheduled.
In the __init__()
constructor, we first make sure that the base class
infrastructure is initialized through super().__init__()
. Next, we
open the file, set up the csv reader, and read the header (if needed).
The main action is happening in _observe()
. When scheduled, it reads
the next row from the csv file and creates a SensorEvent
from it.
This event is passed on to the output port's connections via
_dispatch_next()
. If
the end of the file has been reached (indicated by the StopIteration
exception), we instead call _dispatch_completed()
. There are two
error cases:
- If a
FatalError
exception is thrown, we close our connection and propagate the error up. This will lead to an early termination of the event loop. - If any other exception is thrown, we pass it downstream via
_dispatch_error()
. It will also close the event stream and cause theSimpleCsvReader
to be de-scheduled. The main event loop may continue, assuming that there are other scheduled objects.
We could save some work in implementing our reader by subclassing from
thingflow.adapters.generic.DirectReader
. It provides the dispatch
behavior common to most readers.
Now, let us create a simple data file test.csv
:
ts,id,value 1,1,2 2,1,3 3,1,455 4,1,55
We can instantiate a SimpleCsvReader
to read in the file via:
reader = SimpleCsvReader("test.csv")
Now, let's hook it to an printing input thing and then run it in the event loop:
import asyncio
from thingflow.base import Scheduler
import thingflow.adapters.output # load the output method
reader.output()
scheduler = Scheduler(asyncio.get_event_loop())
scheduler.schedule_recurring(reader)
scheduler.run_forever()
We use schedule_recurring()
instead of schedule_periodic()
, as we
expect all the data to be already present in the file. There is no sense in
taking periodic samples.
The output looks as follows:
SensorEvent(sensor_id='1', ts=1.0, val=2.0) SensorEvent(sensor_id='1', ts=2.0, val=3.0) SensorEvent(sensor_id='1', ts=3.0, val=455.0) SensorEvent(sensor_id='1', ts=4.0, val=55.0) No more active schedules, will exit event loop
Note that the event loop terminates on its own. This is due to the call to
_dispatch_completed()
when the csv reader throws StopIteration
.
There can be cases when the underlying API to be called by the OutputThing
requires its own event loop / event listener. To handle this situation,
use the interface provided by EventLoopOutputThingMixin
. Your main
event loop for the output ting is implemented in the _observe_event_loop()
.
If you call the scheduler's schedule_on_private_event_loop()
method, it
will run this method in a separate thread and then dispatch any events to
the scheduler's main event loop (running in the main thread).
To see some example code demonstrating an output thing using a private event
loop, see thingflow.adapters.mqtt.MQTTReader
.