-
Notifications
You must be signed in to change notification settings - Fork 0
/
provider.go
114 lines (92 loc) · 2.75 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package provider
import (
"cloud.google.com/go/bigquery"
"context"
"github.com/napptive/analytics/pkg/config"
"github.com/napptive/analytics/pkg/entities"
"github.com/napptive/nerrors/pkg/nerrors"
"github.com/rs/zerolog/log"
"google.golang.org/api/option"
"sync"
"time"
)
// databaseTimeout is the maximum time allowed for a single insert of
// cached operations into the database.
const databaseTimeout = 10 * time.Second
// Provider is the interface that defines the monitoring provider methods.
type Provider interface {
// Send hands an operation over to the provider so it ends up stored in the
// database. Implementations may buffer it instead of inserting it right away
// (BigQueryProvider keeps it in an in-memory cache until the next flush).
Send(data entities.Operation) error
// Flush sends the data the provider is still holding to the database.
Flush() error
}
// BigQueryProvider is a Provider that stores the operation information in BigQuery.
// Operations are accumulated in Cache and inserted in batches.
type BigQueryProvider struct {
// Client is the client used to connect to BigQuery.
Client *bigquery.Client
// BigQueryConfig holds all the configuration (schema, table, etc.).
// It is embedded, so its fields are read directly from the provider.
config.BigQueryConfig
// Cache is the buffer of operations that have not been sent yet.
// Operations are stored here to avoid sending them message by message.
Cache []entities.Operation
// Mutex guards access to the operation cache.
sync.Mutex
}
// NewBigQueryProvider creates a Provider that stores operations in BigQuery.
//
// The configuration is validated first; then a BigQuery client is created for
// cfg.ProjectID using the credentials file at cfg.CredentialsPath. On success
// a background goroutine running LaunchSendingLoop is started before the
// provider is returned. Any validation or client-creation error is returned
// unchanged, together with a nil Provider.
func NewBigQueryProvider(cfg config.BigQueryConfig) (Provider, error) {
	// refuse to go any further with an invalid configuration
	if cfgErr := cfg.IsValid(); cfgErr != nil {
		return nil, cfgErr
	}

	credentials := option.WithCredentialsFile(cfg.CredentialsPath)
	bqClient, err := bigquery.NewClient(context.Background(), cfg.ProjectID, credentials)
	if err != nil {
		return nil, err
	}

	// the embedded mutex needs no explicit initialization: its zero value is ready to use
	bq := &BigQueryProvider{
		Client:         bqClient,
		BigQueryConfig: cfg,
		Cache:          []entities.Operation{},
	}

	// periodically push the cached operations to the database
	go bq.LaunchSendingLoop()

	return bq, nil
}
// Send appends the operation to the in-memory cache; nothing is inserted in
// the database at this point. The cache is modified while holding the
// provider mutex. It always returns nil.
func (bq *BigQueryProvider) Send(operation entities.Operation) error {
	bq.Mutex.Lock()
	// keep the operation until the cache is sent
	bq.Cache = append(bq.Cache, operation)
	bq.Mutex.Unlock()

	return nil
}
// Flush sends the operations currently held in the cache to the database
// by delegating to SendCacheOperation, and returns its error, if any.
func (bq *BigQueryProvider) Flush() error {
return bq.SendCacheOperation()
}
// LaunchSendingLoop runs a loop that, on every tick of bq.SendingTime, inserts
// the cached operations in the database. A failed send is logged (message and
// stack trace) and the loop carries on with the next tick. The loop has no
// exit condition, so this method is meant to be run in its own goroutine.
func (bq *BigQueryProvider) LaunchSendingLoop() {
	ticks := time.Tick(bq.SendingTime)
	for range ticks {
		err := bq.SendCacheOperation()
		if err == nil {
			continue
		}

		// a failure must not stop the loop: report it and wait for the next tick
		stackTrace := nerrors.FromError(err).StackTraceToString()
		log.Error().
			Str("error", err.Error()).
			Str("trace", stackTrace).
			Msg("error sending operation data")
	}
}
// SendCacheOperation inserts all the cached operations in the database.
//
// The cache is swapped for an empty one while holding the mutex, so the insert
// itself runs without the lock. When there is nothing to send it returns nil
// without contacting the database. The insert is bounded by databaseTimeout;
// a failure is returned wrapped in an internal error.
//
// Note: the operations taken from the cache are not put back if the insert fails.
func (bq *BigQueryProvider) SendCacheOperation() error {
	// take ownership of the pending operations and leave a fresh cache behind
	bq.Lock()
	pending := bq.Cache
	bq.Cache = []entities.Operation{}
	bq.Unlock()

	if len(pending) == 0 {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), databaseTimeout)
	defer cancel()

	inserter := bq.Client.Dataset(bq.Schema).Table(bq.Table).Inserter()
	if err := inserter.Put(ctx, pending); err != nil {
		return nerrors.NewInternalErrorFrom(err, "error sending operation")
	}
	return nil
}