Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream: HeadersOnly in consumer configuration #476

Merged
merged 1 commit into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1451,6 +1451,8 @@ _marshalConsumerCreateReq(natsBuffer **new_buf, const char *stream, jsConsumerCo
s = natsBuf_Append(buf, ",\"flow_control\":true", -1);
if ((s == NATS_OK) && (cfg->Heartbeat > 0))
s = nats_marshalLong(buf, true, "idle_heartbeat", cfg->Heartbeat);
if ((s == NATS_OK) && cfg->HeadersOnly)
s = natsBuf_Append(buf, ",\"headers_only\":true", -1);
IFOK(s, natsBuf_Append(buf, "}}", -1));

if (s == NATS_OK)
Expand Down Expand Up @@ -1584,6 +1586,7 @@ _unmarshalConsumerConfig(nats_JSON *json, const char *fieldName, jsConsumerConfi
IFOK(s, nats_JSONGetLong(cjson, "max_ack_pending", &(cc->MaxAckPending)));
IFOK(s, nats_JSONGetBool(cjson, "flow_control", &(cc->FlowControl)));
IFOK(s, nats_JSONGetLong(cjson, "idle_heartbeat", &(cc->Heartbeat)));
IFOK(s, nats_JSONGetBool(cjson, "headers_only", &(cc->HeadersOnly)));
}

if (s == NATS_OK)
Expand Down
16 changes: 16 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ extern "C" {
*/
#define NATS_DEFAULT_URL "nats://localhost:4222"

/** \brief Message header for JetStream messages representing the message payload size
*
* When creating a JetStream consumer, if the `HeadersOnly` boolean is specified,
* the subscription will receive messages with headers only (no message payload),
* and a header of this name containing the size of the message payload that was
* omitted.
*
* @see jsConsumerConfig
*/
#define JS_MSG_SIZE "Nats-Msg-Size"

//
// Types.
//
Expand Down Expand Up @@ -529,6 +540,10 @@ typedef struct jsStreamInfo
*
* \note `Durable` cannot contain the character ".".
*
* \note `HeadersOnly` means that the subscription will not receive any message payload,
* instead, it will receive only messages headers (if present) with the addition of
* the header #JS_MSG_SIZE ("Nats-Msg-Size"), whose value is the payload size.
*
* @see jsConsumerConfig_Init
*
* \code{.unparsed}
Expand Down Expand Up @@ -563,6 +578,7 @@ typedef struct jsConsumerConfig
int64_t MaxAckPending;
bool FlowControl;
int64_t Heartbeat; ///< Heartbeat interval expressed in number of nanoseconds.
bool HeadersOnly;

} jsConsumerConfig;

Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ JetStreamSubscribeConfigCheck
JetStreamSubscribeIdleHeartbeat
JetStreamSubscribeFlowControl
JetStreamSubscribePull
JetStreamSubscribeHeadersOnly
JetStreamOrderedCons
JetStreamOrderedConsWithErrors
StanPBufAllocator
Expand Down
82 changes: 82 additions & 0 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21164,6 +21164,7 @@ test_JetStreamUnmarshalConsumerInfo(void)
"{\"config\":{\"replay_policy\":\"" jsReplayInstantStr "\"}}",
"{\"config\":{\"replay_policy\":\"" jsReplayOriginalStr "\"}}",
"{\"config\":{\"deliver_group\":\"queue_name\"}}",
"{\"config\":{\"headers_only\":true}}",
};
const char *bad[] = {
"{\"stream_name\":123}",
Expand All @@ -21188,6 +21189,7 @@ test_JetStreamUnmarshalConsumerInfo(void)
"{\"config\":{\"flow_control\":\"abc\"}}",
"{\"config\":{\"idle_heartbeat\":\"abc\"}}",
"{\"config\":{\"deliver_group\":123}}",
"{\"config\":{\"headers_only\":123}}",
"{\"delivered\":123}",
"{\"delivered\":{\"consumer_seq\":\"abc\"}}",
"{\"delivered\":{\"stream_seq\":\"abc\"}}",
Expand Down Expand Up @@ -25454,6 +25456,85 @@ test_JetStreamSubscribePull(void)
rmtree(datastore);
}

static void
test_JetStreamSubscribeHeadersOnly(void)
{
natsStatus s;
natsConnection *nc = NULL;
natsSubscription *sub= NULL;
jsCtx *js = NULL;
natsMsg *msg = NULL;
natsPid pid = NATS_INVALID_PID;
jsErrCode jerr= 0;
char datastore[256] = {'\0'};
char cmdLine[1024] = {'\0'};
jsStreamConfig sc;
jsSubOptions so;
int i;

ENSURE_JS_VERSION(2, 6, 2);

_makeUniqueDir(datastore, sizeof(datastore), "datastore_");

test("Start JS server: ");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s", datastore);
pid = _startServer("nats://127.0.0.1:4222", cmdLine, true);
CHECK_SERVER_STARTED(pid);
testCond(true);

test("Connect: ");
s = natsConnection_ConnectTo(&nc, NATS_DEFAULT_URL);
testCond(s == NATS_OK);

test("Get context: ");
s = natsConnection_JetStream(&js, nc, NULL);
testCond(s == NATS_OK);

test("Create stream: ");
jsStreamConfig_Init(&sc);
sc.Name = "S";
s = js_AddStream(NULL, js, &sc, NULL, &jerr);
testCond((s == NATS_OK) && (jerr == 0));

test("Populate: ");
for (i=0; (s == NATS_OK) && (i<10); i++)
{
s = natsMsg_Create(&msg, "S", NULL, "hello", 5);
IFOK(s, natsMsgHeader_Set(msg, "User-Header", "MyValue"));
IFOK(s, js_PublishMsg(NULL, js, msg, NULL, NULL));
natsMsg_Destroy(msg);
msg = NULL;
}
testCond(s == NATS_OK);

test("Create consumer with headers only: ");
jsSubOptions_Init(&so);
so.Config.HeadersOnly = true;
s = js_SubscribeSync(&sub, js, "S", NULL, &so, &jerr);
testCond((s == NATS_OK) && (sub != NULL) && (jerr == 0));

test("Verify only headers: ");
for (i=0; (s == NATS_OK) && (i<10); i++)
{
const char *val = NULL;
s = natsSubscription_NextMsg(&msg, sub, 1000);
IFOK(s, (natsMsg_GetDataLength(msg) == 0 ? NATS_OK : NATS_ERR));
IFOK(s, natsMsgHeader_Get(msg, "User-Header", &val));
IFOK(s, (strcmp(val, "MyValue") == 0 ? NATS_OK : NATS_ERR));
IFOK(s, natsMsgHeader_Get(msg, JS_MSG_SIZE, &val));
IFOK(s, (strcmp(val, "5") == 0 ? NATS_OK : NATS_ERR));
natsMsg_Destroy(msg);
msg = NULL;
}
testCond(s == NATS_OK);

natsSubscription_Destroy(sub);
jsCtx_Destroy(js);
natsConnection_Destroy(nc);
_stopServer(pid);
rmtree(datastore);
}

static void
_orderedConsCB(natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure)
{
Expand Down Expand Up @@ -28400,6 +28481,7 @@ static testInfo allTests[] =
{"JetStreamSubscribeIdleHeartbeat", test_JetStreamSubscribeIdleHearbeat},
{"JetStreamSubscribeFlowControl", test_JetStreamSubscribeFlowControl},
{"JetStreamSubscribePull", test_JetStreamSubscribePull},
{"JetStreamSubscribeHeadersOnly", test_JetStreamSubscribeHeadersOnly},
{"JetStreamOrderedCons", test_JetStreamOrderedConsumer},
{"JetStreamOrderedConsWithErrors", test_JetStreamOrderedConsumerWithErrors},

Expand Down