-
Notifications
You must be signed in to change notification settings - Fork 127
/
conn.h
151 lines (106 loc) · 5.19 KB
/
conn.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright 2015-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef CONN_H_
#define CONN_H_
#include "natsp.h"
#define RESP_INFO_POOL_MAX_SIZE (10)
#ifdef DEV_MODE
// For type safety
void natsConn_Lock(natsConnection *nc);
void natsConn_Unlock(natsConnection *nc);
#else
// We know what we are doing :-)
#define natsConn_Lock(c) (natsMutex_Lock((c)->mu))
#define natsConn_Unlock(c) (natsMutex_Unlock((c)->mu))
#endif // DEV_MODE
#define SET_WRITE_DEADLINE(nc) if ((nc)->opts->writeDeadline > 0) natsDeadline_Init(&(nc)->sockCtx.writeDeadline, (nc)->opts->writeDeadline)
natsStatus
natsConn_create(natsConnection **newConn, natsOptions *options);
void
natsConn_retain(natsConnection *nc);
void
natsConn_release(natsConnection *nc);
natsStatus
natsConn_bufferWrite(natsConnection *nc, const char *buffer, int len);
natsStatus
natsConn_bufferFlush(natsConnection *nc);
bool
natsConn_isClosed(natsConnection *nc);
bool
natsConn_isReconnecting(natsConnection *nc);
natsStatus
natsConn_flushOrKickFlusher(natsConnection *nc);
natsStatus
natsConn_processMsg(natsConnection *nc, char *buf, int bufLen);
void
natsConn_processOK(natsConnection *nc);
void
natsConn_processErr(natsConnection *nc, char *buf, int bufLen);
void
natsConn_processPing(natsConnection *nc);
void
natsConn_processPong(natsConnection *nc);
#define natsConn_subscribeNoPool(sub, nc, subj, cb, closure) natsConn_subscribeImpl((sub), (nc), true, (subj), NULL, 0, (cb), (closure), true, NULL)
#define natsConn_subscribeNoPoolNoLock(sub, nc, subj, cb, closure) natsConn_subscribeImpl((sub), (nc), false, (subj), NULL, 0, (cb), (closure), true, NULL)
#define natsConn_subscribeSyncNoPool(sub, nc, subj) natsConn_subscribeNoPool((sub), (nc), (subj), NULL, NULL)
#define natsConn_subscribeWithTimeout(sub, nc, subj, timeout, cb, closure) natsConn_subscribeImpl((sub), (nc), true, (subj), NULL, (timeout), (cb), (closure), false, NULL)
#define natsConn_subscribe(sub, nc, subj, cb, closure) natsConn_subscribeWithTimeout((sub), (nc), (subj), 0, (cb), (closure))
#define natsConn_subscribeSync(sub, nc, subj) natsConn_subscribe((sub), (nc), (subj), NULL, NULL)
#define natsConn_queueSubscribeWithTimeout(sub, nc, subj, queue, timeout, cb, closure) natsConn_subscribeImpl((sub), (nc), true, (subj), (queue), (timeout), (cb), (closure), false, NULL)
#define natsConn_queueSubscribe(sub, nc, subj, queue, cb, closure) natsConn_queueSubscribeWithTimeout((sub), (nc), (subj), (queue), 0, (cb), (closure))
#define natsConn_queueSubscribeSync(sub, nc, subj, queue) natsConn_queueSubscribe((sub), (nc), (subj), (queue), NULL, NULL)
natsStatus
natsConn_subscribeImpl(natsSubscription **newSub,
natsConnection *nc, bool lock, const char *subj, const char *queue,
int64_t timeout, natsMsgHandler cb, void *cbClosure,
bool preventUseOfLibDlvPool, jsSub *jsi);
natsStatus
natsConn_unsubscribe(natsConnection *nc, natsSubscription *sub, int max, bool drainMode, int64_t timeout);
natsStatus
natsConn_enqueueUnsubProto(natsConnection *nc, int64_t sid);
natsStatus
natsConn_drainSub(natsConnection *nc, natsSubscription *sub, bool checkConnDrainStatus);
bool
natsConn_isDraining(natsConnection *nc);
bool
natsConn_isDrainingPubs(natsConnection *nc);
void
natsConn_removeSubscription(natsConnection *nc, natsSubscription *sub);
void
natsConn_processAsyncINFO(natsConnection *nc, char *buf, int len);
natsStatus
natsConn_addRespInfo(respInfo **newResp, natsConnection *nc, char *respInbox, int respInboxSize);
void
natsConn_disposeRespInfo(natsConnection *nc, respInfo *resp, bool needsLock);
natsStatus
natsConn_initResp(natsConnection *nc, natsMsgHandler cb);
void
natsConn_destroyRespPool(natsConnection *nc);
natsStatus
natsConn_publish(natsConnection *nc, natsMsg *msg, const char *reply, bool directFlush);
natsStatus
natsConn_userFromFile(char **userJWT, char **customErrTxt, void *closure);
natsStatus
natsConn_signatureHandler(char **customErrTxt, unsigned char **sig, int *sigLen, const char *nonce, void *closure);
natsStatus
natsConn_sendSubProto(natsConnection *nc, const char *subject, const char *queue, int64_t sid);
natsStatus
natsConn_sendUnsubProto(natsConnection *nc, int64_t subId, int max);
#define natsConn_setFilter(c, f) natsConn_setFilterWithClosure((c), (f), NULL)
void
natsConn_setFilterWithClosure(natsConnection *nc, natsMsgFilter f, void* closure);
void
natsConn_close(natsConnection *nc);
void
natsConn_destroy(natsConnection *nc, bool fromPublicDestroy);
#endif /* CONN_H_ */