Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Ringbuffer with key and export some api for get messages from exchange_client #752

Merged
merged 11 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions include/nng/exchange/exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define EXCHANGE_H

#include <stddef.h>
#include "core/nng_impl.h"
#include "nng/supplemental/nanolib/ringbuffer.h"

#define EXCHANGE_NAME_LEN 100
Expand All @@ -17,11 +18,11 @@ struct exchange_s {
unsigned int rb_count;
};

int exchange_init(exchange_t **ex, char *name, char *topic,
unsigned int *rbsCaps, char **rbsName, unsigned int rbsCount);
int exchange_add_rb(exchange_t *ex, ringBuffer_t *rb);
int exchange_release(exchange_t *ex);
int exchange_handle_msg(exchange_t *ex, void *msg);
int exchange_get_ringBuffer(exchange_t *ex, char *rbName, ringBuffer_t **rb);
NNG_DECL int exchange_init(exchange_t **ex, char *name, char *topic,
unsigned int *rbsCaps, char **rbsName, unsigned int rbsCount);
NNG_DECL int exchange_add_rb(exchange_t *ex, ringBuffer_t *rb);
NNG_DECL int exchange_release(exchange_t *ex);
NNG_DECL int exchange_handle_msg(exchange_t *ex, int key, void *msg);
NNG_DECL int exchange_get_ringBuffer(exchange_t *ex, char *rbName, ringBuffer_t **rb);

#endif
2 changes: 2 additions & 0 deletions include/nng/exchange/exchange_client.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef EXCHANGE_CLIENT_H
#define EXCHANGE_CLIENT_H

#include "core/nng_impl.h"
#include "nng/exchange/exchange.h"
#define nng_exchange_self 0
#define nng_exchange_self_name "exchange-client"
Expand All @@ -14,6 +15,7 @@
#define NNG_EXCHANGE_PEER_NAME "exchange-server"
#define NNG_OPT_EXCHANGE_ADD "exchange-client-add"
#define NNG_OPT_EXCHANGE_GET_EX_QUEUE "exchange-client-get-ex-queue"
#define NNG_OPT_EXCHANGE_GET_RBMSGMAP "exchange-client-get-rbmsgmap"

int nng_exchange_client_open(nng_socket *sock);

Expand Down
3 changes: 1 addition & 2 deletions include/nng/supplemental/nanolib/conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "rule.h"
#include "acl_conf.h"
#include "nng/supplemental/util/platform.h"
#include "nng/exchange/exchange.h"

#define PID_PATH_NAME "/tmp/nanomq/nanomq.pid"
#define CONF_PATH_NAME "/etc/nanomq.conf"
Expand Down Expand Up @@ -305,7 +304,7 @@ typedef struct conf_exchange_client_node conf_exchange_client_node;
struct conf_exchange_client_node {
void *sock;
size_t exchange_count;
exchange_t **ex_list;
void **ex_list;
nng_mtx *mtx;
};

Expand Down
15 changes: 14 additions & 1 deletion include/nng/supplemental/nanolib/ringbuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define RINGBUFFER_H
#include <stdio.h>
#include <stdlib.h>
#include "core/nng_impl.h"

#define RBNAME_LEN 100
#define RINGBUFFER_MAX_SIZE 0xffff
Expand All @@ -16,8 +17,16 @@
typedef struct ringBuffer_s ringBuffer_t;
typedef struct ringBufferMsg_s ringBufferMsg_t;
typedef struct ringBufferRule_s ringBufferRule_t;
typedef struct ringBuffer_msgs_s ringBuffer_msgs_t;

struct ringBuffer_msgs_s {
int key;
nni_msg *msg;
nni_list_node node;
};

struct ringBufferMsg_s {
int key;
void *data;
/* TTL of each message */
unsigned long long expiredAt;
Expand All @@ -42,7 +51,7 @@ struct ringBuffer_s {
ringBufferRule_t *deqinRuleList[RBRULELIST_MAX_SIZE];
ringBufferRule_t *deqoutRuleList[RBRULELIST_MAX_SIZE];

/* TODO: LOCK */
nni_mtx lock;

ringBufferMsg_t *msgs;
};
Expand Down Expand Up @@ -70,8 +79,12 @@ int ringBuffer_init(ringBuffer_t **rb,
unsigned int overWrite,
unsigned long long expiredAt);
int ringBuffer_enqueue(ringBuffer_t *rb,
int key,
void *data,
unsigned long long expiredAt);
int ringBuffer_dequeue(ringBuffer_t *rb, void **data);
int ringBuffer_release(ringBuffer_t *rb);

int ringBuffer_search_msg_by_key(ringBuffer_t *rb, int key, nni_msg **msg);
int ringBuffer_search_msgs_by_key(ringBuffer_t *rb, int key, int count, nni_list **list);
#endif
6 changes: 3 additions & 3 deletions src/mqtt/protocol/exchange/exchange.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "nng/exchange/exchange.h"
#include "core/nng_impl.h"
#include "nng/exchange/exchange.h"

int
exchange_init(exchange_t **ex, char *name, char *topic,
Expand Down Expand Up @@ -112,7 +112,7 @@ exchange_release(exchange_t *ex)
}

int
exchange_handle_msg(exchange_t *ex, void *msg)
exchange_handle_msg(exchange_t *ex, int key, void *msg)
{
unsigned int i = 0;
int ret = 0;
Expand All @@ -122,7 +122,7 @@ exchange_handle_msg(exchange_t *ex, void *msg)
}

for (i = 0; i < ex->rb_count; i++) {
ret = ringBuffer_enqueue(ex->rbs[i], msg, -1);
ret = ringBuffer_enqueue(ex->rbs[i], key, msg, -1);
if (ret != 0) {
log_error("Ring Buffer enqueue failed\n");
return -1;
Expand Down
Loading
Loading