generated from rog-golang-buddies/golang-template-repository
-
Notifications
You must be signed in to change notification settings - Fork 0
/
apispec.go
95 lines (89 loc) · 3.26 KB
/
apispec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package handler
import (
"context"
"encoding/json"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/config"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/dto"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/logger"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/process"
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/publisher"
"github.com/wagslane/go-rabbitmq"
)
type ApiSpecDocHandler struct {
publisher publisher.Publisher
config config.QueueConfig
processor process.UrlProcessor
log logger.Logger
}
func (asdh *ApiSpecDocHandler) Handle(ctx context.Context, delivery rabbitmq.Delivery) rabbitmq.Action {
asdh.log.Infof("consumed: %v", string(delivery.Body))
//call process here
var req dto.UrlRequest
err := json.Unmarshal(delivery.Body, &req)
if err != nil {
asdh.log.Errorf("error unmarshalling message: '%v', err: %s", string(delivery.Body), err)
if req.IsNotifyUser {
procErr := dto.NewProcessingError(
"internal unmarshalling problem occurred; probably incompatible model versions", err.Error())
err = asdh.publish(&delivery, dto.NewUserNotification(&procErr), asdh.config.NotificationQueue)
if err != nil {
asdh.log.Error("error while notifying user")
}
}
return rabbitmq.NackDiscard
}
//here processing of the request happens...
asd, err := asdh.processor.Process(ctx, req.FileUrl)
if err != nil {
asdh.log.Error("error while processing url: ", err)
if req.IsNotifyUser {
procErr := dto.NewProcessingError("error while processing url", err.Error())
err = asdh.publish(&delivery, dto.NewUserNotification(&procErr), asdh.config.NotificationQueue)
if err != nil {
asdh.log.Error("error while notifying user")
}
}
return rabbitmq.NackDiscard
}
//publish to the required queue success or error
result := dto.ScrapingResult{IsNotifyUser: req.IsNotifyUser, ApiSpecDoc: asd}
err = asdh.publish(&delivery, result, asdh.config.ScrapingResultQueue)
if err != nil {
asdh.log.Error("error while publishing: ", err)
//Here is some error while publishing happened - probably something wrong with the queue
return rabbitmq.NackDiscard
}
if req.IsNotifyUser {
err = asdh.publish(&delivery, dto.NewUserNotification(nil), asdh.config.NotificationQueue)
if err != nil {
asdh.log.Error("error while notifying user")
//don't discard this message because it was published to the storage service successfully
}
}
asdh.log.Info("url scraped successfully")
return rabbitmq.Ack
}
func (asdh *ApiSpecDocHandler) publish(delivery *rabbitmq.Delivery, message any, queue string) error {
content, err := json.Marshal(message)
if err != nil {
asdh.log.Info("error while marshalling: ", err)
return err
}
return asdh.publisher.Publish(content,
[]string{queue},
rabbitmq.WithPublishOptionsCorrelationID(delivery.CorrelationId),
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsPersistentDelivery,
)
}
func NewApiSpecDocHandler(publisher publisher.Publisher,
config config.QueueConfig,
processor process.UrlProcessor,
log logger.Logger) Handler {
return &ApiSpecDocHandler{
publisher: publisher,
config: config,
processor: processor,
log: log,
}
}