-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
eventdata_repository.go
74 lines (63 loc) · 1.74 KB
/
eventdata_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package bigquery
import (
"context"
"fmt"
"time"
"cloud.google.com/go/bigquery"
"github.com/italolelis/watchops/internal/app/provider"
)
// datasetID is the name of the BigQuery dataset that event rows are written to.
const datasetID = "watchops"

// tableID is the name of the table, inside datasetID, that receives the rows.
const tableID = "events_raw"
// EventDataWriter writes event data through a BigQuery client.
// It implements the writer interface (the interface itself is not visible in
// this file).
type EventDataWriter struct {
	// client is the BigQuery client used for inserts and released by Close.
	client *bigquery.Client
}

// bqEvent is the row shape used when inserting an event. Its fields are
// mapped to column names by the Save method.
type bqEvent struct {
	EventType   string
	ID          string
	Metadata    string
	TimeCreated time.Time
	Signature   string
	MsgID       string
	Source      string
}
// Save converts the event into a column-name -> value row, matching the method
// shape of bigquery.ValueSaver.
//
// It returns bigquery.NoDedupeID as the insert ID, so no insert ID is attached
// to the row, and it never returns an error.
func (e *bqEvent) Save() (map[string]bigquery.Value, string, error) {
	row := make(map[string]bigquery.Value, 7)

	row["event_type"] = e.EventType
	row["id"] = e.ID
	row["metadata"] = e.Metadata
	row["time_created"] = e.TimeCreated
	row["signature"] = e.Signature
	row["msg_id"] = e.MsgID
	row["source"] = e.Source

	return row, bigquery.NoDedupeID, nil
}
// NewEventDataWriter builds an EventDataWriter backed by a BigQuery client for
// the given project.
//
// ctx and projectID are passed straight to bigquery.NewClient. If the client
// cannot be created, the returned writer is nil and the error wraps the
// underlying cause.
func NewEventDataWriter(ctx context.Context, projectID string) (*EventDataWriter, error) {
	var (
		writer EventDataWriter
		err    error
	)

	if writer.client, err = bigquery.NewClient(ctx, projectID); err != nil {
		return nil, fmt.Errorf("could not create data operations client: %w", err)
	}

	return &writer, nil
}
// Add inserts a single event coming from webhooks into the events table.
//
// The event is copied field by field into a bqEvent row (Metadata is converted
// to a string) and sent with the table's Inserter. ctx is forwarded to the
// insert call.
//
// On failure the returned error is wrapped with the event ID and the
// destination dataset/table so that logs identify what failed; the underlying
// BigQuery error remains reachable through errors.Is and errors.As.
func (w *EventDataWriter) Add(ctx context.Context, eventData provider.Event) error {
	row := &bqEvent{
		EventType:   eventData.EventType,
		ID:          eventData.ID,
		Metadata:    string(eventData.Metadata),
		TimeCreated: eventData.TimeCreated,
		Signature:   eventData.Signature,
		MsgID:       eventData.MsgID,
		Source:      eventData.Source,
	}

	inserter := w.client.Dataset(datasetID).Table(tableID).Inserter()
	if err := inserter.Put(ctx, row); err != nil {
		return fmt.Errorf("could not insert event %q into %s.%s: %w", eventData.ID, datasetID, tableID, err)
	}

	return nil
}
// Close closes the underlying BigQuery client and returns whatever error that
// call reports.
func (w *EventDataWriter) Close() error {
	bq := w.client
	return bq.Close()
}