Skip to content

upayavira/pipeline

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

29 Commits
 
 
 
 
 
 
 
 

Repository files navigation

=== Odoko Pipeline ===

This tool is a simple configurable pipeline content ingestion engine.

See src/test/resources/config for sample configuration examples.

== Genesis ==
This tool is a conversion into Java of an original system written in
Python, that has been handling 50+ feeds actively for a few years 
at one location.

== The Pipeline Concept ==
The Odoko Pipeline toolset has two levels of functionality -
lightweight and heavyweight pipelines. The latter is less mature, and 
likely far less implemented (if at all).

= Lightweight pipelines =
Lightweight pipelines are intended for simple sequences of actions,
e.g. XSLT transforms and regular expression replacements, where there
are no dependencies on external systems that might cause bottlenecks
in pipeline stages. 

In such a pipeline, components are directly wired together in sequence,
and assets are passed from one component to the next for processing.

= Heavyweight pipelines =
A heavyweight pipeline is a sequence of lightweight pipelines, joined
together with a queue. Rather than dispatching an asset to a destination
system, a pipeline may place the asset into a queue for further
processing by another pipeline that has been assigned to that queue.

This approach is intended to handle scaling - queues can be distributed
across hosts, in a multitude of ways.

= Core concepts =
A pipeline system starts with a 'locator'. This component will locate
resources for ingestion. This may involve, for example, looking at the 
files on a disk, or on an FTP server. 

These are then passed to a named pipeline. A pipeline is made up of a
'collector', followed by zero or more 'transformers', followed by a
'dispatcher'. The collector gets the actual resource from the data
source, and passes it unchanged to the next component in the pipeline.

Transformers can, of course, transform the content, be it XSLT, 
regular expression replace, encoding corrections, and so on.

Finally, the dispatcher is responsible for sending the end result
to its destination. This could be by uploading via FTP, posting to
an HTTP server, or such.

== So Far ==
This system is at a very early stage, but given many of the ideas are
already formed in the Python version, for some period, development could
be (time permitting) relatively fast.

So far, the basic lightweight pipeline infrastructure is in place.

Next, a sample use case will be implemented, ingesting content from the 
BBC News RSS feed, and pushing it to an Apache Solr index.

Once this is done, basic infrastructure for 'branching' will be added.
This will require 'heavyweight pipelines' as branching is effectively
placing an asset into a queue for another pipeline to handle.

To show this in use, the above sample app will add a branch so that 
the thumbnails can also be indexed into Solr, allowing a user to
search for images as well as articles.

And then, well, there's lots more to come after that!

== To do ==

Below are some features still to implement. Unfortunately, I have not
distinguished between ideas that were implemented in the Python version
and ideas that could be implemented.

= Workflow =

The pipeline designer needs the ability to configure whether an asset
will be ingested if it has been seen before. This is done by each
asset having its own configurable URI. If each asset has a genuinely
unique URI, then that will be sufficient. If you wish content with a
non-unique URI to be ingested every hour, this can be made to happen
by attaching a timestamp to the URI, where that timestamp has been
truncated to the nearest hour. By providing a range of useful
variables for inclusion in the URI, it is hoped that a wide range
of scenarios can be catered for.

Once an asset has set off down a pipeline, various 'workflow' events
can happen. For example, a pipeline may branch. If an RSS feed is
ingested, that feed will likely contain multiple documents. The 
first few steps of the pipeline will split the single RSS document
into each individual document, and then pass each of these down
a separate pipeline.

Sometimes branching may be selective, for example where information
in the document can be used to decide which sub-pipeline the content
should continue down (e.g. photos within an RSS document may be
processed differently from articles).

Where a lot of content is passing down a pipeline, it may be 
preferable to run multiple threads. In some circumstances, it may
be preferable to have multiple hosts running pipelines in parallel.

Some pipeline stages may defer processing to external processes,
whether other processes on the same host, or for example web 
services on another host.

The system should provide buffering, such that assets, having
reached a certain position in a queue, can be buffered, for another
pipeline to pick them up and run them (maybe on another host, to 
share responsibility).

The workflow system will record the success and failure of ingestion
of assets.

= Logging =
There are two primary logs, process logs and error logs.

The process logs record simple success/failure against each asset.

The error logs record more detail about what went wrong when an asset
did fail. It might make sense to include some kind of unique identifier
in the process log against failures that can be used to track the 
same failure in the error log.

= Testing =
An instance of this pipeline tool is likely to support ingestion of
content from multiple sources. As new sources are added to the 
system, there is a risk that they introduce instability within 
pipelines configured for existing sources.

If, when configuring a pipeline, the pipeline designer provides a set
of sample content (configured against the collector) and a set of 
assertions (configured against the dispatcher), then the pipeline
could be run in test mode, processing the sample data and then 
ensuring that the content that reaches the dispatcher matches
the criteria specified in the provided assertions.

With this mechanism, it should be safe to enhance and extend the
pipeline system with additional data sources and functionalities
with the confidence that the system will continue to function as
expected.

= Archiving =
On occasion, content can need to be re-ingested, whether because
of errors in the pipeline configuration, or due to problems with
the pipeline tool itself.

This can be handled by archiving content, with the ability to
rerun the pipeline from the content in the archive.

Archiving components will be provided that can be inserted into 
the pipeline. When the pipeline reaches this stage, the content
can be written to a local archive. Ideally, it will be possible
to rerun the pipeline, powered off this archive, and restarting
at the archive component, effectively completing the job of
ingesting these assets.

= Scheduling =
The system will have basic cron-like scheduling, stating how
often each feed should be invoked. 

= Invocation =
It will be possible to specify the number of documents that
should be handled in any one run, whether logging should take
place, whether assets' URIs should be recorded as seen, whether
a dispatcher should actually dispatch. Many of these are likely
useful while developing pipelines (marking assets as 'seen' 
is very irritating when constantly running a pipeline for
development purposes).

= Scaling =
It is intended that a lot of the above will be wired into 
Apache Hadoop as the engine for scaling. Hadoop will be 
entirely optional.

===================================
Licence: This code is licensed under the Apache License 2.0.

Proper licence attribution forthcoming.

About

Componentised content ingestion and transformation pipeline

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages