/
find.go
100 lines (87 loc) · 2.65 KB
/
find.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package kafka
import (
"context"
"errors"
"github.com/rs/zerolog/log"
"github.com/segmentio/kafka-go"
"github.com/tony-spark/recipetor-backend/ingredient-service/internal/ingredient"
"github.com/tony-spark/recipetor-backend/ingredient-service/internal/ingredient/service"
"io"
"time"
)
// FindIngredientsWorker answers ingredient lookup requests: it reads
// FindIngredientsDTO messages, queries the ingredient service, and writes
// IngredientDTO replies.
type FindIngredientsWorker struct {
// ingredientService performs the lookups (GetByID and SearchByName).
ingredientService service.Service
// ingredientsReqReader is the reader that incoming requests are read from.
ingredientsReqReader *kafka.Reader
// ingredientsWriter is the writer that replies are sent through.
ingredientsWriter *kafka.Writer
}
// NewFindIngredientsWorker builds a FindIngredientsWorker wired to the given
// brokers.
//
// The request reader is created with newReader for TopicIngredientsReq using
// the "ingredients-service-find" identifier; the reply writer is created with
// newWriter for TopicIngredients. If the reader cannot be created, its error
// is returned unchanged and no writer is created.
func NewFindIngredientsWorker(ingredientService service.Service, brokers []string) (Worker, error) {
	reader, err := newReader(brokers, "ingredients-service-find", TopicIngredientsReq)
	if err != nil {
		return nil, err
	}

	worker := FindIngredientsWorker{
		ingredientService:    ingredientService,
		ingredientsReqReader: reader,
		ingredientsWriter:    newWriter(brokers, TopicIngredients),
	}
	return worker, nil
}
// Process runs the request loop until ctx is done or readDTO reports io.EOF.
//
// Each iteration reads one FindIngredientsDTO via readDTO. A request with a
// non-empty ID is answered with a single IngredientDTO built from GetByID. A
// request with a non-empty NameQuery is answered with one IngredientDTO per
// result of SearchByName, preceded by an error-carrying IngredientDTO if the
// search failed. When both fields are set, both lookups run. Every reply is
// handed to write together with the corID value that readDTO returned.
//
// It returns nil once ctx is done (checked at the top of every iteration) and
// returns the read error itself when that error matches io.EOF. Any other
// read error is dropped and the loop proceeds to the next read.
func (w FindIngredientsWorker) Process(ctx context.Context) error {
	// Upper bound for a single call into the ingredient service.
	const lookupTimeout = 5 * time.Second

	for {
		if ctx.Err() != nil {
			return nil
		}

		var req ingredient.FindIngredientsDTO
		corID, readErr := readDTO(ctx, w.ingredientsReqReader, &req)
		if readErr != nil {
			if !errors.Is(readErr, io.EOF) {
				// Skip this read and try again.
				continue
			}
			// NOTE(review): presumably io.EOF means the reader was closed — confirm.
			return readErr
		}
		log.Info().Msgf("got FindIngredientsDTO: %+v", req)

		// Lookup by ID: exactly one reply, carrying either the ingredient or
		// the error text.
		if len(req.ID) != 0 {
			lookupCtx, cancel := context.WithTimeout(ctx, lookupTimeout)
			found, err := w.ingredientService.GetByID(lookupCtx, req.ID)
			cancel()

			reply := ingredient.IngredientDTO{ID: req.ID}
			if err == nil {
				reply.Ingredient = found
			} else {
				log.Error().Err(err).Msg("failed to find ingredient")
				reply.Error = err.Error()
			}
			write(w.ingredientsWriter, req.ID, reply, corID)
			log.Info().Msgf("sent IngredientDTO: %+v", reply)
		}

		// Search by name: one reply per match, plus an error reply on failure.
		if len(req.NameQuery) != 0 {
			searchCtx, cancel := context.WithTimeout(ctx, lookupTimeout)
			matches, err := w.ingredientService.SearchByName(searchCtx, req.NameQuery)
			cancel()

			if err != nil {
				log.Error().Err(err).Msg("failed to find ingredients")
				failure := ingredient.IngredientDTO{
					NameQuery: req.NameQuery,
					Error:     err.Error(),
				}
				write(w.ingredientsWriter, req.NameQuery, failure, corID)
				log.Info().Msgf("sent IngredientDTO: %+v", failure)
			}

			// Runs even after a failed search; any results returned alongside
			// the error are still sent.
			for _, match := range matches {
				reply := ingredient.IngredientDTO{
					NameQuery:  req.NameQuery,
					Ingredient: match,
				}
				write(w.ingredientsWriter, req.NameQuery, reply, corID)
				log.Info().Msgf("sent IngredientDTO: %+v", reply)
			}
		}
	}
}
// Stop hands the worker's request reader and reply writer to closeAll and
// returns whatever error closeAll reports.
func (w FindIngredientsWorker) Stop() error {
	err := closeAll(w.ingredientsReqReader, w.ingredientsWriter)
	return err
}