Skip to content

Commit

Permalink
Merge pull request #562 from nats-io/js_direct_get
Browse files Browse the repository at this point in the history
[ADDED] JetStream: Direct Get Message
  • Loading branch information
kozlovic committed Jul 20, 2022
2 parents 2e73b73 + 482bee4 commit 418dd00
Show file tree
Hide file tree
Showing 9 changed files with 797 additions and 268 deletions.
6 changes: 6 additions & 0 deletions src/js.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ extern const int64_t jsDefaultRequestWait;
// jsApiMsgGetT is the endpoint to get a message, either by sequence or last per subject.
#define jsApiMsgGetT "%.*s.STREAM.MSG.GET.%s"

// jsApiMsgGetT is the endpoint to get a message, either by sequence or last per subject.
#define jsApiDirectMsgGetT "%.*s.DIRECT.GET.%s"

// Creates a subject based on the option's prefix, the subject format and its values.
#define js_apiSubj(s, o, f, ...) (nats_asprintf((s), (f), (o)->Prefix, __VA_ARGS__) < 0 ? NATS_NO_MEMORY : NATS_OK)

Expand Down Expand Up @@ -231,3 +234,6 @@ js_retain(jsCtx *js);

void
js_release(jsCtx *js);

natsStatus
js_directGetMsgToJSMsg(const char *stream, natsMsg *msg);
174 changes: 173 additions & 1 deletion src/jsm.c
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,8 @@ js_unmarshalStreamConfig(nats_JSON *json, const char *fieldName, jsStreamConfig
IFOK(s, nats_JSONGetBool(jcfg, "deny_purge", &(cfg->DenyPurge)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_rollup_hdrs", &(cfg->AllowRollup)));
IFOK(s, _unmarshalRePublish(jcfg, "republish", &(cfg->RePublish)));
IFOK(s, nats_JSONGetBool(jcfg, "allow_direct", &(cfg->AllowDirect)));
IFOK(s, nats_JSONGetBool(jcfg, "mirror_direct", &(cfg->MirrorDirect)));

if (s == NATS_OK)
*new_cfg = cfg;
Expand Down Expand Up @@ -715,6 +717,10 @@ js_marshalStreamConfig(natsBuffer **new_buf, jsStreamConfig *cfg)
}
IFOK(s, natsBuf_Append(buf, "\"}", -1));
}
if ((s == NATS_OK) && cfg->AllowDirect)
IFOK(s, natsBuf_Append(buf, ",\"allow_direct\":true", -1));
if ((s == NATS_OK) && cfg->MirrorDirect)
IFOK(s, natsBuf_Append(buf, ",\"mirror_direct\":true", -1));

IFOK(s, natsBuf_AppendByte(buf, '}'));

Expand Down Expand Up @@ -1438,7 +1444,7 @@ _getMsg(natsMsg **msg, jsCtx *js, const char *stream, uint64_t seq, const char *
IFOK(s, natsBuf_AppendByte(&buf, '{'));
if ((s == NATS_OK) && (seq > 0))
{
nats_marshalULong(&buf, false, "seq", seq);
s = nats_marshalULong(&buf, false, "seq", seq);
}
else
{
Expand Down Expand Up @@ -1490,6 +1496,172 @@ js_GetLastMsg(natsMsg **msg, jsCtx *js, const char *stream, const char *subject,
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
jsDirectGetMsgOptions_Init(jsDirectGetMsgOptions *opts)
{
if (opts == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

memset(opts, 0, sizeof(jsDirectGetMsgOptions));
return NATS_OK;
}

natsStatus
js_directGetMsgToJSMsg(const char *stream, natsMsg *msg)
{
natsStatus s;
const char *val = NULL;
int64_t seq = 0;
int64_t tm = 0;

if ((msg->hdrLen == 0) && (msg->headers == NULL))
return nats_setError(NATS_ERR, "%s", "direct get message response should have headers");

// If the server returns an error (not found/bad request), we would receive
// an empty body message with the Status header. Check for that.
if ((natsMsg_GetDataLength(msg) == 0)
&& (natsMsgHeader_Get(msg, STATUS_HDR, &val) == NATS_OK))
{
if (strcmp(val, NOT_FOUND_STATUS) == 0)
return nats_setDefaultError(NATS_NOT_FOUND);
else
{
natsMsgHeader_Get(msg, DESCRIPTION_HDR, &val);
return nats_setError(NATS_ERR, "error getting message: %s", val);
}
}

s = natsMsgHeader_Get(msg, JSStream, &val);
if ((s != NATS_OK) || (strcmp(val, stream) != 0))
return nats_setError(NATS_ERR, "missing or invalid stream name '%s'", val);

val = NULL;
s = natsMsgHeader_Get(msg, JSSequence, &val);
if ((s != NATS_OK) || ((seq = nats_ParseInt64(val, (int) strlen(val))) == -1))
return nats_setError(NATS_ERR, "missing or invalid sequence '%s'", val);

val = NULL;
s = natsMsgHeader_Get(msg, JSTimeStamp, &val);
if ((s == NATS_OK) && !nats_IsStringEmpty(val))
{
// The server sends the time in this format (always UTC):
// 2006-01-02 15:04:05.999999999 +0000 UTC
// But for our parsing to work (from JSON) we will convert this
// to something like that:
// 2006-01-02T15:04:05.999999999Z
char tmpTime[40] = {'\0'};
char *ptr = NULL;

if (snprintf(tmpTime, sizeof(tmpTime), "%s", val) >= (int) sizeof(tmpTime)) {
tmpTime[39] = '\0';
}

ptr = strchr(tmpTime, ' ');
if (ptr != NULL)
{
*ptr = 'T';
ptr++;

ptr = strchr(ptr, ' ');
if (ptr != NULL)
{
*ptr = 'Z';
ptr++;
*ptr = '\0';

s = nats_parseTime((char*) tmpTime, &tm);
}
}
}
if ((s != NATS_OK) || (tm == 0))
return nats_setError(NATS_ERR, "missing or invalid timestamp '%s'", val);

val = NULL;
s = natsMsgHeader_Get(msg, JSSubject, &val);
if ((s != NATS_OK) || nats_IsStringEmpty(val))
return nats_setError(NATS_ERR, "missing or invalid subject '%s'", val);

// Will point the message subject to the JSSubject header value.
// This will remain in the message memory allocated block, even
// if later the user changes the JSSubject header.
msg->subject = val;
msg->seq = seq;
msg->time = tm;
return NATS_OK;
}

natsStatus
js_DirectGetMsg(natsMsg **msg, jsCtx *js, const char *stream, jsOptions *opts, jsDirectGetMsgOptions *dgOpts)
{
natsStatus s = NATS_OK;
char *subj = NULL;
natsMsg *resp = NULL;
natsConnection *nc = NULL;
bool freePfx = false;
bool comma = false;
jsOptions o;
char buffer[64];
natsBuffer buf;

if ((msg == NULL) || (js == NULL) || (dgOpts == NULL))
return nats_setDefaultError(NATS_INVALID_ARG);

if (nats_IsStringEmpty(stream))
return nats_setError(NATS_INVALID_ARG, "%s", jsErrStreamNameRequired);

s = js_setOpts(&nc, &freePfx, js, opts, &o);
if (s == NATS_OK)
{
if (nats_asprintf(&subj, jsApiDirectMsgGetT, js_lenWithoutTrailingDot(o.Prefix), o.Prefix, stream) < 0)
s = nats_setDefaultError(NATS_NO_MEMORY);

if (freePfx)
NATS_FREE((char*) o.Prefix);
}
IFOK(s, natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)));
IFOK(s, natsBuf_AppendByte(&buf, '{'));
if ((s == NATS_OK) && (dgOpts->Sequence > 0))
{
comma = true;
s = nats_marshalULong(&buf, false, "seq", dgOpts->Sequence);
}
if ((s == NATS_OK) && !nats_IsStringEmpty(dgOpts->NextBySubject))
{
if (comma)
s = natsBuf_AppendByte(&buf, ',');

comma = true;
IFOK(s, natsBuf_Append(&buf, "\"next_by_subj\":\"", -1));
IFOK(s, natsBuf_Append(&buf, dgOpts->NextBySubject, -1));
IFOK(s, natsBuf_AppendByte(&buf, '"'));
}
if ((s == NATS_OK) && !nats_IsStringEmpty(dgOpts->LastBySubject))
{
if (comma)
s = natsBuf_AppendByte(&buf, ',');

IFOK(s, natsBuf_Append(&buf, "\"last_by_subj\":\"", -1));
IFOK(s, natsBuf_Append(&buf, dgOpts->LastBySubject, -1));
IFOK(s, natsBuf_AppendByte(&buf, '"'));
}
IFOK(s, natsBuf_AppendByte(&buf, '}'));

// Send the request
IFOK(s, natsConnection_Request(&resp, js->nc, subj, natsBuf_Data(&buf), natsBuf_Len(&buf), o.Wait));
// Convert the response to a JS message returned to the user.
IFOK(s, js_directGetMsgToJSMsg(stream, resp));

natsBuf_Cleanup(&buf);
NATS_FREE(subj);

if (s == NATS_OK)
*msg = resp;
else
natsMsg_Destroy(resp);

return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_deleteMsg(jsCtx *js, bool noErase, const char *stream, uint64_t seq, jsOptions *opts, jsErrCode *errCode)
{
Expand Down
42 changes: 36 additions & 6 deletions src/kv.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
int64_t replicas= 1;
kvStore *kv = NULL;
char *subject= NULL;
jsStreamInfo *si = NULL;
jsStreamConfig sc;

if ((new_kv == NULL) || (js == NULL) || (cfg == NULL))
Expand Down Expand Up @@ -287,29 +288,40 @@ js_CreateKeyValue(kvStore **new_kv, jsCtx *js, kvConfig *cfg)
sc.Replicas = replicas;
sc.AllowRollup = true;
sc.DenyDelete = true;
sc.AllowDirect = true;

// If connecting to a v2.7.2+, create with discard new policy
if (natsConn_srvVersionAtLeast(kv->js->nc, 2, 7, 2))
sc.Discard = js_DiscardNew;

s = js_AddStream(NULL, js, &sc, NULL, &jerr);
s = js_AddStream(&si, js, &sc, NULL, &jerr);
if ((s != NATS_OK) && (jerr == JSStreamNameExistErr))
{
jsStreamInfo *si = NULL;
jsStreamInfo_Destroy(si);
si = NULL;

nats_clearLastError();
s = js_GetStreamInfo(&si, js, sc.Name, NULL, NULL);
if (s == NATS_OK)
{
si->Config->Discard = sc.Discard;
if (_sameStreamCfg(si->Config, &sc))
s = js_UpdateStream(NULL, js, &sc, NULL, NULL);
{
jsStreamInfo_Destroy(si);
si = NULL;
s = js_UpdateStream(&si, js, &sc, NULL, NULL);
}
else
s = nats_setError(NATS_ERR, "%s",
"Existing configuration is different");
}
jsStreamInfo_Destroy(si);
}
if (s == NATS_OK)
{
// If the stream allow direct get message calls, then we will do so.
kv->useDirect = si->Config->AllowDirect;
}
jsStreamInfo_Destroy(si);
}
if (s == NATS_OK)
*new_kv = kv;
Expand Down Expand Up @@ -338,6 +350,9 @@ js_KeyValue(kvStore **new_kv, jsCtx *js, const char *bucket)
s = js_GetStreamInfo(&si, js, kv->stream, NULL, NULL);
if (s == NATS_OK)
{
// If the stream allow direct get message calls, then we will do so.
kv->useDirect = si->Config->AllowDirect;

// Do some quick sanity checks that this is a correctly formed stream for KV.
// Max msgs per subject should be > 0.
if (si->Config->MaxMsgsPerSubject < 1)
Expand Down Expand Up @@ -456,6 +471,7 @@ _getEntry(kvEntry **new_entry, bool *deleted, kvStore *kv, const char *key, uint
natsMsg *msg = NULL;
kvEntry *e = NULL;
DEFINE_BUF_FOR_SUBJECT;
jsDirectGetMsgOptions dgo;

*new_entry = NULL;
*deleted = false;
Expand All @@ -464,15 +480,29 @@ _getEntry(kvEntry **new_entry, bool *deleted, kvStore *kv, const char *key, uint
return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey);

BUILD_SUBJECT(KEY_NAME_ONLY);
if (revision == 0)

if (kv->useDirect)
{
jsDirectGetMsgOptions_Init(&dgo);
if (revision == 0)
dgo.LastBySubject = natsBuf_Data(&buf);
else
dgo.Sequence = revision;

IFOK(s, js_DirectGetMsg(&msg, kv->js, kv->stream, NULL, &dgo));
}
else if (revision == 0)
{
IFOK(s, js_GetLastMsg(&msg, kv->js, kv->stream, natsBuf_Data(&buf), NULL, NULL));
}
else
{
IFOK(s, js_GetMsg(&msg, kv->js, kv->stream, revision, NULL, NULL));
IFOK(s, (strcmp(natsMsg_GetSubject(msg), natsBuf_Data(&buf)) == 0 ? NATS_OK : NATS_NOT_FOUND));
}
// If a sequence was provided, just make sure that the retrieved
// message subject matches the request.
if (revision != 0)
IFOK(s, (strcmp(natsMsg_GetSubject(msg), natsBuf_Data(&buf)) == 0 ? NATS_OK : NATS_NOT_FOUND));
IFOK(s, _createEntry(&e, kv, &msg));
if (s == NATS_OK)
e->op = _getKVOp(e->msg);
Expand Down
Loading

0 comments on commit 418dd00

Please sign in to comment.