Skip to content

Commit

Permalink
Merge 916fb8b into fb19ac1
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Jan 29, 2022
2 parents fb19ac1 + 916fb8b commit b681766
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 9 deletions.
28 changes: 19 additions & 9 deletions src/kv.c
Expand Up @@ -22,18 +22,25 @@
static const char *kvBucketNameTmpl = "KV_%s";
static const char *kvSubjectsTmpl = "$KV.%s.>";
static const char *kvSubjectsPreTmpl = "$KV.%s.";
// static const char *kvNoPending = "0";

#define KV_WATCH_FOR_EVER (int64_t)(0x7FFFFFFFFFFFFFFF)

#define DEFINE_BUF_FOR_SUBJECT \
char buffer[128]; \
natsBuffer buf;

#define BUILD_SUBJECT \
natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); \
s = natsBuf_Append(&buf, kv->pre, -1); \
IFOK(s, natsBuf_Append(&buf, key, -1)); \
#define USE_JS_PREFIX true
#define KEY_NAME_ONLY false

#define BUILD_SUBJECT(p) \
s = natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer)); \
if ((p) && kv->useJSPrefix) \
{ \
IFOK(s, natsBuf_Append(&buf, kv->js->opts.Prefix, -1)); \
IFOK(s, natsBuf_AppendByte(&buf, '.')); \
} \
IFOK(s, natsBuf_Append(&buf, kv->pre, -1)); \
IFOK(s, natsBuf_Append(&buf, key, -1)); \
IFOK(s, natsBuf_AppendByte(&buf, 0));

#define KV_DEFINE_LIST \
Expand Down Expand Up @@ -168,6 +175,7 @@ _createKV(kvStore **new_kv, jsCtx *js, const char *bucket)

if (s == NATS_OK)
{
kv->useJSPrefix = (strcmp(js->opts.Prefix, jsDefaultAPIPrefix) != 0 ? true : false);
kv->js = js;
js_retain(js);
*new_kv = kv;
Expand Down Expand Up @@ -372,7 +380,7 @@ _getEntry(kvEntry **new_entry, bool *deleted, kvStore *kv, const char *key)
if (!validKey(key))
return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey);

BUILD_SUBJECT;
BUILD_SUBJECT(KEY_NAME_ONLY);
IFOK(s, js_GetLastMsg(&msg, kv->js, kv->stream, natsBuf_Data(&buf), NULL, NULL));
IFOK(s, _createEntry(&e, kv, &msg));

Expand Down Expand Up @@ -443,7 +451,7 @@ _putEntry(uint64_t *rev, kvStore *kv, jsPubOptions *po, const char *key, const v
if (!validKey(key))
return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey);

BUILD_SUBJECT;
BUILD_SUBJECT(USE_JS_PREFIX);
IFOK(s, js_Publish(ppa, kv->js, natsBuf_Data(&buf), data, len, po, NULL));

if ((s == NATS_OK) && (rev != NULL))
Expand Down Expand Up @@ -544,7 +552,7 @@ _delete(kvStore *kv, const char *key, bool purge)
if (!validKey(key))
return nats_setError(NATS_INVALID_ARG, "%s", kvErrInvalidKey);

BUILD_SUBJECT;
BUILD_SUBJECT(USE_JS_PREFIX);
IFOK(s, natsMsg_Create(&msg, natsBuf_Data(&buf), NULL, NULL, 0));
if (s == NATS_OK)
{
Expand Down Expand Up @@ -619,6 +627,8 @@ kvStore_PurgeDeletes(kvStore *kv, kvWatchOptions *opts)

natsBuf_InitWithBackend(&buf, buffer, 0, sizeof(buffer));

// Go over the list, even when s != NATS_OK so we destroy
// each entry and don't have a memory leak.
for (; h != NULL; )
{
natsBuf_Reset(&buf);
Expand Down Expand Up @@ -792,7 +802,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
w->kv = kv;
w->refs = 1;

BUILD_SUBJECT;
BUILD_SUBJECT(KEY_NAME_ONLY);
IFOK(s, natsMutex_Create(&(w->mu)));
if (s == NATS_OK)
{
Expand Down
1 change: 1 addition & 0 deletions src/natsp.h
Expand Up @@ -401,6 +401,7 @@ struct __kvStore
char *bucket;
char *stream;
char *pre;
bool useJSPrefix;

};

Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Expand Up @@ -237,6 +237,7 @@ KeyValueHistory
KeyValueKeys
KeyValueDeleteVsPurge
KeyValueDeleteTombstones
KeyValueCrossAccount
StanPBufAllocator
StanConnOptions
StanSubOptions
Expand Down
207 changes: 207 additions & 0 deletions test/test.c
Expand Up @@ -27762,6 +27762,212 @@ test_KeyValueDeleteTombstones(void)
JS_TEARDOWN;
}

static void
test_KeyValueCrossAccount(void)
{
natsStatus s;
natsConnection *nc1 = NULL;
natsConnection *nc2 = NULL;
jsCtx *js1 = NULL;
jsCtx *js2 = NULL;
natsPid pid = NATS_INVALID_PID;
kvStore *kv1 = NULL;
kvWatcher *w1 = NULL;
kvStore *kv2 = NULL;
kvWatcher *w2 = NULL;
kvEntry *e = NULL;
jsStreamInfo *si = NULL;
kvConfig kvc;
jsOptions o;
char datastore[256] = {'\0'};
char cmdLine[1024] = {'\0'};
char confFile[256] = {'\0'};

ENSURE_JS_VERSION(2, 6, 2);

_makeUniqueDir(datastore, sizeof(datastore), "datastore_");
_createConfFile(confFile, sizeof(confFile),
"jetstream: enabled\n"\
"accounts: {\n"\
" A: {\n"\
" users: [ {user: a, password: a} ]\n"\
" jetstream: enabled\n"\
" exports: [\n"\
" {service: '$JS.API.>' }\n"\
" {service: '$KV.>'}\n"\
" {stream: '_INBOX.>'}\n"\
" ]\n"\
" },\n"\
" I: {\n"\
" users: [ {user: i, password: i} ]\n"\
" imports: [\n"\
" {service: {account: A, subject: '$JS.API.>'}, to: 'fromA.>' }\n"\
" {service: {account: A, subject: '$KV.>'}, to: 'fromA.$KV.>' }\n"\
" {stream: {subject: '_INBOX.>', account: A}}\n"\
" ]\n"\
" }\n"\
"}");

test("Start JS server: ");
snprintf(cmdLine, sizeof(cmdLine), "-js -sd %s -c %s", datastore, confFile);
pid = _startServer("nats://127.0.0.1:4222", cmdLine, true);
CHECK_SERVER_STARTED(pid);
testCond(true);

test("Create conn1: ");
s = natsConnection_ConnectTo(&nc1, "nats://a:a@127.0.0.1:4222");
testCond(s == NATS_OK);

test("Get context1: ");
s = natsConnection_JetStream(&js1, nc1, NULL);
testCond(s == NATS_OK);

test("Create KV1: ");
kvConfig_Init(&kvc);
kvc.Bucket = "Map";
s = js_CreateKeyValue(&kv1, js1, &kvc);
testCond(s == NATS_OK);

test("Create Watcher1: ");
s = kvStore_Watch(&w1, kv1, "map", NULL);
IFOK(s, kvWatcher_Next(&e, w1, 1000));
testCond((s == NATS_OK) && (e == NULL));

test("Create conn2: ");
s = natsConnection_ConnectTo(&nc2, "nats://i:i@127.0.0.1:4222");
testCond(s == NATS_OK);

test("Get context2: ");
jsOptions_Init(&o);
o.Prefix = "fromA";
s = natsConnection_JetStream(&js2, nc2, &o);
testCond(s == NATS_OK);

test("Create KV2: ");
kvConfig_Init(&kvc);
kvc.Bucket = "Map";
s = js_CreateKeyValue(&kv2, js2, &kvc);
testCond(s == NATS_OK);

test("Create Watcher2: ");
s = kvStore_Watch(&w2, kv2, "map", NULL);
IFOK(s, kvWatcher_Next(&e, w2, 1000));
testCond((s == NATS_OK) && (e == NULL));

test("Put: ");
s = kvStore_PutString(NULL, kv2, "map", "value");
testCond(s == NATS_OK);

test("Get from kv1: ");
s = kvStore_Get(&e, kv1, "map");
testCond((s == NATS_OK) && (e != NULL)
&& (strcmp(kvEntry_Key(e), "map") == 0)
&& (strcmp(kvEntry_ValueString(e), "value") == 0));
kvEntry_Destroy(e);
e = NULL;

test("Get from kv2: ");
s = kvStore_Get(&e, kv2, "map");
testCond((s == NATS_OK) && (e != NULL)
&& (strcmp(kvEntry_Key(e), "map") == 0)
&& (strcmp(kvEntry_ValueString(e), "value") == 0));
kvEntry_Destroy(e);
e = NULL;

test("Watcher1 Next: ");
s = kvWatcher_Next(&e, w1, 1000);
testCond((s == NATS_OK) && (e != NULL)
&& (strcmp(kvEntry_Key(e), "map") == 0)
&& (strcmp(kvEntry_ValueString(e), "value") == 0));
kvEntry_Destroy(e);
e = NULL;

test("Watcher2 Next: ");
s = kvWatcher_Next(&e, w2, 1000);
testCond((s == NATS_OK) && (e != NULL)
&& (strcmp(kvEntry_Key(e), "map") == 0)
&& (strcmp(kvEntry_ValueString(e), "value") == 0));
kvEntry_Destroy(e);
e = NULL;

test("Purge key from kv2: ");
s = kvStore_Purge(kv2, "map");
testCond(s == NATS_OK);

test("Check purge ok from w1: ");
s = kvWatcher_Next(&e, w1, 1000);
testCond((s == NATS_OK) && (e != NULL) && (kvEntry_Operation(e) == kvOp_Purge));
kvEntry_Destroy(e);
e = NULL;

test("Check purge ok from w2: ");
s = kvWatcher_Next(&e, w2, 1000);
testCond((s == NATS_OK) && (e != NULL) && (kvEntry_Operation(e) == kvOp_Purge));
kvEntry_Destroy(e);
e = NULL;

test("Delete purge records: ");
s = kvStore_PurgeDeletes(kv2, NULL);
testCond(s == NATS_OK);

test("All gone: ");
s = js_GetStreamInfo(&si, js1, "KV_Map", NULL, NULL);
testCond((s == NATS_OK) && (si != NULL) && (si->State.Msgs == 0));
jsStreamInfo_Destroy(si);
si = NULL;

test("Delete key from kv2: ");
s = kvStore_Delete(kv2, "map");
testCond(s == NATS_OK);

test("Check key gone: ");
s = kvStore_Get(&e, kv1, "map");
testCond((s == NATS_NOT_FOUND) && (e == NULL));

kvWatcher_Destroy(w2);
w2 = NULL;
kvStore_Destroy(kv2);
kv2 = NULL;
jsCtx_Destroy(js2);
js2 = NULL;

test("Get context2 (with trailing dot for prefix): ");
jsOptions_Init(&o);
o.Prefix = "fromA";
s = natsConnection_JetStream(&js2, nc2, &o);
testCond(s == NATS_OK);

test("Create KV2: ");
kvConfig_Init(&kvc);
kvc.Bucket = "Map";
s = js_CreateKeyValue(&kv2, js2, &kvc);
testCond(s == NATS_OK);

test("Put: ");
s = kvStore_PutString(NULL, kv2, "map", "value2");
testCond(s == NATS_OK);

test("Get from kv1: ");
s = kvStore_Get(&e, kv1, "map");
testCond((s == NATS_OK) && (e != NULL)
&& (strcmp(kvEntry_Key(e), "map") == 0)
&& (strcmp(kvEntry_ValueString(e), "value2") == 0));
kvEntry_Destroy(e);
e = NULL;

kvWatcher_Destroy(w1);
kvStore_Destroy(kv1);
jsCtx_Destroy(js1);
kvWatcher_Destroy(w2);
kvStore_Destroy(kv2);
jsCtx_Destroy(js2);
natsConnection_Destroy(nc1);
natsConnection_Destroy(nc2);
_stopServer(pid);
rmtree(datastore);
remove(confFile);
}

#if defined(NATS_HAS_STREAMING)

static int
Expand Down Expand Up @@ -30211,6 +30417,7 @@ static testInfo allTests[] =
{"KeyValueKeys", test_KeyValueKeys},
{"KeyValueDeleteVsPurge", test_KeyValueDeleteVsPurge},
{"KeyValueDeleteTombstones", test_KeyValueDeleteTombstones},
{"KeyValueCrossAccount", test_KeyValueCrossAccount},

#if defined(NATS_HAS_STREAMING)
{"StanPBufAllocator", test_StanPBufAllocator},
Expand Down

0 comments on commit b681766

Please sign in to comment.