diff --git a/README.md b/README.md index b2e562d..bd4c664 100644 --- a/README.md +++ b/README.md @@ -4,4 +4,119 @@ [![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/containous/traefik/blob/master/LICENSE.md) # pipeline -A pipeline in go +pipeline is a package that simplifies creating staged pipelines in `go`. + +![Pipeline](/images/pipeline.png) + +It provides a simple API to construct **stages** and **steps** to execute complex tasks. The packages supports many features including concurrent stages and configurable workers as well as buffered steps. + +## A Basic Pipeline + +At minimum to create a `Pipeline`, you'll need to create a single `Stage` with a single `Step`. + +Every `Step` in the `Pipeline` requires input and provides output via channels. + +```go +func main() { + // Create a step with a function that does the work + step := pipeline.NewStep("the step", func(ctx *pipeline.Context, in <-chan interface{}, out chan interface{}) error { + // Take the input + input := (<-in).(string) + + // Do something with it + input += " pipeline" + + // Pass it along + out<-input + return nil + }) + + // Create the pipeline with one stage that contains the step + p := pipeline.NewPipeline("the pipeline") + p.AddStage(pipeline.NewStage("the stage", step)) + + // Run the pipeline + in := make(chan interface{}) + go func() { + in<-"hello" + }() + out := p.Process(nil, in) + + // Get the output + fmt.Println((<-out).(string)) +} +``` + +## Stages + +A stage is a collection of steps that can be run serially or concurrently. + +It can be created using `pipeline.NewStage(...)` and by default it will create a new serial stage. + +### Serial Stages + +A serial stage will run all the steps in the stage serially, or one after another. The output from one step will flow into the input of the next step. + +Serial stages can be created with: +```go +stage := pipeline.NewSerialStage(name, step1, ...) +``` + +### Concurrent Stages + +A concurrent stage will run all the steps in the stage concurrently, or together. The input for each step will come from the output of the last stage and all the output of the steps in this stage will be provided to the next stage. + +Concurrent stages can be created with: +```go +stage := pipeline.NewConcurrentStage(name, step1, ...) +``` + +## Steps + +A step is a single processing unit of a pipeline. It takes input in the form of an `interface{}` from a channel, does some work on it, and then should provide the output to another channel. + +Steps can be configured in various ways the modify the pipeline and provide flexibility. The kinds of available steps are: +- worker +- buffered +- fan out + +It can be created using `pipeline.NewStep(...)` and by default it will create a new worker step with a worker count of 1. + +### Worker Step + +A worker step lets you define the number of workers to spawn in go routines to handle this step. This effectively makes the step concurrent by allowing multiple routines process input. + +Worker steps can be created with: +```go +step := pipeline.NewWorkerStep(name, workerCount, stepFn) +``` + +### Buffered Steps + +A buffered step creates a step with a buffered output channel allowing data to be buffered while processing. Sending to the output channel doesn't block if the buffer still has space and allows the step to continue reading from the input channel. + +Buffered steps can be created with: +```go +step := pipeline.NewBufferedStep(name, stepFn) +``` + +### Fan Out Steps + +A fan out step creates a step that replicates or fans out the input channel across a number of workers. In this model, the worker count indicates how many concurrent steps to replicate the input data on. Note that this will process redundant data streams. + +Fan out steps can be created with: +```go +step := pipeline.NewFanOutStep(name, workerCount, stepFn) +``` + +## Contibuting +Contributions are what makes the open-source community such an amazing place to learn, inspire, and create. Any contributions you make are **greatly appreciated**. + +1. Fork the Project +2. Create your Feature Branch (`git checkout -b feature/AmazingFeature`) +3. Commit your Changes (`git commit -m 'Add some AmazingFeature'`) +4. Push to the Branch (`git push origin feature/AmazingFeature`) +5. Open a Pull Request + +## License +Distributed under the MIT License. diff --git a/images/pipeline.png b/images/pipeline.png new file mode 100644 index 0000000..f04d20e Binary files /dev/null and b/images/pipeline.png differ diff --git a/stage.go b/stage.go index 8241431..779222a 100644 --- a/stage.go +++ b/stage.go @@ -28,6 +28,11 @@ func newStage(name string, concurrent bool, steps ...*Step) *Stage { return s } +// NewStage creates a new stage, defaults to serial stage. +func NewStage(name string, steps ...*Step) *Stage { + return NewSerialStage(name, steps...) +} + // NewSerialStage creates a new stage for given steps that will run sequentially. func NewSerialStage(name string, steps ...*Step) *Stage { return newStage(name, false, steps...) diff --git a/step.go b/step.go index 7e8c5d9..662efbd 100644 --- a/step.go +++ b/step.go @@ -26,8 +26,7 @@ type Step struct { // Defaults to false Buffered bool // FanOut indicates whether workers should perform redundant work, i.e., - // fan out input channels to each one as opposed to using the a - // single one. + // fan out input channels to each one as opposed to using a single one. // Defaults to false FanOut bool // The number of workers to spawn in go routines to handle this step.