/
api.go
134 lines (119 loc) · 4.81 KB
/
api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package streams
import (
"context"
"errors"
)
var (
// DefaultHub is the `streams` base instance used by the `streams` simple API. Recommended to use where only one
// hub instance is required.
//
// This variable MUST be allocated manually.
DefaultHub *Hub
// ErrNilDefaultHub DefaultHub has not been initialized.
ErrNilDefaultHub = errors.New("streams: DefaultHub has not been initialized")
)
func checkDefaultHubInstance() {
if DefaultHub == nil {
panic(ErrNilDefaultHub)
}
}
// RegisterStream creates a relation between a stream message type and metadata.
//
// If registering a Google's Protocol Buffer message, DO NOT use a pointer as message schema
// to avoid marshaling problems
func RegisterStream(message interface{}, metadata StreamMetadata) {
checkDefaultHubInstance()
DefaultHub.RegisterStream(message, metadata)
}
// RegisterStreamByString creates a relation between a string key and metadata.
func RegisterStreamByString(messageType string, metadata StreamMetadata) {
checkDefaultHubInstance()
DefaultHub.RegisterStreamByString(messageType, metadata)
}
// Read registers a new stream-reading background job.
//
// If reading from a Google's Protocol Buffer message pipeline, DO NOT use a pointer as message schema
// to avoid marshaling problems
func Read(message interface{}, opts ...ReaderNodeOption) error {
checkDefaultHubInstance()
return DefaultHub.Read(message, opts...)
}
// ReadByStreamKey registers a new stream-reading background job using the raw stream identifier (e.g. topic name).
func ReadByStreamKey(stream string, opts ...ReaderNodeOption) {
checkDefaultHubInstance()
DefaultHub.ReadByStreamKey(stream, opts...)
}
// Start initiates all daemons (e.g. stream-reading jobs) processes
func Start(ctx context.Context) {
checkDefaultHubInstance()
DefaultHub.Start(ctx)
}
// Write inserts a message into a stream assigned to the message in the StreamRegistry in order to propagate the
// data to a set of subscribed systems for further processing.
//
// Uses given context to inject correlation and causation IDs.
func Write(ctx context.Context, message interface{}) error {
checkDefaultHubInstance()
return DefaultHub.Write(ctx, message)
}
// WriteBatch inserts a set of messages into a stream assigned on the StreamRegistry in order to propagate the
// data to a set of subscribed systems for further processing.
//
// Uses given context to inject correlation and causation IDs.
//
// If an item from the batch fails, other items will fail too
func WriteBatch(ctx context.Context, messages ...interface{}) (uint32, error) {
checkDefaultHubInstance()
return DefaultHub.WriteBatch(ctx, messages...)
}
// WriteByMessageKey inserts a message into a stream using the custom message key from StreamRegistry in order to
// propagate the data to a set of subscribed systems for further processing.
//
// Uses given context to inject correlation and causation IDs.
func WriteByMessageKey(ctx context.Context, messageKey string, message interface{}) error {
checkDefaultHubInstance()
return DefaultHub.WriteByMessageKey(ctx, messageKey, message)
}
// WriteByMessageKeyBatch inserts a set of messages into a stream using the custom message key from StreamRegistry in order to
// propagate the data to a set of subscribed systems for further processing.
//
// Uses given context to inject correlation and causation IDs.
//
// If an item from the batch fails, other items will fail too
func WriteByMessageKeyBatch(ctx context.Context, items WriteByMessageKeyBatchItems) (uint32, error) {
checkDefaultHubInstance()
return DefaultHub.WriteByMessageKeyBatch(ctx, items)
}
// WriteRawMessage inserts a raw transport message into a stream in order to propagate the data to a set
// of subscribed systems for further processing.
//
// Uses given context to inject correlation and causation IDs.
func WriteRawMessage(ctx context.Context, message Message) error {
checkDefaultHubInstance()
return DefaultHub.WriteRawMessage(ctx, message)
}
// WriteRawMessageBatch inserts a set of raw transport message into a stream in order to propagate the data to a set
// of subscribed systems for further processing.
//
// Uses given context to inject correlation and causation IDs.
//
// The whole batch will be passed to the underlying Writer driver implementation as every driver has its own way to
// deal with batches
func WriteRawMessageBatch(ctx context.Context, messages ...Message) (uint32, error) {
checkDefaultHubInstance()
return DefaultHub.WriteRawMessageBatch(ctx, messages...)
}
// GetStreamReaderNodes retrieves ReaderNode(s) from a stream.
func GetStreamReaderNodes(stream string) []ReaderNode {
checkDefaultHubInstance()
list := DefaultHub.GetStreamReaderNodes(stream)
if list == nil {
return nil
}
res := make([]ReaderNode, 0, list.Size())
for _, item := range list.Values() {
node := item.(ReaderNode)
res = append(res, node)
}
return res
}