-
Notifications
You must be signed in to change notification settings - Fork 0
/
algorithm.go
99 lines (83 loc) · 2.18 KB
/
algorithm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package parsing
import (
"context"
"io"
"golang.org/x/sync/errgroup"
"github.com/opencars/operations/pkg/csv"
"github.com/opencars/operations/pkg/domain/model"
"github.com/opencars/operations/pkg/logger"
)
// Default pipeline sizing applied by NewMapReduce.
const (
	// DefaultMappers is the default number of mapper goroutines.
	DefaultMappers = 1
	// DefaultShufflers is the default number of shuffler goroutines.
	DefaultShufflers = 10
	// DefaultReducers is the default number of reducer goroutines.
	DefaultReducers = 10
	// DefaultBulkSize is the default MapReduce.bulkSize. NOTE(review): bulkSize
	// is not read by Parse; presumably consumed by a stage implementation — confirm.
	DefaultBulkSize = 1000
)
// MapReduce runs a mapper → shuffler → reducer pipeline over a CSV resource.
// NewMapReduce returns one with the default stage sizes; the stage
// implementations themselves are not set there.
type MapReduce struct {
	// Stage implementations: Parse calls Shuffle, Map and Reduce on these.
	shuffler Shuffler
	mapper   Mapper
	reducer  Reducer

	// Number of goroutines Parse starts per stage, plus the bulk size.
	// NOTE(review): bulkSize is not read by Parse — confirm where it is used.
	shufflers, mappers, reducers, bulkSize int

	// Channels created by Parse: convertibles is handed to Map and Shuffle,
	// batches is handed to Shuffle and Reduce.
	convertibles chan []Convertible
	batches      chan []model.Operation
}
// NewMapReduce returns a MapReduce whose stage goroutine counts and bulk size
// are the package defaults. The shuffler, mapper and reducer are left at their
// zero values. NOTE(review): presumably they are assigned elsewhere in the
// package before Parse is called — confirm.
func NewMapReduce() *MapReduce {
	mr := new(MapReduce)
	mr.mappers = DefaultMappers
	mr.shufflers = DefaultShufflers
	mr.reducers = DefaultReducers
	mr.bulkSize = DefaultBulkSize
	return mr
}
// Parse streams rc through a concurrent mapper → shuffler → reducer pipeline
// and returns the first error reported by any stage (nil on success).
//
// rc is wrapped in a ';'-separated csv reader and handed to mr.mappers Map
// goroutines together with the convertibles channel. mr.shufflers Shuffle
// goroutines are given resource, convertibles and batches, and mr.reducers
// Reduce goroutines are given batches. Each channel is closed only after every
// goroutine of the stage that feeds it has returned, so the next stage can
// drain it and finish.
//
// All stages share a context derived from ctx. Cancelling ctx, or an error in
// any stage, cancels the whole pipeline. This means a stage is not left blocked
// on a channel whose other side has already exited, provided Map, Shuffle and
// Reduce return once their context is done. NOTE(review): confirm in their
// implementations.
//
// Parse does not close rc; presumably the caller owns it — confirm.
func (mr *MapReduce) Parse(ctx context.Context, resource *model.Resource, rc io.ReadCloser) error {
	convertibles := make(chan []Convertible)
	batches := make(chan []model.Operation)
	// Still published on the struct for any package code that reads the fields;
	// the goroutines below capture the locals instead of re-reading the fields.
	mr.convertibles, mr.batches = convertibles, batches

	csvReader := csv.NewReader(rc, ';')

	// pipeline's context is cancelled as soon as any stage reports an error
	// (or ctx is done). Its Wait returns that first, root-cause error rather
	// than a later error derived from the cancellation.
	pipeline, pipelineCtx := errgroup.WithContext(ctx)

	for i := 0; i < mr.reducers; i++ {
		logger.Debugf("starting %d reducer", i)
		pipeline.Go(func() error {
			return mr.reducer.Reduce(pipelineCtx, batches)
		})
	}

	// A nested group lets all shufflers be awaited together before batches is
	// closed, while a shuffler failure still cancels its sibling shufflers.
	shufflerGroup, shufflerCtx := errgroup.WithContext(pipelineCtx)
	for i := 0; i < mr.shufflers; i++ {
		logger.Debugf("starting %d shuffler", i)
		shufflerGroup.Go(func() error {
			return mr.shuffler.Shuffle(shufflerCtx, resource, convertibles, batches)
		})
	}
	pipeline.Go(func() error {
		logger.Debugf("waiting for shufflers")
		err := shufflerGroup.Wait()
		// Every shuffler (the only senders on batches) has returned.
		logger.Debugf("closing batches channel")
		close(batches)
		return err
	})

	mapperGroup, mapperCtx := errgroup.WithContext(pipelineCtx)
	for i := 0; i < mr.mappers; i++ {
		logger.Debugf("starting %d mapper", i)
		mapperGroup.Go(func() error {
			return mr.mapper.Map(mapperCtx, csvReader, convertibles)
		})
	}
	pipeline.Go(func() error {
		logger.Debugf("waiting for mappers")
		err := mapperGroup.Wait()
		// Every mapper (the only senders on convertibles) has returned.
		logger.Debugf("closing convertibles")
		close(convertibles)
		return err
	})

	logger.Debugf("waiting for reducers")
	return pipeline.Wait()
}