Skip to content

Commit

Permalink
CDRIVER-2578 changestream getmore and agg lsid
Browse files Browse the repository at this point in the history
collection_watch runs an aggregate command with
collection_read_command_with_opts and then creates a cursor with
cursor_new_from_command_reply. To ensure they use the same implicit
session lsid, the change_stream_t must be responsible for creating
the implicit session and pass this session to both the command and
cursor as if it were an explicit session.
  • Loading branch information
kevinAlbs committed Apr 2, 2018
1 parent 3ddc4d1 commit 490306b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 12 deletions.
3 changes: 3 additions & 0 deletions src/mongoc/mongoc-change-stream-private.h
Expand Up @@ -18,6 +18,7 @@
#define MONGOC_CHANGE_STREAM_PRIVATE_H

#include "mongoc-change-stream.h"
#include "mongoc-client-session.h"
#include "mongoc-collection.h"
#include "mongoc-cursor.h"

Expand All @@ -34,6 +35,8 @@ struct _mongoc_change_stream_t {
mongoc_collection_t *coll;
int64_t max_await_time_ms;
int32_t batch_size;

mongoc_client_session_t *implicit_session;
};

mongoc_change_stream_t *
Expand Down
62 changes: 51 additions & 11 deletions src/mongoc/mongoc-change-stream.c
Expand Up @@ -20,6 +20,7 @@
#include "mongoc-cursor-private.h"
#include "mongoc-collection-private.h"
#include "mongoc-client-session-private.h"
#include "mongoc-rpc-private.h"

#define CHANGE_STREAM_ERR(_str) \
bson_set_error (&stream->err, \
Expand Down Expand Up @@ -88,7 +89,7 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
size_t keyLen =
bson_uint32_to_string (key_int, &key_str, buf, sizeof (buf));
bson_append_value (
&pipeline, key_str, keyLen, bson_iter_value (&child_iter));
&pipeline, key_str, (int) keyLen, bson_iter_value (&child_iter));
++key_int;
}
}
Expand Down Expand Up @@ -120,6 +121,31 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
stream->coll->client, &iter, &cs, &stream->err)) {
goto cleanup;
}
} else if (stream->implicit_session) {
/* If an implicit session was created before, and this cursor is now
* being recreated after resuming, then use the same session as before. */
cs = stream->implicit_session;
if (!mongoc_client_session_append (cs, &command_opts, &stream->err)) {
goto cleanup;
}
} else {
/* Create an implicit session. This session lsid must be the same for the
* agg command and the subsequent getMores. Thus, this implicit session is
* passed as if it were an explicit session to
* collection_read_command_with_opts and cursor_new_from_reply, but it is
* still implicit and its lifetime is owned by this change_stream_t. */
mongoc_session_opt_t *session_opts;
session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_causal_consistency (session_opts, false);
/* returns NULL if sessions aren't supported. ignore errors. */
cs =
mongoc_client_start_session (stream->coll->client, session_opts, NULL);
stream->implicit_session = cs;
mongoc_session_opts_destroy (session_opts);
if (cs &&
!mongoc_client_session_append (cs, &command_opts, &stream->err)) {
goto cleanup;
}
}

server_id = mongoc_server_description_id (sd);
Expand Down Expand Up @@ -161,7 +187,8 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
}

if (stream->batch_size > 0) {
mongoc_cursor_set_batch_size (stream->cursor, stream->batch_size);
mongoc_cursor_set_batch_size (stream->cursor,
(uint32_t) stream->batch_size);
}

cleanup:
Expand All @@ -177,7 +204,7 @@ _mongoc_change_stream_new (const mongoc_collection_t *coll,
{
bool full_doc_set = false;
mongoc_change_stream_t *stream =
(mongoc_change_stream_t *) bson_malloc (sizeof (mongoc_change_stream_t));
(mongoc_change_stream_t *) bson_malloc0 (sizeof (mongoc_change_stream_t));

BSON_ASSERT (coll);
BSON_ASSERT (pipeline);
Expand All @@ -190,8 +217,6 @@ _mongoc_change_stream_new (const mongoc_collection_t *coll,
bson_init (&stream->opts);
bson_init (&stream->resume_token);
bson_init (&stream->err_doc);
memset (&stream->err, 0, sizeof (bson_error_t));
stream->cursor = NULL;

/*
* The passed options may consist of:
Expand Down Expand Up @@ -260,12 +285,13 @@ bool
mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
{
bson_iter_t iter;
bool ret = false;

BSON_ASSERT (stream);
BSON_ASSERT (bson);

if (stream->err.code != 0) {
return false;
goto end;
}

if (!mongoc_cursor_next (stream->cursor, bson)) {
Expand All @@ -275,7 +301,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)

if (!mongoc_cursor_error_document (stream->cursor, &err, &err_doc)) {
/* No error occurred, just no documents left */
return false;
goto end;
}

/* Change Streams Spec: An error is resumable if it is not a server error,
Expand Down Expand Up @@ -311,7 +337,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
!mongoc_cursor_error_document (stream->cursor, &err, &err_doc);
if (resumable) {
/* Empty batch. */
return false;
goto end;
}
}
}
Expand All @@ -320,7 +346,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
stream->err = err;
bson_destroy (&stream->err_doc);
bson_copy_to (err_doc, &stream->err_doc);
return false;
goto end;
}
}

Expand All @@ -332,14 +358,25 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
MONGOC_ERROR_CHANGE_STREAM_NO_RESUME_TOKEN,
"Cannot provide resume functionality when the resume "
"token is missing");
return false;
goto end;
}

/* Copy the resume token */
bson_reinit (&stream->resume_token);
BSON_APPEND_VALUE (
&stream->resume_token, "resumeAfter", bson_iter_value (&iter));
return true;
ret = true;

end:
/* Driver Sessions Spec: "When an implicit session is associated with a
* cursor for use with getMore operations, the session MUST be returned to
* the pool immediately following a getMore operation that indicates that the
* cursor has been exhausted." */
if (stream->implicit_session && stream->cursor->rpc.reply.cursor_id == 0) {
mongoc_client_session_destroy (stream->implicit_session);
stream->implicit_session = NULL;
}
return ret;
}

bool
Expand Down Expand Up @@ -373,6 +410,9 @@ mongoc_change_stream_destroy (mongoc_change_stream_t *stream)
if (stream->cursor) {
mongoc_cursor_destroy (stream->cursor);
}
if (stream->implicit_session) {
mongoc_client_session_destroy (stream->implicit_session);
}
mongoc_collection_destroy (stream->coll);
bson_free (stream);
}
57 changes: 56 additions & 1 deletion tests/test-mongoc-client-session.c
@@ -1,6 +1,7 @@
#include <mongoc-cursor-private.h>
#include "mongoc.h"
#include "mongoc-util-private.h"
#include "mongoc-change-stream-private.h"
#include "mongoc-collection-private.h"
#include "utlist.h"
#include "TestSuite.h"
Expand Down Expand Up @@ -1940,7 +1941,7 @@ test_find_indexes (session_test_t *test)
const mongoc_server_session_t *_tmp; \
int _n_sessions; \
CDL_COUNT ((_topology)->session_pool, _tmp, _n_sessions); \
ASSERT_CMPINT (_n_sessions, ==, (int) (_expected_size)); \
ASSERT_CMPINT (_n_sessions, ==, (int) (_expected_size)); \
} while (0)


Expand Down Expand Up @@ -1998,6 +1999,53 @@ test_cursor_implicit_session (void *ctx)
}


static void
test_change_stream_implicit_session (void *ctx)
{
session_test_t *test;
mongoc_topology_t *topology;
mongoc_client_session_t *cs;
bson_error_t error;
mongoc_change_stream_t *change_stream;
bson_t pipeline = BSON_INITIALIZER;
const bson_t *doc;
bson_t aggregate_lsid;

test = session_test_new (CORRECT_CLIENT, NOT_CAUSAL);
test->expect_explicit_lsid = false;
topology = test->client->topology;
cs = mongoc_client_start_session (test->client, NULL, &error);
ASSERT_OR_PRINT (cs, error);
change_stream =
mongoc_collection_watch (test->session_collection, &pipeline, NULL);
bson_destroy (&pipeline);
bson_copy_to (&test->sent_lsid, &aggregate_lsid);
ASSERT_POOL_SIZE (topology, 0);
BSON_ASSERT (change_stream->implicit_session);

/* push a new server session into the pool */
mongoc_client_session_destroy (cs);
ASSERT_POOL_SIZE (topology, 1);
ASSERT_SESSIONS_DIFFER (&aggregate_lsid, &topology->session_pool->lsid);

/* "getMore" uses the same lsid as "aggregate" did */
bson_reinit (&test->sent_lsid);
mongoc_change_stream_next (change_stream, &doc);
ASSERT_SESSIONS_MATCH (
&test->sent_lsid, &change_stream->implicit_session->server_session->lsid);
ASSERT_SESSIONS_MATCH (
&test->sent_lsid,
&change_stream->cursor->client_session->server_session->lsid);
ASSERT_SESSIONS_MATCH (&test->sent_lsid, &aggregate_lsid);
ASSERT_OR_PRINT (
!mongoc_change_stream_error_document (change_stream, &error, NULL),
error);
bson_destroy (&aggregate_lsid);
mongoc_change_stream_destroy (change_stream);
session_test_destroy (test);
}


static void
test_cmd_error (void *ctx)
{
Expand Down Expand Up @@ -2336,6 +2384,13 @@ test_session_install (TestSuite *suite)
NULL,
test_framework_skip_if_no_cluster_time,
test_framework_skip_if_no_crypto);
TestSuite_AddFull (suite,
"/Session/change_stream_implicit_session",
test_change_stream_implicit_session,
NULL,
NULL,
test_framework_skip_if_no_cluster_time,
test_framework_skip_if_no_crypto);
TestSuite_AddFull (suite,
"/Session/cmd_error",
test_cmd_error,
Expand Down

0 comments on commit 490306b

Please sign in to comment.