/
event.go
107 lines (82 loc) · 2.68 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package events
import (
"encoding/json"
"fmt"
"time"
"github.com/initialed85/uneventful/internal/helpers"
"github.com/jackc/pgtype"
"github.com/segmentio/ksuid"
)
type Event struct {
EventID ksuid.KSUID `json:"event_id"`
CorrelationID ksuid.KSUID `json:"correlation_id"`
Timestamp time.Time `json:"timestamp"`
SourceName string `json:"source_name"`
SourceID ksuid.KSUID `json:"source_uuid"`
TypeName string `json:"type_name"`
Data json.RawMessage `json:"data"`
}
func ToJSON(e *Event) ([]byte, error) {
return json.Marshal(&e)
}
func (e *Event) String() string {
return fmt.Sprintf("Event{id=%s, type=%#+v, size=%vB}", e.EventID, e.TypeName, len(e.Data))
}
func FromJSON(data []byte) (*Event, error) {
e := Event{}
err := json.Unmarshal(data, &e)
return &e, err
}
func NewWithCorrelation(correlationID ksuid.KSUID, typeName string, data json.RawMessage) *Event {
e := Event{EventID: ksuid.New(), CorrelationID: correlationID, TypeName: typeName, Data: data, Timestamp: helpers.GetNow()}
return &e
}
func NewWithoutCorrelation(typeName string, data json.RawMessage) *Event {
e := Event{EventID: ksuid.New(), Timestamp: helpers.GetNow(), TypeName: typeName, Data: data}
return &e
}
func (e *Event) SetSource(name string, id ksuid.KSUID) {
e.SourceName = name
e.SourceID = id
}
func (e *Event) ToJSON() ([]byte, error) {
return ToJSON(e)
}
func (e *Event) FromJSON(data []byte) error {
event, err := FromJSON(data)
if err != nil {
return err
}
e.EventID = event.EventID
e.CorrelationID = event.CorrelationID
e.Timestamp = event.Timestamp
e.SourceName = event.SourceName
e.SourceID = event.SourceID
e.TypeName = event.TypeName
e.Data = event.Data
return nil
}
func (e *Event) ToConsumedDatabaseEvent() (*DatabaseEvent, error) {
jsonData, err := e.Data.MarshalJSON()
if err != nil {
return nil, err
}
jsonbData := pgtype.JSONB{}
err = jsonbData.Scan(jsonData)
if err != nil {
return nil, err
}
return &DatabaseEvent{EventID: e.EventID.String(), CorrelationID: e.CorrelationID.String(), Timestamp: e.Timestamp, SourceName: e.SourceName, SourceID: e.SourceID.String(), TypeName: e.TypeName, Data: jsonbData, IsHandled: false}, nil
}
func (e *Event) ToDatabaseEvent() (*DatabaseEvent, error) {
jsonData, err := e.Data.MarshalJSON()
if err != nil {
return nil, err
}
jsonbData := pgtype.JSONB{}
err = jsonbData.Scan(jsonData)
if err != nil {
return nil, err
}
return &DatabaseEvent{EventID: e.EventID.String(), CorrelationID: e.CorrelationID.String(), Timestamp: e.Timestamp, SourceName: e.SourceName, SourceID: e.SourceID.String(), TypeName: e.TypeName, Data: jsonbData, IsHandled: false}, nil
}