python-processor
Simple rules
Python processor is a tool for creating chained pipelines for data processing. It has very few key concepts:
- Data object
  - Any Python dict with two required fields: source and type.
- Source
  - An iterable sequence of data objects, or a function which returns data objects. See the full list of sources in the docs.
- Output
  - A function which accepts a data object as input and could output another (or the same) data object as a result. See the full list of outputs in the docs.
- Predicate
  - A pipeline consists of sources and outputs, but a predicate decides which data object should be processed by which output.
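The four concepts above can be sketched in plain Python, without importing the processor package at all. Everything in this sketch (the function names, the text field, the type value) is a hypothetical illustration; only the two required fields, source and type, come from the description above.

```python
def numbers_source():
    """A source: a function that returns data objects (here, a generator)."""
    for n in range(3):
        # Every data object is a dict with the required 'source' and 'type' fields.
        yield {'source': 'numbers', 'type': 'number', 'text': str(n)}


def shout(obj):
    """An output: accepts a data object and returns another one."""
    result = dict(obj)  # copy, so the incoming object is left untouched
    result['text'] = obj['text'] + '!'
    return result


def is_number(obj):
    """A predicate: decides whether an output should handle the object."""
    return obj['type'] == 'number'


# Wire the pieces together by hand: filter with the predicate, then apply the output.
processed = [shout(obj) for obj in numbers_source() if is_number(obj)]
```

In the real library this wiring is done by run_pipeline; the list comprehension here only stands in for it.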
Quick example
Here is an example of a pipeline which reads an IMAP folder and sends all emails to a Slack chat:

run_pipeline(
    sources.imap('imap.gmail.com',
                 'username',
                 'password',
                 'INBOX'),
    [prepare_email_for_slack, outputs.slack(SLACK_URL)])

Here you construct a pipeline which uses sources.imap to read the IMAP folder
"INBOX" of username@gmail.com. In more complex cases, outputs.fanout
can be used to route data objects to different processors, and sources.mix can
be used to merge items from two or more sources into one stream.
The functions prepare_email_for_slack and outputs.slack(SLACK_URL) are processors. The first one
is a simple function which accepts a data object returned by the imap source and transforms
it into a data object which can be used by outputs.slack. We need that because Slack
requires a different set of fields. The call to outputs.slack(SLACK_URL) returns a
function which takes an object and sends it to the specified Slack endpoint.
This is just an example; for working snippets, continue reading this documentation ;-)
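To illustrate the two kinds of processor described above, here is a minimal sketch in plain Python. The field names (subject, from, text), the make_sender_output factory and its send argument are all assumptions made for illustration; the real fields produced by sources.imap and expected by outputs.slack are listed in the documentation.

```python
def prepare_email_for_slack(email):
    """Transform an email-like data object into a chat-like one.

    The 'subject' and 'from' keys are assumed for this sketch only.
    """
    return {'source': email['source'],
            'type': 'chat-message',
            'text': '{0}: {1}'.format(email['from'], email['subject'])}


def make_sender_output(url, send):
    """Return an output bound to `url`, similar in spirit to how a call
    such as outputs.slack(SLACK_URL) returns a function.

    `send` is a hypothetical callable standing in for the HTTP request.
    """
    def output(obj):
        send(url, obj['text'])
        return obj
    return output


sent = []  # collects what would have gone over the network
slack_like = make_sender_output(
    'https://example.invalid/hook',
    lambda url, text: sent.append((url, text)))

email = {'source': 'imap', 'type': 'email',
         'from': 'alice@example.com', 'subject': 'Hello'}
slack_like(prepare_email_for_slack(email))
```

Passing the sender in as an argument keeps the sketch free of network access, which also makes such an output easy to test.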
Note
By the way, did you know there is a Lisp dialect which runs on the Python virtual machine? Its name is Hy (hylang), and python processor is written in this language.
Installation
Create a virtual environment with python3::
virtualenv --python=python3 env
source env/bin/activate
Install the required version of hylang (this step is necessary because Hy syntax is not final yet and is frequently changed by the language maintainers)::
pip install -U 'git+git://github.com/hylang/hy.git@a3bd90390cb37b46ae33ce3a73ee84a0feacce7d#egg=hy'
If you are on OSX, install lxml separately::
STATIC_DEPS=true pip install lxml
Then install the processor::
pip install processor
Usage
Now create an executable Python script where you'll place your pipeline's configuration. For example, this simple code creates a pipeline which searches for new results in Twitter and outputs them to the console. Of course, you can output them not only to the console, but also send them by email, to a Slack chat, or anywhere else if there is an output for it:

#!env/bin/python3
import os
from processor import run_pipeline, sources, outputs
from twiggy_goodies.setup import setup_logging

# Predicate which accepts every data object.
for_any_message = lambda msg: True

def prepare(tweet):
    # Keep only the fields we want to print.
    return {'text': tweet['text'],
            'from': tweet['user']['screen_name']}

setup_logging('twitter.log')

run_pipeline(
    sources=[sources.twitter.search(
        'My Company',
        consumer_key='***', consumer_secret='***',
        access_token='***', access_secret='***',
    )],
    # Each rule pairs a predicate with a chain of outputs.
    rules=[(for_any_message, [prepare, outputs.debug()])])

Running this code will fetch new results for the search query My Company
and output them on the screen. Of course, you could use any other output
supported by the processor. Browse the online documentation to find out
which sources and outputs are supported and how to configure them.
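The rule above uses a predicate that accepts every message. A more selective predicate is just another function of the data object. The sketch below assumes the same tweet fields that prepare reads (text and the screen_name inside user); the helper names from_user and mentions are hypothetical and not part of the processor package.

```python
def from_user(screen_name):
    """Build a predicate matching tweets written by `screen_name`."""
    def predicate(tweet):
        return tweet['user']['screen_name'] == screen_name
    return predicate


def mentions(word):
    """Build a predicate matching tweets whose text contains `word`,
    ignoring case."""
    def predicate(tweet):
        return word.lower() in tweet['text'].lower()
    return predicate


# Hand-made sample objects with only the fields used above.
tweets = [
    {'text': 'My Company rocks', 'user': {'screen_name': 'alice'}},
    {'text': 'Lunch time', 'user': {'screen_name': 'bob'}},
]
matching = [t for t in tweets if mentions('my company')(t)]
from_bob = [t for t in tweets if from_user('bob')(t)]
```

Such a predicate would presumably take the place of for_any_message in a rule, for example (from_user('alice'), [prepare, outputs.debug()]).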
Ideas for Sources and Outputs
- web-hook endpoint (in progress).
- tail source which reads a file and outputs lines that appeared in the file between invocations, or is able to emulate tail -f behaviour. Python module tailer could be used here.
- grep output -- a filter to grep some fields using patterns. With tail and grep you could build a pipeline which watches a log and sends errors by email or to a chat.
- xmpp output.
- irc output.
- rss/atom feed reader.
- weather source which tracks tomorrow's weather forecast and outputs a message if it has changed significantly, for example from "sunny" to "rainy".
- github: some integrations with the github API?
- jira or another task tracker of your choice?
- suggest your ideas!
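As a rough illustration of the tail and grep ideas, the sketch below reads only the lines added to a file since the previous call by remembering a byte offset, then filters them by a pattern. It is plain Python and not an existing processor source or output; the function name, the state dict and the type value are hypothetical.

```python
import os
import tempfile


def read_new_lines(path, state):
    """Return data objects for lines appended to `path` since the last call.

    `state` is a dict that keeps the byte offset between invocations.
    A line still being written (no trailing newline yet) is returned as-is;
    a real source would probably want to wait for it to be completed.
    """
    with open(path, 'rb') as f:
        f.seek(state.get('offset', 0))
        data = f.read()
        state['offset'] = f.tell()
    return [{'source': 'tail', 'type': 'line', 'text': line}
            for line in data.decode('utf-8').splitlines()]


# Demonstration on a temporary file.
fd, log_path = tempfile.mkstemp()
os.close(fd)
state = {}

with open(log_path, 'a') as f:
    f.write('first\n')
first_batch = read_new_lines(log_path, state)

with open(log_path, 'a') as f:
    f.write('ERROR second\nthird\n')
second_batch = read_new_lines(log_path, state)

# A grep-like filter over the newly read lines.
errors = [obj for obj in second_batch if 'ERROR' in obj['text']]
os.remove(log_path)
```

Keeping the offset in a caller-supplied dict means it could be persisted between script runs, which is what "between invocations" would require.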
Documentation
https://python-processor.readthedocs.org/
Development
To run all the tests, run:
tox




