-
Notifications
You must be signed in to change notification settings - Fork 2
/
pipeline.go
131 lines (105 loc) · 2.94 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// TODO: Add metrics, error handling, logging, context, APM, APM transaction, etc.
package pipeline
import (
"context"
"sync"
"github.com/thalesfsp/etler/adapter"
"github.com/thalesfsp/etler/processor"
"github.com/thalesfsp/etler/shared"
"github.com/thalesfsp/status"
)
// Number is a simple struct to be used in the tests.
//
// NOTE(review): it is declared in pipeline.go rather than a `_test.go` file,
// so it is part of the package's exported API; consider moving it.
type Number struct {
	// Numbers to be processed. Serialized as the JSON key "numbers".
	Numbers []int `json:"numbers"`
}
// Stage definition.
//
// A Stage is one step of a `Pipeline`: a group of processors that
// `Pipeline.Run` invokes with the same input slice.
type Stage[In any, Out any] struct {
	// Concurrent determines whether the stage should be run concurrently.
	// When true, `Pipeline.Run` starts one goroutine per processor; when
	// false, the processors run one after another.
	Concurrent bool `json:"concurrent"`

	// Processors to be run in the stage, in declaration order when the stage
	// is sequential.
	Processors []processor.IProcessor[In, Out] `json:"processors"`
}
// IPipeline is the contract a `Pipeline` fulfils: it embeds `shared.IMeta`
// and can be executed over a batch of inputs.
type IPipeline[In any, Out any] interface {
	// Run processes `in` through the pipeline and returns the produced
	// output, or an error if processing failed.
	Run(ctx context.Context, in []In) (out []Out, err error)

	shared.IMeta[In]
}
// Pipeline definition.
//
// A Pipeline holds an ordered list of stages that `Run` executes one after
// another, together with some descriptive metadata and runtime state.
type Pipeline[In any, Out any] struct {
	// Description of the pipeline.
	Description string `json:"description"`

	// Name of the pipeline.
	Name string `json:"name"`

	// Adapters to be used in the pipeline.
	//
	// NOTE(review): not read by `Run` in this file, and left nil by `New`;
	// writing to it before initializing it will panic.
	Adapters map[string]adapter.IAdapter[In] `json:"adapters"`

	// Control the pipeline. Excluded from JSON.
	//
	// NOTE(review): `New` creates it unbuffered, but nothing in this file
	// reads from or writes to it yet.
	Control chan string `json:"-"`

	// Progress of the pipeline.
	//
	// NOTE(review): not updated by `Run` in this file.
	Progress int `json:"progress"`

	// Stages to be used in the pipeline, executed in slice order.
	Stages []Stage[In, Out] `json:"stages"`

	// State of the pipeline.
	State status.Status `json:"state"`
}
// GetDescription reports the human-readable `Description` of the pipeline.
func (p *Pipeline[In, Out]) GetDescription() string { return p.Description }
// GetName reports the `Name` the pipeline was given.
func (p *Pipeline[In, Out]) GetName() string { return p.Name }
// GetState reports the current `State` of the pipeline.
func (p *Pipeline[In, Out]) GetState() status.Status { return p.State }
// SetState overwrites the pipeline's `State` with the given value.
//
// No synchronization is performed; callers sharing a pipeline across
// goroutines must coordinate access themselves.
func (p *Pipeline[In, Out]) SetState(state status.Status) { p.State = state }
// Run executes the pipeline's stages in order. On success it returns the
// output of the last processor of the last non-empty stage.
//
// Every processor receives the original `in` slice. A stage produces `[]Out`
// while the next one consumes `[]In`, and those types may differ, so the
// output of one stage is NOT fed into the next. Stages with no processors
// are skipped and leave the previous output untouched.
//
// Sequential stages run their processors one after another and stop at the
// first error, returning that processor's output together with the error.
//
// Concurrent stages start one goroutine per processor (all sharing the same
// `in` slice) and wait for every one of them. Each goroutine writes to its
// own result slot, so there is no data race on the results. After the stage
// finishes, the first error in processor order aborts the pipeline and is
// returned with that processor's output. Otherwise the output of the stage's
// last processor is kept, mirroring the sequential case.
//
// The context is checked before each stage. If it is already done, Run
// returns the output collected so far together with `ctx.Err()`.
func (p *Pipeline[In, Out]) Run(ctx context.Context, in []In) (out []Out, err error) {
	for _, stage := range p.Stages {
		// Don't start new work once the caller has given up.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return out, ctxErr
		}

		// An empty stage produces nothing; keep the previous output.
		if len(stage.Processors) == 0 {
			continue
		}

		if stage.Concurrent {
			outs := make([][]Out, len(stage.Processors))
			errs := make([]error, len(stage.Processors))

			var wg sync.WaitGroup

			// Start one goroutine per processor.
			//
			// TODO: Make it bounded.
			for i, proc := range stage.Processors {
				wg.Add(1)

				// Index and processor are passed as arguments so each
				// goroutine gets its own copy regardless of Go version.
				go func(i int, proc processor.IProcessor[In, Out]) {
					defer wg.Done()

					outs[i], errs[i] = proc.Run(ctx, in)
				}(i, proc)
			}

			wg.Wait()

			// Report the first failure in processor order so the result does
			// not depend on goroutine scheduling.
			for i, procErr := range errs {
				if procErr != nil {
					return outs[i], procErr
				}
			}

			out = outs[len(outs)-1]

			continue
		}

		// Sequential stage: stop at the first failing processor.
		for _, proc := range stage.Processors {
			out, err = proc.Run(ctx, in)
			if err != nil {
				return out, err
			}
		}
	}

	return out, nil
}
// New returns a new pipeline with the given name, description and stages.
//
// The pipeline starts in the `status.Stopped` state with zero progress and an
// unbuffered `Control` channel. `Adapters` is left nil.
func New[In any, Out any](
	name string,
	description string,
	stages []Stage[In, Out],
) IPipeline[In, Out] {
	return &Pipeline[In, Out]{
		Control:     make(chan string),
		Description: description,
		Name:        name,
		Progress:    0,
		Stages:      stages,
		State:       status.Stopped,
	}
}