Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] JetStream: Direct Get Message #562

Merged
merged 1 commit into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ extern const int64_t jsDefaultRequestWait;
// jsApiMsgGetT is the endpoint to get a message, either by sequence or last per subject.
#define jsApiMsgGetT "%.*s.STREAM.MSG.GET.%s"

// jsApiMsgGetT is the endpoint to get a message, either by sequence or last per subject.
#define jsApiDirectMsgGetT "%.*s.DIRECT.GET.%s"

// Creates a subject based on the option's prefix, the subject format and its values.
#define js_apiSubj(s, o, f, ...) (nats_asprintf((s), (f), (o)->Prefix, __VA_ARGS__) < 0 ? NATS_NO_MEMORY : NATS_OK)

Expand Down Expand Up @@ -231,3 +234,6 @@ js_retain(jsCtx *js);

void
js_release(jsCtx *js);

natsStatus
js_directGetMsgToJSMsg(const char *stream, natsMsg *msg);
174 changes: 173 additions & 1 deletion src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,8 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
IFOK(s, nats_JSONGetBool(jcfg, "deny_purge", &(cfg->DenyPurge)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_rollup_hdrs", &(cfg->AllowRollup)));
IFOK(s, _unmarshalRePublish(jcfg, "republish", &(cfg->RePublish)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_direct", &(cfg->AllowDirect)));
IFOK(s, nats_JSONGetBool(jcfg, "mirror_direct", &(cfg->MirrorDirect)));

if (s == NATS_OK)
*new_cfg = cfg;
Expand Down Expand Up @@ -715,6 +717,10 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
}
IFOK(s, natsBuf_Append(buf, "\"}", -1));
}
if ((s == NATS_OK) && cfg->AllowDirect)
IFOK(s, natsBuf_Append(buf, ",\"allow_direct\":true", -1));
if ((s == NATS_OK) && cfg->MirrorDirect)
IFOK(s, natsBuf_Append(buf, ",\"mirror_direct\":true", -1));

IFOK(s, natsBuf_AppendByte(buf, '}'));

Expand Down Expand Up @@ -1438,7 +1444,7 @@ _getMsg(natsMsg **msg, jsCtx *js, const char *stream, uint64_t seq, const char *
IFOK(s, natsBuf_AppendByte(&buf, '{'));
if ((s == NATS_OK) && (seq > 0))
{
nats_marshalULong(&buf, false, "seq", seq);
s = nats_marshalULong(&buf, false, "seq", seq);
}
else
{
Expand Down Expand Up @@ -1490,6 +1496,172 @@ js_GetLastMsg(natsMsg **msg, jsCtx *js, const char *stream, const char *subject,
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
jsDirectGetMsgOptions_Init(jsDirectGetMsgOptions *opts)
{
if (opts == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

memset(opts, 0, sizeof(jsDirectGetMsgOptions));
return NATS_OK;
}

natsStatus
js_directGetMsgToJSMsg(const char *stream, natsMsg *msg)
{
natsStatus s;
const char *val = NULL;
int64_t seq = 0;
int64_t tm = 0;

if ((msg->hdrLen == 0) && (msg->headers == NULL))
return nats_setError(NATS_ERR, "%s", "direct get message response should have headers");

// If the server returns an error (not found/bad request), we would receive
// an empty body message with the Status header. Check for that.
if ((natsMsg_GetDataLength(msg) == 0)
&& (natsMsgHeader_Get(msg, STATUS_HDR, &val) == NATS_OK))
{
if (strcmp(val, NOT_FOUND_STATUS) == 0)
return nats_setDefaultError(NATS_NOT_FOUND);
else
{
natsMsgHeader_Get(msg, DESCRIPTION_HDR, &val);
return nats_setError(NATS_ERR, "error getting message: %s", val);
}
}

s = natsMsgHeader_Get(msg, JSStream, &val);
if ((s != NATS_OK) || (strcmp(val, stream) != 0))
return nats_setError(NATS_ERR, "missing or invalid stream name '%s'", val);

val = NULL;
s = natsMsgHeader_Get(msg, JSSequence, &val);
if ((s != NATS_OK) || ((seq = nats_ParseInt64(val, (int) strlen(val))) == -1))
return nats_setError(NATS_ERR, "missing or invalid sequence '%s'", val);

val = NULL;
s = natsMsgHeader_Get(msg, JSTimeStamp, &val);
if ((s == NATS_OK) && !nats_IsStringEmpty(val))
{
// The server sends the time in this format (always UTC):
// 2006-01-02 15:04:05.999999999 +0000 UTC
// But for our parsing to work (from JSON) we will convert this
// to something like that:
// 2006-01-02T15:04:05.999999999Z
char tmpTime[40] = {'\0'};
char *ptr = NULL;

if (snprintf(tmpTime, sizeof(tmpTime), "%s", val) >= (int) sizeof(tmpTime)) {
tmpTime[39] = '\0';
}

ptr = strchr(tmpTime, ' ');
if (ptr != NULL)
{
*ptr = 'T';
ptr++;

ptr = strchr(ptr, ' ');
if (ptr != NULL)
{
*ptr = 'Z';
ptr++;
*ptr = '\0';

s = nats_parseTime((char*) tmpTime, &tm);
}
}
}
if ((s != NATS_OK) || (tm == 0))
return nats_setError(NATS_ERR, "missing or invalid timestamp '%s'", val);

val = NULL;
s = natsMsgHeader_Get(msg, JSSubject, &val);
if ((s != NATS_OK) || nats_IsStringEmpty(val))
return nats_setError(NATS_ERR, "missing or invalid subject '%s'", val);

// Will point the message subject to the JSSubject header value.
// This will remain in the message memory allocated block, even
// if later the user changes the JSSubject header.
msg->subject = val;
msg->seq = seq;
msg->time = tm;
return NATS_OK;
}

natsStatus
js_DirectGetMsg(natsMsg **msg, jsCtx *js, const char *stream, jsOptions *opts, jsDirectGetMsgOptions *dgOpts)
{
natsStatus s = NATS_OK;
char *subj = NULL;
natsMsg *resp = NULL;
natsConnection *nc = NULL;
bool freePfx = false;
bool comma = false;
jsOptions o;
char buffer[64];
natsBuffer buf;

if ((msg == NULL) || (js == NULL) || (dgOpts == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

if (nats_IsStringEmpty(stream))
return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamNameRequired);

s = js_setOpts(&nc, &freePfx, js, opts, &o);
if (s == NATS_OK)
{
if (nats_asprintf(&subj, jsApiDirectMsgGetT, js_lenWithoutTrailingDot(o.Prefix), o.Prefix, stream) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);

if (freePfx)
NATS_FREE((char*) o.Prefix);
}
IFOK(s, natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)));
IFOK(s, natsBuf_AppendByte(&buf, '{'));
if ((s == NATS_OK) && (dgOpts->Sequence > 0))
{
comma = true;
s = nats_marshalULong(&buf, false, "seq", dgOpts->Sequence);
}
if ((s == NATS_OK) && !nats_IsStringEmpty(dgOpts->NextBySubject))
{
if (comma)
s = natsBuf_AppendByte(&buf, ',');

comma = true;
IFOK(s, natsBuf_Append(&buf, "\"next_by_subj\":\"", -1));
IFOK(s, natsBuf_Append(&buf, dgOpts->NextBySubject, -1));
IFOK(s, natsBuf_AppendByte(&buf, '"'));
}
if ((s == NATS_OK) && !nats_IsStringEmpty(dgOpts->LastBySubject))
{
if (comma)
s = natsBuf_AppendByte(&buf, ',');

IFOK(s, natsBuf_Append(&buf, "\"last_by_subj\":\"", -1));
IFOK(s, natsBuf_Append(&buf, dgOpts->LastBySubject, -1));
IFOK(s, natsBuf_AppendByte(&buf, '"'));
}
IFOK(s, natsBuf_AppendByte(&buf, '}'));

// Send the request
IFOK(s, natsConnection_Request(&resp, js->nc, subj, natsBuf_Data(&buf), natsBuf_Len(&buf), o.Wait));
// Convert the response to a JS message returned to the user.
IFOK(s, js_directGetMsgToJSMsg(stream, resp));

natsBuf_Cleanup(&buf);
NATS_FREE(subj);

if (s == NATS_OK)
*msg = resp;
else
natsMsg_Destroy(resp);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_deleteMsg(jsCtx *js, bool noErase, const char *stream, uint64_t seq, jsOptions *opts, jsErrCode *errCode)
{
Expand Down
42 changes: 36 additions & 6 deletions src/kv.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
int64_t replicas= 1;
kvStore *kv = NULL;
char *subject= NULL;
jsStreamInfo *si = NULL;
jsStreamConfig sc;

if ((new_kv == NULL) || (js == NULL) || (cfg == NULL))
Expand Down Expand Up @@ -287,29 +288,40 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
sc.Replicas = replicas;
sc.AllowRollup = true;
sc.DenyDelete = true;
sc.AllowDirect = true;

// If connecting to a v2.7.2+, create with discard new policy
if (natsConn_srvVersionAtLeast(kv->js->nc, 2, 7, 2))
sc.Discard = js_DiscardNew;

s = js_AddStream(NULL, js, &sc, NULL, &jerr);
s = js_AddStream(&si, js, &sc, NULL, &jerr);
if ((s != NATS_OK) && (jerr == JSStreamNameExistErr))
{
jsStreamInfo *si = NULL;
jsStreamInfo_Destroy(si);
si = NULL;

nats_clearLastError();
s = js_GetStreamInfo(&si, js, sc.Name, NULL, NULL);
if (s == NATS_OK)
{
si->Config->Discard = sc.Discard;
if (_sameStreamCfg(si->Config, &sc))
s = js_UpdateStream(NULL, js, &sc, NULL, NULL);
{
jsStreamInfo_Destroy(si);
si = NULL;
s = js_UpdateStream(&si, js, &sc, NULL, NULL);
}
else
s = nats_setError(NATS_ERR, "%s",
"Existing configuration is different");
}
jsStreamInfo_Destroy(si);
}
if (s == NATS_OK)
{
// If the stream allow direct get message calls, then we will do so.
kv->useDirect = si->Config->AllowDirect;
}
jsStreamInfo_Destroy(si);
}
if (s == NATS_OK)
*new_kv = kv;
Expand Down Expand Up @@ -338,6 +350,9 @@ js_KeyValue(kvStore **new_kv, jsCtx *js, const char *bucket)
s = js_GetStreamInfo(&si, js, kv->stream, NULL, NULL);
if (s == NATS_OK)
{
// If the stream allow direct get message calls, then we will do so.
kv->useDirect = si->Config->AllowDirect;

// Do some quick sanity checks that this is a correctly formed stream for KV.
// Max msgs per subject should be > 0.
if (si->Config->MaxMsgsPerSubject < 1)
Expand Down Expand Up @@ -456,6 +471,7 @@ _getEntry(kvEntry **new_entry, bool *deleted, kvStore *kv, const char *key, uint
natsMsg *msg = NULL;
kvEntry *e = NULL;
DEFINE_BUF_FOR_SUBJECT;
jsDirectGetMsgOptions dgo;

*new_entry = NULL;
*deleted = false;
Expand All @@ -464,15 +480,29 @@ _getEntry(kvEntry **new_entry, bool *deleted, kvStore *kv, const char *key, uint
return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey);

BUILD_SUBJECT(KEY_NAME_ONLY);
if (revision == 0)

if (kv->useDirect)
{
jsDirectGetMsgOptions_Init(&dgo);
if (revision == 0)
dgo.LastBySubject = natsBuf_Data(&buf);
else
dgo.Sequence = revision;

IFOK(s, js_DirectGetMsg(&msg, kv->js, kv->stream, NULL, &dgo));
}
else if (revision == 0)
{
IFOK(s, js_GetLastMsg(&msg, kv->js, kv->stream, natsBuf_Data(&buf), NULL, NULL));
}
else
{
IFOK(s, js_GetMsg(&msg, kv->js, kv->stream, revision, NULL, NULL));
IFOK(s, (strcmp(natsMsg_GetSubject(msg), natsBuf_Data(&buf)) == 0 ? NATS_OK : NATS_NOT_FOUND));
}
// If a sequence was provided, just make sure that the retrieved
// message subject matches the request.
if (revision != 0)
IFOK(s, (strcmp(natsMsg_GetSubject(msg), natsBuf_Data(&buf)) == 0 ? NATS_OK : NATS_NOT_FOUND));
IFOK(s, _createEntry(&e, kv, &msg));
if (s == NATS_OK)
e->op = _getKVOp(e->msg);
Expand Down
Loading