Skip to content

Commit

Permalink
Merge pull request #888 from estolfo/change-stream
Browse files Browse the repository at this point in the history
RUBY-1228 Change Streams
  • Loading branch information
estolfo committed Sep 26, 2017
2 parents 5d31fff + 5cec119 commit ed5e2c1
Show file tree
Hide file tree
Showing 12 changed files with 985 additions and 3 deletions.
2 changes: 1 addition & 1 deletion lib/mongo.rb
Expand Up @@ -29,8 +29,8 @@
require 'mongo/protocol'
require 'mongo/client'
require 'mongo/cluster'
require 'mongo/collection'
require 'mongo/cursor'
require 'mongo/collection'
require 'mongo/database'
require 'mongo/dbref'
require 'mongo/grid'
Expand Down
32 changes: 32 additions & 0 deletions lib/mongo/collection.rb
Expand Up @@ -281,6 +281,38 @@ def aggregate(pipeline, options = {})
View.new(self, {}).aggregate(pipeline, options)
end

# As of version 3.6 of the MongoDB server, a ``$changeStream`` pipeline stage is supported
# in the aggregation framework. This stage allows users to request that notifications are sent for
# all changes to a particular collection.
#
# @example Get change notifications for a given collection.
# collection.watch([{ '$match' => { operationType: { '$in' => ['insert', 'replace'] } } }])
#
# @param [ Array<Hash> ] pipeline Optional additional filter operators.
# @param [ Hash ] options The change stream options.
#
# @option options [ String ] :full_document Allowed values: ‘default’, ‘updateLookup’. Defaults to ‘default’.
# When set to ‘updateLookup’, the change notification for partial updates will include both a delta
# describing the changes to the document, as well as a copy of the entire document that was changed
# from some time after the change occurred.
# @option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point for the
# new change stream.
# @option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to wait
# on new documents to satisfy a change stream query.
# @option options [ Integer ] :batch_size The number of documents to return per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
#
# @note A change stream only allows 'majority' read concern.
# @note This helper method is preferable to running a raw aggregation with a $changeStream stage,
# for the purpose of supporting resumability.
#
# @return [ ChangeStream ] The change stream object.
#
# @since 2.5.0
def watch(pipeline = [], options = {})
View::ChangeStream.new(View.new(self, {}, options), pipeline, options)
end

# Get a count of matching documents in the collection.
#
# @example Get the count.
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/collection/view.rb
Expand Up @@ -17,6 +17,7 @@
require 'mongo/collection/view/iterable'
require 'mongo/collection/view/explainable'
require 'mongo/collection/view/aggregation'
require 'mongo/collection/view/change_stream'
require 'mongo/collection/view/map_reduce'
require 'mongo/collection/view/readable'
require 'mongo/collection/view/writable'
Expand Down
3 changes: 3 additions & 0 deletions lib/mongo/collection/view/builder/aggregation.rb
Expand Up @@ -29,6 +29,9 @@ class Aggregation
MAPPINGS = BSON::Document.new(
:allow_disk_use => 'allowDiskUse',
:max_time_ms => 'maxTimeMS',
# This is intentional; max_await_time_ms is an alias for maxTimeMS used on getmore
# commands for change streams.
:max_await_time_ms => 'maxTimeMS',
:explain => 'explain',
:bypass_document_validation => 'bypassDocumentValidation',
:collation => 'collation',
Expand Down
159 changes: 159 additions & 0 deletions lib/mongo/collection/view/change_stream.rb
@@ -0,0 +1,159 @@
# Copyright (C) 2017 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/collection/view/change_stream/retryable'

module Mongo
class Collection
class View

# Provides behaviour around a `$changeStream` pipeline stage in the
# aggregation framework. Specifying this stage allows users to request that
# notifications are sent for all changes to a particular collection or database.
#
# @note Only available in server versions 3.6 and higher.
# @note ChangeStreams do not work properly with JRuby because of the issue documented
# here: https://github.com/jruby/jruby/issues/4212
# Namely, JRuby eagerly evaluates #next on an Enumerator in a background green thread.
# So calling #next on the change stream will cause getmores to be called in a loop in the background.
#
#
# @since 2.5.0
class ChangeStream < Aggregation
include Retryable

# @return [ String ] The fullDocument option default value.
#
# @since 2.5.0
FULL_DOCUMENT_DEFAULT = 'default'.freeze

# @return [ BSON::Document ] The change stream options.
#
# @since 2.5.0
attr_reader :options

# Initialize the change stream for the provided collection view, pipeline
# and options.
#
# @example Create the new change stream view.
# ChangeStream.new(view, pipeline, options)
#
# @param [ Collection::View ] view The collection view.
# @param [ Array<Hash> ] pipeline The pipeline of operators to filter the change notifications.
# @param [ Hash ] opts The change stream options.
#
# @option options [ String ] :full_document Allowed values: ‘default’, ‘updateLookup’. Defaults to ‘default’.
# When set to ‘updateLookup’, the change notification for partial updates will include both a delta
# describing the changes to the document, as well as a copy of the entire document that was changed
# from some time after the change occurred.
# @option options [ BSON::Document, Hash ] :resume_after Specifies the logical starting point for the
# new change stream.
# @option options [ Integer ] :max_await_time_ms The maximum amount of time for the server to wait
# on new documents to satisfy a change stream query.
# @option options [ Integer ] :batch_size The number of documents to return per batch.
# @option options [ BSON::Document, Hash ] :collation The collation to use.
#
# @since 2.5.0
def initialize(view, pipeline, options = {})
@view = view
@change_stream_filters = pipeline && pipeline.dup
@options = options && options.dup.freeze
@resume_token = @options[:resume_after]
read_with_one_retry { create_cursor! }
end

# Iterate through documents returned by the change stream.
#
# @example Iterate through the stream of documents.
# stream.each do |document|
# p document
# end
#
# @return [ Enumerator ] The enumerator.
#
# @since 2.5.0
#
# @yieldparam [ BSON::Document ] Each change stream document.
def each
raise StopIteration.new if closed?
begin
@cursor.each do |doc|
cache_resume_token(doc)
yield doc
end if block_given?
@cursor.to_enum
rescue => e
close
if retryable?(e)
create_cursor!
retry
end
raise
end
end

# Close the change stream.
#
# @example Close the change stream.
# stream.close
#
# @return [ nil ] nil.
#
# @since 2.5.0
def close
unless closed?
begin; @cursor.send(:kill_cursors); rescue; end
@cursor = nil
end
end

# Is the change stream closed?
#
# @example Determine whether the change stream is closed.
# stream.closed?
#
# @return [ true, false ] If the change stream is closed.
#
# @since 2.5.0
def closed?
@cursor.nil?
end

private

def cache_resume_token(doc)
unless @resume_token = (doc[:_id] && doc[:_id].dup)
raise Error::MissingResumeToken.new
end
end

def create_cursor!
server = server_selector.select_server(cluster, false)
result = send_initial_query(server)
@cursor = Cursor.new(view, result, server, disable_retry: true)
end

def pipeline
change_doc = { fullDocument: ( @options[:full_document] || FULL_DOCUMENT_DEFAULT ) }
change_doc[:resumeAfter] = @resume_token if @resume_token
[{ '$changeStream' => change_doc }] + @change_stream_filters
end

def send_initial_query(server)
initial_query_op.execute(server)
end
end
end
end
end
57 changes: 57 additions & 0 deletions lib/mongo/collection/view/change_stream/retryable.rb
@@ -0,0 +1,57 @@
# Copyright (C) 2017 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class Collection
class View
class ChangeStream < Aggregation

# Behavior around resuming a change stream.
#
# @since 2.5.0
module Retryable

private

RETRY_MESSAGES = [
'not master',
'(43)' # cursor not found error code
].freeze

def read_with_one_retry
yield
rescue => e
if retryable?(e)
yield
else
raise(e)
end
end

def retryable?(error)
network_error?(error) || retryable_operation_failure?(error)
end

def network_error?(error)
[ Error::SocketError, Error::SocketTimeoutError].include?(error.class)
end

def retryable_operation_failure?(error)
error.is_a?(Error::OperationFailure) && RETRY_MESSAGES.any? { |m| error.message.include?(m) }
end
end
end
end
end
end
15 changes: 13 additions & 2 deletions lib/mongo/cursor.rb
Expand Up @@ -50,15 +50,20 @@ class Cursor
# @param [ CollectionView ] view The +CollectionView+ defining the query.
# @param [ Operation::Result ] result The result of the first execution.
# @param [ Server ] server The server this cursor is locked to.
# @param [ Hash ] options The cursor options.
#
# @option options [ true, false ] :disable_retry Whether to disable retrying on
# error when sending getmores.
#
# @since 2.0.0
def initialize(view, result, server)
def initialize(view, result, server, options = {})
@view = view
@server = server
@initial_result = result
@remaining = limit if limited?
@cursor_id = result.cursor_id
@coll_name = nil
@options = options
register
ObjectSpace.define_finalizer(self, self.class.finalize(result.cursor_id,
cluster,
Expand Down Expand Up @@ -185,8 +190,12 @@ def exhausted?
end

def get_more
read_with_retry do
if @options[:disable_retry]
process(get_more_operation.execute(@server))
else
read_with_retry do
process(get_more_operation.execute(@server))
end
end
end

Expand All @@ -203,6 +212,8 @@ def kill_cursors
read_with_one_retry do
kill_cursors_operation.execute(@server)
end
ensure
@cursor_id = 0
end

def kill_cursors_operation
Expand Down
1 change: 1 addition & 0 deletions lib/mongo/error.rb
Expand Up @@ -103,6 +103,7 @@ class Error < StandardError
require 'mongo/error/unexpected_chunk_length'
require 'mongo/error/unexpected_response'
require 'mongo/error/missing_file_chunk'
require 'mongo/error/missing_resume_token'
require 'mongo/error/unsupported_array_filters'
require 'mongo/error/unknown_payload_type'
require 'mongo/error/unsupported_collation'
Expand Down
39 changes: 39 additions & 0 deletions lib/mongo/error/missing_resume_token.rb
@@ -0,0 +1,39 @@
# Copyright (C) 2017 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class Error

# Raised if a change stream document is returned without a resume token.
#
# @since 2.5.0
class MissingResumeToken < Error

# The error message.
#
# @since 2.5.0
MESSAGE = 'Cannot provide resume functionality when the resume token is missing'.freeze

# Create the new exception.
#
# @example Create the new exception.
# Mongo::Error::MissingResumeToken.new
#
# @since 2.5.0
def initialize
super(MESSAGE)
end
end
end
end

0 comments on commit ed5e2c1

Please sign in to comment.