Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream: js_GetMsg and js_GetLastMsg #481

Merged
merged 1 commit into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ extern const int64_t jsDefaultRequestWait;
// jsApiMsgDeleteT is the endpoint to remove a message.
#define jsApiMsgDeleteT "%.*s.STREAM.MSG.DELETE.%s"

// jsApiMsgGetT is the endpoint to get a message, either by sequence or last per subject.
#define jsApiMsgGetT "%.*s.STREAM.MSG.GET.%s"

// Creates a subject based on the option's prefix, the subject format and its values.
#define js_apiSubj(s, o, f, ...) (nats_asprintf((s), (f), (o)->Prefix, __VA_ARGS__) < 0 ? NATS_NO_MEMORY : NATS_OK)

Expand Down
169 changes: 169 additions & 0 deletions src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -1171,6 +1171,175 @@ js_DeleteStream(jsCtx *js, const char *stream, jsOptions *opts, jsErrCode *errCo
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_decodeBytesLen(nats_JSON *json, const char *field, const char **str, int *strLen, int *decodedLen)
{
natsStatus s = NATS_OK;

s = nats_JSONGetStrPtr(json, field, str);
if ((s == NATS_OK) && (*str != NULL))
s = nats_Base64_DecodeLen(*str, strLen, decodedLen);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalStoredMsg(nats_JSON *json, natsMsg **new_msg)
{
natsStatus s;
natsMsg *msg = NULL;
const char *subject= NULL;
const char *hdrs = NULL;
const char *data = NULL;
int hdrsl = 0;
int dhdrsl = 0;
int datal = 0;
int ddatal = 0;

s = nats_JSONGetStrPtr(json, "subject", &subject);
IFOK(s, _decodeBytesLen(json, "hdrs", &hdrs, &hdrsl, &dhdrsl));
IFOK(s, _decodeBytesLen(json, "data", &data, &datal, &ddatal));
if ((s == NATS_OK) && (subject != NULL))
{
s = natsMsg_create(&msg, subject, (int) strlen(subject),
NULL, 0, NULL, dhdrsl+ddatal, dhdrsl);
if (s == NATS_OK)
{
if ((hdrs != NULL) && (dhdrsl > 0))
nats_Base64_DecodeInPlace(hdrs, hdrsl, (unsigned char*) msg->hdr);
if ((data != NULL) && (ddatal > 0))
nats_Base64_DecodeInPlace(data, datal, (unsigned char*) msg->data);
}
IFOK(s, nats_JSONGetULong(json, "seq", &(msg->seq)));
IFOK(s, nats_JSONGetTime(json, "time", &(msg->time)));
}
if (s == NATS_OK)
*new_msg = msg;

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_unmarshalGetMsgResp(natsMsg **msg, natsMsg *resp, jsErrCode *errCode)
{
nats_JSON *json = NULL;
jsApiResponse ar;
natsStatus s;

s = js_unmarshalResponse(&ar, &json, resp);
if (s != NATS_OK)
return NATS_UPDATE_ERR_STACK(s);

if (js_apiResponseIsErr(&ar))
{
if (errCode != NULL)
*errCode = (int) ar.Error.ErrCode;

// If the error code is JSNoMessageFoundErr then pick NATS_NOT_FOUND.
if (ar.Error.ErrCode == JSNoMessageFoundErr)
s = NATS_NOT_FOUND;
else
s = nats_setError(NATS_ERR, "%s", ar.Error.Description);
}
else
{
nats_JSON *mjson = NULL;

s = nats_JSONGetObject(json, "message", &mjson);
if ((s == NATS_OK) && (mjson == NULL))
s = nats_setError(NATS_NOT_FOUND, "%s", "message content not found");
else
s = _unmarshalStoredMsg(mjson, msg);
}

js_freeApiRespContent(&ar);
nats_JSONDestroy(json);
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_getMsg(natsMsg **msg, jsCtx *js, const char *stream, uint64_t seq, const char *subject, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
char *subj = NULL;
natsMsg *resp = NULL;
natsConnection *nc = NULL;
bool freePfx = false;
jsOptions o;
char buffer[64];
natsBuffer buf;

if ((msg == NULL) || (js == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

if (nats_IsStringEmpty(stream))
return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamNameRequired);

s = js_setOpts(&nc, &freePfx, js, opts, &o);
if (s == NATS_OK)
{
if (nats_asprintf(&subj, jsApiMsgGetT, js_lenWithoutTrailingDot(o.Prefix), o.Prefix, stream) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);

if (freePfx)
NATS_FREE((char*) o.Prefix);
}
IFOK(s, natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)));
IFOK(s, natsBuf_AppendByte(&buf, '{'));
if ((s == NATS_OK) && (seq > 0))
{
nats_marshalULong(&buf, false, "seq", seq);
}
else
{
IFOK(s, natsBuf_Append(&buf, "\"last_by_subj\":\"", -1));
IFOK(s, natsBuf_Append(&buf, subject, -1));
IFOK(s, natsBuf_AppendByte(&buf, '"'));
}
IFOK(s, natsBuf_AppendByte(&buf, '}'));

// Send the request
IFOK_JSR(s, natsConnection_Request(&resp, js->nc, subj, natsBuf_Data(&buf), natsBuf_Len(&buf), o.Wait));
// Unmarshal response
IFOK(s, _unmarshalGetMsgResp(msg, resp, errCode));

natsBuf_Cleanup(&buf);
natsMsg_Destroy(resp);
NATS_FREE(subj);

return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_GetMsg(natsMsg **msg, jsCtx *js, const char *stream, uint64_t seq, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s;

if (errCode != NULL)
*errCode = 0;

if (seq < 1)
return nats_setDefaultError(NATS_INVALID_ARG);

s = _getMsg(msg, js, stream, seq, NULL, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_GetLastMsg(natsMsg **msg, jsCtx *js, const char *stream, const char *subject, jsOptions *opts, jsErrCode *errCode)
{
natsStatus s;

if (errCode != NULL)
*errCode = 0;

if (nats_IsStringEmpty(subject))
return nats_setDefaultError(NATS_INVALID_ARG);

s = _getMsg(msg, js, stream, 0, subject, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_deleteMsg(jsCtx *js, bool noErase, const char *stream, uint64_t seq, jsOptions *opts, jsErrCode *errCode)
{
Expand Down
30 changes: 27 additions & 3 deletions src/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,24 @@ natsMsg_GetDataLength(const natsMsg *msg)
return msg->dataLen;
}

uint64_t
natsMsg_GetSequence(natsMsg *msg)
{
if (msg == NULL)
return 0;

return msg->seq;
}

int64_t
natsMsg_GetTime(natsMsg *msg)
{
if (msg == NULL)
return 0;

return msg->time;
}

natsStatus
natsMsg_create(natsMsg **newMsg,
const char *subject, int subjLen,
Expand Down Expand Up @@ -765,6 +783,8 @@ natsMsg_create(natsMsg **newMsg,
msg->headers = NULL;
msg->sub = NULL;
msg->next = NULL;
msg->seq = 0;
msg->time = 0;

ptr = (char*) (((char*) &(msg->next)) + sizeof(msg->next));

Expand All @@ -788,18 +808,22 @@ natsMsg_create(natsMsg **newMsg,
if (hasHdrs)
{
msg->hdr = ptr;
memcpy(ptr, buf, hdrLen);
if (buf != NULL)
{
memcpy(ptr, buf, hdrLen);
buf += hdrLen;
}
ptr += hdrLen;
*(ptr++) = '\0';

msg->hdrLen = hdrLen;
natsMsg_setNeedsLift(msg);
dataLen -= hdrLen;
buf += hdrLen;
}
msg->data = (const char*) ptr;
msg->dataLen = dataLen;
memcpy(ptr, buf, dataLen);
if (buf != NULL)
memcpy(ptr, buf, dataLen);
ptr += dataLen;
*(ptr) = '\0';

Expand Down
2 changes: 2 additions & 0 deletions src/msg.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct __natsMsg
int dataLen;
int hdrLen;
int flags;
uint64_t seq;
int64_t time;

// subscription (needed when delivery done by connection,
// or for JetStream).
Expand Down
64 changes: 64 additions & 0 deletions src/nats.h
Original file line number Diff line number Diff line change
Expand Up @@ -5035,6 +5035,46 @@ js_PurgeStream(jsCtx *js, const char *stream, jsOptions *opts, jsErrCode *errCod
NATS_EXTERN natsStatus
js_DeleteStream(jsCtx *js, const char *stream, jsOptions *opts, jsErrCode *errCode);

/** \brief Retrieves a JetStream message from the stream by sequence.
*
* Retrieves a raw stream message stored in JetStream by sequence number.
*
* \note The message needs to be destroyed by calling #natsMsg_Destroy.
*
* @see js_GetLastMsg
* @see natsMsg_Destroy
*
* @param msg the memory location where the library will store the pointer to the #natsMsg.
* @param js the pointer to the #jsCtx context.
* @param stream the name of the stream.
* @param seq the sequence in the stream of the message being retrieved.
* @param opts the pointer to the #jsOptions object, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code, or `NULL`
* if not needed.
*/
NATS_EXTERN natsStatus
js_GetMsg(natsMsg **msg, jsCtx *js, const char *stream, uint64_t seq, jsOptions *opts, jsErrCode *errCode);

/** \brief Retrieves the last JetStream message from the stream for a given subject.
*
* Retrieves the last JetStream message from the stream for a given subject.
*
* \note The message needs to be destroyed by calling #natsMsg_Destroy.
*
* @see js_GetMsg
* @see natsMsg_Destroy
*
* @param msg the memory location where the library will store the pointer to the #natsMsg.
* @param js the pointer to the #jsCtx context.
* @param stream the name of the stream.
* @param subject the subject for which the last message is being retrieved.
* @param opts the pointer to the #jsOptions object, possibly `NULL`.
* @param errCode the location where to store the JetStream specific error code, or `NULL`
* if not needed.
*/
NATS_EXTERN natsStatus
js_GetLastMsg(natsMsg **msg, jsCtx *js, const char *stream, const char *subject, jsOptions *opts, jsErrCode *errCode);

/** \brief Deletes a message from the stream.
*
* Deletes the message at sequence <c>seq</c> in the stream named <c>stream</c>.
Expand Down Expand Up @@ -5608,6 +5648,30 @@ natsMsg_InProgress(natsMsg *msg, jsOptions *opts);
NATS_EXTERN natsStatus
natsMsg_Term(natsMsg *msg, jsOptions *opts);

/** \brief Returns the sequence number of this JetStream message.
*
* Returns the sequence number of this JetStream message, or `0` if `msg` is `NULL`
* or not a JetStream message.
*
* \note This applies to JetStream messages retrieved with #js_GetMsg or #js_GetLastMsg.
*
* @param msg the pointer to the #natsMsg object.
*/
NATS_EXTERN uint64_t
natsMsg_GetSequence(natsMsg *msg);

/** \brief Returns the timestamp (in UTC) of this JetStream message.
*
* Returns the timestamp (in UTC) of this JetStream message, or `0` if `msg` is `NULL`
* or not a JetStream message.
*
* \note This applies to JetStream messages retrieved with #js_GetMsg or #js_GetLastMsg.
*
* @param msg the pointer to the #natsMsg object.
*/
NATS_EXTERN int64_t
natsMsg_GetTime(natsMsg *msg);

/** @} */ // end of jsMsg

/** @} */ // end of jsGroup
Expand Down
3 changes: 1 addition & 2 deletions src/stan/msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ stanMsg_create(stanMsg **newMsg, stanSubscription *sub, Pb__MsgProto *pb)
msg->timestamp = pb->timestamp;
msg->redelivered = pb->redelivered;
msg->sub = sub;
msg->next = NULL;

ptr = (char*) (((char*) &(msg->next)) + sizeof(msg->next));
ptr = (char*) (((char*) &(msg->sub)) + sizeof(msg->sub));
msg->data = (const char*) ptr;
msg->dataLen = payloadSize;
memcpy(ptr, (char*) pb->data.data, payloadSize);
Expand Down
4 changes: 1 addition & 3 deletions src/stan/stanp.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,9 @@ struct __stanMsg
int64_t timestamp;
const char *data;
int dataLen;
stanSubscription *sub;
bool redelivered;

// Must be last field!
struct __natsMsg *next;
stanSubscription *sub;

// Nothing after this: the message payload goes there.
};
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ JetStreamOrderedCons
JetStreamOrderedConsWithErrors
JetStreamOrderedConsAutoUnsub
JetStreamStreamsSealAndRollup
JetStreamGetMsgAndLastMsg
StanPBufAllocator
StanConnOptions
StanSubOptions
Expand Down
Loading