Pipeline-Go is a Go library for building flexible, concurrent data processing pipelines. It is inspired by a similar Python project and provides a structured way to process streams of data "frames" through a series of composable "processors".
⭐️ pipeline design ⭐️
- Frame: The basic unit of data that flows through a pipeline. Frames can be of various types, such as TextFrame, ImageRawFrame, or control frames like EndFrame and StartFrame.
- Processor: An interface for components that act on frames. Processors are the building blocks of a pipeline. They can be used to filter, transform, aggregate, or perform any other operation on frames.
- Pipeline: A sequence of processors linked together. Pipelines can be simple linear sequences or more complex parallel structures.
- Task: An executable instance of a pipeline. A PipelineTask manages the lifecycle of a pipeline, including running it, queueing frames, and handling shutdown.
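To make the relationship between frames, processors, a pipeline, and a task concrete, here is a minimal standalone sketch in plain Go. It only models the ideas described above: every name in it (Frame, TextFrame, EndFrame, Processor, dropEmpty, upperCase, runTask) is hypothetical and is not Pipeline-Go's actual API, which uses its own types such as IFrameProcessor and PipelineTask. Processors are chained with channels, each running in its own goroutine; the task function queues frames at the head of the chain and collects the text that reaches the tail before an EndFrame control frame arrives.

```go
package main

import (
	"fmt"
	"strings"
)

// Frame is a hypothetical stand-in for a frame type; any value can flow.
type Frame interface{}

// TextFrame is a hypothetical data frame carrying text.
type TextFrame struct{ Text string }

// EndFrame is a hypothetical control frame marking the end of the stream.
type EndFrame struct{}

// Processor is a hypothetical processor shape: read frames from in,
// write results to out, and close out once in is exhausted.
type Processor func(in <-chan Frame, out chan<- Frame)

// dropEmpty filters out TextFrames whose text is empty.
func dropEmpty(in <-chan Frame, out chan<- Frame) {
	defer close(out)
	for f := range in {
		if t, ok := f.(TextFrame); ok && t.Text == "" {
			continue
		}
		out <- f
	}
}

// upperCase transforms TextFrames and forwards all other frames unchanged.
func upperCase(in <-chan Frame, out chan<- Frame) {
	defer close(out)
	for f := range in {
		if t, ok := f.(TextFrame); ok {
			f = TextFrame{Text: strings.ToUpper(t.Text)}
		}
		out <- f
	}
}

// runTask links the processors with channels (one goroutine per stage),
// queues the input frames, and returns the text that reaches the end of
// the chain before an EndFrame. Frames after the EndFrame are drained so
// that every goroutine can exit.
func runTask(frames []Frame, procs ...Processor) []string {
	src := make(chan Frame)
	var cur <-chan Frame = src
	for _, p := range procs {
		next := make(chan Frame)
		go p(cur, next)
		cur = next
	}

	// Queue the frames, then close the source so shutdown propagates.
	go func() {
		defer close(src)
		for _, f := range frames {
			src <- f
		}
	}()

	var texts []string
	ended := false
	for f := range cur {
		switch v := f.(type) {
		case TextFrame:
			if !ended {
				texts = append(texts, v.Text)
			}
		case EndFrame:
			ended = true
		}
	}
	return texts
}

func main() {
	frames := []Frame{TextFrame{Text: "hello"}, TextFrame{Text: ""}, TextFrame{Text: "world"}, EndFrame{}}
	fmt.Println(runTask(frames, dropEmpty, upperCase)) // [HELLO WORLD]
}
```

Running it prints [HELLO WORLD]: the empty frame is filtered out and the remaining text is upper-cased. Because each stage owns and closes its output channel, closing the source propagates shutdown down the whole chain, which is one simple way to model the lifecycle a task manages.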
- Sequential & Parallel Pipelines: Build simple linear pipelines (NewPipeline) or complex concurrent pipelines that process frames in parallel (NewParallelPipeline, NewSyncParallelPipeline).
- Rich Set of Processors: Includes a variety of built-in processors for common tasks:
  - Filtering: FrameFilter (based on a function) and TypeFilter (based on frame type).
  - Aggregation: SentenceAggregator, GatedAggregator, HoldFramesAggregator, and HoldLastFrameAggregator.
  - Output Processing: a simple OutputProcessor for basic frame handling, plus the advanced AdvancedOutputProcessor and OutputFrameProcessor for complex output scenarios with async processing, interruption handling, and metrics support.
- Extensible: Easily create your own custom processors by implementing the IFrameProcessor interface.
- Concurrency-Safe: Designed with concurrency in mind, using Go channels and goroutines for asynchronous processing.
- Async Processing: AsyncFrameProcessor enables asynchronous frame handling with interruption support.
- Metrics Collection: Enhanced processors with built-in collection of TTFB (time to first byte) and processing-time metrics.
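Several of the processors listed above, the aggregators in particular, keep a little state between frames. As an illustration of aggregation, the standalone sketch below buffers text across frames and releases one frame per complete sentence. The sentenceAggregator type, its Process and Flush methods, and the rule that '.', '!' and '?' end a sentence are assumptions made for this example; the library's SentenceAggregator is a frame processor inside a pipeline, and its exact splitting rules may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// TextFrame is a hypothetical text frame, not the library's type.
type TextFrame struct{ Text string }

// sentenceAggregator is a hypothetical sketch of sentence aggregation.
// It buffers text across frames and emits one TextFrame per complete
// sentence, treating '.', '!' and '?' as terminators.
type sentenceAggregator struct {
	buf strings.Builder
}

// Process consumes one frame and returns the sentences it completed.
func (a *sentenceAggregator) Process(f TextFrame) []TextFrame {
	var out []TextFrame
	for _, r := range f.Text {
		a.buf.WriteRune(r)
		if r == '.' || r == '!' || r == '?' {
			out = append(out, TextFrame{Text: strings.TrimSpace(a.buf.String())})
			a.buf.Reset()
		}
	}
	return out
}

// Flush returns any buffered text that never reached a terminator,
// as a pipeline might do when an end-of-stream frame arrives.
func (a *sentenceAggregator) Flush() []TextFrame {
	rest := strings.TrimSpace(a.buf.String())
	a.buf.Reset()
	if rest == "" {
		return nil
	}
	return []TextFrame{{Text: rest}}
}

func main() {
	agg := &sentenceAggregator{}
	var sentences []string
	for _, chunk := range []string{"Hel", "lo there. How ", "are you? Fine"} {
		for _, f := range agg.Process(TextFrame{Text: chunk}) {
			sentences = append(sentences, f.Text)
		}
	}
	for _, f := range agg.Flush() {
		sentences = append(sentences, f.Text)
	}
	fmt.Printf("%q\n", sentences) // ["Hello there." "How are you?" "Fine"]
}
```

Keeping the buffer inside the processor and flushing it at end of stream means downstream stages see whole sentences regardless of how the upstream split the text. The sketch does not handle abbreviations or ellipses.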
Project layout:

/
├── examples/ # Runnable processor examples
├── pkg/
│ ├── logger/ # Logger definition and configuration
│ ├── frames/ # Defines all data and control frame types
│ ├── notifiers/ # A simple channel-based notification system
│ ├── pipeline/ # Core logic for Pipeline, PipelineTask, and parallel variants
│ ├── idl/ # RPC IDL definitions
│ ├── serializers/ # Protobuf (pb) and JSON serializers
│ └── processors/ # All built-in IFrameProcessor implementations
├── go.mod
Run the examples from the repository root:

go run examples/filter_example.go
go run examples/async_example.go
# with interruption handling
go run examples/async_interruption_example.go
go run examples/advanced_output_example.go
go run examples/metrics_example.go

To run the complete test suite with verbose output:

go test -v ./pkg/...

Related project:
- pipeline-py: https://github.com/weedge/pipeline-py