Skip to content

Commit

Permalink
CXX-1702 support database aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
samantharitter committed Feb 10, 2020
1 parent 4e14d60 commit 4f14bed
Show file tree
Hide file tree
Showing 13 changed files with 263 additions and 40 deletions.
2 changes: 0 additions & 2 deletions data/crud/v2/db-aggregate.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"tests": [
{
"description": "Aggregate with $listLocalSessions",
"skipReason": "blocked on CXX-1702",
"operations": [
{
"name": "aggregate",
Expand Down Expand Up @@ -44,7 +43,6 @@
},
{
"description": "Aggregate with $listLocalSessions and allowDiskUse",
"skipReason": "blocked on CXX-1702",
"operations": [
{
"name": "aggregate",
Expand Down
32 changes: 1 addition & 31 deletions src/mongocxx/collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,37 +472,7 @@ cursor collection::_aggregate(const client_session* session,

bsoncxx::builder::basic::document b;

if (options.allow_disk_use()) {
b.append(kvp("allowDiskUse", *options.allow_disk_use()));
}

if (options.collation()) {
b.append(kvp("collation", *options.collation()));
}

if (options.max_time()) {
b.append(kvp("maxTimeMS", bsoncxx::types::b_int64{options.max_time()->count()}));
}

if (options.bypass_document_validation()) {
b.append(kvp("bypassDocumentValidation", *options.bypass_document_validation()));
}

if (options.hint()) {
b.append(kvp("hint", options.hint()->to_value()));
}

if (options.read_concern()) {
b.append(kvp("readConcern", options.read_concern()->to_document()));
}

if (options.write_concern()) {
b.append(kvp("writeConcern", options.write_concern()->to_document()));
}

if (options.batch_size()) {
b.append(kvp("batchSize", *options.batch_size()));
}
options.append(b);

if (session) {
b.append(bsoncxx::builder::concatenate_doc{session->_get_impl().to_document()});
Expand Down
35 changes: 35 additions & 0 deletions src/mongocxx/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,41 @@ database::operator bool() const noexcept {
return static_cast<bool>(_impl);
}

cursor database::_aggregate(const client_session* session,
const pipeline& pipeline,
const options::aggregate& options) {
scoped_bson_t stages(bsoncxx::document::view(pipeline._impl->view_array()));

bsoncxx::builder::basic::document b;

options.append(b);

if (session) {
b.append(bsoncxx::builder::concatenate_doc{session->_get_impl().to_document()});
}

scoped_bson_t options_bson(b.view());

const ::mongoc_read_prefs_t* rp_ptr = NULL;

if (options.read_preference()) {
rp_ptr = options.read_preference()->_impl->read_preference_t;
}

return cursor(libmongoc::database_aggregate(
_get_impl().database_t, stages.bson(), options_bson.bson(), rp_ptr));
}

cursor database::aggregate(const pipeline& pipeline, const options::aggregate& options) {
return _aggregate(nullptr, pipeline, options);
}

cursor database::aggregate(const client_session& session,
const pipeline& pipeline,
const options::aggregate& options) {
return _aggregate(&session, pipeline, options);
}

cursor database::_list_collections(const client_session* session,
bsoncxx::document::view_or_value filter) {
bsoncxx::builder::basic::document options_builder;
Expand Down
60 changes: 60 additions & 0 deletions src/mongocxx/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,62 @@ class MONGOCXX_API database {
///
explicit operator bool() const noexcept;

///
/// @{
///
/// Runs an aggregation framework pipeline against this database for
/// pipeline stages that do not require an underlying collection,
/// such as $currentOp and $listLocalSessions.
///
/// @param pipeline
/// The pipeline of aggregation operations to perform.
/// @param options
/// Optional arguments, see mongocxx::options::aggregate.
///
/// @return A mongocxx::cursor with the results. If the query fails,
/// the cursor throws mongocxx::query_exception when the returned cursor
/// is iterated.
///
/// @see https://docs.mongodb.com/manual/reference/command/aggregate/#dbcmd.aggregate
///
/// @note
/// In order to pass a read concern to this, you must use the
/// database level set read concern - database::read_concern(rc).
/// (Write concern supported only for MongoDB 3.4+).
///
cursor aggregate(const pipeline& pipeline,
const options::aggregate& options = options::aggregate());

///
/// Runs an aggregation framework pipeline against this database for
/// pipeline stages that do not require an underlying collection,
/// such as $currentOp and $listLocalSessions.
///
/// @param session
/// The mongocxx::client_session with which to perform the aggregation.
/// @param pipeline
/// The pipeline of aggregation operations to perform.
/// @param options
/// Optional arguments, see mongocxx::options::aggregate.
///
/// @return A mongocxx::cursor with the results. If the query fails,
/// the cursor throws mongocxx::query_exception when the returned cursor
/// is iterated.
///
/// @see https://docs.mongodb.com/manual/reference/command/aggregate/#dbcmd.aggregate
///
/// @note
/// In order to pass a read concern to this, you must use the
/// database level set read concern - database::read_concern(rc).
/// (Write concern supported only for MongoDB 3.4+).
///
cursor aggregate(const client_session& session,
const pipeline& pipeline,
const options::aggregate& options = options::aggregate());
///
/// @}
///

///
/// @{
///
Expand Down Expand Up @@ -578,6 +634,10 @@ class MONGOCXX_API database {

MONGOCXX_PRIVATE database(const class client& client, bsoncxx::string::view_or_value name);

MONGOCXX_PRIVATE cursor _aggregate(const client_session* session,
const pipeline& pipeline,
const options::aggregate& options);

MONGOCXX_PRIVATE bsoncxx::document::value _run_command(
const client_session* session, bsoncxx::document::view_or_value command);

Expand Down
33 changes: 33 additions & 0 deletions src/mongocxx/options/aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <mongocxx/options/aggregate.hpp>

#include <bsoncxx/builder/basic/document.hpp>
#include <mongocxx/private/read_preference.hh>

#include <mongocxx/config/private/prelude.hh>
Expand All @@ -22,11 +23,43 @@ namespace mongocxx {
MONGOCXX_INLINE_NAMESPACE_BEGIN
namespace options {

using bsoncxx::builder::basic::kvp;

aggregate& aggregate::allow_disk_use(bool allow_disk_use) {
_allow_disk_use = allow_disk_use;
return *this;
}

void aggregate::append(bsoncxx::builder::basic::document& builder) const {
if (this->allow_disk_use()) {
builder.append(kvp("allowDiskUse", *this->allow_disk_use()));
}

if (this->collation()) {
builder.append(kvp("collation", *this->collation()));
}

if (this->max_time()) {
builder.append(kvp("maxTimeMS", bsoncxx::types::b_int64{this->max_time()->count()}));
}

if (this->bypass_document_validation()) {
builder.append(kvp("bypassDocumentValidation", *this->bypass_document_validation()));
}

if (this->hint()) {
builder.append(kvp("hint", this->hint()->to_value()));
}

if (this->write_concern()) {
builder.append(kvp("writeConcern", this->write_concern()->to_document()));
}

if (this->batch_size()) {
builder.append(kvp("batchSize", *this->batch_size()));
}
}

aggregate& aggregate::collation(bsoncxx::document::view_or_value collation) {
_collation = std::move(collation);
return *this;
Expand Down
6 changes: 6 additions & 0 deletions src/mongocxx/options/aggregate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <chrono>
#include <cstdint>

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/document/view_or_value.hpp>
#include <bsoncxx/stdx/optional.hpp>
#include <mongocxx/hint.hpp>
Expand Down Expand Up @@ -252,6 +253,11 @@ class MONGOCXX_API aggregate {
const stdx::optional<class read_concern>& read_concern() const;

private:
friend class ::mongocxx::database;
friend class ::mongocxx::collection;

void append(bsoncxx::builder::basic::document& builder) const;

stdx::optional<bool> _allow_disk_use;
stdx::optional<std::int32_t> _batch_size;
stdx::optional<bsoncxx::document::view_or_value> _collation;
Expand Down
24 changes: 24 additions & 0 deletions src/mongocxx/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ pipeline& pipeline::count(std::string field) {
return *this;
}

pipeline& pipeline::current_op(bsoncxx::document::view_or_value current_op_args) {
_impl->sink().append([current_op_args](sub_document sub_doc) {
sub_doc.append(kvp("$currentOp", current_op_args));
});

return *this;
}

pipeline& pipeline::facet(bsoncxx::document::view_or_value facet_args) {
_impl->sink().append(
[facet_args](sub_document sub_doc) { sub_doc.append(kvp("$facet", facet_args)); });
Expand Down Expand Up @@ -126,6 +134,22 @@ pipeline& pipeline::limit(std::int32_t limit) {
return *this;
}

pipeline& pipeline::list_local_sessions(bsoncxx::document::view_or_value list_local_sessions_args) {
_impl->sink().append([list_local_sessions_args](sub_document sub_doc) {
sub_doc.append(kvp("$listLocalSessions", list_local_sessions_args));
});

return *this;
}

pipeline& pipeline::list_sessions(bsoncxx::document::view_or_value list_sessions_args) {
_impl->sink().append([list_sessions_args](sub_document sub_doc) {
sub_doc.append(kvp("$listSessions", list_sessions_args));
});

return *this;
}

pipeline& pipeline::lookup(bsoncxx::document::view_or_value lookup_args) {
_impl->sink().append(
[lookup_args](sub_document sub_doc) { sub_doc.append(kvp("$lookup", lookup_args)); });
Expand Down
46 changes: 46 additions & 0 deletions src/mongocxx/pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,23 @@ class MONGOCXX_API pipeline {
///
pipeline& count(std::string field);

///
/// Returns a stream of documents containing information on active and/or dormant
/// operations as well as inactive sessions that are holding locks as part of a
/// transaction. The stage returns a document for each operation or session.
///
/// This stage must be used with database aggregate on the 'admin' database.
///
/// @see https://docs.mongodb.com/manual/reference/operator/aggregation/currentOp/
///
/// @param current_op_args
/// A document containing the arguments for the current_op operation.
///
/// @return
/// A reference to the object on which this method is being called.
///
pipeline& current_op(bsoncxx::document::view_or_value current_op_args);

///
/// Processes multiple aggregation pipelines within a single stage on the same set of input
/// documents.
Expand Down Expand Up @@ -263,6 +280,35 @@ class MONGOCXX_API pipeline {
///
pipeline& limit(std::int32_t limit);

///
/// Lists the sessions cached in memory by the mongod or mongos instance.
///
/// This option must be used with database aggregate.
///
/// @see https://docs.mongodb.com/manual/reference/operator/aggregation/listLocalSessions/
///
/// @param list_local_sessions_args
/// A document containing the arguments for list_local_sessions.
///
/// @return
/// A reference to the object on which this method is being called.
///
pipeline& list_local_sessions(bsoncxx::document::view_or_value list_local_sessions_args);

///
/// Lists all sessions stored in the system.sessions collection in the config database.
/// These sessions are visible to all members of the MongoDB deployment.
///
/// @see https://docs.mongodb.com/manual/reference/operator/aggregation/listSessions/
///
/// @param list_sessions_args
/// A document containing the arguments for list_sessions.
///
/// @return
/// A reference to the object on which this method is being called.
///
pipeline& list_sessions(bsoncxx::document::view_or_value list_sessions_args);

///
/// Performs a left outer join to an unsharded collection in the same database to filter in
/// documents from the "joined" collection for processing.
Expand Down
1 change: 1 addition & 0 deletions src/mongocxx/private/libmongoc_symbols.hh
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ MONGOCXX_LIBMONGOC_SYMBOL(cursor_error_document)
MONGOCXX_LIBMONGOC_SYMBOL(cursor_new_from_command_reply_with_opts)
MONGOCXX_LIBMONGOC_SYMBOL(cursor_next)
MONGOCXX_LIBMONGOC_SYMBOL(cursor_set_max_await_time_ms)
MONGOCXX_LIBMONGOC_SYMBOL(database_aggregate)
MONGOCXX_LIBMONGOC_SYMBOL(database_command_with_opts)
MONGOCXX_LIBMONGOC_SYMBOL(database_copy)
MONGOCXX_LIBMONGOC_SYMBOL(database_create_collection)
Expand Down
32 changes: 32 additions & 0 deletions src/mongocxx/test/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,38 @@ TEST_CASE("Database integration tests", "[database]") {

auto case_insensitive_collation = make_document(kvp("locale", "en_US"), kvp("strength", 2));

SECTION("aggregation", "[database]") {
pipeline pipeline;

auto get_results = [](cursor&& cursor) {
std::vector<bsoncxx::document::value> results;
std::transform(cursor.begin(),
cursor.end(),
std::back_inserter(results),
[](bsoncxx::document::view v) { return bsoncxx::document::value{v}; });
return results;
};

mongo_client["admin"].run_command(make_document(kvp("killAllSessions", make_array())));

SECTION("listLocalSessions") {
auto session1 = mongo_client.start_session();
auto session2 = mongo_client.start_session();

pipeline.list_local_sessions({});
pipeline.limit(2);
pipeline.add_fields(make_document(kvp("name", "Jane")));
pipeline.project(make_document(kvp("name", 1)));

auto cursor = database.aggregate(pipeline);
auto results = get_results(std::move(cursor));

REQUIRE(results.size() == 2);
REQUIRE(results[0].view()["name"].get_utf8().value == stdx::string_view("Jane"));
REQUIRE(results[1].view()["name"].get_utf8().value == stdx::string_view("Jane"));
}
}

SECTION("A database may create a collection via create_collection") {
stdx::string_view collection_name{"collection_create_with_opts"};

Expand Down

0 comments on commit 4f14bed

Please sign in to comment.