/
envelope.go
139 lines (124 loc) · 4.95 KB
/
envelope.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
// from https://github.com/dapr/components-contrib/blob/master/pubsub/envelope_test.go
package protocol
import (
"fmt"
jsoniter "github.com/json-iterator/go"
"github.com/nats-io/nuid"
"log"
)
// uidGen is the package-level NUID generator. NewCloudEventsEnvelope calls
// uidGen.Next() to assign an ID to any envelope created with an empty id.
// NOTE(review): confirm whether a generator instance returned by nuid.New() is
// safe for concurrent Next calls before constructing envelopes from several goroutines.
var uidGen = nuid.New()
// Default values applied by NewCloudEventsEnvelope, plus the Cloud Events
// identifiers used by this package.
const (
// DefaultCloudEventType is the default event type for a pulse published event.
DefaultCloudEventType = "com.pulse.event.sent"
// CloudEventsSpecVersion is the specversion used by pulse for the cloud events implementation.
CloudEventsSpecVersion = "1.0"
// ContentType is the Cloud Events HTTP content type.
ContentType = "application/cloudevents+json"
// DefaultCloudEventSource is the default event source.
DefaultCloudEventSource = "pulse"
// DefaultCloudEventWebhook is the default webhook URL. It is the empty string,
// so an envelope created without a webhook keeps an empty WebhookURL.
DefaultCloudEventWebhook = ""
)
// CloudEventsEnvelope describes the Dapr implementation of the Cloud Events spec,
// extended in this package with topic, webhook, ordering and acknowledgement fields.
// Spec details: https://github.com/cloudevents/spec/blob/master/spec.md
type CloudEventsEnvelope struct {
// ID identifies the event. NewCloudEventsEnvelope generates one when given an empty id.
ID string `json:"id"`
// Source is the event source. NewCloudEventsEnvelope falls back to DefaultCloudEventSource.
Source string `json:"source"`
// Type is the event type. NewCloudEventsEnvelope falls back to DefaultCloudEventType.
Type string `json:"type"`
// SpecVersion is set to CloudEventsSpecVersion by NewCloudEventsEnvelope.
SpecVersion string `json:"specversion"`
// See DataContentType https://tools.ietf.org/html/rfc2046
DataContentType string `json:"datacontenttype"`
// Data is the raw event payload.
Data []byte `json:"data"`
// Topic is the topic name supplied by the creator of the envelope.
Topic string `json:"topic"`
// WebhookURL is the webhook supplied by the creator of the envelope; empty by default.
WebhookURL string `json:"webhookUrl"`
OrderingKey string `json:"orderingKey"` // Presumably envelopes sharing a key are consumed in order (e.g. an order id) - TODO confirm against the consumer.
// The fields below are bookkeeping fields that, per the original note, the
// producer of the envelope does not need to initialize.
//
// Size is set by NewCloudEventsEnvelope to the byte length of the envelope's JSON encoding.
Size int
// DeliveryAttempt is the number of times a message has been delivered.
// This is part of the dead lettering feature that forwards messages that
// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
// If dead lettering is enabled, this will be set on all attempts, starting
// with value 1. Otherwise, the value will be nil.
// This field is read-only.
DeliveryAttempt *int
// calledDone is set by done on the first Ack or Nack so that later calls are ignored.
calledDone bool
// DoneFunc is invoked by done with the envelope ID and the ack flag
// (true for Ack, false for Nack). It is excluded from JSON encoding.
DoneFunc func(string, bool) `json:"-"`
}
// NewSimpleByteMessage wraps data in an envelope built by NewCloudEventsEnvelope
// with every string argument left empty, so the constructor's defaults apply.
//
// If the constructor reports an error, the error is logged with the standard
// logger and the constructor's envelope result is returned unchanged (nil when
// construction failed).
func NewSimpleByteMessage(data []byte) *CloudEventsEnvelope {
	const unset = ""
	envelope, buildErr := NewCloudEventsEnvelope(unset, unset, unset, unset, unset, unset, unset, data)
	if buildErr == nil {
		return envelope
	}
	log.Println(buildErr)
	return envelope
}
// NewCloudEventsEnvelope builds a CloudEventsEnvelope around data.
//
// Empty arguments are replaced by defaults: id by a freshly generated value
// from uidGen, source by DefaultCloudEventSource, eventType by
// DefaultCloudEventType and webhook by DefaultCloudEventWebhook.
// datacontentType, topic and orderingKey are stored exactly as given, and
// SpecVersion is always CloudEventsSpecVersion.
//
// Size is set to the byte length of the envelope's JSON encoding, measured
// while Size itself is still zero. If that encoding fails, the function
// returns a nil envelope together with the marshalling error.
func NewCloudEventsEnvelope(id, source, datacontentType, eventType, topic, webhook, orderingKey string, data []byte) (*CloudEventsEnvelope, error) {
	// Resolve defaults. The generator is only advanced when no id was supplied.
	if id == "" {
		id = uidGen.Next()
	}
	if source == "" {
		source = DefaultCloudEventSource
	}
	if eventType == "" {
		eventType = DefaultCloudEventType
	}
	if webhook == "" {
		webhook = DefaultCloudEventWebhook
	}

	envelope := new(CloudEventsEnvelope)
	envelope.ID = id
	envelope.Source = source
	envelope.Type = eventType
	envelope.SpecVersion = CloudEventsSpecVersion
	envelope.DataContentType = datacontentType
	envelope.Data = data
	envelope.Topic = topic
	envelope.WebhookURL = webhook
	envelope.OrderingKey = orderingKey

	// Encode once purely to measure the envelope.
	// TODO: if this encoding turns out to take significant time, approximate the size instead.
	encoded, marshalErr := jsoniter.ConfigFastest.Marshal(envelope)
	if marshalErr != nil {
		return nil, marshalErr
	}
	envelope.Size = len(encoded)
	return envelope, nil
}
// String returns a one-line, human-readable summary of the envelope for
// logging and debugging. It lists ID, Data, OrderingKey, DeliveryAttempt,
// calledDone, the dynamic type of DoneFunc, and Size.
//
// DeliveryAttempt is a *int, so it is dereferenced before formatting: handing
// the pointer itself to %d makes fmt print the pointer's address rather than
// the attempt count. A nil DeliveryAttempt is rendered as 0.
func (m *CloudEventsEnvelope) String() string {
	attempt := 0
	if m.DeliveryAttempt != nil {
		attempt = *m.DeliveryAttempt
	}
	return fmt.Sprintf("Id: %s Data: %s OrderingKey: %s DeliveryAttempt: %d calledDone: %v DoneFunc: %T size: %d", m.ID, m.Data, m.OrderingKey, attempt, m.calledDone, m.DoneFunc, m.Size)
}
// Ack reports that the envelope was processed successfully.
//
// It forwards to done with ack set to true; done passes the envelope ID and
// that flag to DoneFunc. Only the first call to Ack or Nack on an envelope
// has any effect; later calls return without doing anything.
func (m *CloudEventsEnvelope) Ack() {
	const processed = true
	m.done(processed)
}
// Nack reports that the envelope was not, or could not be, processed.
//
// It forwards to done with ack set to false; done passes the envelope ID and
// that flag to DoneFunc. Only the first call to Ack or Nack on an envelope
// has any effect; later calls return without doing anything.
func (m *CloudEventsEnvelope) Nack() {
	const processed = false
	m.done(processed)
}
// done records the first acknowledgement decision for the envelope and
// forwards it to DoneFunc as (ID, ack).
//
// Calls after the first return immediately, so DoneFunc runs at most once per
// envelope. An envelope whose DoneFunc is nil (NewCloudEventsEnvelope does not
// set one) is still marked as done, but no callback is invoked; calling a nil
// function value would otherwise panic.
func (m *CloudEventsEnvelope) done(ack bool) {
	if m.calledDone {
		return
	}
	m.calledDone = true
	if m.DoneFunc == nil {
		return
	}
	m.DoneFunc(m.ID, ack)
}